I spent all day trying to figure out how to solve this issue.
The goal is to insert several sequences of strings into a single column of a table.
I have a method like this:
case class Column(strings: Seq[String])

def insertColumns(columns: Seq[Column]) = for {
  _ <- Future.sequence(columns.map(col => insert(col)))
} yield ()

private def insert(column: Column) = {
  val rows = column.strings
  db.run(stringTable ++= rows) // Slick batch insert
}
This works up to a point. I tested it with a sequence of 2100 columns (100 strings in each) and it works fine. But as soon as I increase the number of columns to 3100+, I get this error:
Task slick.basic.BasicBackend$DatabaseDef$$anon$3@293ce053 rejected from slick.util.AsyncExecutor$$anon$1$$anon$2@3e423930[Running, pool size = 10, active threads = 10, queued tasks = 1000, completed tasks = 8160]
I have read in several places that doing something like this should help:
def insertColumns(columns: Seq[Column]) = {
  // start the futures before entering the for-comprehension
  val f = Future.sequence(columns.map(col => insert(col)))
  for {
    _ <- f
  } yield ()
} // insert as defined above

but it does not help.
I tried several combinations of changes inside insert:
// grouping the rows into batches of 500, one batched insert per group
Future.sequence(
  rows.grouped(500).toSeq.map(group => db.run(DBIO.seq(stringTable ++= group)))
)
// streaming the rows through an Alpakka Slick flow with a backpressured buffer
Source(rows).buffer(500, OverflowStrategy.backpressure)
  .via(
    Slick.flow(row => stringTable += row)
  )
  .log("nr-of-inserted-rows")
  .runWith(Sink.ignore)
// a Slick sink with parallelism 1
Source(rows)
  .runWith(Slick.sink(1, row => stringTable += row))
I also tried:
- not using reWriteBatchedInserts=true in my config
- the (dataColumnStringsTable ++= rows).transactionally option
- a dedicated single-threaded execution context, to try to execute the futures sequentially:
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
The only idea I have left is to rework my subscriber so that it blocks on incoming messages (the sequences of strings) and handles the backpressure on the message-queue side.
I am using Slick (with alpakka-slick) 3.3.3 / HikariCP 3.2.0 / Postgres 13.2.
My config is as follows:
slick {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    connectionPool = "HikariCP"
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      user = "postgres"
      password = "password"
      url = "jdbc:postgresql://"${slick.db.host}":5432/slick?reWriteBatchedInserts=true"
    }
    host = "localhost"
    numThreads = 10
    maxConnections = 100
    minConnections = 1
  }
}
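For what it's worth, the "queued tasks = 1000" in the error matches Slick's default AsyncExecutor queueSize of 1000, so the executor's work queue appears to be full when new inserts are rejected. I assume raising it would only postpone the problem rather than fix it, but for reference the setting would go next to numThreads:

queueSize = 5000 // default is 1000; a guess, not a verified fix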
Thank you for your help.
You shouldn't use Future.sequence with collections of more than a few elements. Every Future is a computation running in the background. So when you run this for a collection of, let's say, 3000 columns:

Future.sequence(columns.map(col => insert(col)))

you effectively spawn 3000 operations at once. As a result, the executor may start rejecting new tasks.
The solution is to process the input collection with Akka Streams. In your case, this means creating a Source from columns (not from rows). This will ensure that the executor is not overwhelmed with too many parallel operations. I haven't used alpakka-slick, but looking at the docs, the solution should look something like this:
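// a sketch only: assumes an implicit ActorSystem and SlickSession in scope,
// and that stringTable rows can be built directly from each Column's strings
Source(columns)
  .via(Slick.flow(column => stringTable ++= column.strings)) // one batch insert per Column
  .log("nr-of-inserted-columns")
  .runWith(Sink.ignore)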
What's more, if the columns are coming from a message queue, it's possible that you don't even need an intermediate Seq[Column]. You may simply need to define a Source of Column that reads from the queue, and process it with the same Slick flow.
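For illustration (hypothetical wiring, since I don't know your message-queue client), a queue-backed source processed by the same flow could look like:

// assumes an implicit ActorSystem and SlickSession in scope
val queue =
  Source
    .queue[Column](bufferSize = 1000, OverflowStrategy.backpressure)
    .via(Slick.flow(column => stringTable ++= column.strings))
    .to(Sink.ignore)
    .run()

// the subscriber then offers each incoming message to the stream:
// queue.offer(column)

With OverflowStrategy.backpressure, each offer completes only when the stream has capacity, so the inserts can never flood the Slick executor.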