chore(poller): add error retry limit

pull/2/head
fuxiaohei 2022-10-12 20:55:50 +08:00 committed by Jason Song
parent dada0730b0
commit 45b0429b21
1 changed files with 15 additions and 1 deletions

View File

@ -16,6 +16,9 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
const errorRetryCounterLimit = 3
const errorRetryTimeSleepSecs = 30
var ( var (
ErrDataLock = errors.New("Data Lock Error") ErrDataLock = errors.New("Data Lock Error")
defaultLabels = []string{"self-hosted"} defaultLabels = []string{"self-hosted"}
@ -35,7 +38,8 @@ type Poller struct {
Filter *client.Filter Filter *client.Filter
Dispatch func(context.Context, *runnerv1.Task) error Dispatch func(context.Context, *runnerv1.Task) error
routineGroup *routineGroup routineGroup *routineGroup
errorRetryCounter int
} }
type runner struct { type runner struct {
@ -98,6 +102,11 @@ func (p *Poller) Poll(ctx context.Context, n int) error {
if err := p.poll(ctx, i+1); err != nil { if err := p.poll(ctx, i+1); err != nil {
log.WithField("thread", i+1). log.WithField("thread", i+1).
WithError(err).Error("poll error") WithError(err).Error("poll error")
if p.errorRetryCounter > errorRetryCounterLimit {
log.WithField("thread", i+1).Error("poller: too many errors, sleeping for 30 seconds")
// FIXME: it makes ctrl+c hang up
time.Sleep(time.Second * errorRetryTimeSleepSecs)
}
} }
} }
} }
@ -123,16 +132,19 @@ func (p *Poller) poll(ctx context.Context, thread int) error {
})) }))
if err == context.Canceled || err == context.DeadlineExceeded { if err == context.Canceled || err == context.DeadlineExceeded {
l.WithError(err).Trace("poller: no stage returned") l.WithError(err).Trace("poller: no stage returned")
p.errorRetryCounter++
return nil return nil
} }
if err != nil && err == ErrDataLock { if err != nil && err == ErrDataLock {
l.WithError(err).Info("task accepted by another runner") l.WithError(err).Info("task accepted by another runner")
p.errorRetryCounter++
return nil return nil
} }
if err != nil { if err != nil {
l.WithError(err).Error("cannot accept task") l.WithError(err).Error("cannot accept task")
p.errorRetryCounter++
return err return err
} }
@ -142,5 +154,7 @@ func (p *Poller) poll(ctx context.Context, thread int) error {
return nil return nil
} }
p.errorRetryCounter = 0
return p.Dispatch(ctx, resp.Msg.Task) return p.Dispatch(ctx, resp.Msg.Task)
} }