admin管理员组

文章数量:1125760

About the program: I am trying to implement the pipeline pattern using my own first-class function type, intJob. The main function, which wires the pipeline stages together, is ExecutePipeline2 and, as far as I can tell, it is exactly what is causing the problems.

Why do I keep getting deadlocks? As far as I can see, I close every channel that is used by a reader once the corresponding goroutine finishes. Creating buffered channels doesn't help either, so I've completely run out of ideas and would be really grateful for your help.

IMPORTANT: I can't change the main function; the idea has to be implemented in the other functions only, while the base (the mentioned function) remains constant and unchanged.

// intJob is one pipeline stage. ExecutePipeline2 calls it with the stage's
// input channel as a and its output channel as b.
type intJob func(a, b chan int)

// ExecutePipeline2 chains the given jobs into a pipeline and blocks until
// every job has returned.
//
// It allocates len(jobs)+1 unbuffered channels. Job i is started in its own
// goroutine with channel i as its input and channel i+1 as its output. When a
// job returns, a "job N closed" line is printed and the job's output channel
// is closed, which lets a downstream range loop over that channel finish.
// The very first channel is neither written to nor closed by this function.
func ExecutePipeline2(jobs ...intJob) {
    var wg sync.WaitGroup

    chans := make([]chan int, len(jobs)+1)
    for idx := range chans {
        chans[idx] = make(chan int)
    }

    wg.Add(len(jobs))
    for idx := range jobs {
        // Everything the goroutine needs is passed as an argument, so it
        // never observes a loop variable from a later iteration.
        go func(n int, stage intJob, in, out chan int) {
            stage(in, out)
            fmt.Printf("job %d closed\n", n)
            close(out)
            wg.Done()
        }(idx, jobs[idx], chans[idx], chans[idx+1])
    }

    wg.Wait()
}

// pipe ignores its first (input) channel and sends the integers 0 through 4,
// in ascending order, on b. It does not close b.
func pipe(_, b chan int) {
    const count = 5
    for n := 0; n != count; n++ {
        b <- n
    }
}

// main builds a three-stage pipeline and hands it to ExecutePipeline2:
// a producer that emits inputData, the pipe stage, and a printing consumer.
func main() {
    inputData := []int{0, 1, 1, 2, 3, 5, 8}

    // First stage: send every input value downstream; its input channel is unused.
    produce := func(in, out chan int) {
        for idx := range inputData {
            out <- inputData[idx]
        }
    }

    // Last stage: print each received value until the input channel is closed.
    printAll := func(in, out chan int) {
        for val := range in {
            fmt.Println(val)
        }
    }

    hashSignJobs := []intJob{produce, pipe, printAll}

    ExecutePipeline2(hashSignJobs...)
}

About the program: I am trying to implement the pipeline pattern using my own first-class function type, intJob. The main function, which wires the pipeline stages together, is ExecutePipeline2 and, as far as I can tell, it is exactly what is causing the problems.

Why do I keep getting deadlocks? As far as I can see, I close every channel that is used by a reader once the corresponding goroutine finishes. Creating buffered channels doesn't help either, so I've completely run out of ideas and would be really grateful for your help.

IMPORTANT: I can't change the main function; the idea has to be implemented in the other functions only, while the base (the mentioned function) remains constant and unchanged.

// intJob is one pipeline stage. ExecutePipeline2 calls it with the stage's
// input channel as a and its output channel as b.
type intJob func(a, b chan int)

// ExecutePipeline2 chains the given jobs into a pipeline and blocks until
// every job has returned.
//
// It allocates len(jobs)+1 unbuffered channels. Job i is started in its own
// goroutine with channel i as its input and channel i+1 as its output. When a
// job returns, a "job N closed" line is printed and the job's output channel
// is closed, which lets a downstream range loop over that channel finish.
// The very first channel is neither written to nor closed by this function.
func ExecutePipeline2(jobs ...intJob) {
    var wg sync.WaitGroup

    chans := make([]chan int, len(jobs)+1)
    for idx := range chans {
        chans[idx] = make(chan int)
    }

    wg.Add(len(jobs))
    for idx := range jobs {
        // Everything the goroutine needs is passed as an argument, so it
        // never observes a loop variable from a later iteration.
        go func(n int, stage intJob, in, out chan int) {
            stage(in, out)
            fmt.Printf("job %d closed\n", n)
            close(out)
            wg.Done()
        }(idx, jobs[idx], chans[idx], chans[idx+1])
    }

    wg.Wait()
}

// pipe ignores its first (input) channel and sends the integers 0 through 4,
// in ascending order, on b. It does not close b.
func pipe(_, b chan int) {
    const count = 5
    for n := 0; n != count; n++ {
        b <- n
    }
}

// main builds a three-stage pipeline and hands it to ExecutePipeline2:
// a producer that emits inputData, the pipe stage, and a printing consumer.
func main() {
    inputData := []int{0, 1, 1, 2, 3, 5, 8}

    // First stage: send every input value downstream; its input channel is unused.
    produce := func(in, out chan int) {
        for idx := range inputData {
            out <- inputData[idx]
        }
    }

    // Last stage: print each received value until the input channel is closed.
    printAll := func(in, out chan int) {
        for val := range in {
            fmt.Println(val)
        }
    }

    hashSignJobs := []intJob{produce, pipe, printAll}

    ExecutePipeline2(hashSignJobs...)
}

Share Improve this question edited Jan 9 at 8:16 Inian 85.4k15 gold badges160 silver badges179 bronze badges asked Jan 9 at 4:35 SYKOSYKO 737 bronze badges 1
  • 1 This is related to the code of your goroutines: the second goroutine does not "consume" values from the incoming pipe, so the first one ends up stuck on out <- fibNum. The generic code in ExecutePipeline2 should probably handle that case by somehow draining in before returning. – LeGEC Commented Jan 9 at 5:44
Add a comment  | 

2 Answers 2

Reset to default 4

I think the key issue is with your second pipeline stage, pipe(), which, instead of reading from the output of the previous stage, just generates numbers in a loop. It should be written as below, so that it reads from in:

// pipe forwards every value received on in to out, in order, and returns
// once in has been closed and fully drained. It does not close out.
func pipe(in, out chan int) {
    for {
        v, ok := <-in
        if !ok {
            return
        }
        out <- v
    }
}

You could also rewrite the execute function along the lines below. Full example at https://go.dev/play/p/K_-UFzz0zt5

// ExecutePipeline2 runs jobs as a pipeline and blocks until every stage has
// finished.
//
// Stage i runs in its own goroutine, reading from channel i and writing to
// channel i+1 (all channels are unbuffered). Once a stage's job returns:
//   - its output channel is closed, so a downstream range loop terminates;
//   - its input channel is drained, so the upstream stage is never left
//     blocked on a send when this stage stops reading early or never reads.
//
// Nothing receives from the last stage's output channel, so the final job
// must not send on it.
func ExecutePipeline2(jobs ...intJob) {
    outs := make([]chan int, len(jobs)+1)
    for i := range outs {
        outs[i] = make(chan int)
    }

    // No stage feeds the first channel. Close it up front so that a first job
    // ranging over its input returns immediately instead of blocking forever.
    close(outs[0])

    var wg sync.WaitGroup

    for i, job := range jobs {
        wg.Add(1)
        go func(job intJob, in, out chan int) {
            defer wg.Done()
            defer func() {
                // Close before draining so downstream can finish right away.
                close(out)
                // Discard whatever upstream still sends. The loop ends when
                // upstream's output (our input) is closed.
                for range in {
                }
            }()
            job(in, out)
        }(job, outs[i], outs[i+1])
    }

    wg.Wait()
}

The first "out" (1 index) channel created in this code snippet

   for i, job := range jobs {
        job := job
        in, out := outs[i], outs[i+1] // here 
        i := i
        wg.Add(1)
        go func() {
            job(in, out)
//...
        }()
    }

is never read by the next job (nothing consumes what the "producer" sends), so the first intJob blocks while sending in its loop and a deadlock occurs (the channel cannot accept another value because nothing receives from it):

    hashSignJobs := []intJob{
        intJob(func(in, out chan int) {
            for _, fibNum := range inputData {
                fmt.Printf("1 - %v\n", fibNum)
                out <- fibNum // fatal error: all goroutines are asleep - deadlock!
            }
        }),
// ....

PLAYGROUND

# ...

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0000161b0?)
    /usr/local/go-faketime/src/runtime/sema.go:71 +0x25
sync.(*WaitGroup).Wait(0x10100000010?)
    /usr/local/go-faketime/src/sync/waitgroup.go:118 +0x48
main.ExecutePipeline2({0xc000084f20, 0x3, 0x0?})
    /tmp/sandbox346714581/prog.go:31 +0x1d2
main.main()
    /tmp/sandbox346714581/prog.go:59 +0xd5

goroutine 6 [chan send]:
main.main.func1(0x101000000000000?, 0xc00007a0e0)
    /tmp/sandbox346714581/prog.go:48 +0xb6
main.ExecutePipeline2.func1()
    /tmp/sandbox346714581/prog.go:24 +0x3a
created by main.ExecutePipeline2 in goroutine 1
    /tmp/sandbox346714581/prog.go:23 +0xdb

本文标签: goDeadlock while implementing a pipeline concurrency patternStack Overflow