/             All Posts       Projects       GitHub       More...
🔗

How to ensure Tasks are executed serially (FIFO)

(in C#...)

I have an async method that is called from various places in my code base, potentially from different threads.

I need to ensure that calls to this method are executed serially: in order of arrival, and only one at a time.

It turns out there is no ready-made solution for this problem in the BCL, and no perfect solution outside...

A solution

Without further delay, here is what I came up with:

using System;
using System.Threading;
using System.Threading.Tasks;

public class AsyncJobQueue
{
    private object _locker = new object();
    Task _previousJob = Task.CompletedTask;

    private int _count;
    public int Count => _count;

    /// <summary>
    /// Serialize jobs execution in order of arrival.
    /// Jobs are not started until the previous one is complete.
    /// </summary>
    public async Task<T> WaitForMyJobAsync<T>(Func<Task<T>> getJob)
    {
        Task previousJob;
        var myJobIsCompleteTcs = new TaskCompletionSource<bool>();

        lock (_locker)
        {
            // Replace the previous job with a TCS that will complete
            // when our job completes:
            previousJob = _previousJob;
            _previousJob = myJobIsCompleteTcs.Task;
            Interlocked.Increment(ref _count); // Keep count for debug
        }

        // Wait for the previous job to complete.
        // No need for a try catch because the previous job is a TCS too,
        // so it will never fail.
        await previousJob;

        try
        {
            return await getJob();
        }
        finally
        {
            myJobIsCompleteTcs.SetResult(true);
            Interlocked.Decrement(ref _count);
        }
    }
}

The basic idea is to keep track of the previous/antecedent Task and wait for its completion before executing a new one. This is done in a lock to prevent concurrency issues.

It's simple, but it works well. Exceptions are also properly taken care of.

Improvements

TPL Continuations

I did not use one of the TPL ContinueWith() methods to chain Task executions.

I know there are many pitfalls with the TPL, especially with its low-level building blocks, so I have learned to stay away from them when I can.

If this was a problem, replacing async/await with ContinueWith() would probably increase performance slightly...

TaskCompletionSource is kind of useless

For simplicity and clarity, I used a TaskCompletionSource<bool> as a signal for the completion of the antecedent Task. This way, I don't have to deal with exceptions, and the code feels easier to understand.

It probably also has an impact on performance...

Alternative implementation

Another implementation I found after is SerialQueue, on GitHub.

Here is the gist of it (from this revision, edited by me):

public class SerialQueue
{
    readonly object _locker = new object();
    readonly WeakReference<Task> _lastTask = new WeakReference<Task>(null);

    public Task<T> Enqueue<T>(Func<Task<T>> asyncFunction)
    {
        lock (_locker)
        {
            Task<T> resultTask;

            if (_lastTask.TryGetTarget(out var lastTask))
                resultTask = lastTask.ContinueWith(_ => asyncFunction(), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
            else
                resultTask = asyncFunction();

            _lastTask.SetTarget(resultTask);

            return resultTask;
        }
    }
}

Pros:

Cons:


That's it! I hope this post will be helpful to the next person in need of a FIFO task queue!

Older
0 comment



Formatting cheat sheet.
The current page url links to a specific comment.
The comment is shown highlighted below in context.

JavaScript is required to see the comments. Sorry...