SiteWise Edge Gateway を介して OPC UA データを Kinesis にストリームする方法
この投稿では、 OPC UA データを SiteWise Edge Gateway を介して Kinesis Data Streams にストリームする方法について説明します。
概要
この投稿では、 EC2 インスタンスをダミーの OPC UA サーバーとして利用します。
AWS リソース作成
OPC UA Server セットアップ
opcua-asyncio
は、ダミーの OPC UA サーバーに使用できます。
EC2 インスタンスにサンプルの Python スクリプト(examples/server-minimal.py
)をダウンロードし、次のコマンドでスクリプトを実行してください。
pip install asyncua
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
python server-minimal.py
Greengrass V2 Core Device セットアップ
公式ドキュメントに従って、 Greengrass V2 コアデバイスをセットアップしてください。 この投稿ではインストールの詳細な説明は行いません。
セットアップ後、以下の手順も行ってください。
aws.greengrass.StreamManager
コンポーネントのデプロイ- Kinesis Data Streams にデータを送信するための Token Exchange Role に以下のIAMポリシー追加
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
}
]
}
SiteWise Edge Gateway セットアップ
以下に従って、 SiteWise Edge Gateway をセットアップしてください。
Data processing pack は不要です。
Publisher 設定はスキップしてください。
Add data source
をクリックしてください.
local endpoint
は OPC UA エンドポイントです。
Greengrass stream name
はカスタムの Greengrass コンポーネントで使用されます。
以下の例では、 SiteWise_Stream_Kinesis
を使用しています。
レビュー後、 Create
をクリックしてください。
Kinesis Data Streams
Kinesis Data Stream を作成してください。
Kinesis Data Firehose
以下の設定で Kinesis Data Firehose を作成してください。
- Source: 上で作成した Kinesis Data Stream
- Destination: データを永続化するための S3 バケット。 NDJSON を出力するために Dynamic Partitioning を使用できます。こちらの投稿もご参考ください。
Greengrass Component 作成
Greengrass Stream のデータを Kinesis Data Streams にストリーミングするには、 Greengrass コンポーネントを作成する必要があります。 Greengrass コンポーネントの開発に関する情報は、公式ドキュメントをご参照ください。
ディレクトリ構成
/
├── kinesis_data_stream.py
├── recipe.yaml
├── requirements.txt
└── stream_manager_sdk.zip
requirements.txt
requirements.txt
を作成し、 pip install -r requirements.txt
を実行してください。
stream-manager
は AWS 公式ライブラリです。
cbor2~=5.4.2
stream-manager==1.1.1
Python スクリプト
以下のコードで kinesis_data_stream.py
を作成してください。
"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""
import argparse
import asyncio
import logging
import time
from stream_manager import (
ExportDefinition,
KinesisConfig,
MessageStreamDefinition,
ReadMessagesOptions,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
def main(
stream_name: str,
kinesis_stream_name: str,
batch_size: int = None
):
try:
# Create a client for the StreamManager
client = StreamManagerClient()
# Try deleting the stream (if it exists) so that we have a fresh start
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
exports = ExportDefinition(
kinesis=[KinesisConfig(
identifier="KinesisExport" + stream_name,
kinesis_stream_name=kinesis_stream_name,
batch_size=batch_size,
)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name,
strategy_on_full=StrategyOnFull.OverwriteOldestData,
export_definition=exports
)
)
while True:
time.sleep(1)
except asyncio.TimeoutError:
logger.exception("Timed out while executing")
except Exception:
logger.exception("Exception while running")
finally:
# Always close the client to avoid resource leaks
if client:
client.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument('--greengrass-stream', required=True, default='SiteWise_Stream_Kinesis')
parser.add_argument('--kinesis-stream', required=True)
parser.add_argument('--batch-size', required=False, type=int, default=500)
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
logger.info(f'args: {args.__dict__}')
main(args.greengrass_stream, args.kinesis_stream, args.batch_size)
Recipe (recipe.yaml)
以下の内容で recipe.yaml
を作成してください。
この例では、コンポーネントの名前は jp.co.xyz.StreamManagerKinesis
です。
# Replace $ArtifactsS3Bucket with your value to complete component registration.
RecipeFormatVersion: 2020-01-25
ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
aws.greengrass.StreamManager:
VersionRequirement: '^2.0.0'
ComponentConfiguration:
DefaultConfiguration:
GreengrassStream: SiteWise_Stream_Kinesis
KinesisStream: ''
BatchSize: 100 # minimum 1, maximum 500
Manifests:
- Platform:
os: linux
Lifecycle:
Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
Run: |
export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
--greengrass-stream {configuration:/GreengrassStream} \
--kinesis-stream {configuration:/KinesisStream} \
--batch-size {configuration:/BatchSize}
Artifacts:
- URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
Unarchive: ZIP
このレシピには ComponentConfiguration
内の以下の設定が含まれています。
名前 | 説明 |
---|---|
GreengrassStream | Greengrass ストリームの名前 |
KinesisStream | OPC UA データを受信する Kinesis Data Stream の名前 |
BatchSize | 送信されるバッチサイズ |
Component Artifact アップロード
コンポーネントのファイルをアーカイブし、以下のコマンドを使用して S3 バケットにアップロードしてください。
S3_BUCKET=$YOUR_BUCKET_NAME
VERSION=1.0.0
zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip
Component 登録
上記の recipe.yaml
の内容をコピーし、 $ArtifactsS3Bucket
を、コンポーネントのアーティファクトを含む実際の値に置き換えてください。
Component デプロイ
Deploy
をクリックしてください。
Configure component
をクリックしてください。
以下の例のように、 Greengrass コンポーネントの構成を更新してください。
KinesisStream
は上で作成した Kinesis Data Stream の名前です。
確認後、コンポーネントをデプロイしてください。
テスト
以下のコマンドで S3 オブジェクトを確認してください。
aws s3 cp s3://$YOUR_S3_BUCKET/... ./
以下のようなデータが確認できるはずです。
{
"propertyAlias": "/MyObject/MyVariable",
"propertyValues": [
{
"value": {
"doubleValue": 7.699999999999997
},
"timestamp": {
"timeInSeconds": 1661581962,
"offsetInNanos": 9000000
},
"quality": "GOOD"
},
...
まとめ
今回の AWS 構成を利用すれば、わずかなコードで OPC UA データを Kinesis Data Streams に送信できます。
この投稿が、お役に立てば幸いです。