Integrating AWS Kinesis Data Streams and DynamoDB with GlassFlow

This tutorial demonstrates how to build an end-to-end serverless streaming data pipeline with AWS Kinesis Data Streams, GlassFlow, and AWS DynamoDB.

Scenario overview

This project simulates a scenario where Netflix viewing metrics are monitored and processed in real time to improve user experience and content delivery. Generated sample data is streamed continuously into Amazon Kinesis Data Streams. The goal is to ingest the metrics into GlassFlow, analyze user reactions based on watch frequency, categorize each reaction as like, favorite, or dislike, and send the output to Amazon DynamoDB.

The full source code for the project can be found on GitHub.

Project components

The project uses Docker Compose to bundle all components (the data generator, source, pipeline, and sink services) so that the sample data pipeline can be built and started with a single command. Each component runs in its own container, eliminating the need to start them separately. The Docker Compose file starts the following services:

Data Generator

The data generator service is a Python script that simulates fake Netflix viewing metrics and streams them into Kinesis Data Streams. It also creates a new Kinesis data stream in your AWS account if one does not already exist.
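
A minimal sketch of what such a generator might look like, using Boto3's put_record call. The field names mirror the sample record shown later in this tutorial; the random value ranges and the endless loop are illustrative assumptions, not the repository's exact implementation:

# generator_sketch.py -- illustrative only; the real generator in the repository may differ.
import json
import os
import random
import time
import uuid
from datetime import datetime

import boto3

# Credentials, region, and stream name are expected to come from the .env file.
kinesis = boto3.client("kinesis", region_name=os.environ["AWS_REGION"])
stream_name = os.environ["AWS_KINESIS_STREAM_NAME"]

def fake_view_event() -> dict:
    # Field names follow the sample record shown later in this tutorial.
    return {
        "userid": str(uuid.uuid4()),
        "channelid": random.randint(1, 50),
        "etags": str(uuid.uuid4()),
        "genre": random.choice(["thriller", "comedy", "drama", "documentary"]),
        "lastactive": datetime.utcnow().isoformat(),
        "title": "Sample Title",
        "watchfrequency": random.randint(1, 20),
    }

while True:
    event = fake_view_event()
    # The partition key determines the shard; using the user id keeps a user's events together.
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=event["userid"],
    )
    time.sleep(1)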

Pipeline

GlassFlow serves as the central component of the pipeline. It continuously ingests data from Kinesis through its Kinesis source connector, applies the transformation, and writes the results to DynamoDB through a sink connector.

Source

Kinesis is used in the pipeline as a streaming data service to send real-time Netflix view events to GlassFlow. GlassFlow reads data from Kinesis using a source connector service.
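
Conceptually, the source service polls the stream with Boto3 and forwards each record to the GlassFlow pipeline. The simplified sketch below reads a single shard and leaves the forwarding to GlassFlow as a comment placeholder; it is an assumption about how the connector service works, not its exact code:

# source_sketch.py -- simplified; reads one shard and forwards each record to GlassFlow.
import json
import os
import time

import boto3

kinesis = boto3.client("kinesis", region_name=os.environ["AWS_REGION"])
stream_name = os.environ["AWS_KINESIS_STREAM_NAME"]

# Look up the first shard and start reading records that arrive from now on.
shard_id = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=stream_name, ShardId=shard_id, ShardIteratorType="LATEST"
)["ShardIterator"]

while True:
    response = kinesis.get_records(ShardIterator=iterator, Limit=100)
    for record in response["Records"]:
        event = json.loads(record["Data"])
        # Forward the event to the GlassFlow pipeline here (e.g. via the GlassFlow Python SDK).
        print("forwarding event:", event)
    iterator = response["NextShardIterator"]
    time.sleep(1)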

Sink

DynamoDB is used as the destination for storing the processed Netflix view metrics in a managed NoSQL database. The transformed and enriched data is delivered to DynamoDB by GlassFlow through a sink connector service. Business users can build a reporting interface on top of DynamoDB to gather insights from this metrics data.
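
On the sink side, writing a transformed record into DynamoDB boils down to a single put_item call. A minimal sketch, assuming the table already exists and the event dictionary contains the table's key attributes (the actual key schema in the example repository may differ):

# sink_sketch.py -- illustrative; writes one transformed event to DynamoDB.
import os

import boto3

dynamodb = boto3.resource("dynamodb", region_name=os.environ["AWS_REGION"])
table = dynamodb.Table(os.environ["AWS_DYNAMODB_TABLE_NAME"])

def write_record(event: dict) -> None:
    # put_item inserts the item, or replaces it if an item with the same key already exists.
    table.put_item(Item=event)
    print("Record written to DynamoDB:", event)

# Example usage with a transformed event:
write_record({"userid": "8fb239be-ea55-4e92-be53-658b702e1a31", "watchfrequency": 2, "reaction": "like"})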

Docker containers for source and sink services will be replaced by GlassFlow's built-in connectors soon.

Prerequisites

To complete the tutorial you'll need a free GlassFlow account with the GlassFlow CLI installed, an AWS account, the AWS CLI, and Docker with Docker Compose.

AWS Configuration

We use Boto3 (the AWS SDK for Python) to create the Kinesis data stream, send records to it, and write items to DynamoDB.

Create IAM user

Before using Boto3, you need to set up authentication credentials for your AWS account using either the IAM Console or the AWS CLI. You can either choose an existing IAM user or create a new one.

For instructions about how to create a user using the IAM Console, see Creating IAM users. Once the user has been created, see Managing access keys to learn how to create and retrieve the keys used to authenticate the user.

Copy both the generated aws_access_key_id and aws_secret_access_key; you will use them when you configure the local credentials file and set environment variables.

Configure credentials file

With the AWS CLI installed, configure your credentials file by running the following command:

aws configure

Alternatively, you can create the credentials file yourself. By default, it is located at ~/.aws/credentials on macOS/Linux. At a minimum, the credentials file should specify the access key and secret access key. In this example, the key and secret key for the account are specified in the default profile:

[default]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY

You may also want to add a default region to the AWS configuration file, which is located by default at ~/.aws/config:

[default]
region=us-west-2

Alternatively, you can set the AWS_REGION environment variable, or pass a region name explicitly when creating clients and resources.
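
For example, with Boto3 the region can be supplied directly when constructing a client or resource, which takes precedence over the config file for that object (a minimal illustration):

import boto3

# Region passed explicitly instead of relying on ~/.aws/config or environment variables.
kinesis = boto3.client("kinesis", region_name="us-west-2")
dynamodb = boto3.resource("dynamodb", region_name="us-west-2")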

You have now configured credentials for the default profile as well as a default region to use when creating connections. See Configuration for in-depth configuration sources and options.

Setup project environment

Clone the project

Start by cloning the glassflow-examples GitHub repository to your local machine.

git clone https://github.com/glassflow/glassflow-examples.git

Navigate to the project directory:

cd glassflow-examples/tutorials/aws-kinesis-dynamodb

Create an environment configuration file

Create a .env file in the project root directory with your AWS credentials and values for the Kinesis stream name, DynamoDB table name, and region name:

AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=us-west-2
AWS_KINESIS_STREAM_NAME=netflix-view-metrics
AWS_DYNAMODB_TABLE_NAME=NetflixViewMetrics

You will fill in other variables in the upcoming sections.

Create a GlassFlow pipeline

Define a transformation function

Create a Python script transform.py that defines the logic for analyzing watch frequency from Netflix view metrics.
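
The sketch below shows one possible implementation. The handler signature and the watch-frequency thresholds are illustrative assumptions; the sample output later in this tutorial only confirms that a low watchfrequency maps to the reaction "like":

# transform.py -- sketch of the transformation; thresholds are illustrative assumptions.
def handler(data: dict, log):
    """Categorize a viewing event as like, favorite, or dislike based on watch frequency."""
    watch_frequency = data.get("watchfrequency", 0)

    if watch_frequency <= 3:
        reaction = "like"
    elif watch_frequency <= 10:
        reaction = "favorite"
    else:
        reaction = "dislike"

    data["reaction"] = reaction
    # The second argument is assumed to be a logger-like object provided by GlassFlow.
    log.info(f"watchfrequency={watch_frequency} categorized as {reaction}")
    return data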

Note that implementing the handler function is mandatory. Without it, the transformation function will not run successfully.

You can also import other Python dependencies (packages) in the transformation function. See supported libraries with GlassFlow.

Create a new space

Open a terminal and create a new space called netflix_metrics to organize multiple pipelines:

glassflow space create netflix_metrics

After the space is created successfully, you will get a Space ID in the terminal.

Create a Pipeline

Create a new pipeline in the selected space with the transformation function:

glassflow pipeline create analyze_watch_frequency --space-id={your_space_id} --function=transform.py

This command initializes the pipeline with the name analyze_watch_frequency in the netflix_metrics space and specifies the transformation function transform.py. After running the command, it returns a new Pipeline ID with its Access Token.

The pipeline is now deployed and running on the GlassFlow cloud.

Add the Pipeline ID, Space ID, and Access Token as environment variables to the .env file you created previously:

AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=us-west-2
AWS_KINESIS_STREAM_NAME=netflix-view-metrics
AWS_DYNAMODB_TABLE_NAME=NetflixViewMetrics
PIPELINE_ID=your_pipeline_id
SPACE_ID=your_space_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token

Produce and consume data

Everything is now set up to run the pipeline. Start it with Docker Compose:

docker compose up

Make sure that all containers are up and running!

If you check the logs of each container, you will see that the source connector retrieves stream data from Kinesis and sends it to the GlassFlow pipeline, and that GlassFlow writes the output to DynamoDB after a successful transformation:

Consumed transformed event from Glassflow and sent to DynamoDB
Record written to DynamoDB:
{
   "channelid":18,
   "etags":"b10730dd-2fa1-414a-bea6-135f95801aa2",
   "genre":"thriller",
   "lastactive":"2024-05-11T15:35:35.328987",
   "title":"James Jimenez",
   "userid":"8fb239be-ea55-4e92-be53-658b702e1a31",
   "watchfrequency":2,
   "reaction":"like"
}

It's time to validate the data stored in the DynamoDB NoSQL database. Go to the DynamoDB console and open the NetflixViewMetrics table created during configuration; you can see data loading continuously.
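
If you prefer to verify from the command line, a quick Boto3 scan of the table shows the newly written items. This is a small illustrative check (a scan reads the whole table, so it is only suitable for sample-sized data):

import os

import boto3

dynamodb = boto3.resource("dynamodb", region_name=os.environ["AWS_REGION"])
table = dynamodb.Table(os.environ["AWS_DYNAMODB_TABLE_NAME"])

# Print a handful of items to confirm the pipeline is writing transformed records.
for item in table.scan(Limit=10)["Items"]:
    print(item)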

That's it! You have successfully used GlassFlow to process real-time data and deliver your streaming data to DynamoDB.

Next

If you need a built-in GlassFlow connector for Amazon Kinesis Data Streams or Amazon DynamoDB service, raise an issue on GitHub.
