diff options
Diffstat (limited to 'mqtt-controller/mqtt.go')
-rw-r--r-- | mqtt-controller/mqtt.go | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/mqtt-controller/mqtt.go b/mqtt-controller/mqtt.go new file mode 100644 index 0000000..0b2687c --- /dev/null +++ b/mqtt-controller/mqtt.go | |||
@@ -0,0 +1,93 @@ | |||
1 | package main | ||
2 | |||
3 | import ( | ||
4 | "encoding/json" | ||
5 | "log" | ||
6 | |||
7 | "github.com/eclipse/paho.mqtt.golang" | ||
8 | ) | ||
9 | |||
10 | type MessageHandler func(topic string, payload []byte) | ||
11 | |||
12 | func mqttSync(t mqtt.Token) error { | ||
13 | if t.Wait() && t.Error() != nil { | ||
14 | return t.Error() | ||
15 | } | ||
16 | return nil | ||
17 | } | ||
18 | |||
19 | type MQTTBroker struct { | ||
20 | Devices chan Device | ||
21 | messages chan mqtt.Message | ||
22 | done chan bool | ||
23 | |||
24 | client mqtt.Client | ||
25 | } | ||
26 | |||
27 | func NewMQTTBroker(broker string) (*MQTTBroker, error) { | ||
28 | bufferSize := 100 | ||
29 | |||
30 | b := &MQTTBroker{ | ||
31 | Devices: make(chan Device, bufferSize), | ||
32 | messages: make(chan mqtt.Message, bufferSize), | ||
33 | done: make(chan bool), | ||
34 | } | ||
35 | |||
36 | b.client = mqtt.NewClient( | ||
37 | mqtt.NewClientOptions(). | ||
38 | AddBroker(broker). | ||
39 | SetDefaultPublishHandler(b.handleMessage). | ||
40 | SetClientID("homekit-controller")) | ||
41 | |||
42 | if err := mqttSync(b.client.Connect()); err != nil { | ||
43 | return nil, err | ||
44 | } | ||
45 | |||
46 | return b, nil | ||
47 | } | ||
48 | |||
49 | func (b *MQTTBroker) Subscribe(topic string, h MessageHandler) error { | ||
50 | var handler mqtt.MessageHandler | ||
51 | if h != nil { | ||
52 | handler = func(c mqtt.Client, m mqtt.Message) { | ||
53 | h(m.Topic(), m.Payload()) | ||
54 | } | ||
55 | } | ||
56 | |||
57 | if err := mqttSync(b.client.Subscribe(topic, 0, handler)); err != nil { | ||
58 | return err | ||
59 | } | ||
60 | |||
61 | return nil | ||
62 | } | ||
63 | |||
64 | func (b *MQTTBroker) Publish(topic string, msg interface{}) error { | ||
65 | if err := mqttSync(b.client.Publish(topic, 0, false, msg)); err != nil { | ||
66 | return err | ||
67 | } | ||
68 | return nil | ||
69 | } | ||
70 | |||
71 | // Runs synchronously in the mqtt client comms handler goroutine | ||
72 | func (b *MQTTBroker) handleMessage(c mqtt.Client, m mqtt.Message) { | ||
73 | switch m.Topic() { | ||
74 | case "shellies/announce", "ubiquiti/announce": | ||
75 | a := DeviceAnnounce{} | ||
76 | if err := json.Unmarshal(m.Payload(), &a); err != nil { | ||
77 | log.Printf("Unable to unmarshal device announce JSON: %e", err) | ||
78 | return | ||
79 | } | ||
80 | |||
81 | d, err := NewDevice(b, a) | ||
82 | if err != nil { | ||
83 | log.Printf("Error creating device driver: %e", err) | ||
84 | return | ||
85 | } | ||
86 | |||
87 | b.Devices <- d | ||
88 | } | ||
89 | } | ||
90 | |||
91 | func (b *MQTTBroker) Shutdown() { | ||
92 | b.client.Disconnect(250) | ||
93 | } | ||