Friday, February 11, 2011

Generic Producer Consumer Class in C#

Over the years, I've implemented the Producer/Consumer pattern numerous times. Today, I needed to implement it. Again. In the spirit of DRY (Don't Repeat Yourself), I decided to write it once and for all.

I began thinking about the requirements of a Producer/Consumer object. It had to be generic, so that it would be type safe. Users of the object needed to be able to specify the action that would be executed upon consumption. And I wanted an Enqueue method, which would be the entry point for producers. Here's my starting point:

public class ProducerConsumer<T>
{
    public ProducerConsumer(Action<T> consumerAction) { /* snip */ }

    public void Enqueue(T item) { /* snip */ }
}


Producers will use the Enqueue method. But I only want to allow one producer to enqueue at a time, so I'll need an object to lock on. I'll also need a Queue<T> to hold the items.

public class ProducerConsumer<T>
{
    private readonly Queue<T> queue = new Queue<T>();

    private readonly object queueLocker = new object();

    /* snip */

    public void Enqueue(T item)
    {
        lock (this.queueLocker)
        {
            this.queue.Enqueue(item);
        }
    }
}


Next, I have to think about how to consume the items. I want to do this on a separate thread, so I'll set that up in the constructor. The thread's method should be an infinite loop: it will dequeue items and hand them to the consumer action. I also need to ensure that only one thread is adding to or removing from the queue at a time, so I'll lock on the same queueLocker when I dequeue. The consumer action itself runs outside the lock, so producers aren't blocked while an item is being processed.

public class ProducerConsumer<T>
{
    /* snip */

    private readonly Action<T> consumerAction;

    public ProducerConsumer(Action<T> consumerAction)
    {
        this.consumerAction = consumerAction;
        new Thread(this.ConsumeItems) { IsBackground = true }.Start();
    }

    /* snip */

    private void ConsumeItems()
    {
        while (true)
        {
            T nextItem;

            lock (this.queueLocker)
            {
                nextItem = this.queue.Dequeue();
            }

            this.consumerAction(nextItem);
        }
    }
}
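
Before going further, it's worth checking how Queue<T>.Dequeue behaves when nothing has been enqueued yet, since the loop above calls it unconditionally. A minimal sketch (the class and method names are made up for illustration):

```csharp
using System;
using System.Collections.Generic;

public static class EmptyDequeueDemo
{
    // Returns true if dequeuing from an empty queue throws InvalidOperationException.
    public static bool DequeueThrowsWhenEmpty()
    {
        var queue = new Queue<int>();
        try
        {
            queue.Dequeue();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine(DequeueThrowsWhenEmpty()); // prints "True"
    }
}
```

On the consumer thread, that unhandled exception would end the loop almost immediately.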


We have a problem here. What if there are no items in the queue? I'll need to check that before dequeuing. And if there are no items in the queue, I'll need to block until there are some items. So the Enqueue method will need to signal when an item is enqueued. I'll use an AutoResetEvent for this.

public class ProducerConsumer<T>
{
    private readonly AutoResetEvent queueWaitHandle = new AutoResetEvent(false);

    /* snip */

    public void Enqueue(T item)
    {
        lock (this.queueLocker)
        {
            this.queue.Enqueue(item);

            // After enqueuing the item, signal the consumer thread.
            this.queueWaitHandle.Set();
        }
    }

    private void ConsumeItems()
    {
        while (true)
        {
            T nextItem = default(T);

            // Later on, we'll need to know whether there was an item in the queue.
            bool doesItemExist;

            lock (this.queueLocker)
            {
                doesItemExist = this.queue.Count > 0;
                if (doesItemExist)
                {
                    nextItem = this.queue.Dequeue();
                }
            }

            if (doesItemExist)
            {
                // If there was an item in the queue, process it...
                this.consumerAction(nextItem);
            }
            else
            {
                // ...otherwise, wait for an item to be queued up.
                this.queueWaitHandle.WaitOne();
            }
        }
    }
}
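
One detail this relies on: the consumer releases the lock before it calls WaitOne, so a producer can call Set in between. That is safe because an AutoResetEvent stays signaled until a single waiter is released, so the next WaitOne returns immediately. Here is a small sketch of that behavior (the class and method names are hypothetical, just for illustration):

```csharp
using System;
using System.Threading;

public static class AutoResetEventDemo
{
    // Set() is called before anyone waits; the signal is remembered.
    public static bool SignalIsRemembered()
    {
        using (var handle = new AutoResetEvent(false))
        {
            handle.Set();
            return handle.WaitOne(0); // true: returns at once and resets the event
        }
    }

    // Two Set() calls with no waiter in between collapse into one signal.
    public static bool SecondWaitTimesOut()
    {
        using (var handle = new AutoResetEvent(false))
        {
            handle.Set();
            handle.Set();
            handle.WaitOne(0);         // consumes the single remembered signal
            return !handle.WaitOne(0); // true: nothing is left to consume
        }
    }

    public static void Main()
    {
        Console.WriteLine(SignalIsRemembered()); // prints "True"
        Console.WriteLine(SecondWaitTimesOut()); // prints "True"
    }
}
```

The second case is why the consumer loop re-checks the queue count on every pass instead of assuming one signal per item.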


That's everything! Now, to put it together:

using System;
using System.Collections.Generic;
using System.Threading;

public class ProducerConsumer<T>
{
    private readonly Queue<T> queue = new Queue<T>();

    private readonly object queueLocker = new object();

    private readonly AutoResetEvent queueWaitHandle = new AutoResetEvent(false);

    private readonly Action<T> consumerAction;

    public ProducerConsumer(Action<T> consumerAction)
    {
        if (consumerAction == null)
        {
            throw new ArgumentNullException("consumerAction");
        }

        this.consumerAction = consumerAction;
        new Thread(this.ConsumeItems) { IsBackground = true }.Start();
    }

    public void Enqueue(T item)
    {
        lock (this.queueLocker)
        {
            this.queue.Enqueue(item);

            // After enqueuing the item, signal the consumer thread.
            this.queueWaitHandle.Set();
        }
    }

    private void ConsumeItems()
    {
        while (true)
        {
            T nextItem = default(T);

            // Later on, we'll need to know whether there was an item in the queue.
            bool doesItemExist;

            lock (this.queueLocker)
            {
                doesItemExist = this.queue.Count > 0;
                if (doesItemExist)
                {
                    nextItem = this.queue.Dequeue();
                }
            }

            if (doesItemExist)
            {
                // If there was an item in the queue, process it...
                this.consumerAction(nextItem);
            }
            else
            {
                // ...otherwise, wait for an item to be queued up.
                this.queueWaitHandle.WaitOne();
            }
        }
    }
}


To use it, instantiate a ProducerConsumer, passing in the action you want to be performed when an item is consumed. Then, just start enqueuing items.

void Main()
{
    var producerConsumer = new ProducerConsumer<int>(i => Console.WriteLine(i));

    var t1 = new Thread(() =>
    {
        // Random is not thread-safe, so each producer thread gets its own instance.
        var random = new Random(1);
        for (int i = 0; i < 100; i++)
        {
            producerConsumer.Enqueue(i);
            Thread.Sleep(random.Next(0, 5));
        }
    });

    var t2 = new Thread(() =>
    {
        var random = new Random(2);
        for (int i = 0; i > -100; i--)
        {
            producerConsumer.Enqueue(i);
            Thread.Sleep(random.Next(0, 5));
        }
    });

    t1.Start();
    t2.Start();

    t1.Join();
    t2.Join();

    // Give the background consumer thread a moment to drain the queue.
    Thread.Sleep(50);
}


What I've got here is a very basic implementation. The download allows the user to start and stop the consumer thread, along with options concerning what to do when it is stopped. It also gives the user the ability to clear the queue.
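
For readers who want a feel for how stopping and clearing could fit into the basic version, here is a rough sketch. It is not the code from the download: the class name StoppableProducerConsumer, the Stop and Clear methods, and the "drain the remaining items, then exit" policy are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant for illustration only; not the code from the download.
public class StoppableProducerConsumer<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly object queueLocker = new object();
    private readonly AutoResetEvent queueWaitHandle = new AutoResetEvent(false);
    private readonly Action<T> consumerAction;
    private readonly Thread consumerThread;
    private bool isStopping; // guarded by queueLocker

    public StoppableProducerConsumer(Action<T> consumerAction)
    {
        if (consumerAction == null) throw new ArgumentNullException("consumerAction");
        this.consumerAction = consumerAction;
        this.consumerThread = new Thread(this.ConsumeItems) { IsBackground = true };
        this.consumerThread.Start();
    }

    public void Enqueue(T item)
    {
        lock (this.queueLocker)
        {
            this.queue.Enqueue(item);
            this.queueWaitHandle.Set();
        }
    }

    // Discards everything that has not been consumed yet.
    public void Clear()
    {
        lock (this.queueLocker) { this.queue.Clear(); }
    }

    // Assumed policy: drain the remaining items, then end the consumer thread.
    public void Stop()
    {
        lock (this.queueLocker)
        {
            this.isStopping = true;
            this.queueWaitHandle.Set(); // wake the consumer if it is waiting
        }
        this.consumerThread.Join();
    }

    private void ConsumeItems()
    {
        while (true)
        {
            T nextItem = default(T);
            bool doesItemExist;
            bool shouldExit;
            lock (this.queueLocker)
            {
                doesItemExist = this.queue.Count > 0;
                if (doesItemExist) nextItem = this.queue.Dequeue();
                shouldExit = !doesItemExist && this.isStopping;
            }
            if (doesItemExist) this.consumerAction(nextItem);
            else if (shouldExit) return;
            else this.queueWaitHandle.WaitOne();
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var seen = new List<int>();
        var pc = new StoppableProducerConsumer<int>(i => seen.Add(i));
        for (int i = 0; i < 5; i++) pc.Enqueue(i);
        pc.Stop(); // returns after all five items are consumed
        Console.WriteLine(seen.Count); // prints "5"
    }
}
```

Two caveats with this sketch: calling Stop from inside the consumer action would deadlock on the Join, and items enqueued after Stop are never consumed. A fuller version would need to decide how to handle both.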

Comments:

  1. Very nice Random Skunk. I really liked your description of the creation process. The only negative is that I had to look up DRY!

  2. Do you have C# 3.0 version of the code ?