diff --git a/poller/poller.go b/poller/poller.go index 1600dc4..f91f8bf 100644 --- a/poller/poller.go +++ b/poller/poller.go @@ -54,6 +54,40 @@ func (p *Poller) Wait() { p.routineGroup.Wait() } +func (p *Poller) handle(ctx context.Context, l *log.Entry) { + defer func() { + if r := recover(); r != nil { + l.Errorf("handle task panic: %+v", r) + } + }() + + for { + select { + case <-ctx.Done(): + return + default: + task, err := p.pollTask(ctx) + if task == nil || err != nil { + if err != nil { + l.Errorf("can't find the task: %v", err.Error()) + } + time.Sleep(5 * time.Second) + break + } + + p.metric.IncBusyWorker() + p.routineGroup.Run(func() { + defer p.schedule() + defer p.metric.DecBusyWorker() + if err := p.dispatchTask(ctx, task); err != nil { + l.Errorf("execute task: %v", err.Error()) + } + }) + return + } + } +} + func (p *Poller) Poll(ctx context.Context) error { l := log.WithField("func", "Poll") @@ -67,32 +101,7 @@ func (p *Poller) Poll(ctx context.Context) error { case <-ctx.Done(): return nil } - LOOP: - for { - select { - case <-ctx.Done(): - break LOOP - default: - task, err := p.pollTask(ctx) - if task == nil || err != nil { - if err != nil { - l.Errorf("can't find the task: %v", err.Error()) - } - time.Sleep(5 * time.Second) - break - } - - p.metric.IncBusyWorker() - p.routineGroup.Run(func() { - defer p.schedule() - defer p.metric.DecBusyWorker() - if err := p.dispatchTask(ctx, task); err != nil { - l.Errorf("execute task: %v", err.Error()) - } - }) - break LOOP - } - } + p.handle(ctx, l) } }