diff options
Diffstat (limited to 'inform/server.go')
-rw-r--r-- | inform/server.go | 206 |
1 files changed, 166 insertions, 40 deletions
diff --git a/inform/server.go b/inform/server.go index 7e33a86..710b8ac 100644 --- a/inform/server.go +++ b/inform/server.go | |||
@@ -1,55 +1,155 @@ | |||
1 | package inform | 1 | package inform |
2 | 2 | ||
3 | import ( | 3 | import ( |
4 | "container/list" | ||
5 | "errors" | ||
4 | "fmt" | 6 | "fmt" |
5 | "log" | 7 | "log" |
6 | "net/http" | 8 | "net/http" |
9 | "sync" | ||
7 | "time" | 10 | "time" |
8 | ) | 11 | ) |
9 | 12 | ||
10 | type StateTree struct { | 13 | func Log(handler http.Handler) http.Handler { |
11 | states map[string]map[int]int | 14 | return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
15 | handler.ServeHTTP(w, r) | ||
16 | t := time.Now().Format("02/Jan/2006 15:04:05") | ||
17 | log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto) | ||
18 | }) | ||
12 | } | 19 | } |
13 | 20 | ||
14 | func NewStateTree() *StateTree { | 21 | type Device struct { |
15 | return &StateTree{make(map[string]map[int]int)} | 22 | Initialized bool |
23 | CurrentState bool | ||
24 | DesiredState bool | ||
25 | *sync.Mutex | ||
26 | } | ||
27 | |||
28 | type InformHandler struct { | ||
29 | Codec *Codec | ||
30 | ports map[string]map[int]*Device | ||
31 | queue map[string]*list.List | ||
32 | *sync.RWMutex | ||
33 | } | ||
34 | |||
35 | func NewInformHandler(c *Codec) *InformHandler { | ||
36 | return &InformHandler{ | ||
37 | Codec: c, | ||
38 | ports: make(map[string]map[int]*Device), | ||
39 | queue: make(map[string]*list.List), | ||
40 | RWMutex: &sync.RWMutex{}, | ||
41 | } | ||
16 | } | 42 | } |
17 | 43 | ||
18 | func (t *StateTree) ensureNode(device string, port int) { | 44 | func (h *InformHandler) AddPort(dev string, port int) { |
19 | _, ok := t.states[device] | 45 | h.Lock() |
46 | defer h.Unlock() | ||
47 | |||
48 | _, ok := h.ports[dev] | ||
20 | if !ok { | 49 | if !ok { |
21 | t.states[device] = make(map[int]int) | 50 | h.ports[dev] = make(map[int]*Device) |
22 | } | 51 | } |
23 | 52 | ||
24 | _, ok = t.states[device][port] | 53 | _, ok = h.queue[dev] |
25 | if !ok { | 54 | if !ok { |
26 | t.states[device][port] = 0 | 55 | log.Printf("Adding queue for %s", dev) |
56 | h.queue[dev] = list.New() | ||
57 | } | ||
58 | |||
59 | log.Printf("Adding %s port %d", dev, port) | ||
60 | h.ports[dev][port] = &Device{ | ||
61 | Mutex: &sync.Mutex{}, | ||
27 | } | 62 | } |
28 | } | 63 | } |
29 | 64 | ||
30 | func (t *StateTree) GetState(device string, port int) int { | 65 | func (h *InformHandler) getPort(dev string, port int) (*Device, error) { |
31 | t.ensureNode(device, port) | 66 | h.RLock() |
32 | return t.states[device][port] | 67 | defer h.RUnlock() |
68 | |||
69 | _, ok := h.ports[dev] | ||
70 | if !ok { | ||
71 | return nil, errors.New("No device found") | ||
72 | } | ||
73 | |||
74 | p, ok := h.ports[dev][port] | ||
75 | if !ok { | ||
76 | return nil, errors.New("No port found") | ||
77 | } | ||
78 | |||
79 | return p, nil | ||
33 | } | 80 | } |
34 | 81 | ||
35 | func (t *StateTree) SetState(device string, port, value int) { | 82 | func (h *InformHandler) SetState(dev string, port int, state bool) error { |
36 | t.ensureNode(device, port) | 83 | p, err := h.getPort(dev, port) |
37 | t.states[device][port] = value | 84 | if err != nil { |
85 | return err | ||
86 | } | ||
87 | |||
88 | p.Lock() | ||
89 | defer p.Unlock() | ||
90 | |||
91 | log.Printf("Set state to %t for %s port %d", state, dev, port) | ||
92 | p.DesiredState = state | ||
93 | return nil | ||
38 | } | 94 | } |
39 | 95 | ||
40 | func Log(handler http.Handler) http.Handler { | 96 | func (h *InformHandler) buildCommands(dev string, pl *DeviceMessage) error { |
41 | return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | 97 | for _, o := range pl.Outputs { |
42 | handler.ServeHTTP(w, r) | 98 | ds, err := h.getPort(dev, o.Port) |
43 | t := time.Now().Format("02/Jan/2006 15:04:05") | 99 | if err != nil { |
44 | log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto) | 100 | return err |
45 | // Addr - - [D/M/Y H:M:S] "Method RequestURI Proto" Code Size | 101 | } |
46 | // 127.0.0.1 - - [24/Sep/2016 14:30:35] "GET / HTTP/1.1" 200 - | 102 | ds.Lock() |
47 | }) | 103 | |
104 | // Get initial state | ||
105 | if !ds.Initialized { | ||
106 | ds.CurrentState = o.OutputState | ||
107 | ds.Initialized = true | ||
108 | return nil | ||
109 | } | ||
110 | |||
111 | // State didn't change at the sensor | ||
112 | if ds.CurrentState == o.OutputState { | ||
113 | if ds.DesiredState != o.OutputState { | ||
114 | log.Printf("Toggle state %t for %s port %d", ds.DesiredState, dev, o.Port) | ||
115 | // Generate change command | ||
116 | // TODO: Don't lock the whole handler | ||
117 | h.Lock() | ||
118 | h.queue[dev].PushFront(NewOutputCommand(o.Port, ds.DesiredState, 0)) | ||
119 | h.Unlock() | ||
120 | } | ||
121 | } else { // Sensor caused the change, leave it alone | ||
122 | log.Printf("Sensor state changed %s port %d", dev, o.Port) | ||
123 | ds.DesiredState = o.OutputState | ||
124 | } | ||
125 | |||
126 | ds.CurrentState = o.OutputState | ||
127 | ds.Unlock() // Don't hold the lock the entire loop | ||
128 | } | ||
129 | |||
130 | return nil | ||
48 | } | 131 | } |
49 | 132 | ||
50 | type InformHandler struct { | 133 | func (h *InformHandler) pop(dev string) *CommandMessage { |
51 | Codec *Codec | 134 | // TODO: Don't lock the whole handler |
52 | StateTree *StateTree | 135 | h.Lock() |
136 | defer h.Unlock() | ||
137 | |||
138 | q, ok := h.queue[dev] | ||
139 | if !ok { | ||
140 | log.Printf("No queue for %s", dev) | ||
141 | return nil | ||
142 | } | ||
143 | |||
144 | e := q.Front() | ||
145 | if e != nil { | ||
146 | h.queue[dev].Remove(e) | ||
147 | cmd := e.Value.(*CommandMessage) | ||
148 | cmd.Freshen() | ||
149 | return cmd | ||
150 | } | ||
151 | |||
152 | return nil | ||
53 | } | 153 | } |
54 | 154 | ||
55 | func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | 155 | func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
@@ -58,35 +158,61 @@ func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |||
58 | return | 158 | return |
59 | } | 159 | } |
60 | 160 | ||
61 | if r.URL != nil && r.URL.Path != "/inform" { | 161 | msg, err := h.Codec.Unmarshal(r.Body) |
62 | http.Error(w, "404 Not Found", http.StatusNotFound) | 162 | if err != nil { |
163 | log.Printf("Unmarshal message: %s", err.Error()) | ||
164 | http.Error(w, "400 Bad Request", http.StatusBadRequest) | ||
63 | return | 165 | return |
64 | } | 166 | } |
65 | 167 | ||
66 | msg, err := h.Codec.Unmarshal(r.Body) | 168 | pl, err := msg.UnmarshalPayload() |
67 | if err != nil { | 169 | if err != nil { |
68 | http.Error(w, "Bad Request", http.StatusBadRequest) | 170 | log.Printf("Unmarshal payload: %s", err.Error()) |
171 | http.Error(w, "400 Bad Request", http.StatusBadRequest) | ||
69 | return | 172 | return |
70 | } | 173 | } |
71 | 174 | ||
72 | pl := NewInformWrapper() | 175 | dev := msg.FormattedMac() |
73 | copy(pl.MacAddr, msg.MacAddr) | 176 | ret := NewInformWrapperResponse(msg) |
74 | pl.SetEncrypted(true) | 177 | log.Printf("Inform from %s", dev) |
75 | 178 | ||
76 | // TODO: compare current state to tree and update | 179 | // Send a command until the queue is empty |
180 | if cmd := h.pop(dev); cmd != nil { | ||
181 | ret.UpdatePayload(cmd) | ||
182 | } else { | ||
183 | // Update internal state vs reality | ||
184 | if err := h.buildCommands(msg.FormattedMac(), pl); err != nil { | ||
185 | http.Error(w, "500 Server Error", 500) | ||
186 | return | ||
187 | } | ||
188 | |||
189 | // If that generated a command send it | ||
190 | if cmd = h.pop(dev); cmd != nil { | ||
191 | ret.UpdatePayload(cmd) | ||
192 | } else { | ||
193 | // Otherwise noop | ||
194 | ret.UpdatePayload(NewNoop(10)) | ||
195 | } | ||
196 | } | ||
77 | 197 | ||
78 | res, err := h.Codec.Marshal(pl) | 198 | res, err := h.Codec.Marshal(ret) |
79 | if err != nil { | 199 | if err != nil { |
80 | http.Error(w, "Server Error", 500) | 200 | http.Error(w, "500 Server Error", 500) |
81 | return | 201 | return |
82 | } | 202 | } |
83 | 203 | ||
84 | fmt.Fprintf(w, "%s", res) | 204 | fmt.Fprintf(w, "%s", res) |
85 | } | 205 | } |
86 | 206 | ||
87 | func NewServer(handler *InformHandler) *http.Server { | 207 | // Create a new server, returns the mux so users can add other methods (for |
208 | // example, if they want to share a process to build a console that also | ||
209 | // accepts informs) | ||
210 | func NewServer(handler *InformHandler) (*http.Server, *http.ServeMux) { | ||
211 | mux := http.NewServeMux() | ||
212 | mux.Handle("/inform", handler) | ||
213 | |||
88 | return &http.Server{ | 214 | return &http.Server{ |
89 | Addr: ":6080", | 215 | Addr: ":6080", |
90 | Handler: Log(handler), | 216 | Handler: Log(mux), |
91 | } | 217 | }, mux |
92 | } | 218 | } |