How to ensure Tasks are executed serially (FIFO)
2020-10-10 (in C#...)
I have an async method that is called from various places in my code base, potentially from different threads.
I need to ensure that calls to this method are executed serially: in order of arrival, and only one at a time.
It turns out there is no ready-made solution for this problem in the BCL, and no perfect solution outside...
A solution
Without further delay, here is what I came up with:
using System;
using System.Threading;
using System.Threading.Tasks;

public class AsyncJobQueue
{
    private readonly object _locker = new object();
    private Task _previousJob = Task.CompletedTask;
    private int _count;

    public int Count => _count;

    /// <summary>
    /// Serialize jobs execution in order of arrival.
    /// Jobs are not started until the previous one is complete.
    /// </summary>
    public async Task<T> WaitForMyJobAsync<T>(Func<Task<T>> getJob)
    {
        Task previousJob;
        var myJobIsCompleteTcs = new TaskCompletionSource<bool>();

        lock (_locker)
        {
            // Replace the previous job with a TCS that will complete
            // when our job completes:
            previousJob = _previousJob;
            _previousJob = myJobIsCompleteTcs.Task;
            Interlocked.Increment(ref _count); // Keep count for debug
        }

        // Wait for the previous job to complete.
        // No need for a try catch because the previous job is a TCS too,
        // so it will never fail.
        await previousJob;

        try
        {
            return await getJob();
        }
        finally
        {
            myJobIsCompleteTcs.SetResult(true);
            Interlocked.Decrement(ref _count);
        }
    }
}
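The basic idea is to keep track of the previous (antecedent) Task and wait for its completion before executing a new one.
Swapping the reference to the previous Task is done inside a lock to prevent concurrency issues.
It's simple, but it works well. Exceptions are also properly taken care of: they propagate to the caller of the failing job, and the finally block still signals completion, so a failing job does not stall the queue.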
Improvements
TPL Continuations
I did not use one of the TPL ContinueWith() methods to chain Task executions.
I know there are many pitfalls with the TPL, especially with its low-level building blocks, so I have learned to stay away from them when I can.
If performance were a problem, replacing async/await with ContinueWith() would probably improve it slightly...
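For comparison, here is a minimal sketch of what a ContinueWith()-based variant could look like. The class name ContinuationJobQueue and the method name Enqueue are made up for illustration, and this is not a measured or tuned implementation. Note one behavioral difference from the version above: the job factory always runs on the thread pool scheduler.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name: a sketch, not a drop-in replacement for AsyncJobQueue.
public sealed class ContinuationJobQueue
{
    private readonly object _locker = new object();
    private Task _previousJob = Task.CompletedTask;

    public Task<T> Enqueue<T>(Func<Task<T>> getJob)
    {
        lock (_locker)
        {
            // With TaskContinuationOptions.None the continuation runs whether
            // the antecedent succeeded, faulted or was canceled, so a failed
            // job does not block the ones queued after it.
            // Passing TaskScheduler.Default means getJob() is scheduled on the
            // thread pool and never runs inline while the lock is held.
            // Unwrap() turns the Task<Task<T>> into a Task<T> that completes
            // only when the inner job completes.
            Task<T> myJob = _previousJob
                .ContinueWith(
                    _ => getJob(),
                    CancellationToken.None,
                    TaskContinuationOptions.None,
                    TaskScheduler.Default)
                .Unwrap();

            _previousJob = myJob;
            return myJob;
        }
    }
}
```

Each caller gets back the Task of its own job, and the next job is chained on that same Task, so no extra signaling object is needed.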
TaskCompletionSource is kind of useless
For simplicity and clarity, I used a TaskCompletionSource<bool> as a signal for the completion of the antecedent Task.
This way, I don't have to deal with exceptions, and the code feels easier to understand.
It probably also has an impact on performance...
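To illustrate the trade-off, here is a rough sketch of the same queue without the TaskCompletionSource, where each job waits directly on the previous job's Task. The names DirectJobQueue, Enqueue and RunAfterAsync are hypothetical. The price is that the failure of the previous job now has to be swallowed explicitly.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical name: a sketch of the queue without a TaskCompletionSource.
public sealed class DirectJobQueue
{
    private readonly object _locker = new object();
    private Task _previousJob = Task.CompletedTask;

    public Task<T> Enqueue<T>(Func<Task<T>> getJob)
    {
        lock (_locker)
        {
            // The job's own Task becomes the antecedent of the next job.
            Task<T> myJob = RunAfterAsync(_previousJob, getJob);
            _previousJob = myJob;
            return myJob;
        }
    }

    private static async Task<T> RunAfterAsync<T>(Task previousJob, Func<Task<T>> getJob)
    {
        // Yield first so this method returns to Enqueue() before any user
        // code runs; getJob() is never invoked by the thread holding the lock.
        await Task.Yield();

        try
        {
            await previousJob;
        }
        catch
        {
            // The previous job failed or was canceled. That outcome belongs
            // to its own caller, so it is deliberately ignored here.
        }

        return await getJob();
    }
}
```

The try/catch around the await is exactly the exception handling that the TaskCompletionSource version avoids.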
Alternative implementation
Another implementation I found afterwards is SerialQueue, on GitHub.
Here is the gist of it (from this revision, edited by me):
public class SerialQueue
{
    readonly object _locker = new object();
    readonly WeakReference<Task> _lastTask = new WeakReference<Task>(null);

    public Task<T> Enqueue<T>(Func<Task<T>> asyncFunction)
    {
        lock (_locker)
        {
            Task<T> resultTask;

            if (_lastTask.TryGetTarget(out var lastTask))
                resultTask = lastTask.ContinueWith(_ => asyncFunction(), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
            else
                resultTask = asyncFunction();

            _lastTask.SetTarget(resultTask);
            return resultTask;
        }
    }
}
Pros:
- More consideration for performance than my implementation
  - Avoids the creation of an async state machine by using ContinueWith()
  - WeakReference for the antecedent Task is a nice touch
Cons:
- Old fashioned (predates async/await), though this is not bad in itself
- Reading the code and the tests, I don't feel like the author completely knows what they are doing...
  - I don't like that the first call to Enqueue() will always execute on the thread pool (Task.Run(...))
  - I don't like unit tests that use actual delays (Task.Delay() or Thread.Sleep())
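On the point about delays in unit tests: one way to avoid them is to control job completion by hand with a TaskCompletionSource. The following is only a sketch. SerialQueueChecks and the enqueue adapter are hypothetical names, and a real test would use the assertions of whatever test framework is in place rather than throwing.

```csharp
using System;
using System.Threading.Tasks;

public static class SerialQueueChecks
{
    // 'enqueue' is a hypothetical adapter around whichever queue is under test.
    public static async Task AssertRunsOneAtATimeAsync(
        Func<Func<Task<int>>, Task<int>> enqueue)
    {
        // The test completes this by hand instead of waiting on a timer.
        var firstGate = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        bool? firstWasDoneWhenSecondStarted = null;

        // The first job stays pending until firstGate is completed.
        Task<int> first = enqueue(() => firstGate.Task);

        // The second job records whether the first one had finished
        // at the moment it was started.
        Task<int> second = enqueue(() =>
        {
            firstWasDoneWhenSecondStarted = firstGate.Task.IsCompleted;
            return Task.FromResult(2);
        });

        firstGate.SetResult(1);

        if (await first != 1 || await second != 2)
            throw new InvalidOperationException("Unexpected job results.");
        if (firstWasDoneWhenSecondStarted != true)
            throw new InvalidOperationException("Second job started too early.");
    }
}
```

With the queue from this post, the check could be called as `await SerialQueueChecks.AssertRunsOneAtATimeAsync(job => queue.WaitForMyJobAsync(job));`, where `queue` is an AsyncJobQueue instance. A correct queue always passes, and a queue that starts jobs immediately is reported without any timing dependency.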
That's it! I hope this post will be helpful to the next person in need of a FIFO task queue!