From 8fcbc57005d7a7195ee71d2ddc0b1ab70427ec3d Mon Sep 17 00:00:00 2001 From: Mike Crute Date: Sat, 24 Sep 2016 19:35:35 -0700 Subject: WIP --- example.go | 78 ++++++++++--------- example_hk.go | 125 ++++++++++++++++++++++++++++++ inform/inform.go | 29 ++++++- inform/server.go | 206 ++++++++++++++++++++++++++++++++++++++++---------- inform/tx_messages.go | 20 ++++- 5 files changed, 378 insertions(+), 80 deletions(-) create mode 100644 example_hk.go diff --git a/example.go b/example.go index 8e10090..440b4ee 100644 --- a/example.go +++ b/example.go @@ -1,49 +1,55 @@ package main import ( - "encoding/json" - "fmt" - "github.com/mcrute/go-inform/inform" - "io/ioutil" - "os" + "github.com/brutella/hc" + "github.com/brutella/hc/accessory" + + "log" + "time" ) func main() { - fp, err := os.Open("data/test_files/1.bin") - if err != nil { - fmt.Println("Error loading file") - return - } - defer fp.Close() - - kp, err := os.Open("data/device_keys.json") - if err != nil { - fmt.Println("Error loading key file") - return + switchInfo := accessory.Info{ + Name: "Lamp", + SerialNumber: "051AC-23AAM1", + Manufacturer: "Foobar", + Model: "AB", } - defer kp.Close() + acc := accessory.NewSwitch(switchInfo) - var keys map[string]string - kd, _ := ioutil.ReadAll(kp) - json.Unmarshal(kd, &keys) + config := hc.Config{Pin: "12344321", Port: "12345", StoragePath: "./db"} + t, err := hc.NewIPTransport(config, acc.Accessory) - codec := &inform.Codec{keys} - - msg, err := codec.Unmarshal(fp) if err != nil { - fmt.Println(err.Error()) - return + log.Fatal(err) } - fmt.Printf("%s", msg) - - out, _ := os.Create("test.out") - defer out.Close() - - pkt, err := codec.Marshal(msg) - if err != nil { - fmt.Println(err.Error()) - return - } - out.Write(pkt) + // Log to console when client (e.g. iOS app) changes the value of the on characteristic + acc.Switch.On.OnValueRemoteUpdate(func(on bool) { + if on == true { + log.Println("[INFO] Client changed switch to on") + } else { + log.Println("[INFO] Client changed switch to off") + } + }) + + // Periodically toggle the switch's on characteristic + go func() { + for { + on := !acc.Switch.On.GetValue() + if on == true { + log.Println("[INFO] Switch is on") + } else { + log.Println("[INFO] Switch is off") + } + acc.Switch.On.SetValue(on) + time.Sleep(5 * time.Second) + } + }() + + hc.OnTermination(func() { + t.Stop() + }) + + t.Start() } diff --git a/example_hk.go b/example_hk.go new file mode 100644 index 0000000..ee89281 --- /dev/null +++ b/example_hk.go @@ -0,0 +1,125 @@ +package main + +import ( + "encoding/json" + "fmt" + "github.com/brutella/hc" + "github.com/brutella/hc/accessory" + "github.com/mcrute/go-inform/inform" + "io/ioutil" + "log" + "os" +) + +// Load devices into state +// Gather current initial state from devices +// Track state transitions +// Inputs: +// - Homekit +// - Devices + +type Port struct { + Label string `json:"label"` + Port int `json:"port"` +} + +type Device struct { + Key string `json:"key"` + Name string `json:"name"` + Model string `json:"model"` + Serial string `json:"serial"` + Ports []*Port `json:"ports"` +} + +type DeviceMap map[string]*Device + +func LoadKeys(file string) (DeviceMap, error) { + var keys DeviceMap + + kp, err := os.Open(file) + if err != nil { + return nil, err + } + defer kp.Close() + + kd, err := ioutil.ReadAll(kp) + if err != nil { + return nil, err + } + + err = json.Unmarshal(kd, &keys) + if err != nil { + return nil, err + } + + return keys, nil +} + +func main() { + devs, err := LoadKeys("data/device_keys.json") + if err != nil { + log.Println("Error loading key file") + log.Println(err.Error()) + return + } + + keys := make(map[string]string, len(devs)) + for i, d := range devs { + keys[i] = d.Key + } + + h := inform.NewInformHandler(&inform.Codec{keys}) + s, _ := inform.NewServer(h) + as := make([]*accessory.Accessory, 0, len(devs)*3) + + for i, d := range devs { + for _, p := range d.Ports { + a := accessory.NewSwitch(accessory.Info{ + Name: p.Label, + SerialNumber: fmt.Sprintf("%s-%d", d.Serial, p.Port), + Manufacturer: "Ubiquiti", + Model: d.Model, + }) + + // Capture these for the closure, otherwise they're bound to the + // single loop variable and will only see the final value of that + // variable + dev, port := i, p.Port + + a.Switch.On.OnValueRemoteUpdate(func(on bool) { + h.SetState(dev, port, on) + }) + + h.AddPort(dev, port) + as = append(as, a.Accessory) + } + } + + // The root accessory is what gets used to name the bridge so let's make it + // an actual bridge + br := accessory.New(accessory.Info{ + Name: "UnifiBridge", + Manufacturer: "Mike Crute", + Model: "0.1", + }, accessory.TypeBridge) + + config := hc.Config{ + Pin: "12344321", + Port: "12345", + StoragePath: "./db", + } + + t, err := hc.NewIPTransport(config, br, as...) + if err != nil { + log.Fatal(err) + return + } + + hc.OnTermination(func() { + t.Stop() + os.Exit(0) // Otherwise homekit doesn't actually stop + }) + + go t.Start() + s.ListenAndServe() +} diff --git a/inform/inform.go b/inform/inform.go index 5c3ca1c..ac3b57d 100644 --- a/inform/inform.go +++ b/inform/inform.go @@ -27,12 +27,26 @@ type InformWrapper struct { // Create InformWrapper with sane defaults func NewInformWrapper() *InformWrapper { - return &InformWrapper{ + w := &InformWrapper{ Version: INFORM_VERSION, MacAddr: make([]byte, 6), Flags: 0, DataVersion: DATA_VERSION, } + + // Almost all messages are encrypted outside of provisioning so default + // this and make users explicitly disable it. + w.SetEncrypted(true) + + return w +} + +// Create an InformWrapper that is a response to an incoming wrapper. Copies +// all necessary data for a response so callers can just set a payload +func NewInformWrapperResponse(msg *InformWrapper) *InformWrapper { + w := NewInformWrapper() + copy(w.MacAddr, msg.MacAddr) + return w } // Update the payload data with JSON value @@ -45,6 +59,19 @@ func (i *InformWrapper) UpdatePayload(v interface{}) error { } } +// Unmarshal a payload body that we received from a device. Does not work for +// user-set messages +func (i *InformWrapper) UnmarshalPayload() (*DeviceMessage, error) { + var m DeviceMessage + + err := json.Unmarshal(i.Payload, &m) + if err != nil { + return nil, err + } + + return &m, nil +} + // Format Mac address bytes as lowercase string with colons func (i *InformWrapper) FormattedMac() string { return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", diff --git a/inform/server.go b/inform/server.go index 7e33a86..710b8ac 100644 --- a/inform/server.go +++ b/inform/server.go @@ -1,55 +1,155 @@ package inform import ( + "container/list" + "errors" "fmt" "log" "net/http" + "sync" "time" ) -type StateTree struct { - states map[string]map[int]int +func Log(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handler.ServeHTTP(w, r) + t := time.Now().Format("02/Jan/2006 15:04:05") + log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto) + }) } -func NewStateTree() *StateTree { - return &StateTree{make(map[string]map[int]int)} +type Device struct { + Initialized bool + CurrentState bool + DesiredState bool + *sync.Mutex +} + +type InformHandler struct { + Codec *Codec + ports map[string]map[int]*Device + queue map[string]*list.List + *sync.RWMutex +} + +func NewInformHandler(c *Codec) *InformHandler { + return &InformHandler{ + Codec: c, + ports: make(map[string]map[int]*Device), + queue: make(map[string]*list.List), + RWMutex: &sync.RWMutex{}, + } } -func (t *StateTree) ensureNode(device string, port int) { - _, ok := t.states[device] +func (h *InformHandler) AddPort(dev string, port int) { + h.Lock() + defer h.Unlock() + + _, ok := h.ports[dev] if !ok { - t.states[device] = make(map[int]int) + h.ports[dev] = make(map[int]*Device) } - _, ok = t.states[device][port] + _, ok = h.queue[dev] if !ok { - t.states[device][port] = 0 + log.Printf("Adding queue for %s", dev) + h.queue[dev] = list.New() + } + + log.Printf("Adding %s port %d", dev, port) + h.ports[dev][port] = &Device{ + Mutex: &sync.Mutex{}, } } -func (t *StateTree) GetState(device string, port int) int { - t.ensureNode(device, port) - return t.states[device][port] +func (h *InformHandler) getPort(dev string, port int) (*Device, error) { + h.RLock() + defer h.RUnlock() + + _, ok := h.ports[dev] + if !ok { + return nil, errors.New("No device found") + } + + p, ok := h.ports[dev][port] + if !ok { + return nil, errors.New("No port found") + } + + return p, nil } -func (t *StateTree) SetState(device string, port, value int) { - t.ensureNode(device, port) - t.states[device][port] = value +func (h *InformHandler) SetState(dev string, port int, state bool) error { + p, err := h.getPort(dev, port) + if err != nil { + return err + } + + p.Lock() + defer p.Unlock() + + log.Printf("Set state to %t for %s port %d", state, dev, port) + p.DesiredState = state + return nil } -func Log(handler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - handler.ServeHTTP(w, r) - t := time.Now().Format("02/Jan/2006 15:04:05") - log.Printf("%s - - [%s] \"%s %s %s\"", r.RemoteAddr, t, r.Method, r.URL, r.Proto) - // Addr - - [D/M/Y H:M:S] "Method RequestURI Proto" Code Size - // 127.0.0.1 - - [24/Sep/2016 14:30:35] "GET / HTTP/1.1" 200 - - }) +func (h *InformHandler) buildCommands(dev string, pl *DeviceMessage) error { + for _, o := range pl.Outputs { + ds, err := h.getPort(dev, o.Port) + if err != nil { + return err + } + ds.Lock() + + // Get initial state + if !ds.Initialized { + ds.CurrentState = o.OutputState + ds.Initialized = true + return nil + } + + // State didn't change at the sensor + if ds.CurrentState == o.OutputState { + if ds.DesiredState != o.OutputState { + log.Printf("Toggle state %t for %s port %d", ds.DesiredState, dev, o.Port) + // Generate change command + // TODO: Don't lock the whole handler + h.Lock() + h.queue[dev].PushFront(NewOutputCommand(o.Port, ds.DesiredState, 0)) + h.Unlock() + } + } else { // Sensor caused the change, leave it alone + log.Printf("Sensor state changed %s port %d", dev, o.Port) + ds.DesiredState = o.OutputState + } + + ds.CurrentState = o.OutputState + ds.Unlock() // Don't hold the lock the entire loop + } + + return nil } -type InformHandler struct { - Codec *Codec - StateTree *StateTree +func (h *InformHandler) pop(dev string) *CommandMessage { + // TODO: Don't lock the whole handler + h.Lock() + defer h.Unlock() + + q, ok := h.queue[dev] + if !ok { + log.Printf("No queue for %s", dev) + return nil + } + + e := q.Front() + if e != nil { + h.queue[dev].Remove(e) + cmd := e.Value.(*CommandMessage) + cmd.Freshen() + return cmd + } + + return nil } func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -58,35 +158,61 @@ func (h *InformHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if r.URL != nil && r.URL.Path != "/inform" { - http.Error(w, "404 Not Found", http.StatusNotFound) + msg, err := h.Codec.Unmarshal(r.Body) + if err != nil { + log.Printf("Unmarshal message: %s", err.Error()) + http.Error(w, "400 Bad Request", http.StatusBadRequest) return } - msg, err := h.Codec.Unmarshal(r.Body) + pl, err := msg.UnmarshalPayload() if err != nil { - http.Error(w, "Bad Request", http.StatusBadRequest) + log.Printf("Unmarshal payload: %s", err.Error()) + http.Error(w, "400 Bad Request", http.StatusBadRequest) return } - pl := NewInformWrapper() - copy(pl.MacAddr, msg.MacAddr) - pl.SetEncrypted(true) - - // TODO: compare current state to tree and update + dev := msg.FormattedMac() + ret := NewInformWrapperResponse(msg) + log.Printf("Inform from %s", dev) + + // Send a command until the queue is empty + if cmd := h.pop(dev); cmd != nil { + ret.UpdatePayload(cmd) + } else { + // Update internal state vs reality + if err := h.buildCommands(msg.FormattedMac(), pl); err != nil { + http.Error(w, "500 Server Error", 500) + return + } + + // If that generated a command send it + if cmd = h.pop(dev); cmd != nil { + ret.UpdatePayload(cmd) + } else { + // Otherwise noop + ret.UpdatePayload(NewNoop(10)) + } + } - res, err := h.Codec.Marshal(pl) + res, err := h.Codec.Marshal(ret) if err != nil { - http.Error(w, "Server Error", 500) + http.Error(w, "500 Server Error", 500) return } fmt.Fprintf(w, "%s", res) } -func NewServer(handler *InformHandler) *http.Server { +// Create a new server, returns the mux so users can add other methods (for +// example, if they want to share a process to build a console that also +// accepts informs) +func NewServer(handler *InformHandler) (*http.Server, *http.ServeMux) { + mux := http.NewServeMux() + mux.Handle("/inform", handler) + return &http.Server{ Addr: ":6080", - Handler: Log(handler), - } + Handler: Log(mux), + }, mux } diff --git a/inform/tx_messages.go b/inform/tx_messages.go index 0021ef3..8962cad 100644 --- a/inform/tx_messages.go +++ b/inform/tx_messages.go @@ -25,8 +25,15 @@ type CommandMessage struct { Voltage int `json:"volt,omitempty"` } -func NewOutputCommand(port, val, timer int) *CommandMessage { - return &CommandMessage{ +// Freshen timestamps +func (m *CommandMessage) Freshen() { + m.DateTime = time.Now().Format(time.RFC3339) + m.ServerTime = unixMicroPSTString() + m.Time = unixMicroPST() +} + +func NewOutputCommand(port int, val bool, timer int) *CommandMessage { + m := &CommandMessage{ Type: "cmd", Command: "mfi-output", DateTime: time.Now().Format(time.RFC3339), @@ -34,8 +41,15 @@ func NewOutputCommand(port, val, timer int) *CommandMessage { ServerTime: unixMicroPSTString(), Time: unixMicroPST(), Timer: timer, - Value: val, } + + if val { + m.Value = 1 + } else { + m.Value = 0 + } + + return m } type NoopMessage struct { -- cgit v1.2.3