I have a stream of entity transactions. Each transaction is a JSON document consisting of the entity's _id and the time of the transaction (there are many more fields). I want to save only the latest transaction of each entity in my MongoDB collection.
Meaning, I want to replace an existing Mongo record if it has the same _id and the new time is greater than the current time. If the _id does not exist yet, then I want to insert it as a new record.
Currently I have a semi-working version that does 2 queries. If I am processing 100 transactions, I first query Mongo for all the _ids and their time field currently in the collection, then I filter the batch in my Python project, then I create ReplaceOne statements for all the new records and the records with a newer time field, and I run them in one bulk_write operation.
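To make that concrete, here is a minimal sketch of the current two-query flow, assuming pymongo; the connection string, database, and collection names are placeholders:

```python
from pymongo import MongoClient, ReplaceOne

client = MongoClient("mongodb://localhost:27017")   # placeholder connection string
collection = client["mydb"]["entities"]             # placeholder db/collection names

def upsert_latest(transactions):
    # 1st query: fetch the current time of every _id in this batch
    ids = [t["_id"] for t in transactions]
    existing = {
        doc["_id"]: doc["time"]
        for doc in collection.find({"_id": {"$in": ids}}, {"time": 1})
    }

    # Filter in Python: keep only new entities or strictly newer transactions
    ops = [
        ReplaceOne({"_id": t["_id"]}, t, upsert=True)
        for t in transactions
        if t["_id"] not in existing or t["time"] > existing[t["_id"]]
    ]

    # 2nd query: apply everything in one bulk_write
    if ops:
        collection.bulk_write(ops, ordered=False)
```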
However, there is a problem when several workers run at the same time: if 2 workers process transactions for the same entity _id simultaneously, the data might end up wrong, because each worker decides based on the state it read before the other one wrote.
As we are using Kafka for the data stream, we thought about putting a key on the Kafka messages so all the transactions with the same _id go to the same worker. However, that is also not an option for us because it limits our scalability (parallelism within a consumer group is capped by the number of partitions).
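For reference, the keying idea would look roughly like this, a sketch assuming the kafka-python client; the topic name and broker address are placeholders:

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # placeholder broker

def publish(transaction):
    # Messages with the same key land on the same partition, so one consumer
    # in the group sees all transactions for that entity, in order.
    producer.send(
        "entity-transactions",                                # placeholder topic
        key=str(transaction["_id"]).encode("utf-8"),
        value=json.dumps(transaction).encode("utf-8"),
    )
```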
So I came up with a new idea: save all the data as archive data in my Mongo collection. I will convert each JSON to look like {_id: uuid, schema: {original_transaction}}, and I will build a cron job that runs every 5 minutes and deletes old records.
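Roughly, the archive writes and the cleanup would look like this; a sketch assuming pymongo, where the "archive" collection name and the "inserted_at" field are my own placeholders so the cron job has something to key the deletion on:

```python
import uuid
from datetime import datetime, timedelta, timezone
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")   # placeholder connection string
archive = client["mydb"]["archive"]                 # placeholder db/collection names

def archive_transaction(transaction):
    # Every transaction is stored as-is under a fresh uuid
    archive.insert_one({
        "_id": str(uuid.uuid4()),
        "schema": transaction,
        "inserted_at": datetime.now(timezone.utc),  # assumed field for cleanup
    })

def cleanup(older_than_minutes=5):
    # Intended to be run by the cron job every 5 minutes
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=older_than_minutes)
    archive.delete_many({"inserted_at": {"$lt": cutoff}})
```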
Yet maybe there is an easy solution I am missing, so I would like to know if there is a better and easier way to do this.