Configuring Record Separators for Kinesis Firehose in AWS IoT Core

This note describes how to configure the record separator of a Kinesis Firehose action in an AWS IoT Core topic rule.
Building
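The key part of the template below is `Separator: |+` in the `Firehose` action, followed by an empty line. The `|+` block scalar keeps the trailing line break, so the separator resolves to a single newline (`\n`).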
```yaml
AWSTemplateFormatVersion: "2010-09-09"

Resources:
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: topic_rule_firehose_separator_test
      TopicRulePayload:
        Actions:
          - Firehose:
              DeliveryStreamName: !Ref Firehose
              RoleArn: !GetAtt IamTopicRule.Arn
              Separator: |+

        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: !Sub SELECT * FROM 'topic_rule_firehose_separator_test'

  Firehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: topic-rule-firehose-separator-test
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        BucketARN: !GetAtt S3.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: GZIP
        ErrorOutputPrefix: "error/!{firehose:error-output-type}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        Prefix: "success/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        RoleARN: !GetAtt IamFirehose.Arn

  S3:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: topic-rule-firehose-separator-test
      PublicAccessBlockConfiguration:
        BlockPublicAcls: TRUE
        BlockPublicPolicy: TRUE
        IgnorePublicAcls: TRUE
        RestrictPublicBuckets: TRUE

  IamTopicRule:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: firehose:PutRecord
                Resource:
                  - !GetAtt Firehose.Arn
          PolicyName: policy
      RoleName: iam-topic-rule

  IamFirehose:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref AWS::AccountId
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource: "*"
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3.Arn
                  - Fn::Sub:
                      - ${arn}/*
                      - {arn: !GetAtt S3.Arn}
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/topic-rule-firehose-separator-test
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                Resource: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
              - Effect: Allow
                Action:
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
                Condition:
                  StringEquals:
                    kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
          PolicyName: policy
      RoleName: iam-firehose
```
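Without a separator, the records delivered to S3 run together with nothing between them. The Python sketch below models that effect. It is a simplification, not the service's implementation: it assumes the separator is appended after each record and the results are concatenated into one object. `frame_records` and `parse_records` are hypothetical helpers.

```python
import json


def frame_records(payloads: list[dict], separator: str) -> str:
    """Hypothetical model: serialize each payload and append the separator."""
    return "".join(json.dumps(payload) + separator for payload in payloads)


def parse_records(body: str) -> list[dict]:
    """Parse one JSON record per line, skipping empty lines."""
    return [json.loads(line) for line in body.splitlines() if line.strip()]


payloads = [
    {"id": 1, "message": "Hello from AWS IoT"},
    {"id": 2, "message": "Hello from AWS IoT"},
]

with_newline = frame_records(payloads, "\n")  # what Separator: |+ resolves to
without_separator = frame_records(payloads, "")  # records run together

print(parse_records(with_newline) == payloads)  # True
try:
    parse_records(without_separator)
except json.JSONDecodeError as err:
    # The whole body is one line holding two JSON objects back to back.
    print("cannot split records:", err.msg)  # cannot split records: Extra data
```

Splitting on line breaks is what lets line-oriented tools, such as `cat` in the test below, show one record per line.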
Save the template as `template.yaml` and deploy the CloudFormation stack with the following command:

```shell
aws cloudformation deploy \
    --template-file template.yaml \
    --stack-name topic-rule-firehose-separator-test \
    --capabilities CAPABILITY_NAMED_IAM
```
Run the following command to verify the topic rule configuration:
```shell
aws iot get-topic-rule \
    --rule-name topic_rule_firehose_separator_test
```
In the output, the `separator` field of the `firehose` action should be `"\n"` (a newline):
```json
{
    "ruleArn": "arn:aws:iot:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:rule/topic_rule_firehose_separator_test",
    "rule": {
        "ruleName": "topic_rule_firehose_separator_test",
        "sql": "SELECT * FROM 'topic_rule_firehose_separator_test'",
        "createdAt": "2020-05-13T10:29:18+09:00",
        "actions": [
            {
                "firehose": {
                    "roleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/iam-topic-rule",
                    "deliveryStreamName": "topic-rule-firehose-separator-test",
                    "separator": "\n"
                }
            }
        ],
        "ruleDisabled": false,
        "awsIotSqlVersion": "2016-03-23"
    }
}
```
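This check can also be automated. A minimal Python sketch that extracts the separator from the `get-topic-rule` output; `firehose_separators` is a hypothetical helper, and the sample is a trimmed copy of the output above containing only the fields the helper reads.

```python
import json


def firehose_separators(rule_output: dict) -> list[str]:
    """Collect the separator of every Firehose action in get-topic-rule output."""
    return [
        action["firehose"].get("separator", "")  # "" when no separator is set
        for action in rule_output["rule"]["actions"]
        if "firehose" in action
    ]


# Raw string so the JSON text contains the escape sequence \n, as the CLI prints it.
sample = json.loads(r"""
{"rule": {"actions": [{"firehose": {
    "deliveryStreamName": "topic-rule-firehose-separator-test",
    "separator": "\n"}}]}}
""")

print(firehose_separators(sample) == ["\n"])  # True
```

To check the deployed rule, save the CLI output with `aws iot get-topic-rule --rule-name topic_rule_firehose_separator_test > rule.json` and pass `json.load(open("rule.json"))` to the helper.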
Testing
Publish test messages to the topic `topic_rule_firehose_separator_test`:
```shell
aws iot-data publish \
    --topic topic_rule_firehose_separator_test \
    --payload '{"id": 1, "message": "Hello from AWS IoT"}' \
    --cli-binary-format raw-in-base64-out

aws iot-data publish \
    --topic topic_rule_firehose_separator_test \
    --payload '{"id": 2, "message": "Hello from AWS IoT"}' \
    --cli-binary-format raw-in-base64-out
```
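To publish more than two messages, the same CLI call can be generated in a loop. A minimal Python sketch, assuming AWS CLI v2 is installed and configured with credentials; `publish_command` and `publish_many` are hypothetical helper names, not part of any AWS SDK.

```python
import json
import subprocess

TOPIC = "topic_rule_firehose_separator_test"


def publish_command(message_id: int, text: str = "Hello from AWS IoT") -> list[str]:
    """Build the same `aws iot-data publish` call as above for one message."""
    payload = json.dumps({"id": message_id, "message": text})
    return [
        "aws", "iot-data", "publish",
        "--topic", TOPIC,
        "--payload", payload,
        # AWS CLI v2 otherwise expects blob arguments such as --payload in base64.
        "--cli-binary-format", "raw-in-base64-out",
    ]


def publish_many(count: int) -> None:
    """Publish messages 1..count; needs the AWS CLI and valid credentials."""
    for message_id in range(1, count + 1):
        subprocess.run(publish_command(message_id), check=True)
```

Calling `publish_many(2)` issues the same two commands shown above.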
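Firehose writes to S3 when its buffer reaches 5 MB or 60 seconds have elapsed, whichever comes first (`BufferingHints` in the template). After about a minute, retrieve and inspect the object from the S3 bucket: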
```console
# Check an object.
$ aws s3 ls topic-rule-firehose-separator-test --recursive
2020-05-14 11:30:59         68 success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz

# Download the object.
$ aws s3 cp s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz ./result.gz
```
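The object key follows the `Prefix` expression in the template. The Python sketch below reproduces it, which helps predict which prefix to list. `success_prefix` is a hypothetical helper, and it assumes Firehose evaluates `!{timestamp:...}` in UTC, which is its default.

```python
from datetime import datetime, timedelta, timezone


def success_prefix(arrival: datetime) -> str:
    """Hypothetical helper mirroring the template's success/ Prefix.

    Assumes the timestamp is evaluated in UTC (the Firehose default).
    """
    utc = arrival.astimezone(timezone.utc)
    return f"success/year={utc:%Y}/month={utc:%m}/day={utc:%d}/hour={utc:%H}/"


# 02:29:57 UTC is the timestamp embedded in the object name above.
print(success_prefix(datetime(2020, 5, 14, 2, 29, 57, tzinfo=timezone.utc)))
# success/year=2020/month=05/day=14/hour=02/

# The same instant at UTC+09:00 (the offset in createdAt) maps to the same prefix.
jst = timezone(timedelta(hours=9))
print(success_prefix(datetime(2020, 5, 14, 11, 29, 57, tzinfo=jst)))
# success/year=2020/month=05/day=14/hour=02/
```

The result can be appended to `s3://topic-rule-firehose-separator-test/` in `aws s3 ls` to narrow the listing to one hour.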
Decompress the object and print it. Because the separator is a newline, the two records should appear on separate lines:
```console
# Check the JSON.
$ gunzip -c result.gz > result.json
$ cat result.json
{"id": 1, "message": "Hello from AWS IoT"}
{"id": 2, "message": "Hello from AWS IoT"}
```
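The gunzip-and-cat check can be replaced with a few lines of standard-library Python that also parse each line as JSON. A sketch, assuming the object was downloaded to `result.gz` as above; `read_delivered_records` is a hypothetical helper, and the demo writes its own sample file to a temporary directory instead of touching `result.gz`.

```python
import gzip
import json
import tempfile
from pathlib import Path


def read_delivered_records(path: Path) -> list[dict]:
    """Decompress a GZIP object from Firehose and parse one JSON record per line."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "result.gz"
    # Recreate the two newline-separated records shown above.
    sample.write_bytes(gzip.compress(
        b'{"id": 1, "message": "Hello from AWS IoT"}\n'
        b'{"id": 2, "message": "Hello from AWS IoT"}\n'
    ))
    records = read_delivered_records(sample)

for record in records:
    print(record["id"], record["message"])
# 1 Hello from AWS IoT
# 2 Hello from AWS IoT
```

On the real download, `read_delivered_records(Path("result.gz"))` returns the records as dictionaries.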
```console
# Delete the results.
$ rm result.gz result.json
$ aws s3 rm s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz
```
Cleaning Up
Clean up all the AWS resources provisioned during this example with the following command:
```shell
aws cloudformation delete-stack --stack-name topic-rule-firehose-separator-test
```
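The cleanup can also be scripted. The sketch below is a hypothetical dry-run helper that prints the CLI commands, or runs them with `dry_run=False`. It empties the bucket first, since CloudFormation cannot delete an S3 bucket that still contains objects (for example, anything Firehose wrote under `error/`), and then waits for the stack deletion to finish.

```python
import subprocess

BUCKET = "topic-rule-firehose-separator-test"
STACK = "topic-rule-firehose-separator-test"

# Order matters: the bucket must be empty before the stack deletes it.
CLEANUP_COMMANDS = [
    ["aws", "s3", "rm", f"s3://{BUCKET}", "--recursive"],
    ["aws", "cloudformation", "delete-stack", "--stack-name", STACK],
    ["aws", "cloudformation", "wait", "stack-delete-complete", "--stack-name", STACK],
]


def cleanup(dry_run: bool = True) -> None:
    """Print the commands, or run them in order when dry_run is False."""
    for cmd in CLEANUP_COMMANDS:
        if dry_run:
            print(" ".join(cmd))
        else:
            subprocess.run(cmd, check=True)


cleanup()  # dry run: only prints the commands
```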