SQL Transaction using Apache Spark and Scala


Given a DataFrame of IPs with an updated status, I first need to delete those IPs from a SQL table, and then insert the DataFrame rows with the updated status using Spark SQL. The whole operation should roll back if either the deletion or the insertion fails. I have tried various approaches without success; any suggestion for an optimal way of doing this would be a great help.

import java.sql.DriverManager

val connection = DriverManager.getConnection(jdbcUrl, "user", "Password")
val statement = connection.createStatement()

try {
  connection.setAutoCommit(false)

  // Build the DELETE SQL statement
  val deleteStatement =
    s"""
       |DELETE FROM test_logs1
       |WHERE IPAddress IN (${ipList.map(ip => s"'$ip'").mkString(",")})
     """.stripMargin

  // Execute the DELETE statement
  statement.execute(deleteStatement)

  println("df_to_insert after deletion:")
  df_to_insert.show()
  df_to_insert.write
    .mode("append")
    .jdbc(jdbcUrl, tableName, connectionProperties)

  connection.commit()
} catch {
  case e: Exception =>
    // Roll back the transaction in case of an exception
    println("Error occurred: " + e.getMessage)
    connection.rollback()
} finally {
  // Close the statement and connection
  if (statement != null) statement.close()
  if (connection != null) connection.close()
}

Running this gives me a lock-timeout, retry-transaction error.


1 Answer

Answered by Gaël J:

Your DELETE statement and your INSERTs via df_to_insert.write are not using the same connection (and therefore not the same transaction).

This likely explains the error you get: the DELETEs are not committed yet when you try to insert.

df_to_insert.write opens its own connections and transactions, one per executor, so it cannot participate in the transaction you started on your JDBC connection.
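
One way around this is to run both the DELETE and the INSERTs through the same JDBC connection, so they share a single transaction. Here is a minimal sketch, assuming df_to_insert is small enough to collect to the driver and that the table has the two columns IPAddress and Status (the column names here are just placeholders for your schema):

import java.sql.DriverManager

// Collect the rows to the driver so we can insert them over the same
// connection we use for the DELETE. Only viable for small DataFrames.
val rows = df_to_insert.select("IPAddress", "Status").collect()

val connection = DriverManager.getConnection(jdbcUrl, "user", "Password")
try {
  connection.setAutoCommit(false)

  // Both statements are prepared on the same connection, so they belong
  // to the same transaction.
  val delete = connection.prepareStatement(
    "DELETE FROM test_logs1 WHERE IPAddress = ?")
  val insert = connection.prepareStatement(
    "INSERT INTO test_logs1 (IPAddress, Status) VALUES (?, ?)")

  // Delete the existing rows for these IPs.
  rows.foreach { row =>
    delete.setString(1, row.getString(0))
    delete.addBatch()
  }
  delete.executeBatch()

  // Insert the rows with the updated status.
  rows.foreach { row =>
    insert.setString(1, row.getString(0))
    insert.setString(2, row.getString(1))
    insert.addBatch()
  }
  insert.executeBatch()

  connection.commit() // both steps succeed, or neither is applied
} catch {
  case e: Exception =>
    connection.rollback() // undoes the DELETEs if the INSERT fails
    throw e
} finally {
  connection.close()
}

If the DataFrame is too large to collect, an alternative is to write it to a staging table with df_to_insert.write first, and then run the DELETE plus an INSERT ... SELECT from the staging table inside a single JDBC transaction, which keeps the atomicity on the database side.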