Background
I am developing a Kafka consumer program using the Reader
struct from the kafka-go client. When the program starts, it initializes a number of worker goroutines that consume and handle messages; these workers are long-living and should run as long as the main program is running. I also added a recover
function to each worker in case any of them panics.
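For reference, the setup looks roughly like the sketch below (not the real program). It assumes the segmentio/kafka-go package; the broker address, topic, group ID, and the handleMessage helper are placeholders made up for the illustration.

package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/segmentio/kafka-go"
)

const numberOfWorker = 5

func main() {
	ctx := context.Background()
	wg := &sync.WaitGroup{}

	for i := 0; i < numberOfWorker; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// recover so a panic in one worker does not crash the whole program
			defer func() {
				if r := recover(); r != nil {
					fmt.Printf("worker %d recovered: %v\n", i, r)
				}
			}()

			// placeholder broker/topic/group values
			reader := kafka.NewReader(kafka.ReaderConfig{
				Brokers: []string{"localhost:9092"},
				GroupID: "my-consumer-group",
				Topic:   "my-topic",
			})
			defer reader.Close()

			// long-living consume loop
			for {
				msg, err := reader.ReadMessage(ctx)
				if err != nil {
					fmt.Printf("worker %d read error: %v\n", i, err)
					return
				}
				// handleMessage is where the real processing (and a possible panic) happens
				handleMessage(i, msg)
			}
		}(i)
	}

	wg.Wait()
}

func handleMessage(worker int, msg kafka.Message) {
	fmt.Printf("worker %d got message at offset %d: %s\n", worker, msg.Offset, string(msg.Value))
}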
Problem
When a worker goroutine fails for some reason and exits, the number of living workers decreases.
package main

import (
	"fmt"
	"sync"
	"time"
)

const numberOfWorker = 5
const longListOfTasks = 10

func main() {
	wg := &sync.WaitGroup{}
	for i := 0; i < numberOfWorker; i++ {
		wg.Add(1)
		go func(i int) {
			defer func() {
				fmt.Printf("worker: %d done.\n", i)
				wg.Done()
				if r := recover(); r != nil {
					fmt.Printf("Recovered: %v\n", r)
				}
			}()
			fmt.Printf("worker: %d started.\n", i)
			// simulate streaming work
			for j := 0; j < longListOfTasks; j++ {
				fmt.Printf("worker: %d, task: %d\n", i, j)
				time.Sleep(1 * time.Second)
				if i < numberOfWorker-1 && i == j {
					panic(fmt.Errorf("worker %d panic", i))
				}
				// in the real program, a context handles graceful shutdown of the main process
			}
		}(i)
	}
	wg.Wait()
}
Go Playground
This sample program starts with 5 workers and panics one worker each second. Eventually only one worker is left, and it keeps working.
Is there any recommended practice in Go for long-living worker goroutines and panic handling? Or should I avoid this kind of long-living goroutine and make the workers reusable instead?
What I tried
I can restart a worker by having the deferred recover call the function again in a new goroutine, like this:
func main() {
	wg := &sync.WaitGroup{}
	for i := 0; i < numberOfWorker; i++ {
		wg.Add(1)
		go doWork(i, wg)
	}
	wg.Wait()
}

func doWork(i int, wg *sync.WaitGroup) {
	defer func() {
		fmt.Printf("worker: %d done.\n", i)
		wg.Done() // reduce 1 for the current goroutine
		if r := recover(); r != nil {
			fmt.Printf("Recovered: %v\n", r)
			// ----- here -----
			wg.Add(1)        // add 1 to the wait group
			go doWork(i, wg) // start a new goroutine
		}
	}()
	// ...
}
but this makes the code hard to read and may cause behaviour that is hard to predict (e.g. when values are passed through a shared channel). Also, since the function keeps respawning itself and effectively never ends, I suspect it could cause an OOM in the long run.
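One variation that keeps the restart behaviour but avoids the recursive respawn (a sketch only, not code from the question): touch the WaitGroup exactly once per worker and let a plain loop decide whether to run the worker body again, with a small helper, here called runOnce (a made-up name), that converts a panic into an error. The attempt counter exists only to reproduce the "panic once" behaviour of the sample program.

package main

import (
	"fmt"
	"sync"
	"time"
)

const numberOfWorker = 5
const longListOfTasks = 10

func main() {
	wg := &sync.WaitGroup{}
	for i := 0; i < numberOfWorker; i++ {
		wg.Add(1)
		go doWork(i, wg)
	}
	wg.Wait()
}

// doWork keeps the worker alive by restarting its body in a loop
// instead of having the deferred recover spawn a new goroutine.
func doWork(i int, wg *sync.WaitGroup) {
	defer wg.Done()
	for attempt := 0; ; attempt++ {
		if err := runOnce(i, attempt); err != nil {
			fmt.Printf("worker: %d restarting after panic: %v\n", i, err)
			continue
		}
		fmt.Printf("worker: %d done.\n", i)
		return
	}
}

// runOnce runs one "lifetime" of the worker and turns a panic into an error,
// so the caller can decide whether to restart.
func runOnce(i, attempt int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	for j := 0; j < longListOfTasks; j++ {
		fmt.Printf("worker: %d, task: %d\n", i, j)
		time.Sleep(1 * time.Second)
		// simulate a panic on the first attempt only, like the sample program
		if attempt == 0 && i < numberOfWorker-1 && i == j {
			panic(fmt.Errorf("worker %d panic", i))
		}
	}
	return nil
}

Because the restart decision lives in an ordinary for loop, each worker calls wg.Add and wg.Done exactly once, and nothing spawns new goroutines from inside a deferred function.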
Another solution is to let the platform handle it, e.g. Kubernetes: exit from the main process and let Kubernetes handle the pod recovery, i.e. restart the program automatically. This approach relies on an external tool, and the problem in the code itself is not solved, which I think makes it less stable.
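On the code side, that approach mostly means not recovering: an unrecovered panic in any goroutine terminates the whole process with a non-zero exit status, which a container restart policy such as Kubernetes' default Always will then restart. The sketch below only illustrates that idea; there is nothing Kubernetes-specific in the Go code, and the simulated failure is made up.

package main

import (
	"fmt"
	"sync"
	"time"
)

const numberOfWorker = 5

func main() {
	wg := &sync.WaitGroup{}
	for i := 0; i < numberOfWorker; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// no recover here: if this worker panics, the runtime prints the
			// stack trace and exits the process with a non-zero status, and the
			// orchestrator (e.g. Kubernetes) restarts the container
			for j := 0; ; j++ {
				fmt.Printf("worker: %d, task: %d\n", i, j)
				time.Sleep(1 * time.Second)
				if i == 0 && j == 2 {
					// simulated failure: this unrecovered panic takes the whole process down
					panic(fmt.Errorf("worker %d panic", i))
				}
			}
		}(i)
	}
	wg.Wait()
}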
Instead of a long-living process, I could create a listener process, e.g. an HTTP server, that accepts requests and kicks off a worker to consume messages. That makes the worker creation controllable, but it is outside my scenario, because the program does not require any interaction.
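For completeness, that on-demand variant could look roughly like this (a sketch only; the /consume path and the fixed batch size are arbitrary choices, not anything from the question):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// each request starts one short-lived worker instead of keeping workers alive forever
	http.HandleFunc("/consume", func(w http.ResponseWriter, r *http.Request) {
		go func() {
			defer func() {
				if rec := recover(); rec != nil {
					fmt.Printf("worker recovered: %v\n", rec)
				}
			}()
			// consume a bounded batch of messages, then return
			for j := 0; j < 10; j++ {
				fmt.Printf("handling task %d\n", j)
			}
		}()
		w.WriteHeader(http.StatusAccepted)
	})

	if err := http.ListenAndServe(":8080", nil); err != nil {
		fmt.Println(err)
	}
}

A request to /consume then starts one bounded batch of work and returns immediately with 202 Accepted.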