When collecting data with the Modbus protocol through a Kafka producer, collection stops after a certain period of time


I have deployed a Kafka cluster on a GCP instance.

I run the connector with config/connect-distributed.properties and start collecting data through the REST API using the following command:

curl -X POST -H "Content-Type:application/json" \
--data '{
  "name": "operation1",
  "config": {
    "connector.class": "org.apache.plc4x.kafka.Plc4xSourceConnector",
    "default.topic": "operation1",
    "tasks.max": "1",
    "sources": "Modbus",
    "sources.Modbus.connectionString": "modbus:tcp://<IP address:port>",
    "sources.Modbus.pollReturnInterval": "10000",
    "sources.Modbus.bufferSize": "5000",
    "sources.Modbus.jobReferences": "operation1",
    "jobs": "operation1",
    "jobs.operation1.fields": "BMS1-1, BMS1-2, BMS2-1, BMS2-2, BMS2-3, PCS, ETC",
    "jobs.operation1.interval": "1000",
    "jobs.operation1.fields.BMS1-1": "input-register:1[125]",
    "jobs.operation1.fields.BMS1-2": "input-register:126[12]",
    "jobs.operation1.fields.BMS2-1": "input-register:201[125]",
    "jobs.operation1.fields.BMS2-2": "input-register:326[125]",
    "jobs.operation1.fields.BMS2-3": "input-register:451[16]",
    "jobs.operation1.fields.PCS": "input-register:501[89]",
    "jobs.operation1.fields.ETC": "input-register:601[5]"
  }
}' http://localhost:8083/connectors
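
As a sanity check on the field definitions above (not a diagnosis of the stall), here is a minimal sketch that lists the register range each field covers and flags any field asking for more than 125 registers, which is the most a single Modbus register read can return. It assumes the bracketed number in an address such as input-register:1[125] is the register count; `field_ranges` is a hypothetical helper name, not part of PLC4X or Kafka.

```shell
#!/bin/sh
# Read connector config lines on stdin. For every entry of the form
#   "jobs.<job>.fields.<name>": "<area>:<start>[<count>]"
# print: name, first register, last register, and "ok" or "too-many".
# Assumption: <count> is the number of registers read in one request.
field_ranges() {
  sed -n 's/.*"jobs\.[^.]*\.fields\.\([^"]*\)": *"[a-z-]*:\([0-9]*\)\[\([0-9]*\)]".*/\1 \2 \3/p' |
  awk '{ last = $2 + $3 - 1
         verdict = ($3 <= 125) ? "ok" : "too-many"
         print $1, $2, last, verdict }'
}

# Usage (connector.json is a hypothetical file holding the JSON body above):
#   field_ranges < connector.json
```

All seven fields in the question stay within the limit, so the request sizes themselves look fine.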



In the log of the Connect worker started with config/connect-distributed.properties, the following messages appear and collection is successful. However, collection stops after a certain amount of time (minutes or hours).

[2022-05-10 05:36:44,522] INFO [operation1|task-0|offsets] WorkerSourceTask{id=operation1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
[2022-05-10 05:36:54,526] INFO [operation1|task-0|offsets] WorkerSourceTask{id=operation1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
[2022-05-10 05:37:04,530] INFO [operation1|task-0|offsets] WorkerSourceTask{id=operation1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
[2022-05-10 05:37:14,534] INFO [operation1|task-0|offsets] WorkerSourceTask{id=operation1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
[2022-05-10 05:37:24,550] INFO [operation1|task-0|offsets] WorkerSourceTask{id=operation1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
[2022-05-10 05:37:34,554] INFO [operation1|task-0|offsets] WorkerSourceTask{id=operation1-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)

After a certain amount of time, the log message changes to the following:

[2022-05-10 05:42:36,597] WARN [operation1|task-0] Exception during scraping of Job operation1, Connection-Alias Modbus: Error-message: null - for stack-trace change logging to DEBUG (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask:148)
[2022-05-10 05:42:38,598] WARN [operation1|task-0] Exception during scraping of Job operation1, Connection-Alias Modbus: Error-message: null - for stack-trace change logging to DEBUG (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask:148)
[2022-05-10 05:42:40,598] WARN [operation1|task-0] Exception during scraping of Job operation1, Connection-Alias Modbus: Error-message: null - for stack-trace change logging to DEBUG (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask:148)
[2022-05-10 05:42:42,599] WARN [operation1|task-0] Exception during scraping of Job operation1, Connection-Alias Modbus: Error-message: null - for stack-trace change logging to DEBUG (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask:148)
[2022-05-10 05:42:44,600] WARN [operation1|task-0] Exception during scraping of Job operation1, Connection-Alias Modbus: Error-message: null - for stack-trace change logging to DEBUG (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperTask:148)

At this point, if you check the status of the connector with curl, it is still RUNNING.

curl -X GET localhost:8083/connectors/operation1/status
{"name":"operation1","connector":{"state":"RUNNING","worker_id":"<IP>:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"<IP>:8083"}],"type":"source"}

I really don't know why this happens. Any help would be appreciated.


I changed the log level to DEBUG and got the following:

[2022-05-10 08:14:18,708] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-12 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,708] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,708] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-6 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,709] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-18 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,709] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-9 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,709] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-3 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,709] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-15 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,709] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-21 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,709] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-24 at position FetchPosition{offset=18793, offsetEpoch=Optional[23], currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,709] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Built incremental fetch (sessionId=714175396, epoch=1010) for node 1. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 9 partition(s) (org.apache.kafka.clients.FetchSessionHandler:351)
[2022-05-10 08:14:18,709] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(connect-offsets-12, connect-offsets-0, connect-offsets-6, connect-offsets-18, connect-offsets-9, connect-offsets-3, connect-offsets-15, connect-offsets-21, connect-offsets-24), canUseTopicIds=True) to broker <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:274)
[2022-05-10 08:14:18,709] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-1, correlationId=3034) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=714175396, sessionEpoch=1010, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:521)
[2022-05-10 08:14:18,757] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-2, correlationId=3030): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1712137779, responses=[]) (org.apache.kafka.clients.NetworkClient:879)
[2022-05-10 08:14:18,757] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Node 1 sent an incremental fetch response with throttleTimeMs = 0 for session 1712137779 with 0 response partition(s), 1 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler:584)
[2022-05-10 08:14:18,758] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-status-2 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>76.32:9092 (id: 1 rack: null)], epoch=23}} to node <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,758] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Built incremental fetch (sessionId=1712137779, epoch=1006) for node 1. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 1 partition(s) (org.apache.kafka.clients.FetchSessionHandler:351)
[2022-05-10 08:14:18,758] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(connect-status-2), canUseTopicIds=True) to broker <IP>76.32:9092 (id: 1 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:274)
[2022-05-10 08:14:18,758] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-2, correlationId=3033) and timeout 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1712137779, sessionEpoch=1006, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:521)
[2022-05-10 08:14:18,759] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-2, correlationId=3031): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=619420322, responses=[]) (org.apache.kafka.clients.NetworkClient:879)
[2022-05-10 08:14:18,759] DEBUG [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-3, correlationId=1014): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=208110829, responses=[]) (org.apache.kafka.clients.NetworkClient:879)
[2022-05-10 08:14:18,759] DEBUG [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Node 0 sent an incremental fetch response with throttleTimeMs = 0 for session 208110829 with 0 response partition(s), 1 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler:584)
[2022-05-10 08:14:18,759] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Node 0 sent an incremental fetch response with throttleTimeMs = 0 for session 619420322 with 0 response partition(s), 2 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler:584)
[2022-05-10 08:14:18,760] DEBUG [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-configs-0 at position FetchPosition{offset=698, offsetEpoch=Optional[54], currentLeader=LeaderAndEpoch{leader=Optional[<IP>92.153:9092 (id: 0 rack: null)], epoch=54}} to node <IP>92.153:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,760] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-status-3 at position FetchPosition{offset=129, offsetEpoch=Optional[50], currentLeader=LeaderAndEpoch{leader=Optional[<IP>92.153:9092 (id: 0 rack: null)], epoch=54}} to node <IP>92.153:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,760] DEBUG [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Built incremental fetch (sessionId=208110829, epoch=1008) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 1 partition(s) (org.apache.kafka.clients.FetchSessionHandler:351)
[2022-05-10 08:14:18,760] DEBUG [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(connect-configs-0), canUseTopicIds=True) to broker <IP>92.153:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:274)
[2022-05-10 08:14:18,760] DEBUG [Consumer clientId=consumer-connect-cluster-3, groupId=connect-cluster] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-3, correlationId=1015) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=208110829, sessionEpoch=1008, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:521)
[2022-05-10 08:14:18,760] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-status-0 at position FetchPosition{offset=116, offsetEpoch=Optional[54], currentLeader=LeaderAndEpoch{leader=Optional[<IP>92.153:9092 (id: 0 rack: null)], epoch=54}} to node <IP>92.153:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,760] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Built incremental fetch (sessionId=619420322, epoch=1008) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 2 partition(s) (org.apache.kafka.clients.FetchSessionHandler:351)
[2022-05-10 08:14:18,760] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(connect-status-0, connect-status-3), canUseTopicIds=True) to broker <IP>92.153:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:274)
[2022-05-10 08:14:18,760] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-2, correlationId=3034) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=619420322, sessionEpoch=1008, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:521)
[2022-05-10 08:14:18,812] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-1, correlationId=3032): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=581764107, responses=[]) (org.apache.kafka.clients.NetworkClient:879)
[2022-05-10 08:14:18,813] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Node 2 sent an incremental fetch response with throttleTimeMs = 0 for session 581764107 with 0 response partition(s), 8 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler:584)
[2022-05-10 08:14:18,813] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-8 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,813] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-14 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,813] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-2 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,813] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-20 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,813] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-11 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,813] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-5 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,813] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-23 at position FetchPosition{offset=599, offsetEpoch=Optional[23], currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,813] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-offsets-17 at position FetchPosition{offset=70, offsetEpoch=Optional[23], currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:18,814] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Built incremental fetch (sessionId=581764107, epoch=1006) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 8 partition(s) (org.apache.kafka.clients.FetchSessionHandler:351)
[2022-05-10 08:14:18,814] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(connect-offsets-8, connect-offsets-14, connect-offsets-2, connect-offsets-20, connect-offsets-11, connect-offsets-5, connect-offsets-23, connect-offsets-17), canUseTopicIds=True) to broker <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:274)
[2022-05-10 08:14:18,814] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-1, correlationId=3035) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=581764107, sessionEpoch=1006, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:521)
[2022-05-10 08:14:18,977] DEBUG [operation1|task-0] Job statistics (operation1, Modbus) number of requests: 354 (201 success, 43.2 % failed, 0.0 % too slow), min latency: 82.47 ms, mean latency: 93.20 ms, median: 89.56 ms (org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl:250)
[2022-05-10 08:14:19,073] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-2, correlationId=3032): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1118973913, responses=[]) (org.apache.kafka.clients.NetworkClient:879)
[2022-05-10 08:14:19,073] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Node 2 sent an incremental fetch response with throttleTimeMs = 0 for session 1118973913 with 0 response partition(s), 2 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler:584)
[2022-05-10 08:14:19,073] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-status-4 at position FetchPosition{offset=85, offsetEpoch=Optional[23], currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:19,073] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Added READ_UNCOMMITTED fetch request for partition connect-status-1 at position FetchPosition{offset=115, offsetEpoch=Optional[23], currentLeader=LeaderAndEpoch{leader=Optional[<IP>156.202:9092 (id: 2 rack: null)], epoch=23}} to node <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:1245)
[2022-05-10 08:14:19,074] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Built incremental fetch (sessionId=1118973913, epoch=1008) for node 2. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 2 partition(s) (org.apache.kafka.clients.FetchSessionHandler:351)
[2022-05-10 08:14:19,074] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), toReplace=(), implied=(connect-status-1, connect-status-4), canUseTopicIds=True) to broker <IP>156.202:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:274)
[2022-05-10 08:14:19,074] DEBUG [Consumer clientId=consumer-connect-cluster-2, groupId=connect-cluster] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-2, correlationId=3035) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1118973913, sessionEpoch=1008, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient:521)
[2022-05-10 08:14:19,126] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Received FETCH response from node 0 for request with header RequestHeader(apiKey=FETCH, apiVersion=13, clientId=consumer-connect-cluster-1, correlationId=3033): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=407599491, responses=[]) (org.apache.kafka.clients.NetworkClient:879)
[2022-05-10 08:14:19,126] DEBUG [Consumer clientId=consumer-connect-cluster-1, groupId=connect-cluster] Node 0 sent an incremental fetch response with throttleTimeMs = 0 for session 407599491 with 0 response partition(s), 8 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler:584)
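
Almost all of the DEBUG output above is Kafka consumer fetch traffic; the only PLC4X line is the "Job statistics" entry. A minimal sketch for cutting the log down to the PLC4X lines and pulling out the failed-request percentage follows. The helper names `plc4x_lines` and `failed_percent` are hypothetical, and the pattern assumes the exact "N success, P % failed" wording seen in that line.

```shell
#!/bin/sh
# Keep only log lines whose logger name starts with org.apache.plc4x,
# dropping the Kafka consumer fetch chatter. Reads stdin.
plc4x_lines() {
  grep -F 'org.apache.plc4x' || true
}

# From "Job statistics" lines on stdin, print the failed-request percentage.
# Assumes the wording "(N success, P % failed" used by the scraper log line.
failed_percent() {
  sed -n 's/.*success, \([0-9.]*\) % failed.*/\1/p'
}

# Usage (logs/connect.log is a guessed path; adjust to your setup):
#   plc4x_lines < logs/connect.log | failed_percent
#
# Alternative (assumption: your Connect version exposes the admin/loggers
# REST endpoint): raise only the PLC4X logger to DEBUG, not the root logger.
#   curl -s -X PUT -H "Content-Type:application/json" \
#     -d '{"level": "DEBUG"}' \
#     http://localhost:8083/admin/loggers/org.apache.plc4x
```

Limiting DEBUG to the PLC4X logger should also surface the stack trace that the WARN message refers to without the surrounding noise.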

1 answer

Answered by Ben Hutcheson

This seems to have been an issue with how the PLC4X connector handles errors: after a timeout occurred, the connector stopped requesting new messages from the Modbus server. Interestingly, if the TCP connection to the Modbus server was interrupted, the PLC4X connector would reconnect and start polling again.

Can you please try building the latest PLC4X connector from the PLC4X GitHub repo (PLC4X Kafka Connector Repository)? A fix has been pushed to it.

The PLC4X Kafka Connector doesn't fail the Kafka Connect task when the connection from the connector to the PLC fails. Instead, it waits for the connection to be restored and then begins polling again, which is why the status endpoint keeps reporting RUNNING.
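
Since the task state never changes, the scraper warnings in the worker log are the more useful stall signal. Below is a minimal watchdog sketch under stated assumptions: `scrape_stalled` and `restart_task_url` are hypothetical helper names, the log path and threshold are guesses, and restarting the task through the Connect REST API is only one way to force a fresh connection. Whether a restart clears the error on the affected connector version is not confirmed in this thread; updating the connector is the actual fix.

```shell
#!/bin/sh
# Succeed (exit 0) when the log text on stdin contains at least $1
# PLC4X scraper warnings. The match string comes from the WARN lines
# quoted in the question.
scrape_stalled() {
  threshold=$1
  recent=$(grep -c 'Exception during scraping of Job' || true)
  [ "$recent" -ge "$threshold" ]
}

# Build the Kafka Connect REST URL that restarts one task.
# Arguments: host:port, connector name, task id.
restart_task_url() {
  printf 'http://%s/connectors/%s/tasks/%s/restart\n' "$1" "$2" "$3"
}

# Usage (not executed here; path and threshold are assumptions):
#   if tail -n 200 logs/connect.log | scrape_stalled 5; then
#     curl -s -X POST "$(restart_task_url localhost:8083 operation1 0)"
#   fi
```

Run from cron or a similar scheduler, this only papers over the problem until the fixed connector is deployed.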

From your comments, it also seems that your Modbus server is reachable on a public IP address. This isn't the best design, as Modbus provides no security.