fix: limit workers

pull/2/head
Jason Song 2022-11-23 16:58:26 +08:00
parent f05e08a767
commit 5781e233c1
2 changed files with 13 additions and 12 deletions

View File

@ -4,15 +4,15 @@ import "sync/atomic"
// Metric interface // Metric interface
type Metric interface { type Metric interface {
IncBusyWorker() uint64 IncBusyWorker() int64
DecBusyWorker() uint64 DecBusyWorker() int64
BusyWorkers() uint64 BusyWorkers() int64
} }
var _ Metric = (*metric)(nil) var _ Metric = (*metric)(nil)
type metric struct { type metric struct {
busyWorkers uint64 busyWorkers int64
} }
// NewMetric for default metric structure // NewMetric for default metric structure
@ -20,14 +20,14 @@ func NewMetric() Metric {
return &metric{} return &metric{}
} }
func (m *metric) IncBusyWorker() uint64 { func (m *metric) IncBusyWorker() int64 {
return atomic.AddUint64(&m.busyWorkers, 1) return atomic.AddInt64(&m.busyWorkers, 1)
} }
func (m *metric) DecBusyWorker() uint64 { func (m *metric) DecBusyWorker() int64 {
return atomic.AddUint64(&m.busyWorkers, ^uint64(0)) return atomic.AddInt64(&m.busyWorkers, -1)
} }
func (m *metric) BusyWorkers() uint64 { func (m *metric) BusyWorkers() int64 {
return atomic.LoadUint64(&m.busyWorkers) return atomic.LoadInt64(&m.busyWorkers)
} }

View File

@ -83,7 +83,10 @@ func (p *Poller) Poll(ctx context.Context) error {
break break
} }
p.metric.IncBusyWorker()
p.routineGroup.Run(func() { p.routineGroup.Run(func() {
defer p.schedule()
defer p.metric.DecBusyWorker()
if err := p.dispatchTask(ctx, task); err != nil { if err := p.dispatchTask(ctx, task); err != nil {
l.Errorf("execute task: %v", err.Error()) l.Errorf("execute task: %v", err.Error())
} }
@ -131,12 +134,10 @@ func (p *Poller) pollTask(ctx context.Context) (*runnerv1.Task, error) {
func (p *Poller) dispatchTask(ctx context.Context, task *runnerv1.Task) error { func (p *Poller) dispatchTask(ctx context.Context, task *runnerv1.Task) error {
l := log.WithField("func", "dispatchTask") l := log.WithField("func", "dispatchTask")
defer func() { defer func() {
p.metric.DecBusyWorker()
e := recover() e := recover()
if e != nil { if e != nil {
l.Errorf("panic error: %v", e) l.Errorf("panic error: %v", e)
} }
p.schedule()
}() }()
runCtx, cancel := context.WithTimeout(ctx, time.Hour) runCtx, cancel := context.WithTimeout(ctx, time.Hour)