summaryrefslogtreecommitdiff
path: root/mfi-mqtt/mfi-mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'mfi-mqtt/mfi-mqtt.c')
-rw-r--r--mfi-mqtt/mfi-mqtt.c269
1 files changed, 269 insertions, 0 deletions
diff --git a/mfi-mqtt/mfi-mqtt.c b/mfi-mqtt/mfi-mqtt.c
new file mode 100644
index 0000000..25527bc
--- /dev/null
+++ b/mfi-mqtt/mfi-mqtt.c
@@ -0,0 +1,269 @@
1#include <errno.h>
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <time.h>
6#include <unistd.h>
7#include <signal.h>
8#include <pthread.h>
9
10#include <json-c/json.h>
11#include <mosquitto.h>
12
13#include "mosquitto.h"
14#include "reporting.h"
15
16static volatile sig_atomic_t closing_time = 0;
17
18typedef struct control_message {
19 int output;
20 int state;
21} control_message;
22
23control_message * parse_control_message(const struct mosquitto_message *message) {
24 struct json_tokener *tok;
25 enum json_tokener_error jerr;
26 control_message *out_message = NULL;
27 json_object *msg, *state_key, *output_key;
28
29 tok = json_tokener_new();
30 msg = json_tokener_parse_ex(tok, message->payload, message->payloadlen);
31 jerr = json_tokener_get_error(tok);
32 if (jerr != json_tokener_success) {
33 fprintf(stderr, "Invalid message format: %s\n", json_tokener_error_desc(jerr));
34 goto cleanup;
35 }
36
37 output_key = json_object_object_get(msg, "output");
38 if (!output_key) {
39 fprintf(stderr, "Invalid message format: no output key\n");
40 goto cleanup;
41 }
42
43 state_key = json_object_object_get(msg, "state");
44 if (!state_key) {
45 fprintf(stderr, "Invalid message format: no state key\n");
46 goto cleanup;
47 }
48
49 out_message = calloc(sizeof(control_message), 1);
50 out_message->state = json_object_get_int(state_key);
51 out_message->output = json_object_get_int(output_key);
52 // Clamp the value to on or off
53 out_message->state = out_message->state > 0 ? 1 : 0;
54
55cleanup:
56 if (msg) json_object_put(msg);
57 json_tokener_free(tok);
58
59 return out_message;
60}
61
62void set_relay(control_message *msg) {
63 FILE *f;
64 char *basename;
65 char filename[255];
66
67 basename = "/proc/power/relay";
68
69 memset(filename, 0, sizeof(filename));
70 snprintf(filename, strlen(basename) + 2, "%s%i", basename, msg->output);
71
72 f = fopen(filename, "w");
73 if (!f) {
74 fprintf(stderr, "Failed to open relay file %s\n", filename);
75 return;
76 }
77
78 fprintf(f, "%i\n", msg->state);
79 fclose(f);
80}
81
82void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
83{
84 control_message *msg;
85
86 msg = parse_control_message(message);
87 if (!msg) {
88 return;
89 }
90
91 if (msg->output > get_output_count() || msg->output <= 0) {
92 fprintf(stderr, "Invalid output number: %i\n", msg->output);
93 goto cleanup;
94 }
95
96 fprintf(stderr, "Set output %i to %i\n", msg->output, msg->state);
97 set_relay(msg);
98
99cleanup:
100 free(msg);
101}
102
103void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags)
104{
105 char *topic;
106 char *prefix;
107 char hostname[255];
108
109 memset(hostname, 0, sizeof(hostname));
110 gethostname(hostname, 255);
111
112 prefix = "/mfi/devices";
113 topic = calloc(sizeof(char), strlen(prefix) + strlen(hostname) + 2);
114 sprintf(topic, "%s/%s", prefix, hostname);
115
116 fprintf(stderr, "Subscribed to topic %s\n", topic);
117
118 if (!result) {
119 mosquitto_subscribe(mosq, NULL, topic, 0);
120 } else {
121 fprintf(stderr, "%s\n", mosquitto_connack_string(result));
122 mosquitto_disconnect(mosq);
123 }
124}
125
126static void signal_handler(int signo) {
127 closing_time = 1;
128}
129
130void set_signal_handler() {
131 struct sigaction action;
132
133 action.sa_handler = signal_handler;
134 action.sa_flags = SA_SIGINFO | SA_RESTART;
135
136 if (sigaction(SIGINT, &action, NULL) == -1) {
137 perror("Error connecting signal");
138 exit(-1);
139 }
140}
141
142void mask_sig() {
143 sigset_t mask;
144 sigemptyset(&mask);
145 sigaddset(&mask, SIGINT);
146 pthread_sigmask(SIG_BLOCK, &mask, NULL);
147}
148
149void * control_thread(struct mosquitto *mosq) {
150 mask_sig();
151 mosquitto_loop_forever(mosq, 1000*86400, 1);
152 return NULL;
153}
154
155void * pinger_thread(struct mosquitto *mosq) {
156 power_statistics **stats;
157 int report_count;
158 char *output2;
159
160 mask_sig();
161
162 do {
163 if (closing_time) {
164 break;
165 }
166
167 report_count = get_stats_all_outputs(&stats);
168 output2 = format_report_all_outputs(stats, report_count);
169
170 mosquitto_publish(mosq, NULL, "/mfi/reports", strlen(output2), output2, 0, false);
171
172 free(output2);
173 free(stats);
174
175 sleep(2);
176 } while(true);
177
178 return NULL;
179}
180
181// Stop all the blinking!
182void * light_management_thread(void *arg) {
183 FILE *light_fp;
184 FILE *freq_fp;
185
186 do {
187 if (closing_time) {
188 break;
189 }
190
191 light_fp = fopen("/proc/led/status", "w");
192 if (light_fp) {
193 fprintf(light_fp, "1\n");
194 fclose(light_fp);
195 }
196
197 freq_fp = fopen("/proc/led/freq", "w");
198 if (freq_fp) {
199 fprintf(freq_fp, "0\n");
200 fclose(freq_fp);
201 }
202
203 sleep(1);
204 } while(true);
205
206 return NULL;
207}
208
209// Doing this the "C way" is a real pain in the ass, just shell out and forget
210// about it
211void cleanup_crap_processes() {
212 // Otherwise init will continue to respawn them
213 system("sed -i "
214 "-e '/ubnt-websockets/s/^/#/' "
215 "-e '/telnetd/s/^/#/' "
216 "-e '/mca[-d]/s/^/#/' "
217 "-e '/lighttpd/s/^/#/' "
218 "/etc/inittab");
219
220 system("kill -HUP 1");
221
222 // Most of these kill cleanly but a few are stubborn so don't ask, tell.
223 system("pkill -9 ubnt-websockets");
224 system("pkill -9 lighttpd");
225 system("pkill upnpd");
226 system("pkill telnetd");
227 system("pkill mca-monitor");
228 system("pkill mcad");
229 system("pkill avahi-daemon");
230}
231
232int main(int argc, char *argv[])
233{
234 struct mosquitto *mosq;
235 pthread_t pinger_thread_h, control_thread_h, light_management_thread_h;
236
237 cleanup_crap_processes();
238
239 set_signal_handler();
240
241 mosq = connect_to_broker("172.16.0.191", 1883);
242 if (!mosq) {
243 return 1;
244 }
245 fprintf(stderr, "Connected to broker\n");
246
247 mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback);
248 mosquitto_message_callback_set(mosq, my_message_callback);
249
250 pthread_create(&pinger_thread_h, NULL, (void * (*)(void *))pinger_thread, mosq);
251 pthread_create(&control_thread_h, NULL, (void * (*)(void *))control_thread, mosq);
252 pthread_create(&light_management_thread_h, NULL, (void * (*)(void *))light_management_thread, NULL);
253
254 do {
255 sleep(2);
256 if (closing_time) {
257 fprintf(stderr, "Shutting down\n");
258
259 mosquitto_disconnect(mosq);
260 pthread_join(pinger_thread_h, NULL);
261 pthread_join(control_thread_h, NULL);
262 break;
263 }
264 } while(true);
265
266 shutdown_broker(mosq);
267
268 return 0;
269}