AWS IoT トピックペイロードに基づく Kinesis シャードの分離

AWS IoT トピックペイロードに基づく Kinesis シャードの分離

岩佐 孝浩
岩佐 孝浩
3 min read
IoT Kinesis

AWS IoT Core から Kinesis Data Streams にデータをストリーミングする際、トピックペイロードの特定の値に基づいて Kinesis シャードを分離できます。

前提条件

以下のソフトウェアをインストールしてください。

SAM アプリケーション作成

ディレクトリ構成

/
|-- src/
|   |-- __init__.py
|   |-- lambda_function.py
|   `-- requirements.txt
`-- template.yaml

AWS SAM テンプレート

AWS IoT トピックルールで PartitionKey: ${customer_id} を指定しています(行37)。

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  Lambda:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/
      Events:
        Kinesis:
          Type: Kinesis
          Properties:
            BatchSize: 100
            BisectBatchOnFunctionError: true
            Enabled: true
            StartingPosition: LATEST
            Stream: !GetAtt Kinesis.Arn
      FunctionName: aws_iot_kinesis_partition_lambda
      Handler: lambda_function.lambda_handler
      Role: !GetAtt IamRole.Arn
      Runtime: python3.8

  Kinesis:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: aws_iot_kinesis_partition_stream
      RetentionPeriodHours: 24
      ShardCount: 1

  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: aws_iot_kinesis_partition_topic_rule
      TopicRulePayload:
        Actions:
          - Kinesis:
              PartitionKey: ${customer_id}
              RoleArn: !GetAtt IamRole.Arn
              StreamName: aws_iot_kinesis_partition_stream
        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'

  IamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                  - kinesis:DescribeStream
                  - kinesis:PutRecord
                Resource:
                  - !GetAtt Kinesis.Arn
          PolicyName: policy

  LogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
      RetentionInDays: 1

Python スクリプト

requirements.txt

空のままにしてください。 AWS Lambda ランタイム環境には、デフォルトで boto3 がインストールされているため、 requirements.txt に含める必要はありません。

lambda_function.py

次のスクリプトは Kinesis Data Streams によってストリーミングされたデータを出力します。 パーティションキーが含まれているはずです。

import json

import boto3


def lambda_handler(event, context):
    for record in event['Records']:
        print(json.dumps(record['kinesis']))

ビルドとデプロイ

以下のコマンドでビルドおよびデプロイしてください。

sam build
sam deploy --stack-name aws-iot-kinesis-partition-lambda --capabilities CAPABILITY_NAMED_IAM

テスト

AWS IoT トピックへテストデータ送信

次のコマンドでペイロードを送信してください。

aws iot-data publish \
  --topic aws-iot-kinesis-partition-topic \
  --payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out

Lambda ログ確認

customer_id で指定された "partitionKey": "1" を含むログレコードが表示されるはずです。

クリーンアップ

以下のコマンドを使用して、プロビジョニングされた AWS リソースを削除してください。

sam delete --stack-name aws-iot-kinesis-partition-lambda

まとめ

トピックペイロードの特定の値に基づいて、 Kinesis Data Streams シャードを分離することが有益なケースがあるはずです。

この投稿が、お役に立てば幸いです。

岩佐 孝浩

岩佐 孝浩

Software Developer at KAKEHASHI Inc.
AWS を活用したクラウドネイティブ・アプリケーションの要件定義・設計・開発に従事。 株式会社カケハシで、処方箋データ収集の新たな基盤の構築に携わっています。 Japan AWS Top Engineers 2020-2023