Integration of Apache Airflow with Google Cloud Pub/Sub


I am quite new to Airflow and am trying to use the integration of Apache Airflow with Google Cloud Pub/Sub, which I believe was added under the "Airflow-300" JIRA issue. Please correct me if I am reading this incorrectly.

Also, could you please advise whether this has been released, or when it will be released? We are looking at adding notifications on Google Cloud Storage: whenever a file event occurs, we want to trigger a workflow in Airflow.

I can't find any documentation on how to use it.

Any advice would be highly appreciated.


mik-laj

The integration has already been added to Airflow; the Pub/Sub operators and sensor live in the contrib package.

Publish message

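from base64 import b64encode as b64e

from airflow.contrib.operators.pubsub_operator import PubSubPublishOperator

# 'data' must be a base64-encoded str, so encode bytes and decode the result
m1 = {'data': b64e(b'Hello, World!').decode(),
      'attributes': {'type': 'greeting'}
     }
m2 = {'data': b64e(b'Knock, knock').decode()}
m3 = {'attributes': {'foo': ''}}  # attributes only, no payload

# Assumes a DAG object named `dag` and that the topic already exists
# (it can be created beforehand with PubSubTopicCreateOperator)
t1 = PubSubPublishOperator(
    task_id='publish_messages',
    project='my_project',
    topic='my_topic',
    messages=[m1, m2, m3],
    dag=dag)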
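Because Pub/Sub requires the `data` field to be a base64-encoded string (not raw text or bytes), a small helper can build the message dicts from plain text. This is a minimal sketch; `make_message` is a hypothetical helper, not part of Airflow, and it only produces the dict shape used in the example above.

```python
import base64
from typing import Dict, Optional


def make_message(text: Optional[str] = None,
                 attributes: Optional[Dict[str, str]] = None) -> dict:
    """Build one Pub/Sub message dict (hypothetical helper, not an Airflow API).

    Pub/Sub requires 'data' to be a base64-encoded string, so the text is
    UTF-8 encoded, base64-encoded, and decoded back to str. A message must
    carry non-empty data, at least one attribute, or both.
    """
    if not text and not attributes:
        raise ValueError("a message needs non-empty data or at least one attribute")
    message = {}
    if text:
        message['data'] = base64.b64encode(text.encode('utf-8')).decode('ascii')
    if attributes:
        message['attributes'] = dict(attributes)
    return message


# The same three messages as in the example above
messages = [
    make_message('Hello, World!', {'type': 'greeting'}),
    make_message('Knock, knock'),
    make_message(attributes={'foo': ''}),
]
```

The resulting `messages` list can be passed as the `messages` argument of the operator.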

Receive message

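from airflow.contrib.sensors.pubsub_sensor import PubSubPullSensor

# Waits until messages are available on the subscription, then acknowledges them
t2 = PubSubPullSensor(
    task_id='pub_sub_wait',
    project='my_project',
    subscription='my-subscription',
    ack_messages=True,
    dag=dag)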
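For the Cloud Storage use case in the question, the bucket can be set up to publish notifications to a topic (for example with `gsutil notification create -t TOPIC -f json gs://BUCKET`), and a downstream task can then inspect the pulled messages. The sketch below assumes the messages follow the Pub/Sub REST `receivedMessages` shape (`ackId` plus a `message` with base64 `data` and `attributes`) and reads the `eventType`, `bucketId` and `objectId` attributes that Cloud Storage notifications carry. `GcsEvent`, `gcs_events` and `new_objects` are hypothetical names.

```python
import base64
import json
from typing import Iterable, List, NamedTuple


class GcsEvent(NamedTuple):
    event_type: str  # e.g. OBJECT_FINALIZE when an object is created or overwritten
    bucket: str
    name: str
    metadata: dict   # decoded JSON payload, or {} if the message has no data


def gcs_events(received: Iterable[dict]) -> List[GcsEvent]:
    """Turn pulled Pub/Sub messages from a GCS notification into events.

    Assumes each item looks like
    {'ackId': ..., 'message': {'data': <base64 str>, 'attributes': {...}}}.
    """
    events = []
    for item in received:
        msg = item.get('message', {})
        attrs = msg.get('attributes', {})
        raw = msg.get('data')
        # With the JSON payload format, data is the object's JSON metadata
        metadata = json.loads(base64.b64decode(raw)) if raw else {}
        events.append(GcsEvent(
            event_type=attrs.get('eventType', ''),
            bucket=attrs.get('bucketId', ''),
            name=attrs.get('objectId', ''),
            metadata=metadata,
        ))
    return events


def new_objects(received: Iterable[dict]) -> List[GcsEvent]:
    """Keep only events for newly written objects."""
    return [e for e in gcs_events(received) if e.event_type == 'OBJECT_FINALIZE']
```

A downstream task could fetch the sensor's result from XCom (assuming the sensor returns the pulled messages) and pass it to `new_objects` to decide which files to process.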

References:

https://github.com/apache/incubator-airflow/commit/d231dce37d753ed196a26d9b244ddf376385de38
https://github.com/apache/incubator-airflow/commit/6645218092096e4b10fc737a62bacc2670e1d6dc

medvedev1088

Adding to @user1849502's answer, you can also use PubSubHook:

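from airflow.contrib.hooks.gcp_pubsub_hook import PubSubHook

hook = PubSubHook()  # defaults to the google_cloud_default connection
hook.publish(project, topic, messages)  # messages: list of message dicts
hook.pull(project, subscription, max_messages, return_immediately)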
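When calling the hook directly instead of using the sensor, pulled messages are redelivered unless they are acknowledged. Below is a minimal sketch of the pull-then-acknowledge pattern. It assumes the contrib `PubSubHook` also offers `acknowledge(project, subscription, ack_ids)` and that `pull` returns REST-style items carrying an `ackId`. The hook is passed in so the logic can be tried with a stand-in; `pull_and_ack` and `FakeHook` are hypothetical names.

```python
from typing import List


def pull_and_ack(hook, project: str, subscription: str,
                 max_messages: int = 5) -> List[dict]:
    """Pull up to max_messages and acknowledge them so they are not redelivered.

    `hook` is anything with pull(...) and acknowledge(...) methods, such as
    PubSubHook (assumed signatures) or the FakeHook below.
    """
    # return_immediately=True: don't block the task if the subscription is empty
    received = hook.pull(project, subscription, max_messages,
                         return_immediately=True)
    ack_ids = [item['ackId'] for item in received if 'ackId' in item]
    if ack_ids:
        hook.acknowledge(project, subscription, ack_ids)
    return received


class FakeHook:
    """In-memory stand-in for PubSubHook, for trying the logic locally."""

    def __init__(self, pending):
        self.pending = list(pending)
        self.acked = []

    def pull(self, project, subscription, max_messages, return_immediately=False):
        # Unacknowledged messages stay pending and are returned again next time
        return self.pending[:max_messages]

    def acknowledge(self, project, subscription, ack_ids):
        self.acked.extend(ack_ids)
        self.pending = [m for m in self.pending if m['ackId'] not in ack_ids]
```

Acknowledging only after the messages have been handed back keeps the sketch simple; if processing can fail, acknowledging after processing succeeds is the safer ordering.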

Reference: https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/hooks/gcp_pubsub_hook.html