I have a class that processes a never ending stream of data. I also have external components that would like to asynchronously sample the processed data from time to time. Since it could be a while before the “next item” is processed, I’d like the data inspector components to “await” the item so they don’t block. They could be any number of different data inspecting components, and if multiple inspectors are awaiting an item, it’s fine if they all get the same item instance.
I have the implementation below using a TaskCompletionSource
, and it seems to work okay, but the DataProcessor is using a full Thread to process the data, not a Task. If I were to switch to a Task, then lock
would cause problems. I’d also like to avoid locking if possible, because overwhelming majority of the time there’s nothing trying to inspect the items.
public class DataProcessor
{
private object _getDataLock = new object();
private TaskCompletionSource<Data> _getData;
private Thread _processThread;
public DataProcessor()
{
_processThread = new(ProcessDataLoop);
_processThread.Start();
}
void ProcessDataLoop()
{
while (true)
{
var processedData = ProcessNextData();
lock (_getDataLock)
{
_getData?.SetResult(processedData.Clone());
}
}
}
public Task<Data> GetData(TimeSpan timeout)
{
lock (_getDataLock)
{
_getData ??= new TaskCompletionSource<Data>();
return _getData.Task.WithTimeout(timeout);
}
}
}
public class DataInspector
{
private DataProcessor _processor;
public async Task InspectData()
{
var data = await _processor.GetData(TimeSpan.FromSeconds(30));
// look at data
}
}
I’m wondering if there’s a way to do the same thing without locking.