How to Stream OPC UA Data to Kinesis Using AWS SiteWise Edge Gateway

Takahiro Iwasa
5 min read
Greengrass IoT Kinesis OPC-UA SiteWise

This note describes how to stream OPC UA data to Kinesis Data Streams through SiteWise Edge Gateway.

Overview Diagram

Building Backend

OPC UA Server

To create a dummy OPC UA server, use the opcua-asyncio Python library.

Install the required package:

Terminal window
pip install asyncua

Download the example script (server-minimal.py) to your EC2 instance:

Terminal window
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py

Run the script:

Terminal window
python server-minimal.py
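For reference, the upstream example roughly does the following (a sketch, assuming `asyncua` is installed; the node names `MyObject`/`MyVariable` come from the upstream sample and reappear in the test output later):

```python
import asyncio

async def main():
    # Lazy import so the sketch can be read without asyncua installed.
    from asyncua import Server

    server = Server()
    await server.init()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

    # Register a namespace and expose one writable variable.
    idx = await server.register_namespace("http://examples.freeopcua.github.io")
    obj = await server.nodes.objects.add_object(idx, "MyObject")
    var = await obj.add_variable(idx, "MyVariable", 6.7)
    await var.set_writable()

    async with server:
        while True:
            # Nudge the value every second so the gateway sees changing data.
            await asyncio.sleep(1)
            await var.write_value(await var.read_value() + 0.1)

# On the server host: asyncio.run(main())
```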

Greengrass V2 Core Device

Set up a Greengrass V2 core device by following the official documentation; the setup procedure itself is not covered in this note.

After installation, perform the following:

  • Deploy the aws.greengrass.StreamManager component.
  • Add the IAM policy below to the Token Exchange Role to allow data transmission to Kinesis Data Streams:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
    }
  ]
}
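If you script this step, the policy document can be rendered with a short stdlib-only helper (the function name and the example account ID/stream name below are hypothetical) and then passed to `aws iam put-role-policy` or pasted into the console:

```python
import json

def kinesis_put_policy(account_id: str, stream_name: str) -> str:
    """Render the IAM policy that lets Stream Manager write to one Kinesis stream."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
                "Resource": f"arn:aws:kinesis:*:{account_id}:stream/{stream_name}",
            }
        ],
    }
    return json.dumps(policy, indent=2)

print(kinesis_put_policy("123456789012", "opcua-stream"))
```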

SiteWise Edge Gateway

Set up a SiteWise Edge Gateway.

Choose Advanced setup.

Step 1

Data processing pack is not required.

Step 2

Skip the Configure publisher step.

Step 3

Specify the local endpoint of your OPC UA server and set the Greengrass stream name (e.g., SiteWise_Stream_Kinesis).

Step 4


Review the configuration and press the Create button.

Step 5

Kinesis Data Stream

Create a Kinesis Data Stream as the destination for your OPC UA data.

Kinesis Data Stream Setup
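Besides the console, the stream can be created with boto3 (a sketch; the stream name and shard count are assumptions, and the call needs AWS credentials):

```python
def create_opcua_stream(stream_name: str = "opcua-stream", shards: int = 1) -> None:
    """Create the destination Kinesis Data Stream and wait until it is active."""
    import boto3  # lazy import: only needed when actually calling AWS

    kinesis = boto3.client("kinesis")
    kinesis.create_stream(StreamName=stream_name, ShardCount=shards)
    # Block until the stream leaves CREATING state before wiring up the gateway.
    kinesis.get_waiter("stream_exists").wait(StreamName=stream_name)

# With credentials configured: create_opcua_stream()
```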

Kinesis Data Firehose

To persist the streamed data, create a Kinesis Data Firehose delivery stream with the following settings:

  • Source: The Kinesis Data Stream created above.
  • Destination: An S3 bucket.

Kinesis Firehose Setup
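This step can also be scripted with boto3; the sketch below uses hypothetical names and ARNs, and assumes an IAM role that can both read the source stream and write to the bucket:

```python
def create_firehose_to_s3(
    name: str = "opcua-firehose",  # hypothetical delivery stream name
    stream_arn: str = "arn:aws:kinesis:ap-northeast-1:123456789012:stream/opcua-stream",
    bucket_arn: str = "arn:aws:s3:::my-opcua-bucket",
    role_arn: str = "arn:aws:iam::123456789012:role/firehose-role",
) -> None:
    """Create a delivery stream that reads the Kinesis stream and writes to S3."""
    import boto3  # lazy import: only needed when actually calling AWS

    firehose = boto3.client("firehose")
    firehose.create_delivery_stream(
        DeliveryStreamName=name,
        DeliveryStreamType="KinesisStreamAsSource",
        KinesisStreamSourceConfiguration={
            "KinesisStreamARN": stream_arn,
            "RoleARN": role_arn,  # must allow reading the source stream
        },
        ExtendedS3DestinationConfiguration={
            "RoleARN": role_arn,  # must allow writing to the bucket
            "BucketARN": bucket_arn,
        },
    )

# With credentials configured: create_firehose_to_s3()
```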

Creating Greengrass Component

To stream data from the Greengrass Stream to Kinesis Data Streams, you need to create a custom Greengrass component. For detailed information about developing Greengrass components, refer to the official documentation.

Directory Structure

Organize the component’s files in the following structure:

Terminal window
- kinesis_data_stream.py
- recipe.yaml
- requirements.txt
- stream_manager_sdk.zip

Writing Component

ℹ️ Note

The stream_manager_sdk.zip file should contain the necessary SDK for your Greengrass component. Refer to the AWS Greengrass Stream Manager SDK for Python for additional details and sample code.

Create a requirements.txt. The stream-manager library is essential for interacting with Greengrass Stream Manager.

requirements.txt
cbor2~=5.4.2
stream-manager==1.1.1

Create kinesis_data_stream.py with the following code. This script leverages the Greengrass Stream Manager SDK to stream data into a Kinesis Data Stream.

kinesis_data_stream.py
"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""
import argparse
import asyncio
import logging
import time
from stream_manager import (
ExportDefinition,
KinesisConfig,
MessageStreamDefinition,
ReadMessagesOptions,
ResourceNotFoundException,
StrategyOnFull,
StreamManagerClient,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
def main(
stream_name: str,
kinesis_stream_name: str,
batch_size: int = None
):
try:
# Create a client for the StreamManager
client = StreamManagerClient()
# Try deleting the stream (if it exists) so that we have a fresh start
try:
client.delete_message_stream(stream_name=stream_name)
except ResourceNotFoundException:
pass
exports = ExportDefinition(
kinesis=[KinesisConfig(
identifier="KinesisExport" + stream_name,
kinesis_stream_name=kinesis_stream_name,
batch_size=batch_size,
)]
)
client.create_message_stream(
MessageStreamDefinition(
name=stream_name,
strategy_on_full=StrategyOnFull.OverwriteOldestData,
export_definition=exports
)
)
while True:
time.sleep(1)
except asyncio.TimeoutError:
logger.exception("Timed out while executing")
except Exception:
logger.exception("Exception while running")
finally:
# Always close the client to avoid resource leaks
if client:
client.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument('--greengrass-stream', required=True, default='SiteWise_Stream_Kinesis')
parser.add_argument('--kinesis-stream', required=True)
parser.add_argument('--batch-size', required=False, type=int, default=500)
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
logger.info(f'args: {args.__dict__}')
main(args.greengrass_stream, args.kinesis_stream, args.batch_size)

Create a recipe.yaml file with the following content. In this note, the component name is jp.co.xyz.StreamManagerKinesis.

This recipe includes the following configurable parameters under ComponentConfiguration / DefaultConfiguration:

  • GreengrassStream: Name of the Greengrass stream
  • KinesisStream: Kinesis Data Stream name to receive OPC UA data
  • BatchSize: Batch size for data transfer (minimum: 1, maximum: 500)
recipe.yaml
# Replace $ArtifactsS3Bucket with your value to complete component registration.
RecipeFormatVersion: 2020-01-25
ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: '^2.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    GreengrassStream: SiteWise_Stream_Kinesis
    KinesisStream: ''
    BatchSize: 100 # minimum 1, maximum 500
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
          --greengrass-stream {configuration:/GreengrassStream} \
          --kinesis-stream {configuration:/KinesisStream} \
          --batch-size {configuration:/BatchSize}
    Artifacts:
      - URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
        Unarchive: ZIP

Uploading Component Artifact

Archive the component files and upload them to your S3 bucket using the following commands:

Terminal window
S3_BUCKET=<YOUR_BUCKET_NAME>
VERSION=1.0.0
zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip

Registering Component

To register the component:

  1. Copy the contents of the recipe.yaml file created earlier.
  2. Replace $ArtifactsS3Bucket with the actual S3 bucket name where the component artifact resides.

Component Registration

Deploying Component

Click Deploy.

Deploy

Click Configure component.

Configure Component

Update the component’s configuration with the required details. Set the KinesisStream parameter to the Kinesis Data Stream name you created earlier.

Update Configuration

Review the configuration and deploy the component.

Deploy Component

Once the deployment is complete, the component will stream data from the Greengrass Stream to the specified Kinesis Data Stream.
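To exercise the export path without waiting for the gateway, you can also append a message to the Greengrass stream yourself from the core device. This is a sketch using the Stream Manager SDK's `append_message`; the stream name is the recipe default and the payload is an arbitrary test value:

```python
import json

def append_test_message(stream_name: str = "SiteWise_Stream_Kinesis") -> int:
    """Append one payload to the Greengrass stream; returns its sequence number."""
    from stream_manager import StreamManagerClient  # only available on the core device

    client = StreamManagerClient()
    try:
        payload = {"hello": "kinesis"}  # arbitrary test payload
        return client.append_message(stream_name, json.dumps(payload).encode())
    finally:
        client.close()

# On the core device: append_test_message()
```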

Testing

To verify the setup, check the objects in your S3 bucket using the following command:

Terminal window
aws s3 cp s3://<YOUR_BUCKET_NAME>/... ./
ℹ️ Note

The example below is formatted for better readability.

{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {
        "doubleValue": 7.699999999999997
      },
      "timestamp": {
        "timeInSeconds": 1661581962,
        "offsetInNanos": 9000000
      },
      "quality": "GOOD"
    }
  ]
}

If the data appears as expected, the system is successfully streaming OPC UA data to the Kinesis Data Stream and persisting it in S3.
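When post-processing the S3 objects, note that each timestamp is split into seconds and nanoseconds. A small stdlib-only helper (hypothetical name) recombines them into a UTC datetime:

```python
import json
from datetime import datetime, timezone

def parse_property_value(record: str):
    """Extract (alias, value, UTC datetime) from a SiteWise-style record."""
    doc = json.loads(record)
    pv = doc["propertyValues"][0]
    ts = pv["timestamp"]
    # Recombine the split timestamp: seconds plus fractional nanoseconds.
    when = datetime.fromtimestamp(
        ts["timeInSeconds"] + ts["offsetInNanos"] / 1e9, tz=timezone.utc
    )
    return doc["propertyAlias"], pv["value"]["doubleValue"], when

sample = '{"propertyAlias": "/MyObject/MyVariable", "propertyValues": [{"value": {"doubleValue": 7.699999999999997}, "timestamp": {"timeInSeconds": 1661581962, "offsetInNanos": 9000000}, "quality": "GOOD"}]}'
alias, value, when = parse_property_value(sample)
print(alias, value, when.isoformat())
```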

Takahiro Iwasa

Software Developer
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Japan AWS Top Engineers 2020-2023.