diff options
author | Mike Crute <mcrute@gmail.com> | 2016-09-24 19:35:35 -0700 |
---|---|---|
committer | Mike Crute <mike@crute.us> | 2017-07-18 03:48:44 +0000 |
commit | 8fcbc57005d7a7195ee71d2ddc0b1ab70427ec3d (patch) | |
tree | 6cba2091448fe85d5650829f04f6239cdef80b6f /inform | |
parent | 409dcc5e2a0a603bf34a8066ca8f5451464e1e13 (diff) | |
download | go-inform-8fcbc57005d7a7195ee71d2ddc0b1ab70427ec3d.tar.bz2 go-inform-8fcbc57005d7a7195ee71d2ddc0b1ab70427ec3d.tar.xz go-inform-8fcbc57005d7a7195ee71d2ddc0b1ab70427ec3d.zip |
WIP
Diffstat (limited to 'inform')
-rw-r--r-- | inform/inform.go | 29 | ||||
-rw-r--r-- | inform/server.go | 206 | ||||
-rw-r--r-- | inform/tx_messages.go | 20 |
3 files changed, 211 insertions, 44 deletions
diff --git a/inform/inform.go b/inform/inform.go index 5c3ca1c..ac3b57d 100644 --- a/inform/inform.go +++ b/inform/inform.go | |||
@@ -27,12 +27,26 @@ type InformWrapper struct { | |||
27 | 27 | ||
28 | // Create InformWrapper with sane defaults | 28 | // Create InformWrapper with sane defaults |
29 | func NewInformWrapper() *InformWrapper { | 29 | func NewInformWrapper() *InformWrapper { |
30 | return &InformWrapper{ | 30 | w := &InformWrapper{ |
31 | Version: INFORM_VERSION, | 31 | Version: INFORM_VERSION, |
32 | MacAddr: make([]byte, 6), | 32 | MacAddr: make([]byte, 6), |
33 | Flags: 0, | 33 | Flags: 0, |
34 | DataVersion: DATA_VERSION, | 34 | DataVersion: DATA_VERSION, |
35 | } | 35 | } |
36 | |||
37 | // Almost all messages are encrypted outside of provisioning so default | ||
38 | // this and make users explicitly disable it. | ||
39 | w.SetEncrypted(true) | ||
40 | |||
41 | return w | ||
42 | } | ||
43 | |||
44 | // Create an InformWrapper that is a response to an incoming wrapper. Copies | ||
45 | // all necessary data for a response so callers can just set a payload | ||
46 | func NewInformWrapperResponse(msg *InformWrapper) *InformWrapper { | ||
47 | w := NewInformWrapper() | ||
48 | copy(w.MacAddr, msg.MacAddr) | ||
49 | return w | ||
36 | } | 50 | } |
37 | 51 | ||
38 | // Update the payload data with JSON value | 52 | // Update the payload data with JSON value |
@@ -45,6 +59,19 @@ func (i *InformWrapper) UpdatePayload(v interface{}) error { | |||
45 | } | 59 | } |
46 | } | 60 | } |
47 | 61 | ||
62 | // Unmarshal a payload body that we received from a device. Does not work for | ||
63 | // user-set messages | ||
64 | func (i *InformWrapper) UnmarshalPayload() (*DeviceMessage, error) { | ||
65 | var m DeviceMessage | ||
66 | |||
67 | err := json.Unmarshal(i.Payload, &m) | ||
68 | if err != nil { | ||
69 | return nil, err | ||
70 | } | ||
71 | |||
72 | return &m, nil | ||
73 | } | ||
74 | |||
48 | // Format Mac address bytes as lowercase string with colons | 75 | // Format Mac address bytes as lowercase string with colons |
49 | func (i *InformWrapper) FormattedMac() string { | 76 | func (i *InformWrapper) FormattedMac() string { |
50 | return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", | 77 | return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", |
diff --git a/inform/server.go b/inform/server.go index 7e33a86..710b8ac 100644 --- a/inform/server.go +++ b/inform/server.go | |||
@@ -1,55 +1,155 @@ | |||
1 | package inform | 1 | package inform |
2 | 2 | ||
3 | import ( | 3 | import ( |
4 | "container/list" | ||
5 | "errors" | ||
4 | "fmt" | 6 | "fmt" |
5 | "log" | 7 | "log" |
6 | "net/http" | 8 | "net/http" |
9 | "sync" | ||
7 | "time" | 10 | "time" |
8 | ) | 11 | ) |
9 | 12 | ||
10 | type StateTree struct { | 13 | func Log(handler http.Handler) http.Handler { |
11 | states map[string]map[int]int | 14 | return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
15 | handler.ServeHTTP(w, r) | ||
16 | t := time.Now().Format("02/Jan/2006 15:04:05") | ||
17 | log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto) | ||
18 | }) | ||
12 | } | 19 | } |
13 | 20 | ||
14 | func NewStateTree() *StateTree { | 21 | type Device struct { |
15 | return &StateTree{make(map[string]map[int]int)} | 22 | Initialized bool |
23 | CurrentState bool | ||
24 | DesiredState bool | ||
25 | *sync.Mutex | ||
26 | } | ||
27 | |||
28 | type InformHandler struct { | ||
29 | Codec *Codec | ||
30 | ports map[string]map[int]*Device | ||
31 | queue map[string]*list.List | ||
32 | *sync.RWMutex | ||
33 | } | ||
34 | |||
35 | func NewInformHandler(c *Codec) *InformHandler { | ||
36 | return &InformHandler{ | ||
37 | Codec: c, | ||
38 | ports: make(map[string]map[int]*Device), | ||
39 | queue: make(map[string]*list.List), | ||
40 | RWMutex: &sync.RWMutex{}, | ||
41 | } | ||
16 | } | 42 | } |
17 | 43 | ||
18 | func (t *StateTree) ensureNode(device string, port int) { | 44 | func (h *InformHandler) AddPort(dev string, port int) { |
19 | _, ok := t.states[device] | 45 | h.Lock() |
46 | defer h.Unlock() | ||
47 | |||
48 | _, ok := h.ports[dev] | ||
20 | if !ok { | 49 | if !ok { |
21 | t.states[device] = make(map[int]int) | 50 | h.ports[dev] = make(map[int]*Device) |
22 | } | 51 | } |
23 | 52 | ||
24 | _, ok = t.states[device][port] | 53 | _, ok = h.queue[dev] |
25 | if !ok { | 54 | if !ok { |
26 | t.states[device][port] = 0 | 55 | log.Printf("Adding queue for %s", dev) |
56 | h.queue[dev] = list.New() | ||
57 | } | ||
58 | |||
59 | log.Printf("Adding %s port %d", dev, port) | ||
60 | h.ports[dev][port] = &Device{ | ||
61 | Mutex: &sync.Mutex{}, | ||
27 | } | 62 | } |
28 | } | 63 | } |
29 | 64 | ||
30 | func (t *StateTree) GetState(device string, port int) int { | 65 | func (h *InformHandler) getPort(dev string, port int) (*Device, error) { |
31 | t.ensureNode(device, port) | 66 | h.RLock() |
32 | return t.states[device][port] | 67 | defer h.RUnlock() |
68 | |||
69 | _, ok := h.ports[dev] | ||
70 | if !ok { | ||
71 | return nil, errors.New("No device found") | ||
72 | } | ||
73 | |||
74 | p, ok := h.ports[dev][port] | ||
75 | if !ok { | ||
76 | return nil, errors.New("No port found") | ||
77 | } | ||
78 | |||
79 | return p, nil | ||
33 | } | 80 | } |
34 | 81 | ||
35 | func (t *StateTree) SetState(device string, port, value int) { | 82 | func (h *InformHandler) SetState(dev string, port int, state bool) error { |
36 | t.ensureNode(device, port) | 83 | p, err := h.getPort(dev, port) |
37 | t.states[device][port] = value | 84 | if err != nil { |
85 | return err | ||
86 | } | ||
87 | |||
88 | p.Lock() | ||
89 | defer p.Unlock() | ||
90 | |||
91 | log.Printf("Set state to %t for %s port %d", state, dev, port) | ||
92 | p.DesiredState = state | ||
93 | return nil | ||
38 | } | 94 | } |
39 | 95 | ||
40 | func Log(handler http.Handler) http.Handler { | 96 | func (h *InformHandler) buildCommands(dev string, pl *DeviceMessage) error { |
41 | return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | 97 | for _, o := range pl.Outputs { |
42 | handler.ServeHTTP(w, r) | 98 | ds, err := h.getPort(dev, o.Port) |
43 | t := time.Now().Format("02/Jan/2006 15:04:05") | 99 | if err != nil { |
44 | log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto) | 100 | return err |
45 | // Addr - - [D/M/Y H:M:S] "Method RequestURI Proto" Code Size | 101 | } |
46 | // 127.0.0.1 - - [24/Sep/2016 14:30:35] "GET / HTTP/1.1" 200 - | 102 | ds.Lock() |
47 | }) | 103 | |
104 | // Get initial state | ||
105 | if !ds.Initialized { | ||
106 | ds.CurrentState = o.OutputState | ||
107 | ds.Initialized = true | ||
108 | return nil | ||
109 | } | ||
110 | |||
111 | // State didn't change at the sensor | ||
112 | if ds.CurrentState == o.OutputState { | ||
113 | if ds.DesiredState != o.OutputState { | ||
114 | log.Printf("Toggle state %t for %s port %d", ds.DesiredState, dev, o.Port) | ||
115 | // Generate change command | ||
116 | // TODO: Don't lock the whole handler | ||
117 | h.Lock() | ||
118 | h.queue[dev].PushFront(NewOutputCommand(o.Port, ds.DesiredState, 0)) | ||
119 | h.Unlock() | ||
120 | } | ||
121 | } else { // Sensor caused the change, leave it alone | ||
122 | log.Printf("Sensor state changed %s port %d", dev, o.Port) | ||
123 | ds.DesiredState = o.OutputState | ||
124 | } | ||
125 | |||
126 | ds.CurrentState = o.OutputState | ||
127 | ds.Unlock() // Don't hold the lock the entire loop | ||
128 | } | ||
129 | |||
130 | return nil | ||
48 | } | 131 | } |
49 | 132 | ||
50 | type InformHandler struct { | 133 | func (h *InformHandler) pop(dev string) *CommandMessage { |
51 | Codec *Codec | 134 | // TODO: Don't lock the whole handler |
52 | StateTree *StateTree | 135 | h.Lock() |
136 | defer h.Unlock() | ||
137 | |||
138 | q, ok := h.queue[dev] | ||
139 | if !ok { | ||
140 | log.Printf("No queue for %s", dev) | ||
141 | return nil | ||
142 | } | ||
143 | |||
144 | e := q.Front() | ||
145 | if e != nil { | ||
146 | h.queue[dev].Remove(e) | ||
147 | cmd := e.Value.(*CommandMessage) | ||
148 | cmd.Freshen() | ||
149 | return cmd | ||
150 | } | ||
151 | |||
152 | return nil | ||
53 | } | 153 | } |
54 | 154 | ||
55 | func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | 155 | func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
@@ -58,35 +158,61 @@ func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |||
58 | return | 158 | return |
59 | } | 159 | } |
60 | 160 | ||
61 | if r.URL != nil && r.URL.Path != "/inform" { | 161 | msg, err := h.Codec.Unmarshal(r.Body) |
62 | http.Error(w, "404 Not Found", http.StatusNotFound) | 162 | if err != nil { |
163 | log.Printf("Unmarshal message: %s", err.Error()) | ||
164 | http.Error(w, "400 Bad Request", http.StatusBadRequest) | ||
63 | return | 165 | return |
64 | } | 166 | } |
65 | 167 | ||
66 | msg, err := h.Codec.Unmarshal(r.Body) | 168 | pl, err := msg.UnmarshalPayload() |
67 | if err != nil { | 169 | if err != nil { |
68 | http.Error(w, "Bad Request", http.StatusBadRequest) | 170 | log.Printf("Unmarshal payload: %s", err.Error()) |
171 | http.Error(w, "400 Bad Request", http.StatusBadRequest) | ||
69 | return | 172 | return |
70 | } | 173 | } |
71 | 174 | ||
72 | pl := NewInformWrapper() | 175 | dev := msg.FormattedMac() |
73 | copy(pl.MacAddr, msg.MacAddr) | 176 | ret := NewInformWrapperResponse(msg) |
74 | pl.SetEncrypted(true) | 177 | log.Printf("Inform from %s", dev) |
75 | 178 | ||
76 | // TODO: compare current state to tree and update | 179 | // Send a command until the queue is empty |
180 | if cmd := h.pop(dev); cmd != nil { | ||
181 | ret.UpdatePayload(cmd) | ||
182 | } else { | ||
183 | // Update internal state vs reality | ||
184 | if err := h.buildCommands(msg.FormattedMac(), pl); err != nil { | ||
185 | http.Error(w, "500 Server Error", 500) | ||
186 | return | ||
187 | } | ||
188 | |||
189 | // If that generated a command send it | ||
190 | if cmd = h.pop(dev); cmd != nil { | ||
191 | ret.UpdatePayload(cmd) | ||
192 | } else { | ||
193 | // Otherwise noop | ||
194 | ret.UpdatePayload(NewNoop(10)) | ||
195 | } | ||
196 | } | ||
77 | 197 | ||
78 | res, err := h.Codec.Marshal(pl) | 198 | res, err := h.Codec.Marshal(ret) |
79 | if err != nil { | 199 | if err != nil { |
80 | http.Error(w, "Server Error", 500) | 200 | http.Error(w, "500 Server Error", 500) |
81 | return | 201 | return |
82 | } | 202 | } |
83 | 203 | ||
84 | fmt.Fprintf(w, "%s", res) | 204 | fmt.Fprintf(w, "%s", res) |
85 | } | 205 | } |
86 | 206 | ||
87 | func NewServer(handler *InformHandler) *http.Server { | 207 | // Create a new server, returns the mux so users can add other methods (for |
208 | // example, if they want to share a process to build a console that also | ||
209 | // accepts informs) | ||
210 | func NewServer(handler *InformHandler) (*http.Server, *http.ServeMux) { | ||
211 | mux := http.NewServeMux() | ||
212 | mux.Handle("/inform", handler) | ||
213 | |||
88 | return &http.Server{ | 214 | return &http.Server{ |
89 | Addr: ":6080", | 215 | Addr: ":6080", |
90 | Handler: Log(handler), | 216 | Handler: Log(mux), |
91 | } | 217 | }, mux |
92 | } | 218 | } |
diff --git a/inform/tx_messages.go b/inform/tx_messages.go index 0021ef3..8962cad 100644 --- a/inform/tx_messages.go +++ b/inform/tx_messages.go | |||
@@ -25,8 +25,15 @@ type CommandMessage struct { | |||
25 | Voltage int `json:"volt,omitempty"` | 25 | Voltage int `json:"volt,omitempty"` |
26 | } | 26 | } |
27 | 27 | ||
28 | func NewOutputCommand(port, val, timer int) *CommandMessage { | 28 | // Freshen timestamps |
29 | return &CommandMessage{ | 29 | func (m *CommandMessage) Freshen() { |
30 | m.DateTime = time.Now().Format(time.RFC3339) | ||
31 | m.ServerTime = unixMicroPSTString() | ||
32 | m.Time = unixMicroPST() | ||
33 | } | ||
34 | |||
35 | func NewOutputCommand(port int, val bool, timer int) *CommandMessage { | ||
36 | m := &CommandMessage{ | ||
30 | Type: "cmd", | 37 | Type: "cmd", |
31 | Command: "mfi-output", | 38 | Command: "mfi-output", |
32 | DateTime: time.Now().Format(time.RFC3339), | 39 | DateTime: time.Now().Format(time.RFC3339), |
@@ -34,8 +41,15 @@ func NewOutputCommand(port, val, timer int) *CommandMessage { | |||
34 | ServerTime: unixMicroPSTString(), | 41 | ServerTime: unixMicroPSTString(), |
35 | Time: unixMicroPST(), | 42 | Time: unixMicroPST(), |
36 | Timer: timer, | 43 | Timer: timer, |
37 | Value: val, | ||
38 | } | 44 | } |
45 | |||
46 | if val { | ||
47 | m.Value = 1 | ||
48 | } else { | ||
49 | m.Value = 0 | ||
50 | } | ||
51 | |||
52 | return m | ||
39 | } | 53 | } |
40 | 54 | ||
41 | type NoopMessage struct { | 55 | type NoopMessage struct { |