How to Efficiently Monitor OPC UA Server Variable Nodes with Python

How to Efficiently Monitor OPC UA Server Variable Nodes with Python

Takahiro Iwasa
Takahiro Iwasa
2 min read
OPC-UA Python

Requirements

Install opcua-asyncio, which is published on PyPI as `asyncua`, with the following command:

Terminal window
pip install asyncua

Python Scripts

server.py
import asyncio
import random
from asyncua import Server
# URL the server is bound to (passed to ``Server.set_endpoint``).
ENDPOINT = 'opc.tcp://localhost:4840'
# Namespace URI registered on the server; the returned index qualifies the
# names of the nodes created in ``main``.
NAMESPACE = 'http://examples.freeopcua.github.io'
async def main() -> None:
    """Run a demo OPC UA server that rewrites one variable every second.

    The server is bound to ``ENDPOINT`` and registers ``NAMESPACE``. Under
    the Objects node it creates ``MyObject`` with a writable ``MyVariable``
    and then writes a random integer in ``[1, 100]`` to that variable once
    per second until the task is cancelled.
    """
    # Configure the server before it starts accepting connections.
    server = Server()
    await server.init()
    server.set_endpoint(ENDPOINT)
    idx = await server.register_namespace(NAMESPACE)

    # ``async with`` starts the server and guarantees it is stopped again
    # when the loop below is cancelled (e.g. Ctrl+C) or raises, instead of
    # leaving the endpoint open until the process dies.
    async with server:
        print(f'Server started: {server}')

        # Create a node.
        myobj = await server.get_objects_node().add_object(idx, 'MyObject')
        myvar = await myobj.add_variable(idx, 'MyVariable', 1)
        await myvar.set_writable()

        # Write a new value every second.
        while True:
            await myvar.write_value(random.randint(1, 100))
            await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())
client.py
import asyncio
from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler
# URL of the OPC UA server the client connects to (passed to ``Client``).
ENDPOINT = 'opc.tcp://localhost:4840'
# Namespace URI whose index is looked up to build the browse path of the
# monitored variable.
NAMESPACE = 'http://examples.freeopcua.github.io'
class MyHandler(SubHandler):
    """Subscription handler that queues data changes for later processing.

    ``datachange_notification`` only enqueues the notification and returns;
    the actual work is done in ``process``, which the caller awaits
    separately.
    """

    def __init__(self):
        # Buffer between the notification callback and ``process``.
        self._queue = asyncio.Queue()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        """Queue one data change notification without processing it."""
        item = (node, value, data)
        self._queue.put_nowait(item)
        print(f'Data change notification was received and queued.')

    async def process(self) -> None:
        """Process every notification currently queued, then return."""
        while True:
            # Get data in a queue; an empty queue means there is nothing
            # left to do for this call.
            try:
                node, value, data = self._queue.get_nowait()
            except asyncio.QueueEmpty:
                return

            path = await node.get_path(as_string=True)

            # *** Write your processing code ***

            print(f'New value {value} of "{path}" was processed.')
async def main() -> None:
    """Subscribe to ``MyVariable`` and process its data changes.

    Connects to ``ENDPOINT``, resolves the variable node under
    ``MyObject`` in ``NAMESPACE``, subscribes to its data changes with a
    ``MyHandler``, and then drains the handler's queue every 100 ms until
    the task is cancelled.
    """
    async with Client(url=ENDPOINT) as client:
        # Get a variable node.
        idx = await client.get_namespace_index(NAMESPACE)
        browse_path = [f'{idx}:MyObject', f'{idx}:MyVariable']
        objects_node = client.get_objects_node()
        node = await objects_node.get_child(browse_path)

        # Subscribe data change.
        # NOTE(review): period=0 presumably requests the fastest publishing
        # interval the server supports; confirm against the asyncua docs.
        handler = MyHandler()
        subscription = await client.create_subscription(period=0, handler=handler)
        await subscription.subscribe_data_change(node)

        # Process data change every 100ms
        while True:
            await handler.process()
            await asyncio.sleep(0.1)


if __name__ == '__main__':
    asyncio.run(main())

Testing

Start the server with:

Terminal window
$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)

Start the client with:

Terminal window
$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...
Takahiro Iwasa

Takahiro Iwasa

Software Developer
Involved in the requirements definition, design, and development of cloud-native applications using AWS. Japan AWS Top Engineers 2020-2023.