1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
package main
import (
"encoding/json"
"log"
"github.com/eclipse/paho.mqtt.golang"
)
type MessageHandler func(topic string, payload []byte)
func mqttSync(t mqtt.Token) error {
if t.Wait() && t.Error() != nil {
return t.Error()
}
return nil
}
type MQTTBroker struct {
Devices chan Device
messages chan mqtt.Message
done chan bool
client mqtt.Client
}
func NewMQTTBroker(broker string) (*MQTTBroker, error) {
bufferSize := 100
b := &MQTTBroker{
Devices: make(chan Device, bufferSize),
messages: make(chan mqtt.Message, bufferSize),
done: make(chan bool),
}
b.client = mqtt.NewClient(
mqtt.NewClientOptions().
AddBroker(broker).
SetDefaultPublishHandler(b.handleMessage).
SetClientID("homekit-controller"))
if err := mqttSync(b.client.Connect()); err != nil {
return nil, err
}
return b, nil
}
func (b *MQTTBroker) Subscribe(topic string, h MessageHandler) error {
var handler mqtt.MessageHandler
if h != nil {
handler = func(c mqtt.Client, m mqtt.Message) {
h(m.Topic(), m.Payload())
}
}
if err := mqttSync(b.client.Subscribe(topic, 0, handler)); err != nil {
return err
}
return nil
}
func (b *MQTTBroker) Publish(topic string, msg interface{}) error {
if err := mqttSync(b.client.Publish(topic, 0, false, msg)); err != nil {
return err
}
return nil
}
// Runs synchronously in the mqtt client comms handler goroutine
func (b *MQTTBroker) handleMessage(c mqtt.Client, m mqtt.Message) {
switch m.Topic() {
case "shellies/announce", "ubiquiti/announce":
a := DeviceAnnounce{}
if err := json.Unmarshal(m.Payload(), &a); err != nil {
log.Printf("Unable to unmarshal device announce JSON: %e", err)
return
}
d, err := NewDevice(b, a)
if err != nil {
log.Printf("Error creating device driver: %e", err)
return
}
b.Devices <- d
}
}
func (b *MQTTBroker) Shutdown() {
b.client.Disconnect(250)
}
|