Spark Structured Streaming with TriggerAvailableNow and Eventhubs

I have been trying to write events from a Delta table in Databricks to Event Hubs, and I have been having issues making it work with the trigger availableNow=True.

I am basically collecting events: the job first reads the required Delta table as a stream, adds a watermark, does some transformations, creates an event DataFrame with some added metadata columns, and converts it to a format that can be pushed to Event Hubs.

The thing is, it works when I set the processingTime trigger to 2 seconds, but it doesn't work with availableNow=True, and I am not able to understand why.

Can anyone provide a solution to make it work with availableNow, or explain the reasons why it doesn't?

Sample Code

import pyspark.sql.functions as f
from pyspark.sql import DataFrame
import base64

def _get_connection_string() -> str:  # pragma: no cover
    """Provides connection string from the secret."""
    connection_string_secret = "MetaEventhubConnectionStringDevelopment"
    secret_scope = "mr-wallet"
    encoded_secret = dbutils.secrets.get(scope=secret_scope, key=connection_string_secret)
    
    return base64.b64decode(encoded_secret).decode()

def _transform_events_information(df: DataFrame) -> DataFrame:
    select_cols = ["EventTime", "Source", "Error"]
    date_interval = "14 days"
    window = f.window(f.col("EventTime"), windowDuration=date_interval)
    df_transformed = (df.withColumnRenamed("SourceLocator", "Source")
                        .withColumnRenamed("_commit_timestamp", "EventTime")
                        .withColumn("Error", f.lit(None))
                        .groupBy("Source", "Error", window).agg(f.min("EventTime").alias("EventTime"))
                        .select(*select_cols)
                        )

    return df_transformed


def _add_watermark(df: DataFrame) -> DataFrame:
    watermark_interval = "1 second"
    df = df.withWatermark("_commit_timestamp", watermark_interval)

    return df

def _read_source_table():  # pragma: no cover
    source_table = 'eventlog.common_source'
    df = spark.readStream.format("delta") \
        .option("readChangeFeed", "true") \
        .table(source_table)

    return df

def collect() -> DataFrame:  # pragma: no cover
    """Prepares information on processed files."""
    df = _read_source_table()
    df = _add_watermark(df)
    df = _transform_events_information(df)

    return df

def _build_events(df: DataFrame, destination_table_path: str, data_product_name: str, workflow_name: str) -> DataFrame:
    output_columns = ["EventTime", "Application", "DataProduct", "Source", "Destination", "Error"]
    date_format = "yyyy-MM-dd HH:mm:ss.SSS"
    df = (df.withColumn("EventTime", f.date_format(f.col("EventTime"), date_format))
            .withColumn("Destination", f.lit(destination_table_path))
            .withColumn("DataProduct", f.lit(data_product_name))
            .withColumn("Application", f.lit(workflow_name))
            )
    return df.select(*output_columns)


def _push_events(df: DataFrame) -> None:  # pragma: no cover
    checkpoint_path = "/FileStore/asarkar/test_pushing_events/checkpoint"
    connection_string = _get_connection_string()
    eventhub_utils = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils
    write_options = {"eventhubs.connectionstring": eventhub_utils.encrypt(connection_string)}

    bootstrap_servers = "df-event-hub-weu-sprint.servicebus.windows.net:9093"
    eventhub_endpoint = _get_connection_string()


    # EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX\";"
    EH_SASL = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{eventhub_endpoint}\";"

    topic = "meta"

    # (df.writeStream
    #     .format("kafka")
    #     .option("topic", topic)
    #     .option("kafka.bootstrap.servers", bootstrap_servers)
    #     .option("kafka.sasl.mechanism", "PLAIN")
    #     .option("kafka.security.protocol", "SASL_SSL")
    #     .option("kafka.sasl.jaas.config", EH_SASL)
    #     .option("checkpointLocation", checkpoint_path)
    #     # .trigger(processingTime='2 seconds')
    #     .start())

    (df.writeStream
        .format("eventhubs")
        .options(**write_options)
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
        # .awaitTermination()
        )
 
def _prepare_events(df: DataFrame) -> DataFrame:
    output_df = (
        df.select(
            f.to_json(f.create_map(
                f.lit('EventTime'), f.col('EventTime'),
                f.lit('Application'), f.col('Application'),
                f.lit('DataProduct'), f.col('DataProduct'),
                f.lit('Source'), f.col('Source'),
                f.lit('Destination'), f.col('Destination'),
                f.lit('Error'), f.col('Error')
            )).alias('body')
        )
        # .select(f.to_json(f.struct("body")).alias("value"))
    )
    return output_df

Thanks

1 Answer

Answered by Neil Ramaswamy

The Available Now and Processing Time triggers have very different semantics, so it's hard to know what you mean by "doesn't work." To recap:

  • Available Now will process only the data that is available at the start of the Structured Streaming job. If no new data is available, the stream will exit completely.
  • The Processing Time trigger, with your parameter of 2 seconds, will start a batch at most once every 2 seconds. If there's no data available at second 0, it will check again (and execute a batch, if there is new data) at second 2.

So, if you start a query with Available Now and notice that no data arrives in Event Hubs, it is very likely because no new data was available when the query started. It likely works with the Processing Time (2 seconds) trigger because that query keeps polling for new data and eventually finds and processes it.
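
To make the difference concrete, here is a minimal sketch of the two configurations, reusing the df, write_options, and checkpoint_path names from your _push_events code (sink settings are placeholders for whatever you already have, not a prescribed setup):

# Processing Time: the query stays up and polls the source every 2 seconds.
(df.writeStream
    .format("eventhubs")
    .options(**write_options)
    .option("checkpointLocation", checkpoint_path)
    .trigger(processingTime="2 seconds")
    .start())

# Available Now: the query processes only what already exists beyond the
# checkpoint when it starts, then stops on its own. If nothing new has been
# committed to the source table since the last run, it exits without writing.
(df.writeStream
    .format("eventhubs")
    .options(**write_options)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start()
    .awaitTermination())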

You can verify what's actually being processed for a particular trigger by looking at the numInputRows metric in the streaming query progress, which is printed to the driver logs; you can find those in the Spark UI (note: this log does not appear in the output of a Databricks notebook cell).
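
You can also read the same progress information programmatically from the StreamingQuery handle. A sketch, again assuming the write options from your code:

query = (df.writeStream
    .format("eventhubs")
    .options(**write_options)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start())

query.awaitTermination()  # with Available Now, the query stops once it has caught up

# Each progress entry is one micro-batch; numInputRows == 0 means that batch
# read nothing new from the change feed.
for progress in query.recentProgress:
    print(progress["batchId"], progress["numInputRows"])

If numInputRows is 0 for every batch of the Available Now run, the query simply had nothing new to read, which matches the behavior described above.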