Adding a Separator to the Kinesis Firehose Action in IoT Core Topic Rules Using CloudFormation
When specifying Kinesis Firehose as a destination in AWS IoT Core topic rules, a newline can be added as the record separator.
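To see why the separator matters, here is a minimal Python sketch of what a delivered object looks like with and without it. The helper names deliver and parse_lines are hypothetical, and modelling the object as "each record followed by the separator" is an assumption made for illustration, not a description of Firehose internals.

```python
import json

# Two payloads like the ones published later in this post.
payloads = [
    '{"id": 1, "message": "Hello from AWS IoT"}',
    '{"id": 2, "message": "Hello from AWS IoT"}',
]


def deliver(records, separator=""):
    """Model a delivered object: each record followed by the separator (assumption)."""
    return "".join(record + separator for record in records)


def parse_lines(blob):
    """Parse newline-delimited JSON, skipping empty lines."""
    return [json.loads(line) for line in blob.splitlines() if line]


without_separator = deliver(payloads)
with_separator = deliver(payloads, "\n")

# Without a separator both objects end up on one line, which is not valid JSON.
try:
    parse_lines(without_separator)
    parsed_without = True
except json.JSONDecodeError:
    parsed_without = False

print(parsed_without)               # False
print(parse_lines(with_separator))  # two dicts, one per record
```

With the newline in place, line-oriented tools can read the object one record per line.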
Creating AWS Resources
Create a CloudFormation template with the following content.
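The key point is the Separator property of the Firehose action: Separator: |+ followed by an empty line, which YAML reads as a single newline character.
The IamFirehose role is based on the default role that the Kinesis Firehose management console generates automatically.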
AWSTemplateFormatVersion: "2010-09-09"
Resources:
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: topic_rule_firehose_separator_test
      TopicRulePayload:
        Actions:
          - Firehose:
              DeliveryStreamName: !Ref Firehose
              RoleArn: !GetAtt IamTopicRule.Arn
              Separator: |+

        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: !Sub
          SELECT * FROM 'topic_rule_firehose_separator_test'
  Firehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: topic-rule-firehose-separator-test
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        BucketARN: !GetAtt S3.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: GZIP
        # Apache Hive Prefix
        ErrorOutputPrefix: "error/!{firehose:error-output-type}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        Prefix: "success/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        RoleARN: !GetAtt IamFirehose.Arn
  S3:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: topic-rule-firehose-separator-test
      PublicAccessBlockConfiguration:
        BlockPublicAcls: TRUE
        BlockPublicPolicy: TRUE
        IgnorePublicAcls: TRUE
        RestrictPublicBuckets: TRUE
  IamTopicRule:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: firehose:PutRecord
                Resource:
                  - !GetAtt Firehose.Arn
          PolicyName: policy
      RoleName: iam-topic-rule
  IamFirehose:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref AWS::AccountId
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource: "*"
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3.Arn
                  - Fn::Sub:
                      - ${arn}/*
                      - {arn: !GetAtt S3.Arn}
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/topic-rule-firehose-separator-test
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                Resource: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
              - Effect: Allow
                Action:
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
                Condition:
                  StringEquals:
                    kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
          PolicyName: policy
      RoleName: iam-firehose
Save the template as template.yaml and deploy the CloudFormation stack with the following command.
aws cloudformation deploy --template-file template.yaml --stack-name topic-rule-firehose-separator-test --capabilities CAPABILITY_NAMED_IAM
Testing
Check the Kinesis Firehose action on the topic rule.
aws iot get-topic-rule --rule-name topic_rule_firehose_separator_test
The separator should appear on line 12 of the output.
{
"ruleArn": "arn:aws:iot:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:rule/topic_rule_firehose_separator_test",
"rule": {
"ruleName": "topic_rule_firehose_separator_test",
"sql": "SELECT * FROM 'topic_rule_firehose_separator_test'",
"createdAt": "2020-05-13T10:29:18+09:00",
"actions": [
{
"firehose": {
"roleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/iam-topic-rule",
"deliveryStreamName": "topic-rule-firehose-separator-test",
"separator": "\n"
}
}
],
"ruleDisabled": false,
"awsIotSqlVersion": "2016-03-23"
}
}
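If you would rather check the separator programmatically than by eye, the following Python sketch parses a trimmed copy of the output above. The function name firehose_separators is hypothetical, and the sample is hard-coded here; in practice you would feed it the JSON printed by the get-topic-rule command.

```python
import json

# Trimmed sample of the `aws iot get-topic-rule` output shown above.
# The raw string keeps the JSON escape \n intact until json.loads decodes it.
sample_output = r'''
{
  "rule": {
    "ruleName": "topic_rule_firehose_separator_test",
    "actions": [
      {
        "firehose": {
          "deliveryStreamName": "topic-rule-firehose-separator-test",
          "separator": "\n"
        }
      }
    ]
  }
}
'''


def firehose_separators(rule_json):
    """Return the separator of every Firehose action in the rule (None if unset)."""
    rule = json.loads(rule_json)["rule"]
    return [
        action["firehose"].get("separator")
        for action in rule["actions"]
        if "firehose" in action
    ]


separators = firehose_separators(sample_output)
print(separators == ["\n"])  # True
```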
Publish the following two messages in a row to the topic topic_rule_firehose_separator_test.
The commands below use the AWS CLI; the AWS IoT MQTT test client works as well.
aws iot-data publish \
--topic topic_rule_firehose_separator_test \
--payload '{"id": 1, "message": "Hello from AWS IoT"}' \
--cli-binary-format raw-in-base64-out
aws iot-data publish \
--topic topic_rule_firehose_separator_test \
--payload '{"id": 2, "message": "Hello from AWS IoT"}' \
--cli-binary-format raw-in-base64-out
Check the object that Kinesis Firehose delivered to the S3 bucket. It should contain the two records, one per line.
# Check an object.
$ aws s3 ls topic-rule-firehose-separator-test --recursive
2020-05-14 11:30:59 68 success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz
# Download the object.
$ aws s3 cp s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz ./result.gz
# Check the JSON.
$ gunzip -c result.gz > result.json
$ cat result.json
{"id": 1, "message": "Hello from AWS IoT"}
{"id": 2, "message": "Hello from AWS IoT"}
# Delete the results.
$ rm result.gz result.json
$ aws s3 rm s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz
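The same check can be sketched in Python. The function name read_records is hypothetical, and the sample object is built in memory as a stand-in for result.gz so that the sketch runs without S3 access; it assumes the object is gzip-compressed, newline-delimited JSON, as in the session above.

```python
import gzip
import json


def read_records(gz_bytes):
    """Decompress a gzip object and parse one JSON record per line."""
    text = gzip.decompress(gz_bytes).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Stand-in for result.gz, built in memory (assumption: same layout as the real object).
sample_object = gzip.compress(
    b'{"id": 1, "message": "Hello from AWS IoT"}\n'
    b'{"id": 2, "message": "Hello from AWS IoT"}\n'
)

records = read_records(sample_object)
print([record["id"] for record in records])  # [1, 2]
```

To run it against the downloaded file instead, pass the bytes read from result.gz (opened in binary mode) to read_records.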
Incorrect CloudFormation Template
If you specify Separator: '\n', CloudFormation fails with the following error message.
1 validation error detected: Value '\n' at 'topicRulePayload.actions.1.member.firehose.separator' failed to satisfy constraint: Member must satisfy regular expression pattern: ([\n\t])|(\r\n)|(,)
--- Sun Oct 10 17:31:16 2021 UTC
+++ Sun Oct 10 17:31:16 2021 UTC
@@ -7,8 +7,7 @@
           - Firehose:
               DeliveryStreamName: !Ref Firehose
               RoleArn: !GetAtt IamTopicRule.Arn
-              Separator: |+
-
+              Separator: '\n'
         AwsIotSqlVersion: 2016-03-23
         RuleDisabled: false
         Sql: !Sub
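The reason is that YAML does not process escape sequences in single-quoted or unquoted scalars, so '\n' arrives as two characters (a backslash and the letter n) rather than a newline. The Python sketch below checks both values against the pattern copied from the error message; the function name is_valid_separator is hypothetical. A double-quoted YAML scalar "\n" also denotes a real newline in YAML, but this post only verified the |+ form.

```python
import re

# Pattern copied from the CloudFormation error message.
SEPARATOR_PATTERN = re.compile(r"([\n\t])|(\r\n)|(,)")


def is_valid_separator(value):
    """True when the whole value matches the pattern from the error message."""
    return SEPARATOR_PATTERN.fullmatch(value) is not None


literal_backslash_n = "\\n"  # what YAML yields for '\n' or an unquoted \n
real_newline = "\n"          # what the |+ block scalar with one empty line yields

print(len(literal_backslash_n), is_valid_separator(literal_backslash_n))  # 2 False
print(len(real_newline), is_valid_separator(real_newline))                # 1 True
```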
Cleaning Up
Clean up the provisioned AWS resources with the following command.
aws cloudformation delete-stack --stack-name topic-rule-firehose-separator-test
Conclusion
I find it a little awkward that specifying '\n' for Separator causes an error.
I hope you will find this post useful.