IoT Core トピックルールでの Kinesis Firehose アクションにおけるセパレータ設定方法
AWS IoT Core トピックルールで Kinesis Firehose を宛先として指定する際、改行をレコードのセパレータとして設定できます。
AWS リソース作成
以下の内容で CloudFormation テンプレートを作成してください。
重要なポイントは Separator: |+ <NEW_LINE>
(13-14行目)です。
IamFirehose
は、 Kinesis Firehose マネジメントコンソールを使用して自動生成されたデフォルトのものをベースにしています。
AWSTemplateFormatVersion: "2010-09-09"
Resources:
TopicRule:
Type: AWS::IoT::TopicRule
Properties:
RuleName: topic_rule_firehose_separator_test
TopicRulePayload:
Actions:
- Firehose:
DeliveryStreamName: !Ref Firehose
RoleArn: !GetAtt IamTopicRule.Arn
Separator: |+
AwsIotSqlVersion: 2016-03-23
RuleDisabled: false
Sql: !Sub
SELECT * FROM 'topic_rule_firehose_separator_test'
Firehose:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: topic-rule-firehose-separator-test
DeliveryStreamType: DirectPut
S3DestinationConfiguration:
BucketARN: !GetAtt S3.Arn
BufferingHints:
IntervalInSeconds: 60
SizeInMBs: 5
CompressionFormat: GZIP
# Apache Hive Prefix
ErrorOutputPrefix: "error/!{firehose:error-output-type}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
Prefix: "success/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
RoleARN: !GetAtt IamFirehose.Arn
S3:
Type: AWS::S3::Bucket
Properties:
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
BucketName: topic-rule-firehose-separator-test
PublicAccessBlockConfiguration:
BlockPublicAcls: TRUE
BlockPublicPolicy: TRUE
IgnorePublicAcls: TRUE
RestrictPublicBuckets: TRUE
IamTopicRule:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: iot.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: firehose:PutRecord
Resource:
- !GetAtt Firehose.Arn
PolicyName: policy
RoleName: iam-topic-rule
IamFirehose:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
Condition:
StringEquals:
sts:ExternalId: !Ref AWS::AccountId
Policies:
- PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- glue:GetTable
- glue:GetTableVersion
- glue:GetTableVersions
Resource: "*"
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !GetAtt S3.Arn
- Fn::Sub:
- ${arn}/*
- {arn: !GetAtt S3.Arn}
- Effect: Allow
Action:
- lambda:InvokeFunction
- lambda:GetFunctionConfiguration
Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
- Effect: Allow
Action:
- logs:PutLogEvents
Resource:
- !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/topic-rule-firehose-separator-test
- Effect: Allow
Action:
- kinesis:DescribeStream
- kinesis:GetShardIterator
- kinesis:GetRecords
Resource: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
- Effect: Allow
Action:
- kms:Decrypt
Resource:
- !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
Condition:
StringEquals:
kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
StringLike:
kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
PolicyName: policy
RoleName: iam-firehose
以下のコマンドを使用して、 CloudFormation スタックをデプロイしてください。
aws cloudformation deploy --template template.yaml --stack-name topic-rule-firehose-separator-test --capabilities CAPABILITY_NAMED_IAM
テスト
トピックルールの Kinesis Firehose アクションを確認してください。
aws iot get-topic-rule --rule-name topic_rule_firehose_separator_test
セパレータが適切に設定されているはずです(12行目)。
{
"ruleArn": "arn:aws:iot:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:rule/topic_rule_firehose_separator_test",
"rule": {
"ruleName": "topic_rule_firehose_separator_test",
"sql": "SELECT * FROM 'topic_rule_firehose_separator_test'",
"createdAt": "2020-05-13T10:29:18+09:00",
"actions": [
{
"firehose": {
"roleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/iam-topic-rule",
"deliveryStreamName": "topic-rule-firehose-separator-test",
"separator": "\n"
}
}
],
"ruleDisabled": false,
"awsIotSqlVersion": "2016-03-23"
}
}
AWS IoT MQTT テストクライアントから、トピック topic_rule_firehose_separator_test
に以下の2つのメッセージを連続して発行してください。
aws iot-data publish \
--topic topic_rule_firehose_separator_test \
--payload '{"id": 1, "message": "Hello from AWS IoT"}' \
--cli-binary-format raw-in-base64-out
aws iot-data publish \
--topic topic_rule_firehose_separator_test \
--payload '{"id": 2, "message": "Hello from AWS IoT"}' \
--cli-binary-format raw-in-base64-out
Kinesis Firehose によってストリーミングされた S3 バケット内のオブジェクトを確認してください。 2つのレコードが含まれているはずです。
# Check an object.
$ aws s3 ls topic-rule-firehose-separator-test --recursive
2020-05-14 11:30:59 68 success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz
# Download the object.
$ aws s3 cp s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz ./result.gz
# Check the JSON.
$ gunzip -c result.gz > result.json
$ cat result.json
{"id": 1, "message": "Hello from AWS IoT"}
{"id": 2, "message": "Hello from AWS IoT"}
# Delete the results.
$ rm result.gz result.json
$ aws s3 rm s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz
誤った CloudFormation テンプレート
Separator: \n
を指定すると、 CloudFormation は以下のエラーを発生させます。
1 validation error detected: Value '\n' at 'topicRulePayload.actions.1.member.firehose.separator' failed to satisfy constraint: Member must satisfy regular expression pattern: ([\n\t])|(\r\n)|(,)
--- Sun Oct 10 17:31:16 2021 UTC
+++ Sun Oct 10 17:31:16 2021 UTC
@@ -7,8 +7,7 @@
- Firehose:
DeliveryStreamName: !Ref Firehose
RoleArn: !GetAtt IamTopicRule.Arn
- Separator: |+
-
+ Separator: '\n'
AwsIotSqlVersion: 2016-03-23
RuleDisabled: false
Sql: !Sub
クリーンアップ
以下のコマンドを使用して、プロビジョニングされた AWS リソースを削除してください。
aws cloudformation delete-stack --stack-name topic-rule-firehose-separator-test
まとめ
Separator
に '\n'
を指定するとエラーになるのは、少し厄介だと感じました。
この投稿が、お役に立てば幸いです。