Kinesis Shard Separation Based on AWS IoT Topic Payload
When streaming data from AWS IoT Core to Kinesis Data Streams, it can be useful to distribute records across Kinesis shards based on a specific value in the topic payload, by using that value as each record's partition key.
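Kinesis Data Streams hashes each record's partition key with MD5 into a 128-bit integer. It then writes the record to the shard whose hash key range contains that value, so records that share a partition key always land in the same shard.

The Python sketch below models that mapping. It makes two assumptions:

- the partition key is UTF-8 encoded and the MD5 digest is read as a big-endian unsigned integer;
- the shards split the hash key space into equal ranges, which is typical for a newly created stream. Resharding can change the ranges.

`hash_key` and `shard_for_key` are hypothetical helper names.

```python
import hashlib

# Kinesis hash keys are unsigned 128-bit integers: 0 .. 2**128 - 1.
MAX_HASH_KEY = 2**128 - 1


def hash_key(partition_key: str) -> int:
    """Map a partition key to a 128-bit integer via MD5.

    Assumption: UTF-8 encoding and big-endian interpretation of the digest.
    """
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Return the index of the shard whose hash key range contains the key.

    Assumption: the key space is split into `shard_count` equal ranges,
    with the last shard absorbing any remainder.
    """
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    range_size = (MAX_HASH_KEY + 1) // shard_count
    return min(hash_key(partition_key) // range_size, shard_count - 1)


if __name__ == "__main__":
    # The repeated "1" always maps to the same shard.
    for customer_id in ["1", "2", "3", "1"]:
        print(customer_id, "-> shard", shard_for_key(customer_id, 4))
```

With `shard_count=1`, every key maps to shard 0.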
Prerequisites
Install the following on your computer.
- AWS SAM CLI
- AWS CLI
- Python 3.x
Creating SAM Application
Directory Structure
```text
/
|-- src/
|   |-- __init__.py
|   |-- lambda_function.py
|   `-- requirements.txt
`-- template.yaml
```
AWS SAM Template
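The Kinesis action of the AWS IoT topic rule sets `PartitionKey: ${customer_id}`. This makes the `customer_id` value of each incoming message the record's partition key, and Kinesis sends all records with the same partition key to the same shard. This template creates a single shard (`ShardCount: 1`), so every record lands in that shard until you increase `ShardCount`.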
```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  Lambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/
      Events:
        Kinesis:
          Type: Kinesis
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            Enabled: true
            StartingPosition: LATEST
            Stream: !GetAtt Kinesis.Arn
      FunctionName: aws_iot_kinesis_partition_lambda
      Handler: lambda_function.lambda_handler
      Role: !GetAtt IamRole.Arn
      Runtime: python3.12
  Kinesis:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: aws_iot_kinesis_partition_stream
      RetentionPeriodHours: 24
      ShardCount: 1
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: aws_iot_kinesis_partition_topic_rule
      TopicRulePayload:
        Actions:
          - Kinesis:
              # Substitution template: the payload's customer_id becomes the partition key
              PartitionKey: ${customer_id}
              RoleArn: !GetAtt IamRole.Arn
              # Ref on a Kinesis stream returns its name and orders creation after the stream
              StreamName: !Ref Kinesis
        AwsIotSqlVersion: '2016-03-23'
        RuleDisabled: false
        Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'
  IamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  # Read permissions for the Lambda Kinesis event source mapping
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:DescribeStream
                  - kinesis:DescribeStreamSummary
                  - kinesis:ListShards
                  # Write permission for the AWS IoT rule action
                  - kinesis:PutRecord
                Resource:
                  - !GetAtt Kinesis.Arn
          PolicyName: policy
  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
      RetentionInDays: 1
```
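`${customer_id}` is an AWS IoT substitution template. The rule engine evaluates the expression inside `${...}` against the incoming message and uses the result as the partition key.

The simplified Python sketch below mimics that behaviour for top-level JSON fields only. Real substitution templates accept full SQL expressions and functions, which the hypothetical `render_partition_key` helper does not attempt. How AWS IoT handles a missing field is also not modelled here.

```python
import json
import re

# Matches ${field} where field is a simple top-level JSON key. This is an
# assumption for illustration; AWS IoT accepts arbitrary SQL expressions.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def render_partition_key(template: str, payload: str) -> str:
    """Replace each ${field} in `template` with that field's value in `payload`.

    Raises KeyError if a referenced field is missing from the payload.
    """
    message = json.loads(payload)

    def substitute(match):
        value = message[match.group(1)]
        # Strings are inserted as-is; other JSON scalars use their JSON text,
        # so the integer 1 becomes "1".
        return value if isinstance(value, str) else json.dumps(value)

    return _PLACEHOLDER.sub(substitute, template)


if __name__ == "__main__":
    payload = '{"customer_id": 1, "message": "Hello from AWS IoT"}'
    print(render_partition_key("${customer_id}", payload))
```

This matches the behaviour described under Testing, where a numeric `customer_id` of `1` yields the partition key `"1"`.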
Python Script
requirements.txt
Leave it empty. The AWS Lambda Python runtime includes `boto3` by default, so there is no need to add it to `requirements.txt`.
lambda_function.py
The following handler prints the `kinesis` object of each record that Kinesis Data Streams delivers to the function. That object includes the record's partition key (`partitionKey`).
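```python
import json


def lambda_handler(event, context):
    # Each record's "kinesis" object holds metadata such as partitionKey and
    # sequenceNumber, plus the base64-encoded payload in "data".
    for record in event['Records']:
        print(json.dumps(record['kinesis']))
```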
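The `data` field in each record is base64-encoded. The sketch below exercises the same idea locally without AWS. It:

- builds a hypothetical sample event shaped like the Kinesis events Lambda delivers (only the fields used here are filled in, and the values are made up);
- decodes each payload and groups the records by partition key.

`summarize_by_partition_key` and `make_record` are hypothetical names, not part of the original function.

```python
import base64
import json
from collections import defaultdict


def summarize_by_partition_key(event):
    """Decode each Kinesis record and group the JSON payloads by partition key."""
    groups = defaultdict(list)
    for record in event["Records"]:
        kinesis = record["kinesis"]
        # Kinesis delivers the payload base64-encoded in the "data" field.
        payload = json.loads(base64.b64decode(kinesis["data"]))
        groups[kinesis["partitionKey"]].append(payload)
    return dict(groups)


def make_record(partition_key, payload):
    """Build a minimal, hypothetical record resembling a Lambda Kinesis event record."""
    encoded = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    return {"kinesis": {"partitionKey": partition_key, "data": encoded}}


if __name__ == "__main__":
    sample_event = {
        "Records": [
            make_record("1", {"customer_id": 1, "message": "Hello from AWS IoT"}),
            make_record("2", {"customer_id": 2, "message": "Hi"}),
            make_record("1", {"customer_id": 1, "message": "Again"}),
        ]
    }
    print(json.dumps(summarize_by_partition_key(sample_event), indent=2))
```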
Build and Deploy
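Build and deploy with the following commands. `--resolve-s3` lets SAM create a managed S3 bucket for the packaged code when no bucket is configured.
```shell
sam build
sam deploy --stack-name aws-iot-kinesis-partition-lambda --capabilities CAPABILITY_NAMED_IAM --resolve-s3
```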
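After deployment you can inspect the stream's shard layout to see which hash key range each shard owns. The sketch below assumes `boto3` and configured AWS credentials. The Kinesis `ListShards` API returns each shard's `HashKeyRange` as decimal strings. The hypothetical `find_shard` helper picks the shard whose range contains a key's MD5 hash, assuming UTF-8 encoding and a big-endian digest.

After resharding, closed parent shards are also listed and their ranges overlap with those of their children. This sketch simply returns the first match.

```python
import hashlib

STREAM_NAME = "aws_iot_kinesis_partition_stream"


def find_shard(partition_key, shards):
    """Return the ShardId whose HashKeyRange contains the key's MD5 hash.

    `shards` uses the shape returned by the Kinesis ListShards API.
    """
    h = int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")
    for shard in shards:
        rng = shard["HashKeyRange"]
        if int(rng["StartingHashKey"]) <= h <= int(rng["EndingHashKey"]):
            return shard["ShardId"]
    return None


def list_all_shards(stream_name):
    """Page through ListShards for `stream_name` (requires boto3 and credentials)."""
    import boto3  # imported here so find_shard can be used without boto3

    client = boto3.client("kinesis")
    shards, kwargs = [], {"StreamName": stream_name}
    while True:
        response = client.list_shards(**kwargs)
        shards.extend(response["Shards"])
        token = response.get("NextToken")
        if not token:
            return shards
        # ListShards rejects StreamName when NextToken is supplied.
        kwargs = {"NextToken": token}


if __name__ == "__main__":
    shards = list_all_shards(STREAM_NAME)
    for shard in shards:
        print(shard["ShardId"], shard["HashKeyRange"])
    print("customer_id 1 ->", find_shard("1", shards))
```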
Testing
Sending Test Data to AWS IoT Topic
Send the payload with the following command.
```shell
aws iot-data publish \
    --topic aws-iot-kinesis-partition-topic \
    --payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
    --cli-binary-format raw-in-base64-out
```
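Publishing messages for several customers makes the different partition keys easier to see. The sketch below uses boto3's `iot-data` client and assumes:

- configured AWS credentials;
- permission to call `iot:Publish`.

The customer IDs and message text are made up. Depending on your setup, the client may need your account-specific data endpoint (from `aws iot describe-endpoint --endpoint-type iot:Data-ATS`) passed as `endpoint_url`.

```python
import json

TOPIC = "aws-iot-kinesis-partition-topic"


def build_payload(customer_id, message):
    """Serialize one test message in the shape the topic rule expects."""
    return json.dumps({"customer_id": customer_id, "message": message})


def publish_test_messages(customer_ids, endpoint_url=None):
    """Publish one message per customer ID (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so build_payload works without boto3

    client = boto3.client("iot-data", endpoint_url=endpoint_url)
    for customer_id in customer_ids:
        client.publish(
            topic=TOPIC,
            qos=1,
            payload=build_payload(customer_id, "Hello from AWS IoT").encode("utf-8"),
        )


if __name__ == "__main__":
    publish_test_messages([1, 2, 3])
```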
Checking Lambda Logs
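In the CloudWatch Logs log group `/aws/lambda/aws_iot_kinesis_partition_lambda`, you should see a log record that includes `"partitionKey": "1"`, taken from `customer_id` in the payload.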
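The log group can also be searched programmatically. The sketch below assumes `boto3` and configured credentials. It calls the CloudWatch Logs `FilterLogEvents` API through its paginator on the log group defined in the template. `extract_partition_keys` is a hypothetical helper that pulls `partitionKey` out of the JSON lines the handler prints.

```python
import json

LOG_GROUP = "/aws/lambda/aws_iot_kinesis_partition_lambda"


def extract_partition_keys(messages):
    """Collect partitionKey values from log lines printed by the handler.

    Non-JSON lines (for example START/END/REPORT lines) are skipped.
    """
    keys = []
    for message in messages:
        try:
            record = json.loads(message)
        except json.JSONDecodeError:
            continue
        if isinstance(record, dict) and "partitionKey" in record:
            keys.append(record["partitionKey"])
    return keys


def fetch_log_messages(log_group=LOG_GROUP):
    """Return log messages containing "partitionKey" (requires boto3 and credentials)."""
    import boto3  # imported lazily so extract_partition_keys works offline

    client = boto3.client("logs")
    paginator = client.get_paginator("filter_log_events")
    messages = []
    for page in paginator.paginate(logGroupName=log_group, filterPattern='"partitionKey"'):
        messages.extend(event["message"] for event in page["events"])
    return messages


if __name__ == "__main__":
    print(extract_partition_keys(fetch_log_messages()))
```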
Cleaning Up
Clean up the provisioned AWS resources with the following command.
```shell
sam delete --stack-name aws-iot-kinesis-partition-lambda
```
Conclusion
In some cases it is useful to route Kinesis Data Streams records to shards based on a specific value in the topic payload. For example, doing so keeps each customer's records together, in order, within a single shard.
I hope you will find this post useful.