Database Exception in Slick 3.0 while batch insert

While inserting thousands of records every five seconds through batch inserts in Slick 3, I get this error:

org.postgresql.util.PSQLException: FATAL: sorry, too many clients already

My data access layer looks like this:

val db: CustomPostgresDriver.backend.DatabaseDef =
  Database.forURL(url, user = user, password = password, driver = jdbcDriver)

override def insertBatch(rowList: List[T#TableElementType]): Future[Long] = {
  val res = db.run(insertBatchQuery(rowList))
    .map(_.head.toLong)
    .recover { case ex: Throwable => RelationalRepositoryUtility.handleBatchOperationErrors(ex) }
  // db.close()
  res
}

override def insertBatchQuery(rowList: List[T#TableElementType]): FixedSqlAction[Option[Int], NoStream, Write] = {
  query ++= rowList
}

Closing the connection in insertBatch has no effect; it still gives the same error.

I am calling the batch insert from my code like this:

val temp1 = list1.flatMap { li =>
  Future.sequence(li.map { trip =>
    val data = for {
      tripData <- TripDataRepository.insertQuery(trip.tripData)
      subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
    } yield ((tripData, subTripData))
    val res = db.run(data.transactionally)
    res
    // db.close()
  })
}

If I close the connection after my work here, as you can see in the commented-out code, I get this error:

java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@6c3ae2b6 rejected from java.util.concurrent.ThreadPoolExecutor@79d2d4eb[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]

After calling the method without Future.sequence, like this:

val temp1 = list.map { trip =>
  val data = for {
    tripData <- TripDataRepository.insertQuery(trip.tripData)
    subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
  } yield ((tripData, subTripData))
  val res = db.run(data.transactionally)
  res
}

I still got the "too many clients" error.

1 answer

jkinkead (best answer)

The root of this problem is that you are starting an unbounded number of Futures simultaneously, one per entry in list, each of which connects to the database. PostgreSQL rejects new connections once its connection limit is reached, which produces the "too many clients" error.

This can be solved by running your inserts serially, forcing each insert to depend on the previous one:

// Empty Future for the accumulated results. Replace Unit with the correct
// type - whatever each db.run call below yields.
val emptyFuture = Future.successful(Seq.empty[Unit])
// This will only insert one at a time. You could use list.grouped to batch the
// inserts if that was important.
val temp1 = list.foldLeft(emptyFuture) { (previousFuture, trip) =>
  previousFuture flatMap { previous =>
    // Inner code copied from your example.
    val data = for {
      tripData <- TripDataRepository.insertQuery(trip.tripData)
      subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
    } yield ((tripData, subTripData))
    // flatMap must return a Future: wait for this insert to finish, then
    // append its result to the results accumulated so far.
    db.run(data.transactionally).map(result => previous :+ result)
  }
}
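
If fully serial inserts turn out to be too slow, the same fold can be applied to chunks of the list, so that a bounded number of inserts run at once. The following is only a minimal sketch using plain scala.concurrent Futures: ChunkedInserts, runInChunks, chunkSize and the insert parameter are hypothetical names, and insert stands in for whatever wraps db.run(data.transactionally) in your code.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ChunkedInserts {
  // Applies `insert` to every item, `chunkSize` items at a time.
  // Items within a chunk run concurrently; the next chunk is started only
  // after every Future in the previous chunk has completed, so at most
  // `chunkSize` inserts are in flight at any moment.
  def runInChunks[A, B](items: List[A], chunkSize: Int)(insert: A => Future[B])(
      implicit ec: ExecutionContext): Future[Vector[B]] =
    items.grouped(chunkSize).foldLeft(Future.successful(Vector.empty[B])) {
      (previousFuture, chunk) =>
        previousFuture.flatMap { previous =>
          // `insert` is only called here, once the previous chunk is done.
          Future.traverse(chunk)(insert).map(results => previous ++ results)
        }
    }
}
```

With something like ChunkedInserts.runInChunks(list, 10)(trip => db.run(...)), the chunk size should stay below the number of connections the database is willing to accept. If any insert fails, the returned Future fails and later chunks are not started.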