diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index cbf737b..ceafbb1 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -1,7 +1,8 @@ # Release Notes -## 3.4.2 +## 3.5.0 +* [Allow graceful shutdowns](https://code.forgejo.org/forgejo/runner/pulls/201): when receiving a signal (INT or TERM) wait for running jobs to complete (up to shutdown_timeout). * [Fix label declaration](https://code.forgejo.org/forgejo/runner/pulls/176): Runner in daemon mode now takes labels found in config.yml into account when declaration was successful. * [Fix the docker compose example](https://code.forgejo.org/forgejo/runner/pulls/175) to workaround the race on labels. * [Fix the kubernetes dind example](https://code.forgejo.org/forgejo/runner/pulls/169). diff --git a/internal/app/cmd/daemon.go b/internal/app/cmd/daemon.go index 8e47bf6..8f062ad 100644 --- a/internal/app/cmd/daemon.go +++ b/internal/app/cmd/daemon.go @@ -120,8 +120,18 @@ func runDaemon(ctx context.Context, configFile *string) func(cmd *cobra.Command, poller := poll.New(cfg, cli, runner) - poller.Poll(ctx) + go poller.Poll() + <-ctx.Done() + log.Infof("runner: %s shutdown initiated, waiting %s for running jobs to complete before shutting down", resp.Msg.Runner.Name, cfg.Runner.ShutdownTimeout) + + ctx, cancel := context.WithTimeout(context.Background(), cfg.Runner.ShutdownTimeout) + defer cancel() + + err = poller.Shutdown(ctx) + if err != nil { + log.Warnf("runner: %s cancelled in progress jobs during shutdown", resp.Msg.Runner.Name) + } return nil } } diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index f79e98e..99e8215 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -25,40 +25,95 @@ type Poller struct { runner *run.Runner cfg *config.Config tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea. + + pollingCtx context.Context + shutdownPolling context.CancelFunc + + jobsCtx context.Context + shutdownJobs context.CancelFunc + + done chan struct{} } func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { + pollingCtx, shutdownPolling := context.WithCancel(context.Background()) + + jobsCtx, shutdownJobs := context.WithCancel(context.Background()) + + done := make(chan struct{}) + return &Poller{ client: client, runner: runner, cfg: cfg, + + pollingCtx: pollingCtx, + shutdownPolling: shutdownPolling, + + jobsCtx: jobsCtx, + shutdownJobs: shutdownJobs, + + done: done, } } -func (p *Poller) Poll(ctx context.Context) { +func (p *Poller) Poll() { limiter := rate.NewLimiter(rate.Every(p.cfg.Runner.FetchInterval), 1) wg := &sync.WaitGroup{} for i := 0; i < p.cfg.Runner.Capacity; i++ { wg.Add(1) - go p.poll(ctx, wg, limiter) + go p.poll(wg, limiter) } wg.Wait() + + // signal that we shutdown + close(p.done) } -func (p *Poller) poll(ctx context.Context, wg *sync.WaitGroup, limiter *rate.Limiter) { +func (p *Poller) Shutdown(ctx context.Context) error { + p.shutdownPolling() + + select { + // graceful shutdown completed succesfully + case <-p.done: + return nil + + // our timeout for shutting down ran out + case <-ctx.Done(): + // when both the timeout fires and the graceful shutdown + // completed succsfully, this branch of the select may + // fire. Do a non-blocking check here against the graceful + // shutdown status to avoid sending an error if we don't need to. + _, ok := <-p.done + if !ok { + return nil + } + + // force a shutdown of all running jobs + p.shutdownJobs() + + // wait for running jobs to report their status to Gitea + _, _ = <-p.done + + return ctx.Err() + } +} + +func (p *Poller) poll(wg *sync.WaitGroup, limiter *rate.Limiter) { defer wg.Done() for { - if err := limiter.Wait(ctx); err != nil { - if ctx.Err() != nil { + if err := limiter.Wait(p.pollingCtx); err != nil { + if p.pollingCtx.Err() != nil { log.WithError(err).Debug("limiter wait failed") } return } - task, ok := p.fetchTask(ctx) + task, ok := p.fetchTask(p.pollingCtx) if !ok { continue } - p.runTaskWithRecover(ctx, task) + + p.runTaskWithRecover(p.jobsCtx, task) } } diff --git a/internal/pkg/config/config.example.yaml b/internal/pkg/config/config.example.yaml index 54e49c0..74d97fd 100644 --- a/internal/pkg/config/config.example.yaml +++ b/internal/pkg/config/config.example.yaml @@ -23,7 +23,10 @@ runner: # Please note that the Forgejo instance also has a timeout (3h by default) for the job. # So the job could be stopped by the Forgejo instance if it's timeout is shorter than this. timeout: 3h - # Whether skip verifying the TLS certificate of the Forgejo instance. + # The timeout for the runner to wait for running jobs to finish when shutting down. + # Any running jobs that haven't finished after this timeout will be cancelled. + shutdown_timeout: 0s + # Whether skip verifying the TLS certificate of the instance. insecure: false # The timeout for fetching the job from the Forgejo instance. fetch_timeout: 5s diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index a7bb977..5c260fb 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -21,15 +21,16 @@ type Log struct { // Runner represents the configuration for the runner. type Runner struct { - File string `yaml:"file"` // File specifies the file path for the runner. - Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner. - Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner. - EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner. - Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout. - Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode. - FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources. - FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources. - Labels []string `yaml:"labels"` // Labels specifies the labels of the runner. Labels are declared on each startup + File string `yaml:"file"` // File specifies the file path for the runner. + Capacity int `yaml:"capacity"` // Capacity specifies the capacity of the runner. + Envs map[string]string `yaml:"envs"` // Envs stores environment variables for the runner. + EnvFile string `yaml:"env_file"` // EnvFile specifies the path to the file containing environment variables for the runner. + Timeout time.Duration `yaml:"timeout"` // Timeout specifies the duration for runner timeout. + ShutdownTimeout time.Duration `yaml:"shutdown_timeout"` // ShutdownTimeout specifies the duration to wait for running jobs to complete during a shutdown of the runner. + Insecure bool `yaml:"insecure"` // Insecure indicates whether the runner operates in an insecure mode. + FetchTimeout time.Duration `yaml:"fetch_timeout"` // FetchTimeout specifies the timeout duration for fetching resources. + FetchInterval time.Duration `yaml:"fetch_interval"` // FetchInterval specifies the interval duration for fetching resources. + Labels []string `yaml:"labels"` // Labels specify the labels of the runner. Labels are declared on each startup } // Cache represents the configuration for caching. diff --git a/scripts/run.sh b/scripts/run.sh index 89626b4..5a6f28b 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -45,4 +45,4 @@ fi # Prevent reading the token from the forgejo-runner process unset GITEA_RUNNER_REGISTRATION_TOKEN -forgejo-runner daemon ${CONFIG_ARG} +exec forgejo-runner daemon ${CONFIG_ARG}