How to Stream OPC UA Data to Kinesis through SiteWise Edge Gateway

How to Stream OPC UA Data to Kinesis through SiteWise Edge Gateway

Takahiro Iwasa
Takahiro Iwasa
5 min read
Greengrass IoT Kinesis OPC-UA SiteWise

This post provides a guide on how to stream OPC UA data to Kinesis Data Streams through SiteWise Edge Gateway.

Overview

In this post, an EC2 instance is used as a dummy OPC UA server.

Creating AWS Resources

Setting Up OPC UA Server

opcua-asyncio can be used for a dummy OPC UA server. Download the example Python script (examples/server-minimal.py) into the EC2 instance and run the script with the following command.

pip install asyncua
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
python server-minimal.py

Setting Up Greengrass V2 Core Device

Set up a Greengrass V2 core device according to the official documentation. This post does not cover the detailed description about the installation.

Additionally do the following:

  • Deploying aws.greengrass.StreamManager component
  • Adding the following IAM policy to the Token Exchange Role for sending data to Kinesis Data Streams
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
        }
    ]
}

Setting Up SiteWise Edge Gateway

Set up a SiteWise Edge Gateway according to the following.

Data processing pack is not needed here.

Skip the publisher configuration.

Click Add data source.

The local endpoint refers to your OPC UA endpoint, possibly in your on-premise environment. The Greengrass stream name will be used in a custom Greengrass component. The example below uses SiteWise_Stream_Kinesis.

Review, and click Create.

Kinesis Data Streams

Create a Kinesis Data Stream.

Kinesis Data Firehose

Create a Kinesis Data Firehose with the following configurations:

  • Source: The Kinesis Data Stream created above
  • Destination: An S3 bucket in which you want to persist data. You can use the Dynamic Partitioning feature to output NDJSON. You may also refer to my post.

Creating Greengrass Component

To stream data in the Greengrass Stream into the Kinesis Data Streams, you need to create a Greengrass component. For information on developing Greengrass components, please refer to the official documentation.

Directory Structure

/
├── kinesis_data_stream.py
├── recipe.yaml
├── requirements.txt
└── stream_manager_sdk.zip

requirements.txt

Create requirements.txt and run pip install -r requirements.txt. stream-manager is the AWS official library.

cbor2~=5.4.2
stream-manager==1.1.1

Python Script

Create kinesis_data_stream.py with the following code.

"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""

import argparse
import asyncio
import logging
import time

from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def main(
        stream_name: str,
        kinesis_stream_name: str,
        batch_size: int = None
):
    try:
        # Create a client for the StreamManager
        client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        while True:
            time.sleep(1)

    except asyncio.TimeoutError:
        logger.exception("Timed out while executing")
    except Exception:
        logger.exception("Exception while running")
    finally:
        # Always close the client to avoid resource leaks
        if client:
            client.close()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=True, default='SiteWise_Stream_Kinesis')
    parser.add_argument('--kinesis-stream', required=True)
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    logger.info(f'args: {args.__dict__}')
    main(args.greengrass_stream, args.kinesis_stream, args.batch_size)

Recipe (recipe.yaml)

Create recipe.yaml with the following content. In this example, the component name is jp.co.xyz.StreamManagerKinesis.

# Replace $ArtifactsS3Bucket with your value to complete component registration.

RecipeFormatVersion: 2020-01-25

ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: '^2.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    GreengrassStream: SiteWise_Stream_Kinesis
    KinesisStream: ''
    BatchSize: 100 # minimum 1, maximum 500

Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
          --greengrass-stream {configuration:/GreengrassStream} \
          --kinesis-stream {configuration:/KinesisStream} \
          --batch-size {configuration:/BatchSize}
    Artifacts:
      - URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
        Unarchive: ZIP

This recipe includes the following configuration in ComponentConfiguration.

NameDescription
GreengrassStreamGreengrass stream name
KinesisStreamKinesis Data Stream name to receive OPC UA data
BatchSizeBatch size to be sent

Uploading Component Artifact

Archive the component files and upload the archive to the S3 bucket using the following command.

S3_BUCKET=$YOUR_BUCKET_NAME
VERSION=1.0.0

zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip

Component Registration

Copy the contents of the recipe.yaml above, and replace $ArtifactsS3Bucket with the actual value containing the component artifact.

Component Deployment

Click Deploy.

Click Configure component.

Update the Greengrass component configuration like the example below. KinesisStream is the Kinesis Data Stream name which has been created above.

After review, deploy the component.

Testing

Check S3 objects with the following command.

aws s3 cp s3://$YOUR_S3_BUCKET/... ./

You should see the data like the following.

{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {
        "doubleValue": 7.699999999999997
      },
      "timestamp": {
        "timeInSeconds": 1661581962,
        "offsetInNanos": 9000000
      },
      "quality": "GOOD"
    },
...

Conclusion

With the AWS design in this post, you can send OPC UA data to Kinesis Data Streams with just a few lines of code.

I hope you will find this post useful.

Takahiro Iwasa

Takahiro Iwasa

Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.