PLinq and Object Pooling using ConcurrentCollections

151 views Asked by At

I have a method that has to iterate over a large set of data and returned the processed results to a consumer thread for serialization. Streaming PLinq fits best performance-wise.

Because these operations are frequent, i am using an objectpool to cache the containers for my processing, to minimize the object creation. I tried implementing the objectpool using a concurrentstack (concurrentbag and concurrentqueue exhibit the same problem). In same rare cases, the same item (looking at the hashcode) is acquired from the pool by the same thread, although it was not released by the consumer thread. I added tracing in the acquire and release methods of the pool, and this is the output:

5:11:32.250 PM Get item 16071020 for Thread 31
5:11:32.254 PM Get item 16071020 for Thread 31
5:11:32.260 PM Put item 16071020 for Thread 27
5:11:32.286 PM Put item 16071020 for Thread 27

Here is the code i am using:

var itemsToProcess = data.AsParallel()
                         .Where(x => Filter(x))
                         .Select(row => Process(row));

In the Process method, i will get the object from the pool:

result = ObjectPool.Instance.GetObject();

The Pool class implementation:

public class ObjectPool
{
    private ConcurrentStack<object[]> _objects;
    private int size;
    private const int maxSize = 20000;

    private static ObjectPool instance = new ObjectPool(500);
    public static ObjectPool Instance
    {
        get { return instance; }
    }

    private ObjectPool(int size)
    {
        this.size = size;
        _objects = new ConcurrentStack<object[]>();
    }

    public object[] GetObject()
    {
        object[] item;
        if (_objects.TryPop(out item))
        {
            Trace.WriteLine(string.Format("Get item {0} for Thread {1}", item.GetHashCode(), Thread.CurrentThread.ManagedThreadId));
            return item;
        }
        return new object[size];
    }

    public void Clear()
    {
        _objects.Clear();
    }

    public void PutObject(object[] item)
    {
        Trace.WriteLine(string.Format("Put item {0} for Thread {1}", item.GetHashCode(), Thread.CurrentThread.ManagedThreadId));

        if (_objects.Count < maxSize)
        {
            _objects.Push(item);
        }
    }
}

I am at a loss on how to prevent this kind of situation to occur. Any ideas on why this can happen and how to prevent it?

1

There are 1 answers

0
svick On BEST ANSWER

I can't see anything wrong with the code you posted.

To me, the most likely case seems to be that you call PutObject() twice on the same array. But without seeing more of your code, it's impossible to tell.