I have an Airflow DAG that is meant to be triggered manually. Its logic is straightforward: the DAG receives start_date and end_date as params and then performs the following steps:
- Lists prefixes in the S3 bucket for each target date
- Builds spark-submit arguments using the date and the partitions extracted from the S3 prefixes
- Triggers a Databricks job with the prepared arguments
The complete code is below:
from datetime import date, datetime

from pandas import date_range

from airflow.decorators import dag, task, task_group
from airflow.models.param import Param
from airflow.providers.amazon.aws.operators.s3 import S3ListPrefixesOperator
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

# build_args and extract_partitions are small project helpers (omitted here) that
# assemble the spark-submit argument list and pull partition values out of S3 prefixes.


@dag(
    dag_id="test_backfill_dag_v1",
    start_date=datetime(2024, 2, 27),
    catchup=False,
    render_template_as_native_obj=True,
    schedule=None,
    params={
        "start_date": Param(default=f"{date.today()}", format="date", type="string"),
        "end_date": Param(default=f"{date.today()}", format="date", type="string"),
    },
)
def test_backfill_dag():
    @task
    def build_spark_submit_args(target_date: str, partitions: list[str]) -> list[str]:
        return build_args(start_date=target_date, end_date=target_date, partitions=partitions)

    @task
    def list_target_dates(**context):
        params = context["params"]
        start_end_range = date_range(params["start_date"], params["end_date"], freq="D")
        return start_end_range.strftime("%Y-%m-%d").tolist()

    @task
    def build_root_s3_prefix(target_date: str) -> str:
        return f"foo/date={target_date}/"

    @task
    def extract_target_partitions(prefixes: list[str]) -> list[str]:
        return extract_partitions(prefixes)

    @task_group
    def backfill_group(target_date: str) -> None:
        root_prefix = build_root_s3_prefix(target_date)
        s3_list_prefixes_operator = S3ListPrefixesOperator(
            task_id="s3_list_prefixes_operator",
            prefix=root_prefix,
            bucket="test-bucket",
            delimiter="/",
        )
        target_partitions = extract_target_partitions(s3_list_prefixes_operator.output)
        spark_submit_args = build_spark_submit_args(target_date, target_partitions)
        databricks_run_now = DatabricksRunNowOperator(
            retries=2,
            job_id="my-test-job",
            task_id="databricks_run_now",
            spark_submit_params="{{ ti.xcom_pull(task_ids='backfill_group.build_spark_submit_args') }}",
            max_active_tis_per_dagrun=1,
        )
        root_prefix >> s3_list_prefixes_operator >> target_partitions >> spark_submit_args >> databricks_run_now

    dates = list_target_dates()
    backfill_group.expand(target_date=dates)


test_backfill_dag()
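For context, list_target_dates just turns the params into a plain list of date strings before the task group is expanded. For example, with start_date=2024-02-01 and end_date=2024-02-03 (placeholder dates) the same pandas call gives:

from pandas import date_range

# Example of what list_target_dates returns for a three-day range
print(date_range("2024-02-01", "2024-02-03", freq="D").strftime("%Y-%m-%d").tolist())
# ['2024-02-01', '2024-02-02', '2024-02-03']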
Everything works as expected except for triggering DatabricksRunNowOperator. Those tasks fail with TypeError: Object of type LazyXComAccess is not JSON serializable:
[2024-02-27, 11:23:23 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 827, in execute
self.run_id = hook.run_now(self.json)
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks.py", line 223, in run_now
response = self._do_api_call(RUN_NOW_ENDPOINT, json)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 596, in _do_api_call
for attempt in self._get_retry_object():
File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 347, in __iter__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
return fut.result()
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 598, in _do_api_call
response = request_func(
^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/requests/api.py", line 115, in post
return request("post", url, data=data, json=json, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/requests/api.py", line 59, in request
return session.request(method=method, url=url, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/requests/sessions.py", line 575, in request
prep = self.prepare_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/requests/sessions.py", line 486, in prepare_request
p.prepare(
File "/home/airflow/.local/lib/python3.11/site-packages/requests/models.py", line 371, in prepare
self.prepare_body(data, files, json)
File "/home/airflow/.local/lib/python3.11/site-packages/requests/models.py", line 511, in prepare_body
body = complexjson.dumps(json, allow_nan=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/json/__init__.py", line 238, in dumps
**kw).encode(obj)
^^^^^^^^^^^
File "/usr/local/lib/python3.11/json/encoder.py", line 200, in encode
chunks = self.iterencode(o, _one_shot=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/json/encoder.py", line 258, in iterencode
return _iterencode(o, 0)
^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/json/encoder.py", line 180, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type LazyXComAccess is not JSON serializable
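From the traceback, the failure happens when requests JSON-encodes the run_now payload, so any non-primitive value in spark_submit_params would trigger it. A minimal stand-alone illustration (using a dummy class in place of the real LazyXComAccess object) reproduces the same TypeError:

import json

class FakeLazyXCom:
    """Stand-in for the lazy XCom proxy that ends up in the run_now payload."""

payload = {"job_id": "my-test-job", "spark_submit_params": FakeLazyXCom()}

try:
    # requests does essentially this when preparing the POST body
    json.dumps(payload, allow_nan=False)
except TypeError as exc:
    print(exc)  # Object of type FakeLazyXCom is not JSON serializable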
I've also tried to replace DatabricksRunNowOperator with a simple PythonOperator that prints the inputs:
def print_callable(**context):
    print(context["templates_dict"]["spark_submit_args"])


databricks_run_now = PythonOperator(
    task_id="databricks_run_now",
    python_callable=print_callable,
    provide_context=True,
    templates_dict={
        "spark_submit_args": "{{ ti.xcom_pull(task_ids='backfill_group.build_spark_submit_args') }}"
    },
)
With this replacement, the DAG completed successfully, so I assume the problem is specific to DatabricksRunNowOperator.
I would be glad if anyone could advise me on how to fix my implementation.