New "one shot" type of execution by adding a new command called one-job. (#423)

As commented here https://code.forgejo.org/forgejo/runner/issues/422, this PR aims to allow a new type of one shot execution compatible with autoscaling features and other job types.

Co-authored-by: jaime merino <jaime.merino_mora@mail.schwarzª>
Reviewed-on: https://code.forgejo.org/forgejo/runner/pulls/423
Reviewed-by: earl-warren <earl-warren@noreply.code.forgejo.org>
Co-authored-by: cobak78 <cobak78@noreply.code.forgejo.org>
Co-committed-by: cobak78 <cobak78@noreply.code.forgejo.org>
pull/435/head
cobak78 2025-01-15 12:29:07 +00:00 committed by earl-warren
parent a3cb2b7d6c
commit ba78c11326
5 changed files with 390 additions and 0 deletions

View File

@ -3,6 +3,7 @@
## 6.1.0
* [Add `[container].force_rebuild` config option](https://code.forgejo.org/forgejo/runner/pulls/406) to force rebuilding of local docker images, even if they are already present.
* [Add new `--one-job` flag](https://code.forgejo.org/forgejo/runner/pulls/423) to execute a previously configured runner, execute one task if it exists and exit. Motivation [here](https://code.forgejo.org/forgejo/runner/issues/422)
## 6.0.1

View File

@ -52,6 +52,15 @@ func Execute(ctx context.Context) {
}
rootCmd.AddCommand(daemonCmd)
// ./act_runner job
jobCmd := &cobra.Command{
Use: "one-job",
Short: "Run only one job",
Args: cobra.MaximumNArgs(1),
RunE: runJob(ctx, &configFile),
}
rootCmd.AddCommand(jobCmd)
// ./act_runner exec
rootCmd.AddCommand(loadExecCmd(ctx))

117
internal/app/cmd/job.go Normal file
View File

@ -0,0 +1,117 @@
// Copyright 2025 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package cmd
import (
"context"
"fmt"
"os"
"strings"
"connectrpc.com/connect"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"gitea.com/gitea/act_runner/internal/app/job"
"gitea.com/gitea/act_runner/internal/app/run"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
"gitea.com/gitea/act_runner/internal/pkg/envcheck"
"gitea.com/gitea/act_runner/internal/pkg/labels"
"gitea.com/gitea/act_runner/internal/pkg/ver"
)
func runJob(ctx context.Context, configFile *string) func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error {
cfg, err := config.LoadDefault(*configFile)
if err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}
initLogging(cfg)
log.Infoln("Starting job")
reg, err := config.LoadRegistration(cfg.Runner.File)
if os.IsNotExist(err) {
log.Error("registration file not found, please register the runner first")
return err
} else if err != nil {
return fmt.Errorf("failed to load registration file: %w", err)
}
lbls := reg.Labels
if len(cfg.Runner.Labels) > 0 {
lbls = cfg.Runner.Labels
}
ls := labels.Labels{}
for _, l := range lbls {
label, err := labels.Parse(l)
if err != nil {
log.WithError(err).Warnf("ignored invalid label %q", l)
continue
}
ls = append(ls, label)
}
if len(ls) == 0 {
log.Warn("no labels configured, runner may not be able to pick up jobs")
}
if ls.RequireDocker() {
dockerSocketPath, err := getDockerSocketPath(cfg.Container.DockerHost)
if err != nil {
return err
}
if err := envcheck.CheckIfDockerRunning(ctx, dockerSocketPath); err != nil {
return err
}
// if dockerSocketPath passes the check, override DOCKER_HOST with dockerSocketPath
os.Setenv("DOCKER_HOST", dockerSocketPath)
// empty cfg.Container.DockerHost means act_runner need to find an available docker host automatically
// and assign the path to cfg.Container.DockerHost
if cfg.Container.DockerHost == "" {
cfg.Container.DockerHost = dockerSocketPath
}
// check the scheme, if the scheme is not npipe or unix
// set cfg.Container.DockerHost to "-" because it can't be mounted to the job container
if protoIndex := strings.Index(cfg.Container.DockerHost, "://"); protoIndex != -1 {
scheme := cfg.Container.DockerHost[:protoIndex]
if !strings.EqualFold(scheme, "npipe") && !strings.EqualFold(scheme, "unix") {
cfg.Container.DockerHost = "-"
}
}
}
cli := client.New(
reg.Address,
cfg.Runner.Insecure,
reg.UUID,
reg.Token,
ver.Version(),
)
runner := run.NewRunner(cfg, reg, cli)
// declare the labels of the runner before fetching tasks
resp, err := runner.Declare(ctx, ls.Names())
if err != nil && connect.CodeOf(err) == connect.CodeUnimplemented {
// Gitea instance is older version. skip declare step.
log.Warn("Because the Forgejo instance is an old version, skipping declaring the labels and version.")
} else if err != nil {
log.WithError(err).Error("fail to invoke Declare")
return err
} else {
log.Infof("runner: %s, with version: %s, with labels: %v, declared successfully",
resp.Msg.Runner.Name, resp.Msg.Runner.Version, resp.Msg.Runner.Labels)
// if declared successfully, override the labels in the.runner file with valid labels in the config file (if specified)
runner.Update(ctx, ls)
reg.Labels = ls.ToStrings()
if err := config.SaveRegistration(cfg.Runner.File, reg); err != nil {
return fmt.Errorf("failed to save runner config: %w", err)
}
}
j := job.NewJob(cfg, cli, runner)
return j.Run(ctx)
}
}

94
internal/app/job/job.go Normal file
View File

@ -0,0 +1,94 @@
// Copyright 2025 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package job
import (
"context"
"errors"
"fmt"
"sync/atomic"
"connectrpc.com/connect"
log "github.com/sirupsen/logrus"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"gitea.com/gitea/act_runner/internal/app/run"
"gitea.com/gitea/act_runner/internal/pkg/client"
"gitea.com/gitea/act_runner/internal/pkg/config"
)
type Job struct {
client client.Client
runner run.RunnerInterface
cfg *config.Config
tasksVersion atomic.Int64
}
func NewJob(cfg *config.Config, client client.Client, runner run.RunnerInterface) *Job {
j := &Job{}
j.client = client
j.runner = runner
j.cfg = cfg
return j
}
func (j *Job) Run(ctx context.Context) error {
task, ok := j.fetchTask(ctx)
if !ok {
return fmt.Errorf("could not fetch task")
}
return j.runTaskWithRecover(ctx, task)
}
func (j *Job) runTaskWithRecover(ctx context.Context, task *runnerv1.Task) error {
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("panic: %v", r)
log.WithError(err).Error("panic in runTaskWithRecover")
}
}()
if err := j.runner.Run(ctx, task); err != nil {
log.WithError(err).Error("failed to run task")
return err
}
return nil
}
func (j *Job) fetchTask(ctx context.Context) (*runnerv1.Task, bool) {
reqCtx, cancel := context.WithTimeout(ctx, j.cfg.Runner.FetchTimeout)
defer cancel()
// Load the version value that was in the cache when the request was sent.
v := j.tasksVersion.Load()
resp, err := j.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{
TasksVersion: v,
}))
if err != nil {
if errors.Is(err, context.Canceled) {
log.WithError(err).Debugf("shutdown, fetch task canceled")
} else {
log.WithError(err).Error("failed to fetch task")
}
return nil, false
}
if resp == nil || resp.Msg == nil {
return nil, false
}
if resp.Msg.TasksVersion > v {
j.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion)
}
if resp.Msg.Task == nil {
return nil, false
}
j.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0)
return resp.Msg.Task, true
}

View File

@ -0,0 +1,169 @@
package job
import (
"context"
"fmt"
"testing"
"time"
"connectrpc.com/connect"
"code.gitea.io/actions-proto-go/ping/v1/pingv1connect"
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
"code.gitea.io/actions-proto-go/runner/v1/runnerv1connect"
"gitea.com/gitea/act_runner/internal/pkg/config"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
type mockClient struct {
pingv1connect.PingServiceClient
runnerv1connect.RunnerServiceClient
sleep time.Duration
cancel bool
err error
noTask bool
}
func (o mockClient) Address() string {
return ""
}
func (o mockClient) Insecure() bool {
return true
}
func (o *mockClient) FetchTask(ctx context.Context, _ *connect.Request[runnerv1.FetchTaskRequest]) (*connect.Response[runnerv1.FetchTaskResponse], error) {
if o.sleep > 0 {
select {
case <-ctx.Done():
log.Trace("fetch task done")
return nil, context.DeadlineExceeded
case <-time.After(o.sleep):
log.Trace("slept")
return nil, fmt.Errorf("unexpected")
}
}
if o.cancel {
return nil, context.Canceled
}
if o.err != nil {
return nil, o.err
}
task := &runnerv1.Task{}
if o.noTask {
task = nil
o.noTask = false
}
return connect.NewResponse(&runnerv1.FetchTaskResponse{
Task: task,
TasksVersion: int64(1),
}), nil
}
type mockRunner struct {
cfg *config.Runner
log chan string
panics bool
err error
}
func (o *mockRunner) Run(ctx context.Context, _ *runnerv1.Task) error {
o.log <- "runner starts"
if o.panics {
log.Trace("panics")
o.log <- "runner panics"
o.panics = false
panic("whatever")
}
if o.err != nil {
log.Trace("error")
o.log <- "runner error"
err := o.err
o.err = nil
return err
}
for {
select {
case <-ctx.Done():
log.Trace("shutdown")
o.log <- "runner shutdown"
return nil
case <-time.After(o.cfg.Timeout):
log.Trace("after")
o.log <- "runner timeout"
return nil
}
}
}
func TestNewJob(t *testing.T) {
j := NewJob(&config.Config{}, &mockClient{}, &mockRunner{})
assert.NotNil(t, j)
}
func setTrace(t *testing.T) {
t.Helper()
log.SetReportCaller(true)
log.SetLevel(log.TraceLevel)
}
func TestJob_fetchTask(t *testing.T) {
setTrace(t)
for _, testCase := range []struct {
name string
noTask bool
sleep time.Duration
err error
cancel bool
success bool
}{
{
name: "Success",
success: true,
},
{
name: "Canceled",
cancel: true,
},
{
name: "NoTask",
noTask: true,
},
{
name: "Error",
err: fmt.Errorf("random error"),
},
} {
t.Run(testCase.name, func(t *testing.T) {
configRunner := config.Runner{
FetchTimeout: 1 * time.Millisecond,
}
j := NewJob(
&config.Config{
Runner: configRunner,
},
&mockClient{
sleep: testCase.sleep,
cancel: testCase.cancel,
noTask: testCase.noTask,
err: testCase.err,
},
&mockRunner{},
)
task, ok := j.fetchTask(context.Background())
if testCase.success {
assert.True(t, ok)
assert.NotNil(t, task)
} else {
assert.False(t, ok)
assert.Nil(t, task)
}
})
}
}