My datasource sends information in 100 batches of 100 records with a delay of 1 to 3 seconds between batches.
I would like to start processing data as soon as it’s received, but I’m not sure how to best approach this.
Some ideas I’ve been playing with include:
- yield
- ConcurrentDictionary
- ConcurrentDictionary with INotifyPropertyChanged
- Events
- etc.
As you can see, I'm all over the place, and would appreciate some tested guidance on how to approach this.
I would look into the TPL Dataflow APIs for this kind of work. At bare minimum you would do something as simple as:
ActionBlock<MyData> myActionBlock = new ActionBlock<MyData>(data =>
{
// your processing logic here
});
And then as your messages arrive from whatever external event you just post them to that block:
myActionBlock.Post(data);
That’s it. TPL Dataflow then implements all the producer/consumer machinery for you: messages are queued as they arrive and your processing delegate is invoked for each one, with scheduling handled by the usual TPL heuristics.
From there you can get a little more advanced by taking control of exactly how much capacity/parallelism you want with ExecutionDataflowBlockOptions. You can also hand in a CancellationToken via those same options.
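A minimal sketch of those options might look like the following. This assumes the System.Threading.Tasks.Dataflow NuGet package, and uses int in place of your MyData type for illustration; the specific option values are just examples:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class OptionsExample
{
    static void Main()
    {
        var cts = new CancellationTokenSource();

        // Capacity, parallelism, and cancellation are all governed here.
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount, // process records concurrently
            BoundedCapacity = 100,          // cap the block's internal queue
            CancellationToken = cts.Token   // lets you stop the block cooperatively
        };

        var block = new ActionBlock<int>(n =>
        {
            Console.WriteLine($"processing {n}");
        }, options);

        for (int i = 0; i < 10; i++)
            block.Post(i); // note: with BoundedCapacity set, Post returns false if the queue is full

        block.Complete();          // signal no more messages
        block.Completion.Wait();   // wait for in-flight work to drain
    }
}
```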
Then you can start to get really advanced by using the composition features of TPL Dataflow’s many different “block” types. You could introduce a BufferBlock in front of the processing, which in your case would let you receive that burst of 100 items without blocking the producer making the Post calls; only once the buffer’s bounded capacity of 100 was reached would the producer be blocked from adding more. There’s also BatchBlock which, depending on your exact workload, might make more sense to use, so that you can send, say, 20 items off to a worker thread at a time rather than one at a time.
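A sketch of linking those blocks into a pipeline, again assuming the Dataflow package and using int for the record type; the batch size of 20 is arbitrary:

```csharp
using System;
using System.Threading.Tasks.Dataflow;

class PipelineExample
{
    static void Main()
    {
        // BufferBlock absorbs bursts so the producer's Post returns immediately.
        var buffer = new BufferBlock<int>();

        // BatchBlock groups records into arrays of 20 for the worker.
        var batcher = new BatchBlock<int>(20);

        var worker = new ActionBlock<int[]>(batch =>
            Console.WriteLine($"processing a batch of {batch.Length} records"));

        // Link the pipeline and propagate completion downstream.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(batcher, linkOptions);
        batcher.LinkTo(worker, linkOptions);

        // Simulate one burst of 100 records from the datasource.
        for (int i = 0; i < 100; i++)
            buffer.Post(i);

        buffer.Complete();        // completion flows buffer -> batcher -> worker
        worker.Completion.Wait();
    }
}
```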
You can link together any concoction of blocks you can imagine. There’s filtering, transforming… you name it. To top it all off you can even write your own custom blocks. For example, if you wanted to introduce temporal coupling into the mix, you could batch up to 20 items, but if you haven’t received 20 within 10 seconds, send through however many you have already received. The possibilities are pretty much limitless.
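One common way to get that time-based flush without a fully custom block is to pair BatchBlock with a timer that calls its TriggerBatch method. A sketch, with the same assumptions as above (Dataflow package, int records, illustrative sizes and intervals):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class TimedBatchExample
{
    static void Main()
    {
        // Batch up to 20 items...
        var batcher = new BatchBlock<int>(20);

        // ...but force whatever has accumulated through every 10 seconds.
        // In a long-running pipeline this is what flushes partial batches.
        using var timer = new Timer(_ => batcher.TriggerBatch(),
                                    null,
                                    TimeSpan.FromSeconds(10),
                                    TimeSpan.FromSeconds(10));

        var worker = new ActionBlock<int[]>(batch =>
            Console.WriteLine($"flushing {batch.Length} item(s)"));

        batcher.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

        // Post fewer than 20 items; in a live service the timer would push
        // this partial batch through. In this short demo, Complete() does.
        for (int i = 0; i < 7; i++)
            batcher.Post(i);

        batcher.Complete(); // flushes any remainder as a final partial batch
        worker.Completion.Wait();
    }
}
```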
Get back to the basics with a producer-consumer buffer (look at thread-safe queues):
- your receiving code pushes each record onto a queue
- several processing threads then pull records off the queue and process them in a loop
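In .NET, BlockingCollection<T> is the standard thread-safe queue for this pattern. A minimal sketch, using int for the record type and arbitrary counts for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class ProducerConsumerExample
{
    static void Main()
    {
        // Thread-safe queue with blocking semantics and a bounded capacity.
        using var queue = new BlockingCollection<int>(boundedCapacity: 100);

        // Several consumers pull records in a loop until the queue is marked complete.
        var consumers = new Task[4];
        for (int c = 0; c < consumers.Length; c++)
        {
            consumers[c] = Task.Run(() =>
            {
                foreach (var record in queue.GetConsumingEnumerable())
                    Console.WriteLine($"processed {record}");
            });
        }

        // Producer: your receiving code pushes each record onto the queue.
        for (int i = 0; i < 100; i++)
            queue.Add(i);

        queue.CompleteAdding();  // signal the consumers that no more records are coming
        Task.WaitAll(consumers);
    }
}
```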