Friday, February 11, 2011

Generic Producer Consumer Class in C#

Over the years, I've implemented the Producer/Consumer pattern numerous times. Today, I had a need to implement it. Again. In the spirit of DRY, I decided to write it once and for all.

I began thinking about the requirements of a Producer/Consumer object. It had to be generic, so that it would be type safe. Users of the object needed to be able to specify the action that would be executed upon consumption. And I wanted an Enqueue method, which would be the entry point for producers. Here's my starting point:

public class ProducerConsumer<T>
{
    public ProducerConsumer(Action<T> consumerAction) { /* snip */ }

    public void Enqueue(T item) { /* snip */ }
}


Producers will use the Enqueue method. Queue<T> is not safe for concurrent writes, so I only want to allow one producer to enqueue at a time. That means introducing an object to lock on. I'm also going to need a Queue to hold items.

public class ProducerConsumer<T>
{
    private readonly Queue<T> queue = new Queue<T>();

    private readonly object queueLocker = new object();

    /* snip */

    public void Enqueue(T item)
    {
        lock (this.queueLocker)
        {
            this.queue.Enqueue(item);
        }
    }
}


I'm going to have to think about how to consume the items. I'll want to do this on a separate thread, so I'll set this up in the constructor. The thread's method should be an infinite loop - it will dequeue items and hand them to the consumer action. I'll also need to ensure that only one thread is adding to or removing from the queue at a time - I'll do this by locking on queueLocker when I dequeue. The consumer action itself runs outside the lock, so a slow consumer doesn't block producers.

public class ProducerConsumer<T>
{
    /* snip */

    private readonly Action<T> consumerAction;

    public ProducerConsumer(Action<T> consumerAction)
    {
        this.consumerAction = consumerAction;
        new Thread(this.ConsumeItems) { IsBackground = true }.Start();
    }

    /* snip */

    private void ConsumeItems()
    {
        while (true)
        {
            T nextItem;

            lock (this.queueLocker)
            {
                nextItem = this.queue.Dequeue();
            }

            this.consumerAction(nextItem);
        }
    }
}


We have a problem here. What if there are no items in the queue? Calling Dequeue on an empty Queue<T> throws an InvalidOperationException, so I'll need to check the count before dequeuing. And if there are no items in the queue, I'll need to block until there are some. So the Enqueue method will need to signal when an item is enqueued. I'll use an AutoResetEvent for this.

public class ProducerConsumer<T>
{
    private readonly AutoResetEvent queueWaitHandle = new AutoResetEvent(false);

    /* snip */

    public void Enqueue(T item)
    {
        lock (this.queueLocker)
        {
            this.queue.Enqueue(item);

            // After enqueuing the item, signal the consumer thread.
            this.queueWaitHandle.Set();
        }
    }

    private void ConsumeItems()
    {
        while (true)
        {
            T nextItem = default(T);

            // Later on, we'll need to know whether there was an item in the queue.
            bool doesItemExist;

            lock (this.queueLocker)
            {
                doesItemExist = this.queue.Count > 0;
                if (doesItemExist)
                {
                    nextItem = this.queue.Dequeue();
                }
            }

            if (doesItemExist)
            {
                // If there was an item in the queue, process it...
                this.consumerAction(nextItem);
            }
            else
            {
                // ...otherwise, wait for an item to be queued up.
                this.queueWaitHandle.WaitOne();
            }
        }
    }
}
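
One detail worth checking is what happens when the producer calls Set before the consumer reaches WaitOne, or calls Set several times in a row. The sketch below (the class and method names are made up for illustration) suggests why the loop above re-checks the queue count instead of assuming one signal per item: an AutoResetEvent remembers that it was signalled, but it does not count signals.

```csharp
using System;
using System.Threading;

// Hypothetical demo class; not part of ProducerConsumer<T>.
public static class AutoResetEventDemo
{
    // Signals twice with no waiter present, then performs two non-blocking waits.
    public static bool[] SignalTwiceThenWaitTwice()
    {
        using (var handle = new AutoResetEvent(false))
        {
            handle.Set();
            handle.Set(); // already signalled, so this has no further effect

            bool first = handle.WaitOne(0);  // consumes the remembered signal
            bool second = handle.WaitOne(0); // nothing left; returns immediately
            return new[] { first, second };
        }
    }

    public static void Main()
    {
        bool[] results = SignalTwiceThenWaitTwice();
        Console.WriteLine("{0}, {1}", results[0], results[1]); // prints "True, False"
    }
}
```

So a Set that arrives early is not lost, and a burst of Sets wakes the consumer once. Because the consumer only waits after finding the queue empty, it keeps draining until nothing is left before it blocks again.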


That's everything! Now, to put it together:

using System;
using System.Collections.Generic;
using System.Threading;

public class ProducerConsumer<T>
{
    private readonly Queue<T> queue = new Queue<T>();

    private readonly object queueLocker = new object();

    private readonly AutoResetEvent queueWaitHandle = new AutoResetEvent(false);

    private readonly Action<T> consumerAction;

    public ProducerConsumer(Action<T> consumerAction)
    {
        if (consumerAction == null)
        {
            throw new ArgumentNullException("consumerAction");
        }

        this.consumerAction = consumerAction;
        new Thread(this.ConsumeItems) { IsBackground = true }.Start();
    }

    public void Enqueue(T item)
    {
        lock (this.queueLocker)
        {
            this.queue.Enqueue(item);

            // After enqueuing the item, signal the consumer thread.
            this.queueWaitHandle.Set();
        }
    }

    private void ConsumeItems()
    {
        while (true)
        {
            T nextItem = default(T);

            // Later on, we'll need to know whether there was an item in the queue.
            bool doesItemExist;

            lock (this.queueLocker)
            {
                doesItemExist = this.queue.Count > 0;
                if (doesItemExist)
                {
                    nextItem = this.queue.Dequeue();
                }
            }

            if (doesItemExist)
            {
                // If there was an item in the queue, process it...
                this.consumerAction(nextItem);
            }
            else
            {
                // ...otherwise, wait for an item to be queued up.
                this.queueWaitHandle.WaitOne();
            }
        }
    }
}


To use it, instantiate a ProducerConsumer, passing in the action you want to be performed when an item is consumed. Then, just start enqueuing items.

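void Main()
{
    var producerConsumer = new ProducerConsumer<int>(i => Console.WriteLine(i));

    // Random is not thread-safe, so the two producer threads share it under a lock.
    var random = new Random();
    var randomLocker = new object();
    Func<int> nextDelay = () =>
    {
        lock (randomLocker)
        {
            return random.Next(0, 5);
        }
    };

    var t1 = new Thread(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            producerConsumer.Enqueue(i);
            Thread.Sleep(nextDelay());
        }
    });

    var t2 = new Thread(() =>
    {
        for (int i = 0; i > -100; i--)
        {
            producerConsumer.Enqueue(i);
            Thread.Sleep(nextDelay());
        }
    });

    t1.Start();
    t2.Start();

    t1.Join();
    t2.Join();

    // The consumer runs on a background thread, so give it a moment
    // to drain the queue before Main returns.
    Thread.Sleep(50);
}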


What I've got here is a very basic implementation. The download allows the user to start and stop the consumer thread, along with options concerning what to do when it is stopped. It also gives the user the ability to clear the queue.
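
To give a rough idea of what stopping and clearing might involve, here is a minimal sketch. It is not the code from the download: the class name StoppableProducerConsumer, the isStopping flag, and the choice to drain remaining items before exiting are all assumptions made for illustration, and restarting the consumer is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant for illustration only; not the downloadable class.
public class StoppableProducerConsumer<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object queueLocker = new object();
    private readonly AutoResetEvent queueWaitHandle = new AutoResetEvent(false);
    private readonly Action<T> consumerAction;
    private readonly Thread consumerThread;
    private bool isStopping; // only read or written while holding queueLocker

    public StoppableProducerConsumer(Action<T> consumerAction)
    {
        if (consumerAction == null)
        {
            throw new ArgumentNullException("consumerAction");
        }

        this.consumerAction = consumerAction;
        this.consumerThread = new Thread(this.ConsumeItems) { IsBackground = true };
        this.consumerThread.Start();
    }

    public void Enqueue(T item)
    {
        lock (this.queueLocker)
        {
            this.queue.Enqueue(item);
            this.queueWaitHandle.Set();
        }
    }

    // Discards every item that has not been handed to the consumer yet.
    public void Clear()
    {
        lock (this.queueLocker)
        {
            this.queue.Clear();
        }
    }

    // Lets the consumer drain what is queued, then waits for its thread to exit.
    // Must not be called from inside the consumer action (it would join itself).
    public void Stop()
    {
        lock (this.queueLocker)
        {
            this.isStopping = true;
            this.queueWaitHandle.Set(); // wake the consumer if it is waiting
        }

        this.consumerThread.Join();
    }

    private void ConsumeItems()
    {
        while (true)
        {
            T nextItem = default(T);
            bool doesItemExist;
            bool shouldExit;

            lock (this.queueLocker)
            {
                doesItemExist = this.queue.Count > 0;
                if (doesItemExist)
                {
                    nextItem = this.queue.Dequeue();
                }

                shouldExit = !doesItemExist && this.isStopping;
            }

            if (doesItemExist)
            {
                this.consumerAction(nextItem);
            }
            else if (shouldExit)
            {
                return;
            }
            else
            {
                this.queueWaitHandle.WaitOne();
            }
        }
    }
}

public static class StoppableDemo
{
    public static void Main()
    {
        var seen = new List<int>();
        var producerConsumer = new StoppableProducerConsumer<int>(seen.Add);

        for (int i = 0; i < 5; i++)
        {
            producerConsumer.Enqueue(i);
        }

        producerConsumer.Stop(); // returns once all five items were consumed
        Console.WriteLine(seen.Count); // prints "5"
    }
}
```

With a Stop method like this, the caller no longer needs a Thread.Sleep at the end to give the consumer time to finish. Items enqueued after Stop returns simply stay in the queue in this sketch; a fuller version would have to decide whether to reject them or restart the consumer.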