aboutsummaryrefslogtreecommitdiff
path: root/inform/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'inform/server.go')
-rw-r--r--inform/server.go206
1 files changed, 166 insertions, 40 deletions
diff --git a/inform/server.go b/inform/server.go
index 7e33a86..710b8ac 100644
--- a/inform/server.go
+++ b/inform/server.go
@@ -1,55 +1,155 @@
1package inform 1package inform
2 2
3import ( 3import (
4 "container/list"
5 "errors"
4 "fmt" 6 "fmt"
5 "log" 7 "log"
6 "net/http" 8 "net/http"
9 "sync"
7 "time" 10 "time"
8) 11)
9 12
10type StateTree struct { 13func Log(handler http.Handler) http.Handler {
11 states map[string]map[int]int 14 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
15 handler.ServeHTTP(w, r)
16 t := time.Now().Format("02/Jan/2006 15:04:05")
17 log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto)
18 })
12} 19}
13 20
14func NewStateTree() *StateTree { 21type Device struct {
15 return &StateTree{make(map[string]map[int]int)} 22 Initialized bool
23 CurrentState bool
24 DesiredState bool
25 *sync.Mutex
26}
27
28type InformHandler struct {
29 Codec *Codec
30 ports map[string]map[int]*Device
31 queue map[string]*list.List
32 *sync.RWMutex
33}
34
35func NewInformHandler(c *Codec) *InformHandler {
36 return &InformHandler{
37 Codec: c,
38 ports: make(map[string]map[int]*Device),
39 queue: make(map[string]*list.List),
40 RWMutex: &sync.RWMutex{},
41 }
16} 42}
17 43
18func (t *StateTree) ensureNode(device string, port int) { 44func (h *InformHandler) AddPort(dev string, port int) {
19 _, ok := t.states[device] 45 h.Lock()
46 defer h.Unlock()
47
48 _, ok := h.ports[dev]
20 if !ok { 49 if !ok {
21 t.states[device] = make(map[int]int) 50 h.ports[dev] = make(map[int]*Device)
22 } 51 }
23 52
24 _, ok = t.states[device][port] 53 _, ok = h.queue[dev]
25 if !ok { 54 if !ok {
26 t.states[device][port] = 0 55 log.Printf("Adding queue for %s", dev)
56 h.queue[dev] = list.New()
57 }
58
59 log.Printf("Adding %s port %d", dev, port)
60 h.ports[dev][port] = &Device{
61 Mutex: &sync.Mutex{},
27 } 62 }
28} 63}
29 64
30func (t *StateTree) GetState(device string, port int) int { 65func (h *InformHandler) getPort(dev string, port int) (*Device, error) {
31 t.ensureNode(device, port) 66 h.RLock()
32 return t.states[device][port] 67 defer h.RUnlock()
68
69 _, ok := h.ports[dev]
70 if !ok {
71 return nil, errors.New("No device found")
72 }
73
74 p, ok := h.ports[dev][port]
75 if !ok {
76 return nil, errors.New("No port found")
77 }
78
79 return p, nil
33} 80}
34 81
35func (t *StateTree) SetState(device string, port, value int) { 82func (h *InformHandler) SetState(dev string, port int, state bool) error {
36 t.ensureNode(device, port) 83 p, err := h.getPort(dev, port)
37 t.states[device][port] = value 84 if err != nil {
85 return err
86 }
87
88 p.Lock()
89 defer p.Unlock()
90
91 log.Printf("Set state to %t for %s port %d", state, dev, port)
92 p.DesiredState = state
93 return nil
38} 94}
39 95
40func Log(handler http.Handler) http.Handler { 96func (h *InformHandler) buildCommands(dev string, pl *DeviceMessage) error {
41 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 97 for _, o := range pl.Outputs {
42 handler.ServeHTTP(w, r) 98 ds, err := h.getPort(dev, o.Port)
43 t := time.Now().Format("02/Jan/2006 15:04:05") 99 if err != nil {
44 log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto) 100 return err
45 // Addr - - [D/M/Y H:M:S] "Method RequestURI Proto" Code Size 101 }
46 // 127.0.0.1 - - [24/Sep/2016 14:30:35] "GET / HTTP/1.1" 200 - 102 ds.Lock()
47 }) 103
104 // Get initial state
105 if !ds.Initialized {
106 ds.CurrentState = o.OutputState
107 ds.Initialized = true
108 return nil
109 }
110
111 // State didn't change at the sensor
112 if ds.CurrentState == o.OutputState {
113 if ds.DesiredState != o.OutputState {
114 log.Printf("Toggle state %t for %s port %d", ds.DesiredState, dev, o.Port)
115 // Generate change command
116 // TODO: Don't lock the whole handler
117 h.Lock()
118 h.queue[dev].PushFront(NewOutputCommand(o.Port, ds.DesiredState, 0))
119 h.Unlock()
120 }
121 } else { // Sensor caused the change, leave it alone
122 log.Printf("Sensor state changed %s port %d", dev, o.Port)
123 ds.DesiredState = o.OutputState
124 }
125
126 ds.CurrentState = o.OutputState
127 ds.Unlock() // Don't hold the lock the entire loop
128 }
129
130 return nil
48} 131}
49 132
50type InformHandler struct { 133func (h *InformHandler) pop(dev string) *CommandMessage {
51 Codec *Codec 134 // TODO: Don't lock the whole handler
52 StateTree *StateTree 135 h.Lock()
136 defer h.Unlock()
137
138 q, ok := h.queue[dev]
139 if !ok {
140 log.Printf("No queue for %s", dev)
141 return nil
142 }
143
144 e := q.Front()
145 if e != nil {
146 h.queue[dev].Remove(e)
147 cmd := e.Value.(*CommandMessage)
148 cmd.Freshen()
149 return cmd
150 }
151
152 return nil
53} 153}
54 154
55func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 155func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -58,35 +158,61 @@ func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
58 return 158 return
59 } 159 }
60 160
61 if r.URL != nil && r.URL.Path != "/inform" { 161 msg, err := h.Codec.Unmarshal(r.Body)
62 http.Error(w, "404 Not Found", http.StatusNotFound) 162 if err != nil {
163 log.Printf("Unmarshal message: %s", err.Error())
164 http.Error(w, "400 Bad Request", http.StatusBadRequest)
63 return 165 return
64 } 166 }
65 167
66 msg, err := h.Codec.Unmarshal(r.Body) 168 pl, err := msg.UnmarshalPayload()
67 if err != nil { 169 if err != nil {
68 http.Error(w, "Bad Request", http.StatusBadRequest) 170 log.Printf("Unmarshal payload: %s", err.Error())
171 http.Error(w, "400 Bad Request", http.StatusBadRequest)
69 return 172 return
70 } 173 }
71 174
72 pl := NewInformWrapper() 175 dev := msg.FormattedMac()
73 copy(pl.MacAddr, msg.MacAddr) 176 ret := NewInformWrapperResponse(msg)
74 pl.SetEncrypted(true) 177 log.Printf("Inform from %s", dev)
75 178
76 // TODO: compare current state to tree and update 179 // Send a command until the queue is empty
180 if cmd := h.pop(dev); cmd != nil {
181 ret.UpdatePayload(cmd)
182 } else {
183 // Update internal state vs reality
184 if err := h.buildCommands(msg.FormattedMac(), pl); err != nil {
185 http.Error(w, "500 Server Error", 500)
186 return
187 }
188
189 // If that generated a command send it
190 if cmd = h.pop(dev); cmd != nil {
191 ret.UpdatePayload(cmd)
192 } else {
193 // Otherwise noop
194 ret.UpdatePayload(NewNoop(10))
195 }
196 }
77 197
78 res, err := h.Codec.Marshal(pl) 198 res, err := h.Codec.Marshal(ret)
79 if err != nil { 199 if err != nil {
80 http.Error(w, "Server Error", 500) 200 http.Error(w, "500 Server Error", 500)
81 return 201 return
82 } 202 }
83 203
84 fmt.Fprintf(w, "%s", res) 204 fmt.Fprintf(w, "%s", res)
85} 205}
86 206
87func NewServer(handler *InformHandler) *http.Server { 207// Create a new server, returns the mux so users can add other methods (for
208// example, if they want to share a process to build a console that also
209// accepts informs)
210func NewServer(handler *InformHandler) (*http.Server, *http.ServeMux) {
211 mux := http.NewServeMux()
212 mux.Handle("/inform", handler)
213
88 return &http.Server{ 214 return &http.Server{
89 Addr: ":6080", 215 Addr: ":6080",
90 Handler: Log(handler), 216 Handler: Log(mux),
91 } 217 }, mux
92} 218}