redis collection atomic lpop all

834 views Asked by At

redis list, a producer keeps lpush. In another thread, consumers periodically take all out from the list, and categorize elements. Because the producer keeps pushing, so the taking-all-out must be done atomically. So is there an effective way to do this? spring-data-redis could be used.

// producer
getOpsForList.push(k, v);

// consumer
alist = range(k,0,-1); // take all out
alist.parallelStream() // during which a producer thread could push but I hope it is "blocked".
delete(k);  // list is now empty and push from producer is unblocked.

multi and exec does not achieve my goal, because it actually submits lrange, lpush and delete just in one transaction. So far, the only way I could think of, is keeping lpop and add returned into alist until list is empty.

EDIT, this is what I think: when you want to be sure an operation is ran only once, using watch:

watch key
val = get key
val = val + 1
multi
set key val
exec

when you want to be not "interrupted" (not multithreading interrupt), and don't care how many times it runs, transaction (multi and exec) is enough.

multi
val = lrange key 0 -1
delete key
exec

val is still a list after it finishes, like what is said in official-doc

All the commands in a transaction are serialized and executed sequentially. It can never happen that a request issued by another client is served in the middle of the execution of a Redis transaction.

Beyond redis, I took the data operation list.stream.parallelism out, and the function now only focuses on the data getter, which is exactly like the last code paragraph. ;)

1

There are 1 answers

8
holi-java On BEST ANSWER

A good example to illustrate how WATCH can be used to create new atomic operations otherwise not supported by Redis is to implement ZPOP, that is a command that pops the element with the lower score from a sorted set in an atomic way.

There is a implementation for ZPOP in documentation as below:

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

What you need to do is repeat the operation above If EXEC fails (i.e. returns a Null reply). the producer operation lpush is atomic, so it needn't to use watch command. for example:

// consumer pesudo code
do {
  watch(k);
  transaction = multi();
  alist = transaction.range(k,0,-1); 
  transaction.delete(k);  
  status = get status of transaction.exec();
} while(status == null);

alist.parallelStream()