summaryrefslogtreecommitdiff
path: root/mqtt-controller/mqtt.go
diff options
context:
space:
mode:
Diffstat (limited to 'mqtt-controller/mqtt.go')
-rw-r--r--mqtt-controller/mqtt.go93
1 files changed, 93 insertions, 0 deletions
diff --git a/mqtt-controller/mqtt.go b/mqtt-controller/mqtt.go
new file mode 100644
index 0000000..0b2687c
--- /dev/null
+++ b/mqtt-controller/mqtt.go
@@ -0,0 +1,93 @@
1package main
2
3import (
4 "encoding/json"
5 "log"
6
7 "github.com/eclipse/paho.mqtt.golang"
8)
9
10type MessageHandler func(topic string, payload []byte)
11
12func mqttSync(t mqtt.Token) error {
13 if t.Wait() && t.Error() != nil {
14 return t.Error()
15 }
16 return nil
17}
18
19type MQTTBroker struct {
20 Devices chan Device
21 messages chan mqtt.Message
22 done chan bool
23
24 client mqtt.Client
25}
26
27func NewMQTTBroker(broker string) (*MQTTBroker, error) {
28 bufferSize := 100
29
30 b := &MQTTBroker{
31 Devices: make(chan Device, bufferSize),
32 messages: make(chan mqtt.Message, bufferSize),
33 done: make(chan bool),
34 }
35
36 b.client = mqtt.NewClient(
37 mqtt.NewClientOptions().
38 AddBroker(broker).
39 SetDefaultPublishHandler(b.handleMessage).
40 SetClientID("homekit-controller"))
41
42 if err := mqttSync(b.client.Connect()); err != nil {
43 return nil, err
44 }
45
46 return b, nil
47}
48
49func (b *MQTTBroker) Subscribe(topic string, h MessageHandler) error {
50 var handler mqtt.MessageHandler
51 if h != nil {
52 handler = func(c mqtt.Client, m mqtt.Message) {
53 h(m.Topic(), m.Payload())
54 }
55 }
56
57 if err := mqttSync(b.client.Subscribe(topic, 0, handler)); err != nil {
58 return err
59 }
60
61 return nil
62}
63
64func (b *MQTTBroker) Publish(topic string, msg interface{}) error {
65 if err := mqttSync(b.client.Publish(topic, 0, false, msg)); err != nil {
66 return err
67 }
68 return nil
69}
70
71// Runs synchronously in the mqtt client comms handler goroutine
72func (b *MQTTBroker) handleMessage(c mqtt.Client, m mqtt.Message) {
73 switch m.Topic() {
74 case "shellies/announce", "ubiquiti/announce":
75 a := DeviceAnnounce{}
76 if err := json.Unmarshal(m.Payload(), &a); err != nil {
77 log.Printf("Unable to unmarshal device announce JSON: %e", err)
78 return
79 }
80
81 d, err := NewDevice(b, a)
82 if err != nil {
83 log.Printf("Error creating device driver: %e", err)
84 return
85 }
86
87 b.Devices <- d
88 }
89}
90
91func (b *MQTTBroker) Shutdown() {
92 b.client.Disconnect(250)
93}