func ConsumeTasks(ctx context.Context, c *tasq.Consumer, conf *config.RouterConfig) {
logger.Info("Begin Consume Tasks")
// Create an unbuffered (blocking) pool with a fixed number of workers
pool := pond.New(10, 0, pond.MinWorkers(10), pond.Context(ctx))
for {
job := <-c.Channel()
if job == nil {
continue
}
pool.Submit(*job)
select {
case <-ctx.Done():
logger.Info("consume tasks stop")
return
default:
continue
}
}
// unreachable code
// pool.StopAndWait()
}
fatal error: concurrent map writes
goroutine 149 [running]:
github.com/greencoda/tasq.(*Consumer).removeFromActiveTasks(...)
/app/vendor/github.com/greencoda/tasq/consumer.go:338
github.com/greencoda/tasq.(*Consumer).registerTaskSuccess(0xc0004382c0, {0x10d1a88?, 0x1746dc0?}, 0xc0002886e0)
/app/vendor/github.com/greencoda/tasq/consumer.go:312 +0x91
github.com/greencoda/tasq.(*Consumer).createJobFromTask.(*Consumer).newJob.func1()
/app/vendor/github.com/greencoda/tasq/consumer.go:491 +0x93
github.com/alitto/pond.(*WorkerPool).executeTask(0xc000000e40, 0x0?, 0x0?)
/app/vendor/github.com/alitto/pond/pond.go:454 +0x6c
github.com/alitto/pond.worker({0x10d1b30, 0xc0005527d0}, 0x0?, 0x0?, 0xc000403260, 0xc000556c00)
/app/vendor/github.com/alitto/pond/worker.go:32 +0xc2
created by github.com/alitto/pond.(*WorkerPool).maybeStartWorker in goroutine 105
/app/vendor/github.com/alitto/pond/pond.go:423 +0x133