How to parallelize tasks while limiting concurrent executions (in F#)
2024-09-16

TL;DR: use the Tasks.Parallel.ForEachAsync()-based implementation.
If you are using async {} F# computation expressions, you may know you can just use Async.Parallel() with the maxDegreeOfParallelism parameter to control how many computations execute in parallel.
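For example, something like this runs at most 10 of the 100 computations at any given time (a minimal sketch):

```fsharp
// A minimal sketch: at most 10 of the 100 sleeps run concurrently.
let computations =
    [ for i in 1 .. 100 ->
        async {
            do! Async.Sleep 100
            return i
        } ]

let results : int[] =
    Async.Parallel(computations, maxDegreeOfParallelism = 10)
    |> Async.RunSynchronously
```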
With Tasks, it's not so easy.
Tasks are generally created hot, meaning they start executing as soon as they are created.
It's not very convenient when you want to await a bunch of them in parallel while keeping the number of concurrent executions under control...
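A quick illustration of "hot": the task body starts running as soon as the expression is evaluated, not when it is awaited (a minimal sketch):

```fsharp
open System.Threading.Tasks

// The body starts executing immediately when this binding is evaluated...
let hot : Task<int> = task { return 40 + 2 }

// ...so by the time we get here, it may already be completed.
printfn "%b" hot.IsCompleted
```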
Nevertheless, tasks are now everywhere in the .NET world, and quite unavoidable in F# too.
After a bit of research, System.Threading.Tasks.Parallel.ForEachAsync() seems to be what I want, but I had a hard time using it in F#, as type inference was working against me: I had to add various type hints before the compiler could find a valid overload.
This method is also very statement-oriented, since it doesn't return a result value, so it's not very F#-friendly.
I wondered if I could find a better way (easier/more intuitive/more performant), so I did a few benchmarks...
Implementation attempts
As previously mentioned, tasks are generally created hot.
To control their execution, you need cold tasks.
The easiest way to get them is simply to use unit -> Task functions instead of Task values.
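In other words (a minimal sketch):

```fsharp
open System.Threading.Tasks

// Hot: the body runs as soon as this binding is evaluated.
let hotTask : Task<int> = task { return 42 }

// Cold: nothing runs until the function is actually called.
let coldTask : unit -> Task<int> = fun () -> task { return 42 }
```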
Another thing: I want to be able to use the results of the awaited tasks after their parallel execution.
To keep the implementations simple and leave exception handling to the caller, I chose to follow the Task.WhenAll() or Task.WhenAny() approach and just return the tasks as-is after they have executed.
The caller is then free to unwrap the completed tasks and do whatever they want with the results.
This has the downside that it's not possible to stream the results, but I'm okay with that.
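For instance, after a Task.WhenAll(), reading .Result on each task is safe because they have all completed (a minimal sketch):

```fsharp
open System.Threading.Tasks

let unwrapDemo =
    task {
        let hotTasks = [| for i in 1 .. 3 -> Task.FromResult(i * 10) |]
        let! _ = Task.WhenAll hotTasks
        // Every task has completed, so .Result returns without blocking.
        return [ for t in hotTasks -> t.Result ]
    }
```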
Baseline implementation
As a benchmark baseline, I simply wrapped Task.WhenAll():
/// Baseline: runs all the tasks at the same time and awaits them all at once.
let allAtOnce (coldTasks: (unit -> Task<_>) seq) =
    task {
        let hotTasks = [| for f in coldTasks -> f() |]
        let! _ = Task.WhenAll hotTasks
        return hotTasks
    }
Semaphore based implementation
A common way to implement concurrency limitation is to use a semaphore:
/// Limits concurrent tasks using a semaphore.
let withSemaphoreLimit limit coldTasks =
    task {
        use semaphore = new SemaphoreSlim(limit)
        let executedTasks = ConcurrentQueue()
        let! _ =
            seq {
                for f in coldTasks do
                    task {
                        do! semaphore.WaitAsync()
                        try
                            let hotTask = f()
                            executedTasks.Enqueue(hotTask)
                            return! hotTask
                        finally
                            semaphore.Release() |> ignore
                    }
            }
            |> Task.WhenAll
        return executedTasks :> IReadOnlyCollection<_>
    }
Tasks.Parallel.ForEachAsync() based implementation
A version wrapping the Task Parallel Library's ForEachAsync() method:
/// Limits concurrent tasks using the ForEachAsync() TPL method.
let withTPLForEachAsync maxDegreeOfParallelism (coldTasks: (unit -> Task<_>) seq) =
    task {
        let executedTasks = ConcurrentQueue()
        do! System.Threading.Tasks.Parallel.ForEachAsync(
                coldTasks,
                ParallelOptions(MaxDegreeOfParallelism = maxDegreeOfParallelism),
                fun f _ct ->
                    let hotTask = f()
                    executedTasks.Enqueue(hotTask)
                    hotTask |> ValueTask)
        return executedTasks :> IReadOnlyCollection<_>
    }
Spoiler: it's the one that performs best among all my attempts.
I was curious to see how ForEachAsync() is implemented, so I took a look at the .NET sources, and surprise... it's basically a loop over a semaphore!
They do a better job than I did, though, as you will see in the benchmark results...
F#-async-computations-to-tasks implementation
I also did a version using Async.Parallel, converting the tasks to async computations and then back to tasks:
/// Limits concurrent tasks using F# async computations and Async.Parallel.
let withFSharpAsyncParallel limit coldTasks =
    task {
        let executedTasks = ConcurrentQueue()
        let asyncs =
            coldTasks
            |> Seq.map (fun f ->
                async {
                    let hotTask = f()
                    executedTasks.Enqueue(hotTask)
                    return! hotTask |> Async.AwaitTask
                })
        let! _ = Async.Parallel(asyncs, maxDegreeOfParallelism = limit) |> Async.StartImmediateAsTask
        return executedTasks :> IReadOnlyCollection<_>
    }
Benchmarking
I tested all the above implementations using BenchmarkDotNet and the following simple task generator:
/// Simulates the sending of a network request returning a result after some time.
let sendRequest () =
    task {
        do! Task.Delay 100
        let now = DateTimeOffset.UtcNow
        return now
    }
And the following input parameters:
let concurrentTasksLimit = 10_000
let coldTasks = [ for _ in 1..100_000 -> sendRequest ]
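The benchmark methods themselves just run each implementation over these inputs. A sketch of what they might look like (the attribute usage is standard BenchmarkDotNet, but the exact class and member shapes here are my assumptions, not the article's actual benchmark code):

```fsharp
open BenchmarkDotNet.Attributes

[<MemoryDiagnoser>]
type ParallelizationBenchmarks() =

    [<Benchmark(Baseline = true)>]
    member _.test_allAtOnce () =
        (allAtOnce coldTasks).GetAwaiter().GetResult()

    [<Benchmark>]
    member _.test_withSemaphoreLimit () =
        (withSemaphoreLimit concurrentTasksLimit coldTasks).GetAwaiter().GetResult()

    [<Benchmark>]
    member _.test_withTPLForEachAsync () =
        (withTPLForEachAsync concurrentTasksLimit coldTasks).GetAwaiter().GetResult()
```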
Results
// * Summary *
BenchmarkDotNet v0.14.0, Windows 10 (10.0.19045.4842/22H2/2022Update)
Intel Core i7-8700K CPU 3.70GHz (Coffee Lake), 1 CPU, 12 logical and 6 physical cores
.NET SDK 8.0.202
[Host] : .NET 8.0.3 (8.0.324.11423), X64 RyuJIT AVX2 DEBUG
DefaultJob : .NET 8.0.3 (8.0.324.11423), X64 RyuJIT AVX2
| Method | Mean | Error | StdDev | Allocated |
|----------------------------- |-----------:|--------:|--------:|----------:|
| test_allAtOnce | 174.5 ms | 1.81 ms | 1.61 ms | 32.52 MB |
| test_withTPLForEachAsync | 1,142.0 ms | 2.98 ms | 2.79 ms | 39.05 MB |
| test_withSemaphoreLimit | 1,124.5 ms | 4.88 ms | 4.08 ms | 55.81 MB |
| test_withFSharpAsyncParallel | 1,197.5 ms | 8.07 ms | 7.55 ms | 113.68 MB |
All implementations work as expected, but clearly the ForEachAsync()-based one does a better job regarding memory usage!
Full code
The complete code — implementations, benchmarks, and demo — is available here.