# Table of Contents
- Introduction
- Data Set
- Data Transformations
- Schemas
- Live Demo
- Presentation Deck
- Instructions to Run this Pipeline
# Introduction

This is a data engineering project built at Insight Data Science. The project has two goals:
- Provide an API that lets data scientists and cab dispatchers analyze long-term trends in cab behavior with respect to metrics such as average pickups, dropoffs, occupancy, and miles travelled.
- Provide a framework for real-time monitoring of cab locations, so that a user can see the unoccupied cabs across a city and zoom in on a specific neighborhood to spot and catch the available cabs nearest to them.
# Data Set

Historical: The project is based on historical geolocation data for 500 yellow cabs in San Francisco, collected over one month (11 million rows). The data is a time series of updates on each cab's occupancy and location, recorded at roughly one-minute intervals. The raw data set consists of 500 text files, one per cab, with the following columns:

| cabID | lat | long | occupancy | timestamp |
|-------|-----|------|-----------|-----------|

Real-Time: The historical data set is played back to simulate real-time behavior.
AWS Clusters: The project runs on a distributed AWS cluster of four EC2 machines. All components (ingestion, batch processing, and real-time processing) are configured and run in distributed mode, with one master and three workers.
Ingestion Layer (Kafka): The raw data is ingested by a Kafka message broker configured in publish-subscribe mode.
- Each message is keyed by its cab ID to preserve the temporal ordering of each cab's data: Kafka guarantees in-order delivery within a partition, and all messages with the same key are assigned to the same partition.
- All keys are published to a single common topic. Related files: producer.py, kafka_consumer.py.
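
A minimal sketch of the keyed publishing step, assuming each raw line carries the five comma-separated fields listed in the Data Set section. The topic name `cab_locations` and the helper names are hypothetical, not taken from producer.py. `publish` expects an object with kafka-python's `KafkaProducer.send(topic, key=..., value=...)` and `flush()` methods, for example `KafkaProducer(bootstrap_servers="localhost:9092")` (broker address assumed). `toy_partition` only illustrates why a fixed key pins a cab to one partition; Kafka's default partitioner hashes keys with murmur2, not CRC32.

```python
import zlib
from typing import Iterable, Iterator, Tuple

TOPIC = "cab_locations"  # hypothetical topic name


def iter_messages(lines: Iterable[str], topic: str = TOPIC) -> Iterator[Tuple[str, bytes, bytes]]:
    """Yield (topic, key, value) for raw 'cabID, lat, long, occupancy, timestamp' lines."""
    for line in lines:
        fields = [f.strip() for f in line.split(",")]
        if len(fields) != 5:
            continue  # skip blank or malformed lines
        cab_id = fields[0]
        # Keying by cab ID sends every reading of one cab to the same partition.
        yield topic, cab_id.encode("utf-8"), ",".join(fields).encode("utf-8")


def publish(producer, lines: Iterable[str], topic: str = TOPIC) -> None:
    """Send each line with a KafkaProducer-like object (send/flush)."""
    for t, key, value in iter_messages(lines, topic):
        producer.send(t, key=key, value=value)
    producer.flush()  # block until buffered messages have been sent


def toy_partition(key: bytes, num_partitions: int) -> int:
    """Illustration only: a deterministic key hash picks the partition.
    Kafka's default partitioner uses murmur2, not CRC32."""
    return zlib.crc32(key) % num_partitions
```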
Batch Layer (HDFS, Hadoop): A Kafka consumer stores the data in HDFS. MrJob and Hive then add the columns needed to compute the metrics described in the Data Transformations section. Finally, Hive generates the tables that hold the aggregate views served to users.
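
One way a consumer could stage data for HDFS, sketched under assumptions: message values are buffered into fixed-size, newline-delimited local files, which can then be copied into HDFS with `hdfs dfs -put`. The function name, batch size, and file naming are hypothetical, not taken from kafka_consumer.py. The `values` argument could be `(msg.value for msg in KafkaConsumer(...))` from kafka-python; because such a stream never ends, the final partial-batch flush would never run, so a long-running consumer would also need a time-based flush.

```python
import os
import tempfile
from typing import Iterable, List, Optional


def write_batches(values: Iterable[bytes], batch_size: int,
                  out_dir: Optional[str] = None) -> List[str]:
    """Group message values into newline-delimited local files for upload to HDFS.

    Returns the paths written; each could be copied with `hdfs dfs -put`.
    """
    out_dir = out_dir or tempfile.mkdtemp(prefix="cab_batches_")  # local staging dir
    paths: List[str] = []
    batch: List[bytes] = []

    def flush() -> None:
        path = os.path.join(out_dir, f"part-{len(paths):05d}.txt")
        with open(path, "wb") as fh:
            fh.write(b"\n".join(batch) + b"\n")
        paths.append(path)
        batch.clear()

    for value in values:
        batch.append(value)
        if len(batch) >= batch_size:
            flush()
    if batch:  # write the final partial batch
        flush()
    return paths
```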
Serving Layer (HBase): HBase tables store the aggregate views generated by Hive for hour of day, day of week, and individual cab profiles. The table schema is optimized for quick access: hours are stored as columns and the totals for each day are kept in the same row, so hourly and daily profiles can be served from the same table and rows.
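
A sketch of how one row can serve both query types, assuming the row is shaped like the result of HappyBase's `Table.row()` (a dict mapping `b"family:qualifier"` to bytes) and assuming a hypothetical column layout: family `m`, one qualifier per metric and hour (such as `pickups_07`), plus a per-day total (such as `pickups_total`). The project's actual column names are not documented here. A daily-profile query could fetch only the total, e.g. `table.row(key, columns=[b"m:pickups_total"])`.

```python
from typing import Dict, List

FAMILY = "m"  # hypothetical column family


def _cell(row: Dict[bytes, bytes], qualifier: str) -> float:
    """Read one numeric cell; a missing column counts as 0."""
    return float(row.get(f"{FAMILY}:{qualifier}".encode(), b"0").decode())


def hourly_profile(row: Dict[bytes, bytes], metric: str) -> List[float]:
    """Hour-of-day profile: one value per hour column, 00 through 23."""
    return [_cell(row, f"{metric}_{hour:02d}") for hour in range(24)]


def daily_total(row: Dict[bytes, bytes], metric: str) -> float:
    """Day-level value stored alongside the hours in the same row."""
    return _cell(row, f"{metric}_total")
```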
Speed Layer (Storm): The topology for processing real-time data consists of a Kafka spout and a bolt with a tick interval of 5 seconds. The data is filtered so that only currently available (unoccupied) cabs are stored in HBase. To serve queries with low latency, all of the data is stored in one row, with at most one column per cab (500 columns). In future work, the row key could be the city, so that all cabs in a city can be retrieved by reading a single row. If the number of cabs grows large, using city_zipcode as the row key would keep access fast while retaining the advantage of quick row scans in HBase.
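
A sketch of the bolt's filtering and tick-based flush, written as plain Python rather than against Pyleus so that it runs on its own. The row key `sf`, the `cab:<cabID>` column naming, the `"lat,lng"` cell encoding, and the convention that occupancy 0 means free are all assumptions. `process_tick` expects a HappyBase-style table with `put(row, data)` and `delete(row, columns=...)`. Deleting the columns of cabs that became occupied is one way to keep the row limited to available cabs; the description above does not say how the original bolt handles this.

```python
from typing import Dict, List, Set, Tuple

CITY_ROW = b"sf"  # hypothetical row key: one row per city


def _column(cab_id: str) -> bytes:
    return f"cab:{cab_id}".encode()  # hypothetical column naming


class AvailableCabBuffer:
    """Keeps the latest position of each unoccupied cab between ticks."""

    def __init__(self) -> None:
        self.available: Dict[str, Tuple[float, float]] = {}
        self.became_occupied: Set[str] = set()

    def process_tuple(self, cab_id: str, lat: float, lng: float, occupancy: int) -> None:
        if occupancy == 0:  # assumption: 0 means the cab is free
            self.available[cab_id] = (lat, lng)
            self.became_occupied.discard(cab_id)
        else:
            self.available.pop(cab_id, None)
            self.became_occupied.add(cab_id)

    def drain(self) -> Tuple[Dict[bytes, bytes], List[bytes]]:
        """Return (cells to write, columns to delete) and reset the buffer."""
        cells = {_column(c): f"{lat},{lng}".encode() for c, (lat, lng) in self.available.items()}
        stale = [_column(c) for c in sorted(self.became_occupied)]
        self.available.clear()
        self.became_occupied.clear()
        return cells, stale

    def process_tick(self, table) -> None:
        """Called on every tick (5 s): push buffered state to a HappyBase-style table."""
        cells, stale = self.drain()
        if cells:
            table.put(CITY_ROW, cells)
        if stale:
            table.delete(CITY_ROW, columns=stale)
```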
Front-end (Flask): The cab locations are rendered on Google Maps and refreshed every 2 seconds via AJAX. Historical data is shown as bar and line charts. Related files: views.py, batch.js, map.js.
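
A sketch of the server-side step behind the map's AJAX refresh: converting the real-time city row into a JSON list of markers. It assumes columns named `cab:<cabID>` and cells encoded as `"lat,lng"` (both hypothetical). In a Flask view, the list could be returned with `flask.jsonify(available_cabs(table.row(b"sf")))`; the route and the row key are assumptions, not taken from views.py.

```python
import json
from typing import Dict, List


def available_cabs(row: Dict[bytes, bytes]) -> List[dict]:
    """Convert a city row {b'cab:<id>': b'<lat>,<lng>'} into map markers."""
    markers = []
    for column, cell in sorted(row.items()):
        cab_id = column.decode().split(":", 1)[1]  # drop the column family
        lat, lng = (float(part) for part in cell.decode().split(","))
        markers.append({"cab": cab_id, "lat": lat, "lng": lng})
    return markers


def available_cabs_json(row: Dict[bytes, bytes]) -> str:
    """JSON payload a view could return to the map's AJAX poll."""
    return json.dumps(available_cabs(row))
```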
Libraries and APIs: Happybase, Pyleus, Kafka-python
# Data Transformations

The following metrics are computed by a MapReduce job (MrJob) on the raw dataset:
- Miles travelled (based on latitude and longitude)
- Related Files: mr_hourly_job.py
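
The distance between consecutive GPS fixes can be computed with the haversine formula. Below is a minimal sketch of the per-cab computation, assuming the points for one cab arrive sorted by timestamp (for example, in a reducer keyed by cab ID) and that each segment counts toward the hour in which it starts. Hours are taken in UTC here for simplicity; the actual job may use local San Francisco time. The function names are hypothetical, not taken from mr_hourly_job.py.

```python
import math
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Iterable, Tuple

EARTH_RADIUS_MILES = 3958.8  # mean Earth radius


def haversine_miles(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in miles between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_MILES * math.asin(math.sqrt(a))


def miles_by_hour(points: Iterable[Tuple[int, float, float]]) -> Dict[int, float]:
    """points: time-ordered (unix_ts, lat, lon) fixes for ONE cab."""
    totals: Dict[int, float] = defaultdict(float)
    prev = None
    for ts, lat, lon in points:
        if prev is not None:
            # Attribute the segment to the (UTC) hour in which it starts.
            hour = datetime.fromtimestamp(prev[0], tz=timezone.utc).hour
            totals[hour] += haversine_miles(prev[1], prev[2], lat, lon)
        prev = (ts, lat, lon)
    return dict(totals)
```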
The resulting table is aggregated using Hive to enable batch queries such as:
- Time of day profile of pickups, dropoffs, miles travelled
- Day of the week profile of metrics
- Related files: create_aggregates.q
- Additional metrics calculated in Hive: pickup and dropoff events
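
Pickup and dropoff events can be derived by comparing each reading with the previous reading for the same cab, which is what a Hive window such as `LAG(occupancy) OVER (PARTITION BY cabID ORDER BY timestamp)` computes. The sketch below mirrors that comparison in Python, assuming occupancy is 1 for occupied and 0 for free. It illustrates the technique and is not the contents of create_picks_drops.q.

```python
from typing import Iterable, List, Optional, Tuple


def detect_events(readings: Iterable[Tuple[int, int]]) -> List[Tuple[int, str]]:
    """readings: time-ordered (unix_ts, occupancy) pairs for ONE cab.

    Emits an event whenever occupancy changes from the previous reading.
    """
    events: List[Tuple[int, str]] = []
    prev_occ: Optional[int] = None  # plays the role of LAG(occupancy)
    for ts, occ in readings:
        if prev_occ == 0 and occ == 1:
            events.append((ts, "pickup"))
        elif prev_occ == 1 and occ == 0:
            events.append((ts, "dropoff"))
        prev_occ = occ
    return events
```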
A windowing operation in Hive translates the continuous time series data (per cab) into tables representing trips and their durations.
- The information pertaining to individual trips is extracted via filtering on pickup and dropoff events (1 million trips)
- The maximum idle time per cab per day identifies potential driver shifts (the longest contiguous block of driver idle time)
- Average trip times are also calculated
- Related files: create_picks_drops.q, create_agg_trips.q, create_trips.q
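
Given the ordered pickup and dropoff events for one cab, trips and idle gaps fall out of pairing each event with the next one. A sketch, assuming a trip runs from a pickup to the following dropoff, an idle gap runs from a dropoff to the following pickup, and idle gaps are bucketed by the UTC date on which they start. Trip IDs follow the cabID_timestamp pattern listed for the transformed data. The names are hypothetical, not taken from the project's Hive scripts.

```python
from datetime import datetime, timezone
from typing import Dict, List, Sequence, Tuple


def trips_and_idle(cab_id: str, events: Sequence[Tuple[int, str]]) -> Tuple[List[dict], Dict[str, int]]:
    """events: time-ordered (unix_ts, 'pickup' | 'dropoff') pairs for ONE cab.

    Returns (trips, max idle seconds per UTC day).
    """
    trips: List[dict] = []
    max_idle: Dict[str, int] = {}
    for (t0, e0), (t1, e1) in zip(events, events[1:]):
        if e0 == "pickup" and e1 == "dropoff":
            # A trip runs from a pickup to the next dropoff.
            trips.append({"trip_id": f"{cab_id}_{t0}", "duration_s": t1 - t0})
        elif e0 == "dropoff" and e1 == "pickup":
            # An idle gap runs from a dropoff to the next pickup.
            day = datetime.fromtimestamp(t0, tz=timezone.utc).strftime("%Y-%m-%d")
            max_idle[day] = max(max_idle.get(day, 0), t1 - t0)
    return trips, max_idle
```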
The table below shows the transformed data:

| tripID (cabID_timestamp) | day | month | year | idle time (secs) | idle time (hours) |
|--------------------------|-----|-------|------|------------------|-------------------|

Hive Workflow:
# Schemas
Batch Schema: The key for batch storage is organized as yyyy_month_dayofweek. Each column represents an hour and the cells contain metrics for the hour. An additional column stores aggregate metrics for the whole day. This allows the same table to service two types of queries: hour of day and day of week profiles.
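
A sketch of how a row in this layout could be assembled for HappyBase's `Table.put(row, data)`. Assumptions: the month is zero-padded, the day of week is a three-letter English abbreviation indexed like Python's `date.weekday()` (0 = Monday), the column family is `m`, qualifiers are `<metric>_<HH>` plus `<metric>_total`, and the whole-day value is the sum of the hourly values (appropriate for counts such as pickups, not for averages). Because every matching weekday in a month shares one row key, the 24 hourly values are expected to be aggregated over those days beforehand.

```python
from typing import Dict, List, Tuple

FAMILY = "m"  # hypothetical column family
DAY_ABBR = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")  # index = date.weekday()


def batch_row(year: int, month: int, weekday: int,
              hourly: Dict[str, List[float]]) -> Tuple[bytes, Dict[bytes, bytes]]:
    """Build (row key, cells) for one yyyy_month_dayofweek row.

    hourly maps a metric name to its 24 hour-of-day values.
    """
    key = f"{year}_{month:02d}_{DAY_ABBR[weekday]}".encode()
    cells: Dict[bytes, bytes] = {}
    for metric, values in hourly.items():
        if len(values) != 24:
            raise ValueError(f"{metric}: expected 24 hourly values, got {len(values)}")
        for hour, value in enumerate(values):
            cells[f"{FAMILY}:{metric}_{hour:02d}".encode()] = str(value).encode()
        # Extra column with the whole-day figure (assumed to be a sum).
        cells[f"{FAMILY}:{metric}_total".encode()] = str(sum(values)).encode()
    return key, cells
```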
Realtime Schema: Each row of the real-time table represents a city (the current database covers only one city, so it has a single row). Each column is the cabID of a cab that Storm has filtered as available, and each cell contains that cab's latitude and longitude.
Streaming Data
- The incoming (simulated real-time) data is filtered on occupancy to show available cabs. The Storm topology consists of one Kafka-integrated spout and one bolt. The bolt uses a tick interval of 5 seconds, collecting data for 5 seconds before pushing it to HBase.
- Related files: stormBolt.py, cab_topology.yaml
# Live Demo

A live demo of the project is available at www.mapmycab.org. A snapshot of the map with cabs:
# Presentation Deck

The presentation slides are available at www.mapmycab.org/aboutme
# Instructions to Run this Pipeline

Install Python packages:

```bash
sudo pip install kafka-python happybase pyleus mrjob
```

Run the Kafka producer and consumer:

```bash
python kafka/producer.py
python kafka/kafka_consumer.py
```

Run MrJob:

```bash
python mr_hourly_job.py -r hadoop --hadoop-bin /usr/bin/hadoop hdfs:///<input file path> -o <output file path>
```

Run the Hive scripts:

```bash
hive -f <filename>
```

Build the Storm topology:

```bash
pyleus build cab_topology.yaml
```

Submit the Pyleus topology:

```bash
pyleus submit -n 54.153.51.200 cab_topology.jar
```