AWS IoT トピックペイロードに基づく Kinesis シャードの分離
岩佐 孝浩
3 min read
IoT Kinesis
この記事は、公開後3年以上が経過しています。
AWS IoT Core から Kinesis Data Streams にデータをストリーミングする際、トピックペイロードの特定の値に基づいて Kinesis シャードを分離できます。
前提条件
URL Copied!
以下のソフトウェアをインストールしてください。
- AWS SAM CLI
- Python 3.x
SAM アプリケーション作成
URL Copied!
ディレクトリ構成
URL Copied!
/
|-- src/
| |-- __init__.py
| |-- lambda_function.py
| `-- requirements.txt
`-- template.yaml
AWS SAM テンプレート
URL Copied!
AWS IoT トピックルールで PartitionKey: ${customer_id}
を指定しています(行37)。
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
Lambda:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./src/
Events:
Kinesis:
Type: Kinesis
Properties:
BatchSize: 100
BisectBatchOnFunctionError: true
Enabled: true
StartingPosition: LATEST
Stream: !GetAtt Kinesis.Arn
FunctionName: aws_iot_kinesis_partition_lambda
Handler: lambda_function.lambda_handler
Role: !GetAtt IamRole.Arn
Runtime: python3.8
Kinesis:
Type: AWS::Kinesis::Stream
Properties:
Name: aws_iot_kinesis_partition_stream
RetentionPeriodHours: 24
ShardCount: 1
TopicRule:
Type: AWS::IoT::TopicRule
Properties:
RuleName: aws_iot_kinesis_partition_topic_rule
TopicRulePayload:
Actions:
- Kinesis:
PartitionKey: ${customer_id}
RoleArn: !GetAtt IamRole.Arn
StreamName: aws_iot_kinesis_partition_stream
AwsIotSqlVersion: 2016-03-23
RuleDisabled: false
Sql: SELECT * FROM 'aws-iot-kinesis-partition-topic'
IamRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
- Effect: Allow
Principal:
Service: iot.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- kinesis:GetShardIterator
- kinesis:GetRecords
- kinesis:DescribeStream
- kinesis:PutRecord
Resource:
- !GetAtt Kinesis.Arn
PolicyName: policy
LogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: /aws/lambda/aws_iot_kinesis_partition_lambda
RetentionInDays: 1
Python スクリプト
URL Copied!
requirements.txt
URL Copied!
空のままにしてください。
AWS Lambda ランタイム環境には、デフォルトで boto3
がインストールされているため、 requirements.txt
に含める必要はありません。
lambda_function.py
URL Copied!
次のスクリプトは Kinesis Data Streams によってストリーミングされたデータを出力します。 パーティションキーが含まれているはずです。
import json
import boto3
def lambda_handler(event, context):
for record in event['Records']:
print(json.dumps(record['kinesis']))
ビルドとデプロイ
URL Copied!
以下のコマンドでビルドおよびデプロイしてください。
sam build
sam deploy --stack-name aws-iot-kinesis-partition-lambda --capabilities CAPABILITY_NAMED_IAM
テスト
URL Copied!
AWS IoT トピックへテストデータ送信
URL Copied!
次のコマンドでペイロードを送信してください。
aws iot-data publish \
--topic aws-iot-kinesis-partition-topic \
--payload '{"customer_id": 1, "message": "Hello from AWS IoT"}' \
--cli-binary-format raw-in-base64-out
Lambda ログ確認
URL Copied!
customer_id
で指定された "partitionKey": "1"
を含むログレコードが表示されるはずです。
クリーンアップ
URL Copied!
以下のコマンドを使用して、プロビジョニングされた AWS リソースを削除してください。
sam delete --stack-name aws-iot-kinesis-partition-lambda
まとめ
URL Copied!
トピックペイロードの特定の値に基づいて、 Kinesis Data Streams シャードを分離することが有益なケースがあるはずです。
この投稿が、お役に立てば幸いです。