summaryrefslogtreecommitdiff
path: root/mqtt-controller/mqtt.go
blob: 0b2687c7b9bfe2858ddaaeeb29f42cdf4b6cf917 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main

import (
	"encoding/json"
	"log"

	"github.com/eclipse/paho.mqtt.golang"
)

type MessageHandler func(topic string, payload []byte)

func mqttSync(t mqtt.Token) error {
	if t.Wait() && t.Error() != nil {
		return t.Error()
	}
	return nil
}

type MQTTBroker struct {
	Devices  chan Device
	messages chan mqtt.Message
	done     chan bool

	client mqtt.Client
}

func NewMQTTBroker(broker string) (*MQTTBroker, error) {
	bufferSize := 100

	b := &MQTTBroker{
		Devices:  make(chan Device, bufferSize),
		messages: make(chan mqtt.Message, bufferSize),
		done:     make(chan bool),
	}

	b.client = mqtt.NewClient(
		mqtt.NewClientOptions().
			AddBroker(broker).
			SetDefaultPublishHandler(b.handleMessage).
			SetClientID("homekit-controller"))

	if err := mqttSync(b.client.Connect()); err != nil {
		return nil, err
	}

	return b, nil
}

func (b *MQTTBroker) Subscribe(topic string, h MessageHandler) error {
	var handler mqtt.MessageHandler
	if h != nil {
		handler = func(c mqtt.Client, m mqtt.Message) {
			h(m.Topic(), m.Payload())
		}
	}

	if err := mqttSync(b.client.Subscribe(topic, 0, handler)); err != nil {
		return err
	}

	return nil
}

func (b *MQTTBroker) Publish(topic string, msg interface{}) error {
	if err := mqttSync(b.client.Publish(topic, 0, false, msg)); err != nil {
		return err
	}
	return nil
}

// Runs synchronously in the mqtt client comms handler goroutine
func (b *MQTTBroker) handleMessage(c mqtt.Client, m mqtt.Message) {
	switch m.Topic() {
	case "shellies/announce", "ubiquiti/announce":
		a := DeviceAnnounce{}
		if err := json.Unmarshal(m.Payload(), &a); err != nil {
			log.Printf("Unable to unmarshal device announce JSON: %e", err)
			return
		}

		d, err := NewDevice(b, a)
		if err != nil {
			log.Printf("Error creating device driver: %e", err)
			return
		}

		b.Devices <- d
	}
}

func (b *MQTTBroker) Shutdown() {
	b.client.Disconnect(250)
}