A music streaming company, Sparkify, has decided that it is time to introduce more automation and monitoring to its data warehouse ETL pipelines, and has concluded that the best tool to achieve this is Apache Airflow.
In this project, we created high-grade data pipelines that are dynamic, are built from reusable tasks, can be monitored, and allow easy backfills. A data quality operator is implemented to report volume discrepancies in real time.
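A volume check of this kind can be sketched as a plain function. It assumes the check is a `COUNT(*)` per table, and it takes a `get_records(sql)` callable so that it runs without Airflow. Inside an operator, that callable would presumably be `PostgresHook(postgres_conn_id="redshift").get_records`. The function name `check_row_counts` is hypothetical and is not the project's actual `DataQualityOperator` code.

```python
from typing import Callable, Iterable, List, Sequence, Tuple

# Stand-in for a hook's get_records(sql): takes a SQL string, returns rows as tuples.
RecordFetcher = Callable[[str], Sequence[Tuple]]


def check_row_counts(tables: Iterable[str], get_records: RecordFetcher) -> List[str]:
    """Count rows in each table and raise if any table is empty (hypothetical helper)."""
    passed = []
    for table in tables:
        records = get_records(f"SELECT COUNT(*) FROM {table}")
        # No rows at all means the query itself returned nothing.
        if not records or not records[0]:
            raise ValueError(f"Data quality check failed: {table} returned no results")
        num_rows = records[0][0]
        # A zero count means the load step left the table empty.
        if num_rows < 1:
            raise ValueError(f"Data quality check failed: {table} contains 0 rows")
        passed.append(f"Data quality check on {table} passed with {num_rows} records")
    return passed
```

Raising an exception is what makes the Airflow task fail, so a volume discrepancy shows up immediately as a failed task in the UI.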
The source datasets consist of JSON logs that describe user activity in the application and JSON metadata about the songs the users listen to.
The goal of this project is to author a data pipeline workflow in Airflow, built from custom operators that perform tasks such as staging data, populating the data warehouse, and running quality checks. The high-level structure of the pipeline is shown in the DAG graph view (images/dag_graph_view.png).
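Airflow runs each task only after all of its upstream tasks have finished. The following sketch models that ordering with Python's standard `graphlib` (3.9+). Each task is mapped to the set of tasks it waits on, and the sketch prints one valid run order. The task ids are hypothetical, as is the assumption that the fact table loads before the dimension tables. The real ids and edges are defined in `dags/etl_dag.py`.

```python
from graphlib import TopologicalSorter

# Hypothetical task ids: each key lists the tasks that must finish before it runs.
dependencies = {
    "stage_events": {"begin_execution"},
    "stage_songs": {"begin_execution"},
    "load_songplays_fact": {"stage_events", "stage_songs"},
    "load_user_dim": {"load_songplays_fact"},
    "load_song_dim": {"load_songplays_fact"},
    "load_artist_dim": {"load_songplays_fact"},
    "load_time_dim": {"load_songplays_fact"},
    "run_quality_checks": {"load_user_dim", "load_song_dim", "load_artist_dim", "load_time_dim"},
    "stop_execution": {"run_quality_checks"},
}

# static_order() yields every task after all of the tasks it depends on.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

In an Airflow DAG file, the same edges would typically be declared with the bitshift operator, for example `begin_execution >> [stage_events, stage_songs] >> load_songplays_fact`. The scheduler then derives an order like this one.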
Here are the S3 paths for the datasets used in this project:
- Log data: s3://udacity-dend/log_data
- Song data: s3://udacity-dend/song_data
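Staging these datasets comes down to a Redshift `COPY` from the S3 paths above. The helper below builds such a statement. Its name, the staging table name in the usage example, and the default region `us-west-2` are assumptions. It uses `ACCESS_KEY_ID`/`SECRET_ACCESS_KEY` authorization and `FORMAT AS JSON`. With `'auto'`, Redshift matches JSON keys to column names. Datasets whose keys do not match the column names may need an S3 JSONPaths file instead.

```python
def build_copy_sql(
    table: str,
    s3_path: str,
    access_key: str,
    secret_key: str,
    region: str = "us-west-2",  # assumption: adjust to the bucket's region
    json_option: str = "auto",  # 'auto' or an s3:// path to a JSONPaths file
) -> str:
    """Return a Redshift COPY statement loading JSON files from S3 (hypothetical helper)."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"ACCESS_KEY_ID '{access_key}'\n"
        f"SECRET_ACCESS_KEY '{secret_key}'\n"
        f"REGION '{region}'\n"
        f"FORMAT AS JSON '{json_option}';"
    )


# Placeholder credentials; in Airflow these would come from the aws_credentials connection.
print(build_copy_sql("staging_songs", "s3://udacity-dend/song_data", "<ACCESS_KEY_ID>", "<SECRET>"))
```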
Data-Pipeline-with-Airflow
├── dags                          # Airflow DAGs location
│   ├── create_tables_dag.py      # DAG used for creating tables on Redshift
│   └── etl_dag.py                # Main ETL DAG
├── plugins
│   ├── __init__.py
│   ├── helpers
│   │   ├── __init__.py
│   │   └── sql_queries.py        # All SQL queries required
│   └── operators
│       ├── __init__.py
│       ├── data_quality.py       # DataQualityOperator
│       ├── load_dimension.py     # LoadDimensionOperator
│       ├── load_fact.py          # LoadFactOperator
│       └── stage_redshift.py     # StageToRedshiftOperator
├── crate_table.sql
├── images
│   └── dag_graph_view.png        # DAG Graph View
└── README.md                     # Project description
- `create_tables_dag.py`: DAG for creating tables on Redshift.
- `etl_dag.py`: Defines the main DAG and its tasks, and links the tasks in the required order.
- `stage_redshift.py`: Defines `StageToRedshiftOperator` to copy JSON data from S3 to staging tables in Redshift via the `COPY` command.
- `load_dimension.py`: Defines `LoadDimensionOperator` to load a dimension table from staging table(s).
- `load_fact.py`: Defines `LoadFactOperator` to load the fact table from staging table(s).
- `data_quality.py`: Defines `DataQualityOperator` to run data quality checks on all tables passed as a parameter.
- `sql_queries.py`: Contains the SQL queries for the ETL pipeline (provided in the template).
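The load operators presumably wrap an `INSERT INTO ... SELECT` over the staging tables. The helper below shows that pattern. It also has an optional truncate step, so a table can either be rebuilt on each run or appended to. The `truncate` switch and the helper name `build_load_sql` are assumptions for illustration. They are not confirmed parameters of `LoadFactOperator` or `LoadDimensionOperator`.

```python
from typing import List


def build_load_sql(table: str, select_sql: str, truncate: bool = False) -> List[str]:
    """Return the statements that load `table` from a SELECT over staging tables.

    `select_sql` is expected without a trailing semicolon (hypothetical helper).
    """
    statements = []
    if truncate:
        # Delete-then-insert: rebuilds the table from scratch on every run.
        statements.append(f"TRUNCATE TABLE {table};")
    # Append: adds the selected rows to whatever the table already holds.
    statements.append(f"INSERT INTO {table} {select_sql};")
    return statements
```

One common choice is to append to the fact table and to truncate and reload dimension tables, so that repeated runs do not duplicate dimension rows.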
This code uses Python 3 and assumes that Apache Airflow is installed and configured.
For AWS credentials, enter the following values:
- Conn Id: Enter `aws_credentials`.
- Conn Type: Enter `Amazon Web Services`.
- Login: Enter your Access key ID from the IAM User credentials you downloaded earlier.
- Password: Enter your Secret access key from the IAM User credentials you downloaded earlier.
Use the following values in Airflow's UI to configure the connection to Redshift:
- Conn Id: Enter `redshift`.
- Conn Type: Enter `Postgres`.
- Host: Enter the endpoint of your Redshift cluster, excluding the port at the end. You can find this by selecting your cluster in the Clusters page of the Amazon Redshift console. See where this is located in the screenshot below. IMPORTANT: Make sure NOT to include the port at the end of the Redshift endpoint string.
- Schema: Enter `dev`. This is the Redshift database you want to connect to.
- Login: Enter `awsuser`.
- Password: Enter the password you created when launching your Redshift cluster.
- Port: Enter `5439`.
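Instead of using the UI, Airflow can also read a connection from an environment variable named `AIRFLOW_CONN_<CONN_ID>` (conn id uppercased) whose value is a connection URI. The sketch below builds those variables for the `aws_credentials` and `redshift` connections described above. The helper names and the example host are hypothetical. Login and password are percent-encoded because AWS secret keys and passwords can contain characters such as `/`, `+` or `@`. The host stays port-free, as in the UI setup, and the port goes in its own URI field.

```python
from typing import Tuple
from urllib.parse import quote


def conn_env_var(conn_id: str, uri: str) -> Tuple[str, str]:
    """Return the (variable name, value) pair Airflow reads for a connection."""
    return f"AIRFLOW_CONN_{conn_id.upper()}", uri


def aws_uri(access_key_id: str, secret_access_key: str) -> str:
    # The aws:// scheme selects the Amazon Web Services connection type.
    return f"aws://{quote(access_key_id, safe='')}:{quote(secret_access_key, safe='')}@"


def redshift_uri(host: str, password: str, login: str = "awsuser",
                 schema: str = "dev", port: int = 5439) -> str:
    # postgres:// selects the Postgres type; the URI path becomes the Schema field.
    return f"postgres://{quote(login, safe='')}:{quote(password, safe='')}@{host}:{port}/{schema}"


# Hypothetical endpoint and password, for illustration only.
print(conn_env_var("redshift", redshift_uri("my-cluster.example.com", "p@ss/word")))
```

Exporting these variables in the environment of the scheduler and workers makes the connections available to tasks without storing them in Airflow's metadata database.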