Kinesis Shard Separation Using AWS IoT Topic Payloads

Takahiro Iwasa
IoT Kinesis

This note describes how to distribute AWS IoT messages across Kinesis shards based on their payloads, by using a payload field as the Kinesis partition key.

Prerequisites

Ensure that the tools used in the commands below, the AWS SAM CLI (sam) and the AWS CLI (aws), are installed on your machine.

Building

In the AWS IoT topic rule below, the Kinesis action specifies PartitionKey: ${customer_id}, so the customer_id field of each payload becomes the partition key of the record.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  Lambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/
      Events:
        Kinesis:
          Type: Kinesis
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            Enabled: true
            StartingPosition: LATEST
            Stream: !GetAtt Kinesis.Arn
      FunctionName: aws_iot_kinesis_partition_lambda
      Handler: lambda_function.lambda_handler
      Role: !GetAtt IamRole.Arn
      Runtime: python3.8
  Kinesis:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: aws_iot_kinesis_partition_stream
      RetentionPeriodHours: 24
      ShardCount: 1
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: aws_iot_kinesis_partition_topic_rule
      TopicRulePayload:
        Actions:
          - Kinesis:
              PartitionKey: ${customer_id}
              RoleArn: !GetAtt IamRole.Arn
              StreamName: aws_iot_kinesis_partition_stream
        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'
  IamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:DescribeStream
                  - kinesis:PutRecord
                Resource:
                  - !GetAtt Kinesis.Arn
          PolicyName: policy
  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
      RetentionInDays: 1
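
Kinesis maps each partition key to a 128-bit integer with MD5 and delivers the record to the shard whose hash key range contains that value. The template sets ShardCount: 1, so every customer_id lands on the same shard; records only spread out once the stream has more shards. The following is a rough sketch of that mapping, not the service's actual code. The helper names hash_key and shard_index are hypothetical, and it assumes the hash key space is split evenly across shards and that the digest is read as a big-endian integer.

```python
import hashlib

MAX_HASH_KEY = 2**128  # size of the 128-bit hash key space


def hash_key(partition_key):
    """Map a partition key to a 128-bit integer using MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    # Assumption: the digest is interpreted as a big-endian integer.
    return int.from_bytes(digest, "big")


def shard_index(partition_key, shard_count):
    """Approximate index of the shard that would receive the key.

    Assumption: the hash key space is divided evenly across shards,
    which may not hold after resharding.
    """
    return hash_key(partition_key) * shard_count // MAX_HASH_KEY


# Show where a few customer IDs would land on a hypothetical 2-shard stream.
for customer_id in ["1", "2", "3"]:
    print(customer_id, shard_index(customer_id, 2))
```

Records that share a customer_id always hash to the same value, so they stay on one shard and keep their relative order.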

Create the following Lambda handler as ./src/lambda_function.py. It logs each record that the function receives from the Kinesis stream:

lambda_function.py
import json


def lambda_handler(event, context):
    for record in event['Records']:
        print(json.dumps(record['kinesis']))
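
The handler above logs the raw record, in which the data field is base64-encoded. As an optional variant, the sketch below decodes the payload and logs it next to the partition key and shard ID. The helper name summarize_record is hypothetical. It assumes the payload is JSON and that eventID has the form "<shardId>:<sequenceNumber>".

```python
import base64
import json


def summarize_record(record):
    """Return the partition key, shard ID, and decoded payload of one record."""
    kinesis = record["kinesis"]
    # Kinesis event data arrives base64-encoded; the payload is assumed to be JSON.
    payload = json.loads(base64.b64decode(kinesis["data"]))
    # Assumption: eventID looks like "shardId-000000000000:<sequenceNumber>".
    shard_id = record["eventID"].split(":")[0]
    return {
        "partitionKey": kinesis["partitionKey"],
        "shardId": shard_id,
        "payload": payload,
    }


def lambda_handler(event, context):
    for record in event["Records"]:
        print(json.dumps(summarize_record(record)))
```

Logging the shard ID next to the partition key makes it easier to see which shard each customer's records arrived on.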

Build and deploy the stack:

sam build
sam deploy \
  --stack-name aws-iot-kinesis-partition-lambda \
  --capabilities CAPABILITY_NAMED_IAM

Testing

Send test payloads to the AWS IoT topic using the following command:

aws iot-data publish \
  --topic aws-iot-kinesis-partition-topic \
  --payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out

Check the Lambda logs to confirm that the partitionKey matches the customer_id in your payload.
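
To see several partition keys in the logs, the publish command can be repeated with different customer_id values. A minimal Python sketch of that idea follows. It assumes boto3 is installed and credentials are configured; the function names build_payload and publish_for_customers are hypothetical. The client is passed in as an argument so the functions can also be exercised with a stub.

```python
import json

TOPIC = "aws-iot-kinesis-partition-topic"  # topic from the rule's SQL statement


def build_payload(customer_id):
    """Build the same JSON payload as the CLI example for one customer."""
    return json.dumps({"customer_id": customer_id, "message": "Hello from AWS IoT"})


def publish_for_customers(client, customer_ids):
    """Publish one message per customer ID and return the payloads sent.

    `client` is expected to behave like boto3.client("iot-data").
    """
    sent = []
    for customer_id in customer_ids:
        payload = build_payload(customer_id)
        client.publish(topic=TOPIC, qos=0, payload=payload.encode("utf-8"))
        sent.append(payload)
    return sent


# Usage, assuming boto3 is installed and AWS credentials are configured:
#   import boto3
#   publish_for_customers(boto3.client("iot-data"), [1, 2, 3])
```

Each published message should then appear in the Lambda logs with a partitionKey equal to its customer_id.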

Cleaning Up

Clean up all the AWS resources provisioned during this example with the following command:

sam delete --stack-name aws-iot-kinesis-partition-lambda
Takahiro Iwasa

Software Developer
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Japan AWS Top Engineers 2020-2023.