How to perform HDFS operations in Airflow?
Make sure you install the following Python package:
pip install apache-airflow-providers-apache-hdfs
#Code Snippet
#Import packages
from airflow import DAG, settings
from airflow.models import Connection
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.operators.bash import BashOperator

#Define new DAG
dag_execute_hdfs_commands = DAG(
    dag_id='connect_hdfs',
    schedule_interval='@once',
    start_date=days_ago(1),
    dagrun_timeout=timedelta(minutes=60),
    description='executing hdfs commands',
)

#Establish connection to HDFS
conn = Connection(
    conn_id='webhdfs_default1',
    conn_type='HDFS',
    host='localhost',
    login='usr_id',
    password='password',
    port=9000,
)

session = settings.Session()
#The following lines add the new connection to your Airflow metadata DB.
#Once the DAG runs successfully, comment out session.add(conn),
#because we do not want to add the same connection "webhdfs_default1" every time we perform HDFS operations.
session.add(conn)   #On your next run comment this out
session.commit()    #Persist the connection; without commit it is never saved
session.close()

if __name__ == '__main__':
    dag_execute_hdfs_commands.cli()
Once the above DAG runs successfully, you can perform HDFS operations thereafter. For example, if you wish to list the files in an HDFS directory, try the following code:
#File listing operation
start_task = BashOperator(
    task_id="start_task",
    bash_command="hdfs dfs -ls /",
    dag=dag_execute_hdfs_commands,
)
start_task
You cannot use the connection webhdfs_default1 with BashOperator: the connection works with the WebHDFSHook hook, which creates a client to query the WebHDFS server. Currently there are two implemented methods, check_for_path and load_file, and you can access the client to do other operations.
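For instance, here is a minimal sketch of calling those two hook methods from a PythonOperator in the DAG defined above (the file paths and task id are hypothetical; the conn id is the one created earlier):

from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def upload_if_missing():
    #The hook reads the Airflow connection created above
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_default1')
    #check_for_path returns True when the HDFS path already exists
    if not hook.check_for_path('/data/example.csv'):
        #load_file uploads a local file to the given HDFS destination
        hook.load_file(source='/tmp/example.csv', destination='/data/example.csv')

upload_task = PythonOperator(
    task_id='upload_if_missing',
    python_callable=upload_if_missing,
    dag=dag_execute_hdfs_commands,
)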
The client is an instance of hdfs.InsecureClient if the conf core.security is not kerberos, and hdfs.ext.kerberos.KerberosClient if it is. Check the documentation of the hdfs CLI clients to see which operations are available and how to use them. There are a lot of available operations like download, delete, list, read, makedirs, ..., which you can call in a new Airflow operator.
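As an illustration, here is a sketch of getting the client from the hook and calling a couple of those operations directly (same assumed conn id as above; the directory name is hypothetical):

from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def list_hdfs_root():
    hook = WebHDFSHook(webhdfs_conn_id='webhdfs_default1')
    #get_conn() returns the underlying hdfs client:
    #hdfs.InsecureClient, or KerberosClient when core.security is kerberos
    client = hook.get_conn()
    #Create a directory (idempotent), then list the contents of the root
    client.makedirs('/tmp/airflow_demo')
    for entry in client.list('/'):
        print(entry)

Wrap such a callable in a PythonOperator (or a custom operator), as in the previous example, to run it as a task in the DAG.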