How to Process NDJSON Using Kinesis Data Firehose Dynamic Partitioning

岩佐 孝浩
4 min read
Firehose Kinesis

Kinesis Data Firehose now supports Dynamic Partitioning. With this feature, you no longer need a Lambda function to turn incoming records into NDJSON (Newline Delimited JSON).
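
To see why this matters: without a delimiter, Firehose concatenates buffered records, which breaks line-oriented consumers. A minimal illustration with two hypothetical records:

# Without a delimiter, the records arrive concatenated on one line:
{"user_id": 1, "message": "Hello"}{"user_id": 1, "message": "World"}

# With the AppendDelimiterToRecord processor, each record lands on its own line (NDJSON):
{"user_id": 1, "message": "Hello"}
{"user_id": 1, "message": "World"}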

Creating AWS Resources

Create a CloudFormation template with the following content. The key points are the DynamicPartitioningConfiguration section and the ProcessingConfiguration section with its AppendDelimiterToRecord and MetadataExtraction processors.

AWSTemplateFormatVersion: "2010-09-09"
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: ndjson-firehose
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3Bucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
        DynamicPartitioningConfiguration:
          Enabled: true
        Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
        ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: "\\n"
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{user_id: .user_id}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn

  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  IAMRoleKinesisFirehose:
    Type: AWS::IAM::Role
    Properties:
      RoleName: ndjson-firehose-role
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: policy1
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3Bucket.Arn
                  - !Sub ${S3Bucket.Arn}/*
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - kms:GenerateDataKey
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                Condition:
                  StringEquals:
                    kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:s3:arn:
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*
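
The MetadataExtraction processor evaluates the jq 1.6 expression {user_id: .user_id} against each incoming record, and the extracted value feeds the !{partitionKeyFromQuery:user_id} placeholder in Prefix. If you have jq installed, you can sanity-check the same query locally (a local experiment, not part of the deployment):

$ echo '{"user_id": 1, "message": "Hello"}' | jq -c '{user_id: .user_id}'
{"user_id":1}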

Replace <YOUR_S3_BUCKET> with an actual bucket name, then deploy the CloudFormation stack with the following command.

aws cloudformation deploy \
  --template-file stack.yaml \
  --stack-name firehose-ndjson-sample \
  --s3-bucket <YOUR_S3_BUCKET> \
  --s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
  --capabilities CAPABILITY_NAMED_IAM
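
Once the stack is deployed, you can confirm the delivery stream is active before sending data (the --query path follows the DescribeDeliveryStream response shape):

$ aws firehose describe-delivery-stream \
    --delivery-stream-name ndjson-firehose \
    --query DeliveryStreamDescription.DeliveryStreamStatus
"ACTIVE"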

Testing

Ingest two JSON records into the delivery stream with the following commands. As the put-record-batch reference notes, each record's Data blob must be base64-encoded.

put-record-batch — AWS CLI 2.15.44 Command Reference (awscli.amazonaws.com):

"The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before base64-encoding, is 1,000 KiB."

$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==

$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==

$ echo '{
    "DeliveryStreamName": "ndjson-firehose",
    "Records": [
        {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
        {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
    ]
}' > input.json

$ aws firehose put-record-batch --cli-input-json file://input.json
{
    "FailedPutCount": 0,
    "Encrypted": false,
    ...
}
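
Note that even a successful PutRecordBatch call can report partial failures: FailedPutCount may be greater than 0, in which case only the failed records should be resent. To check just that field, filter the output with --query:

$ aws firehose put-record-batch --cli-input-json file://input.json --query FailedPutCount
0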

Check the objects in the S3 bucket with the following commands. You should see two JSON records separated by a newline, i.e. NDJSON.

$ aws s3 ls --recursive s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/
2022-07-26 23:52:47         69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ aws s3 cp s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
     1  {"user_id": 1, "message": "Hello"}
     2  {"user_id": 1, "message": "World"}

Cleaning Up

Delete the provisioned AWS resources with the following commands. Empty the bucket first; CloudFormation cannot delete a non-empty S3 bucket.

aws s3 rm --recursive s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample
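
Optionally, you can wait for the stack deletion to finish:

$ aws cloudformation wait stack-delete-complete --stack-name firehose-ndjson-sample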

Conclusion

Thanks to Dynamic Partitioning, AWS users no longer need a Lambda function in many cases, such as handling NDJSON.

I hope you found this post helpful.

岩佐 孝浩

Software Developer at KAKEHASHI Inc.
Engaged in requirements definition, design, and development of cloud-native applications on AWS. Currently building a new platform for collecting prescription data at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.