In the following example, I call my function check_data_acknowledge from a synchronous function to subscribe to a node, determine the moment of its modification, and act accordingly.
I have found an initial solution, which I am sharing below and will update once it is finished.
import asyncio
import queue

from asyncua import Node
from asyncua.common.subscription import SubscriptionHandler, SubHandler
from asyncua.sync import Client
from opcua.common.subscription import DataChangeNotif

from prod_declarator.config_line import line_id
from prod_declarator.opcua import opcua_instance
class MyHandler(SubHandler):
    """Subscription handler that buffers data-change notifications.

    ``datachange_notification`` only stores each notification in an internal
    queue; the actual handling is done by ``process``, which the caller is
    expected to invoke periodically.
    """

    def __init__(self) -> None:
        super().__init__()
        # ``asyncio.Queue`` is documented as not thread-safe, so a
        # thread-safe ``queue.Queue`` is used for the buffer instead.
        # NOTE(review): the synchronous asyncua client is understood to
        # deliver notifications from its own background thread -- confirm
        # against the asyncua version in use.
        self._queue = queue.Queue()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        """Store one data-change notification for later processing.

        Args:
            node: The node whose value changed.
            value: The new value reported for the node.
            data: The notification object delivered with the change.
        """
        self._queue.put_nowait((node, value, data))
        print('Data change notification was received and queued.')

    async def process(self) -> None:
        """Handle every notification currently waiting in the queue.

        Returns as soon as the queue is empty; it never waits for new
        notifications to arrive.
        """
        while True:
            # Keep the ``try`` limited to the queue read so that errors raised
            # by the processing code below are never mistaken for "empty".
            try:
                node, value, _data = self._queue.get_nowait()
            except queue.Empty:
                return
            # NOTE(review): this call is not awaited, which presumes ``node``
            # is the synchronous node wrapper -- verify if this handler is
            # ever used with the asynchronous client.
            path = node.get_path(as_string=True)
            # *** Write your processing code ***
            print(f'New value {value} of "{path}" was processed.')
async def check_data_acknowledge(part_declaration_check_packaging_number):
    """Subscribe to a line's "data_acknowledge" node and process its changes.

    Looks up the line configuration in ``line_id``, connects a client to the
    configured end point, subscribes to data changes on the node and then
    polls the handler every 100 ms. The polling loop has no normal exit: it
    only ends when an exception is raised, which is printed and swallowed,
    or when the coroutine is cancelled.

    NOTE(review): ``Client`` is the synchronous asyncua client, so its calls
    presumably block the running event loop while they execute -- confirm
    this is acceptable for the caller.

    Args:
        part_declaration_check_packaging_number: Mapping that contains at
            least a ``"line_id"`` key identifying the line configuration.

    Returns:
        The ``part_declaration_check_packaging_number`` argument, unchanged.
        It is returned once an error has ended the polling loop and the
        client has been disconnected.

    Raises:
        KeyError: If the line or one of its configuration keys is missing
            (raised before any connection attempt).
    """
    # Resolve the line configuration once instead of repeating the lookup.
    line_config = line_id[part_declaration_check_packaging_number["line_id"]]
    end_point = line_config["end_point"]
    # NOTE(review): asyncua is understood to take the client timeout in
    # seconds, which would make this value roughly a week -- confirm the
    # intended unit.
    time_out = 600000
    print(f"End point : {end_point}")
    name_space = line_config["associatedNameSpaceIndex"]
    node_template = opcua_instance.instanceExigences["data_acknowledge"]
    line_name = line_config["lineName"]
    # The configured node id carries a "LineName" placeholder.
    node_data_acknowledge = node_template.replace("LineName", line_name)

    client = Client(end_point, timeout=time_out)
    try:
        print(f"Before connect: {client}")
        client.connect()
        print(f"Connected: {client}")
        # Get a variable node.
        node = client.get_node(f"ns={name_space};s={node_data_acknowledge}")
        # Subscribe data change.
        handler = MyHandler()
        subscription = client.create_subscription(period=0, handler=handler)
        subscription.subscribe_data_change(node)
        # Process data change every 100ms
        while True:
            await handler.process()
            await asyncio.sleep(0.1)
    except Exception as e:
        import traceback
        traceback.print_exc()
        print(f"An error occurred during connection: {e}")
    finally:
        # ``client`` is always bound here, so no ``None`` check is needed.
        print(f"Before disconnect: {client}")
        client.disconnect()
        print(f"After disconnect: {client}")
    return part_declaration_check_packaging_number
Thank you in advance for your future advice.