Is Apache Airflow or Luigi a good tool for this use case?

I'm working at an org that has an embarrassingly large number of manual tasks that touch multiple databases (internal and external), multiple file types/formats, and a huge number of datasets. The "general" workflow is to take data from one place, extract/create metadata for it, change the file format into something more standardised, and publish the dataset.

I'm trying to improve automation here, and I've been looking at Luigi and Apache Airflow to try to standardise some of the common blocks that get used, but I'm not sure whether these are the appropriate tools. Before I sink too much time into figuring them out, I thought I'd ask here.

A dummy example:

  1. Check a REST API endpoint to see if a dataset has changed (www.some_server.org/api/datasets/My_dataset/last_update_time)
  2. If it has changed, download the zip file (My_dataset.zip)
  3. Unzip the file (My_dataset.zip >> my_file1.csv, my_file2.csv ... my_fileN.csv)
  4. Do something with each CSV: filter, delete, pivot, whatever
  5. Combine the CSVs and transform them into "My_filtered_dataset.json"
  6. For each step, create/append to a "my_dataset_metadata.csv" file to record things like the processing date, inputs, authors, pipeline version, etc.
  7. Upload the JSON and metadata files somewhere else
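
To make that concrete, the first few steps currently look something like this in plain Python (the endpoint is the one above; the download URL, paths, and function names are just made up for illustration):

```python
import zipfile
from pathlib import Path

import requests


def dataset_has_changed(last_seen: str) -> bool:
    # Step 1: ask the API when the dataset was last updated.
    resp = requests.get(
        "https://www.some_server.org/api/datasets/My_dataset/last_update_time"
    )
    resp.raise_for_status()
    return resp.text.strip() != last_seen


def download_and_unzip(work_dir: Path) -> list[Path]:
    # Steps 2-3: fetch the zip and extract an unknown number of CSVs.
    work_dir.mkdir(parents=True, exist_ok=True)
    zip_path = work_dir / "My_dataset.zip"
    zip_path.write_bytes(
        requests.get("https://www.some_server.org/api/datasets/My_dataset.zip").content
    )
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(work_dir)
    return sorted(work_dir.glob("*.csv"))


# Steps 4-7 are similar standalone functions: filter each CSV, combine into
# "My_filtered_dataset.json", append to "my_dataset_metadata.csv", upload.
```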

My end goal is to be able to quickly swap out blocks, e.g. the "csv_to_json" function for a "csv_to_xlsx" function, for different processing tasks, and to get things like alerting on failure, job visualisation, worker management, etc.
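
What I have in mind for swappable blocks is roughly this (a sketch with made-up names; pandas is only used for illustration):

```python
from pathlib import Path
from typing import Callable

import pandas as pd


def csv_to_json(df: pd.DataFrame, out: Path) -> Path:
    target = out.with_suffix(".json")
    df.to_json(target, orient="records")
    return target


def csv_to_xlsx(df: pd.DataFrame, out: Path) -> Path:
    target = out.with_suffix(".xlsx")
    df.to_excel(target, index=False)  # needs openpyxl installed
    return target


def combine(csv_paths: list[Path], out: Path,
            writer: Callable[[pd.DataFrame, Path], Path] = csv_to_json) -> Path:
    # Swapping the output format is just a different `writer` argument.
    df = pd.concat(pd.read_csv(p) for p in csv_paths)
    return writer(df, out)
```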

Some problems I'm seeing are that Luigi isn't great at handling dynamic filenames and would struggle to create N branches when I don't know how many files will come out of the zip. It's also fairly basic and doesn't seem to have much community support.

Also, from the Airflow docs: "This is a subtle but very important point: in general, if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator." (Although there does seem to be some support for this with XComs.) In my dummy case I would probably need to share, at least, the filenames and the metadata between each step, and combining all the steps into a single operator would kind of defeat the point of Airflow...

Am I misunderstanding things here? Are these tools a good fit for this kind of application? Or is this task too simple/complex and should it just be stuck into a single script?


1 Answer

Answered by Hussein Awala

With Airflow you can achieve all your goals:

  • there are sensor operators to wait for a condition: check an API, check whether a file exists, run a query against a database and check the result, ... (a PythonSensor appears in the DAG sketch after this list)
  • you create a DAG to define the dependencies between your tasks and to decide which tasks can run in parallel and which must run sequentially
  • a lot of existing operators developed by the community: SSH operators, operators to interact with cloud provider services, ...
  • built-in mechanisms to retry failed tasks and send emails on failure
  • it's all Python, so you can write a method that creates DAGs dynamically (a DAG factory); if several DAGs share part of the same logic, you can generate them from a single parameterised method (see the DAG-factory sketch below)
  • a built-in messaging system (XCom) to pass small amounts of data between tasks
  • a secure way to store your credentials and secrets (Airflow variables and connections)
  • a modern UI to manage your DAG runs and read your task logs, with access control
  • you can develop your own plugins and add them to Airflow (e.g. a UI plugin using Flask-AppBuilder)
  • you can process each file in a separate task, in parallel on a cluster of workers (Celery or Kubernetes), using Dynamic Task Mapping (Airflow >= 2.3); the sketch after this list maps one task per extracted CSV
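
For example, here is a rough sketch of your dummy pipeline as one DAG, combining a PythonSensor with Dynamic Task Mapping (the endpoint comes from your question; the paths, Variable name, and schedule are hypothetical, and the `schedule` argument assumes Airflow >= 2.4):

```python
import zipfile
from pathlib import Path

import pendulum
import requests
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.sensors.python import PythonSensor

WORK_DIR = Path("/tmp/my_dataset")


def _dataset_changed() -> bool:
    # Sensor callable: succeed once the API reports a newer timestamp than the
    # one we processed last (stored in an Airflow Variable here).
    resp = requests.get(
        "https://www.some_server.org/api/datasets/My_dataset/last_update_time"
    )
    resp.raise_for_status()
    return resp.text.strip() != Variable.get("my_dataset_last_update", default_var="")


@dag(schedule="@daily", start_date=pendulum.datetime(2023, 1, 1), catchup=False)
def my_dataset_pipeline():
    wait_for_update = PythonSensor(
        task_id="wait_for_update",
        python_callable=_dataset_changed,
        poke_interval=60 * 60,
        mode="reschedule",  # free the worker slot between checks
    )

    @task
    def download_and_unzip() -> list[str]:
        # Steps 2-3: fetch the zip and extract an unknown number of CSVs.
        WORK_DIR.mkdir(parents=True, exist_ok=True)
        zip_path = WORK_DIR / "My_dataset.zip"
        zip_path.write_bytes(
            requests.get("https://www.some_server.org/api/datasets/My_dataset.zip").content
        )
        with zipfile.ZipFile(zip_path) as zf:
            zf.extractall(WORK_DIR)
        return [str(p) for p in WORK_DIR.glob("*.csv")]

    @task
    def process_csv(csv_path: str) -> str:
        # Step 4: filter/pivot one CSV; Dynamic Task Mapping creates one
        # parallel task instance per extracted file.
        return csv_path

    @task
    def combine_and_publish(csv_paths: list[str]) -> None:
        # Steps 5-7: combine into JSON, append metadata, upload somewhere else.
        print(f"combining {len(list(csv_paths))} processed files")

    csv_files = download_and_unzip()
    wait_for_update >> csv_files
    combine_and_publish(process_csv.expand(csv_path=csv_files))


my_dataset_pipeline()
```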
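
And a minimal DAG-factory sketch, in case several datasets end up following the same pattern (all names here are made up):

```python
import pendulum
from airflow.decorators import dag, task


def build_dataset_dag(dataset_name: str, output_format: str):
    @dag(
        dag_id=f"{dataset_name}_pipeline",
        schedule="@daily",
        start_date=pendulum.datetime(2023, 1, 1),
        catchup=False,
    )
    def dataset_pipeline():
        @task
        def extract() -> str:
            return f"/tmp/{dataset_name}.zip"  # placeholder path

        @task
        def publish(path: str) -> None:
            print(f"publishing {path} as {output_format}")

        publish(extract())

    return dataset_pipeline()


# Registering the generated DAGs at module level makes Airflow pick them up.
for name, fmt in [("My_dataset", "json"), ("Other_dataset", "xlsx")]:
    globals()[f"{name}_pipeline"] = build_dataset_dag(name, fmt)
```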

To pass files between tasks, which is a basic need for everyone, you can use an external storage service (Google GCS, AWS S3, ...) to store the output of each task, use XCom to pass the file path, and read the file back in the next task (see the sketch below). You can also use a custom XCom backend, backed by S3 for example, instead of the Airflow metadata database; in that case all the variables and files passed through XCom are stored on S3 automatically and there is no longer a limit on message size.
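
A minimal sketch of that pattern with the TaskFlow API, where only the S3 key travels through XCom (the bucket, connection id, and paths are hypothetical; S3Hook comes from the apache-airflow-providers-amazon package):

```python
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# These @task functions would be defined and wired up inside a DAG definition.


@task
def convert_to_json(local_csv: str) -> str:
    # ... transform local_csv into /tmp/My_filtered_dataset.json here ...
    s3_key = "my_dataset/My_filtered_dataset.json"
    S3Hook(aws_conn_id="my_s3").load_file(
        filename="/tmp/My_filtered_dataset.json",
        key=s3_key,
        bucket_name="my-bucket",
        replace=True,
    )
    return s3_key  # only this small string goes through XCom


@task
def publish(s3_key: str) -> None:
    data = S3Hook(aws_conn_id="my_s3").read_key(key=s3_key, bucket_name="my-bucket")
    print(f"publishing {len(data)} characters of JSON")  # placeholder for the real upload


# Inside the DAG body: publish(convert_to_json("/tmp/my_file1.csv"))
```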