How to process NDJSON with Kinesis Data Firehose Dynamic Partitioning
Kinesis Data Firehose now supports Dynamic Partitioning. As a result, you no longer need a Lambda function to turn incoming records into NDJSON (Newline Delimited JSON).
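Creating the AWS resources
Create a CloudFormation template with the following content and save it as stack.yaml, the file name the deploy command in this section expects.
The key parts are DynamicPartitioningConfiguration (lines 13-14) and ProcessingConfiguration (lines 17-29).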
AWSTemplateFormatVersion: 2010-09-09
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: ndjson-firehose
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3Bucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
        DynamicPartitioningConfiguration:
          Enabled: true
        Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
        ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: '\\n'
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{user_id: .user_id}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn
  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
  IAMRoleKinesisFirehose:
    Type: AWS::IAM::Role
    Properties:
      RoleName: ndjson-firehose-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: policy1
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3Bucket.Arn
                  - !Sub ${S3Bucket.Arn}/*
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - kms:GenerateDataKey
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                Condition:
                  StringEquals:
                    kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:s3:arn:
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*
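To make the role of MetadataExtractionQuery and Prefix more concrete, here is a rough Python sketch of how a partition key taken from a record could end up in the S3 key prefix. It is only an illustration, not Firehose's actual implementation: the helper names extract_partition_keys and expand_prefix and the TIMESTAMP_FORMATS table are hypothetical, the arrival time is a sample value, and the sketch assumes the timestamp expressions are evaluated in UTC.

```python
import json
import re
from datetime import datetime, timezone

# The Prefix value from the template.
PREFIX = (
    "success/user_id=!{partitionKeyFromQuery:user_id}/"
    "year=!{timestamp:yyyy}/month=!{timestamp:MM}/"
    "day=!{timestamp:dd}/hour=!{timestamp:HH}/"
)

# Hypothetical mapping; only the four patterns used in the template are covered.
TIMESTAMP_FORMATS = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}


def extract_partition_keys(record):
    """Mimic the jq query '{user_id: .user_id}' for one JSON record (hypothetical helper)."""
    return {"user_id": json.loads(record)["user_id"]}


def expand_prefix(prefix, keys, arrival):
    """Replace each !{namespace:value} expression with a resolved value (hypothetical helper)."""
    def resolve(match):
        namespace, value = match.group(1), match.group(2)
        if namespace == "partitionKeyFromQuery":
            return str(keys[value])
        if namespace == "timestamp":
            return arrival.strftime(TIMESTAMP_FORMATS[value])
        raise ValueError(f"unsupported namespace: {namespace}")

    return re.sub(r"!\{(\w+):([^}]+)\}", resolve, prefix)


record = '{"user_id": 1, "message": "Hello"}'
# Sample arrival time, assumed to be UTC.
arrival = datetime(2022, 7, 26, 14, 50, 22, tzinfo=timezone.utc)
print(expand_prefix(PREFIX, extract_partition_keys(record), arrival))
# success/user_id=1/year=2022/month=07/day=26/hour=14/
```

Records with different user_id values would resolve to different prefixes, which is what lets one delivery stream write to several partitions.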
Replace <YOUR_S3_BUCKET> with an actual bucket name, then deploy the CloudFormation stack with the following command.
aws cloudformation deploy \
--template-file stack.yaml \
--stack-name firehose-ndjson-sample \
--s3-bucket <YOUR_S3_BUCKET> \
--s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
--capabilities CAPABILITY_NAMED_IAM
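Testing
Ingest two JSON records into Firehose with the commands below. The Data value of each record is passed base64-encoded here; the API reference describes the field as follows:
"The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before base64-encoding, is 1,000 KiB."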
$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==
$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==
$ echo '{
"DeliveryStreamName": "ndjson-firehose",
"Records": [
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
{"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
]
}' > input.json
$ aws firehose put-record-batch --cli-input-json file://input.json
{
"FailedPutCount": 0,
"Encrypted": false,
...
}
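The manual base64 and echo steps above could also be scripted. The following is a minimal Python sketch that builds the same request body; the function name build_put_record_batch_input is hypothetical, and the sketch only prints the JSON rather than calling Firehose.

```python
import base64
import json


def build_put_record_batch_input(stream_name, records):
    """Return a PutRecordBatch request body with base64-encoded Data values."""
    return {
        "DeliveryStreamName": stream_name,
        "Records": [
            {
                # Serialize the record to JSON text, then base64-encode its UTF-8 bytes.
                "Data": base64.b64encode(json.dumps(r).encode("utf-8")).decode("ascii")
            }
            for r in records
        ],
    }


payload = build_put_record_batch_input(
    "ndjson-firehose",
    [{"user_id": 1, "message": "Hello"}, {"user_id": 1, "message": "World"}],
)
print(json.dumps(payload, indent=2))
```

Redirecting the printed output to input.json should give a file equivalent to the one created with echo, since json.dumps uses the same ", " and ": " separators as the hand-written records.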
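Check the objects in the S3 bucket with the commands below. Because the template sets IntervalInSeconds to 60, the object may take about a minute to appear. You should see two JSON records separated by a newline, that is, NDJSON.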
$ aws s3 ls --recursive s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/
2022-07-26 23:52:47 69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ aws s3 cp s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
1 {"user_id": 1, "message": "Hello"}
2 {"user_id": 1, "message": "World"}
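As a quick sanity check that the delivered object really is NDJSON, each line can be parsed as an independent JSON document. The sketch below uses a hypothetical helper, parse_ndjson, and an inline sample string that copies the two records shown above; whether the real object ends with a trailing newline is an assumption, so the helper tolerates both cases.

```python
import json


def parse_ndjson(text):
    """Parse newline-delimited JSON, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Inline copy of the two records from the downloaded object (trailing newline assumed).
sample = '{"user_id": 1, "message": "Hello"}\n{"user_id": 1, "message": "World"}\n'
records = parse_ndjson(sample)
print(len(records), [r["message"] for r in records])
```

To run it against the downloaded file instead, read the file contents and pass them to parse_ndjson; a json.JSONDecodeError would indicate that a line is not a complete JSON record.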
Cleanup
Delete the provisioned AWS resources with the following commands.
aws s3 rm --recursive s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample
Summary
Thanks to Dynamic Partitioning, AWS users no longer need a Lambda function in many cases, such as producing NDJSON.
I hope this post is helpful.