How to Subscribe to Data Changes on Variable Nodes in OPC UA Servers
You can subscribe to data changes on variable nodes in OPC UA servers using opcua-asyncio. With a subscription, the server notifies the client when a value changes, so the client does not have to poll continuously for new values.
The example code used in this post is available in my GitHub repository.
Requirements
Install opcua-asyncio with the following command.
pip install asyncua
Starting OPC UA Server
Create server.py with the example below.
This starts an OPC UA testing server that writes random integer values to a variable node every second.
The server endpoint is opc.tcp://localhost:4840 and the variable node name is MyObject/MyVariable.
import asyncio
import random

from asyncua import Server

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


async def main() -> None:
    # Start a server.
    server = Server()
    await server.init()
    server.set_endpoint(ENDPOINT)
    idx = await server.register_namespace(NAMESPACE)
    await server.start()
    print(f'Server started: {server}')

    # Create a node.
    myobj = await server.get_objects_node().add_object(idx, 'MyObject')
    myvar = await myobj.add_variable(idx, 'MyVariable', 1)
    await myvar.set_writable()

    # Write a new value every second.
    while True:
        await myvar.write_value(random.randint(1, 100))
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())
Run the OPC UA testing server with the following command.
$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)
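Because the server writes random integers, two consecutive writes can occasionally carry the same value. As I understand the default data change trigger in OPC UA (report when the status or the value changes), a repeated value is not expected to produce a notification, so a subscriber may see slightly fewer than one update per second. The following is only a plain-Python sketch of that report-on-change idea. The function name report_on_change is hypothetical and nothing here calls opcua-asyncio.

```python
from typing import Iterable, Iterator, Optional


def report_on_change(values: Iterable[int]) -> Iterator[int]:
    """Yield a value only when it differs from the previously written one."""
    previous: Optional[int] = None
    first = True
    for value in values:
        # The first value is always reported; later ones only if they changed.
        if first or value != previous:
            yield value
        previous = value
        first = False


# Simulated sequence of server writes, including some repeats.
written = [4, 79, 79, 75, 75, 75, 4]
notified = list(report_on_change(written))
print(notified)
```

In this simulation seven writes result in four reported values, because the repeated 79 and 75 are suppressed.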
Running OPC UA Client to Subscribe
Create client.py with the example below.
This subscribes to data changes in the OPC UA testing server, queues each notification in datachange_notification, and processes the queued data asynchronously in process.
import asyncio

from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


class MyHandler(SubHandler):
    def __init__(self):
        self._queue = asyncio.Queue()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        self._queue.put_nowait([node, value, data])
        print('Data change notification was received and queued.')

    async def process(self) -> None:
        try:
            while True:
                # Get data in a queue.
                [node, value, data] = self._queue.get_nowait()
                path = await node.get_path(as_string=True)

                # *** Write your processing code ***

                print(f'New value {value} of "{path}" was processed.')
        except asyncio.QueueEmpty:
            pass


async def main() -> None:
    async with Client(url=ENDPOINT) as client:
        # Get a variable node.
        idx = await client.get_namespace_index(NAMESPACE)
        node = await client.get_objects_node().get_child([f'{idx}:MyObject', f'{idx}:MyVariable'])

        # Subscribe data change.
        handler = MyHandler()
        subscription = await client.create_subscription(period=0, handler=handler)
        await subscription.subscribe_data_change(node)

        # Process data change every 100ms
        while True:
            await handler.process()
            await asyncio.sleep(0.1)


if __name__ == '__main__':
    asyncio.run(main())
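The client above wakes up every 100 ms and drains whatever is in the queue. One possible variation is to let a consumer task wait on the queue itself, so it only runs when a notification has arrived. The sketch below illustrates that hand-off using only asyncio, with no OPC UA connection. QueueingHandler, consume, and demo are hypothetical names, the class does not subclass SubHandler, and the three notifications are simulated by calling the callback directly.

```python
import asyncio


class QueueingHandler:
    """Hypothetical stand-in for MyHandler, used only to show the queue hand-off."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self.processed: list = []

    def datachange_notification(self, node, value, data) -> None:
        # Synchronous callback: enqueue and return immediately.
        self._queue.put_nowait((node, value, data))

    async def consume(self) -> None:
        # Suspend until an item is available instead of waking on a timer.
        while True:
            node, value, data = await self._queue.get()
            # *** Processing code would go here ***
            self.processed.append((node, value))
            self._queue.task_done()


async def demo() -> list:
    handler = QueueingHandler()
    consumer = asyncio.create_task(handler.consume())

    # Simulate three notifications; in client.py the library makes these calls.
    for value in (4, 79, 75):
        handler.datachange_notification('MyVariable', value, None)

    # Wait until every queued item has been processed, then stop the consumer.
    await handler._queue.join()
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return handler.processed


result = asyncio.run(demo())
print(result)
```

Either approach keeps the callback itself short, which is the point of queueing: the slow work happens outside the notification callback.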
Run the client with the following command. You should see output like the following as data changes arrive.
$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...
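Each element of the printed path has the form index:name, where the index identifies a namespace. The same form is used for the strings passed to get_child in client.py. As a small illustration, the helpers below build and split such strings. Both function names are hypothetical, and the index 2 is simply the value that appears in the output above; the actual index is whatever get_namespace_index returns for your server.

```python
from typing import List, Tuple


def browse_path(namespace_index: int, *names: str) -> List[str]:
    """Build 'index:name' strings for each browse name (hypothetical helper)."""
    return [f'{namespace_index}:{name}' for name in names]


def split_qualified_name(text: str) -> Tuple[int, str]:
    """Split an 'index:name' string into its namespace index and name."""
    index, _, name = text.partition(':')
    return int(index), name


# Index 2 is assumed here because it is what the output above shows.
relative = browse_path(2, 'MyObject', 'MyVariable')
print(relative)

full = ['0:Root', '0:Objects', *relative]
print([split_qualified_name(part) for part in full])
```

The first list corresponds to the argument given to get_child, and the second shows the namespace index and name of every step from the root node.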
Conclusion
With the implementation in this post, a client can monitor variable nodes on an OPC UA server efficiently through subscriptions, a publish/subscribe-style mechanism, instead of polling for new values.
I hope you will find this post useful.