1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
package service
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"code.crute.us/mcrute/golib/log"
)
type SyncRunnerFunc func() error
type RunnerFunc func(context.Context, *sync.WaitGroup) error
type AppRunner struct {
Logger log.LeveledLogger
initJobs []SyncRunnerFunc // Jobs that run synchronously before other jobs
jobs []RunnerFunc // Normal jobs
backgroundJobs []RunnerFunc // Jobs that can be disabled
eagerJobs []RunnerFunc // Jobs that must start immediately
wg *sync.WaitGroup
sigs chan os.Signal
ctx context.Context
ctxCancel context.CancelFunc
}
func NewAppRunner(ctx context.Context, logger log.LeveledLogger) *AppRunner {
r := &AppRunner{
initJobs: []SyncRunnerFunc{},
jobs: []RunnerFunc{},
backgroundJobs: []RunnerFunc{},
eagerJobs: []RunnerFunc{},
wg: &sync.WaitGroup{},
sigs: make(chan os.Signal),
Logger: logger,
}
r.ctx, r.ctxCancel = context.WithCancel(ctx)
signal.Notify(r.sigs, os.Interrupt)
signal.Notify(r.sigs, os.Kill)
signal.Notify(r.sigs, syscall.SIGTERM)
return r
}
func (r *AppRunner) AddInitJob(f SyncRunnerFunc) {
r.initJobs = append(r.initJobs, f)
}
func (r *AppRunner) AddInitJobs(jobs []SyncRunnerFunc) {
for _, j := range jobs {
r.AddInitJob(j)
}
}
func (r *AppRunner) AddJob(f RunnerFunc) {
r.jobs = append(r.jobs, f)
}
func (r *AppRunner) AddJobRunNow(f RunnerFunc) {
r.eagerJobs = append(r.eagerJobs, f)
r.Logger.Info("Starting an eager job")
r.goRunPanic(f)
}
func (r *AppRunner) AddJobs(jobs []RunnerFunc) {
for _, j := range jobs {
r.AddJob(j)
}
}
func (r *AppRunner) AddBackgroundJob(f RunnerFunc) {
r.backgroundJobs = append(r.backgroundJobs, f)
}
func (r *AppRunner) goRunPanic(f RunnerFunc) {
errs := make(chan error)
go func() { errs <- f(r.ctx, r.wg) }()
select {
case e := <-errs:
panic(e)
default:
}
}
func (r *AppRunner) RunForever(enableBg bool) {
// Init jobs must run before main jobs start, the should return on
// success but will panic on error
r.Logger.Info("Starting initialization jobs")
for _, f := range r.initJobs {
if err := f(); err != nil {
r.Logger.Error(fmt.Sprintf("Init job failed: %s", err))
return
}
}
r.Logger.Info("Initialization jobs finished")
r.Logger.Info("Starting main jobs")
for _, f := range r.jobs {
r.goRunPanic(f)
}
if enableBg {
r.Logger.Info("Starting background jobs")
for _, f := range r.backgroundJobs {
r.goRunPanic(f)
}
} else {
r.Logger.Info("Background jobs not enabled")
}
// go run foreground
// go run background if enabled
select {
case <-r.sigs:
r.Logger.Info("Shutting down main app context")
r.ctxCancel()
r.Logger.Info("Waiting for jobs to terminate")
r.wg.Wait()
r.Logger.Info("Shutdown completed")
return
}
}
|