mirror of https://code.forgejo.org/forgejo/runner
parent
db7ee2eaf4
commit
08282a519f
27
cmd/damon.go
27
cmd/damon.go
|
@ -5,10 +5,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
|
@ -36,7 +33,7 @@ const (
|
||||||
MsgTypeBuildResult // build result
|
MsgTypeBuildResult // build result
|
||||||
)
|
)
|
||||||
|
|
||||||
func handleVersion1(conn *websocket.Conn, sigs chan os.Signal, message []byte, msg *Message) error {
|
func handleVersion1(ctx context.Context, conn *websocket.Conn, message []byte, msg *Message) error {
|
||||||
switch msg.Type {
|
switch msg.Type {
|
||||||
case MsgTypeRegister:
|
case MsgTypeRegister:
|
||||||
log.Info().Msgf("received registered success: %s", message)
|
log.Info().Msgf("received registered success: %s", message)
|
||||||
|
@ -67,7 +64,7 @@ func handleVersion1(conn *websocket.Conn, sigs chan os.Signal, message []byte, m
|
||||||
reuseContainers: true,
|
reuseContainers: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
|
ctx, cancel := context.WithTimeout(ctx, time.Hour)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
done := make(chan error)
|
done := make(chan error)
|
||||||
|
@ -80,7 +77,7 @@ func handleVersion1(conn *websocket.Conn, sigs chan os.Signal, message []byte, m
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-sigs:
|
case <-ctx.Done():
|
||||||
cancel()
|
cancel()
|
||||||
log.Info().Msgf("cancel task")
|
log.Info().Msgf("cancel task")
|
||||||
return nil
|
return nil
|
||||||
|
@ -115,7 +112,7 @@ func handleVersion1(conn *websocket.Conn, sigs chan os.Signal, message []byte, m
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: handle the message
|
// TODO: handle the message
|
||||||
func handleMessage(conn *websocket.Conn, sigs chan os.Signal, message []byte) error {
|
func handleMessage(ctx context.Context, conn *websocket.Conn, message []byte) error {
|
||||||
var msg Message
|
var msg Message
|
||||||
if err := json.Unmarshal(message, &msg); err != nil {
|
if err := json.Unmarshal(message, &msg); err != nil {
|
||||||
return fmt.Errorf("unmarshal received message faild: %v", err)
|
return fmt.Errorf("unmarshal received message faild: %v", err)
|
||||||
|
@ -123,7 +120,7 @@ func handleMessage(conn *websocket.Conn, sigs chan os.Signal, message []byte) er
|
||||||
|
|
||||||
switch msg.Version {
|
switch msg.Version {
|
||||||
case 1:
|
case 1:
|
||||||
return handleVersion1(conn, sigs, message, &msg)
|
return handleVersion1(ctx, conn, message, &msg)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("recevied a message with an unsupported version, consider upgrade your runner")
|
return fmt.Errorf("recevied a message with an unsupported version, consider upgrade your runner")
|
||||||
}
|
}
|
||||||
|
@ -138,16 +135,10 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
|
||||||
ticker := time.NewTicker(time.Second)
|
ticker := time.NewTicker(time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
var failedCnt int
|
var failedCnt int
|
||||||
sigs := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-sigs:
|
|
||||||
log.Info().Msgf("cancel task")
|
|
||||||
return nil
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
log.Info().Msgf("cancel task")
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -191,8 +182,8 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-sigs:
|
case <-ctx.Done():
|
||||||
log.Info().Msgf("cancel task")
|
log.Info().Msg("cancel task")
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
@ -221,7 +212,7 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := handleMessage(conn, sigs, message); err != nil {
|
if err := handleMessage(ctx, conn, message); err != nil {
|
||||||
log.Error().Msgf(err.Error())
|
log.Error().Msgf(err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
3
main.go
3
main.go
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
"gitea.com/gitea/act_runner/cmd"
|
"gitea.com/gitea/act_runner/cmd"
|
||||||
)
|
)
|
||||||
|
@ -14,7 +15,7 @@ func main() {
|
||||||
|
|
||||||
// trap Ctrl+C and call cancel on the context
|
// trap Ctrl+C and call cancel on the context
|
||||||
c := make(chan os.Signal, 1)
|
c := make(chan os.Signal, 1)
|
||||||
signal.Notify(c, os.Interrupt)
|
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
|
||||||
defer func() {
|
defer func() {
|
||||||
signal.Stop(c)
|
signal.Stop(c)
|
||||||
cancel()
|
cancel()
|
||||||
|
|
Loading…
Reference in New Issue