OPC UA サーバーの変数ノードにおける値変更をサブスクライブする方法
OPC UA サーバーの変数ノードで値変更をサブスクライブするには、 opcua-asyncio を使用できます。 この機能により、「OPC UA クライアントが新しい値をポーリングする」必要がなくなります。
この投稿のサンプルは、 GitHub リポジトリから取得できます。
前提条件
以下のコマンドを使用して、 opcua-asyncio をインストールしてください。
pip install asyncua
OPC UA サーバー起動
以下の例を使用して、 server.py
を作成してください。
毎秒ランダムな整数値が、変数ノードに書き込まれる OPC UA テストサーバーが起動します。
サーバーのエンドポイントは opc.tcp://localhost:4840
で、変数ノードの名前は MyObject/MyVariable
です。
import asyncio
import random
from asyncua import Server
ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'
async def main() -> None:
# Start a server.
server = Server()
await server.init()
server.set_endpoint(ENDPOINT)
idx = await server.register_namespace(NAMESPACE)
await server.start()
print(f'Server started: {server}')
# Create a node.
myobj = await server.get_objects_node().add_object(idx, 'MyObject')
myvar = await myobj.add_variable(idx, 'MyVariable', 1)
await myvar.set_writable()
# Write a new value every second.
while True:
await myvar.write_value(random.randint(1, 100))
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
以下のコマンドを使用して、 OPC UA テストサーバーを起動してください。
$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)
OPC UA クライアント実行
以下の例を使用して、 client.py
を作成してください。
OPC UA テストサーバーでデータの変更をサブスクライブし、 datachange_notification
でデータをキューに入れ、 process
で非同期に処理します。
import asyncio
from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler
ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'
class MyHandler(SubHandler):
def __init__(self):
self._queue = asyncio.Queue()
def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
self._queue.put_nowait([node, value, data])
print(f'Data change notification was received and queued.')
async def process(self) -> None:
try:
while True:
# Get data in a queue.
[node, value, data] = self._queue.get_nowait()
path = await node.get_path(as_string=True)
# *** Write your processing code ***
print(f'New value {value} of "{path}" was processed.')
except asyncio.QueueEmpty:
pass
async def main() -> None:
async with Client(url=ENDPOINT) as client:
# Get a variable node.
idx = await client.get_namespace_index(NAMESPACE)
node = await client.get_objects_node().get_child([f'{idx}:MyObject', f'{idx}:MyVariable'])
# Subscribe data change.
handler = MyHandler()
subscription = await client.create_subscription(period=0, handler=handler)
await subscription.subscribe_data_change(node)
# Process data change every 100ms
while True:
await handler.process()
await asyncio.sleep(0.1)
if __name__ == '__main__':
asyncio.run(main())
以下のコマンドを使用してクライアントを実行してください。 次のようなデータ変更が表示されるはずです。
$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...
まとめ
この記事の実装によって、 OPC UA サーバーの変数ノードを、 Pub/Sub アーキテクチャで効率的に監視できます。
この投稿が、お役に立てば幸いです。