For some reason the JDBC PostgreSQL sink works well for batch data, but not for streaming data after upgrading to Spark 3.2.4 (Scala 2.12.15, Hadoop 3.3.4).
Jar Files:
- kafka-clients-3.3.2.jar
- postgresql-42.5.0.jar
- spark-sql-kafka-0-10_2.12-3.2.4.jar
- spark-streaming-kafka-0-10-assembly_2.12-3.2.4.jar
- spark-token-provider-kafka-0-10_2.12-3.2.4.jar
- commons-pool2-2.11.1.jar
Everything worked fine with Spark 3.1.3, but I need to upgrade Spark.
For batch data, the following code works as expected:
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.types as T

conf = SparkConf()\
    .setAppName('Testing jdbc postgresql')\
    .set("spark.streaming.stopGracefullyOnShutdown", "true")\
    .set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")\
    .set("spark.sql.shuffle.partitions", 2)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
data2 = [("James","","Smith","36636","M",3000),
("Michael","Rose","","40288","M",4000),
("Robert","","Williams","42114","M",4000),
("Maria","Anne","Jones","39192","F",400000000),
("Jen","Mary","Brown","","F",-1)
]
schema = T.StructType([
    T.StructField("firstname", T.StringType(), True),
    T.StructField("middlename", T.StringType(), True),
    T.StructField("lastname", T.StringType(), True),
    T.StructField("id", T.StringType(), True),
    T.StructField("gender", T.StringType(), True),
    T.StructField("salary", T.IntegerType(), True)
])
df = spark.createDataFrame(data=data2,schema=schema)
# df.write.jdbc(url=url, table="test", mode=mode, properties=properties)
df.write.format(os.environ['FORMAT'])\
    .option("url", os.environ['URL_DB'])\
    .option("dbtable", os.environ['DB_TABLE_1MIN_MAP'])\
    .option("user", os.environ['USER_DB'])\
    .option("password", os.environ['PASSWORD_DB'])\
    .option("driver", "org.postgresql.Driver")\
    .mode(os.environ['MODE_DB'])\
    .save()
with the following .env file:
FORMAT=jdbc
URL_DB=jdbc:postgresql://localhost:5432/testing
DB_TABLE_1MIN=c1_fraud_1min
DB_TABLE_5MIN=c1_fraud_5min
DB_TABLE_1MIN_MAP=c1_fraud_1min_map
USER_DB=postgres
PASSWORD_DB=1234
MODE_DB=append
CHECKPOINT_LOCATION_C1_1MIN=/user/ivan/chk-point-dir-1min/notify
CHECKPOINT_LOCATION_C1_5MIN=/user/ivan/chk-point-dir-5min/notify
CHECKPOINT_LOCATION_C1_1MIN_MAP=/user/ivan/chk-point-dir-1min-map/notify
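(These values are assumed to be loaded into os.environ before the Spark session is created, for example with python-dotenv; if the variables are exported in the shell instead, this step isn't needed.)

# Assumption: python-dotenv populates os.environ from the .env file above.
from dotenv import load_dotenv
load_dotenv()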
However, for streaming data the following code does not work with format('jdbc').
(Please take into account that it does work with format('console'), which makes this
whole issue pretty weird; absolutely everything works fine on Spark 3.1.3.)
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import window
from pyspark.sql.avro.functions import from_avro

conf = SparkConf()\
    .setAppName('Testing Streaming testing 4')\
    .set("spark.streaming.stopGracefullyOnShutdown", "true")\
    .set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")\
    .set("spark.jars", "/home/ivan/spark/jars/postgresql-42.5.0.jar")\
    .set("spark.sql.shuffle.partitions", 2)
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
with open("./schemas/JsonFormatSchema-v1.json", "r") as f:
    jsonFormatSchema = f.read()
# substring(value, 6) drops the first 5 bytes (presumably the Confluent wire-format header) before Avro decoding
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094") \
    .option("subscribe", "bank-creditcard-one-status") \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("substring(value, 6) as avro_value") \
    .select(from_avro("avro_value", jsonFormatSchema).alias("value"))
kafka_df.printSchema()
flattened_df = kafka_df.select("value.*")\
.withColumn("created_time", F.to_timestamp(F.col("created_time"),
"yyyy-MM-dd HH:mm:ss"))\
.withColumn("no_approved_credit_c1_status",
F.when(F.col('approved_credit_c1_status')==0,1).otherwise(0))
flattened_df.printSchema()
window_agg_df_approved_c1_1_min = flattened_df \
.withWatermark("created_time","5 minute") \
.groupBy(
window(F.col("created_time"), "1 minute")) \
.agg(F.sum("approved_credit_c1_status").alias("total_approved_c1"),
F.sum("no_approved_credit_c1_status").alias("total_no_approved_c1"))
window_agg_df_approved_c1_5_min = flattened_df \
.withWatermark("created_time","5 minute") \
.groupBy( # col("BrokerCode"),
window(F.col("created_time"), "5 minute")) \
.agg(F.sum("approved_credit_c1_status").alias("total_approved_c1"),
F.sum("no_approved_credit_c1_status").alias("total_no_approved_c1"))
window_df_map_approved_c1_1_min = flattened_df \
.withWatermark("created_time","5 minute")
window_agg_df_approved_c1_1_min.printSchema()
window_agg_df_approved_c1_5_min.printSchema()
output_df_approved_c1_1_min = window_agg_df_approved_c1_1_min\
    .select("window.start", "window.end", "total_approved_c1", "total_no_approved_c1")
output_df_approved_c1_5_min = window_agg_df_approved_c1_5_min\
    .select("window.start", "window.end", "total_approved_c1", "total_no_approved_c1")
def foreach_batch_function_1min(df, epoch_id):
    df.write.format(os.environ['FORMAT']).option("url", os.environ['URL_DB'])\
        .option("dbtable", os.environ['DB_TABLE_1MIN']).option("user", os.environ['USER_DB'])\
        .option("password", os.environ['PASSWORD_DB']).option("driver", "org.postgresql.Driver")\
        .mode(os.environ['MODE_DB']).save()
output_df_approved_c1_1_min\
.writeStream\
.option("checkpointLocation", os.environ['CHECKPOINT_LOCATION_C1_1MIN'])\
.foreachBatch(foreach_batch_function_1min).start()
spark.streams.awaitAnyTermination()
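(For completeness: the 5-minute query isn't shown above; presumably it would be wired up the same way and started before spark.streams.awaitAnyTermination(). A rough sketch following the same pattern, using DB_TABLE_5MIN and CHECKPOINT_LOCATION_C1_5MIN from the .env file:)

# Sketch only: assumes the 5-minute sink mirrors the 1-minute one.
def foreach_batch_function_5min(df, epoch_id):
    df.write.format(os.environ['FORMAT']).option("url", os.environ['URL_DB'])\
        .option("dbtable", os.environ['DB_TABLE_5MIN']).option("user", os.environ['USER_DB'])\
        .option("password", os.environ['PASSWORD_DB']).option("driver", "org.postgresql.Driver")\
        .mode(os.environ['MODE_DB']).save()

output_df_approved_c1_5_min\
    .writeStream\
    .option("checkpointLocation", os.environ['CHECKPOINT_LOCATION_C1_5MIN'])\
    .foreachBatch(foreach_batch_function_5min).start()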
Please take into account that everything works fine if I use format('console')
instead:
output_df_approved_c1_1_min.writeStream.format('console').outputMode("complete").start()
I've attached the logs in case they help:
711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] INFO SparkContext: Created broadcast 1 from start at NativeMethodAccessorImpl.java:0
23/11/12 10:38:27.533 stream execution thread for [id = 6abd3186-6e71-43aa-bc72-711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] ERROR MicroBatchExecution: Query [id = 6abd3186-6e71-43aa-bc72-711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] terminated with error
py4j.Py4JException: Error while sending a command.
at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy28.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: py4j.Py4JNetworkException: Error while sending a command: c
p0
call
ro133
L0
e
at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:253)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
... 30 more
Caused by: java.net.SocketException: Connection reset
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at py4j.ClientServerConnection.readBlockingResponse(ClientServerConnection.java:313)
at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:229)
... 31 more
23/11/12 10:38:27.615 shutdown-hook-0 INFO SparkContext: Invoking stop() from shutdown hook
23/11/12 10:38:27.620 shutdown-hook-0 INFO AbstractConnector: Stopped Spark@449eb268{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
23/11/12 10:38:27.623 shutdown-hook-0 INFO SparkUI: Stopped Spark web UI at http://192.168.100.54:4040
23/11/12 10:38:27.628 YARN application state monitor INFO YarnClientSchedulerBackend: Interrupting monitor thread
23/11/12 10:38:27.641 shutdown-hook-0 INFO YarnClientSchedulerBackend: Shutting down all executors
23/11/12 10:38:27.642 dispatcher-CoarseGrainedScheduler INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
23/11/12 10:38:27.646 shutdown-hook-0 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
23/11/12 10:38:27.657 dispatcher-event-loop-9 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/12 10:38:27.668 shutdown-hook-0 INFO MemoryStore: MemoryStore cleared
23/11/12 10:38:27.669 shutdown-hook-0 INFO BlockManager: BlockManager stopped
23/11/12 10:38:27.675 shutdown-hook-0 INFO BlockManagerMaster: BlockManagerMaster stopped
23/11/12 10:38:27.677 dispatcher-event-loop-12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/12 10:38:27.690 shutdown-hook-0 INFO SparkContext: Successfully stopped SparkContext
23/11/12 10:38:27.690 shutdown-hook-0 INFO ShutdownHookManager: Shutdown hook called
23/11/12 10:38:27.691 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-bc5c26c6-ba5d-4d9e-aa91-bc20d542a437/pyspark-6518207c-6aac-4439-bd7c-476e7e7637cf
23/11/12 10:38:27.693 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-bc5c26c6-ba5d-4d9e-aa91-bc20d542a437
23/11/12 10:38:27.695 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-4f37d206-2576-42ad-95a9-1ead167eac4e
I would really appreciate your help.