SiteWise Edge Gateway を介して OPC UA データを Kinesis Data Streams にストリーム配信する方法

はじめに
この投稿では、AWS SiteWise Edge Gateway を介して OPC UA データを Kinesis Data Streams にストリーム配信する方法を解説します。デモでは、EC2 インスタンスをダミーの OPC UA サーバーとして構成します。
AWS リソースのセットアップ
OPC UA サーバーのセットアップ
ダミーの OPC UA サーバーを作成するには、opcua-asyncio
Python ライブラリを使用します。以下の手順を実行してください。
- 必要なパッケージをインストールします。
pip install asyncua
- サンプルスクリプト (
server-minimal.py
) を EC2 インスタンスにダウンロードします。
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
- スクリプトを実行します。
python server-minimal.py
Greengrass V2 Core デバイスのセットアップ
公式ドキュメント に従って Greengrass V2 Core デバイス をセットアップします。インストール後、以下を実行してください。
aws.greengrass.StreamManager
コンポーネントをデプロイします。- トークン交換ロールに以下の IAM ポリシーを追加し、Kinesis Data Streams へのデータ送信を許可します。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
}
]
}
SiteWise Edge Gateway のセットアップ
SiteWise Edge Gateway のセットアップを行います。主な手順は次のとおりです。
- ゲートウェイ構成:
Advanced setup
を選択します。
- データ処理パック: 必要ありません。
- パブリッシャー構成: スキップします。
- データソースの追加: OPC UA サーバーの ローカルエンドポイント を指定し、Greengrass ストリーム名 (例:
SiteWise_Stream_Kinesis
) を設定します。
- レビュー: 設定を確認し、
作成
ボタンを押します。
Kinesis Data Stream の作成
Kinesis Data Stream を作成し、OPC UA データの宛先として使用します。
Kinesis Data Firehose の設定
ストリーム配信データを永続化するため、以下を使用して Kinesis Data Firehose を設定します。
- ソース: 上記で作成した Kinesis Data Stream
- 宛先: S3 バケット。
NDJSON 形式で出力する場合、動的パーティショニング 機能を使用できます。NDJSON の取り扱いについて詳しくは このブログ記事 をご参照ください。
Greengrass コンポーネントの作成
Greengrass Stream から Kinesis Data Streams にデータをストリーム配信するには、カスタム Greengrass コンポーネント を作成する必要があります。公式ドキュメント をご参照ください。
ディレクトリ構造
コンポーネントのファイルを次の構造で整理します。
/
├── kinesis_data_stream.py
├── recipe.yaml
├── requirements.txt
└── stream_manager_sdk.zip
requirements.txt
requirements.txt
ファイルを作成し、以下のコマンドを使用して必要な依存関係をインストールします。
pip install -r requirements.txt
stream-manager
ライブラリは、Greengrass Stream Manager と連携するために必須です。
依存関係:
cbor2~=5.4.2
stream-manager==1.1.1
Python スクリプト
以下のコードで kinesis_data_stream.py
を作成してください。このスクリプトは、Greengrass Stream Manager SDK を活用して、データを Kinesis Data Stream にストリーミングします。
"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""
import argparse
import asyncio
import logging
import time
from stream_manager import (
ExportDefinition,
KinesisConfig,
MessageStreamDefinition,
ReadMessagesOptions,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
def main(
stream_name: str,
kinesis_stream_name: str,
batch_size: int = None
):
try:
# Create a client for the StreamManager
client = StreamManagerClient()
# Try deleting the stream (if it exists) so that we have a fresh start
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
exports = ExportDefinition(
kinesis=[KinesisConfig(
identifier="KinesisExport" + stream_name,
kinesis_stream_name=kinesis_stream_name,
batch_size=batch_size,
)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name,
strategy_on_full=StrategyOnFull.OverwriteOldestData,
export_definition=exports
)
)
while True:
time.sleep(1)
except asyncio.TimeoutError:
logger.exception("Timed out while executing")
except Exception:
logger.exception("Exception while running")
finally:
# Always close the client to avoid resource leaks
if client:
client.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument('--greengrass-stream', required=True, default='SiteWise_Stream_Kinesis')
parser.add_argument('--kinesis-stream', required=True)
parser.add_argument('--batch-size', required=False, type=int, default=500)
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
logger.info(f'args: {args.__dict__}')
main(args.greengrass_stream, args.kinesis_stream, args.batch_size)
注意点
stream_manager_sdk.zip
ファイルには、Greengrass コンポーネントに必要な SDK を含める必要があります。recipe.yaml
ファイルを更新し、コンポーネントの設定を含めてください。
詳細およびサンプルコードについては、Greengrass SDK ドキュメント をご参照ください。
コンポーネントのレシピ (recipe.yaml)
以下の内容で recipe.yaml
ファイルを作成します。この例では、コンポーネント名を jp.co.xyz.StreamManagerKinesis
としています。
# Replace $ArtifactsS3Bucket with your value to complete component registration.
RecipeFormatVersion: 2020-01-25
ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
aws.greengrass.StreamManager:
VersionRequirement: '^2.0.0'
ComponentConfiguration:
DefaultConfiguration:
GreengrassStream: SiteWise_Stream_Kinesis
KinesisStream: ''
BatchSize: 100 # minimum 1, maximum 500
Manifests:
- Platform:
os: linux
Lifecycle:
Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
Run: |
export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
--greengrass-stream {configuration:/GreengrassStream} \
--kinesis-stream {configuration:/KinesisStream} \
--batch-size {configuration:/BatchSize}
Artifacts:
- URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
Unarchive: ZIP
このレシピの ComponentConfiguration
には、以下の設定可能なパラメータが含まれています。
Name | Description |
---|---|
GreengrassStream | Greengrass ストリームの名前 |
KinesisStream | OPC UA データを受信する Kinesis Data Stream の名前 |
BatchSize | データ転送のバッチサイズ (最小値: 1, 最大値: 500) |
コンポーネントアーティファクトのアップロード
コンポーネントファイルをアーカイブし、以下のコマンドを使用して S3 バケットにアップロードします。
S3_BUCKET=<YOUR_BUCKET_NAME>
VERSION=1.0.0
zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip
コンポーネント登録
コンポーネントを登録するには、以下の手順を実行してください。
- 作成した
recipe.yaml
ファイルの内容をコピーします。 $ArtifactsS3Bucket
をコンポーネントアーティファクトが存在する S3 バケット名に置き換えます。
コンポーネントのデプロイ
以下の手順に従ってコンポーネントをデプロイします。
- デプロイ をクリックします。
- コンポーネントの構成 をクリックします。
- 必要な詳細を入力してコンポーネントの構成を更新します。
KinesisStream
パラメータには、先ほど作成した Kinesis Data Stream の名前を設定します。
- 構成を確認し、コンポーネントをデプロイします。
デプロイが完了すると、コンポーネントは Greengrass Stream から指定された Kinesis Data Stream へデータをストリーム配信します。
テスト
セットアップを検証するには、以下のコマンドを使用して S3 バケット内のオブジェクトを確認します。
aws s3 cp s3://<YOUR_BUCKET_NAME>/... ./
取得したデータは、以下の例のような形式になっているはずです。
{
"propertyAlias": "/MyObject/MyVariable",
"propertyValues": [
{
"value": {
"doubleValue": 7.699999999999997
},
"timestamp": {
"timeInSeconds": 1661581962,
"offsetInNanos": 9000000
},
"quality": "GOOD"
}
]
}
このようなデータが表示されている場合、システムは OPC UA データを Kinesis Data Stream に正常にストリーム配信し、S3 に永続化できています。
まとめ
この投稿では、EC2 ベースの OPC UA サーバーと AWS SiteWise Edge Gateway を使用して、OPC UA データを Kinesis Data Streams にストリーム配信する手順を説明しました。Greengrass V2、Kinesis、SiteWise などの AWS サービスを活用することで、産業データをスケーラブルなクラウドインフラへ効率的に統合し、永続化することが可能です。
Happy Coding! 🚀