Having a little trouble with this one and hoping you guys can help… I am using C# System.Reactive. I have one observable that is simply an interval that gets azure queue messages from an azure storage queue, one at at time. This looks like this:
public IObservable<QueueMessage> On(
string queueName,
int poleIntervalSeconds = 5
)
{
return Observable
.Interval(TimeSpan.FromSeconds(poleIntervalSeconds))
.SelectMany(async _ =>
{
IAzureQueueAdapter queueAdapter = ConnectToQueue(queueName);
Response<QueueMessage[]> response = await queueAdapter.ReceiveMessagesAsync(1);
return response.Value;
})
.SelectMany(messages => messages)
.Do(async queueMessage =>
{
IAzureQueueAdapter queueAdapter = ConnectToQueue(queueName);
await queueAdapter.DeleteMessageAsync(queueMessage.MessageId, queueMessage.PopReceipt);
});
}
// Usage
IObservable<ProcessingCompletedEvent> processingCompletedEvent = eventAggregator
.On<ProcessingCompletedEvent>(); // Not the same On
IObservable<QueueMessage> queueMessageObservable = queueAdapter.On(queueName, 5);
It’s fairly straight forward, and for straight async processing of queue messages (windows service app) this works perfectly. However I now have an external process that uses a third party C++ SDK that I have to use pinvoke to execute. The SDK basically exposes event delegates that I have wrapped with observables using FromEventPattern. This all works fine for normal async stuff as I said, but the problem is I now have a subscription that needs to listen to the queue and ALSO wait for a completion event from the SDK wrappers observables.
In simple terms, I need to only get a new message off the queue if the previous message also has a matching completed event. This is due to issues with the SDK, which crashes if I try to do two things at a time.
For reference here is the completed event that gets emitted:
public class ProcessingCompletedEvent : EventMessage<QueueMessage>
{
public ProcessingCompletedEvent(QueueMessage data) : base(data) {
MessageId = data.MessageId;
}
public string MessageId { get; set; }
public string? CorrelationId { get; set; }
}
To be clear I don’t really have an option to change the source of the messages to service bus or anything. However, I am fine with messages backing up locally if another one is retrieved while the previous one is still being processed, I just want it to wait to process it until the previous message finished processing. I do have the option to change the On or create a new function to it with or without the interval if it better serves the purpose, but I do feel like the interval is necessary.