Integrating AWS SQS and S3 with GlassFlow

This tutorial demonstrates how to build an end-to-end serverless streaming pipeline with Amazon Simple Queue Service (Amazon SQS), GlassFlow, and S3.

Scenario overview

This project assumes that an online marketplace collects customer feedback through various channels and aggregates these messages in an Amazon SQS queue. The goal is to ingest this data into GlassFlow, analyze the sentiment of each feedback message, categorize it as positive, negative, or neutral, and then deliver the results to AWS S3 via Amazon Kinesis Data Firehose.

The full source code for the project can be found on GitHub.

Project components

The project uses Docker Compose to bundle all components (data generator, source, pipeline, and sink services) into a single application, so you can build and start the sample data pipeline with a single command. Each component runs in its own container, eliminating the need to start them separately. The Docker Compose file starts the following services:

Data Generator

The data generator service is a Python script that simulates the generation of fake customer feedback streaming data. It feeds fake data into AWS SQS.
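The generator script itself is not reproduced in this tutorial. As a rough sketch only, a generator along these lines could produce events shaped like the sample output shown later and push them to SQS. The sample texts, the function names, and the injected `sqs_client` argument are assumptions for illustration; `send_message` is the standard Boto3 SQS call.

```python
import json
import random
from datetime import datetime, timezone

# Hypothetical sample texts; the real generator may use a different data set.
SAMPLE_TEXTS = [
    "Loved the product! Will definitely recommend to my friends.",
    "The delivery was late and the package was damaged.",
    "The item works as described.",
]


def generate_feedback(rng=random):
    """Build one fake feedback event shaped like the tutorial's sample output."""
    return {
        "customer_id": str(rng.randint(10000, 99999)),
        "order_id": str(rng.randint(100000, 999999)),
        "order_timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "text": rng.choice(SAMPLE_TEXTS),
    }


def send_feedback(sqs_client, queue_url, count):
    """Send `count` fake events to the queue and return their SQS message IDs."""
    message_ids = []
    for _ in range(count):
        response = sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(generate_feedback()),
        )
        message_ids.append(response["MessageId"])
    return message_ids
```

With real credentials, this could be called as `send_feedback(boto3.client("sqs"), queue_url, 10)`. Passing the client in as an argument keeps the function easy to test with a stub.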

Pipeline

GlassFlow serves as the central component. It uses its SQS source connector to ingest data continuously from SQS, applies the transformation, and writes the results to Kinesis Data Firehose using a sink connector.

Source

AWS SQS is used in the pipeline as a message broker to send real-time customer feedback events to GlassFlow. GlassFlow reads data from SQS using a source connector service.

Sink

AWS S3 is used as the destination for storing processed streaming feedback data. The transformed and enriched customer feedback data is delivered to an S3 bucket by a GlassFlow sink connector service. S3 retains historical data for further analysis.

Docker containers for source and sink services will be replaced by GlassFlow's built-in connectors soon.

Prerequisites

To complete the tutorial you'll need the following:

AWS Configuration

We use Boto3 (AWS SDK for Python) to send, receive, and delete messages in a queue.
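To make the receive and delete steps concrete, here is a minimal sketch of one polling pass, assuming an SQS client is passed in. The `publish` callable is a hypothetical stand-in for whatever forwards an event to the GlassFlow pipeline; `receive_message` and `delete_message` are the standard Boto3 SQS calls.

```python
import json


def forward_messages(sqs_client, queue_url, publish, max_messages=10):
    """Receive one batch from SQS, pass each event to `publish`, then delete it.

    `publish` is a hypothetical callable standing in for whatever sends an
    event to the GlassFlow pipeline. Returns the number of forwarded messages.
    """
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,  # SQS allows at most 10 per call
        WaitTimeSeconds=10,  # long polling reduces empty responses
    )
    forwarded = 0
    # The "Messages" key is absent when the queue returned nothing.
    for message in response.get("Messages", []):
        publish(json.loads(message["Body"]))
        # Delete only after a successful publish so failures are redelivered.
        sqs_client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message["ReceiptHandle"],
        )
        forwarded += 1
    return forwarded
```

Deleting a message only after it has been handed off means a crash between the two steps leads to redelivery rather than data loss, at the cost of possible duplicates.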

Create IAM user

Before using Boto3, you need to set up authentication credentials for your AWS account using either the IAM Console or the AWS CLI. You can either choose an existing IAM user or create a new one.

For instructions about how to create a user using the IAM Console, see Creating IAM users. Once the user has been created, see Managing access keys to learn how to create and retrieve the keys used to authenticate the user.

Copy both the generated aws_access_key_id and aws_secret_access_key. You will use them when you configure the local credentials file and set environment variables.

Configure credentials file

If you have the AWS CLI installed, configure your credentials file by running the following command:

aws configure

Alternatively, you can create the credentials file yourself. By default, its location is ~/.aws/credentials on macOS and Linux. At a minimum, the credentials file should specify the access key and secret access key. In this example, the key and secret key for the account are specified in the default profile:

[default]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY

You may also want to add a default region to the AWS configuration file, which is located by default at ~/.aws/config:

[default]
region=us-west-2

Alternatively, you can pass a region name explicitly when creating clients and resources, for example by reading it from an AWS_REGION environment variable.

You have now configured credentials for the default profile as well as a default region to use when creating connections. See Configuration for in-depth configuration sources and options.
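As an illustration of passing the region explicitly, the sketch below resolves a region name and hands it to a client. The helper names and the fallback order are assumptions. Boto3 itself reads AWS_DEFAULT_REGION, so a variable named AWS_REGION most likely has to be passed in by your own code, as done here.

```python
import os


def resolve_region(env=None, default="us-west-2"):
    """Pick a region: AWS_REGION, then AWS_DEFAULT_REGION, then `default`."""
    env = os.environ if env is None else env
    return env.get("AWS_REGION") or env.get("AWS_DEFAULT_REGION") or default


def make_sqs_client():
    """Create an SQS client with an explicit region (requires boto3)."""
    import boto3  # imported lazily so resolve_region works without boto3

    # Credentials still come from ~/.aws/credentials or the environment.
    return boto3.client("sqs", region_name=resolve_region())
```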

Setup project environment

Clone the project

Start by cloning the glassflow-examples GitHub repository to your local machine.

git clone https://github.com/glassflow/glassflow-examples.git

Navigate to the project directory:

cd glassflow-examples/tutorials/aws-sqs-s3

Create a new virtual environment

Create a new virtual environment in the same folder and activate that environment:

python -m venv .venv && source .venv/bin/activate

Install libraries

Install the AWS Boto3 Python SDK and the python-dotenv package, which loads environment variables from a .env file, using pip.

pip install python-dotenv boto3

Create an environment configuration file

Create a .env file in the project root directory with your AWS credentials and values for the SQS queue, S3 bucket name, and region name:

AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=us-west-2
AWS_SQS_QUEUE_NAME=glassflow-data-pipeline
AWS_S3_BUCKET_NAME=product-feedback

You will fill in other variables in the upcoming sections.
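A small check like the following can catch a missing value before any AWS call is made. It is a sketch rather than part of the project: the helper names are hypothetical, while `load_dotenv` comes from the python-dotenv package installed above.

```python
import os

# Variable names as they appear in the tutorial's .env file.
REQUIRED_SETTINGS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_REGION",
    "AWS_SQS_QUEUE_NAME",
    "AWS_S3_BUCKET_NAME",
)


def missing_settings(env=None):
    """Return the names of required settings that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]


def load_settings():
    """Load .env into the process environment and fail fast if incomplete."""
    from dotenv import load_dotenv  # provided by the python-dotenv package

    # Locates a .env file and loads it without overriding existing variables.
    load_dotenv()
    missing = missing_settings()
    if missing:
        raise RuntimeError("Missing settings in .env: " + ", ".join(missing))
    return {name: os.environ[name] for name in REQUIRED_SETTINGS}
```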

Create Amazon SQS [OPTIONAL]

You can use an existing SQS queue or create a new one in the AWS Management Console or with the boto3 Python library. Create a new SQS queue named glassflow-data-pipeline using the sample Python script.
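The sample script is not reproduced here. A minimal sketch of the same idea, assuming a Boto3 SQS client is passed in, could look like this; the function names are hypothetical and `create_queue` is the standard Boto3 call.

```python
def create_queue(sqs_client, queue_name="glassflow-data-pipeline"):
    """Create the queue and return its URL."""
    # SQS returns the existing queue's URL when a queue with the same name
    # and the same attributes already exists.
    response = sqs_client.create_queue(QueueName=queue_name)
    return response["QueueUrl"]


def env_line(queue_url):
    """Format the queue URL as a line for the .env file."""
    return "AWS_SQS_QUEUE_URL=" + queue_url
```

For example, `print(env_line(create_queue(boto3.client("sqs"))))` would print a line that can be pasted into the .env file.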

After running the Python script, you will get the AWS_SQS_QUEUE_URL in the console output. Set the queue URL as another environment variable in the .env file.

AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=us-west-2
AWS_S3_BUCKET_NAME=product-feedback
AWS_SQS_QUEUE_NAME=glassflow-data-pipeline
AWS_SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/745335613048/glassflow-data-pipeline

Create Amazon S3 [OPTIONAL]

You can use an existing S3 bucket or create a new one in the AWS Management Console or with the boto3 Python library. Create a new S3 bucket named product-feedback using the sample Python script.
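As with the queue, the sample script is not shown here. The following sketch assumes a Boto3 S3 client is passed in and uses hypothetical helper names. It handles one known quirk of `create_bucket`: the us-east-1 region must not be sent as a location constraint.

```python
def bucket_request(bucket_name, region):
    """Build keyword arguments for S3 create_bucket in the given region."""
    kwargs = {"Bucket": bucket_name}
    # us-east-1 is the default location and must not be passed as a constraint.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


def create_bucket(s3_client, bucket_name, region):
    """Create the bucket and return the name that was requested."""
    s3_client.create_bucket(**bucket_request(bucket_name, region))
    return bucket_name
```

Bucket names are shared across all AWS accounts, so a name as common as product-feedback may already be taken. In that case, pick a unique name and update AWS_S3_BUCKET_NAME accordingly.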

Create a GlassFlow pipeline

Define a transformation function

Create a Python script transform.py that defines the logic for analyzing feedback sentiment.

Note that your code must implement the handler function. Without it, the transformation function will fail to run.

You can also import other Python dependencies (packages) in the transformation function. See supported libraries with GlassFlow.
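The contents of transform.py are not reproduced in this tutorial. To give a sense of its shape, here is a deliberately simple sketch. It assumes the handler receives the event as a dictionary plus a logger, `handler(data, log)`, and it uses a plain keyword count as a stand-in for real sentiment analysis. The word lists and the `classify` helper are hypothetical.

```python
# Hypothetical keyword lists; a real pipeline would likely use an NLP library
# or model instead of this simple word count.
POSITIVE_WORDS = {"loved", "love", "great", "excellent", "recommend", "happy"}
NEGATIVE_WORDS = {"bad", "terrible", "broken", "late", "damaged", "refund"}


def classify(text):
    """Label text as positive, negative, or neutral by counting keywords."""
    words = {word.strip(".,!?").lower() for word in text.split()}
    score = len(words & POSITIVE_WORDS) - len(words & NEGATIVE_WORDS)
    if score > 0:
        return "positive"
    if score < 0:
        return "negative"
    return "neutral"


def handler(data, log):
    """Entry point (assumed signature): add a `sentiment` field to the event."""
    data["sentiment"] = classify(data.get("text", ""))
    log.info("Classified feedback as %s", data["sentiment"])
    return data
```

Keeping the classification in its own function makes it easy to swap the keyword count for a proper model later without touching the handler.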

Create a new space

Open a terminal and create a new space called examples to organize multiple pipelines:

glassflow space create examples

After the space is created successfully, you will get a Space ID in the terminal.

Create a Pipeline

Create a new pipeline in the selected space with the transformation function:

glassflow pipeline create customerfeedbackanalysis --space-id={your_space_id} --function=transform.py

This command creates a pipeline named customerfeedbackanalysis in the examples space and specifies transform.py as the transformation function. The command returns a new Pipeline ID and its Access Token.

The pipeline is now deployed and running on the GlassFlow cloud.

Add the Pipeline ID, Space ID, and Access Token as environment variables to the .env file you created previously:

AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=us-west-2
AWS_S3_BUCKET_NAME=product-feedback
AWS_SQS_QUEUE_NAME=glassflow-data-pipeline
AWS_SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/745335613048/glassflow-data-pipeline
PIPELINE_ID=your_pipeline_id
SPACE_ID=your_space_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token

Produce and consume data

You are now all set to run the pipeline. Start it with Docker Compose:

docker compose up

Make sure that all containers are up and running!

If you check the logs of each container, you will see that the pipeline retrieves messages from the SQS queue and sends them to the GlassFlow pipeline, and that GlassFlow writes the output to S3 after a successful transformation.

Consumed transformed event from Glassflow and sent to S3 via Firehose:

{
   "customer_id":"78910",
   "order_id":"123456",
   "order_timestamp":"2024-01-01T12:00:00Z",
   "text":"Loved the product! Will definitely recommend to my friends.",
   "sentiment":"positive"
}
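For orientation, the hand-off from the sink service to Firehose might look roughly like the sketch below. It assumes a Boto3 Firehose client is passed in and that a delivery stream targeting the S3 bucket already exists; the function name and the stream name argument are hypothetical. `put_record` is the standard Boto3 Firehose call.

```python
import json


def deliver_to_firehose(firehose_client, stream_name, event):
    """Send one transformed event to a Firehose delivery stream."""
    # Newline-delimited JSON keeps records separable once Firehose
    # concatenates them into a single S3 object.
    payload = (json.dumps(event) + "\n").encode("utf-8")
    response = firehose_client.put_record(
        DeliveryStreamName=stream_name,
        Record={"Data": payload},
    )
    return response["RecordId"]
```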

It's time to validate the data stored in S3. Open the S3 console and go to the bucket you created during configuration to see the loaded data.
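The same check can be done from Python. This is a sketch assuming a Boto3 S3 client is passed in; the function name is hypothetical, and `list_objects_v2` is the standard Boto3 call, paginated here with its continuation token.

```python
def list_delivered_keys(s3_client, bucket_name, prefix=""):
    """Return the keys of objects currently stored in the bucket."""
    keys = []
    kwargs = {"Bucket": bucket_name, "Prefix": prefix}
    while True:
        response = s3_client.list_objects_v2(**kwargs)
        # "Contents" is missing when no object matches.
        keys.extend(item["Key"] for item in response.get("Contents", []))
        if not response.get("IsTruncated"):
            return keys
        # Continue from where the previous page stopped.
        kwargs["ContinuationToken"] = response["NextContinuationToken"]
```

Firehose buffers records before writing, so objects may take a short while to appear after the pipeline starts.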

At this point, you can download the files to your local machine. You can also query, visualize, and derive actionable insights from the feedback data stored in S3 by connecting it to other AWS services such as Amazon Athena, Amazon Redshift, or Amazon QuickSight.

That's it! You have successfully used GlassFlow to process real-time data and deliver your streaming data to the S3 storage.

Next

If you need a built-in GlassFlow connector for AWS SQS service, raise an issue on GitHub.

© 2023 GlassFlow