How to connect to HDFS in Airflow?


How do I perform HDFS operations in Airflow?

Make sure you install the following Python package:

pip install apache-airflow-providers-apache-hdfs
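
If the install went through, the provider's WebHDFSHook should be importable — a quick sanity check (the import path below is the one shipped with the provider package):

#Quick sanity check that the HDFS provider installed correctly
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
print(WebHDFSHook)  #should print the hook class without raising ImportError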

#Code Snippet

#Import packages
from airflow import DAG, settings
from airflow.models import Connection
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.operators.bash import BashOperator
 
#Define new DAG
dag_execute_hdfs_commands = DAG(
    dag_id='connect_hdfs',
    schedule_interval='@once',
    start_date=days_ago(1),
    dagrun_timeout=timedelta(minutes=60),
    description='executing hdfs commands',
)

#Establish connection to HDFS
conn = Connection(
    conn_id='webhdfs_default1',
    conn_type='HDFS',
    host='localhost',
    login='usr_id',
    password='password',
    port=9000,
)
session = settings.Session()

#The following lines add the new connection to your Airflow metadata DB.
#Make sure that once the DAG runs successfully you comment them out,
#because we do not want to add the same connection "webhdfs_default1" every time we perform HDFS operations.
session.add(conn) #On your next run comment this out
session.commit()
session.close()

if __name__ == '__main__':
    dag_execute_hdfs_commands.cli()
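
As an alternative to commenting out session.add(conn) after the first run, you could check whether the connection id is already registered before adding it. A minimal sketch, reusing the conn object and session from the snippet above in place of the session.add(conn)/session.close() lines:

#Add the connection only if "webhdfs_default1" is not already in the metadata DB
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
session.close()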

Once the above DAG runs successfully, you can perform HDFS operations thereafter. For example, if you wish to list files in an HDFS directory, try the following code:

#File listing operation
start_task = BashOperator(
         task_id="start_task",
         bash_command="hdfs dfs -ls /",
         dag=dag_execute_hdfs_commands
         )

start_task

1 Answer

Answered by Hussein Awala:

You cannot use the connection webhdfs_default with BashOperator; it works with the WebHDFSHook hook, which creates a client to query the WebHDFS server. Currently there are two implemented methods (see the sketch after this list):

  • check_for_path: to check if a file exists in HDFS
  • load_file: to upload a file to HDFS
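
For example, the two built-in methods can be used directly in a task callable — a minimal sketch, assuming the connection id webhdfs_default1 from the question and placeholder local/remote paths:

#Sketch: using the built-in WebHDFSHook methods (paths are placeholders)
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default1")

#check_for_path returns True if the given HDFS path exists
if not hook.check_for_path(hdfs_path="/user/usr_id/data/file.csv"):
    #load_file uploads a local file to HDFS
    hook.load_file(source="/tmp/file.csv", destination="/user/usr_id/data/file.csv")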

You can access the client to do other operations:

webHDFS_hook = WebHDFSHook(webhdfs_conn_id="<your conn id>")
client = webHDFS_hook.get_conn()
client.<operation>

The client is an instance of hdfs.InsecureClient if the config core.security is not kerberos, and of hdfs.ext.kerberos.KerberosClient if it is. Here is the documentation of the hdfs cli clients; you can check what the available operations are and use them.

There are a lot of available operations, like download, delete, list, read, makedirs, ..., which you can call in a new Airflow operator.
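
One way to wire these client operations into a DAG is a PythonOperator whose callable grabs the client from the hook — a sketch only, with a placeholder path, the connection id from the question, and the DAG object defined there:

#Sketch: listing an HDFS directory from a task via the hdfs client
from airflow.operators.python import PythonOperator
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

def list_hdfs_dir():
    client = WebHDFSHook(webhdfs_conn_id="webhdfs_default1").get_conn()
    #client.list returns the names of the entries under the given HDFS path
    for name in client.list("/user/usr_id"):
        print(name)

list_task = PythonOperator(
    task_id="list_hdfs_dir",
    python_callable=list_hdfs_dir,
    dag=dag_execute_hdfs_commands,
)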