How to use AsyncModbusTCPClient from pymodbus.client.asynchronous.tcp in a coroutine?

2.8k views Asked by At

Based on the 'Async Asyncio Client Example' of PyModbus I tried to initialise the client in a coroutine. The example in run_with_already_running_loop() is working fine but initialising ModbusClient hangs without timeout or error message when run in coroutine async_read().

#!/usr/bin/env python
import asyncio
import logging
from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient as ModbusClient
from pymodbus.client.asynchronous import schedulers

from threading import Thread
import time
# --------------------------------------------------------------------------- #
# configure the client logging
# --------------------------------------------------------------------------- #

logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)

UNIT = 0x01

TCP_IP = '192.168.0.168'
ADDRESS = 40035
COUNTS = 16


async def start_async_test(client):
    rr = await client.read_holding_registers(ADDRESS, COUNTS, unit=UNIT)
    print(rr.registers)


def run_with_already_running_loop():
    """
    An already running loop is passed to ModbusClient Factory
    :return:
    """
    log.debug("Running Async client with asyncio loop already started")
    log.debug("------------------------------------------------------")

    def done(future):
        log.info("future: Done !!!")

    def start_loop(loop):
        """
        Start Loop
        :param loop:
        :return:
        """
        asyncio.set_event_loop(loop)
        loop.run_forever()

    loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=[loop])
    t.daemon = True
    # Start the loop
    t.start()
    assert loop.is_running()
    loop, client = ModbusClient(schedulers.ASYNC_IO,
                                host=TCP_IP,
                                loop=loop)
    future = asyncio.run_coroutine_threadsafe(
        start_async_test(client.protocol), loop=loop)
    future.add_done_callback(done)
    while not future.done():
        print('sleep')
        time.sleep(0.2)
    loop.stop()
    log.debug("--------DONE RUN_WITH_ALREADY_RUNNING_LOOP-------------")


async def async_read():
    """
    An already running loop is passed to ModbusClient Factory
    :return:
    """
    log.debug("Running Async client in async function")
    log.debug("------------------------------------------------------")
    loop = asyncio.get_running_loop()
    assert loop.is_running()
    # python hangs when initialising client
    loop, client = ModbusClient(schedulers.ASYNC_IO,
                                host=TCP_IP,
                                loop=loop)

    future = asyncio.run_coroutine_threadsafe(
        start_async_test(client.protocol), loop=loop)
    log.debug("------- DONE IN ASYNC FUNCTION -------------")
    log.debug("")


if __name__ == '__main__':
    log.debug(
        "------------------- Run with already running loop -------------------")
    run_with_already_running_loop()
    print('new test'.center(90, '-'))
    asyncio.run(async_read())

The working sync code was

from pymodbus.client.sync import ModbusTcpClient

client = ModbusTcpClient(TCP_IP)
client.connect()
client.read_holding_registers(ADDRESS, count=COUNT)

Any idea how to have a similar simple solution with asyncio?

1

There are 1 answers

1
Phil997 On

here is my working example. Feel free to give me a suggestion for improvement

import asyncio
from threading import Thread

from pymodbus import constants
from pymodbus.client.asynchronous import schedulers
from pymodbus.client.asynchronous.tcp import AsyncModbusTCPClient


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=[loop])
t.daemon = True
t.start()

assert loop.is_running()
asyncio.set_event_loop(loop)
constants.Defaults.UnitId = 0
loop, client = AsyncModbusTCPClient(schedulers.ASYNC_IO, host="192.168.178.32", port=502, loop=loop, timeout=20)
if not client.protocol:
    raise ConnectionError("Modbus Device is not avaiable")

async def execute_read():
    async def read():
        return await client.protocol.read_holding_registers(0, 2)
    
    future = asyncio.run_coroutine_threadsafe(read(), loop=loop)
    while not future.done():
        await asyncio.sleep(0.1)
    return future.result()

response = await execute_read()
print(response.__dict__['registers'])