I’m working on a problem and am trying to wrap my head around what the best pattern to use should be. I’ve been digging on Stephen Cleary’s Concurrency in C# Cookbook as well as some S.O. posts on TaskCompletionSource. I think it should be something async related, but am unsure what to use.
My problem is that I have a process that sends a message to n other processes each of which returns some value/information back to this process via another message. I need to collect these replies, change them into an expected data structure, and then send them all back to another process.
My guess on how to do this is something like the pseudocode below. I’m thinking of sending the messages to the processes, creating a new method called “getFormattedResults…” and then having that method essentially monitor a data structure, thinking a dictionary, and then continues when all those messages are ready. This isn’t really async, and I’m sure there’s a better, simpler, and more async friendly way to do it, but am not sure how async and IPC work together.
// Maybe this task tracking object is the actual message it gets back?
someThreadSafeDictionary<Id, someTaskTrackingObject> messageRepliesDict
List<Values> SomeMethodThatSendMultMessages() {
for process in processes {
messageRepliesDict.add(process.Id, someTaskTrackingObject)
sendMessage(process)
}
// Not sure if this is needed? Thinking this new method could block.
List<Values> results = getFormattedResultsFromProcessReplies()
return results
}
void sendMessage(msg) {
// Logic what to do with message
if msg.type == oneImInterestedIn {
addMessageToMessageRepliesDict(msg)
}
}
List<Values> getFormattedResultsFromProcessReplies() {
Timeout = 5000
weTimedOut = true
Time = 0
While Time < Timeout {
if Task.WhenAll(messageRepliesDict == Done) {
weTimedOut = false
break
}
}
if weTimedOut {
errorHandling
return
}
List<Values> results = collectMessagesFromDictAndFormatThem()
return results
}