aboutsummaryrefslogtreecommitdiff
path: root/inform/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'inform/server.go')
-rw-r--r--inform/server.go218
1 files changed, 0 insertions, 218 deletions
diff --git a/inform/server.go b/inform/server.go
deleted file mode 100644
index 710b8ac..0000000
--- a/inform/server.go
+++ /dev/null
@@ -1,218 +0,0 @@
1package inform
2
3import (
4 "container/list"
5 "errors"
6 "fmt"
7 "log"
8 "net/http"
9 "sync"
10 "time"
11)
12
13func Log(handler http.Handler) http.Handler {
14 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
15 handler.ServeHTTP(w, r)
16 t := time.Now().Format("02/Jan/2006 15:04:05")
17 log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto)
18 })
19}
20
21type Device struct {
22 Initialized bool
23 CurrentState bool
24 DesiredState bool
25 *sync.Mutex
26}
27
28type InformHandler struct {
29 Codec *Codec
30 ports map[string]map[int]*Device
31 queue map[string]*list.List
32 *sync.RWMutex
33}
34
35func NewInformHandler(c *Codec) *InformHandler {
36 return &InformHandler{
37 Codec: c,
38 ports: make(map[string]map[int]*Device),
39 queue: make(map[string]*list.List),
40 RWMutex: &sync.RWMutex{},
41 }
42}
43
44func (h *InformHandler) AddPort(dev string, port int) {
45 h.Lock()
46 defer h.Unlock()
47
48 _, ok := h.ports[dev]
49 if !ok {
50 h.ports[dev] = make(map[int]*Device)
51 }
52
53 _, ok = h.queue[dev]
54 if !ok {
55 log.Printf("Adding queue for %s", dev)
56 h.queue[dev] = list.New()
57 }
58
59 log.Printf("Adding %s port %d", dev, port)
60 h.ports[dev][port] = &Device{
61 Mutex: &sync.Mutex{},
62 }
63}
64
65func (h *InformHandler) getPort(dev string, port int) (*Device, error) {
66 h.RLock()
67 defer h.RUnlock()
68
69 _, ok := h.ports[dev]
70 if !ok {
71 return nil, errors.New("No device found")
72 }
73
74 p, ok := h.ports[dev][port]
75 if !ok {
76 return nil, errors.New("No port found")
77 }
78
79 return p, nil
80}
81
82func (h *InformHandler) SetState(dev string, port int, state bool) error {
83 p, err := h.getPort(dev, port)
84 if err != nil {
85 return err
86 }
87
88 p.Lock()
89 defer p.Unlock()
90
91 log.Printf("Set state to %t for %s port %d", state, dev, port)
92 p.DesiredState = state
93 return nil
94}
95
96func (h *InformHandler) buildCommands(dev string, pl *DeviceMessage) error {
97 for _, o := range pl.Outputs {
98 ds, err := h.getPort(dev, o.Port)
99 if err != nil {
100 return err
101 }
102 ds.Lock()
103
104 // Get initial state
105 if !ds.Initialized {
106 ds.CurrentState = o.OutputState
107 ds.Initialized = true
108 return nil
109 }
110
111 // State didn't change at the sensor
112 if ds.CurrentState == o.OutputState {
113 if ds.DesiredState != o.OutputState {
114 log.Printf("Toggle state %t for %s port %d", ds.DesiredState, dev, o.Port)
115 // Generate change command
116 // TODO: Don't lock the whole handler
117 h.Lock()
118 h.queue[dev].PushFront(NewOutputCommand(o.Port, ds.DesiredState, 0))
119 h.Unlock()
120 }
121 } else { // Sensor caused the change, leave it alone
122 log.Printf("Sensor state changed %s port %d", dev, o.Port)
123 ds.DesiredState = o.OutputState
124 }
125
126 ds.CurrentState = o.OutputState
127 ds.Unlock() // Don't hold the lock the entire loop
128 }
129
130 return nil
131}
132
133func (h *InformHandler) pop(dev string) *CommandMessage {
134 // TODO: Don't lock the whole handler
135 h.Lock()
136 defer h.Unlock()
137
138 q, ok := h.queue[dev]
139 if !ok {
140 log.Printf("No queue for %s", dev)
141 return nil
142 }
143
144 e := q.Front()
145 if e != nil {
146 h.queue[dev].Remove(e)
147 cmd := e.Value.(*CommandMessage)
148 cmd.Freshen()
149 return cmd
150 }
151
152 return nil
153}
154
155func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
156 if r.Method != http.MethodPost {
157 http.Error(w, "405 Method Not Allowed", http.StatusMethodNotAllowed)
158 return
159 }
160
161 msg, err := h.Codec.Unmarshal(r.Body)
162 if err != nil {
163 log.Printf("Unmarshal message: %s", err.Error())
164 http.Error(w, "400 Bad Request", http.StatusBadRequest)
165 return
166 }
167
168 pl, err := msg.UnmarshalPayload()
169 if err != nil {
170 log.Printf("Unmarshal payload: %s", err.Error())
171 http.Error(w, "400 Bad Request", http.StatusBadRequest)
172 return
173 }
174
175 dev := msg.FormattedMac()
176 ret := NewInformWrapperResponse(msg)
177 log.Printf("Inform from %s", dev)
178
179 // Send a command until the queue is empty
180 if cmd := h.pop(dev); cmd != nil {
181 ret.UpdatePayload(cmd)
182 } else {
183 // Update internal state vs reality
184 if err := h.buildCommands(msg.FormattedMac(), pl); err != nil {
185 http.Error(w, "500 Server Error", 500)
186 return
187 }
188
189 // If that generated a command send it
190 if cmd = h.pop(dev); cmd != nil {
191 ret.UpdatePayload(cmd)
192 } else {
193 // Otherwise noop
194 ret.UpdatePayload(NewNoop(10))
195 }
196 }
197
198 res, err := h.Codec.Marshal(ret)
199 if err != nil {
200 http.Error(w, "500 Server Error", 500)
201 return
202 }
203
204 fmt.Fprintf(w, "%s", res)
205}
206
207// Create a new server, returns the mux so users can add other methods (for
208// example, if they want to share a process to build a console that also
209// accepts informs)
210func NewServer(handler *InformHandler) (*http.Server, *http.ServeMux) {
211 mux := http.NewServeMux()
212 mux.Handle("/inform", handler)
213
214 return &http.Server{
215 Addr: ":6080",
216 Handler: Log(mux),
217 }, mux
218}