I am in the process of building an effect system DSL with F# inspired by ZIO and Cats Effect.

I have my effect type defined as a DU of opcodes, a fiber type, a channel type for message passing, etc. I have a runtime that takes a composed set of opcodes, interprets them and returns either a success result or an error.

Recently, I have been working on adding support for TPL task computations. Essentially, I would like to implement an opcode (let's call it AwaitTask) that takes a task computation and uses the effect system's runtime to handle it.

I am running into a sporadic deadlock that I cannot explain. I am aware that this is very specific, complicated and difficult to answer, but I thought I would try my luck. I am unsure how to debug this and figure out what the issue is, so if anyone has any idea, I'd appreciate any input.

The issue

This is roughly what happens when interpreting an effect.

1. I spawn a pool of 7 evaluation workers (threads) polling on a shared work item queue for interpreting effects, and 1 blocking worker (thread as well) for handling blocked effects through a blocking item queue.

2. When the AwaitTask opcode is interpreted for the first time, it is immediately scheduled as a blocking effect on the blocking item queue (else-branch). Once the blocking worker has seen that the task is no longer blocking, the effect is scheduled back to an evaluation worker and the result can be returned (if-branch). (This can probably be optimized, but that is a separate problem.)

| AwaitTask task ->
    if prevAction = RescheduleForBlocking (BlockingTask task) then
        handleResult (task.AwaitResult()) stack evalSteps newEvalSteps
    else
        (AwaitTask task, stack, RescheduleForBlocking <| BlockingTask task, evalSteps)

3. In parallel, the blocking worker continuously loops over the blocking item queue and waits for blocked items. When it finds a blocked task in the queue, it takes the task out and adds a continuation to the original task such that once the original task is completed, it is rescheduled back to the work item queue and picked up by an evaluation worker for interpretation.

let processBlockingTask (blockingTask: TaskWrapper<obj>) =
    // Register a continuation instead of blocking the worker on the task.
    blockingTask.Task().ContinueWith(
        (fun (_: Tasks.Task) ->
            blockingTask.RescheduleBlockingWorkItems config.WorkItemQueue),
        CancellationToken.None,
        TaskContinuationOptions.RunContinuationsAsynchronously,
        TaskScheduler.Default)
    |> ignore
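
To make step 3 easier to reason about in isolation, here is a minimal sketch of the same idea outside the runtime: a continuation pushes a work item back onto a queue once the task completes, and nothing blocks while the task is pending. The names `workItemQueue`, `rescheduleWhenDone` and `tryTakeWithin` are hypothetical stand-ins, not part of FIO, and the queue holds plain strings instead of real work items.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks

// Hypothetical stand-in for the runtime's shared work item queue.
let workItemQueue = new BlockingCollection<string>()

// Registers a continuation that re-enqueues the item when the task finishes.
// The explicit Action<Task> avoids any overload ambiguity on ContinueWith.
let rescheduleWhenDone (name: string) (task: Task) =
    task.ContinueWith(
        Action<Task>(fun _ -> workItemQueue.Add name),
        CancellationToken.None,
        TaskContinuationOptions.RunContinuationsAsynchronously,
        TaskScheduler.Default)
    |> ignore

// Stand-in for an evaluation worker polling the queue; the timeout
// keeps the sketch from hanging if the item never arrives.
let tryTakeWithin (timeout: TimeSpan) =
    let mutable item = Unchecked.defaultof<string>
    if workItemQueue.TryTake(&item, timeout) then Some item else None

let tcs = TaskCompletionSource<int>()
rescheduleWhenDone "work-item-1" (tcs.Task :> Task)

// The task is still pending, so nothing has been rescheduled yet.
let queuedBeforeCompletion = workItemQueue.Count

// Completing the task triggers the continuation on the thread pool.
tcs.SetResult 42
let result = tryTakeWithin (TimeSpan.FromSeconds 5.0)
```

If a reduced version like this never hangs while the full runtime does, that would suggest the problem lies in how the interpreter and the two queues interact rather than in the continuation itself.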

I have made a sample program with this: two websockets passing messages back and forth. However, I seem to have a sporadic deadlock that makes the program hang. I suspect that somewhere in my code I am using blocking operations on the task computation, so a thread may hang because the task is not being fully scheduled by my runtime.

The user-facing API for using the task computations looks like the following:

/// Converts a Task into an effect.
static member FromGenericTask (task: Task<'R>, name) : FIO<'R, 'E> =
    AwaitTask <| TaskWrapper (task, name)
/// Converts a Task into an effect.
static member FromTask (task: Task, name) : FIO<unit, 'E> =
    AwaitTask <| TaskWrapper (task.ContinueWith((fun _ ->
       Task.FromResult(())), CancellationToken.None, 
       TaskContinuationOptions.RunContinuationsAsynchronously, TaskScheduler.Default)
    |> fun t -> t.Result, name)
/// Converts an Async computation into an effect.
static member FromAsync (async: Async<'R>, name) : FIO<'R, 'E> =
    FIO.FromGenericTask <| (Async.StartAsTask async, name)
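
One thing worth checking in `FromTask` above: reading `.Result` on the task returned by `ContinueWith` blocks the calling thread until the original task has completed, before the effect is ever handed to the runtime. Whether that is related to the deadlock is only a guess. As a sketch of a non-blocking alternative, the hypothetical helper `toUnitTask` below (not part of FIO) maps a non-generic `Task` to a `Task<unit>` by returning the continuation task itself.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Hypothetical helper: converts Task to Task<unit> without blocking the caller.
let toUnitTask (t: Task) : Task<unit> =
    t.ContinueWith(
        // The antecedent has already completed here, so GetResult() does not
        // block; it only rethrows if the original task faulted or was canceled.
        Func<Task, unit>(fun completed -> completed.GetAwaiter().GetResult()),
        CancellationToken.None,
        TaskContinuationOptions.RunContinuationsAsynchronously,
        TaskScheduler.Default)

let source = TaskCompletionSource<int>()

// Returns immediately even though the source task is still pending.
let unitTask = toUnitTask (source.Task :> Task)
let completedBeforeSource = unitTask.IsCompleted

// Once the source completes, the unit task completes shortly after.
source.SetResult 1
let finished = unitTask.Wait(TimeSpan.FromSeconds 5.0)
```

With a helper like this, `FromTask` could pass `toUnitTask task` to `TaskWrapper` directly, assuming `TaskWrapper` accepts a `Task<unit>` in the same way it accepts the `Task<'R>` in `FromGenericTask`.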

It should be noted that the tasks do not have any sub-tasks that need to be awaited or completed. They are very simple computations.

  • Full code can be found here:
  • The above user API functions can be found in /src/FIO/Core/DSL.fs.
  • The mentioned runtime is /src/FIO/Runtime/AdvancedRuntime.fs
  • The sample program where I have been experiencing the deadlock issue is WebSocketApp in /src/FIO/Examples/Program.fs.

Thanks to anyone who decides to take a look.

Tags: multithreading. Original question: Deadlock issue when scheduling task computations (F#, .NET, TPL), Stack Overflow