1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
package service
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"code.crute.us/mcrute/golib/log"
)
type SyncRunnerFunc func() error
type RunnerFunc func(context.Context, *sync.WaitGroup) error
type AppRunner struct {
Logger log.LeveledLogger
initJobs []SyncRunnerFunc // Jobs that run synchronously before other jobs
jobs []RunnerFunc // Normal jobs
backgroundJobs []RunnerFunc // Jobs that can be disabled
eagerJobs []RunnerFunc // Jobs that must start immediately
wg *sync.WaitGroup
sigs chan os.Signal
ctx context.Context
ctxCancel context.CancelFunc
}
func NewAppRunner(ctx context.Context, logger log.LeveledLogger) *AppRunner {
r := &AppRunner{
initJobs: []SyncRunnerFunc{},
jobs: []RunnerFunc{},
backgroundJobs: []RunnerFunc{},
eagerJobs: []RunnerFunc{},
wg: &sync.WaitGroup{},
sigs: make(chan os.Signal),
Logger: logger,
}
r.ctx, r.ctxCancel = context.WithCancel(ctx)
signal.Notify(r.sigs, os.Interrupt)
signal.Notify(r.sigs, os.Kill)
signal.Notify(r.sigs, syscall.SIGTERM)
return r
}
func (r *AppRunner) AddInitJob(f SyncRunnerFunc) {
r.initJobs = append(r.initJobs, f)
}
func (r *AppRunner) AddInitJobs(jobs []SyncRunnerFunc) {
for _, j := range jobs {
r.AddInitJob(j)
}
}
func (r *AppRunner) AddJob(f RunnerFunc) {
r.jobs = append(r.jobs, f)
}
func (r *AppRunner) AddJobRunNow(f RunnerFunc) {
r.eagerJobs = append(r.eagerJobs, f)
r.Logger.Info("Starting an eager job")
r.goRunPanic(f)
}
func (r *AppRunner) AddJobs(jobs []RunnerFunc) {
for _, j := range jobs {
r.AddJob(j)
}
}
func (r *AppRunner) AddBackgroundJob(f RunnerFunc) {
r.backgroundJobs = append(r.backgroundJobs, f)
}
func (r *AppRunner) goRunPanic(f RunnerFunc) {
errs := make(chan error)
go func() { errs <- f(r.ctx, r.wg) }()
select {
case e := <-errs:
panic(e)
default:
}
}
func (r *AppRunner) RunForever(enableBg bool) {
// Init jobs must run before main jobs start, the should return on
// success but will panic on error
r.Logger.Info("Starting initialization jobs")
for _, f := range r.initJobs {
if err := f(); err != nil {
r.Logger.Error(fmt.Sprintf("Init job failed: %s", err))
return
}
}
r.Logger.Info("Initialization jobs finished")
r.Logger.Info("Starting main jobs")
for _, f := range r.jobs {
r.goRunPanic(f)
}
if enableBg {
r.Logger.Info("Starting background jobs")
for _, f := range r.backgroundJobs {
r.goRunPanic(f)
}
} else {
r.Logger.Info("Background jobs not enabled")
}
// go run foreground
// go run background if enabled
select {
case <-r.ctx.Done():
goto done
case <-r.sigs:
goto done
}
done:
r.Logger.Info("Shutting down main app context")
r.ctxCancel()
r.Logger.Info("Waiting for jobs to terminate")
r.wg.Wait()
r.Logger.Info("Shutdown completed")
return
}
|