We are integrating Eclipse Ditto into a digital twin platform, but we have encountered a problem while testing and we don't really know how to fix it.
We asked a question related to this one a while ago and the answer worked for us. Here is the link to that question: Eclipse Ditto does not send all things events over target connection
Unfortunately, it has started failing again, but we don't think the problem is the same as before.
The scenario is the same: the goal is for 593 twins (Ditto Things) to receive the result of a simulation. The idea is to run several simulation runs simultaneously, with each simulation run sending 593 messages to a Kafka topic. For example, 6 runs put 3558 messages in the topic.
We updated all the fields and values we were given, deleted the JavaScript mapping and tested with the maximum number of simulations, 360. It worked: 360 simulations sent a total of 213,480 messages and no messages were dropped in any of the tests we carried out. Perfect!
So we decided to run some tests across the whole platform to measure latency. The data flows as follows:
Simulation --> Kafka --> Ditto --> MQTT (Mosquitto) --> Database
We wrote a script that sent 1 simulation, waited for the data to be stored in the database and then retrieved the timestamps. Once all 593 messages had arrived, the script sent 2 simulations, waited for all 1186 messages to reach the database, then sent a run with 3 simulations, and so on. The script was supposed to stop once it reached 360 simultaneous simulations.
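The ramp-up loop was essentially the following (a simplified sketch; launch_simulations, count_db_rows and fetch_timestamps are hypothetical placeholders for our actual simulation trigger and database queries):

#!/usr/bin/env bash
# Sketch of the latency test: ramp from 1 to 360 simultaneous simulations.
# launch_simulations, count_db_rows and fetch_timestamps are hypothetical
# stand-ins for our real tooling.
MESSAGES_PER_SIM=593
for n in $(seq 1 360); do
  launch_simulations "$n"                  # start n simultaneous runs
  expected=$((n * MESSAGES_PER_SIM))       # each run produces 593 messages
  until [ "$(count_db_rows)" -ge "$expected" ]; do
    sleep 5                                # poll the database until all rows arrive
  done
  fetch_timestamps "$n"                    # collect the timestamps for this step
done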
We found that Ditto could not process the data from 200 simulations, even though it had previously handled 360. We tried giving Ditto and its components more resources (the cluster still has free capacity), but nothing changed; it even got worse.
We decided to reinstall every component with the configuration that had worked before, but now we are seeing these problems:
- Sometimes some messages remain in Kafka and Ditto does not read them.
- Sometimes all the data is read from Kafka but no messages are sent to MQTT.
- Sometimes Ditto reads some messages from Kafka but not all, and then sends the messages it did read to MQTT multiple times.
The strange thing is that those unread/unsent messages are sometimes delivered to the MQTT broker 1 or 2 hours later, even though we delete all messages from the Kafka topic. We thought Ditto might keep some data in a cache, but we don't know how to clear or disable it.
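For what it's worth, the committed offsets of the connection's consumer group can be checked with Kafka's own tooling, which should show whether the "missing" messages were simply never committed (we assume the group id Ditto uses for the connection appears in the --list output; KAFKAIP:9092 stands for our broker address):

# List the consumer groups to spot the one Ditto created for the connection
kafka-consumer-groups.sh --bootstrap-server KAFKAIP:9092 --list

# Committed offset, log-end offset and lag per partition of topic "riego"
kafka-consumer-groups.sh --bootstrap-server KAFKAIP:9092 \
  --describe --group <ditto-connection-group>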
Furthermore, despite all these problems, we have 5 twins that receive data every 15 minutes and forward it over MQTT via other connections, and those twins work correctly at all times.
On the other hand, we are a bit confused about resource management, since we are running on Kubernetes. We don't know how many resources Ditto needs for a given number of connections, things, etc., or whether we need to pass arguments to the JVM. Sometimes the connectivity pods are restarted due to an AskTimeoutException error.
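For example, we do not know whether heap settings are supposed to be passed like this (just a sketch, assuming the JVM inside the Ditto images honours the standard JAVA_TOOL_OPTIONS environment variable; we have not found a dedicated chart field for JVM arguments, and the flag values are illustrative, not tuned):

connectivity:
  extraEnv:
    # Assumption: the container JVM picks up JAVA_TOOL_OPTIONS, as any
    # recent JVM does by default.
    - name: JAVA_TOOL_OPTIONS
      value: "-XX:MaxRAMPercentage=70.0 -XX:+ExitOnOutOfMemoryError"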
Here are the connections we have established, their logs and metrics, along with the Helm values.yaml.
Before any execution:
- Source connection status : https://pastebin.com/xgtqFZab
- Target connection status : https://pastebin.com/YMJE3xs2
After executing 1 simulation (593 messages):
- Source connection status : https://pastebin.com/jaxB7LQ0
- Target connection status : https://pastebin.com/RZ4p0Mq9
- Source connection metrics : https://pastebin.com/mGKPDr8V
- Target connection metrics : https://pastebin.com/kwTZHmiK
- Source connection logs : https://pastebin.com/dfaDyUS5
- Target connection logs : https://pastebin.com/TxRVHfjq
Executing a single simulation first thing in the morning works correctly, but the simulations executed after that start failing.
After executing 11 simulations (6523 messages):
- Source connection status : https://pastebin.com/G9mYpmnT
- Target connection status : https://pastebin.com/0ij6pDYn
- Source connection metrics : https://pastebin.com/QjTDwBmL
- Target connection metrics : https://pastebin.com/P5MVFTJu
- Source connection logs : https://pastebin.com/Kpft7Tme
- Target connection logs : https://pastebin.com/wMe4DYnA
Source connection:
{
"name": "connection-for-pivot-simulation-with-idSimulationRun",
"connectionType": "kafka",
"connectionStatus": "open",
"uri": "tcp://KAFKAIP",
"sources": [
{
"addresses": [
"riego"
],
"consumerCount": 1,
"qos": 1,
"authorizationContext": [
"nginx:ditto"
],
"headerMapping": {
"correlation-id": "{{header:correlation-id}}",
"namespace": "{{ entity:namespace }}",
"content-type": "{{header:content-type}}",
"connection": "{{ connection:id }}",
"id": "{{ entity:id }}",
"reply-to": "{{header:reply-to}}"
},
"replyTarget": {
"address": "{{header:reply-to}}",
"headerMapping": {
"content-type": "{{header:content-type}}",
"correlation-id": "{{header:correlation-id}}"
},
"expectedResponseTypes": [
"response",
"error"
],
"enabled": true
}
}
],
"targets": [],
"clientCount": 5,
"failoverEnabled": true,
"validateCertificates": true,
"processorPoolSize": 1,
"specificConfig": {
"saslMechanism": "plain",
"bootstrapServers": "KAFKAIP"
},
"tags": []
}
Target connection:
{
"name": "mqtt-connection-for-telegraf-pivot",
"connectionType": "mqtt-5",
"connectionStatus": "open",
"uri": "tcp://MQTTIP",
"sources": [],
"targets": [
{
"address": "opentwins/{{ topic:channel }}/{{ topic:criterion }}/{{ thing:namespace }}/{{ thing:name }}",
"topics": [
"_/_/things/twin/events?namespaces=pivot&extraFields=thingId,attributes/_parents,features/idSimulationRun/properties/value",
"_/_/things/live/messages",
"_/_/things/live/commands"
],
"qos": 1,
"authorizationContext": [
"nginx:ditto"
],
"headerMapping": {}
}
],
"clientCount": 5,
"failoverEnabled": true,
"validateCertificates": true,
"processorPoolSize": 1,
"tags": []
}
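For reference, the status and metrics dumps linked above can be retrieved through the connections HTTP API (assuming Ditto 3.x with the devops basic-auth user; <ditto-gateway>, <devops-password> and <connection-id> are placeholders for our installation):

# Live status of the connection (client, consumer and publisher state)
curl -u devops:<devops-password> http://<ditto-gateway>/api/2/connections/<connection-id>/status

# Accumulated consumed/mapped/published counters and failure counts
curl -u devops:<devops-password> http://<ditto-gateway>/api/2/connections/<connection-id>/metrics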
Values:
swaggerui:
enabled: false
mongodb:
enabled: false
global:
prometheus:
enabled: true
dbconfig:
connectivity:
uri: mongodb://dt-mongodb:27017/connectivity
things:
uri: mongodb://dt-mongodb:27017/things
searchDB:
uri: mongodb://dt-mongodb:27017/search
policies:
uri: mongodb://dt-mongodb:27017/policies
connectivity:
replicaCount: 5
extraEnv:
- name: MQTT_CONSUMER_THROTTLING_ENABLED
value: "false"
- name: MQTT_CONSUMER_THROTTLING_LIMIT
value: "100000"
- name: KAFKA_CONSUMER_THROTTLING_ENABLED
value: "false"
- name: KAFKA_CONSUMER_THROTTLING_LIMIT
value: "100000"
- name: KAFKA_SESSION_TIMEOUT_MS
value: "60000"
- name: CONNECTIVITY_MQTT_MAX_QUEUE_SIZE
value: "100000"
- name: CONNECTIVITY_KAFKA_MAX_QUEUE_SIZE
value: "100000"
- name: CONNECTIVITY_SIGNAL_ENRICHMENT_BUFFER_SIZE
value: "300000"
- name: CONNECTIVITY_MESSAGE_MAPPING_MAX_POOL_SIZE
value: "10"
resources:
requests:
cpu: 2000m
limits:
memory: 3Gi
gateway:
resources:
requests:
cpu: 1000m
limits:
memory: 768Mi
nginx:
replicaCount: 2
service:
type: NodePort
nodePort: 30525
resources:
requests:
cpu: 500m
limits:
cpu: 1000m
memory: 768Mi
policies:
resources:
requests:
cpu: 1000m
limits:
memory: 768Mi
things:
replicaCount: 1
resources:
requests:
cpu: 1000m
limits:
memory: 8192Mi
thingsSearch:
resources:
requests:
cpu: 1000m
limits:
memory: 768Mi
UPDATE: The behaviour described in the original post was the product of multiple errors in Ditto 3.1.2. We believe version 3.3.0 has solved most of our issues, but further testing is needed.