How to Stream OPC UA Data to Kinesis through SiteWise Edge Gateway
This post provides a guide on how to stream OPC UA data to Kinesis Data Streams through an AWS IoT SiteWise Edge gateway.
Overview
In this post, an EC2 instance is used as a dummy OPC UA server. The SiteWise Edge gateway, running on a Greengrass V2 core device, collects OPC UA data and writes it to a Greengrass stream; a custom Greengrass component then forwards the stream's data to Kinesis Data Streams, and Kinesis Data Firehose persists it to S3.
Creating AWS Resources
Setting Up OPC UA Server
opcua-asyncio can be used as a dummy OPC UA server. Download the example Python script (examples/server-minimal.py) onto the EC2 instance and run it with the following commands. Once it is running, you can verify it with the client sketch below.
pip install asyncua
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
python server-minimal.py
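A minimal client sketch to check that the dummy server is serving data, assuming the default endpoint and namespace that server-minimal.py uses:

import asyncio

from asyncua import Client


async def main():
    # Default endpoint of server-minimal.py; adjust host/port if needed
    url = "opc.tcp://localhost:4840/freeopcua/server/"
    async with Client(url=url) as client:
        nsidx = await client.get_namespace_index("http://examples.freeopcua.github.io")
        # server-minimal.py exposes MyObject/MyVariable under the Objects node
        var = await client.nodes.root.get_child(
            ["0:Objects", f"{nsidx}:MyObject", f"{nsidx}:MyVariable"]
        )
        print(await var.read_value())


asyncio.run(main())

The variable should tick up roughly once per second if the server is working.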
Setting Up Greengrass V2 Core Device
Set up a Greengrass V2 core device according to the official documentation. This post does not cover the installation steps in detail.
Additionally, do the following:
- Deploy the aws.greengrass.StreamManager component.
- Add the following IAM policy to the token exchange role so that the core device can send data to Kinesis Data Streams (a scripted sketch follows the policy).
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
    }
  ]
}
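If you prefer to attach the policy from a script rather than the console, here is a minimal sketch using boto3. The role name below is the installer's default token exchange role, and the account ID, stream name, and policy name are placeholders:

import json

import boto3

iam = boto3.client("iam")

# Same policy as above; replace the account ID and stream name
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
        "Resource": "arn:aws:kinesis:*:123456789012:stream/sitewise-opcua-stream",
    }],
}

iam.put_role_policy(
    RoleName="GreengrassV2TokenExchangeRole",  # default role name; match yours
    PolicyName="KinesisPutRecords",            # arbitrary inline policy name
    PolicyDocument=json.dumps(policy),
)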
Setting Up SiteWise Edge Gateway
Set up a SiteWise Edge gateway as follows. The data processing pack is not needed here, and you can skip the publisher configuration.
Click Add data source.
The Local endpoint refers to your OPC UA server endpoint, possibly in your on-premises environment. The Greengrass stream name will be used in a custom Greengrass component; the example below uses SiteWise_Stream_Kinesis.
Review, and click Create.
Kinesis Data Streams
Create a Kinesis Data Stream to receive the OPC UA data.
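For example, a single-shard stream can be created with boto3; the stream name sitewise-opcua-stream is a placeholder used throughout this post's sketches:

import boto3

kinesis = boto3.client("kinesis")
# One shard is enough for this walkthrough; scale as needed
kinesis.create_stream(StreamName="sitewise-opcua-stream", ShardCount=1)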
Kinesis Data Firehose
Create a Kinesis Data Firehose delivery stream with the following configuration; a scripted sketch follows the list.
- Source: the Kinesis Data Stream created above
- Destination: an S3 bucket in which you want to persist data. You can use the Dynamic Partitioning feature to output NDJSON. You may also refer to my post.
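As a sketch, the delivery stream can also be created with boto3. Every ARN below is a placeholder, and the delivery role must be allowed to read from the Kinesis stream and write to the bucket:

import boto3

firehose = boto3.client("firehose")
firehose.create_delivery_stream(
    DeliveryStreamName="sitewise-opcua-firehose",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/sitewise-opcua-stream",
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
        "BucketARN": "arn:aws:s3:::your-destination-bucket",
        # Buffer up to 64 MB or 60 seconds before writing to S3
        "BufferingHints": {"IntervalInSeconds": 60, "SizeInMBs": 64},
    },
)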
Creating Greengrass Component
To forward data from the Greengrass stream to the Kinesis Data Stream, you need a custom Greengrass component. For information on developing Greengrass components, please refer to the official documentation.
Directory Structure
/
├── kinesis_data_stream.py
├── recipe.yaml
├── requirements.txt
└── stream_manager_sdk.zip
requirements.txt
Create requirements.txt with the following content and run pip install -r requirements.txt. stream-manager is the official AWS Stream Manager SDK for Python.
cbor2~=5.4.2
stream-manager==1.1.1
Python Script
Create kinesis_data_stream.py with the following code.
"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""
import argparse
import asyncio
import logging
import time
from stream_manager import (
ExportDefinition,
KinesisConfig,
MessageStreamDefinition,
ReadMessagesOptions,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
def main(
stream_name: str,
kinesis_stream_name: str,
batch_size: int = None
):
try:
# Create a client for the StreamManager
client = StreamManagerClient()
# Try deleting the stream (if it exists) so that we have a fresh start
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
exports = ExportDefinition(
kinesis=[KinesisConfig(
identifier="KinesisExport" + stream_name,
kinesis_stream_name=kinesis_stream_name,
batch_size=batch_size,
)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name,
strategy_on_full=StrategyOnFull.OverwriteOldestData,
export_definition=exports
)
)
while True:
time.sleep(1)
except asyncio.TimeoutError:
logger.exception("Timed out while executing")
except Exception:
logger.exception("Exception while running")
finally:
# Always close the client to avoid resource leaks
if client:
client.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument('--greengrass-stream', required=True, default='SiteWise_Stream_Kinesis')
parser.add_argument('--kinesis-stream', required=True)
parser.add_argument('--batch-size', required=False, type=int, default=500)
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
logger.info(f'args: {args.__dict__}')
main(args.greengrass_stream, args.kinesis_stream, args.batch_size)
Recipe (recipe.yaml)
Create recipe.yaml with the following content. In this example, the component name is jp.co.xyz.StreamManagerKinesis.
# Replace $ArtifactsS3Bucket with your value to complete component registration.
RecipeFormatVersion: 2020-01-25
ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: '^2.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    GreengrassStream: SiteWise_Stream_Kinesis
    KinesisStream: ''
    BatchSize: 100 # minimum 1, maximum 500
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
          --greengrass-stream {configuration:/GreengrassStream} \
          --kinesis-stream {configuration:/KinesisStream} \
          --batch-size {configuration:/BatchSize}
    Artifacts:
      - URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
        Unarchive: ZIP
This recipe includes the following configuration in ComponentConfiguration.

Name | Description |
---|---|
GreengrassStream | Greengrass stream name |
KinesisStream | Name of the Kinesis Data Stream that receives the OPC UA data |
BatchSize | Number of records per batch sent to Kinesis (minimum 1, maximum 500) |
Uploading Component Artifact
Archive the component files and upload the archive to the S3 bucket using the following commands.
S3_BUCKET=$YOUR_BUCKET_NAME
VERSION=1.0.0
zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip
Component Registration
Copy the contents of the recipe.yaml above, replace $ArtifactsS3Bucket with the name of the S3 bucket that contains the component artifact, and register the component.
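If you prefer to register the component programmatically instead of pasting the recipe into the console, a sketch using boto3:

import boto3
from pathlib import Path

# Assumes $ArtifactsS3Bucket in recipe.yaml has already been replaced
recipe = Path("recipe.yaml").read_bytes()

ggv2 = boto3.client("greengrassv2")
resp = ggv2.create_component_version(inlineRecipe=recipe)
print(resp["componentName"], resp["componentVersion"], resp["status"])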
Component Deployment
Click Deploy, then click Configure component.
Update the Greengrass component configuration like the example below. KinesisStream is the name of the Kinesis Data Stream created above.
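For reference, the configuration to merge might look like the following JSON; the Kinesis stream name is the placeholder used in the earlier sketches:

{
  "GreengrassStream": "SiteWise_Stream_Kinesis",
  "KinesisStream": "sitewise-opcua-stream",
  "BatchSize": 100
}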
After review, deploy the component.
Testing
Check the S3 objects with the following command (replace the path with an actual object key).
aws s3 cp s3://$YOUR_S3_BUCKET/... ./
You should see data like the following.
{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {
        "doubleValue": 7.699999999999997
      },
      "timestamp": {
        "timeInSeconds": 1661581962,
        "offsetInNanos": 9000000
      },
      "quality": "GOOD"
    },
    ...
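If you would rather script the check, here is a sketch with boto3 that prints the most recently delivered object; the bucket name is a placeholder:

import boto3

s3 = boto3.client("s3")
bucket = "your-destination-bucket"  # placeholder

# Find and print the most recently delivered object
objects = s3.list_objects_v2(Bucket=bucket)["Contents"]
latest = max(objects, key=lambda o: o["LastModified"])
body = s3.get_object(Bucket=bucket, Key=latest["Key"])["Body"].read()
print(body.decode("utf-8"))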
Conclusion
With the architecture described in this post, you can stream OPC UA data to Kinesis Data Streams with just a few lines of custom code.
I hope you will find this post useful.