summaryrefslogtreecommitdiff
path: root/mqtt-controller/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'mqtt-controller/main.go')
-rw-r--r--mqtt-controller/main.go146
1 files changed, 146 insertions, 0 deletions
diff --git a/mqtt-controller/main.go b/mqtt-controller/main.go
new file mode 100644
index 0000000..29d5d28
--- /dev/null
+++ b/mqtt-controller/main.go
@@ -0,0 +1,146 @@
1package main
2
3import (
4 "encoding/json"
5 "log"
6 "os"
7 "os/signal"
8 "sync"
9 "syscall"
10 "time"
11
12 "github.com/brutella/hc"
13 "github.com/brutella/hc/accessory"
14 "github.com/brutella/hc/util"
15)
16
17type AppConfig struct {
18 Broker string
19 BridgeName string
20 Pin string
21}
22
23func loadAppConfig(s util.Storage) (*AppConfig, error) {
24 d, err := s.Get("bridgeConfig")
25 if err != nil {
26 return nil, err
27 }
28
29 c := &AppConfig{}
30 if err = json.Unmarshal(d, c); err != nil {
31 return nil, err
32 }
33
34 return c, nil
35}
36
37func main() {
38 wg := &sync.WaitGroup{}
39 done := make(chan interface{})
40
41 c := make(chan os.Signal)
42 signal.Notify(c, os.Interrupt)
43 signal.Notify(c, os.Kill)
44 signal.Notify(c, syscall.SIGTERM)
45
46 storage, err := util.NewFileStorage("db")
47 if err != nil {
48 panic(err)
49 }
50
51 cfg, err := loadAppConfig(storage)
52 if err != nil {
53 panic(err)
54 }
55
56 broker, err := NewMQTTBroker(cfg.Broker)
57 if err != nil {
58 panic(err)
59 }
60
61 if err := broker.Subscribe("shellies/announce", nil); err != nil {
62 panic(err)
63 }
64
65 if err := broker.Publish("shellies/command", "announce"); err != nil {
66 panic(err)
67 }
68
69 ac := make(chan *UnifiedSwitch, 10)
70
71 go func() {
72 wg.Add(1)
73 defer wg.Done()
74
75 log.Println("Starting homekit thread")
76
77 reg := []*accessory.Accessory{}
78
79 // Collect devices until the discovery process quiesces, then start the
80 // transport
81 quiescence := 10 * time.Second
82 t := time.NewTimer(quiescence)
83
84 C:
85 for {
86 select {
87 case a := <-ac:
88 log.Println("Homekit: new device")
89 reg = append(reg, a.HomeKitSwitch.Accessory)
90 // TODO: Extend timer deadline
91 case <-t.C:
92 log.Println("Homekit: timeout over")
93 t.Stop()
94 break C
95 case <-done:
96 log.Println("Homekit: cancelled")
97 return
98 }
99 }
100
101 log.Println("MQTT quiesces, starting HomeKit")
102
103 br := accessory.NewBridge(accessory.Info{Name: cfg.BridgeName})
104
105 transport, err := hc.NewIPTransport(hc.Config{Pin: cfg.Pin}, br.Accessory, reg...)
106 if err != nil {
107 log.Fatalf("%s", err)
108 }
109 go transport.Start()
110 log.Printf("HomeKit transport started")
111
112 select {
113 case <-done:
114 log.Printf("Shutting down transport")
115 if transport != nil {
116 <-transport.Stop()
117 }
118 }
119 log.Printf("Transport shut down")
120 }()
121
122 for {
123 select {
124 case d := <-broker.Devices:
125 if d == nil {
126 continue
127 }
128
129 us, err := NewUnifiedSwitch(broker, d)
130 if err != nil {
131 log.Printf("Error connecting device: %e", err)
132 continue
133 }
134
135 go us.MessageLoop(done, wg)
136
137 ac <- us
138 log.Printf("New Device: %s\n", d.Identity())
139 case <-c:
140 close(done)
141 broker.Shutdown()
142 wg.Wait()
143 return
144 }
145 }
146}