how to use concurrentskiplistmap correctly?

2.2k views Asked by At

trying to use a concurrent skip list map. i had problems with how to use a synchronized linked hash map correctly, so i decided to give concurrent skip list map a try.

i have the same sort of problem. the unit test below fails because when i get the entry set, it has null values when size() indicates that the map is not empty. naict, i have all access to the map synchronized.

i would think that one would not need to do this (synchronized), since this a concurrent map.

the server just puts the numbers 0,1,2,3, ... into the map, keeping it's size below a threshold. it tries to put one number in for each millisecond that has passed since the server was started.

any pointers will be appreciated.

thanks

import static org.junit.Assert.*;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import org.junit.*;
class DummyServer implements Runnable {
    DummyServer(int pieces) {
        t0=System.currentTimeMillis();
        this.pieces=pieces;
        max=pieces;
        lruMap=new ConcurrentSkipListMap<Long,Long>();
    }
    Set<Map.Entry<Long,Long>> entrySet() {
        Set<Entry<Long,Long>> entries=null;
        synchronized(lruMap) {
            entries=Collections.unmodifiableSet(lruMap.entrySet());
        }
        return entries;
    }
    Set<Long> keySet() {
        Set<Long> entries=null;
        synchronized(lruMap) {
            entries=Collections.unmodifiableSet(lruMap.keySet());
        }
        return entries;
    }
    @Override public void run() {
        int n=0;
        while(piece<stopAtPiece) {
            long target=piece(System.currentTimeMillis()-t0);
            long n0=piece;
            for(;piece<target;piece++,n++)
                put(piece);
            if(n>max+max/10) {
                Long[] keys=keySet().toArray(new Long[0]);
                synchronized(lruMap) {
                    for(int i=0;n>max;i++,n--)
                        lruMap.remove(keys[i]);
                }
            }
            try {
                Thread.sleep(10);
            } catch(InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }
    private void put(long piece) {
        synchronized(lruMap) {
            lruMap.put(piece,piece);
        }
    }
    public long piece() {
        return piece;
    }
    public Long get(long piece) {
        synchronized(lruMap) {
            return lruMap.get(piece);
        }
    }
    public int size() {
        synchronized(lruMap) {
            return lruMap.size();
        }
    }
    public long piece(long dt) {
        return dt/period*pieces+dt%period*pieces/period;
    }
    private long piece;
    int period=2000;
    private volatile Map<Long,Long> lruMap;
    public final long t0;
    protected final int pieces;
    public final int max;
    public long stopAtPiece=Long.MAX_VALUE;
}
public class DummyServerTestCase {
    void checkMap(Long n) {
        if(server.size()>0) {
            final Set<Map.Entry<Long,Long>> mapValues=server.entrySet();
            @SuppressWarnings("unchecked") final Map.Entry<Long,Long>[] entries=new Map.Entry[mapValues.size()];
            mapValues.toArray(entries);
            try {
                if(entries[0]==null)
                    System.out.println(server.piece());
                assertNotNull(entries[0]);
            } catch(Exception e) {
                fail(e.toString());
            }
        }
    }
    @Test public void testRunForFirstIsNotZero() {
        server.stopAtPiece=1*server.pieces;
        Thread thread=new Thread(server);
        thread.start();
        while(thread.isAlive()) {
            for(long i=0;i<server.piece();i++) {
                server.get(i);
                Thread.yield();
                checkMap(server.piece());
                Thread.yield();
            }
        }
    }
    DummyServer server=new DummyServer(1000);
}
2

There are 2 answers

0
Peter Lawrey On BEST ANSWER

The problem is that you are performing

final Map.Entry<Long,Long>[] entries=new Map.Entry[mapValues.size()]; // size>0
mapValues.toArray(entries); // size is 0.

Between creating the array and calling toArray you are clearing the map.

If you take a copy using the Iterator you will not get this race condition.

void checkMap(Long n) {
    final Set<Map.Entry<Long, Long>> mapValues = server.entrySet();
    Set<Map.Entry<Long, Long>> entries = new LinkedHashSet<>(mapValues);

    for (Entry<Long, Long> entry : entries) {
        assertNotNull(entry);
    }
}

or

void checkMap(Long n) {
    for (Entry<Long, Long> entry : server.entrySet())
        assertNotNull(entry);
}
2
John Vint On

First you shouldn't ever have to synchronize a thread-safe collection implementation unless you have to do some compound operation. The ConcurrentMap offers good atomic compound functions for you so even then you shouldnt have to.

Second. You should never rely on the size method to be correct while doing concurrent operations. The javadoc notes:

Beware that, unlike in most collections, the size method is not a constant-time operation. Because of the asynchronous nature of these maps, determining the current number of elements requires a traversal of the elements.

The size can be different from when you start the invocation to when you get a return.

In short your test isn't a valid concurrent test. Can you elaborate more on what you're trying to achieve?