Mobility Project

A practical example of configuring a new data pipeline for a car-sharing (Mobility) project.

The mobility project processes real-time ride events from an API to monitor fuel consumption and manage the supply of cars available for sharing. Its primary objectives are to identify low-fuel vehicles and to find the nearest fuel station based on GPS coordinates and fuel type. It also offers drivers a discount based on fuel level and proximity to the closest fuel station.

We'll use the GlassFlow CLI to create a new space and configure our data pipeline.

Prerequisites

Make sure you have the following before proceeding with the tutorial:

Custom Transformation Function

For the mobility project, the function processes real-time ride event data to identify vehicles with low fuel levels, find the closest fuel station, and calculate discounts for users who need to refuel. This lets the ride-sharing service encourage drivers to refill vehicles when necessary and helps optimize fleet management.
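The tutorial delegates the station lookup to a mock API server. As a rough illustration of what "closest fuel station by GPS coordinates and fuel type" can mean, the sketch below filters a small in-memory list by fuel type and picks the station with the smallest great-circle (haversine) distance. The station records, field names, and function names are all hypothetical and are not part of the mock API.

```python
import math

# Hypothetical station records; the tutorial obtains stations from a mock API server.
STATIONS = [
    {"name": "Station A", "lat": 52.5200, "lon": 13.4050, "fuel_types": ["petrol", "diesel"]},
    {"name": "Station B", "lat": 52.5300, "lon": 13.3800, "fuel_types": ["diesel"]},
    {"name": "Station C", "lat": 52.4900, "lon": 13.4500, "fuel_types": ["petrol"]},
]


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two GPS points, in kilometres."""
    earth_radius_km = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * earth_radius_km * math.asin(math.sqrt(a))


def nearest_station(lat, lon, fuel_type, stations=STATIONS):
    """Return the closest station that sells fuel_type, or None if there is none."""
    candidates = [s for s in stations if fuel_type in s["fuel_types"]]
    if not candidates:
        return None
    return min(candidates, key=lambda s: haversine_km(lat, lon, s["lat"], s["lon"]))
```

Filtering by fuel type before measuring distance keeps the lookup from returning a nearby station that cannot serve the vehicle.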

Creating a transform function

Create a Python script file transform.py inside a new mobility project folder.

The handler function contains all transformation logic where the event data is modified based on specific conditions.

  • If the vehicle is not electric and its current fuel percentage is below 25%, it calls the get_nearest_fuel_station function to find the nearest fuel station via the mock API server.

  • If a fuel station is found, it updates the 'discount' key with details about the discount offered by the fuel station.
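The two rules above can be sketched as follows. This is a minimal illustration, not the tutorial's actual transform.py: the `handler(data, log)` signature is assumed, the event field names (`vehicle`, `fuel_type`, `current_fuel_percentage`, `location`) are hypothetical, and `get_nearest_fuel_station` is replaced by a stand-in that returns a fixed, made-up station instead of calling the mock API server.

```python
LOW_FUEL_THRESHOLD = 25  # percent; vehicles strictly below this are treated as low on fuel


def get_nearest_fuel_station(latitude, longitude, fuel_type):
    """Stand-in for the mock API call; returns a fixed, made-up station."""
    return {"name": "Example Station", "discount_percent": 10}


def handler(data, log=None):
    """Add a 'discount' entry to low-fuel, non-electric vehicle events."""
    vehicle = data.get("vehicle", {})
    is_electric = vehicle.get("fuel_type") == "electric"
    fuel_percentage = vehicle.get("current_fuel_percentage", 100)

    if not is_electric and fuel_percentage < LOW_FUEL_THRESHOLD:
        location = vehicle.get("location", {})
        station = get_nearest_fuel_station(
            location.get("latitude"),
            location.get("longitude"),
            vehicle.get("fuel_type"),
        )
        if station:
            # Hypothetical discount shape; the real payload depends on the mock API.
            data["discount"] = {
                "station": station["name"],
                "percent": station["discount_percent"],
            }
    return data
```

Events for electric vehicles, or for vehicles at or above the threshold, pass through unchanged.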

In the next steps, we will configure a pipeline on GlassFlow that uses transform.py.

Creating the pipeline on GlassFlow

Step 1: Create a new space

Open a terminal and create a new space called examples to organize multiple pipelines:

glassflow space create examples

After creating the space successfully, you will get a SpaceID in the terminal.

Save the SpaceID for reference. You'll set it as an environment variable for the project in the upcoming section.

Step 2: Configure the pipeline

Create a new pipeline in the selected space with a transformation function:

glassflow pipeline create mobilitydemo --space-id={space_id} --function=transform.py

This command creates a pipeline named mobilitydemo in the examples space and sets transform.py as its transformation function. When it finishes, the command returns a new Pipeline ID and its access token.

Save the Pipeline ID and Access Token for reference. You'll set them as environment variables in the upcoming section.

The pipeline is now deployed and running on the GlassFlow cloud.

Step 3: Create an environment configuration file

Add a .env file in the project directory with the following configuration variables and their values:

PIPELINE_ID=your_pipeline_id
SPACE_ID=your_space_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token

Replace your_pipeline_id, your_space_id, and your_pipeline_access_token with appropriate values obtained in the previous steps.
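To show how a script could read these values, here is a small standard-library sketch that parses a `KEY=value` file and checks that the three variables are present. The helper names `load_env_file` and `require` are hypothetical; many projects use a package such as python-dotenv for this instead.

```python
def load_env_file(path=".env"):
    """Parse simple KEY=value lines, skipping blanks and # comments."""
    values = {}
    with open(path, encoding="utf-8") as env_file:
        for raw_line in env_file:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            # Split on the first '=' only, so values may themselves contain '='.
            key, _, value = line.partition("=")
            values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def require(values, *names):
    """Return the named values as a tuple, failing early if any is missing or empty."""
    missing = [name for name in names if not values.get(name)]
    if missing:
        raise KeyError("Missing configuration: " + ", ".join(missing))
    return tuple(values[name] for name in names)
```

A script could then call `require(load_env_file(), "PIPELINE_ID", "SPACE_ID", "PIPELINE_ACCESS_TOKEN")` at startup, so a missing value is reported before any network call is made.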

Publish Data

Generate data for the mobility project and publish it to the data pipeline in GlassFlow using the Python SDK.

Install required libraries

Install the required libraries listed in the requirements.txt file, including the GlassFlow SDK, using the pip command in a terminal:

pip install -r requirements.txt

Publish real-time API events to the pipeline

Create a new Python script file called producer_api.py in your project root directory and insert the code below. This Python script serves as a data producer, fetching mobility events data from a mock API server and publishing it to a GlassFlow pipeline.
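As a rough sketch of the producer's control flow (not the tutorial's actual listing), the loop below repeatedly fetches an event and hands it to a publish function. Both `fetch_event` and `publish` are hypothetical callables passed in by the caller: in a real script, `fetch_event` would request an event from the mock API server and `publish` would wrap the GlassFlow SDK's publish call, whose exact signature should be taken from the SDK reference.

```python
import time


def run_producer(fetch_event, publish, max_iterations=None, delay_seconds=1.0):
    """Fetch events and publish them; return how many were published.

    fetch_event: callable returning an event dict, or None if nothing is available.
    publish: callable that sends one event to the pipeline.
    max_iterations: stop after this many fetch attempts (None means run forever).
    """
    iterations = 0
    published = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        event = fetch_event()
        if event is not None:
            publish(event)
            published += 1
        # Pause between requests so the mock API server is not flooded.
        time.sleep(delay_seconds)
    return published
```

Keeping the fetch and publish steps as parameters makes the loop easy to try locally, for example with `publish=print`, before connecting it to a pipeline.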

Run the script

Run the Python script producer_api.py:

python producer_api.py

This script continuously fetches mock mobility events data from a mock API server and publishes it to the specified GlassFlow pipeline.

Consume Data

Consume transformed data from the mobility project data pipeline in GlassFlow and store it in a local file. You'll use the GlassFlow Python SDK to interact with the pipeline and retrieve the transformed data in real time.

Consume transformed data

Create a Python script consumer_file.py inside the mobility folder and add the following code:
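A minimal sketch of such a consumer, under stated assumptions rather than the tutorial's actual listing: the loop polls a `consume` callable and appends each event to mobility_data_transformed.txt as one JSON object per line. `consume` is a hypothetical callable that returns a dict, or None when no event is ready; in a real script it would wrap the GlassFlow SDK's consume call, whose exact signature should be taken from the SDK reference.

```python
import json
import time


def run_consumer(consume, output_path="mobility_data_transformed.txt",
                 max_polls=None, idle_seconds=1.0):
    """Poll for transformed events and append them to a file; return the count written.

    consume: callable returning an event dict, or None when nothing is available.
    max_polls: stop after this many polls (None means run forever).
    """
    polls = 0
    written = 0
    with open(output_path, "a", encoding="utf-8") as out:
        while max_polls is None or polls < max_polls:
            polls += 1
            event = consume()
            if event is None:
                # Nothing to read yet; wait before polling again.
                time.sleep(idle_seconds)
                continue
            out.write(json.dumps(event) + "\n")
            # Flush so a concurrent `tail -f` shows the event right away.
            out.flush()
            written += 1
    return written
```

Writing one JSON object per line keeps the file easy to follow with `tail -f` and easy to parse again later.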

Run the script

python consumer_file.py

The script continuously consumes data from the pipeline and stores it locally on disk. You can see an example of consumed data here. To watch new data as it is written to the file, run this command in another terminal window:

tail -f mobility_data_transformed.txt

You can extend this functionality to push the consumed data to cloud storage buckets or real-time databases per your project requirements.

See other use cases for complex scenarios.


© 2023 GlassFlow