Handling NDJSON using Kinesis Data Firehose Dynamic Partitioning
Kinesis Data Firehose recently added support for dynamic partitioning. With it, developers no longer need a Lambda function just to append newline delimiters to records when producing NDJSON (Newline Delimited JSON); the built-in AppendDelimiterToRecord processor handles that.
Creating AWS Resources
Create a CloudFormation template with the following content. The key parts are DynamicPartitioningConfiguration (lines 13-14) and ProcessingConfiguration (lines 17-29).
AWSTemplateFormatVersion: 2010-09-09
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: ndjson-firehose
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3Bucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
        DynamicPartitioningConfiguration:
          Enabled: true
        Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
        ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: "\\n" # double-quoted so Firehose receives the literal two characters \n
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{user_id: .user_id}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
  IAMRoleKinesisFirehose:
    Type: AWS::IAM::Role
    Properties:
      RoleName: ndjson-firehose-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: policy1
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3Bucket.Arn
                  - !Sub ${S3Bucket.Arn}/*
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - kms:GenerateDataKey
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                Condition:
                  StringEquals:
                    kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:s3:arn:
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*
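The MetadataExtraction processor runs the jq query {user_id: .user_id} against each incoming record, and the extracted value is what !{partitionKeyFromQuery:user_id} resolves to in the S3 prefix. If you have jq (1.6) installed, you can preview the extraction locally:
$ echo '{"user_id": 1, "message": "Hello"}' | jq -c '{user_id: .user_id}'
{"user_id":1}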
Replace <YOUR_S3_BUCKET> with an existing bucket the CLI can upload the packaged template to, then deploy the CloudFormation stack with the following command.
aws cloudformation deploy \
--template-file stack.yaml \
--stack-name firehose-ndjson-sample \
--s3-bucket <YOUR_S3_BUCKET> \
--s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
--capabilities CAPABILITY_NAMED_IAM
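Once the deployment finishes, you can optionally confirm that the delivery stream is ready to accept records:
$ aws firehose describe-delivery-stream \
    --delivery-stream-name ndjson-firehose \
    --query DeliveryStreamDescription.DeliveryStreamStatus
"ACTIVE"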
Testing
Ingest two JSON records into the delivery stream with the following commands. Each record's data blob must be base64-encoded before it is passed to the API; the maximum size of a blob, before base64 encoding, is 1,000 KiB.
$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==
$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==
$ echo '{
"DeliveryStreamName": "ndjson-firehose",
"Records": [
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
]
}' > input.json
$ aws firehose put-record-batch --cli-input-json file://input.json
{
"FailedPutCount": 0,
"Encrypted": false,
...
}
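Note that PutRecordBatch can fail partially even when the call itself succeeds: rejected records are reported per entry in RequestResponses, and FailedPutCount gives the total. When scripting, a JMESPath query can surface just that field (keep in mind that re-running the command ingests the records again):
$ aws firehose put-record-batch --cli-input-json file://input.json --query FailedPutCount
0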
Check the objects in the S3 bucket with the following commands. You should see the two JSON records separated by a newline, i.e. NDJSON.
$ aws s3 ls --recursive s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/
2022-07-26 23:52:47 69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ aws s3 cp s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
1 {"user_id": 1, "message": "Hello"}
2 {"user_id": 1, "message": "World"}
Cleaning Up
Clean up the provisioned AWS resources with the following commands.
aws s3 rm --recursive s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample
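If you prefer to block until deletion has finished, the CLI ships a waiter for that:
aws cloudformation wait stack-delete-complete --stack-name firehose-ndjson-sample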
Conclusion
Thanks to the dynamic partitioning feature, AWS users no longer need Lambda functions in many cases, such as producing NDJSON.
I hope you will find this post useful.