Kinesis Shard Separation Using AWS IoT Topic Payloads

Takahiro Iwasa
IoT Kinesis
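This note describes how to distribute records across Kinesis shards according to a value in the AWS IoT topic payload, by using a payload field as the partition key in an AWS IoT topic rule.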
Prerequisites
Ensure you have the following installed on your machine:
- AWS SAM CLI
- AWS CLI
- Python 3.x
Building
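In the Kinesis action of the AWS IoT topic rule below, PartitionKey is set to ${customer_id}. This substitution template uses the customer_id field of each incoming message as the Kinesis partition key, so all records for the same customer are written to the same shard.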
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  Lambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/
      Events:
        Kinesis:
          Type: Kinesis
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            Enabled: true
            StartingPosition: LATEST
            Stream: !GetAtt Kinesis.Arn
      FunctionName: aws_iot_kinesis_partition_lambda
      Handler: lambda_function.lambda_handler
      Role: !GetAtt IamRole.Arn
      Runtime: python3.8

  Kinesis:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: aws_iot_kinesis_partition_stream
      RetentionPeriodHours: 24
      ShardCount: 1

  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: aws_iot_kinesis_partition_topic_rule
      TopicRulePayload:
        Actions:
          - Kinesis:
              PartitionKey: ${customer_id}
              RoleArn: !GetAtt IamRole.Arn
              StreamName: aws_iot_kinesis_partition_stream
        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'

  IamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:DescribeStream
                  - kinesis:PutRecord
                Resource:
                  - !GetAtt Kinesis.Arn
          PolicyName: policy

  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
      RetentionInDays: 1
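To see why the partition key separates shards, here is a minimal Python sketch of how Kinesis assigns a record to a shard: it hashes the partition key with MD5 into a 128-bit integer and places the record in the shard whose hash key range contains that value. The even split of the hash key space is an assumption that approximates a freshly created stream, and the helper names are hypothetical. With ShardCount: 1, as in the template above, every customer_id maps to the single shard, so ShardCount has to be raised to observe records being separated across shards.

```python
import hashlib

# Kinesis hash keys are 128-bit unsigned integers: 0 .. 2**128 - 1
HASH_KEY_SPACE = 2**128


def partition_hash_key(partition_key: str) -> int:
    """Return the 128-bit hash key derived from a partition key via MD5."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Hypothetical helper: index of the shard that owns the key's hash.

    Assumption: the hash key space is split into `shard_count` equal,
    contiguous ranges. Real ranges come from the stream's shard list.
    """
    range_size = HASH_KEY_SPACE // shard_count
    # min() folds any remainder at the top of the space into the last shard
    return min(partition_hash_key(partition_key) // range_size, shard_count - 1)


# ${customer_id} turns {"customer_id": 1} into the partition key "1"
for customer_id in ["1", "2", "3"]:
    print(customer_id, shard_index(customer_id, shard_count=4))
```

The same customer_id always produces the same hash, which is what keeps one customer's records together in one shard.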
Implement the following Python handler in src/lambda_function.py to print each record the function receives from Kinesis:
import json


def lambda_handler(event, context):
    # Print the Kinesis part of each record (partitionKey, data, sequenceNumber, ...)
    for record in event['Records']:
        print(json.dumps(record['kinesis']))
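Because the handler above prints the raw record, the message body shows up base64-encoded in the data field. The variant below is a sketch, not part of the original stack: it decodes the payload and takes the shard ID from eventID, assuming the usual Kinesis event format shardId-<id>:<sequence number>, so each log line shows which shard handled which customer_id. The helper summarize_record and the sample event values are hypothetical and only meant for a local run.

```python
import base64
import json


def summarize_record(record: dict) -> dict:
    """Hypothetical helper: reduce one Kinesis event record to the fields of interest."""
    kinesis = record["kinesis"]
    # The IoT message body arrives base64-encoded in "data"
    payload = json.loads(base64.b64decode(kinesis["data"]))
    return {
        # Assumption: eventID looks like "shardId-000000000000:<sequence number>"
        "shardId": record["eventID"].split(":", 1)[0],
        "partitionKey": kinesis["partitionKey"],
        "customer_id": payload.get("customer_id"),
        "message": payload.get("message"),
    }


def lambda_handler(event, context):
    for record in event["Records"]:
        print(json.dumps(summarize_record(record)))


if __name__ == "__main__":
    # Hand-built sample event with hypothetical values for a local run
    body = {"customer_id": 1, "message": "Hello from AWS IoT"}
    sample_event = {
        "Records": [
            {
                "eventID": "shardId-000000000000:12345",
                "eventSource": "aws:kinesis",
                "kinesis": {
                    "partitionKey": "1",
                    "data": base64.b64encode(json.dumps(body).encode("utf-8")).decode("ascii"),
                },
            }
        ]
    }
    lambda_handler(sample_event, None)
```

Running the file locally prints {"shardId": "shardId-000000000000", "partitionKey": "1", "customer_id": 1, "message": "Hello from AWS IoT"}.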
Build and deploy the stack:
sam build
sam deploy \
  --stack-name aws-iot-kinesis-partition-lambda \
  --capabilities CAPABILITY_NAMED_IAM \
  --resolve-s3
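Once the stack is deployed, the actual hash key range of each shard can be read back from the stream instead of assuming an even split. This sketch assumes boto3 is installed and AWS credentials are configured; find_shard is a hypothetical helper that takes the Shards list returned by the Kinesis ListShards API (list_shards in boto3) and returns the ID of the open shard whose HashKeyRange contains the partition key's MD5 hash.

```python
import hashlib


def partition_hash_key(partition_key: str) -> int:
    # Kinesis maps a partition key to a 128-bit integer with MD5
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")


def find_shard(partition_key: str, shards: list) -> str:
    """Hypothetical helper: return the ShardId of the open shard holding the key."""
    key = partition_hash_key(partition_key)
    for shard in shards:
        # Closed shards (after resharding) carry an EndingSequenceNumber; skip them
        if "EndingSequenceNumber" in shard.get("SequenceNumberRange", {}):
            continue
        # Hash key bounds are decimal strings, inclusive on both ends
        start = int(shard["HashKeyRange"]["StartingHashKey"])
        end = int(shard["HashKeyRange"]["EndingHashKey"])
        if start <= key <= end:
            return shard["ShardId"]
    raise ValueError(f"no open shard covers the hash key of {partition_key!r}")


if __name__ == "__main__":
    import boto3  # assumption: boto3 installed and credentials configured

    client = boto3.client("kinesis")
    # A single call is enough for a small stream; larger ones paginate via NextToken
    shards = client.list_shards(StreamName="aws_iot_kinesis_partition_stream")["Shards"]
    for customer_id in ["1", "2", "3"]:
        print(customer_id, find_shard(customer_id, shards))
```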
Testing
Send test payloads to the AWS IoT topic using the following command:
aws iot-data publish \
  --topic aws-iot-kinesis-partition-topic \
  --payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out
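To send messages for several customers at once, the same publish can be scripted. This is a sketch assuming boto3 and configured credentials; it uses the iot-data client's publish call, and publish_test_messages is a hypothetical helper that accepts any object with a compatible publish method, so it can also be exercised with a stub.

```python
import json


def publish_test_messages(client, customer_ids, topic="aws-iot-kinesis-partition-topic"):
    """Hypothetical helper: publish one test message per customer_id to the topic."""
    for customer_id in customer_ids:
        payload = {"customer_id": customer_id, "message": "Hello from AWS IoT"}
        # topic, qos (0 or 1) and payload are parameters of the iot-data Publish API
        client.publish(topic=topic, qos=1, payload=json.dumps(payload).encode("utf-8"))


if __name__ == "__main__":
    import boto3  # assumption: boto3 installed and credentials configured

    # Depending on the account, endpoint_url may need to be set to the value of
    # `aws iot describe-endpoint --endpoint-type iot:Data-ATS`
    publish_test_messages(boto3.client("iot-data"), customer_ids=[1, 2, 3, 4])
```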
Check the Lambda logs (log group /aws/lambda/aws_iot_kinesis_partition_lambda) to confirm that the partitionKey of each record matches the customer_id in your payload.
Cleaning Up
Clean up all the AWS resources provisioned during this example with the following command:
sam delete --stack-name aws-iot-kinesis-partition-lambda