In logs can see the following error and DAG gets failed:-
[2024-03-27, 06:25:37 UTC] {logging_mixin.py:137} INFO - data:b'DNB-MS-MY,2024-03-26,Location' [2024-03-27, 06:25:37 UTC] {taskinstance.py:1778} ERROR - Task failed with exception Traceback (most recent call last): File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute return_value = self.execute_callable() File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable return self.python_callable(*self.op_args, **self.op_kwargs) File "/home/airflow/gcs/dags/pm_cm_dags/nim_batch_request_location_dag.py", line 148, in publish_message pubsub_message = {'data': base64.b64encode(data.encode()).decode()} AttributeError: 'bytes' object has no attribute 'encode'
Here is the code snippet for which error arises-
def publish_message(environ, **kwargs):
assert isinstance(data, bytes)
print(f"data:{data}")
pubsub_message = {'data': base64.b64encode(data.encode()).decode()}
print(f"pubsub_message:{pubsub_message}")
topic_name=''
gcp_pubsub_hook.PubSubHook().publish(project_id=PROJECT_ID,topic=topic_name,messages=pubsub_message)
Also after changing to pubsub_message = {'data': base64.b64encode(data).decode()}, getting the below error-
gcp_pubsub_hook.PubSubHook().publish(project_id=PROJECT_ID,topic=topic_name,messages=pubsub_message) File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper return func(self, *args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 130, in publish self._validate_messages(messages) File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 152, in _validate_messages if "data" in message and isinstance(message["data"], str): TypeError: string indices must be integers
Tried this approach and DAGs gets failed with error as mentioned above. It would be extremely helpful if someone can assist on this issue.
Looks like
datais already abytesobject.Can you try