Websocket to apache kafka in eclipse Ditto

243 views Asked by At

I am having a problem with eclipse ditto. I want to send a command to update the features of a digital twins using a websocket (in python) and I want to read the new features in an apache kafka topic. This is my websocket:

import asyncio 
import random
import time
from websockets import connect
import json

async def func(uri):
  async with connect(uri) as websocket:
    await websocket.send("START-SEND-EVENTS")
    #await websocket.send("START-SEND-MESSAGES")
    message = await websocket.recv()
    print(message)
    while(True):
        
        msg = {
            "topic": "org.eclipse.ditto/camera01/things/twin/commands/modify",
            "headers": {
                "content-type": "text/plain"
            },
            "path": "features/coordinates/properties",
            "value": {"x": random.randrange(0,1000), "y": random.randrange(0,1000), "z": random.randrange(0,1000), "x_rotation": 0.0, "y_rotation": 0.0, "z_rotation": 0.0, "w_rotation": 1.0, "thingId": "org.eclipse.ditto:camera01"}
        }
        to_send = json.dumps(msg)
        time.sleep(1)
        await websocket.send(to_send)
        msg_recv = await websocket.recv()
        print(msg_recv)
uri = "ws://ditto:ditto@localhost:8080/ws/2"
asyncio.run(func(uri))

When I send a message, ditto updates the digital twin and a second websocket gets the new features, but kafka's topic doesn't receive it.

I thought the problem may be the target connection, but it doesn't seem like there are any errors. This is how I set it up:

{
"targetActorSelection": "/system/sharding/connection",
"headers": {
    "aggregate": false
},
"piggybackCommand": {
    "type": "connectivity.commands:modifyConnection",
    "connection": {
        "id": "kafka-connection-target",
        "connectionType": "kafka",
        "connectionStatus": "open",
        "failoverEnabled": true,
        "uri": "tcp://localhost:9092",
        "specificConfig":{
       "bootstrapServers":"localhost:9092"
         },
        "targets": [{
            "address": "topic_ditto",
            "topics": [
                "_/_/things/twin/events",
                "_/_/things/live/messages"
            ],
            "authorizationContext": ["ditto:unity"],
            "qos": 0
        }],
        "mappingContext": {
            "mappingEngine": "JavaScript",
            "options": {
                "incomingScript": "function mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType) {return null;}",
                "outgoingScript": "function mapFromDittoProtocolMsg(namespace, id, group, channel, criterion, action, path, dittoHeaders, value, status, extra) {let textPayload = '{\"x\":' + value.coordinates.properties.x + ',\"y\":' + value.coordinates.properties.y + ',\"z\":' + value.coordinates.properties.z + ',\"x_rotation\":' + value.coordinates.properties.x_rotation + ',\"y_rotation\": ' + value.coordinates.properties.y_rotation + ', \"z_rotation\": ' + value.coordinates.properties.z_rotation + ',\"w_rotation\":' + value.coordinates.properties.w_rotation + ',\"idCamera\":\"' + id + '\"}'; let bytePayload = null; let contentType = 'text/plain; charset=UTF-8'; return  Ditto.buildExternalMsg(dittoHeaders, textPayload, bytePayload, contentType);}",
                "loadBytebufferJS": "false",
                 "loadLongJS": "false"
            }
        }
    }
}
}

Note: if I update the digital twin using a topic (specified in a source connection), the topic of the target connection receives the new features (also the second websocket ..)

1

There are 1 answers

0
Lurù On BEST ANSWER

Solved.

the path of the message to be sent with websocket was not the correct path to pass to the mapping function of the target connection. This is why I was able to update the digital twin without being able to update the "ditto_topic" topic.

I passed path "features/coordinates/properties" but the correct path is "/features" for how I set up the mapping function.

The form of the message I sent is "{" x ": random.randrange (0,1000), .." but the correct form in this case is {"coordinates": {"properties": {"x": random.randrange (0,1000), ... "