How to store ByteBufs inside of a Caffeine LoadingCache without race conditions?

93 views Asked by At

I have a LoadingCache that loads a Netty ByteBuf from some data. I need to be able to release this data using release() once the entry is evicted, however there is a race condition where the entry is evicted before I am able to retain() it and the ByteBuf returned is invalid.

Here is an example of what I am trying to do - the race occurs when the removal listener is invoked between the get and the retain.

LoadingCache<String, ByteBuf> cache = Caffeine.newBuilder()
    .maximumSize(128)
    .evictionListener(
        (RemovalListener<String, ByteBuf>) (string, buf, removalCause) -> {
            buf.release();
        }
    )
    .build(
        key -> {
            ByteBuf byteBuf = null;
            // TODO: Create the ByteBuf from a pool
            return byteBuf;
        }
    );

ByteBuf buffer = cache.get("hello").retain();

// If the entry is evicted between call to get and the retain then a race condition occurs
// That means the reference count drops to 0 before the retain is invoked

Is there any way to have Caffeine safely and atomically invoke retain before returning from the get?

1

There are 1 answers

2
Ben Manes On BEST ANSWER

You can use asMap().compute to perform a read/write operation on the entry.

Cache<String, ByteBuf> cache = Caffeine.newBuilder()
    .evictionListener((String string, ByteBuf buf, RemovalCause cause) -> buf.release())
    .maximumSize(128)
    .build();

ByteBuf buffer = cache.asMap().compute(key, (k, buf) -> {
  if (buf == null) {
    buf = // TODO: Create the ByteBuf from a pool
  }
  buf.retain();
  return buf;
});

You may also be interested in pinning where you mark the entry as not evictable by specifying that it consumes zero capacity, so it will be skipped over by a size eviction.

Cache<String, ByteBuf> cache = Caffeine.newBuilder()
    .weigher((String string, ByteBuf buf) -> (buf.refCnt() == 0) ? 1 : 0)
    .maximumWeight(128)
    .build();

public ByteBuf acquire(String key) {
  // above
}
public void release(String key) {
  cache.asMap().compute(key, (k, buf) -> {
    buf.release();
    return buf;
  });
}