aboutsummaryrefslogtreecommitdiff
path: root/service/app_runner.go
blob: 3d3057119070ab85a50c543c3e87acbf5ba37b5d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package service

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"code.crute.us/mcrute/golib/log"
)

// Function signatures for the jobs an AppRunner can manage.
type (
	// SyncRunnerFunc is an initialization job. RunForever calls init jobs
	// synchronously, one after another, and stops at the first one that
	// returns a non-nil error.
	SyncRunnerFunc func() error

	// RunnerFunc is a job that the runner starts on its own goroutine. It is
	// handed the runner's context and the WaitGroup the runner waits on
	// during shutdown.
	RunnerFunc func(context.Context, *sync.WaitGroup) error
)

// AppRunner collects jobs in several categories and drives their lifecycle:
// init jobs run synchronously first, the remaining jobs are each started on
// their own goroutine, and RunForever blocks until the runner's context is
// done or a shutdown signal arrives, then cancels the context and waits on
// the shared WaitGroup.
type AppRunner struct {
	Logger         log.LeveledLogger  // Receives the runner's progress and error messages
	initJobs       []SyncRunnerFunc   // Jobs that run synchronously before other jobs
	jobs           []RunnerFunc       // Normal jobs
	backgroundJobs []RunnerFunc       // Jobs that can be disabled
	eagerJobs      []RunnerFunc       // Jobs that must start immediately
	wg             *sync.WaitGroup    // Passed to every job; waited on during shutdown
	sigs           chan os.Signal     // Receives the OS signals registered in NewAppRunner
	ctx            context.Context    // Passed to every job; derived from the context given to NewAppRunner
	ctxCancel      context.CancelFunc // Cancels ctx; called by RunForever when shutting down
}

// NewAppRunner builds an AppRunner with empty job lists and a fresh
// WaitGroup, logging through logger.
//
// The runner's context is a cancellable child of ctx, so cancelling ctx also
// ends RunForever. The runner additionally subscribes to os.Interrupt and
// SIGTERM; RunForever treats the first such signal as a shutdown request.
func NewAppRunner(ctx context.Context, logger log.LeveledLogger) *AppRunner {
	r := &AppRunner{
		initJobs:       []SyncRunnerFunc{},
		jobs:           []RunnerFunc{},
		backgroundJobs: []RunnerFunc{},
		eagerJobs:      []RunnerFunc{},
		wg:             &sync.WaitGroup{},
		Logger:         logger,
	}

	r.ctx, r.ctxCancel = context.WithCancel(ctx)

	// signal.Notify never blocks when delivering: with an unbuffered channel
	// a signal that arrives while nobody is receiving (for example while init
	// jobs are still running) would be dropped. One slot is enough because
	// only the first signal matters for shutdown.
	r.sigs = make(chan os.Signal, 1)

	// SIGKILL (os.Kill) is deliberately not listed: it cannot be caught, so
	// registering it has no effect.
	signal.Notify(r.sigs, os.Interrupt, syscall.SIGTERM)

	return r
}

// AddInitJob appends f to the list of init jobs. RunForever calls init jobs
// one at a time, in the order they were added, before it starts any main or
// background job.
func (r *AppRunner) AddInitJob(f SyncRunnerFunc) {
	r.initJobs = append(r.initJobs, f)
}

// AddInitJobs registers every function in jobs as an init job, keeping the
// order of the slice.
func (r *AppRunner) AddInitJobs(jobs []SyncRunnerFunc) {
	r.initJobs = append(r.initJobs, jobs...)
}

// AddJob appends f to the list of main jobs. RunForever starts each main job
// on its own goroutine once all init jobs have returned successfully.
func (r *AppRunner) AddJob(f RunnerFunc) {
	r.jobs = append(r.jobs, f)
}

// AddJobRunNow records f as an eager job and starts it right away on its own
// goroutine instead of waiting for RunForever. A job added this way before
// RunForever is called therefore starts before the init jobs run.
func (r *AppRunner) AddJobRunNow(f RunnerFunc) {
	r.eagerJobs = append(r.eagerJobs, f)
	r.Logger.Info("Starting an eager job")
	r.goRunPanic(f)
}

// AddJobs registers every function in jobs as a main job, keeping the order
// of the slice.
func (r *AppRunner) AddJobs(jobs []RunnerFunc) {
	r.jobs = append(r.jobs, jobs...)
}

// AddBackgroundJob appends f to the list of background jobs. RunForever
// starts background jobs only when it is called with enableBg set to true.
func (r *AppRunner) AddBackgroundJob(f RunnerFunc) {
	r.backgroundJobs = append(r.backgroundJobs, f)
}

// goRunPanic starts f on its own goroutine, handing it the runner's context
// and WaitGroup, and returns without waiting for f to finish.
//
// Right after launching f it makes one non-blocking check for a result: if f
// has already returned a non-nil error, the calling goroutine panics with
// that error. An error returned after this check is not observed by the
// runner.
func (r *AppRunner) goRunPanic(f RunnerFunc) {
	// Buffered so the job goroutine can always hand over its result and
	// exit. With an unbuffered channel the send would block forever once the
	// non-blocking receive below has been passed, leaking the goroutine.
	errs := make(chan error, 1)
	go func() { errs <- f(r.ctx, r.wg) }()

	select {
	case err := <-errs:
		// A job that finishes quickly and successfully must not panic.
		if err != nil {
			panic(err)
		}
	default:
	}
}

// RunForever runs the init jobs, starts the main jobs (and the background
// jobs when enableBg is true), then blocks until the runner's context is done
// or a registered shutdown signal arrives. It then cancels the context and
// waits on the shared WaitGroup before returning.
//
// Init jobs run synchronously in the order they were added. If one returns an
// error, the error is logged, the runner's context is cancelled so that any
// job already started through AddJobRunNow is signalled to stop, and
// RunForever returns without starting the main or background jobs.
func (r *AppRunner) RunForever(enableBg bool) {
	r.Logger.Info("Starting initialization jobs")
	for _, f := range r.initJobs {
		if err := f(); err != nil {
			r.Logger.Error(fmt.Sprintf("Init job failed: %s", err))
			// Eager jobs may already be running on r.ctx; without this
			// cancel nothing would ever tell them to stop.
			r.ctxCancel()
			return
		}
	}
	r.Logger.Info("Initialization jobs finished")

	r.Logger.Info("Starting main jobs")
	for _, f := range r.jobs {
		r.goRunPanic(f)
	}

	if enableBg {
		r.Logger.Info("Starting background jobs")
		for _, f := range r.backgroundJobs {
			r.goRunPanic(f)
		}
	} else {
		r.Logger.Info("Background jobs not enabled")
	}

	// Block until either the context ends (parent cancelled or deadline hit)
	// or a shutdown signal is received; both lead to the same shutdown path.
	select {
	case <-r.ctx.Done():
	case <-r.sigs:
	}

	r.Logger.Info("Shutting down main app context")
	r.ctxCancel()

	r.Logger.Info("Waiting for jobs to terminate")
	r.wg.Wait()

	r.Logger.Info("Shutdown completed")
}