I have ~10,000 objects of type System.Text.Json.JsonDocument. For each JsonDocument, in parallel, I have to do the following steps in order:
- clone the JsonDocument to a JsonNode
- apply a sequence of tasks transforming the JsonNode in place
- convert the shape of the JsonNode to an OpenSearch document
- store the converted JsonNode in a data structure (perhaps a ConcurrentBag)
Now, since I have ~10,000 documents, I'd like to process them in parallel, applying the flow outlined above. The problem is step 2 of the flow: the tasks applied in that step have different characteristics. Some are very small and fast, some are heavily CPU-bound, and some are IO-bound. For example, step 2 might consist of the following calls (this is just an example; I am using ActivatorUtilities.CreateInstance to resolve the tasks dynamically at runtime).
await task1.RunAsync(doc); // calls an API and updates doc
await task2.RunAsync(doc); // heavy string manipulation of doc's properties; returns Task.CompletedTask
where
// Task1.cs
public class Task1 : ITask
{
    public async Task RunAsync(JsonNode doc)
    {
        // call an API
        // update doc's properties
    }
}

// Task2.cs
// This also returns Task so it can be awaited, keeping ITask uniform.
public class Task2 : ITask
{
    public Task RunAsync(JsonNode doc)
    {
        // heavy string/regex manipulations
        // update doc's properties
        return Task.CompletedTask;
    }
}
As I understand it, I have these parallel options:
- Parallel.ForEach - I think this is good for CPU-bound tasks, but not so good for IO-bound tasks.
- Parallel.ForEachAsync or Task.WhenAll - I think these are good for IO-bound tasks, but maybe not so good for CPU-bound tasks?
- TPL Dataflow: I like this option. I think it can handle both concurrent and parallel execution, and it seems to support overlapping execution between blocks. The only problems are that it may be overkill for what I'm trying to do, and I have no idea how to implement step 2 with it.
Can anyone help me? Am I thinking this through correctly?
Asked Mar 11 by lightning_missile; edited Mar 12.

Comments:
- I think TPL Dataflow sounds ideal for your situation. You can then also control the degree of parallelism for each block in the pipeline. – Magnus
- Premature optimization. I'd take the "easiest" method, run it, and establish a baseline before deciding things are "too slow". The "randomness" might be a solution in itself. – Gerry Schmitz
2 Answers

I am on the same page with Ivan Petrov's answer. I think that a Parallel.ForEachAsync loop with local SemaphoreSlim-throttled regions is the simplest solution to your problem. For optimal control you can instantiate multiple SemaphoreSlim throttlers, one for each heterogeneous operation, and configure the Parallel.ForEachAsync with a MaxDegreeOfParallelism equal to the sum of the concurrencies of all the SemaphoreSlim throttlers. This way there will be enough parallelism for all heterogeneous operations, without one inadvertently throttling another. You should avoid the temptation to configure the MaxDegreeOfParallelism with Int32.MaxValue, because in that case the Parallel.ForEachAsync will create 10,000 internal worker tasks, consuming more memory than needed, and errors will not be surfaced in a timely manner unless you observe the CancellationToken in the body of the loop.
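A minimal sketch of that multi-throttler idea, with one SemaphoreSlim per kind of operation and MaxDegreeOfParallelism set to the sum of their concurrencies. The names (ioLimiter, cpuLimiter) and the limits are illustrative assumptions, not values from the answer:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class MultiThrottlerSketch
{
    // One throttler per heterogeneous operation (illustrative limits).
    static readonly SemaphoreSlim ioLimiter = new(10, 10); // IO-bound calls
    static readonly SemaphoreSlim cpuLimiter =
        new(Environment.ProcessorCount, Environment.ProcessorCount); // CPU-bound work

    static async Task Main()
    {
        var docs = Enumerable.Range(1, 100);
        var options = new ParallelOptions
        {
            // Sum of the throttlers' concurrencies, per the answer's advice.
            MaxDegreeOfParallelism = 10 + Environment.ProcessorCount
        };

        await Parallel.ForEachAsync(docs, options, async (doc, token) =>
        {
            await ioLimiter.WaitAsync(token);
            try { await Task.Delay(10, token); }   // stand-in for the API call
            finally { ioLimiter.Release(); }

            await cpuLimiter.WaitAsync(token);
            try { _ = doc * doc; }                 // stand-in for heavy CPU work
            finally { cpuLimiter.Release(); }
        });

        Console.WriteLine("done");
    }
}
```

Because each region releases its own semaphore in a finally block, a worker stuck waiting on the CPU limiter never holds an IO slot, which is what keeps the two workloads from throttling each other.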
Regarding the specific options that you mentioned:
- Parallel.ForEach is suitable only for synchronous workloads. You can use it with asynchronous workloads only by blocking them with .Wait(). Doing so will increase the demand for ThreadPool threads, so you might have to proactively configure the ThreadPool availability according to the desirable MaxDegreeOfParallelism. For these reasons Parallel.ForEach is not recommended, unless you target a .NET version before .NET 6. Using it with an async body is a flat-out mistake: Parallel.ForEach doesn't understand async delegates.
- Parallel.ForEachAsync is good. Task.WhenAll not so much. It is basically equivalent to a Parallel.ForEachAsync configured with MaxDegreeOfParallelism = Int32.MaxValue, so you won't get responsive completion in case of errors.
- Using TPL Dataflow is tempting, because in principle a pipeline is the ideal tool for the job. Nevertheless, I wouldn't advise using this library in production code because of its many rough corners, and because of a nasty bug that can cause a pipeline to deadlock indefinitely. The bug can emerge in rare circumstances, those circumstances can occur non-deterministically, it has been known for years, and might never be resolved.
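Since the question specifically asked what step 2 might look like in TPL Dataflow, here is a minimal sketch for comparison (bearing in mind the caution above). It assumes the System.Threading.Tasks.Dataflow NuGet package and uses int as a stand-in for the document type; block names and parallelism limits are illustrative:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowSketch
{
    static async Task Main()
    {
        // IO-bound block: high parallelism is cheap while awaiting.
        var ioBlock = new TransformBlock<int, int>(
            async doc => { await Task.Delay(10); return doc; },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32 });

        // CPU-bound block: parallelism capped near the core count.
        var cpuBlock = new TransformBlock<int, int>(
            doc => doc * doc,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            });

        // Terminal block: store the converted document.
        var sink = new ActionBlock<int>(doc => { /* e.g. add to a collection */ });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        ioBlock.LinkTo(cpuBlock, linkOptions);
        cpuBlock.LinkTo(sink, linkOptions);

        foreach (var doc in Enumerable.Range(1, 100)) ioBlock.Post(doc);
        ioBlock.Complete();
        await sink.Completion;
        Console.WriteLine("pipeline complete");
    }
}
```

Each block gets its own MaxDegreeOfParallelism, which is the overlapping-execution property the question found attractive; the trade-off is the extra machinery and the caveats noted in the answer.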
For simplicity you may want to combine Parallel.ForEachAsync with a SemaphoreSlim for throttling the CPU-bound tasks. Something like this:
public static void Main()
{
    var list = Enumerable.Range(1, 20);
    Parallel.ForEachAsync(list, new ParallelOptions
    {
        MaxDegreeOfParallelism = 10 // but maybe more like 100
    }, PipeLine).Wait();
}

static SemaphoreSlim semaphoreSlim =
    new SemaphoreSlim(2, 2); // more like close to the processor count

static async ValueTask PipeLine(int i, CancellationToken token)
{
    await IOBound();
    Console.WriteLine("After IO bound");
    await semaphoreSlim.WaitAsync(token);
    try
    {
        await ComputeBound();
    }
    finally
    {
        semaphoreSlim.Release();
    }
    Console.WriteLine("After CPU bound");
}

static async Task IOBound()
{
    await Task.Delay(1000); // simulate an IO-bound call
}

static Task ComputeBound()
{
    Thread.Sleep(1000); // simulate CPU-bound work
    return Task.CompletedTask;
}
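Neither answer shows step 4 of the question (storing the converted documents), but it slots naturally into the ForEachAsync body. A sketch, assuming string results as a stand-in for the converted JsonNode:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class CollectSketch
{
    static async Task Main()
    {
        var results = new ConcurrentBag<string>(); // thread-safe, unordered

        await Parallel.ForEachAsync(Enumerable.Range(1, 100),
            async (doc, token) =>
            {
                await Task.Yield();        // stand-in for the transform steps
                results.Add($"doc-{doc}"); // step 4: store the converted document
            });

        Console.WriteLine(results.Count);  // 100
    }
}
```

ConcurrentBag is fine here because the question doesn't need ordering; if the output order must match the input, an array indexed by position would be a better fit.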
c# - Executing mixed tasks in parallel - Stack Overflow