
Data_Pipeline_with_Airflow

Project Description

A music streaming company, Sparkify, has decided that it is time to introduce more automation and monitoring to its data warehouse ETL pipelines, and has concluded that the best tool to achieve this is Apache Airflow.

In this project we created high-grade data pipelines that are dynamic, built from reusable tasks, can be monitored, and allow easy backfills. A data quality operator is implemented to report volume discrepancies as soon as each load completes.

The source datasets consist of JSON logs describing user activity in the application and JSON metadata about the songs the users listen to.

Data Pipeline

The goal of this project is to author an Airflow data pipeline built from custom operators that stage the data, populate the data warehouse, and run quality checks. A high-level view of the pipeline is shown in the DAG graph view (images/dag_graph_view.png).
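
The exact task wiring lives in dags/etl_dag.py; the sketch below only illustrates the high-level ordering of the tasks. The DummyOperator placeholders, task IDs, and DAG arguments are assumptions for illustration, not the project's exact definitions.

```python
# Minimal sketch of the task ordering; in the real DAG each placeholder is
# replaced by the corresponding custom operator (noted in the comments).
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG("sparkify_etl", start_date=datetime(2019, 1, 12), schedule_interval="@hourly") as dag:
    start = DummyOperator(task_id="Begin_execution")
    stage_events = DummyOperator(task_id="Stage_events")                 # StageToRedshiftOperator
    stage_songs = DummyOperator(task_id="Stage_songs")                   # StageToRedshiftOperator
    load_songplays = DummyOperator(task_id="Load_songplays_fact_table")  # LoadFactOperator
    load_users = DummyOperator(task_id="Load_user_dim_table")            # LoadDimensionOperator
    load_songs = DummyOperator(task_id="Load_song_dim_table")            # LoadDimensionOperator
    load_artists = DummyOperator(task_id="Load_artist_dim_table")        # LoadDimensionOperator
    load_time = DummyOperator(task_id="Load_time_dim_table")             # LoadDimensionOperator
    quality_checks = DummyOperator(task_id="Run_data_quality_checks")    # DataQualityOperator
    end = DummyOperator(task_id="Stop_execution")

    start >> stage_events
    start >> stage_songs
    [stage_events, stage_songs] >> load_songplays
    load_songplays >> [load_users, load_songs, load_artists, load_time]
    [load_users, load_songs, load_artists, load_time] >> quality_checks
    quality_checks >> end
```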

Datasets

Here are the S3 links for the datasets used in this project:

Log data: s3://udacity-dend/log_data

Song data: s3://udacity-dend/song_data
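
If you want to peek at the raw files before running the pipeline, you can list a few objects under each prefix. A minimal sketch using boto3 (the region is an assumption; adjust it to wherever the bucket lives, and make sure your local AWS credentials are configured):

```python
# Sketch: list a handful of the source JSON files in the public bucket.
import boto3

s3 = boto3.client("s3", region_name="us-west-2")  # region is an assumption
for prefix in ("log_data", "song_data"):
    response = s3.list_objects_v2(Bucket="udacity-dend", Prefix=prefix, MaxKeys=5)
    for obj in response.get("Contents", []):
        print(obj["Key"])
```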

Project Structure

    
Data-Pipeline-with-Airflow    
├── dags                           # Airflow DAGs location
│   ├── create_tables_dag.py       # DAG used for creating tables on Redshift
│   └── etl_dag.py
├── plugins
│   ├── __init__.py
│   ├── helpers
│   │   ├── __init__.py
│   │   └── sql_queries.py         # All sql queries required
│   └── operators
│       ├── __init__.py
│       ├── data_quality.py        # DataQualityOperator
│       ├── load_dimension.py      # LoadDimensionOperator
│       ├── load_fact.py           # LoadFactOperator
│       └── stage_redshift.py      # StageToRedshiftOperator
├── crate_table.sql
├── images
│   └── dag_graph_view.png         # DAG Graph View
└── README.md                      # Project description
    
  • create_tables_dag.py: DAG for creating the tables on Redshift.
  • etl_dag.py: Defines the main DAG and its tasks, and links the tasks in the required order.
  • stage_redshift.py: Defines StageToRedshiftOperator, which copies JSON data from S3 to staging tables in Redshift via the COPY command.
  • load_dimension.py: Defines LoadDimensionOperator to load a dimension table from the staging table(s).
  • load_fact.py: Defines LoadFactOperator to load the fact table from the staging table(s).
  • data_quality.py: Defines DataQualityOperator to run data quality checks on all tables passed as a parameter (a sketch of the idea follows this list).
  • sql_queries.py: Contains SQL queries for the ETL pipeline (provided in the project template).
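
The actual operator implementations live under plugins/operators. As an example, the following is a minimal sketch of a row-count style data quality check; the constructor parameters and check logic are illustrative assumptions, not necessarily the exact code in data_quality.py.

```python
# Sketch of a row-count data quality check (Airflow 1.10-style imports).
# Parameter names are assumptions, not necessarily those used in data_quality.py.
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class DataQualityOperator(BaseOperator):

    @apply_defaults
    def __init__(self, redshift_conn_id="redshift", tables=None, *args, **kwargs):
        super(DataQualityOperator, self).__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.tables = tables or []

    def execute(self, context):
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        for table in self.tables:
            records = redshift.get_records(f"SELECT COUNT(*) FROM {table}")
            if not records or not records[0] or records[0][0] < 1:
                raise ValueError(f"Data quality check failed: {table} returned no rows")
            self.log.info("Data quality check passed for %s with %s rows", table, records[0][0])
```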

Configuration

This code uses Python 3 and assumes that Apache Airflow is installed and configured.

To configure your AWS credentials, add a connection in Airflow's UI with the following values:

  • Conn Id: Enter aws_credentials.
  • Conn Type: Enter Amazon Web Services.
  • Login: Enter your Access key ID from the IAM User credentials you downloaded earlier.
  • Password: Enter your Secret access key from the IAM User credentials you downloaded earlier.
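
Once the connection is saved, tasks can look it up by its Conn Id at runtime. A minimal sketch (Airflow 1.10-style import path; how the project's staging operator actually uses the credentials is an assumption):

```python
# Sketch: fetch the keys stored in the "aws_credentials" connection.
from airflow.contrib.hooks.aws_hook import AwsHook

aws_hook = AwsHook("aws_credentials")
credentials = aws_hook.get_credentials()
# credentials.access_key / credentials.secret_key correspond to the
# Login / Password fields entered above.
```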

Use the following values in Airflow's UI to configure the connection to Redshift:

  • Conn Id: Enter redshift.
  • Conn Type: Enter Postgres.
  • Host: Enter the endpoint of your Redshift cluster, excluding the port at the end. You can find this by selecting your cluster on the Clusters page of the Amazon Redshift console. IMPORTANT: Make sure NOT to include the port at the end of the Redshift endpoint string.
  • Schema: Enter dev. This is the Redshift database you want to connect to.
  • Login: Enter awsuser.
  • Password: Enter the password you created when launching your Redshift cluster.
  • Port: Enter 5439.
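
If you prefer not to click through the UI, both connections can also be created programmatically. A minimal sketch using Airflow's Connection model (the placeholder values, including the cluster endpoint, are assumptions you must replace with your own):

```python
# Sketch: create the "aws_credentials" and "redshift" connections in the
# Airflow metadata database instead of entering them in the UI.
from airflow import settings
from airflow.models import Connection

session = settings.Session()
session.add(Connection(conn_id="aws_credentials", conn_type="aws",
                       login="YOUR_ACCESS_KEY_ID", password="YOUR_SECRET_ACCESS_KEY"))
session.add(Connection(conn_id="redshift", conn_type="postgres",
                       host="your-cluster.xxxxxxxx.us-west-2.redshift.amazonaws.com",
                       schema="dev", login="awsuser", password="YOUR_REDSHIFT_PASSWORD",
                       port=5439))
session.commit()
```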