Mobility Project
A practical example of configuring a new data pipeline for a car-sharing (Mobility) project.
The mobility project processes real-time ride events from an API to monitor fuel consumption and manage the supply of cars available for sharing. The primary objectives are to identify low-fuel vehicles and to find the nearest fuel station based on GPS coordinates and fuel type. The pipeline also offers drivers a discount based on the vehicle's fuel level and its proximity to the closest fuel station.
We'll use the GlassFlow CLI to create a new space and configure our data pipeline.
Prerequisites
Before proceeding with the tutorial, make sure that:
You have created a GlassFlow account.
You have installed the GlassFlow CLI and logged into your account via the CLI.
Custom Transformation Function
For the mobility project, the function processes real-time ride event data to identify vehicles with low fuel levels, find the closest fuel station, and calculate discounts for users who need to refuel. This lets the ride-sharing service encourage drivers to refill vehicles when necessary and optimize fleet management.
Creating a transform function
Create a Python script file transform.py inside a new mobility project folder.
The handler function contains all transformation logic, where the event data is modified based on specific conditions.
If the vehicle is not electric and its current fuel percentage is below 25%, the handler calls the get_nearest_fuel_station function to find the nearest fuel station via the mock API server. If a fuel station is found, it updates the 'discount' key with details about the discount offered by the fuel station.
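The logic described above can be sketched as follows. This is a minimal illustration, not the tutorial's original transform.py: the handler(data, log) signature, the event field names (vehicle, fuel_type, current_fuel_percentage, gps), and the shape of the station record are assumptions, and get_nearest_fuel_station is replaced by a hypothetical stub so the example runs without the mock API server.

```python
LOW_FUEL_THRESHOLD = 25  # percent; the cutoff described in the text above


def get_nearest_fuel_station(gps, fuel_type):
    """Hypothetical stub standing in for the lookup against the mock API server.

    The real function would query the server with the GPS coordinates and
    fuel type; here it returns a fixed, made-up station record.
    """
    return {"name": "Example Station", "fuel_type": fuel_type, "discount_percent": 10}


def handler(data, log):
    """Add a 'discount' key to low-fuel, non-electric ride events."""
    vehicle = data.get("vehicle", {})  # assumed field name
    fuel_type = vehicle.get("fuel_type")  # assumed field name
    fuel_level = vehicle.get("current_fuel_percentage", 100)  # assumed field name

    is_electric = fuel_type == "electric"
    if not is_electric and fuel_level < LOW_FUEL_THRESHOLD:
        station = get_nearest_fuel_station(data.get("gps"), fuel_type)
        if station:
            # Record the offer from the nearest station on the event itself.
            data["discount"] = {
                "station": station["name"],
                "percent": station["discount_percent"],
            }
            log.info("Discount added for low-fuel vehicle")
    return data
```

Events for electric vehicles, or for vehicles at or above the threshold, pass through unchanged.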
In the next steps, we will configure a pipeline on GlassFlow with transform.py.
Creating the pipeline on GlassFlow
Step 1: Create a new space
Open a terminal and create a new space called examples to organize multiple pipelines:
After the space is created successfully, the terminal displays a SpaceID.
Save the SpaceID for reference. You'll set it as an environment variable for the project in the upcoming section.
Step 2: Configuring the Pipeline
Create a new pipeline in the selected space with a transformation function:
This command initializes a pipeline named mobilitydemo in the examples space and specifies transform.py as the transformation function. The command returns a new Pipeline ID along with its access token.
Save the Pipeline ID and Access Token for reference. You'll set them as environment variables in the upcoming section.
The pipeline is now deployed and running on the GlassFlow cloud.
Step 3: Create an environment configuration file
Add a .env file in the project directory with the following configuration variables and their values:
Replace your_pipeline_id, your_space_id, and your_pipeline_access_token with the values obtained in the previous steps.
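As a quick sanity check before running the scripts, the configuration file can be read and validated with a few lines of standard-library Python. This is a sketch under assumptions: the variable names PIPELINE_ID, SPACE_ID, and PIPELINE_ACCESS_TOKEN are inferred from the placeholders above and may differ in your project, and the parser handles only simple KEY=value lines.

```python
# Assumed variable names, inferred from the placeholders in the text.
REQUIRED_KEYS = ("PIPELINE_ID", "SPACE_ID", "PIPELINE_ACCESS_TOKEN")


def parse_env_text(text):
    """Parse simple KEY=value lines, skipping blanks and '#' comments."""
    values = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop surrounding whitespace and optional quotes around the value.
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def missing_keys(values, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty."""
    return [key for key in required if not values.get(key)]


def load_config(path=".env"):
    """Read the file at `path` and fail early if a required value is missing."""
    with open(path, encoding="utf-8") as env_file:
        values = parse_env_text(env_file.read())
    missing = missing_keys(values)
    if missing:
        raise ValueError("Missing configuration values: " + ", ".join(missing))
    return values
```

Calling load_config() from the project directory raises a ValueError that names any variable still left unset.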
Publish Data
Generate data for the mobility project and publish it to the data pipeline in GlassFlow using the Python SDK.
Install required libraries
Install the required libraries listed in the requirements.txt file, including the GlassFlow SDK, using the pip command in a terminal.
Publish real-time API events to the pipeline
Create a new Python script file called producer_api.py in your project root directory and insert the code below. This Python script serves as a data producer, fetching mobility event data from a mock API server and publishing it to a GlassFlow pipeline.
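The fetch-and-publish loop at the heart of such a producer can be sketched as below. This is an illustration rather than the original producer_api.py: fetch_event and publish are hypothetical callables passed in by the caller, so the example makes no claims about the mock API's URL or the GlassFlow SDK's method names.

```python
import time


def run_producer(fetch_event, publish, max_events=None, delay_seconds=1.0):
    """Fetch events one at a time and publish each to the pipeline.

    fetch_event: hypothetical callable returning one event dict, or None
                 when no event is available.
    publish:     hypothetical callable that sends one event to the pipeline.
    max_events:  stop after this many published events; None runs until
                 interrupted.
    """
    sent = 0
    while max_events is None or sent < max_events:
        event = fetch_event()
        if event is not None:
            publish(event)
            sent += 1
        # Pause between requests to avoid hammering the API server.
        time.sleep(delay_seconds)
    return sent
```

In the actual script, fetch_event would wrap the HTTP call to the mock API server and publish would wrap the SDK's publish call, using the credentials from the .env file.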
Run the script
Run the Python script producer_api.py:
This script continuously fetches mobility event data from the mock API server and publishes it to the specified GlassFlow pipeline.
Consume Data
Consume transformed data from the mobility project data pipeline in GlassFlow and store it in a local file. You'll use the GlassFlow Python SDK to interact with the pipeline and retrieve the transformed data in real time.
Consume transformed data
Create a Python script consumer_file.py inside the mobility folder and add the following code:
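The consume-and-store loop can be sketched as follows. This is not the original consumer_file.py: consume is a hypothetical callable standing in for the SDK's consume call, and writing one JSON object per line is an assumed output format.

```python
import json
import time


def run_consumer(consume, output_path, max_polls=None, idle_seconds=1.0):
    """Poll the pipeline and append each transformed event to a local file.

    consume:   hypothetical callable returning one transformed event (a dict),
               or None when nothing is available yet.
    max_polls: stop after this many polls; None runs until interrupted.
    """
    polls = 0
    written = 0
    with open(output_path, "a", encoding="utf-8") as out_file:
        while max_polls is None or polls < max_polls:
            polls += 1
            record = consume()
            if record is None:
                # Nothing to read yet; wait before polling again.
                time.sleep(idle_seconds)
                continue
            out_file.write(json.dumps(record) + "\n")
            out_file.flush()  # make new lines visible to other processes promptly
            written += 1
    return written
```

Flushing after every write means another process watching the file sees each event as soon as it is consumed.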
Run the script
The script continuously consumes data from the pipeline and stores it locally on disk. You can see an example of consumed data here. To watch updates as they are written to the file, run this command in another terminal window:
You can extend this functionality to push the consumed data to cloud storage buckets or real-time databases per your project requirements.
See other use cases for complex scenarios.