DataprocCreateClusterOperator in Composer 2 (Airflow): pip install specific versions


I'm using DataprocCreateClusterOperator to create a Dataproc cluster in my DAG, with the code below (only the relevant part of the DAG is shown). I'm passing the script pip_install_versions.sh via init_actions_uris.

This script installs specific versions of the required packages using pip install <package>==<version>, as shown further below.


from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateClusterOperator,
)

path = "gs://pip-install/pip_install_versions.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-highmem-8",
    worker_machine_type="n1-highmem-8",
    num_workers=2,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[path],
    metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl numpy google-cloud-storage prophet kafka-python pyspark'},
).make()



with models.DAG(
        'vani-intf-test',
        schedule_interval='0 18 * * *',
        catchup=False,
        max_active_runs=1,
        default_args=default_dag_args
        ) as dag_pred:

    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        cluster_name=CLUSTER_NAME,
        region=REGION,
        cluster_config=CLUSTER_GENERATOR_CONFIG,
    )
 
pip_install_versions.sh
-----------------------

pip install google-cloud-storage==1.31.0
pip install pandas==1.4.2
pip install pyspark==3.1.3
pip install prophet==1.1.1
pip install numpy==1.21.5
pip install kafka-python==2.0.2
pip install pymongo==4.3.3
   
   

Is this the best way to install specific package versions in Composer 2? Can this also be achieved by passing a requirements.txt file (containing the packages and versions)?

I looked at the documentation and could not find any examples that use requirements.txt.

Another clarification: what does PIP_PACKAGES in metadata do?

metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl numpy google-cloud-storage prophet kafka-python pyspark'}

The difference between init_actions_uris=[path] and PIP_PACKAGES is not clear to me; I'd appreciate it if someone could clarify why we need both.

tia!


There are 2 answers

Karan Alang (Best Answer)

I was able to get this done by:

  1. identifying the dependencies of the packages (this can be done with either the pipdeptree or johnnydep utility),
  2. downloading the .whl files for the packages to be installed (using pip download),
  3. uploading a tar file containing the .whl files to the GCS bucket,
  4. on Dataproc cluster initialization, calling a script that downloads the tar file (containing the .whl files) and untars it, and then
  5. running pip install --no-index --no-deps for each folder within the main folder (see the sketch after this list).
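
A minimal sketch of this workflow, assuming a hypothetical requirements.txt with the pinned versions and a hypothetical archive name and bucket path (wheels.tar.gz under gs://pip-install/); the exact names are not from the answer above:

# On a build machine: resolve and download the pinned wheels, then upload the archive
pip download -r requirements.txt -d wheels/
tar -czf wheels.tar.gz wheels/
gsutil cp wheels.tar.gz gs://pip-install/wheels.tar.gz

# In the Dataproc initialization script: fetch the archive and install offline
gsutil cp gs://pip-install/wheels.tar.gz /tmp/wheels.tar.gz
tar -xzf /tmp/wheels.tar.gz -C /tmp
pip install --no-index --no-deps /tmp/wheels/*.whl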
Prajna Rai T

As per this document:

  1. It’s not possible to pass a requirements.txt file (containing the packages and versions) to the DataprocCreateClusterOperator in the DAG; packages can only be installed in the Composer environment itself, or on the cluster using the metadata parameter in ClusterGenerator.

  2. The init_actions_uris parameter takes a list of GCS URIs pointing to Dataproc initialization scripts.

  3. The PIP_PACKAGES metadata entry is how the package list is passed to the initialization action that installs packages when the Dataproc cluster is created from the Airflow DAG; by itself it does nothing unless an initialization script reads it (see the sketch below).
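
A sketch of this, assuming the Google-provided pip-install initialization action (gs://goog-dataproc-initialization-actions-<region>/python/pip-install.sh), which reads the PIP_PACKAGES metadata key and also accepts version pins; the project, region, and package list below are placeholders, not values from the question:

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

PROJECT_ID = "my-project"  # placeholder
REGION = "us-east1"        # placeholder; use your cluster's region

# The pip-install init action installs whatever is listed in the PIP_PACKAGES
# metadata value, so pinned versions can go directly into that string.
CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-highmem-8",
    worker_machine_type="n1-highmem-8",
    num_workers=2,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[
        f"gs://goog-dataproc-initialization-actions-{REGION}/python/pip-install.sh"
    ],
    metadata={"PIP_PACKAGES": "pandas==1.4.2 numpy==1.21.5 google-cloud-storage==1.31.0"},
).make()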