I am trying to send data to IOT Hub and store it in blob storage as well as Power BI streaming dataset using Azure Stream Analytics service.

For sending data, I used Microsoft's Python SDK application (link given below):

Python application Code link

This script successfully sends data to Azure IOT Hub and I am able to visualize it in Azure IOT explorer but does not create a blob storage file or Power BI streaming dataset. Whereas in Stream Analytics, in the input section it gives the below error:

Error screenshot

ERROR : "Could not deserialize the input event(s) from resource 'Partition: [1], Offset: [8591414904], SequenceNumber: [3081], DeviceId: [test001]' as Json. Some possible reasons: 1) Malformed events 2) Input source configured with incorrect serialization format"

The output I received on Azure IOT explorer is as follows which is a perfect json string:

{
  "body": "test wind speed 10",
  "enqueuedTime": "Tue Nov 07 2023 23:49:29 GMT+0530 (India Standard Time)",
  "properties": {
    "tornado-warning": "yes"
  }
}

WHEREAS for the same IOT Hub and Stream Analytics connections, if I use the Microsoft's Javascript SDK application (link given below):

Javascript application code link

I am able to send data to IOT Hub as well as create a blob storage and streaming dataset on Power BI.

The output received on IOT explorer after running Javascript script.

{
  "body": {
    "deviceId": "myFirstDevice",
    "windSpeed": 10.487688712678684,
    "temperature": 23.702052411822482,
    "humidity": 79.27664927491016
  },
  "enqueuedTime": "Tue Nov 07 2023 23:02:30 GMT+0530 (India Standard Time)",
  "properties": {
    "temperatureAlert": "false"
  }
}

The Javascript application runs absolutely fine but not the python script.

Can anyone please point out what exactly I am doing wrong with Python script?

I tried running the Python as well as Javascript Azure SDK but only got the desired output from Javascript SDK and not the python script.

1

There are 1 answers

2
LeelaRajesh_Sayana On

Welcome to the Stack Overflow community! It looks from the error message of Stream Analytics job that the query errored out when trying to extract/process the data from the Input. Assuming that your Analytics job query is designed to extract deviceId, windSpeed, temperature and humidity details from the input, it makes sense that the job fails for Python SDK. The python sample you have shared sends a message to IoT Hub which does not include any of these properties. You need to modify the send_test_message as follows to include these properties in the telemetry.

async def send_test_message(i):        
    windSpeed = 10 + (random.uniform(0, 4))
    temperature = 20 + (random.uniform(0, 10))
    humidity = 60 + (random.uniform(0, 20))
    msg = Message("deviceId: " + 'myFirstDevice' + " windSpeed: "+ str(windSpeed)+
                   " temperature:"  +str(temperature)+ " humidity:" +str(humidity))
    print("sending message " + str(msg))
    msg.message_id = uuid.uuid4()
    msg.correlation_id = "correlation-1234"
    msg.custom_properties["tornado-warning"] = "yes"
    msg.content_encoding = "utf-8"
    msg.content_type = "application/json"
    await device_client.send_message(msg)
    print("done sending message #" + str(i))

This would include the properties needed by the stream analytics query in the Message telemetry. This should help resolve the issue.

Updating with a working sample

# Build the message with simulated telemetry values.
temperature = TEMPERATURE + (random.random() * 15)
humidity = HUMIDITY + (random.random() * 20)
msg_txt_formatted = MSG_TXT.format(temperature=temperature, humidity=humidity)
message = Message(msg_txt_formatted)
 
message.content_encoding = "utf-8"
message.content_type = "application/json"
 
# Add a custom application property to the message.
# An IoT hub can filter on these properties without access to the message body.
if temperature > 30:
    message.custom_properties["temperatureAlert"] = "true"
else:
    message.custom_properties["temperatureAlert"] = "false"
 
# Send the message.
print("Sending message: {}".format(message))
client.send_message(message)
print("Message sent")