
I have ~10000 objects of type System.Text.Json.JsonDocument. For each JsonDocument in parallel, I have to do the following steps in order.

  • clone JsonDocument to JsonNode
  • apply a sequence of tasks transforming the JsonNode in place
  • convert the shape of the JsonNode to be an opensearch document
  • store the converted JsonNode into a data structure (perhaps a ConcurrentBag); a simplified sketch of this flow follows the list
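
To make this concrete, here is a simplified per-document skeleton. ConvertToOpenSearchShape and the tasks collection are placeholders for my real logic (the System.Text.Json.Nodes and System.Collections.Concurrent namespaces are assumed):

var results = new ConcurrentBag<JsonNode>();

async Task ProcessAsync(JsonDocument document, IEnumerable<ITask> tasks)
{
    // 1. clone the read-only JsonDocument into a mutable JsonNode
    JsonNode node = JsonNode.Parse(document.RootElement.GetRawText())!;

    // 2. apply the sequence of tasks in order (the part this question is about)
    foreach (var task in tasks)
        await task.RunAsync(node);

    // 3. reshape the node into an opensearch document (placeholder)
    JsonNode converted = ConvertToOpenSearchShape(node);

    // 4. store the result
    results.Add(converted);
}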

Now, since I have ~10000 documents, I'd like to process them in parallel, applying the flow I outlined above. The problem is step 2 of the flow: the tasks applied in that step have different characteristics. Some are very small and fast, some are heavy CPU-bound, and some are IO-bound. For example, step 2 might consist of the following calls (this is just an example; I am using ActivatorUtilities.CreateInstance to resolve the tasks dynamically at runtime).

await task1.RunAsync(doc); // calls an API and updates doc
await task2.RunAsync(doc); // does some heavy string manipulation on doc's properties and updates it; this one returns Task.CompletedTask

where

// Task1.cs
public class Task1 : ITask
{
    public async Task RunAsync(JsonNode doc)
    {
        // call an API
        // update doc's properties
    }
}

// Task2.cs
// this also returns Task and can be awaited to make ITask uniform
public class Task2 : ITask
{
    public Task RunAsync(JsonNode doc)
    {
        // heavy string/regex manipulations
        // update doc's properties
        return Task.CompletedTask;
    }
}
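
For reference, the snippets above imply an ITask interface along these lines (it is not spelled out in the question):

// ITask.cs (assumed shape, inferred from Task1 and Task2 above)
public interface ITask
{
    Task RunAsync(JsonNode doc);
}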

As I understand it, I have these parallel options.

  • Parallel.ForEach - I think this is good for CPU tasks, but not so good for IO tasks
  • Parallel.ForEachAsync or Task.WhenAll - I think this is good for IO tasks, but maybe not so good at CPU tasks?
  • TPL Dataflow: Now I like this option. I think it can handle both concurrent and parallel execution, and it seems to support overlapping executions between blocks. The only problems are that it might be overkill for what I'm trying to do, and that I have no idea how to implement step 2 with it.

Can anyone help me? Am I thinking this through correctly?

asked Mar 11 at 19:35 by lightning_missile, edited Mar 12 at 1:40
  • I think TPL dataflow sounds ideal for your situation. You can then also control the degree of parallelism for each block in the pipeline. – Magnus Commented Mar 12 at 8:06
  • Premature optimization. I'd take the "easiest" method; run it; and establish a base line before deciding things are "too slow". The "randomness" might be a solution in itself. – Gerry Schmitz Commented Mar 12 at 18:37

2 Answers


I am on the same page with Ivan Petrov's answer: a Parallel.ForEachAsync loop with local SemaphoreSlim-throttled regions is the simplest solution to your problem. For finer control you can instantiate multiple SemaphoreSlim throttlers, one for each kind of heterogeneous operation, and configure the Parallel.ForEachAsync with a MaxDegreeOfParallelism equal to the sum of the concurrencies of all the throttlers. This way there is enough parallelism for every kind of operation, without one inadvertently throttling another. You should avoid the temptation to configure the MaxDegreeOfParallelism with Int32.MaxValue, because in that case the Parallel.ForEachAsync will create 10,000 internal worker tasks, consuming more memory than needed, and any errors will not be surfaced in a timely manner unless you observe the CancellationToken in the body of the loop.
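
A rough sketch of the multi-throttler idea (the limits, the documents collection, and the CallApiAsync / HeavyTransform helpers are illustrative placeholders, not part of the question):

// one throttler per kind of work; the limits are illustrative only
SemaphoreSlim apiThrottler = new(20);
SemaphoreSlim cpuThrottler = new(Environment.ProcessorCount);

ParallelOptions options = new()
{
    // enough workers so that neither throttler starves the other
    MaxDegreeOfParallelism = 20 + Environment.ProcessorCount
};

await Parallel.ForEachAsync(documents, options, async (doc, ct) =>
{
    await apiThrottler.WaitAsync(ct);
    try { await CallApiAsync(doc); }   // IO-bound work
    finally { apiThrottler.Release(); }

    await cpuThrottler.WaitAsync(ct);
    try { HeavyTransform(doc); }       // CPU-bound work
    finally { cpuThrottler.Release(); }
});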

Regarding the specific options that you mentioned:

  1. The Parallel.ForEach is suitable only for synchronous workloads. You can use it with asynchronous workloads only by blocking them with .Wait(). Doing so increases the demand for ThreadPool threads, so you might have to proactively configure the ThreadPool availability to match the desired MaxDegreeOfParallelism (see the short sketch after this list). For these reasons the Parallel.ForEach is not recommended, unless you target a .NET version before .NET 6. Using it with an async body is a flat-out mistake: Parallel.ForEach doesn't understand async delegates.

  2. The Parallel.ForEachAsync is good. The Task.WhenAll not so much: it is basically equivalent to a Parallel.ForEachAsync configured with MaxDegreeOfParallelism = Int32.MaxValue, so you won't get responsive completion in case of errors.

  3. Using the TPL Dataflow is tempting, because in principle a pipeline is the ideal tool for the job. Nevertheless I wouldn't advise using this library in production code because of its many rough corners, and because of a nasty bug that can cause a pipeline to deadlock indefinitely. The bug emerges only in rare, non-deterministic circumstances, it has been known for years, and it might never be resolved.
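
Regarding the ThreadPool hint in point 1, a minimal sketch, assuming a desired MaxDegreeOfParallelism of 16 (an arbitrary number for illustration):

// let the pool grow to the blocking parallelism level up front,
// avoiding the slow thread-injection delay (illustrative values)
int desiredParallelism = 16;
ThreadPool.GetMinThreads(out _, out int completionPortThreads);
ThreadPool.SetMinThreads(desiredParallelism, completionPortThreads);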

For simplicity you may want to combine Parallel.ForEachAsync with SemaphoreSlim for throttling the CPU-bound tasks. Something like this:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Program {
    // throttles the CPU-bound section; in practice size it close to the processor count
    static readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(2, 2);

    public static async Task Main() {
        var list = Enumerable.Range(1, 20);
        await Parallel.ForEachAsync(list, new ParallelOptions {
            MaxDegreeOfParallelism = 10 // but maybe more like 100
        }, PipeLine);
    }

    static async ValueTask PipeLine(int i, CancellationToken token) {
        await IOBound();
        Console.WriteLine("After IO bound");

        await semaphoreSlim.WaitAsync(token);
        try {
            await ComputeBound();
        } finally {
            semaphoreSlim.Release();
        }

        Console.WriteLine("After CPU bound");
    }

    static async Task IOBound() {
        await Task.Delay(1000); // simulates an API call
    }

    static Task ComputeBound() {
        Thread.Sleep(1000); // simulates heavy CPU work
        return Task.CompletedTask;
    }
}
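
To adapt this skeleton to the question's flow, the body of PipeLine would clone the JsonDocument into a JsonNode, run the ITask sequence (acquiring the semaphore only around the CPU-heavy tasks), reshape the node, and store it. A rough sketch, assuming each ITask exposes an IsCpuBound flag and that tasks, results, and ConvertToOpenSearchShape exist (none of these are part of the question; the System.Text.Json and System.Text.Json.Nodes namespaces are required):

static async ValueTask ProcessDocument(JsonDocument document, CancellationToken token) {
    // step 1: clone the read-only JsonDocument into a mutable JsonNode
    JsonNode node = JsonNode.Parse(document.RootElement.GetRawText())!;

    // step 2: run the heterogeneous tasks in order, throttling only the CPU-bound ones
    foreach (ITask task in tasks) {
        if (task.IsCpuBound) { // hypothetical flag, not in the question's ITask
            await semaphoreSlim.WaitAsync(token);
            try { await task.RunAsync(node); }
            finally { semaphoreSlim.Release(); }
        } else {
            await task.RunAsync(node);
        }
    }

    // steps 3 and 4: reshape into an opensearch document and store it (placeholders)
    results.Add(ConvertToOpenSearchShape(node));
}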
