I have a Producer/Consumer implementation where the number of consumers is configurable (this is a form of configurable throttling). The producer and consumer are kicked off like this:
var cts = new CancellationTokenSource();
var processCancelToken = cts.Token;
Task.Run(() => Parallel.Invoke(
        new ParallelOptions() { CancellationToken = processCancelToken },
        producer,
        consumer),
    processCancelToken);
The producer action is quite simple and just populates a BlockingCollection with objects that are derived from System.Threading.Tasks.Task (yes it is possible!). Here is a simplified example:
var pollingInterval = 30000;
var producer = new Action(() =>
{
    Random rnd = new Random(DateTime.Now.Second);
    while (!processCancelToken.IsCancellationRequested)
    {
        for (int ii = 0; ii < 10; ii++)
        {
            var r = rnd.Next(2, 15);
            _mainQueue.Add(new Task(() =>
            {
                // this is a dummy task for illustrative purposes
                Console.WriteLine("    Queued task starting, set to sleep {0} seconds, ID: {1}", r, Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(r * 1000);
            }));
            Console.WriteLine("  Producer has added task to queue");
        }
        Thread.Sleep(pollingInterval);
    }
    Console.WriteLine("Exiting producer");
});
For this example it creates an anonymous task that sleeps for a random number of seconds between 2 and 15. In the real code this producer polls the database, extracts data entities representing work items, and then transforms those into executable tasks that are added to the collection (a rough sketch of that shape is below).
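For context, _mainQueue in these snippets is a shared BlockingCollection<Task>, and the declaration plus the Task-derived work item type might look roughly like the following. This is only a sketch; the WorkItem and WorkItemTask names are hypothetical stand-ins for the real data entity and task type:

// assumed declaration of the queue shared by producer and consumers
private static readonly BlockingCollection<Task> _mainQueue = new BlockingCollection<Task>();

// hypothetical data entity pulled from the database
public class WorkItem
{
    public int Id { get; set; }
}

// hypothetical work item task; deriving from Task is allowed because
// Task exposes public constructors taking an Action
public class WorkItemTask : Task
{
    public WorkItem Item { get; }

    public WorkItemTask(WorkItem item, Action<WorkItem> work)
        : base(() => work(item))
    {
        Item = item;
    }
}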
I then have a consumer action that uses a Parallel.For() to start n instances of an anonymous action, each of which dequeues a task from the collection, starts it, waits on it, and repeats:
var numberConsumerThreads = 3;
var consumer = new Action(() =>
{
    Parallel.For(0, numberConsumerThreads, (x) =>
    {
        // this action should continue to dequeue work items until it is cancelled
        while (!processCancelToken.IsCancellationRequested)
        {
            var dequeuedTask = _mainQueue.Take(processCancelToken);
            Console.WriteLine("  Consumer #{0} has taken task from the queue", Thread.CurrentThread.ManagedThreadId);
            dequeuedTask.Start();
            while (!processCancelToken.IsCancellationRequested)
            {
                if (dequeuedTask.Wait(500))
                    break;
                Console.WriteLine("  Consumer #{0} task wait elapsed", Thread.CurrentThread.ManagedThreadId);
            }
        }
        Console.WriteLine("Exiting consumer #{0}", Thread.CurrentThread.ManagedThreadId);
    });
});
The question: is this an efficient way to start and operate an arbitrary number of consumers? Or is there a more efficient way, perhaps using PLINQ from within the main consumer action, that keeps executing queued tasks, blocks while there aren't any, and can still be cancelled using processCancelToken?
(Note: processCancelToken is operated separately from the cancellation tokens contained within the queued tasks – they are independently cancelable. This all runs within a Windows service, and processCancelToken is used to cancel everything if the service is stopped.)
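As an illustration of that arrangement (a sketch only; the real queued tasks may keep their tokens entirely separate rather than linked), each queued task can carry its own CancellationTokenSource, optionally linked to processCancelToken, so it can be cancelled on its own while a service stop still cancels it:

// per-task token source, linked to the service-wide token
var taskCts = CancellationTokenSource.CreateLinkedTokenSource(processCancelToken);

_mainQueue.Add(new Task(() =>
{
    // the queued work observes its own token
    taskCts.Token.ThrowIfCancellationRequested();
    // ...do the work item here
}));

// cancel just this task's work, without touching the rest:
taskCts.Cancel();

// cancelling cts (service stop) cancels processCancelToken and, via the link, taskCts too:
// cts.Cancel();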