I have been trying to write events from a Delta table in Databricks to Event Hubs, and I cannot get it to work with trigger(availableNow=True).
The job collects events by first reading the required Delta table as a stream, adding a watermark, applying some transformations, building an event DataFrame with a few extra metadata columns, and converting it to the format Event Hubs expects.
It works when I set a processingTime trigger of 2 seconds, but it does not work with availableNow=True, and I cannot understand why.
Can anyone suggest how to make this work with availableNow, or explain why it doesn't?
Sample Code
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
import base64
def _get_connection_string() -> str:  # pragma: no cover
    """Provides connection string from the secret."""
    connection_string_secret = "MetaEventhubConnectionStringDevelopment"
    secret_scope = "mr-wallet"
    encoded_secret = dbutils.secrets.get(scope=secret_scope, key=connection_string_secret)
    return base64.b64decode(encoded_secret).decode()
def _transform_events_information(df: DataFrame) -> DataFrame:
    select_cols = ["EventTime", "Source", "Error"]
    date_interval = "14 days"
    window = f.window(f.col("EventTime"), windowDuration=date_interval)
    df_transformed = (
        df.withColumnRenamed("SourceLocator", "Source")
          .withColumnRenamed("_commit_timestamp", "EventTime")
          .withColumn("Error", f.lit(None))
          .groupBy("Source", "Error", window)
          .agg(f.min("EventTime").alias("EventTime"))
          .select(*select_cols)
    )
    return df_transformed
def _add_watermark(df: DataFrame) -> DataFrame:
    watermark_interval = "1 second"
    df = df.withWatermark("_commit_timestamp", watermark_interval)
    return df
def _read_source_table():  # pragma: no cover
    source_table = 'eventlog.common_source'
    df = (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .table(source_table)
    )
    return df
def collect() -> DataFrame:  # pragma: no cover
    """Prepares information on processed files."""
    df = _read_source_table()
    df = _add_watermark(df)
    df = _transform_events_information(df)
    return df
def _build_events(df: DataFrame, destination_table_path: str, data_product_name: str, workflow_name: str) -> DataFrame:
    output_columns = ["EventTime", "Application", "DataProduct", "Source", "Destination", "Error"]
    date_format = "yyyy-MM-dd HH:mm:ss.SSS"
    df = (
        df.withColumn("EventTime", f.date_format(f.col("EventTime"), date_format))
          .withColumn("Destination", f.lit(destination_table_path))
          .withColumn("DataProduct", f.lit(data_product_name))
          .withColumn("Application", f.lit(workflow_name))
    )
    return df.select(*output_columns)
def _push_events(df: DataFrame) -> None:  # pragma: no cover
    checkpoint_path = "/FileStore/asarkar/test_pushing_events/checkpoint"
    connection_string = _get_connection_string()
    eventhub_utils = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils
    write_options = {"eventhubs.connectionstring": eventhub_utils.encrypt(connection_string)}

    # Alternative path I also tried: writing through the Kafka endpoint of Event Hubs (kept for reference).
    bootstrap_servers = "df-event-hub-weu-sprint.servicebus.windows.net:9093"
    eventhub_endpoint = _get_connection_string()
    # EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=XXXXXX;SharedAccessKey=XXXXXX\";"
    EH_SASL = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{eventhub_endpoint}\";"
    topic = "meta"
    # (df.writeStream
    #     .format("kafka")
    #     .option("topic", topic)
    #     .option("kafka.bootstrap.servers", bootstrap_servers)
    #     .option("kafka.sasl.mechanism", "PLAIN")
    #     .option("kafka.security.protocol", "SASL_SSL")
    #     .option("kafka.sasl.jaas.config", EH_SASL)
    #     .option("checkpointLocation", checkpoint_path)
    #     # .trigger(processingTime='2 seconds')
    #     .start())

    (df.writeStream
        .format("eventhubs")
        .options(**write_options)
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start()
        # .awaitTermination()
    )
def _prepare_events(df: DataFrame) -> DataFrame:
    output_df = df.select(
        f.to_json(f.create_map(
            f.lit('EventTime'), f.col('EventTime'),
            f.lit('Application'), f.col('Application'),
            f.lit('DataProduct'), f.col('DataProduct'),
            f.lit('Source'), f.col('Source'),
            f.lit('Destination'), f.col('Destination'),
            f.lit('Error'), f.col('Error')
        )).alias('body')
    )
    # .select(f.to_json(f.struct("body")).alias("value"))
    return output_df
Thanks
The Available Now and Processing Time triggers have very different semantics, so it's hard to know what you mean by "doesn't work." To recap: availableNow=True processes all the data that is available at the time the query starts (possibly split across several micro-batches) and then stops the query on its own, while processingTime keeps the query running indefinitely and triggers a new micro-batch at the given interval.
So, if you start a query with Available Now and notice that no data arrives in Event Hubs, it is very likely because no new data was available when the query started. It likely works with the Processing Time trigger (of 2 seconds) because that query keeps polling for new data and eventually finds and processes it.
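To make the difference concrete, here is a minimal sketch that reuses the names from your _push_events function (df, write_options, checkpoint_path are assumed to be the ones you already build); with Available Now the query terminates by itself once the backlog is drained, whereas with Processing Time it never does:

    # Sketch, assuming `df`, `write_options` and `checkpoint_path` from the question.
    query = (df.writeStream
        .format("eventhubs")
        .options(**write_options)
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .start())

    # With availableNow=True the query drains whatever was available at start and then
    # stops, so awaitTermination() returns once that backlog (if any) has been written.
    query.awaitTermination()

    # With .trigger(processingTime="2 seconds") the query never stops by itself; it keeps
    # polling the change feed every 2 seconds, which is why late-arriving rows show up there.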
You can verify what is actually being processed for a particular trigger by looking at the numInputRows metric of the streaming query progress, which gets printed to the driver logs; you can find those in the Spark UI (note: this log is not part of the output of a Databricks notebook cell).
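If you keep a handle to the StreamingQuery returned by start(), you can also read that metric directly from the notebook instead of digging through the driver logs; a small sketch (the `query` variable is the handle you would get back from your writeStream call):

    # `query` is the StreamingQuery returned by writeStream...start() (illustrative name).
    # recentProgress / lastProgress are standard StreamingQuery attributes in PySpark.
    for progress in query.recentProgress:
        print(progress["batchId"], progress["numInputRows"])

    # Or just the most recent micro-batch, if one has completed:
    last = query.lastProgress
    if last is not None:
        print(last["numInputRows"])

If numInputRows is 0 for the Available Now run, the query simply found nothing new to process relative to its checkpoint.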