Some time ago we implemented a warehouse management app that keeps track of the quantity of each product we have in the store. We solved the problem of concurrent access to the data with database locks (select for update), but this approach led to poor performance when many clients tried to consume product quantities from the same store. Note that we manage only a small set of product types (fewer than 10), so the degree of concurrency on each one can be high (also, we don't care about stock refills). We thought of splitting each resource quantity into smaller "buckets", but this approach could lead to starvation for clients that try to consume a quantity bigger than any single bucket's capacity: we would have to manage bucket merging and so on... My question is: are there any broadly accepted solutions to this problem? I also looked for academic articles, but the topic seems too wide.
P.S. 1: our application runs in a clustered environment, so we cannot rely on application-level concurrency control. The question aims to find an algorithm that structures and manages the data differently from a single row, while keeping all the advantages that a db transaction (using locks or not) has.
P.S. 2: for your info, we manage a large number of similar warehouses; the example focuses on a single one, but we keep all the data in one db (prices are all the same, etc).
Edit: The setup below will still work on a cluster if you use a queueing program that can coordinate among multiple processes / servers, e.g. RabbitMQ.
You can also use a simpler queueing algorithm that only uses the database, with the downside that it requires polling (whereas a system like RabbitMQ allows threads to block until a message is available). Create a Requests table with a column for unique `requestId`s (e.g. a random UUID) that acts as the primary key, a `timestamp` column, a `resourceType` column, and an integer `requestedQuantity` column. You'll also need a Logs table with a unique `requestId` column that acts as the primary key, a `timestamp` column, a `resourceType` column, an integer `requestedQuantity` column, and a boolean/tinyint/whatever `success` column.
When a client requests a quantity of ResourceX it generates a random UUID and adds a row to the Requests table using the UUID as the `requestId`, and then polls the Logs table for the `requestId`. If the `success` column is true then the request succeeded, else it failed.
The server with the database assigns one thread or process to each resource, e.g. ProcessX is in charge of ResourceX. ProcessX retrieves all rows from the Requests table where `resourceType = ResourceX`, sorted by timestamp, and then deletes them from Requests; it then processes each request in order, decrementing an in-memory counter for each successful request, and at the end of processing the requests it updates the quantity of ResourceX on the Resources table. It then writes each request and its `success` status to the Logs table. It then retrieves all of the requests from Requests where `resourceType = ResourceX` again, etc.
It may be slightly more efficient to use an autoincrement integer as the Requests primary key, and to have ProcessX sort by primary key instead of by timestamp.
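The per-batch step that ProcessX performs can be sketched roughly as follows. This is only an illustration of the in-memory part, assuming the autoincrement-integer variant of the primary key; the `StockRequest`, `LogEntry` and `BatchProcessor` names are hypothetical, and reading from Requests, deleting the rows, and writing to Logs and Resources are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory image of one row of the Requests table.
final class StockRequest {
    final long requestId;          // assumed autoincrement primary key
    final String resourceType;
    final int requestedQuantity;

    StockRequest(long requestId, String resourceType, int requestedQuantity) {
        this.requestId = requestId;
        this.resourceType = resourceType;
        this.requestedQuantity = requestedQuantity;
    }
}

// Hypothetical in-memory image of one row to be written to the Logs table.
final class LogEntry {
    final long requestId;
    final String resourceType;
    final int requestedQuantity;
    final boolean success;

    LogEntry(StockRequest r, boolean success) {
        this.requestId = r.requestId;
        this.resourceType = r.resourceType;
        this.requestedQuantity = r.requestedQuantity;
        this.success = success;
    }
}

// One instance per resource, used only by the single process that owns that resource.
final class BatchProcessor {
    private int available; // in-memory counter for this resource

    BatchProcessor(int initialQuantity) {
        this.available = initialQuantity;
    }

    // Processes a batch in primary-key order and returns the rows destined for Logs.
    List<LogEntry> process(List<StockRequest> batch) {
        List<StockRequest> ordered = new ArrayList<>(batch);
        ordered.sort(Comparator.comparingLong((StockRequest r) -> r.requestId));
        List<LogEntry> logs = new ArrayList<>();
        for (StockRequest r : ordered) {
            // A request succeeds only if it is positive and fits in the remaining stock.
            boolean ok = r.requestedQuantity > 0 && r.requestedQuantity <= available;
            if (ok) {
                available -= r.requestedQuantity;
            }
            logs.add(new LogEntry(r, ok));
        }
        return logs;
    }

    // The value to write back to the Resources table after the batch.
    int available() {
        return available;
    }
}
```

After each batch, the caller would write `available()` to the Resources table and the returned entries to the Logs table, ideally in one transaction so that a crash cannot leave the two out of step.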
One option is to assign one `DAOThread` per resource - this thread is the only thing that accesses that resource's database table so that there's no locking at the database level. `Worker`s (e.g. web sessions) request resource quantities using a concurrent queue, for example a Java BlockingQueue, but most languages will have some sort of concurrent queue implementation you can use.
The Workers send resource requests to the appropriate DAOThread's queue; the DAOThread processes these requests in order, either updating the local resource quantity if the request's value doesn't exceed the quantity and returning a Success, else leaving the quantity unchanged and returning a Failure. The database is only updated after every MAX_CHANGES (e.g. ten) updates to reduce the amount of IO; the larger MAX_CHANGES is, the more complicated it will be to recover from system failure. You can also have a dedicated IOThread that does all of the database writes - this way you don't need to duplicate any logging or timing (e.g. there ought to be a Timer that flushes the current quantity to the database every few seconds).
The Worker uses a SynchronousQueue to wait for a response from the DAOThread (a SynchronousQueue is a BlockingQueue that can only hold one item); if the Worker is running in its own thread then you may want to replace this with a standard multi-item BlockingQueue so that the Worker can process the ReturnMessages in any order.
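A minimal sketch of this arrangement in Java might look like the following. It is an illustration under assumptions rather than a definitive implementation: the `QuantityStore` interface is a hypothetical stand-in for the real database write, the shapes of `Request`, `ReturnMessage` and `Worker.consume` are guesses at what such classes could contain, and the Timer-based flush and the separate IOThread are omitted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

enum ReturnMessage { SUCCESS, FAILURE }

// A request carries its own reply channel so the DAOThread can answer the right Worker.
final class Request {
    final int quantity;
    final BlockingQueue<ReturnMessage> reply = new SynchronousQueue<>();

    Request(int quantity) {
        this.quantity = quantity;
    }
}

// Hypothetical stand-in for the code that writes the quantity to the database.
interface QuantityStore {
    void save(int quantity);
}

// The only thread that touches one resource's quantity, so no locking is needed.
final class DAOThread extends Thread {
    static final int MAX_CHANGES = 10;

    private final BlockingQueue<Request> queue;
    private final QuantityStore store;
    private int quantity;
    private int unsavedChanges = 0;

    DAOThread(BlockingQueue<Request> queue, int initialQuantity, QuantityStore store) {
        this.queue = queue;
        this.quantity = initialQuantity;
        this.store = store;
        setDaemon(true); // assumption: the thread should not keep the JVM alive
    }

    @Override
    public void run() {
        try {
            while (true) {
                Request r = queue.take(); // blocks until a Worker sends a request
                boolean ok = r.quantity > 0 && r.quantity <= quantity;
                if (ok) {
                    quantity -= r.quantity;
                    if (++unsavedChanges >= MAX_CHANGES) {
                        store.save(quantity); // batched write to reduce IO
                        unsavedChanges = 0;
                    }
                }
                // Blocks until the Worker picks up the reply (SynchronousQueue hand-off).
                r.reply.put(ok ? ReturnMessage.SUCCESS : ReturnMessage.FAILURE);
            }
        } catch (InterruptedException e) {
            store.save(quantity); // flush pending changes on shutdown
            Thread.currentThread().interrupt();
        }
    }
}

final class Worker {
    private final BlockingQueue<Request> daoQueue;

    Worker(BlockingQueue<Request> daoQueue) {
        this.daoQueue = daoQueue;
    }

    // Sends a request and waits for the DAOThread's answer.
    ReturnMessage consume(int quantity) {
        Request r = new Request(quantity);
        try {
            daoQueue.put(r);
            return r.reply.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a reply", e);
        }
    }
}
```

Note that in this sketch a Worker that disappears before taking its reply would stall the DAOThread; replacing `put` with a timed `offer` is one way to guard against that.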
Some databases, e.g. Riak, have native support for counters; using one might improve your IO throughput and reduce or eliminate the need for MAX_CHANGES.
You can further increase throughput by introducing `BufferThread`s to buffer the requests to the `DAOThread`s. The Workers send their requests to the BufferThreads, which then wait until they've buffered `BUFFERSIZE` requests or have waited for 100ms for a request to come through the buffer (`Request request = queue.poll(100, TimeUnit.MILLISECONDS)`), at which point they forward the buffered requests to the `DAOThread`. You can have multiple buffers per `DAOThread` - rather than sending a `Map<String, BlockingQueue<Request>>` to the Workers you instead send a `Map<String, ArrayList<BlockingQueue<Request>>>`, one queue per `BufferThread`, with the Worker either using a counter or a random number generator to determine which `BufferThread` to send a request to. Note that if `BUFFERSIZE` is too large and/or if you have too many `BufferThread`s then Workers will suffer from long pause times as they wait for the buffer to fill up.
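The buffering step could be sketched along these lines. This is a hedged illustration: the generic `BufferThread<T>` shape, the choice to forward each batch as a `List<T>` on a second queue, and the `RoundRobin` helper (the "counter" option for choosing a buffer) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Collects items from `in` and forwards them to `out` in batches.
final class BufferThread<T> extends Thread {
    private final BlockingQueue<T> in;
    private final BlockingQueue<List<T>> out;
    private final int bufferSize;     // BUFFERSIZE in the text
    private final long maxWaitMillis; // 100 in the text

    BufferThread(BlockingQueue<T> in, BlockingQueue<List<T>> out,
                 int bufferSize, long maxWaitMillis) {
        this.in = in;
        this.out = out;
        this.bufferSize = bufferSize;
        this.maxWaitMillis = maxWaitMillis;
        setDaemon(true); // assumption: the thread should not keep the JVM alive
    }

    @Override
    public void run() {
        List<T> buffer = new ArrayList<>();
        try {
            while (true) {
                // Returns null if nothing arrives within the timeout.
                T item = in.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
                if (item != null) {
                    buffer.add(item);
                }
                boolean full = buffer.size() >= bufferSize;
                boolean timedOut = (item == null);
                // Forward when the buffer is full, or when the wait timed out
                // and there is at least one buffered item.
                if (!buffer.isEmpty() && (full || timedOut)) {
                    out.put(buffer);
                    buffer = new ArrayList<>();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Lets Workers spread requests over several buffer queues using a shared counter.
final class RoundRobin<T> {
    private final List<BlockingQueue<T>> queues;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobin(List<BlockingQueue<T>> queues) {
        this.queues = queues;
    }

    BlockingQueue<T> pick() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return queues.get(Math.floorMod(next.getAndIncrement(), queues.size()));
    }
}
```

In this sketch a partly filled buffer is forwarded after one quiet timeout period, so a request waits in the buffer for roughly `maxWaitMillis` beyond the arrival of the last request before it when traffic is sparse.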