About the program: I am trying to implement the pipeline pattern using my own first-class function type, intJob. The function that wires the stages together is ExecutePipeline2, and as far as I can tell, it is the one causing the problem.
Why do I keep getting deadlocks? As far as I can see, I close every channel that readers use once its goroutine finishes. Making the channels buffered doesn't help either, so I have completely run out of ideas and would be really grateful for your help.
IMPORTANT: I can't change the main function. The fix has to come from the other functions only, with main left exactly as it is.
package main

import (
	"fmt"
	"sync"
)

type intJob func(a, b chan int)

func ExecutePipeline2(jobs ...intJob) {
	outs := make([]chan int, len(jobs)+1)
	wg := sync.WaitGroup{}
	for i := 0; i < len(outs); i++ {
		outs[i] = make(chan int)
	}
	for i, job := range jobs {
		job := job
		in, out := outs[i], outs[i+1]
		i := i
		wg.Add(1)
		go func() {
			job(in, out)
			fmt.Printf("job %d closed\n", i)
			close(out)
			wg.Done()
		}()
	}
	wg.Wait()
}

func pipe(_, b chan int) {
	for i := 0; i < 5; i++ {
		b <- i
	}
}

func main() {
	inputData := []int{0, 1, 1, 2, 3, 5, 8}
	hashSignJobs := []intJob{
		intJob(func(in, out chan int) {
			for _, fibNum := range inputData {
				out <- fibNum
			}
		}),
		intJob(pipe),
		intJob(func(in, out chan int) {
			for val := range in {
				fmt.Println(val)
			}
		}),
	}
	ExecutePipeline2(hashSignJobs...)
}
asked Jan 9 at 4:35 by SYKO, edited Jan 9 at 8:16 by Inian
2 Answers
I think the key issue is your second stage, pipe(), which never reads the output of the previous stage and just generates numbers in a loop. Because nothing ever receives from the first stage's output channel, its send blocks forever and wg.Wait() never returns. pipe() should read from in instead:
func pipe(in, out chan int) {
	for i := range in {
		out <- i
	}
}
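To check that this change alone is enough, here is a minimal sketch that keeps the question's ExecutePipeline2 logic and only swaps in the corrected pipe. The helper runFixedPipeline is hypothetical and not from the original post; it replaces the printing stage with one that collects values, so the result can be inspected after the pipeline returns.

```go
package main

import (
	"fmt"
	"sync"
)

type intJob func(in, out chan int)

// ExecutePipeline2 follows the question's runner: stage i reads from
// outs[i] and writes to outs[i+1], which is closed when the stage returns.
func ExecutePipeline2(jobs ...intJob) {
	outs := make([]chan int, len(jobs)+1)
	for i := range outs {
		outs[i] = make(chan int)
	}
	var wg sync.WaitGroup
	for i, job := range jobs {
		in, out := outs[i], outs[i+1]
		wg.Add(1)
		go func(job intJob) {
			defer wg.Done()
			defer close(out) // runs before wg.Done: tells the next stage no more values follow
			job(in, out)
		}(job)
	}
	wg.Wait()
}

// pipe forwards every value from in to out and returns once in is closed.
func pipe(in, out chan int) {
	for v := range in {
		out <- v
	}
}

// runFixedPipeline (hypothetical helper) runs source -> pipe -> sink
// and returns what the sink received.
func runFixedPipeline(input []int) []int {
	var got []int
	ExecutePipeline2(
		func(_, out chan int) {
			for _, v := range input {
				out <- v
			}
		},
		pipe,
		func(in, _ chan int) {
			for v := range in {
				got = append(got, v)
			}
		},
	)
	// Safe to read got here: wg.Wait has returned, so the sink has finished.
	return got
}

func main() {
	fmt.Println(runFixedPipeline([]int{0, 1, 1, 2, 3, 5, 8})) // [0 1 1 2 3 5 8]
}
```

Every channel that is written to now has a reader, so each stage finishes when its input is closed and no goroutine is left blocked.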
You could also rewrite the execute function along the lines below. Full example at https://go.dev/play/p/K_-UFzz0zt5

func ExecutePipeline2(jobs ...intJob) {
	outs := make([]chan int, len(jobs)+1)
	for i := 0; i < len(outs); i++ {
		outs[i] = make(chan int)
	}
	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		go func(i int, job intJob) {
			defer wg.Done()
			// Close the output channel when done
			defer close(outs[i+1])
			job(outs[i], outs[i+1])
		}(i, job)
	}
	// Nothing writes to the first channel, so close it right away;
	// a first stage that ranges over its input then terminates.
	close(outs[0])
	wg.Wait()
}
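The close(outs[0]) call matters as soon as the first stage reads from its input. As a rough illustration, the sketch below runs a pipeline whose first stage is a pass-through. The names forward and countForwarded are hypothetical and only serve the example. With the first channel closed, every stage sees end-of-input and the call returns. Without that close, the first forward would wait on its input forever.

```go
package main

import (
	"fmt"
	"sync"
)

type intJob func(in, out chan int)

// ExecutePipeline2 mirrors the rewritten runner from this answer.
func ExecutePipeline2(jobs ...intJob) {
	outs := make([]chan int, len(jobs)+1)
	for i := range outs {
		outs[i] = make(chan int)
	}
	var wg sync.WaitGroup
	for i, job := range jobs {
		wg.Add(1)
		go func(i int, job intJob) {
			defer wg.Done()
			defer close(outs[i+1])
			job(outs[i], outs[i+1])
		}(i, job)
	}
	close(outs[0]) // nothing writes here, so readers see "closed" immediately
	wg.Wait()
}

// forward is a hypothetical pass-through stage.
func forward(in, out chan int) {
	for v := range in {
		out <- v
	}
}

// countForwarded runs forward -> forward -> counter and reports how many
// values reached the last stage.
func countForwarded() int {
	n := 0
	ExecutePipeline2(forward, forward, func(in, _ chan int) {
		for range in {
			n++
		}
	})
	return n
}

func main() {
	fmt.Println(countForwarded()) // 0: the pipeline terminates with no input
}
```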
The first "out" channel (index 1, i.e. outs[1]) created in this code snippet

for i, job := range jobs {
	job := job
	in, out := outs[i], outs[i+1] // here
	i := i
	wg.Add(1)
	go func() {
		job(in, out)
		//...
	}()
}

is never read by the next stage, because pipe ignores its input. The first intJob therefore blocks on a send and a deadlock occurs: on the very first send with an unbuffered channel, or, with a buffered channel, as soon as the buffer is full:
hashSignJobs := []intJob{
	intJob(func(in, out chan int) {
		for _, fibNum := range inputData {
			fmt.Printf("1 - %v\n", fibNum)
			out <- fibNum // fatal error: all goroutines are asleep - deadlock!
		}
	}),
	// ....
Playground output:
# ...
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0000161b0?)
/usr/local/go-faketime/src/runtime/sema.go:71 +0x25
sync.(*WaitGroup).Wait(0x10100000010?)
/usr/local/go-faketime/src/sync/waitgroup.go:118 +0x48
main.ExecutePipeline2({0xc000084f20, 0x3, 0x0?})
/tmp/sandbox346714581/prog.go:31 +0x1d2
main.main()
/tmp/sandbox346714581/prog.go:59 +0xd5
goroutine 6 [chan send]:
main.main.func1(0x101000000000000?, 0xc00007a0e0)
/tmp/sandbox346714581/prog.go:48 +0xb6
main.ExecutePipeline2.func1()
/tmp/sandbox346714581/prog.go:24 +0x3a
created by main.ExecutePipeline2 in goroutine 1
/tmp/sandbox346714581/prog.go:23 +0xdb
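To see why adding a buffer does not help, here is a small sketch that counts how many sends succeed on a channel nobody receives from. The function sendsBeforeBlocking is a hypothetical helper, and it uses select with a default case only to detect the point where a plain send would block, rather than actually blocking.

```go
package main

import "fmt"

// sendsBeforeBlocking reports how many of the given values can be sent on a
// channel of the given capacity when no goroutine ever receives from it.
func sendsBeforeBlocking(capacity int, values []int) int {
	ch := make(chan int, capacity)
	sent := 0
	for _, v := range values {
		select {
		case ch <- v:
			sent++
		default:
			// The buffer is full (or the channel is unbuffered):
			// a plain `ch <- v` would block here forever.
			return sent
		}
	}
	return sent
}

func main() {
	input := []int{0, 1, 1, 2, 3, 5, 8}
	for _, c := range []int{0, 1, 3, 7} {
		fmt.Printf("capacity %d: %d of %d sends succeed\n",
			c, sendsBeforeBlocking(c, input), len(input))
	}
}
```

Only a buffer at least as large as the number of values sent lets the first stage finish, and even then its values are never consumed. That hides the bug rather than fixing it.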
Tags: go. Source: Stack Overflow, "Deadlock while implementing a pipeline concurrency pattern".
... out <- fibNum. The generic code in ExecutePipeline2 should probably handle that case by somehow draining in before returning. – LeGEC, commented Jan 9 at 5:44
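One way to read this suggestion, as a sketch under assumptions rather than a definitive implementation: after a job returns, the runner closes the job's output and then discards whatever is left on its input, so an upstream stage is never stuck on a send. ExecutePipelineDraining and runWithDrain are hypothetical names introduced for the example. The first channel is closed up front, otherwise draining it would itself block forever.

```go
package main

import (
	"fmt"
	"sync"
)

type intJob func(in, out chan int)

// ExecutePipelineDraining is a hypothetical variant of ExecutePipeline2
// that drains each stage's input after the stage returns.
func ExecutePipelineDraining(jobs ...intJob) {
	outs := make([]chan int, len(jobs)+1)
	for i := range outs {
		outs[i] = make(chan int)
	}
	// Nothing feeds the first stage, so close its input up front;
	// otherwise the drain loop below would wait on it forever.
	close(outs[0])

	var wg sync.WaitGroup
	for i, job := range jobs {
		in, out := outs[i], outs[i+1]
		wg.Add(1)
		go func(job intJob) {
			defer wg.Done()
			job(in, out)
			close(out) // tell the downstream stage that no more values follow
			for range in {
				// Discard input the job left unread so upstream sends can complete.
			}
		}(job)
	}
	wg.Wait()
}

// pipe is the question's original stage: it ignores its input.
func pipe(_, b chan int) {
	for i := 0; i < 5; i++ {
		b <- i
	}
}

// runWithDrain (hypothetical helper) runs source -> pipe -> sink and
// returns what the sink received.
func runWithDrain(input []int) []int {
	var got []int
	ExecutePipelineDraining(
		func(_, out chan int) {
			for _, v := range input {
				out <- v
			}
		},
		pipe,
		func(in, _ chan int) {
			for v := range in {
				got = append(got, v)
			}
		},
	)
	return got
}

func main() {
	fmt.Println(runWithDrain([]int{0, 1, 1, 2, 3, 5, 8})) // [0 1 2 3 4]
}
```

With this runner the program terminates even though pipe ignores its input, but the Fibonacci values are silently thrown away. If they are supposed to reach the last stage, pipe still has to read from in.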