Streamlining NDJSON Processing with Kinesis Firehose Dynamic Partitioning

Streamlining NDJSON Processing with Kinesis Firehose Dynamic Partitioning

Takahiro Iwasa
Takahiro Iwasa
3 min read
Firehose Kinesis

Kinesis Data Firehose recently introduced support for dynamic partitioning. With this, developers no longer require Lambda functions just to convert to NDJSON (Newline Delimited JSON).

Building

Create a CloudFormation template with the following content.

stack.yaml
AWSTemplateFormatVersion: 2010-09-09
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
KinesisFirehoseDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: ndjson-firehose
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
BucketARN: !GetAtt S3Bucket.Arn
BufferingHints:
IntervalInSeconds: 60
DynamicPartitioningConfiguration:
Enabled: true
Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
ProcessingConfiguration:
Enabled: true
Processors:
- Type: AppendDelimiterToRecord
Parameters:
- ParameterName: Delimiter
ParameterValue: '\\n'
- Type: MetadataExtraction
Parameters:
- ParameterName: MetadataExtractionQuery
ParameterValue: '{user_id: .user_id}'
- ParameterName: JsonParsingEngine
ParameterValue: JQ-1.6
RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn
S3Bucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: AES256
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
IAMRoleKinesisFirehose:
Type: AWS::IAM::Role
Properties:
RoleName: ndjson-firehose-role
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
MaxSessionDuration: 3600
Policies:
- PolicyName: policy1
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- glue:GetTable
- glue:GetTableVersion
- glue:GetTableVersions
Resource:
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
- !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !GetAtt S3Bucket.Arn
- !Sub ${S3Bucket.Arn}/*
- Effect: Allow
Action:
- lambda:InvokeFunction
- lambda:GetFunctionConfiguration
Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
- Effect: Allow
Action:
- kms:GenerateDataKey
- kms:Decrypt
Resource:
- !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
Condition:
StringEquals:
kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
StringLike:
kms:EncryptionContext:aws:s3:arn:
- arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
- arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
- Effect: Allow
Action:
- logs:PutLogEvents
Resource:
- !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*

Deploy the CloudFormation stack with the following command:

Terminal window
aws cloudformation deploy \
--template-file stack.yaml \
--stack-name firehose-ndjson-sample \
--s3-bucket <YOUR_S3_BUCKET> \
--s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
--capabilities CAPABILITY_NAMED_IAM

Testing

Data blobs must be Base64 encoded before being sent described in the official documentation.

The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before base64-encoding, is 1,000 KiB.

Encode your sample JSON as follows:

Terminal window
$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==
$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==

Use the AWS CLI to send data to the Firehose stream:

Terminal window
echo '{
"DeliveryStreamName": "ndjson-firehose",
"Records": [
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
]
}' > input.json
aws firehose put-record-batch --cli-input-json file://~/input.json

Check the objects in your S3 bucket:

Terminal window
$ aws s3 ls --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/
2022-07-26 23:52:47 69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ aws s3 cp s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
1 {"user_id": 1, "message": "Hello"}
2 {"user_id": 1, "message": "World"}

Cleaning Up

Clean up all the AWS resources provisioned during this example with the following command:

Terminal window
aws s3 rm --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample
Takahiro Iwasa

Takahiro Iwasa

Software Developer
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Japan AWS Top Engineers 2020-2023.