How to Efficiently Monitor OPC UA Server Variable Nodes with Python

Takahiro Iwasa
2 min read
OPC-UA Python
Requirements
Install opcua-asyncio with the following command:
pip install asyncua
Python Scripts
import asyncio
import random

from asyncua import Server

# URL that the OPC UA server listens on.
ENDPOINT = 'opc.tcp://localhost:4840'
# Namespace URI registered on the server; its index is used for the nodes
# created in main().
NAMESPACE = 'http://examples.freeopcua.github.io'


async def main() -> None:
    """Run an OPC UA server that writes a random integer every second.

    The server listens on ``ENDPOINT``, registers ``NAMESPACE`` and adds a
    writable variable ``MyVariable`` (initial value 1) under a new object
    ``MyObject`` in the Objects folder. The write loop never returns on its
    own; the coroutine ends only when it is cancelled or an error is raised.
    """
    # Configure the server.
    server = Server()
    await server.init()
    server.set_endpoint(ENDPOINT)
    idx = await server.register_namespace(NAMESPACE)

    # Start a server. Entering the context starts it and leaving the context
    # stops it, so the server is shut down even when the loop below is
    # interrupted (e.g. by cancellation on Ctrl+C).
    async with server:
        print(f'Server started: {server}')

        # Create a node.
        myobj = await server.get_objects_node().add_object(idx, 'MyObject')
        myvar = await myobj.add_variable(idx, 'MyVariable', 1)
        await myvar.set_writable()

        # Write a new value every second.
        while True:
            await myvar.write_value(random.randint(1, 100))
            await asyncio.sleep(1)


# Run the server only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())
import asyncio
from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler


# URL of the OPC UA server the client connects to.
ENDPOINT = 'opc.tcp://localhost:4840'
# Namespace URI whose index is looked up on the server to build the browse
# path of the monitored variable.
NAMESPACE = 'http://examples.freeopcua.github.io'


class MyHandler(SubHandler):
    """Subscription handler that buffers data change notifications.

    Incoming notifications are only queued by the callback; the queued
    entries are handled later, whenever ``process()`` is awaited.
    """

    def __init__(self):
        # Pending notifications, stored as (node, value, data) entries.
        self._queue = asyncio.Queue()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        """Queue a single data change notification without processing it."""
        self._queue.put_nowait((node, value, data))
        print('Data change notification was received and queued.')

    async def process(self) -> None:
        """Handle queued notifications until the queue is empty, then return."""
        pending = self._queue
        while not pending.empty():
            # Get data in a queue.
            node, value, data = pending.get_nowait()
            path = await node.get_path(as_string=True)

            # *** Write your processing code ***

            print(f'New value {value} of "{path}" was processed.')


async def main() -> None:
    """Subscribe to ``MyVariable`` and process its queued changes every 100 ms.

    Connects to ``ENDPOINT``, resolves the variable below ``MyObject`` in the
    Objects folder, subscribes to its data changes with a ``MyHandler`` and
    then loops forever, draining the handler's queue on each pass.
    """
    async with Client(url=ENDPOINT) as client:
        # Get a variable node.
        idx = await client.get_namespace_index(NAMESPACE)
        objects = client.get_objects_node()
        browse_path = [f'{idx}:MyObject', f'{idx}:MyVariable']
        node = await objects.get_child(browse_path)

        # Subscribe data change.
        handler = MyHandler()
        subscription = await client.create_subscription(period=0, handler=handler)
        await subscription.subscribe_data_change(node)

        # Process data change every 100ms
        while True:
            await handler.process()
            await asyncio.sleep(0.1)


# Run the client only when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(main())
Testing
Start the server with:
$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)
Start the client with:
$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...