Integrating Google Pub/Sub with GlassFlow
This tutorial demonstrates how to integrate Google Cloud Pub/Sub with GlassFlow to ingest and transform real-time data.
Google Pub/Sub is an asynchronous, scalable messaging service that decouples the services producing messages from the services processing them. In this tutorial, you will learn how to process and transform real-time e-commerce order data with GlassFlow and Google Cloud Pub/Sub. The sequence of events is as follows:
A data producer publishes an order details message to a topic on Google Pub/Sub whenever a new order is placed. The order details include the order ID, product ID, quantity, and order timestamp.
A subscriber client creates a subscription to that topic and consumes messages from the subscription. While consuming data in real time, we use GlassFlow to enrich it with additional insights (e.g., estimated delivery times based on the order timestamp) and then make it available for further processing or analytics.
The full source code for the project can be found on GitHub.
We are exploring a built-in connector for the Google Pub/Sub service so that GlassFlow can ingest data directly and push events to the GlassFlow pipeline.
Prerequisites
To complete the tutorial, you'll need the following:
Python installed on your machine.
Pip installed to manage project packages.
The GlassFlow CLI tool installed on your machine.
A GlassFlow account.
The Google Cloud CLI installed and a new Google Cloud project called glassflow-data-pipeline. To set these up, follow steps 1 to 7 under the "Before you begin" section of the Google Pub/Sub get started documentation.
Set up the project environment
Set up the project
Start by creating a dedicated project folder. Create a directory for this project named glassflow-google-pubsub, where you will place all the necessary files.
Create a new virtual environment
Create a new virtual environment in the same folder and activate that environment:
Install libraries
Use pip to install the GlassFlow and Google Cloud Pub/Sub Python SDKs, along with the python-dotenv package for loading environment variables from a .env file.
Create a topic and a subscription on Google Pub/Sub
Use the following gcloud pubsub topics create command to create a topic named ecommerce-orders. Don't change the name of the topic, because it's referenced throughout the rest of the tutorial.
Use the gcloud pubsub subscriptions create command to create a subscription. Only messages published to the topic after the subscription is created are available to subscriber applications.
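If you prefer to stay in Python, the same two resources can also be created with the Pub/Sub client library instead of the gcloud CLI. The sketch below is an alternative to the commands above rather than the tutorial's own method. It assumes the google-cloud-pubsub package is installed and that application default credentials are configured; the subscription name ecommerce-orders-sub is a hypothetical placeholder.

```python
def topic_path(project_id: str, topic_id: str) -> str:
    """Fully qualified topic name in the format Pub/Sub expects."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Fully qualified subscription name in the format Pub/Sub expects."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def create_topic_and_subscription(
    project_id: str,
    topic_id: str = "ecommerce-orders",
    subscription_id: str = "ecommerce-orders-sub",  # hypothetical name
):
    """Create the topic first, then a subscription attached to it."""
    # Imported lazily so the path helpers work without the SDK installed.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    topic = topic_path(project_id, topic_id)
    subscription = subscription_path(project_id, subscription_id)

    publisher.create_topic(request={"name": topic})
    with subscriber:
        subscriber.create_subscription(
            request={"name": subscription, "topic": topic}
        )
    return topic, subscription
```

Creating the topic before the subscription matters here, since a subscription only receives messages published after it exists.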
Create an environment configuration file
Create a .env file in your project directory with the following content, replacing the placeholders with your actual Google Pub/Sub project_id, topic_id, and subscription_id:
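Once the .env file exists, the scripts in this tutorial need to read those values. The following is a minimal sketch of a settings reader that fails early when a value is missing. The upper-case key names (PROJECT_ID, TOPIC_ID, SUBSCRIPTION_ID) and the read_settings helper are assumptions made for illustration; adjust them to match the keys you actually put in your .env file.

```python
import os
from typing import Mapping

# Assumed key names; they must match the keys used in your .env file.
REQUIRED_KEYS = ("PROJECT_ID", "TOPIC_ID", "SUBSCRIPTION_ID")


def read_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Return the Pub/Sub settings, raising KeyError if any are missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise KeyError(f"Missing settings: {', '.join(missing)}")
    return {key.lower(): env[key] for key in REQUIRED_KEYS}
```

In a script, you would typically call load_dotenv() from the python-dotenv package first, so that the values in .env are copied into os.environ, and then call read_settings().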
Publish messages
Simulate publishing order data to the ecommerce-orders topic using the Google Cloud Pub/Sub Python client library. Create a Python script file named pubsub_publisher.py and paste in the following code:
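A minimal sketch of such a publisher is shown here. It is not the project's original listing. The field names (order_id, product_id, quantity, order_timestamp) mirror the order details described earlier but are assumptions, as are the helper names build_order, encode_order, and publish_order. The Pub/Sub calls assume google-cloud-pubsub is installed and credentials are configured.

```python
import json
import uuid
from datetime import datetime, timezone


def build_order(product_id: str, quantity: int) -> dict:
    """Create a simulated order with a random ID and a UTC timestamp."""
    return {
        "order_id": str(uuid.uuid4()),
        "product_id": product_id,
        "quantity": quantity,
        "order_timestamp": datetime.now(timezone.utc).isoformat(),
    }


def encode_order(order: dict) -> bytes:
    """Pub/Sub message payloads must be bytes, so serialize to UTF-8 JSON."""
    return json.dumps(order).encode("utf-8")


def publish_order(project_id: str, topic_id: str, order: dict) -> str:
    """Publish one order and return the message ID assigned by Pub/Sub."""
    # Imported lazily so the helpers above work without the SDK installed.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    future = publisher.publish(topic_path, encode_order(order))
    return future.result()  # blocks until the publish is acknowledged
```

For example, print(publish_order("glassflow-data-pipeline", "ecommerce-orders", build_order("P-100", 2))) would publish a single simulated order and print its message ID.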
Run the Python script. The ID of the published message is printed to your console.
Set up a GlassFlow pipeline
Define a transformation function
Create a Python script named transform.py that defines how order data will be transformed, for example by adding an estimated delivery date based on the order timestamp.
Note that your code must implement the handler function. Without it, the transformation function will not run successfully.
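As an illustration, a transformation along these lines could look like the sketch below. The handler(data, log) signature reflects our understanding of GlassFlow's convention and should be checked against the GlassFlow documentation. The order_timestamp and estimated_delivery field names, the ISO 8601 timestamp format, and the three-day delivery window are assumptions chosen for the example.

```python
from datetime import datetime, timedelta

DELIVERY_DAYS = 3  # assumed delivery window, purely illustrative


def handler(data, log):
    """Add an estimated delivery date derived from the order timestamp."""
    # Assumes order_timestamp is an ISO 8601 string.
    ordered_at = datetime.fromisoformat(data["order_timestamp"])
    estimated = ordered_at + timedelta(days=DELIVERY_DAYS)
    data["estimated_delivery"] = estimated.isoformat()
    return data
```

The function returns the enriched event, so every original field is preserved alongside the new one.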
Create the pipeline in GlassFlow
Use the GlassFlow CLI to create a new pipeline that uses your transformation function.
Once the pipeline is configured successfully, it becomes active immediately. The terminal displays a message with the Pipeline ID, the Space ID (main by default), and the pipeline Access Token.
Note that the default Space ID is main if there are no other spaces in your GlassFlow environment. If other spaces exist, you need to pass the Space ID in the pipeline creation CLI command above:
glassflow pipeline create analyzewatchfrequency --space-id={your_space_id} --function=transform.py
Add the Pipeline ID, Space ID, and Access Token as environment variables to the .env file you created previously:
Ingest Data from Google Pub/Sub to GlassFlow pipeline
Implement a subscriber client in Python that pulls the messages you published to Google Pub/Sub and ingests them into the GlassFlow pipeline:
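One way to structure such a subscriber is sketched below. The Pub/Sub calls assume google-cloud-pubsub is installed. The GlassFlow side is deliberately left as a forward callable, a hypothetical stand-in that you would implement with the GlassFlow Python SDK's publish call using your Pipeline ID and Access Token. The helper names make_callback and run_subscriber are also illustrative.

```python
import json
from typing import Callable


def make_callback(forward: Callable[[dict], None]):
    """Build a Pub/Sub callback that decodes a message and forwards it."""

    def callback(message) -> None:
        try:
            order = json.loads(message.data.decode("utf-8"))
            forward(order)  # hypothetical hook: send the event to GlassFlow
        except Exception as exc:
            print(f"Failed to forward message: {exc}")
            message.nack()  # ask Pub/Sub to redeliver later
            return
        message.ack()  # acknowledge only after a successful hand-off

    return callback


def run_subscriber(project_id, subscription_id, forward, timeout=30.0):
    """Listen on the subscription for `timeout` seconds, then shut down."""
    from concurrent.futures import TimeoutError
    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    path = subscriber.subscription_path(project_id, subscription_id)
    streaming_pull = subscriber.subscribe(path, callback=make_callback(forward))
    with subscriber:
        try:
            streaming_pull.result(timeout=timeout)
        except TimeoutError:
            streaming_pull.cancel()  # stop receiving new messages
            streaming_pull.result()  # wait for the shutdown to complete
```

Acknowledging a message only after forward succeeds means that an order which fails to reach the pipeline is redelivered by Pub/Sub rather than silently dropped.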
Run the Python script and you will see log output confirming that the messages were sent to the GlassFlow pipeline.
Consume data from the GlassFlow pipeline
Consume the transformed data from the GlassFlow pipeline and write it to an output file called ecommerce-orders.txt. As the subscriber forwards new messages from Google Pub/Sub, the pipeline transforms them and makes the results available for consumption.
Create a new Python file called glassflow_consumer.py and paste in the following code:
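A consumer of this kind can be sketched as a polling loop that appends each transformed event to the output file. In the sketch below, fetch_next is a hypothetical stand-in for the GlassFlow SDK's consume call: it is assumed to return the next transformed event as a dict, or None when nothing is available yet. The consume_to_file name and the max_polls and idle_sleep parameters are also illustrative.

```python
import json
import time
from typing import Callable, Optional


def consume_to_file(
    fetch_next: Callable[[], Optional[dict]],
    output_path: str = "ecommerce-orders.txt",
    max_polls: Optional[int] = None,  # None means poll indefinitely
    idle_sleep: float = 1.0,
) -> int:
    """Poll for transformed events and append each one as a JSON line."""
    written = 0
    polls = 0
    with open(output_path, "a", encoding="utf-8") as out:
        while max_polls is None or polls < max_polls:
            polls += 1
            event = fetch_next()
            if event is None:
                time.sleep(idle_sleep)  # nothing new yet, wait and retry
                continue
            out.write(json.dumps(event) + "\n")
            out.flush()  # keep the file current while the loop runs
            print("Consumed:", event)
            written += 1
    return written
```

Writing one JSON object per line keeps the output file easy to tail and to parse later.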
Run the Python script in another terminal tab so you can watch the producing and consuming processes side by side. The transformed data appears in the console output.
Integrating Google Cloud Pub/Sub with GlassFlow allows you to process real-time data efficiently. The transform function is now triggered whenever a new message is published to the ecommerce-orders topic on Google Pub/Sub.
Next
If you need a built-in GlassFlow connector for Google Cloud Pub/Sub service, raise an issue on GitHub.