jdbc postgres writes batch but not streaming in spark 3.2.4


For some reason the JDBC PostgreSQL sink works well for batch data, but it does not work for streaming data after upgrading to Spark 3.2.4 with Scala 2.12.15 and Hadoop 3.3.4.

Jar Files:

  • kafka-clients-3.3.2.jar
  • postgresql-42.5.0.jar
  • spark-sql-kafka-0-10_2.12-3.2.4.jar
  • spark-streaming-kafka-0-10-assembly_2.12-3.2.4.jar
  • spark-token-provider-kafka-0-10_2.12-3.2.4.jar
  • commons-pool2-2.11.1.jar

Everything used to work fine on Spark 3.1.3, but I need to upgrade Spark.

For batch data, the following code works as expected:

import os

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import types as T

# note: both packages go into a single setting, since a second
# .set("spark.jars.packages", ...) would overwrite the first
conf = SparkConf()\
    .setAppName('Testing jdbc postgresql')\
    .set("spark.streaming.stopGracefullyOnShutdown", "true")\
    .set("spark.jars.packages",
         "org.apache.spark:spark-avro_2.12:3.2.4,"
         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")\
    .set("spark.sql.shuffle.partitions", 2)

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()




data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",400000000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = T.StructType([
    T.StructField("firstname", T.StringType(), True),
    T.StructField("middlename", T.StringType(), True),
    T.StructField("lastname", T.StringType(), True),
    T.StructField("id", T.StringType(), True),
    T.StructField("gender", T.StringType(), True),
    T.StructField("salary", T.IntegerType(), True)
])

df = spark.createDataFrame(data=data2, schema=schema)

# df.write.jdbc(url=url, table="test", mode=mode, properties=properties)
df.write.format(os.environ['FORMAT'])\
    .option("url", os.environ['URL_DB'])\
    .option("dbtable", os.environ['DB_TABLE_1MIN_MAP'])\
    .option("user", os.environ['USER_DB'])\
    .option("password", os.environ['PASSWORD_DB'])\
    .option("driver", "org.postgresql.Driver")\
    .mode(os.environ['MODE_DB'])\
    .save()

with the following .env file:

FORMAT=jdbc
URL_DB=jdbc:postgresql://localhost:5432/testing
DB_TABLE_1MIN=c1_fraud_1min
DB_TABLE_5MIN=c1_fraud_5min
DB_TABLE_1MIN_MAP=c1_fraud_1min_map
USER_DB=postgres
PASSWORD_DB=1234
MODE_DB=append
CHECKPOINT_LOCATION_C1_1MIN=/user/ivan/chk-point-dir-1min/notify
CHECKPOINT_LOCATION_C1_5MIN=/user/ivan/chk-point-dir-5min/notify
CHECKPOINT_LOCATION_C1_1MIN_MAP=/user/ivan/chk-point-dir-1min-map/notify
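
For context, these values are read through os.environ in the code; a minimal sketch of loading the .env file (here with python-dotenv, purely for illustration, since the loading code is not shown above) looks like this:

# Hedged sketch: one way the .env values could reach os.environ
# (python-dotenv is only an illustration; the real loading code is not shown above).
import os
from dotenv import load_dotenv

load_dotenv()  # reads the .env file from the working directory into os.environ

print(os.environ['URL_DB'])         # jdbc:postgresql://localhost:5432/testing
print(os.environ['DB_TABLE_1MIN'])  # c1_fraud_1min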

However, for streaming data the following code does not work with format('jdbc'). Please take into account that it does work with format('console'), which makes this issue pretty weird, and that absolutely everything works fine on Spark 3.1.3:

import os

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import window
from pyspark.sql.avro.functions import from_avro

# note: as above, both packages go into a single setting so the second
# .set("spark.jars.packages", ...) does not overwrite the first
conf = SparkConf()\
        .setAppName('Testing Streaming testing 4')\
        .set("spark.streaming.stopGracefullyOnShutdown", "true")\
        .set("spark.jars.packages",
             "org.apache.spark:spark-avro_2.12:3.2.4,"
             "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4")\
        .set("spark.jars", "/home/ivan/spark/jars/postgresql-42.5.0.jar")\
        .set("spark.sql.shuffle.partitions", 2)

spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()

jsonFormatSchema = open("./schemas/JsonFormatSchema-v1.json","r").read()  
    
kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094") \
        .option("subscribe", "bank-creditcard-one-status") \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("substring(value, 6) as avro_value") \
        .select(from_avro("avro_value", jsonFormatSchema).alias("value")) 
        

kafka_df.printSchema()


flattened_df = kafka_df.select("value.*")\
    .withColumn("created_time", F.to_timestamp(F.col("created_time"), 
                                               "yyyy-MM-dd HH:mm:ss"))\
    .withColumn("no_approved_credit_c1_status", 
                F.when(F.col('approved_credit_c1_status')==0,1).otherwise(0))
    
flattened_df.printSchema()

    

window_agg_df_approved_c1_1_min = flattened_df \
    .withWatermark("created_time","5 minute") \
    .groupBy( 
     window(F.col("created_time"), "1 minute")) \
    .agg(F.sum("approved_credit_c1_status").alias("total_approved_c1"),
    F.sum("no_approved_credit_c1_status").alias("total_no_approved_c1"))
    
window_agg_df_approved_c1_5_min = flattened_df \
    .withWatermark("created_time","5 minute") \
    .groupBy(  # col("BrokerCode"),
     window(F.col("created_time"), "5 minute")) \
    .agg(F.sum("approved_credit_c1_status").alias("total_approved_c1"),
    F.sum("no_approved_credit_c1_status").alias("total_no_approved_c1"))
    
window_df_map_approved_c1_1_min = flattened_df \
    .withWatermark("created_time","5 minute") 
    
window_agg_df_approved_c1_1_min.printSchema()
window_agg_df_approved_c1_5_min.printSchema()


output_df_approved_c1_1_min\
    = window_agg_df_approved_c1_1_min.select("window.start", 
                                             "window.end", 
                                             "total_approved_c1",
                                             "total_no_approved_c1")
output_df_approved_c1_5_min\
    = window_agg_df_approved_c1_5_min.select("window.start", 
                                             "window.end", 
                                             "total_approved_c1",
                                             "total_no_approved_c1")




def foreach_batch_function_1min(df, epoch_id):
    df.write.format(os.environ['FORMAT'])\
        .option("url", os.environ['URL_DB'])\
        .option("dbtable", os.environ['DB_TABLE_1MIN'])\
        .option("user", os.environ['USER_DB'])\
        .option("password", os.environ['PASSWORD_DB'])\
        .option("driver", "org.postgresql.Driver")\
        .mode(os.environ['MODE_DB'])\
        .save()

output_df_approved_c1_1_min\
    .writeStream\
    .option("checkpointLocation", os.environ['CHECKPOINT_LOCATION_C1_1MIN'])\
    .foreachBatch(foreach_batch_function_1min).start()

spark.streams.awaitAnyTermination()

Please take into account that everything works fine if I use format('console') instead:

output_df_approved_c1_1_min.writeStream.format('console').outputMode("complete").start()
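
As an additional sanity check, a minimal foreachBatch sink that only prints each micro-batch (a sketch with an illustrative function name, not part of the code above) would separate a problem in foreachBatch itself from a problem in the JDBC write:

# Diagnostic sketch (illustrative only): print each micro-batch instead of
# writing it to Postgres, to separate foreachBatch issues from JDBC issues.
def debug_print_batch(df, epoch_id):
    print(f"epoch {epoch_id}: {df.count()} rows")
    df.show(truncate=False)

output_df_approved_c1_1_min \
    .writeStream \
    .foreachBatch(debug_print_batch) \
    .start()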

I have attached the logs, hoping they may help:

 711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] INFO SparkContext: Created broadcast 1 from start at NativeMethodAccessorImpl.java:0
    23/11/12 10:38:27.533 stream execution thread for [id = 6abd3186-6e71-43aa-bc72-711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] ERROR MicroBatchExecution: Query [id = 6abd3186-6e71-43aa-bc72-711b38a7471c, runId = d0ceb3b4-cb76-4db5-827b-bc6229c7aae3] terminated with error
    py4j.Py4JException: Error while sending a command.
        at py4j.CallbackClient.sendCommand(CallbackClient.java:397)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
        at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
        at com.sun.proxy.$Proxy28.call(Unknown Source)
        at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)
        at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
        at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
    Caused by: py4j.Py4JNetworkException: Error while sending a command: c
    p0
    call
    ro133
    L0
    e
    
        at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:253)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
        ... 30 more
    Caused by: java.net.SocketException: Connection reset
        at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
        at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
        at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
        at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
        at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
        at py4j.ClientServerConnection.readBlockingResponse(ClientServerConnection.java:313)
        at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:229)
        ... 31 more
    23/11/12 10:38:27.615 shutdown-hook-0 INFO SparkContext: Invoking stop() from shutdown hook
    23/11/12 10:38:27.620 shutdown-hook-0 INFO AbstractConnector: Stopped Spark@449eb268{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
    23/11/12 10:38:27.623 shutdown-hook-0 INFO SparkUI: Stopped Spark web UI at http://192.168.100.54:4040
    23/11/12 10:38:27.628 YARN application state monitor INFO YarnClientSchedulerBackend: Interrupting monitor thread
    23/11/12 10:38:27.641 shutdown-hook-0 INFO YarnClientSchedulerBackend: Shutting down all executors
    23/11/12 10:38:27.642 dispatcher-CoarseGrainedScheduler INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
    23/11/12 10:38:27.646 shutdown-hook-0 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
    23/11/12 10:38:27.657 dispatcher-event-loop-9 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    23/11/12 10:38:27.668 shutdown-hook-0 INFO MemoryStore: MemoryStore cleared
    23/11/12 10:38:27.669 shutdown-hook-0 INFO BlockManager: BlockManager stopped
    23/11/12 10:38:27.675 shutdown-hook-0 INFO BlockManagerMaster: BlockManagerMaster stopped
    23/11/12 10:38:27.677 dispatcher-event-loop-12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    23/11/12 10:38:27.690 shutdown-hook-0 INFO SparkContext: Successfully stopped SparkContext
    23/11/12 10:38:27.690 shutdown-hook-0 INFO ShutdownHookManager: Shutdown hook called
    23/11/12 10:38:27.691 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-bc5c26c6-ba5d-4d9e-aa91-bc20d542a437/pyspark-6518207c-6aac-4439-bd7c-476e7e7637cf
    23/11/12 10:38:27.693 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-bc5c26c6-ba5d-4d9e-aa91-bc20d542a437
    23/11/12 10:38:27.695 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /tmp/spark-4f37d206-2576-42ad-95a9-1ead167eac4e

I would really appreciate your help.
