Integrating Google Pub/Sub with GlassFlow

This tutorial demonstrates how to integrate Google Cloud Pub/Sub with GlassFlow to ingest and transform real-time data.

Google Pub/Sub is an asynchronous, scalable messaging service that decouples services that produce messages from services that process them. In this tutorial, you will learn how to process and transform real-time e-commerce order data with GlassFlow and Google Cloud Pub/Sub. The sequence of events is as follows:

  1. A data producer publishes an order details message to a Pub/Sub topic on Google Pub/Sub whenever a new order is placed. The order details include information such as order ID, product ID, quantity, and order timestamp.

  2. A subscriber client creates a subscription to that topic and consumes messages from it. As the data is consumed in real time, GlassFlow transforms it to add additional insights (e.g., estimated delivery times based on the order timestamp) and makes it available for further processing or analytics.

The full source code for the project can be found on GitHub.

We are exploring a built-in connector for the Google Pub/Sub service so that GlassFlow can ingest data directly and push events into the GlassFlow pipeline.

Prerequisites

To complete the tutorial, you'll need the following:

  1. Python installed on your machine.

  2. Pip installed to manage project packages.

  3. The GlassFlow CLI installed on your machine.

  4. A GlassFlow account.

  5. Steps 1 through 7 of the Before you begin section in the Google Pub/Sub getting started documentation completed. These steps install the Google Cloud CLI and create a new Google Cloud project named glassflow-data-pipeline (see the authentication note after this list).
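
The Google Cloud client libraries used in the scripts below authenticate via Application Default Credentials. If you haven't set them up yet, one way to do so locally is:

gcloud auth application-default login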

Set up the project environment

Create a project folder

Start by creating a dedicated directory for this project named glassflow-google-pubsub, where you will place all the necessary files.

mkdir glassflow-google-pubsub
cd glassflow-google-pubsub

Create a new virtual environment

Create a new virtual environment in the project folder and activate it:

virtualenv glassflow && source glassflow/bin/activate
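
If you don't have virtualenv installed, Python's built-in venv module works the same way:

python -m venv glassflow && source glassflow/bin/activate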

Install libraries

Install the GlassFlow and Google Cloud Pub/Sub Python SDKs, along with the python-dotenv package for loading environment variables, using pip:

pip install glassflow python-dotenv google-cloud-pubsub

Create a topic and a subscription on Google Pub/Sub

Use the following gcloud pubsub topics create command to create a topic named ecommerce-orders. Don't change the name of the topic, because it's referenced throughout the rest of the tutorial.

gcloud pubsub topics create ecommerce-orders

Use the gcloud pubsub subscriptions create command to create a subscription. Only messages published to the topic after the subscription is created are available to subscriber applications.

gcloud pubsub subscriptions create ecommerce-orders-sub --topic ecommerce-orders
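
You can optionally confirm that the subscription exists and is attached to the correct topic:

gcloud pubsub subscriptions describe ecommerce-orders-sub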

Create an environment configuration file

Create a .env file in your project directory with the following content, replacing the values with your actual Google Pub/Sub project ID, topic ID, and subscription ID if they differ:

PROJECT_ID=glassflow-data-pipeline
TOPIC_ID=ecommerce-orders
SUBSCRIPTION_ID=ecommerce-orders-sub

Publish messages

Simulate publishing order data to the ecommerce-orders topic using the Google Cloud Python SDK. Create a Python script named pubsub_publisher.py and paste in the following code:

# pubsub_publisher.py

from google.cloud import pubsub_v1
import json
from dotenv import dotenv_values

config = dotenv_values(".env")
project_id = config.get("PROJECT_ID")
topic_id = config.get("TOPIC_ID")

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

order_data = {
    "order_id": "12345",
    "product_id": "67890",
    "quantity": 2,
    "order_timestamp": "2024-01-01T12:00:00Z",
}

future = publisher.publish(topic_path, data=json.dumps(order_data).encode("utf-8"))

print(f"Published message ID: {future.result()}")

The published message ID is printed to your console:

Published message ID: 10478246062331482

Set up a GlassFlow pipeline

Define a transformation function

Create a Python script named transform.py that defines how the order data will be transformed, for example by adding an estimated delivery date based on the order timestamp.

# transform.py

import json
from datetime import datetime, timedelta


def handler(data, log):
    log.info("Event:" + json.dumps(data), data=data)

    order_timestamp = datetime.strptime(data["order_timestamp"], "%Y-%m-%dT%H:%M:%SZ")
    # Assume delivery takes 3 days
    estimated_delivery = order_timestamp + timedelta(days=3)
    data["estimated_delivery"] = estimated_delivery.strftime("%Y-%m-%d")
    return data

Note that implementing the handler function is mandatory; without it, the transformation function will not run successfully.
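
Before creating the pipeline, you can optionally sanity-check the handler locally. The following is a minimal sketch; StubLog is a hypothetical stand-in for the logger object that GlassFlow passes to the handler, not part of the GlassFlow SDK:

# test_transform.py (optional local check, not part of the pipeline)
from transform import handler


class StubLog:
    """Hypothetical stand-in for the logger GlassFlow injects."""

    def info(self, msg, **kwargs):
        print(msg)


sample_order = {
    "order_id": "12345",
    "product_id": "67890",
    "quantity": 2,
    "order_timestamp": "2024-01-01T12:00:00Z",
}

# Expect the result to include "estimated_delivery": "2024-01-04"
print(handler(sample_order, StubLog()))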

Create the pipeline in GlassFlow

Use the GlassFlow CLI to create a new pipeline that uses your transformation function.

glassflow pipeline create ecommerceorderprocessing --function=transform.py

Once the pipeline is created successfully, it becomes active immediately. The terminal output includes the Pipeline ID, the default Space ID (main), and the pipeline Access Token.

Note that the default Space ID is main if there are no other spaces in your GlassFlow environment. If other spaces exist, you need to pass the Space ID in the pipeline creation command: glassflow pipeline create ecommerceorderprocessing --space-id={your_space_id} --function=transform.py

Add the Pipeline ID, Space ID, and Access Token as environment variables to the .env file you created previously:

PROJECT_ID=glassflow-data-pipeline
TOPIC_ID=ecommerce-orders
SUBSCRIPTION_ID=ecommerce-orders-sub
PIPELINE_ID=your_pipeline_id
SPACE_ID=your_space_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token

Ingest data from Google Pub/Sub into the GlassFlow pipeline

Implement a subscriber client in Python to pull the messages you published to Google Pub/Sub and ingest them into the GlassFlow pipeline. Create a file named pubsub_subscriber.py and paste in the following code:

# pubsub_subscriber.py

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
import glassflow
import json
from dotenv import dotenv_values

config = dotenv_values(".env")
project_id = config.get("PROJECT_ID")
subscription_id = config.get("SUBSCRIPTION_ID")
pipeline_id = config.get("PIPELINE_ID")
space_id = config.get("SPACE_ID")
token = config.get("PIPELINE_ACCESS_TOKEN")
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

client = glassflow.GlassFlowClient()
pipeline_client = client.pipeline_client(space_id=space_id,
                                         pipeline_id=pipeline_id,
                                         pipeline_access_token=token)


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received message: {message}.")

    data = json.loads(message.data.decode("utf-8"))

    # Here you can transform or directly send the data to GlassFlow
    # For example, let's send the data directly
    try:
        # Publish the data to the GlassFlow pipeline
        response = pipeline_client.publish(request_body=data)
        if response.status_code == 200:
            print("Data sent successfully to GlassFlow.")
        else:
            print(f"Failed to send data to GlassFlow: {response.text}")
    except Exception as e:
        print(f"An error occurred: {e}")

    # Acknowledge the message so it's not sent again
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
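
Run the subscriber from the project directory:

python pubsub_subscriber.py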

You will see output logs showing that the messages were sent to the GlassFlow pipeline:

Received Message {
  data: b'{"order_id": "12345", "product_id": "67890", "quan...'
  ordering_key: ''
  attributes: {}
}.

Data sent successfully to GlassFlow.

Consume data from the GlassFlow pipeline

Consume the transformed data from the GlassFlow pipeline and write it to an output file called ecommerce-orders.txt. The pipeline continuously receives new data from Google Pub/Sub and transforms it, while the consumer script polls for the transformed events and appends them to the file.

Create a new Python file called glassflow_consumer.py and paste in the following code:

# glassflow_consumer.py

import glassflow
import sys
import time
import json
from dotenv import dotenv_values

config = dotenv_values(".env")

pipeline_id = config.get("PIPELINE_ID")
space_id = config.get("SPACE_ID")
token = config.get("PIPELINE_ACCESS_TOKEN")


def read_data_from_pipeline(file_path):
    client = glassflow.GlassFlowClient()
    pipeline_client = client.pipeline_client(space_id=space_id,
                                             pipeline_id=pipeline_id,
                                             pipeline_access_token=token)

    with open(file_path, "a+") as f:
        while True:
            try:
                print("Consuming data from GlassFlow pipeline...")
                # consume transformed data from the pipeline
                res = pipeline_client.consume()
                print(res)
                if res.status_code == 200:
                    # get the transformed data as json
                    data = res.body.event
                    print("Data consumed successfully")
                    print(data)
                    f.write(json.dumps(data) + "\n")
            except KeyboardInterrupt:
                print("exiting")
                sys.exit(0)
            time.sleep(0.5)


read_data_from_pipeline("ecommerce-orders.txt")
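
Run the consumer in a separate terminal tab, with the same virtual environment active, so you can watch the producing and consuming processes side by side:

python glassflow_consumer.py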

You will see the transformed data in the console output, something like this:

Consuming data from GlassFlow pipeline...
Data consumed successfully
{
   "order_id":"12345",
   "order_timestamp":"2024-01-01T12:00:00Z",
   "product_id":"67890",
   "quantity":2,
   "estimated_delivery":"2024-01-04"
}

Integrating Google Cloud Pub/Sub with GlassFlow allows you to process real-time data efficiently. The transformation function now runs whenever a new message is published to the ecommerce-orders topic on Google Pub/Sub.

Next

If you need a built-in GlassFlow connector for the Google Cloud Pub/Sub service, raise an issue on GitHub.
