Streamlining NDJSON Processing with Kinesis Firehose Dynamic Partitioning

Kinesis Data Firehose recently introduced support for dynamic partitioning. Alongside it, Firehose can append a newline delimiter to every record, so developers no longer need a Lambda function just to convert records to NDJSON (Newline Delimited JSON).
Building
Save the following CloudFormation template as stack.yaml.
```yaml
AWSTemplateFormatVersion: 2010-09-09
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: ndjson-firehose
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3Bucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
        DynamicPartitioningConfiguration:
          Enabled: true
        Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
        ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: '\\n'
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{user_id: .user_id}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn

  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  IAMRoleKinesisFirehose:
    Type: AWS::IAM::Role
    Properties:
      RoleName: ndjson-firehose-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: policy1
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3Bucket.Arn
                  - !Sub ${S3Bucket.Arn}/*
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - kms:GenerateDataKey
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                Condition:
                  StringEquals:
                    kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:s3:arn:
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*
```
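To make the two processors and the Prefix more concrete, the following Python sketch models locally what the delivery stream is configured to do with each record: extract user_id (the jq query), render the S3 prefix, and append a newline. It is a minimal sketch under stated assumptions: all names (PREFIX_TEMPLATE, simulate_delivery, and so on) are hypothetical, it assumes the !{timestamp:...} placeholders are evaluated in UTC, and it does not call AWS or reproduce how Firehose is actually implemented.

```python
import json
from collections import defaultdict
from datetime import datetime, timezone

# Hypothetical local model of the ExtendedS3DestinationConfiguration above.
# It mirrors the template's settings; it is not Firehose's implementation.
PREFIX_TEMPLATE = (
    "success/user_id={user_id}/year={ts:%Y}/month={ts:%m}/"
    "day={ts:%d}/hour={ts:%H}/"
)
DELIMITER = b"\n"  # what the AppendDelimiterToRecord processor is meant to add


def extract_partition_keys(record: bytes) -> dict:
    """Python stand-in for the jq query '{user_id: .user_id}'."""
    doc = json.loads(record)
    # In this sketch a record without user_id simply raises KeyError.
    return {"user_id": doc["user_id"]}


def render_prefix(keys: dict, arrival: datetime) -> str:
    """Fill the partition-key and timestamp placeholders (assumed UTC)."""
    ts = arrival.astimezone(timezone.utc)
    return PREFIX_TEMPLATE.format(user_id=keys["user_id"], ts=ts)


def simulate_delivery(records, arrival: datetime) -> dict:
    """Group records by rendered prefix, appending DELIMITER to each record."""
    objects = defaultdict(bytes)
    for record in records:
        prefix = render_prefix(extract_partition_keys(record), arrival)
        objects[prefix] += record + DELIMITER
    return dict(objects)


if __name__ == "__main__":
    arrival = datetime(2022, 7, 26, 14, 50, 22, tzinfo=timezone.utc)
    batch = [b'{"user_id": 1, "message": "Hello"}', b'{"user_id": 1, "message": "World"}']
    for prefix, body in simulate_delivery(batch, arrival).items():
        print(prefix)
        print(body.decode(), end="")
```

Running it prints the prefix success/user_id=1/year=2022/month=07/day=26/hour=14/ followed by the two records on separate lines, which is the layout the Testing section checks in S3.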
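Deploy the CloudFormation stack with the following command. Replace <YOUR_S3_BUCKET> with an existing bucket that the CLI can upload the template to. CAPABILITY_NAMED_IAM is required because the template creates an IAM role with a fixed name (ndjson-firehose-role).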
```shell
aws cloudformation deploy \
  --template-file stack.yaml \
  --stack-name firehose-ndjson-sample \
  --s3-bucket <YOUR_S3_BUCKET> \
  --s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
  --capabilities CAPABILITY_NAMED_IAM
```
Testing
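Each record's Data blob is Base64-encoded when it is serialized, so when you write the request JSON yourself, as with the AWS CLI here, you must encode the data before sending it. The official documentation describes the field as follows:
"The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before base64-encoding, is 1,000 KiB."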
Encode your sample JSON as follows:
```shell
$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==
$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==
```
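If you generate test events from code rather than typing them, the same encoding can be done in Python. This is a minimal sketch; the names encode_record and MAX_RECORD_BYTES are hypothetical, and the size check simply applies the 1,000 KiB pre-encoding limit quoted above.

```python
import base64
import json

MAX_RECORD_BYTES = 1000 * 1024  # 1,000 KiB limit on the blob before Base64 encoding


def encode_record(event: dict) -> str:
    """Serialize one event to JSON and return it Base64-encoded.

    json.dumps uses ", " and ": " separators by default, so for these
    examples the bytes match the `echo -n ... | base64` commands.
    """
    raw = json.dumps(event).encode("utf-8")
    if len(raw) > MAX_RECORD_BYTES:
        raise ValueError(f"record is {len(raw)} bytes; the limit is {MAX_RECORD_BYTES}")
    return base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    for message in ("Hello", "World"):
        print(encode_record({"user_id": 1, "message": message}))
```

Running the script prints the same two strings as the shell commands.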
Use the AWS CLI to send data to the Firehose stream:
```shell
echo '{
  "DeliveryStreamName": "ndjson-firehose",
  "Records": [
    {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
    {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
  ]
}' > input.json
# input.json was written to the current directory, so read it from there
aws firehose put-record-batch --cli-input-json file://input.json
```
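The request file can also be generated, or the CLI skipped entirely in favour of an SDK. The sketch below shows both under stated assumptions: write_cli_input and send_with_sdk are hypothetical helpers, the stream name comes from the template, and client is expected to be a boto3 Firehose client (boto3 is not imported so the file runs without it). With the SDK you pass raw bytes and the SDK performs the Base64 encoding itself; no trailing newline is added because the AppendDelimiterToRecord processor is what produces the NDJSON line breaks.

```python
import base64
import json
from pathlib import Path

STREAM_NAME = "ndjson-firehose"  # DeliveryStreamName from the template
MAX_BATCH_RECORDS = 500  # PutRecordBatch accepts at most 500 records per call


def write_cli_input(events, path="input.json"):
    """Write a --cli-input-json file whose Data fields are Base64 strings."""
    records = [
        {"Data": base64.b64encode(json.dumps(e).encode("utf-8")).decode("ascii")}
        for e in events
    ]
    payload = {"DeliveryStreamName": STREAM_NAME, "Records": records}
    Path(path).write_text(json.dumps(payload, indent=2))
    return path


def send_with_sdk(client, events):
    """Send events with a boto3-style client; Data is passed as raw bytes."""
    if len(events) > MAX_BATCH_RECORDS:
        raise ValueError("split the events into batches of at most 500 records")
    response = client.put_record_batch(
        DeliveryStreamName=STREAM_NAME,
        Records=[{"Data": json.dumps(e).encode("utf-8")} for e in events],
    )
    # A batch call can partially fail, so check FailedPutCount explicitly.
    if response["FailedPutCount"]:
        failed = [r for r in response["RequestResponses"] if "ErrorCode" in r]
        raise RuntimeError(f"{len(failed)} record(s) failed: {failed}")
    return response
```

For a real call, something like `send_with_sdk(boto3.client("firehose"), events)` should work once credentials and region are configured; retrying only the failed entries is left out of this sketch.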
Check the objects in your S3 bucket:
```shell
$ aws s3 ls --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/
2022-07-26 23:52:47         69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ aws s3 cp s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
     1	{"user_id": 1, "message": "Hello"}
     2	{"user_id": 1, "message": "World"}
```
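Because every record ends up on its own line, a consumer can parse the downloaded object one line at a time instead of treating the whole file as a single JSON document. A minimal Python sketch; the function name read_ndjson and the file name in the usage note are hypothetical.

```python
import json
from pathlib import Path


def read_ndjson(path):
    """Yield one parsed JSON document per non-empty line of an NDJSON file."""
    with Path(path).open(encoding="utf-8") as fh:
        for lineno, line in enumerate(fh, start=1):
            line = line.strip()
            if not line:  # tolerate a trailing newline or blank lines
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"line {lineno} is not valid JSON: {exc}") from exc
```

Usage: `for record in read_ndjson("downloaded-object"): print(record["message"])` prints each message in file order. Reporting the failing line number makes a malformed record easy to locate.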
Cleaning Up
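Clean up all the AWS resources provisioned during this example with the following commands. Empty the bucket first, because CloudFormation cannot delete an S3 bucket that still contains objects: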
```shell
aws s3 rm --recursive s3://<AWS_REGION>-<AWS_ACCOUNT>-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample
```