Composer 2.6.2 with Airflow 2.5.3: AttributeError: 'bytes' object has no attribute 'encode' when publishing a Pub/Sub message

The logs show the following error, and the DAG fails:

[2024-03-27, 06:25:37 UTC] {logging_mixin.py:137} INFO - data:b'DNB-MS-MY,2024-03-26,Location'
[2024-03-27, 06:25:37 UTC] {taskinstance.py:1778} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/pm_cm_dags/nim_batch_request_location_dag.py", line 148, in publish_message
    pubsub_message = {'data': base64.b64encode(data.encode()).decode()}
AttributeError: 'bytes' object has no attribute 'encode'

Here is the code snippet that raises the error:

def publish_message(environ, **kwargs):
    assert isinstance(data, bytes)
    print(f"data:{data}")
    pubsub_message = {'data': base64.b64encode(data.encode()).decode()}
    print(f"pubsub_message:{pubsub_message}")
    topic_name = ''
    gcp_pubsub_hook.PubSubHook().publish(project_id=PROJECT_ID, topic=topic_name, messages=pubsub_message)

After changing that line to pubsub_message = {'data': base64.b64encode(data).decode()}, I get the following error instead:

    gcp_pubsub_hook.PubSubHook().publish(project_id=PROJECT_ID,topic=topic_name,messages=pubsub_message)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 130, in publish
    self._validate_messages(messages)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 152, in _validate_messages
    if "data" in message and isinstance(message["data"], str):
TypeError: string indices must be integers

I tried this approach and the DAG still fails with the error shown above. Any help with this issue would be greatly appreciated.

There is 1 answer

Talha Tayyab On

It looks like data is already a bytes object, so it has no encode() method to call.

Can you try passing it to b64encode directly:

pubsub_message = {'data': base64.b64encode(data).decode()}
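
The second traceback in the question suggests a further issue: _validate_messages loops over messages, and looping over a single dict yields its string keys, which would explain "string indices must be integers". A minimal sketch of one possible fix follows, assuming the hook wants a list of dicts and that the installed provider version accepts a raw bytestring in 'data' rather than a base64 string (worth verifying against the provider's documentation). The helper name build_pubsub_messages and the publish_message parameters are hypothetical, introduced only for illustration.

```python
from typing import Dict, List


def build_pubsub_messages(data: bytes) -> List[Dict[str, bytes]]:
    """Wrap a bytes payload in a list of dicts (hypothetical helper)."""
    if not isinstance(data, bytes):
        raise TypeError("data must be bytes; call .encode() on a str first")
    # bytes has no .encode(), so the payload is passed through unchanged.
    # Assumption: the provider expects raw bytes here, not a base64 string.
    return [{"data": data}]


def publish_message(data: bytes, project_id: str, topic_name: str) -> None:
    # Imported lazily so the helper above can be exercised without Airflow.
    from airflow.providers.google.cloud.hooks.pubsub import PubSubHook

    # messages is a list, so the hook iterates over dicts rather than dict keys.
    PubSubHook().publish(
        project_id=project_id,
        topic=topic_name,
        messages=build_pubsub_messages(data),
    )
```

If the installed provider version does expect base64 text instead, the same list wrapping should still apply; only the value stored under 'data' would change.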