diff options
Diffstat (limited to 'inform/server.go')
-rw-r--r-- | inform/server.go | 218 |
1 files changed, 0 insertions, 218 deletions
diff --git a/inform/server.go b/inform/server.go deleted file mode 100644 index 710b8ac..0000000 --- a/inform/server.go +++ /dev/null | |||
@@ -1,218 +0,0 @@ | |||
1 | package inform | ||
2 | |||
3 | import ( | ||
4 | "container/list" | ||
5 | "errors" | ||
6 | "fmt" | ||
7 | "log" | ||
8 | "net/http" | ||
9 | "sync" | ||
10 | "time" | ||
11 | ) | ||
12 | |||
13 | func Log(handler http.Handler) http.Handler { | ||
14 | return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
15 | handler.ServeHTTP(w, r) | ||
16 | t := time.Now().Format("02/Jan/2006 15:04:05") | ||
17 | log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto) | ||
18 | }) | ||
19 | } | ||
20 | |||
21 | type Device struct { | ||
22 | Initialized bool | ||
23 | CurrentState bool | ||
24 | DesiredState bool | ||
25 | *sync.Mutex | ||
26 | } | ||
27 | |||
28 | type InformHandler struct { | ||
29 | Codec *Codec | ||
30 | ports map[string]map[int]*Device | ||
31 | queue map[string]*list.List | ||
32 | *sync.RWMutex | ||
33 | } | ||
34 | |||
35 | func NewInformHandler(c *Codec) *InformHandler { | ||
36 | return &InformHandler{ | ||
37 | Codec: c, | ||
38 | ports: make(map[string]map[int]*Device), | ||
39 | queue: make(map[string]*list.List), | ||
40 | RWMutex: &sync.RWMutex{}, | ||
41 | } | ||
42 | } | ||
43 | |||
44 | func (h *InformHandler) AddPort(dev string, port int) { | ||
45 | h.Lock() | ||
46 | defer h.Unlock() | ||
47 | |||
48 | _, ok := h.ports[dev] | ||
49 | if !ok { | ||
50 | h.ports[dev] = make(map[int]*Device) | ||
51 | } | ||
52 | |||
53 | _, ok = h.queue[dev] | ||
54 | if !ok { | ||
55 | log.Printf("Adding queue for %s", dev) | ||
56 | h.queue[dev] = list.New() | ||
57 | } | ||
58 | |||
59 | log.Printf("Adding %s port %d", dev, port) | ||
60 | h.ports[dev][port] = &Device{ | ||
61 | Mutex: &sync.Mutex{}, | ||
62 | } | ||
63 | } | ||
64 | |||
65 | func (h *InformHandler) getPort(dev string, port int) (*Device, error) { | ||
66 | h.RLock() | ||
67 | defer h.RUnlock() | ||
68 | |||
69 | _, ok := h.ports[dev] | ||
70 | if !ok { | ||
71 | return nil, errors.New("No device found") | ||
72 | } | ||
73 | |||
74 | p, ok := h.ports[dev][port] | ||
75 | if !ok { | ||
76 | return nil, errors.New("No port found") | ||
77 | } | ||
78 | |||
79 | return p, nil | ||
80 | } | ||
81 | |||
82 | func (h *InformHandler) SetState(dev string, port int, state bool) error { | ||
83 | p, err := h.getPort(dev, port) | ||
84 | if err != nil { | ||
85 | return err | ||
86 | } | ||
87 | |||
88 | p.Lock() | ||
89 | defer p.Unlock() | ||
90 | |||
91 | log.Printf("Set state to %t for %s port %d", state, dev, port) | ||
92 | p.DesiredState = state | ||
93 | return nil | ||
94 | } | ||
95 | |||
96 | func (h *InformHandler) buildCommands(dev string, pl *DeviceMessage) error { | ||
97 | for _, o := range pl.Outputs { | ||
98 | ds, err := h.getPort(dev, o.Port) | ||
99 | if err != nil { | ||
100 | return err | ||
101 | } | ||
102 | ds.Lock() | ||
103 | |||
104 | // Get initial state | ||
105 | if !ds.Initialized { | ||
106 | ds.CurrentState = o.OutputState | ||
107 | ds.Initialized = true | ||
108 | return nil | ||
109 | } | ||
110 | |||
111 | // State didn't change at the sensor | ||
112 | if ds.CurrentState == o.OutputState { | ||
113 | if ds.DesiredState != o.OutputState { | ||
114 | log.Printf("Toggle state %t for %s port %d", ds.DesiredState, dev, o.Port) | ||
115 | // Generate change command | ||
116 | // TODO: Don't lock the whole handler | ||
117 | h.Lock() | ||
118 | h.queue[dev].PushFront(NewOutputCommand(o.Port, ds.DesiredState, 0)) | ||
119 | h.Unlock() | ||
120 | } | ||
121 | } else { // Sensor caused the change, leave it alone | ||
122 | log.Printf("Sensor state changed %s port %d", dev, o.Port) | ||
123 | ds.DesiredState = o.OutputState | ||
124 | } | ||
125 | |||
126 | ds.CurrentState = o.OutputState | ||
127 | ds.Unlock() // Don't hold the lock the entire loop | ||
128 | } | ||
129 | |||
130 | return nil | ||
131 | } | ||
132 | |||
133 | func (h *InformHandler) pop(dev string) *CommandMessage { | ||
134 | // TODO: Don't lock the whole handler | ||
135 | h.Lock() | ||
136 | defer h.Unlock() | ||
137 | |||
138 | q, ok := h.queue[dev] | ||
139 | if !ok { | ||
140 | log.Printf("No queue for %s", dev) | ||
141 | return nil | ||
142 | } | ||
143 | |||
144 | e := q.Front() | ||
145 | if e != nil { | ||
146 | h.queue[dev].Remove(e) | ||
147 | cmd := e.Value.(*CommandMessage) | ||
148 | cmd.Freshen() | ||
149 | return cmd | ||
150 | } | ||
151 | |||
152 | return nil | ||
153 | } | ||
154 | |||
155 | func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
156 | if r.Method != http.MethodPost { | ||
157 | http.Error(w, "405 Method Not Allowed", http.StatusMethodNotAllowed) | ||
158 | return | ||
159 | } | ||
160 | |||
161 | msg, err := h.Codec.Unmarshal(r.Body) | ||
162 | if err != nil { | ||
163 | log.Printf("Unmarshal message: %s", err.Error()) | ||
164 | http.Error(w, "400 Bad Request", http.StatusBadRequest) | ||
165 | return | ||
166 | } | ||
167 | |||
168 | pl, err := msg.UnmarshalPayload() | ||
169 | if err != nil { | ||
170 | log.Printf("Unmarshal payload: %s", err.Error()) | ||
171 | http.Error(w, "400 Bad Request", http.StatusBadRequest) | ||
172 | return | ||
173 | } | ||
174 | |||
175 | dev := msg.FormattedMac() | ||
176 | ret := NewInformWrapperResponse(msg) | ||
177 | log.Printf("Inform from %s", dev) | ||
178 | |||
179 | // Send a command until the queue is empty | ||
180 | if cmd := h.pop(dev); cmd != nil { | ||
181 | ret.UpdatePayload(cmd) | ||
182 | } else { | ||
183 | // Update internal state vs reality | ||
184 | if err := h.buildCommands(msg.FormattedMac(), pl); err != nil { | ||
185 | http.Error(w, "500 Server Error", 500) | ||
186 | return | ||
187 | } | ||
188 | |||
189 | // If that generated a command send it | ||
190 | if cmd = h.pop(dev); cmd != nil { | ||
191 | ret.UpdatePayload(cmd) | ||
192 | } else { | ||
193 | // Otherwise noop | ||
194 | ret.UpdatePayload(NewNoop(10)) | ||
195 | } | ||
196 | } | ||
197 | |||
198 | res, err := h.Codec.Marshal(ret) | ||
199 | if err != nil { | ||
200 | http.Error(w, "500 Server Error", 500) | ||
201 | return | ||
202 | } | ||
203 | |||
204 | fmt.Fprintf(w, "%s", res) | ||
205 | } | ||
206 | |||
207 | // Create a new server, returns the mux so users can add other methods (for | ||
208 | // example, if they want to share a process to build a console that also | ||
209 | // accepts informs) | ||
210 | func NewServer(handler *InformHandler) (*http.Server, *http.ServeMux) { | ||
211 | mux := http.NewServeMux() | ||
212 | mux.Handle("/inform", handler) | ||
213 | |||
214 | return &http.Server{ | ||
215 | Addr: ":6080", | ||
216 | Handler: Log(mux), | ||
217 | }, mux | ||
218 | } | ||