Kinesis Shard Separation Based on AWS IoT Topic Payload

Takahiro Iwasa

When streaming data from AWS IoT Core to Kinesis Data Streams, it can be useful to route records to Kinesis shards based on a specific value in the topic payload, such as a customer ID.

Prerequisites

Install the AWS SAM CLI and the AWS CLI on your computer. The commands in this post use both.

Creating SAM Application

Directory Structure

/
|-- src/
|   |-- __init__.py
|   |-- lambda_function.py
|   `-- requirements.txt
`-- template.yaml

AWS SAM Template

The AWS IoT topic rule sets PartitionKey: ${customer_id} (line 37 of the template), so the customer_id value in each payload becomes the Kinesis partition key.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  Lambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/
      Events:
        Kinesis:
          Type: Kinesis
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            Enabled: true
            StartingPosition: LATEST
            Stream: !GetAtt Kinesis.Arn
      FunctionName: aws_iot_kinesis_partition_lambda
      Handler: lambda_function.lambda_handler
      Role: !GetAtt IamRole.Arn
      Runtime: python3.8

  Kinesis:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: aws_iot_kinesis_partition_stream
      RetentionPeriodHours: 24
      ShardCount: 1

  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: aws_iot_kinesis_partition_topic_rule
      TopicRulePayload:
        Actions:
          - Kinesis:
              PartitionKey: ${customer_id}
              RoleArn: !GetAtt IamRole.Arn
              StreamName: !Ref Kinesis
        AwsIotSqlVersion: '2016-03-23'
        RuleDisabled: false
        Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'

  IamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:DescribeStream
                  - kinesis:PutRecord
                Resource:
                  - !GetAtt Kinesis.Arn
          PolicyName: policy

  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
      RetentionInDays: 1
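
The template above keeps ShardCount: 1, so every record lands on the same shard regardless of its partition key. With more shards, Kinesis Data Streams takes the MD5 hash of the partition key as a 128-bit integer and sends the record to the shard whose hash key range contains it. The following is a minimal sketch of that mapping. The function names are hypothetical, and it assumes the shards split the hash key space into equal, contiguous ranges, which may not hold after resharding.

```python
import hashlib

HASH_KEY_SPACE = 2 ** 128  # Kinesis hash keys are 128-bit unsigned integers


def hash_key(partition_key: str) -> int:
    """Return the 128-bit integer derived from the MD5 hash of a partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, byteorder="big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Return the index of the shard whose hash key range contains the key.

    Assumption: shard_count shards divide the hash key space into equal,
    contiguous ranges (index 0 holds the lowest range).
    """
    range_size = HASH_KEY_SPACE // shard_count
    # Clamp so that rounding in range_size never yields an out-of-range index.
    return min(hash_key(partition_key) // range_size, shard_count - 1)
```

Records that share a customer_id always hash to the same value, so they stay on one shard and keep their relative order there.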

Python Script

requirements.txt

Leave it empty. The AWS Lambda runtime environment has boto3 installed by default, so there is no need to include it in your requirements.txt.

lambda_function.py

The following script prints each record that Lambda receives from Kinesis Data Streams. Each record includes its partition key.

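import json


def lambda_handler(event, context):
    # Each invocation receives a batch of Kinesis records.
    for record in event['Records']:
        # The 'kinesis' object holds the partition key, the sequence number,
        # and the base64-encoded data.
        print(json.dumps(record['kinesis']))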
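
The handler above prints the raw record, in which the data field is base64-encoded. As a minimal sketch, the records could also be decoded and grouped by partition key. It assumes the record data is the JSON document selected by the topic rule. The function names and the sample record builder are hypothetical and intended for local experimentation only.

```python
import base64
import json
from collections import defaultdict


def group_payloads_by_partition_key(event: dict) -> dict:
    """Decode each Kinesis record and group the payloads by partition key."""
    grouped = defaultdict(list)
    for record in event["Records"]:
        kinesis = record["kinesis"]
        # Lambda delivers the record data base64-encoded.
        payload = json.loads(base64.b64decode(kinesis["data"]))
        grouped[kinesis["partitionKey"]].append(payload)
    return dict(grouped)


def make_record(partition_key: str, payload: dict) -> dict:
    """Build a hand-made record with only the fields used above (not real output)."""
    data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    return {"kinesis": {"partitionKey": partition_key, "data": data}}
```

Calling group_payloads_by_partition_key({"Records": [make_record("1", {"customer_id": 1})]}) returns a dictionary keyed by "1", which makes it easy to check locally how payloads are distributed across keys.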

Build and Deploy

Build and deploy with the following commands.

sam build
sam deploy --stack-name aws-iot-kinesis-partition-lambda --capabilities CAPABILITY_NAMED_IAM

Testing

Sending Test Data to AWS IoT Topic

Send the payload with the following command.

aws iot-data publish \
  --topic aws-iot-kinesis-partition-topic \
  --payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out
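
To see several partition keys at once, the same payload can be published for multiple customer IDs. The following is a minimal sketch that takes a boto3 "iot-data" client as an argument. The function name and the list of customer IDs are hypothetical. The topic name comes from the rule's SQL statement.

```python
import json

TOPIC = "aws-iot-kinesis-partition-topic"  # topic used in the topic rule's SQL


def publish_for_customers(iot_data_client, customer_ids):
    """Publish one test message per customer ID and return the payloads sent."""
    sent = []
    for customer_id in customer_ids:
        payload = {"customer_id": customer_id, "message": "Hello from AWS IoT"}
        # publish() expects the payload as bytes.
        iot_data_client.publish(
            topic=TOPIC,
            qos=1,
            payload=json.dumps(payload).encode("utf-8"),
        )
        sent.append(payload)
    return sent
```

With AWS credentials configured, publish_for_customers(boto3.client("iot-data"), [1, 2, 3]) would send three messages, each of which should reach Kinesis with its own partition key.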

Checking Lambda Logs

In the function's CloudWatch Logs log group, you should see a record containing "partitionKey": "1", the value of customer_id in the test payload.

Cleaning Up

Clean up the provisioned AWS resources with the following command.

sam delete --stack-name aws-iot-kinesis-partition-lambda

Conclusion

In some cases, it is beneficial to route records to Kinesis Data Streams shards based on a specific value in the topic payload. Setting the partition key in the AWS IoT topic rule achieves this.

I hope you will find this post useful.

Takahiro Iwasa

Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.