# How to Stream OPC UA Data to Kinesis Using AWS SiteWise Edge Gateway

This note describes how to stream OPC UA data to Kinesis Data Streams through SiteWise Edge Gateway.
## Building Backend

### OPC UA Server
To create a dummy OPC UA server, use the `opcua-asyncio` Python library.

Install the required package:

```shell
pip install asyncua
```
Download the example script (`server-minimal.py`) to your EC2 instance:

```shell
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
```

Run the script:

```shell
python server-minimal.py
```
### Greengrass V2 Core Device

Set up a Greengrass V2 Core device by following the official documentation; this note does not cover the installation itself.
After installation, perform the following:

- Deploy the `aws.greengrass.StreamManager` component.
- Add the IAM policy below to the Token Exchange Role to allow data transmission to Kinesis Data Streams:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
    }
  ]
}
```
### SiteWise Edge Gateway

Set up a SiteWise Edge Gateway as follows:

- Choose **Advanced setup**.
- The **Data processing pack** is not required.
- Skip the **Configure publisher** step.
- Specify the local endpoint of your OPC UA server and set the Greengrass stream name (e.g., `SiteWise_Stream_Kinesis`).
- Review the configuration and press the **Create** button.
### Kinesis Data Stream
Create a Kinesis Data Stream as the destination for your OPC UA data.
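If you prefer the AWS CLI, the stream can be created like this (the stream name `opcua-data` is a placeholder; use your own and match it to the component configuration later):

```shell
# Placeholder stream name; requires AWS credentials with kinesis:CreateStream
aws kinesis create-stream --stream-name opcua-data --shard-count 1
# Block until the stream becomes ACTIVE
aws kinesis wait stream-exists --stream-name opcua-data
```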
### Kinesis Data Firehose

To persist the streamed data, configure a Kinesis Data Firehose delivery stream with the following:
- Source: The Kinesis Data Stream created above.
- Destination: An S3 bucket.
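If you create the delivery stream with the AWS CLI, the request can be passed via `--cli-input-json`. The sketch below is a minimal assumption-laden example: every name and ARN (stream name, account ID, roles, bucket) is a placeholder, and the two IAM roles must already grant Firehose read access to the Kinesis stream and write access to the bucket.

```json
{
  "DeliveryStreamName": "opcua-to-s3",
  "DeliveryStreamType": "KinesisStreamAsSource",
  "KinesisStreamSourceConfiguration": {
    "KinesisStreamARN": "arn:aws:kinesis:ap-northeast-1:123456789012:stream/opcua-data",
    "RoleARN": "arn:aws:iam::123456789012:role/firehose-source-role"
  },
  "ExtendedS3DestinationConfiguration": {
    "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
    "BucketARN": "arn:aws:s3:::your-destination-bucket"
  }
}
```

Save it as `firehose.json` and run `aws firehose create-delivery-stream --cli-input-json file://firehose.json`.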
## Creating Greengrass Component
To stream data from the Greengrass Stream to Kinesis Data Streams, you need to create a custom Greengrass component. For detailed information about developing Greengrass components, refer to the official documentation.
### Directory Structure

Organize the component's files in the following structure:

- `kinesis_data_stream.py`
- `recipe.yaml`
- `requirements.txt`
- `stream_manager_sdk.zip`
### Writing Component

The `stream_manager_sdk.zip` file should contain the Stream Manager SDK for your Greengrass component. Refer to the AWS Greengrass Stream Manager SDK for Python for additional details and sample code.

Create a `requirements.txt`. The `stream-manager` library is required for interacting with Greengrass Stream Manager:

```
cbor2~=5.4.2
stream-manager==1.1.1
```
Create `kinesis_data_stream.py` with the following code. This script leverages the Greengrass Stream Manager SDK to create a Greengrass stream whose contents are exported to a Kinesis Data Stream.

```python
"""Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream

See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""
import argparse
import asyncio
import logging
import time

from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def main(stream_name: str, kinesis_stream_name: str, batch_size: int = None):
    client = None
    try:
        # Create a client for the StreamManager
        client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports,
            )
        )

        # Keep the process alive; Stream Manager exports data in the background
        while True:
            time.sleep(1)

    except asyncio.TimeoutError:
        logger.exception("Timed out while executing")
    except Exception:
        logger.exception("Exception while running")
    finally:
        # Always close the client to avoid resource leaks
        if client:
            client.close()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', default='SiteWise_Stream_Kinesis')
    parser.add_argument('--kinesis-stream', required=True)
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    logger.info(f'args: {args.__dict__}')
    main(args.greengrass_stream, args.kinesis_stream, args.batch_size)
```
Create a `recipe.yaml` file with the following content. In this note, the component name is `jp.co.xyz.StreamManagerKinesis`.

This recipe exposes the following configurable parameters under `ComponentConfiguration`:

- `GreengrassStream`: Name of the Greengrass stream
- `KinesisStream`: Name of the Kinesis Data Stream that receives the OPC UA data
- `BatchSize`: Batch size for data transfer (minimum: 1, maximum: 500)
```yaml
# Replace $ArtifactsS3Bucket with your value to complete component registration.
RecipeFormatVersion: 2020-01-25
ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: '^2.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    GreengrassStream: SiteWise_Stream_Kinesis
    KinesisStream: ''
    BatchSize: 100  # minimum 1, maximum 500
Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
          --greengrass-stream {configuration:/GreengrassStream} \
          --kinesis-stream {configuration:/KinesisStream} \
          --batch-size {configuration:/BatchSize}
    Artifacts:
      - URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
        Unarchive: ZIP
```
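As the recipe comment notes, `BatchSize` must stay within the 1-500 range. If you want the component script to guard against an out-of-range configuration value instead of failing later, a tiny clamp helper is enough. This sketch is my own addition, not part of the SDK or the recipe:

```python
def clamp_batch_size(value: int) -> int:
    """Clamp BatchSize into the 1-500 range noted in the recipe."""
    return max(1, min(500, int(value)))

print(clamp_batch_size(0))     # below range -> 1
print(clamp_batch_size(100))   # in range    -> 100
print(clamp_batch_size(9999))  # above range -> 500
```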
## Uploading Component Artifact

Archive the component files and upload them to your S3 bucket using the following commands:

```shell
S3_BUCKET=<YOUR_BUCKET_NAME>
VERSION=1.0.0

zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip
```
## Registering Component

To register the component:

- Copy the contents of the `recipe.yaml` file created earlier.
- Replace `$ArtifactsS3Bucket` with the actual S3 bucket name where the component artifact resides.
## Deploying Component

- Click **Deploy**.
- Click **Configure component**.
- Update the component's configuration with the required details: set the `KinesisStream` parameter to the name of the Kinesis Data Stream you created earlier.
- Review the configuration and deploy the component.

Once the deployment is complete, the component streams data from the Greengrass stream to the specified Kinesis Data Stream.
## Testing

To verify the setup, download one of the objects delivered to your S3 bucket:

```shell
aws s3 cp s3://<YOUR_BUCKET_NAME>/... ./
```

The example record below is formatted for better readability:

```json
{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {
        "doubleValue": 7.699999999999997
      },
      "timestamp": {
        "timeInSeconds": 1661581962,
        "offsetInNanos": 9000000
      },
      "quality": "GOOD"
    }
  ]
}
```
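As a quick sanity check, a record like this can be parsed with only the Python standard library. The sketch below combines `timeInSeconds` and `offsetInNanos` into a single epoch timestamp; the embedded record literal is just the example above:

```python
import json
from datetime import datetime, timezone

# Example record as delivered to S3 (see above)
record = '''
{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {"doubleValue": 7.699999999999997},
      "timestamp": {"timeInSeconds": 1661581962, "offsetInNanos": 9000000},
      "quality": "GOOD"
    }
  ]
}
'''

doc = json.loads(record)
for pv in doc["propertyValues"]:
    ts = pv["timestamp"]
    # Combine seconds and the nanosecond offset into one epoch value
    epoch = ts["timeInSeconds"] + ts["offsetInNanos"] / 1e9
    when = datetime.fromtimestamp(epoch, tz=timezone.utc)
    print(doc["propertyAlias"], pv["value"]["doubleValue"], when.isoformat())
```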
If the data appears as expected, the system is successfully streaming OPC UA data to the Kinesis Data Stream and persisting it in S3.