How to Subscribe to Data Changes on Variable Nodes in OPC UA Servers

Takahiro Iwasa
OPC-UA Python

You can subscribe to data changes on variable nodes in OPC UA servers using opcua-asyncio. With a subscription, the server notifies the client when a value changes, so the client does not need to poll continuously for new values.

The example code used in this post is available in my GitHub repository.

Requirements

Install opcua-asyncio, which is published on PyPI as asyncua, with the following command.

pip install asyncua

Starting OPC UA Server

Create server.py with the example below. This starts an OPC UA testing server that writes random integer values to a variable node every second. The server endpoint is opc.tcp://localhost:4840 and the variable node name is MyObject/MyVariable.

import asyncio
import random

from asyncua import Server

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


async def main() -> None:
    # Start a server.
    server = Server()
    await server.init()
    server.set_endpoint(ENDPOINT)
    idx = await server.register_namespace(NAMESPACE)
    await server.start()
    print(f'Server started: {server}')

    # Create a node.
    myobj = await server.get_objects_node().add_object(idx, 'MyObject')
    myvar = await myobj.add_variable(idx, 'MyVariable', 1)
    await myvar.set_writable()

    # Write a new value every second.
    while True:
        await myvar.write_value(random.randint(1, 100))
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())

Run the OPC UA testing server with the following command.

$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)
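Before adding a subscription, it can help to confirm that the variable is reachable with a single read. The following is a minimal sketch, not part of the original example: browse_path and read_once are hypothetical helper names, and it assumes server.py is running on the endpoint above. It also shows how the MyObject/MyVariable path is qualified with the namespace index.

```python
ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


def browse_path(idx: int, *names: str) -> list:
    """Prefix each browse name with the namespace index, e.g. '2:MyObject'."""
    return [f'{idx}:{name}' for name in names]


async def read_once() -> int:
    """Connect, read MyObject/MyVariable once, and disconnect."""
    # Imported here so browse_path() can be used without asyncua installed.
    from asyncua import Client

    async with Client(url=ENDPOINT) as client:
        idx = await client.get_namespace_index(NAMESPACE)
        node = await client.get_objects_node().get_child(
            browse_path(idx, 'MyObject', 'MyVariable')
        )
        return await node.read_value()


# To try it while server.py is running:
#     import asyncio
#     print(asyncio.run(read_once()))
```

Each call returns whatever value the server wrote most recently, which is the polling style that the subscription in the next section replaces.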

Running OPC UA Client to Subscribe

Create client.py with the example below. The client subscribes to data changes on the OPC UA testing server, queues each notification in datachange_notification, and processes the queued items asynchronously in process.

import asyncio

from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


class MyHandler(SubHandler):
    def __init__(self):
        self._queue = asyncio.Queue()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        self._queue.put_nowait([node, value, data])
        print('Data change notification was received and queued.')

    async def process(self) -> None:
        try:
            while True:
                # Get data in a queue.
                [node, value, data] = self._queue.get_nowait()
                path = await node.get_path(as_string=True)

                # *** Write your processing code ***

                print(f'New value {value} of "{path}" was processed.')

        except asyncio.QueueEmpty:
            pass


async def main() -> None:
    async with Client(url=ENDPOINT) as client:
        # Get a variable node.
        idx = await client.get_namespace_index(NAMESPACE)
        node = await client.get_objects_node().get_child([f'{idx}:MyObject', f'{idx}:MyVariable'])

        # Subscribe to data changes.
        handler = MyHandler()
        subscription = await client.create_subscription(period=0, handler=handler)
        await subscription.subscribe_data_change(node)

        # Process queued data changes every 100 ms.
        while True:
            await handler.process()
            await asyncio.sleep(0.1)


if __name__ == '__main__':
    asyncio.run(main())
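The loop in main drains the queue every 100 ms with get_nowait. One possible alternative is a consumer task that awaits queue.get(), so processing starts as soon as a notification is queued. The sketch below illustrates only that hand-off and uses nothing but the standard library: QueueingHandler is a hypothetical stand-in for MyHandler (in client.py it would subclass SubHandler), and consume and demo are illustrative names.

```python
import asyncio


class QueueingHandler:
    """Stand-in for MyHandler; in client.py this would subclass SubHandler."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    def datachange_notification(self, node, value, data) -> None:
        # Synchronous callback: only enqueue, never block here.
        self.queue.put_nowait((node, value, data))


async def consume(handler: QueueingHandler, processed: list) -> None:
    """Wait for queued notifications and handle them one at a time."""
    while True:
        node, value, data = await handler.queue.get()
        # *** Processing code would go here ***
        processed.append(value)
        handler.queue.task_done()


async def demo() -> list:
    handler = QueueingHandler()
    processed: list = []
    consumer = asyncio.create_task(consume(handler, processed))

    # Simulate three notifications arriving from the server.
    for value in (4, 79, 75):
        handler.datachange_notification('MyVariable', value, None)

    await handler.queue.join()  # Wait until every item has been handled.
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return processed
```

Calling asyncio.run(demo()) returns the simulated values in the order they were queued. Whether this is preferable to the periodic loop depends on how long the processing step takes and how bursty the notifications are.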

Run client.py with the following command. The output should look similar to the following.

$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...
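The client above runs until it is interrupted. If you only need to watch a node for a limited time, a sketch like the following could remove the monitored item and the subscription explicitly before disconnecting. subscribe_for is a hypothetical helper, not part of the original example. It assumes client and node are the asyncua Client and Node objects from client.py, and that handler provides a process coroutine like MyHandler.

```python
import asyncio


async def subscribe_for(client, node, handler, seconds: float) -> None:
    """Subscribe to data changes on node for a limited time, then clean up.

    client and node are expected to be an asyncua Client and Node;
    handler is expected to provide an async process() like MyHandler.
    """
    subscription = await client.create_subscription(period=0, handler=handler)
    handle = await subscription.subscribe_data_change(node)
    try:
        loop = asyncio.get_running_loop()
        deadline = loop.time() + seconds
        while loop.time() < deadline:
            await handler.process()
            await asyncio.sleep(0.1)
    finally:
        # Remove the monitored item first, then the subscription itself.
        await subscription.unsubscribe(handle)
        await subscription.delete()
```

Inside the async with block of main, this could be called as await subscribe_for(client, node, MyHandler(), 10) in place of the endless loop.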

Conclusion

With the implementation in this post, an OPC UA client can monitor server variable nodes efficiently through subscriptions instead of continuous polling.

I hope you will find this post useful.

Takahiro Iwasa

Software Developer at KAKEHASHI Inc.
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Now, building a new prescription data collection platform at KAKEHASHI Inc. Japan AWS Top Engineers 2020-2023.