SiteWise Edge Gateway を介して OPC UA データを Kinesis にストリームする方法

SiteWise Edge Gateway を介して OPC UA データを Kinesis にストリームする方法

Takahiro Iwasa
Takahiro Iwasa
6 min read
Greengrass IoT Kinesis OPC-UA SiteWise

この投稿では、 OPC UA データを SiteWise Edge Gateway を介して Kinesis Data Streams にストリームする方法について説明します。

概要

この投稿では、 EC2 インスタンスをダミーの OPC UA サーバーとして利用します。

AWS リソース作成

OPC UA Server セットアップ

opcua-asyncio は、ダミーの OPC UA サーバーに使用できます。 EC2 インスタンスにサンプルの Python スクリプト(examples/server-minimal.py)をダウンロードし、次のコマンドでスクリプトを実行してください。

pip install asyncua
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
python server-minimal.py

Greengrass V2 Core Device セットアップ

公式ドキュメントに従って、 Greengrass V2 コアデバイスをセットアップしてください。 この投稿ではインストールの詳細な説明は行いません。

セットアップ後、以下の手順も行ってください。

  • aws.greengrass.StreamManager コンポーネントのデプロイ
  • Kinesis Data Streams にデータを送信するための Token Exchange Role に以下のIAMポリシー追加
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
        }
    ]
}

SiteWise Edge Gateway セットアップ

以下に従って、 SiteWise Edge Gateway をセットアップしてください。

Data processing pack は不要です。

Publisher 設定はスキップしてください。

Add data source をクリックしてください.

local endpoint は OPC UA エンドポイントです。 Greengrass stream name はカスタムの Greengrass コンポーネントで使用されます。 以下の例では、 SiteWise_Stream_Kinesis を使用しています。

レビュー後、 Create をクリックしてください。

Kinesis Data Streams

Kinesis Data Stream を作成してください。

Kinesis Data Firehose

以下の設定で Kinesis Data Firehose を作成してください。

  • Source: 上で作成した Kinesis Data Stream
  • Destination: データを永続化するための S3 バケット。 NDJSON を出力するために Dynamic Partitioning を使用できます。こちらの投稿もご参考ください。

Greengrass Component 作成

Greengrass Stream のデータを Kinesis Data Streams にストリーミングするには、 Greengrass コンポーネントを作成する必要があります。 Greengrass コンポーネントの開発に関する情報は、公式ドキュメントをご参照ください。

ディレクトリ構成

/
├── kinesis_data_stream.py
├── recipe.yaml
├── requirements.txt
└── stream_manager_sdk.zip

requirements.txt

requirements.txt を作成し、 pip install -r requirements.txt を実行してください。 stream-manager は AWS 公式ライブラリです。

cbor2~=5.4.2
stream-manager==1.1.1

Python スクリプト

以下のコードで kinesis_data_stream.py を作成してください。

"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""

import argparse
import asyncio
import logging
import time

from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def main(
        stream_name: str,
        kinesis_stream_name: str,
        batch_size: int = None
):
    try:
        # Create a client for the StreamManager
        client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        while True:
            time.sleep(1)

    except asyncio.TimeoutError:
        logger.exception("Timed out while executing")
    except Exception:
        logger.exception("Exception while running")
    finally:
        # Always close the client to avoid resource leaks
        if client:
            client.close()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=True, default='SiteWise_Stream_Kinesis')
    parser.add_argument('--kinesis-stream', required=True)
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    logger.info(f'args: {args.__dict__}')
    main(args.greengrass_stream, args.kinesis_stream, args.batch_size)

Recipe (recipe.yaml)

以下の内容で recipe.yaml を作成してください。 この例では、コンポーネントの名前は jp.co.xyz.StreamManagerKinesis です。

# Replace $ArtifactsS3Bucket with your value to complete component registration.

RecipeFormatVersion: 2020-01-25

ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: '^2.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    GreengrassStream: SiteWise_Stream_Kinesis
    KinesisStream: ''
    BatchSize: 100 # minimum 1, maximum 500

Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
          --greengrass-stream {configuration:/GreengrassStream} \
          --kinesis-stream {configuration:/KinesisStream} \
          --batch-size {configuration:/BatchSize}
    Artifacts:
      - URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
        Unarchive: ZIP

このレシピには ComponentConfiguration 内の以下の設定が含まれています。

名前説明
GreengrassStreamGreengrass ストリームの名前
KinesisStreamOPC UA データを受信する Kinesis Data Stream の名前
BatchSize送信されるバッチサイズ

Component Artifact アップロード

コンポーネントのファイルをアーカイブし、以下のコマンドを使用して S3 バケットにアップロードしてください。

S3_BUCKET=$YOUR_BUCKET_NAME
VERSION=1.0.0

zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip

Component 登録

上記の recipe.yaml の内容をコピーし、 $ArtifactsS3Bucket を、コンポーネントのアーティファクトを含む実際の値に置き換えてください。

Component デプロイ

Deploy をクリックしてください。

Configure component をクリックしてください。

以下の例のように、 Greengrass コンポーネントの構成を更新してください。 KinesisStream は上で作成した Kinesis Data Stream の名前です。

確認後、コンポーネントをデプロイしてください。

テスト

以下のコマンドで S3 オブジェクトを確認してください。

aws s3 cp s3://$YOUR_S3_BUCKET/... ./

以下のようなデータが確認できるはずです。

{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {
        "doubleValue": 7.699999999999997
      },
      "timestamp": {
        "timeInSeconds": 1661581962,
        "offsetInNanos": 9000000
      },
      "quality": "GOOD"
    },
...

まとめ

今回の AWS 構成を利用すれば、わずかなコードで OPC UA データを Kinesis Data Streams に送信できます。

この投稿が、お役に立てば幸いです。

Takahiro Iwasa

Takahiro Iwasa

Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.