I have a system with 5 services, each running in a separate process. They resemble an assembly line, in the sense that they mostly process messages in the same order 1->2->3->4->5; however, the processing is not always linear. A message can skip through the chain (e.g. 1->2->5), go backwards (e.g. 1->2->4->3->1) or even zig-zag (1->2->4->3->5).
The system is fed payloads to process one by one, starting with service 1. Because the system is asynchronous and non-linear, I cannot really know when the whole system has finished processing each payload. On the other hand, I sometimes want the system to stop, ideally as soon as it has processed all the payloads.
These solutions would not work well:
- A simple kill or a poison-pill approach would not work (example https://stackoverflow.com/questions/13847963/akka-kill-vs-stop-vs-poison-pill), as I could end up not processing a payload completely (1->2->1 would fail, because 1 would have been stopped after 2 received the last payload, so 2 could not hand it back)
- I could use a timeout approach, but that would not work for two reasons: a) it would hinder development and testing (e.g. waiting 1 minute for a payload with 5 seconds of effective processing time), and b) timeouts can still be too low, so I could run into issues if some services have not finished processing
I was thinking about creating another service that would act as a two-phase-commit-style stopper. The idea would go along these lines:
- Any service could call stopper to say “I want the system to stop”
- Stopper would call all other services and tell them “Stop has been requested”
- Other services will wait until they have processed what is in their queue and then tell stopper "I can be stopped"
- Stopper will wait until it has collected "I can be stopped" from all services, then broadcast "Perform stop"
- All services will then go ahead and stop their respective processes in a graceful manner
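The two phases above can be sketched as follows. This is a minimal in-process sketch, assuming each service exposes hypothetical `request_stop()` (returns only once its queue is drained, which counts as its "I can be stopped" vote) and `perform_stop()` methods; threads stand in for what would be inter-process messages in the real system, and the `pending` counter merely simulates queued payloads.

```python
from concurrent.futures import ThreadPoolExecutor


class Service:
    """Hypothetical stand-in for one of the five services."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.pending = 0            # simulated number of queued payloads
        self.stopped = False

    def request_stop(self) -> str:
        # Phase 1: finish what is queued, then vote "I can be stopped".
        while self.pending > 0:
            self.pending -= 1       # pretend to process one payload
        return self.name

    def perform_stop(self) -> None:
        # Phase 2: graceful shutdown of this service.
        self.stopped = True


class Stopper:
    def __init__(self, services: list[Service]) -> None:
        self.services = services

    def stop_all(self) -> list[str]:
        # Broadcast "Stop has been requested" and collect every vote.
        with ThreadPoolExecutor(max_workers=len(self.services)) as pool:
            votes = list(pool.map(lambda s: s.request_stop(), self.services))
        # Only once all votes are in, broadcast "Perform stop".
        for s in self.services:
            s.perform_stop()
        return votes
```

Note that this sketch does nothing about messages that are still travelling between services when a vote is cast, which is exactly the 1->2->1 race discussed below.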
I have issues with the above:
- It looks overly complex for the benefit (compared to "just wait 1 minute then stop" and not adding any non-business code), and I am not sure it is worth the trade-off
- I would have to infuse stopping logic into all services, and it would have to be coupled both to the queuing mechanism (so it knows when there are no more payload messages) and to the business logic (so it knows when the payload itself is done processing)
- There are still some edge cases to consider. E.g. in the 1->2->1 scenario, 1 could finish while 2 is still processing the last payload. When 2 is done, it could send a message to 1 and to the stopper service; the stopper then sends "Perform stop", and 1 could receive that message before the message from 2 that it should process, so it would stop prematurely
Are there best practices or examples of how to achieve this in some other way? Or maybe a way to constrain the system that would make this problem disappear or become easier to manage?
Simply have a "STOP" control message. Send it to process 1, which should forward a STOP message to all the other processes.
Each process, on receiving a STOP message, should drain all messages from its queue and then wait for more messages to appear (20 seconds or so). If no more messages arrive in that time, it shuts down; otherwise it processes the remaining messages and waits another 20 seconds.
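The per-process drain-and-wait loop could look roughly like this. It is a sketch under assumptions: a `queue.Queue` stands in for each service's inbox, and `STOP`, `run_service` and `handle` are hypothetical names; the 20-second window from the answer is the default `idle_seconds`.

```python
import queue

STOP = object()   # hypothetical control-message sentinel


def run_service(inbox: "queue.Queue", handle, idle_seconds: float = 20.0) -> list:
    """Process messages; after STOP, keep draining until the inbox has
    stayed empty for a whole idle_seconds window, then return (shut down)."""
    processed = []
    stopping = False
    while True:
        try:
            # Before STOP, block indefinitely; after STOP, wait at most idle_seconds.
            msg = inbox.get(timeout=idle_seconds if stopping else None)
        except queue.Empty:
            return processed          # quiet for a full window: shut down
        if msg is STOP:
            stopping = True           # process 1 would also forward STOP here
            continue
        handle(msg)
        processed.append(msg)
```

Because the timer restarts after every message, a late message from another service simply extends the wait instead of being lost.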
You will need some mechanism to distinguish between a genuinely "new" message arriving at process 1 from outside the system (which you do not want to process) and messages generated internally by processes 2-5 (which you do want to process before shutting down).
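One possible mechanism, assuming every message carries a flag recording whether it was produced inside the system, is a small gate in front of process 1. The `Message.internal` field and the `EntryGate` class are hypothetical names for illustration.

```python
from dataclasses import dataclass


@dataclass
class Message:
    payload: object
    internal: bool   # True if produced by processes 2-5, False if from outside


class EntryGate:
    """Hypothetical filter in front of process 1: once stopping, it
    rejects new external payloads but still admits internal traffic."""

    def __init__(self) -> None:
        self.stopping = False

    def admit(self, msg: Message) -> bool:
        return msg.internal or not self.stopping
```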
The last time I had to do this, I designed the steps in the assembly line so that, if any of the steps received a null object from the previous step, that was an indication that there were no more items to process.
Then all you have to do is simply send a stop signal to the object that is feeding the assembly lines. The object will then send null items to the assembly lines, effectively flushing them and shutting them down. As each step shuts down, it sends a null object to the following step, shutting it down as well.
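For a strictly linear line, the null-item approach can be sketched with threads and queues, using Python's `None` as the null object. The `stage` and `run_line` names are hypothetical, and this sketch does not handle items that loop back to an earlier step.

```python
import queue
import threading


def stage(inbox: queue.Queue, outbox: queue.Queue, fn) -> None:
    """One assembly-line step: apply fn to each item until it sees None,
    then pass None on so the following step shuts down too."""
    while True:
        item = inbox.get()
        if item is None:
            outbox.put(None)      # propagate the shutdown signal
            return
        outbox.put(fn(item))


def run_line(items, fns) -> list:
    queues = [queue.Queue() for _ in range(len(fns) + 1)]
    threads = [
        threading.Thread(target=stage, args=(queues[i], queues[i + 1], fn))
        for i, fn in enumerate(fns)
    ]
    for t in threads:
        t.start()
    for item in items:            # the feeder
        queues[0].put(item)
    queues[0].put(None)           # "stop": flush the line with a null item
    results = []
    while (out := queues[-1].get()) is not None:
        results.append(out)
    for t in threads:
        t.join()
    return results
```

For example, `run_line([1, 2, 3], [lambda x: x + 1, lambda x: x * 10])` returns `[20, 30, 40]`, and every stage thread has exited by the time it returns.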
So here is the architecture of my last application:
File Reader --> Producer --> Queue --> Filter --> --> Output Queue --> Consumer --> File Writer
                         \-> Queue --> Filter --> -/
The producer splits the input file packets into channels, represented by the multiple queue-filter combinations (pipelines). The filter conforms to input and output interfaces, making it swappable. The entire architecture from producer to consumer (but excluding the file reader and file writer objects) is wrapped in a Controller class. Each queue/filter combination is contained in a Pipeline class. The Controller class has a Cancel method, which sends a Cancel signal to the Producer. The producer, in turn, calls Cancel methods on each of the pipelines. There is a Flush() method on each of the queues, which is called by the pipeline objects.
When Dequeue() is called on a queue, it will do one of three things:
- Return a packet from the queue, or
- If the queue is empty, block until a packet is enqueued, unless
- Flush() has been called on the queue, in which case the Dequeue() method will return a null object if the queue is empty.
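The three `Dequeue()` behaviours above can be sketched with a condition variable. This is a Python rendering under assumptions: `FlushableQueue`, `enqueue`, `flush` and `dequeue` are hypothetical names mirroring the answer's `Flush()`/`Dequeue()`, not the original implementation.

```python
import threading
from collections import deque


class FlushableQueue:
    """dequeue() returns an item, blocks while empty, or returns None
    once flush() has been called and the queue is empty."""

    def __init__(self) -> None:
        self._items = deque()
        self._flushed = False
        self._cond = threading.Condition()

    def enqueue(self, item) -> None:
        with self._cond:
            self._items.append(item)
            self._cond.notify()

    def flush(self) -> None:
        with self._cond:
            self._flushed = True
            self._cond.notify_all()   # wake blocked readers so they can see None

    def dequeue(self):
        with self._cond:
            while not self._items and not self._flushed:
                self._cond.wait()
            return self._items.popleft() if self._items else None
```

Items enqueued before `flush()` are still returned in order; only an empty, flushed queue yields `None`.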
So when no more packets are available, the filter will see a null packet. The entire process is completed when the output queue returns a null object after it has been Flushed. The controller will not flush the output queue until all pipelines have signalled that they are both Flushed and empty (having received the final null object from the input queue).
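The ordering rule, flushing the output queue only after every pipeline has finished, can be sketched like this. It is a simplified assumption-laden version: `pipeline` and `controller` are hypothetical names, plain `queue.Queue` objects with a `None` end marker replace the flushable queues, and the consumer runs after the pipelines finish rather than concurrently.

```python
import queue
import threading


def pipeline(inbox: queue.Queue, output: queue.Queue, fn) -> None:
    # Filter loop: stop at the final null packet from the input queue.
    while (packet := inbox.get()) is not None:
        output.put(fn(packet))


def controller(channels: list[list], fn) -> list:
    """Hypothetical Controller: one pipeline per channel, all feeding a shared
    output queue that is closed only after every pipeline has ended."""
    output: queue.Queue = queue.Queue()
    inboxes = [queue.Queue() for _ in channels]
    workers = [threading.Thread(target=pipeline, args=(q, output, fn)) for q in inboxes]
    for w in workers:
        w.start()
    for q, packets in zip(inboxes, channels):   # producer splits into channels
        for p in packets:
            q.put(p)
        q.put(None)                             # flush each pipeline's input
    for w in workers:
        w.join()                                # every pipeline is empty and done
    output.put(None)                            # only now close the output queue
    results = []
    while (packet := output.get()) is not None: # consumer
        results.append(packet)
    return results
```

Output order across channels is not deterministic, since the pipelines run concurrently.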
Now, if a loop condition exists, where an item returns to some previous stage, what you’ll have to do is wrap your packet in another object, so that you can store some state information with the packet, such as how many times it’s gone through stage 1, etc.
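A wrapper of that kind might look like the following. `Envelope` and `visit` are hypothetical names; a stage could, for instance, use the returned count to cap how many times a packet is allowed to loop back.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class Envelope:
    """Hypothetical wrapper carrying per-packet routing state."""
    payload: object
    visits: Counter = field(default_factory=Counter)

    def visit(self, stage: int) -> int:
        # Record that the packet passed through `stage`; return the new count.
        self.visits[stage] += 1
        return self.visits[stage]
```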
If your services can route messages in a cycle (e.g. 1 -> 2 -> 1), so that you cannot send any form of "stop" message down the line, then you will have to stop entry to the entire system and wait for all services to complete processing.
Only when they are all quiet will it be safe to turn them all off, so you'll need a monitor that can see every service's processing status. Alternatively, it might be possible to count entries to and exits from the system, if every message sent in comes out. Then you can stop entry to the system, and the last process in the chain (a special one that is always placed last and performs little processing, maybe just reporting) is the one that can tell when the system has completed all processing, if that is appropriate for your system.
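The entry/exit counting idea can be sketched as a shared in-flight counter. This assumes every payload that enters eventually exits exactly once through the final reporting stage; `InFlightCounter`, `enter`, `exit` and `close_and_wait` are hypothetical names, and across real processes the counter would have to live in a shared store or in that final process.

```python
import threading
from typing import Optional


class InFlightCounter:
    """The system is quiet when every payload that entered has also exited."""

    def __init__(self) -> None:
        self._count = 0
        self._accepting = True
        self._cond = threading.Condition()

    def enter(self) -> bool:
        # Called at the system's entry point; refuses new work once closed.
        with self._cond:
            if not self._accepting:
                return False
            self._count += 1
            return True

    def exit(self) -> None:
        # Called by the final (reporting) stage when a payload is complete.
        with self._cond:
            self._count -= 1
            if self._count == 0:
                self._cond.notify_all()

    def close_and_wait(self, timeout: Optional[float] = None) -> bool:
        # Stop entry, then block until nothing is in flight (False on timeout).
        with self._cond:
            self._accepting = False
            return self._cond.wait_for(lambda: self._count == 0, timeout)
```

If a payload can fan out into several messages, the same pattern still works when you count messages instead: increment on every send and decrement when a message is fully handled.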