1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
package service
import (
"context"
"os"
"os/signal"
"sync"
"syscall"
)
type AppRunnerLogger interface {
Info(...interface{})
}
type RunnerFunc func(context.Context, *sync.WaitGroup) error
type AppRunner struct {
Logger AppRunnerLogger
jobs []RunnerFunc
backgroundJobs []RunnerFunc
wg *sync.WaitGroup
sigs chan os.Signal
ctx context.Context
ctxCancel context.CancelFunc
}
func NewAppRunner(ctx context.Context, logger AppRunnerLogger) *AppRunner {
r := &AppRunner{
jobs: []RunnerFunc{},
backgroundJobs: []RunnerFunc{},
wg: &sync.WaitGroup{},
sigs: make(chan os.Signal),
Logger: logger,
}
r.ctx, r.ctxCancel = context.WithCancel(ctx)
signal.Notify(r.sigs, os.Interrupt)
signal.Notify(r.sigs, os.Kill)
signal.Notify(r.sigs, syscall.SIGTERM)
return r
}
func (r *AppRunner) AddJob(f RunnerFunc) {
r.jobs = append(r.jobs, f)
}
func (r *AppRunner) AddJobs(jobs []RunnerFunc) {
for _, j := range jobs {
r.AddJob(j)
}
}
func (r *AppRunner) AddBackgroundJob(f RunnerFunc) {
r.backgroundJobs = append(r.backgroundJobs, f)
}
func (r *AppRunner) goRunPanic(f func(context.Context, *sync.WaitGroup) error) {
errs := make(chan error)
go func() { errs <- f(r.ctx, r.wg) }()
select {
case e := <-errs:
panic(e)
default:
}
}
func (r *AppRunner) RunForever(enableBg bool) {
for _, f := range r.jobs {
r.goRunPanic(f)
}
if enableBg {
r.Logger.Info("Starting background jobs")
for _, f := range r.backgroundJobs {
r.goRunPanic(f)
}
} else {
r.Logger.Info("Background jobs not enabled")
}
// go run foreground
// go run background if enabled
select {
case <-r.sigs:
r.Logger.Info("Shutting down main app context")
r.ctxCancel()
r.Logger.Info("Waiting for jobs to terminate")
r.wg.Wait()
r.Logger.Info("Shutdown completed")
return
}
}
|