I'm using DataprocCreateClusterOperator to create a Dataproc cluster in my DAG, with the code below (only the relevant part of the DAG is shown). Via init_actions_uris I pass a script, pip_install_versions.sh, which installs specific versions of the required packages using pip install <package>==<version>, as shown below:
path = "gs://pip-install/pip_install_versions.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-highmem-8",
    worker_machine_type="n1-highmem-8",
    num_workers=2,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[path],
    metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl numpy google-cloud-storage prophet kafka-python pyspark'},
).make()
with models.DAG(
    'vani-intf-test',
    schedule_interval='0 18 * * *',
    catchup=False,
    max_active_runs=1,
    default_args=default_dag_args
) as dag_pred:

    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        cluster_name=CLUSTER_NAME,
        region=REGION,
        cluster_config=CLUSTER_GENERATOR_CONFIG
    )
pip_install_versions.sh
-----------------------
#!/bin/bash

pip install google-cloud-storage==1.31.0
pip install pandas==1.4.2
pip install pyspark==3.1.3
pip install prophet==1.1.1
pip install numpy==1.21.5
pip install kafka-python==2.0.2
pip install pymongo==4.3.3
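For context, the script has to exist at the GCS path referenced above before the DAG runs; I upload it with a one-off copy, something like this (assuming the script sits in my working directory):

# hypothetical one-off upload of the init action script to GCS
gsutil cp pip_install_versions.sh gs://pip-install/pip_install_versions.sh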
Is this the best way to install specific package versions on a Dataproc cluster created from Composer 2? Can the same thing be achieved by passing a requirements.txt file (containing the packages and versions) instead?
I looked at the documentation and could not find any examples that use requirements.txt; the sketch below shows what I have in mind.
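Something along these lines, a minimal sketch of an init action that installs from a pinned requirements file kept in GCS (the requirements.txt path is hypothetical, and I don't know whether this is the recommended pattern, hence the question):

#!/bin/bash
# hypothetical alternative init action: pull a pinned requirements file
# from GCS and install everything in a single pip call
gsutil cp gs://pip-install/requirements.txt /tmp/requirements.txt
pip install -r /tmp/requirements.txt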
Another clarification: what does the PIP_PACKAGES entry in metadata actually do?

metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl numpy google-cloud-storage prophet kafka-python pyspark'}

The difference between init_actions_uris=[path] and the PIP_PACKAGES metadata is not clear to me; I'd appreciate it if someone could clarify why we need both. My current (possibly wrong) understanding is sketched below.
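From what I could piece together, PIP_PACKAGES looks like a metadata key that is read by Google's published pip-install init action (not by my custom script), so on its own my metadata line may be doing nothing. If that's right, the metadata-only route would look roughly like this, a sketch assuming the regional goog-dataproc-initialization-actions bucket (please correct me if I've misread this):

# sketch: using Google's published pip-install init action, which (as I
# understand it) reads the PIP_PACKAGES metadata key; packages are
# space-separated and version pins with == appear to be allowed
pip_init = f"gs://goog-dataproc-initialization-actions-{REGION}/python/pip-install.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-highmem-8",
    worker_machine_type="n1-highmem-8",
    num_workers=2,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[pip_init],
    metadata={'PIP_PACKAGES': 'pandas==1.4.2 numpy==1.21.5 kafka-python==2.0.2'},
).make()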
Thanks in advance!
I was able to get this done by