summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Crute <mike@crute.us>2019-11-25 18:03:49 -0800
committerMike Crute <mike@crute.us>2019-11-25 18:03:49 -0800
commit5a5b04b795364c15be0a2917e42c0b37edbcd50b (patch)
tree5084d0eb79aa26c3bb16247919ede3ca7f4d4253
parentd41b6478c5b7309c43bf00b52e02472e6ec77dae (diff)
downloadmfi_homekit-5a5b04b795364c15be0a2917e42c0b37edbcd50b.tar.bz2
mfi_homekit-5a5b04b795364c15be0a2917e42c0b37edbcd50b.tar.xz
mfi_homekit-5a5b04b795364c15be0a2917e42c0b37edbcd50b.zip
Add hacky mqtt controller
-rw-r--r--mfi-mqtt.c486
1 files changed, 486 insertions, 0 deletions
diff --git a/mfi-mqtt.c b/mfi-mqtt.c
new file mode 100644
index 0000000..91b9212
--- /dev/null
+++ b/mfi-mqtt.c
@@ -0,0 +1,486 @@
1/*
2/home/mcrute/tmp/buildroot-2019.02.7/output/host/bin/mips-buildroot-linux-uclibc-gcc \
3 -o mfi-mqtt ~/test.c -Wall \
4 -I/home/mcrute/tmp/buildroot-2019.02.7/output/host/mips-buildroot-linux-uclibc/sysroot/usr/include/json-c \
5 -lpthread -lmosquitto -ljson-c -static
6
7scp -o KexAlgorithms=+diffie-hellman-group1-sha1 mfi-mqtt admin@192.168.2.10:
8*/
9
10#include <assert.h>
11#include <errno.h>
12#include <stdio.h>
13#include <stdlib.h>
14#include <string.h>
15#include <time.h>
16#include <unistd.h>
17#include <signal.h>
18#include <pthread.h>
19
20#include <net/if.h>
21#include <sys/socket.h>
22#include <sys/ioctl.h>
23#include <arpa/inet.h>
24
25#include <json.h>
26#include <mosquitto.h>
27
28static volatile sig_atomic_t closing_time = 0;
29struct mosquitto *mosq = NULL;
30
31void get_primary_ip_address(char ip_address[15]);
32
33/* ================================== REPORTING ========================================= */
34typedef struct power_statistics {
35 int relay_num;
36 char *engaged;
37 float active_power;
38 float energy_sum;
39 float current_rms;
40 float voltage_rms;
41 float power_factor;
42} power_statistics;
43
44power_statistics * init_power_statistics(int relay_num) {
45 power_statistics *stats = malloc(sizeof(power_statistics));
46 stats->relay_num = relay_num;
47 stats->engaged = calloc(strlen("false") + 1, sizeof(char));
48 return stats;
49}
50
51void free_power_statistics(power_statistics *stats) {
52 free(stats->engaged);
53 free(stats);
54}
55
56bool power_statistics_is_engaged(power_statistics *stats) {
57 return strcmp(stats->engaged, "on") == 0;
58}
59
60int power_statistics_load_from_file(power_statistics *stats) {
61 FILE *fp;
62 char filename[12];
63
64 snprintf(filename, 12, "/dev/power%d", stats->relay_num);
65
66 fp = fopen(filename, "r");
67 if (!fp) {
68 return 1;
69 }
70
71 if (fscanf(fp, "%s %f\n %f\n %f\n %f\n %f\n",
72 stats->engaged,
73 &stats->active_power,
74 &stats->energy_sum,
75 &stats->current_rms,
76 &stats->voltage_rms,
77 &stats->power_factor) != 6) {
78 fclose(fp);
79 return 1;
80 }
81
82 fclose(fp);
83 return 0;
84}
85
86int get_output_count() {
87 return access("/dev/power8", F_OK) != -1 ? 8 : 3;
88}
89
90int get_stats_all_outputs(power_statistics ***out_stats) {
91 int i;
92 int output_count;
93 power_statistics *stat;
94 power_statistics **stats;
95
96 output_count = get_output_count();
97 stats = malloc(sizeof(power_statistics *) * (output_count + 1));
98
99 for (i = output_count; i > 0; i--) {
100 stat = init_power_statistics(i);
101
102 if (power_statistics_load_from_file(stat)) {
103 continue;
104 }
105
106 stats[i] = stat;
107 }
108
109 *out_stats = stats;
110
111 return output_count;
112}
113
114json_object * format_power_satistics_output_json(power_statistics *stats) {
115 json_object *top_object;
116
117 top_object = json_object_new_object();
118 json_object_object_add(top_object, "output", json_object_new_int(stats->relay_num));
119 json_object_object_add(top_object, "engaged", json_object_new_boolean(power_statistics_is_engaged(stats)));
120 json_object_object_add(top_object, "active_power", json_object_new_double(stats->active_power));
121 json_object_object_add(top_object, "energy_sum", json_object_new_double(stats->energy_sum));
122 json_object_object_add(top_object, "current_rms", json_object_new_double(stats->current_rms));
123 json_object_object_add(top_object, "voltage_rms", json_object_new_double(stats->voltage_rms));
124 json_object_object_add(top_object, "power_factor", json_object_new_double(stats->power_factor));
125
126 return top_object;
127}
128
129char * format_report_all_outputs(power_statistics **stats, int report_count) {
130 int i;
131 char *output;
132 char hostname[256];
133 char ip_address[15];
134 const char *tmp;
135 json_object *top_object, *report_array;
136
137 memset(hostname, 0, sizeof(hostname));
138 gethostname(hostname, 255);
139
140 memset(ip_address, 0, sizeof(ip_address));
141 get_primary_ip_address(ip_address);
142
143 report_array = json_object_new_array();
144 top_object = json_object_new_object();
145 json_object_object_add(top_object, "hostname", json_object_new_string(hostname));
146 json_object_object_add(top_object, "ip_address", json_object_new_string(ip_address));
147 json_object_object_add(top_object, "reports", report_array);
148
149 for (i = report_count; i > 0; i--) {
150 json_object_array_add(report_array, format_power_satistics_output_json(stats[i]));
151 free_power_statistics(stats[i]);
152 }
153
154 tmp = json_object_to_json_string_ext(top_object, JSON_C_TO_STRING_PLAIN);
155 output = strdup(tmp);
156 json_object_put(top_object);
157
158 return output;
159}
160/* ================================== REPORTING ========================================= */
161typedef struct control_message {
162 int output;
163 int state;
164} control_message;
165
166control_message * parse_control_message(const struct mosquitto_message *message) {
167 struct json_tokener *tok;
168 enum json_tokener_error jerr;
169 control_message *out_message = NULL;
170 json_object *msg, *state_key, *output_key;
171
172 tok = json_tokener_new();
173 msg = json_tokener_parse_ex(tok, message->payload, message->payloadlen);
174 jerr = json_tokener_get_error(tok);
175 if (jerr != json_tokener_success) {
176 fprintf(stderr, "Invalid message format: %s\n", json_tokener_error_desc(jerr));
177 goto cleanup;
178 }
179
180 output_key = json_object_object_get(msg, "output");
181 if (!output_key) {
182 fprintf(stderr, "Invalid message format: no output key\n");
183 goto cleanup;
184 }
185
186 state_key = json_object_object_get(msg, "state");
187 if (!state_key) {
188 fprintf(stderr, "Invalid message format: no state key\n");
189 goto cleanup;
190 }
191
192 out_message = calloc(sizeof(control_message), 1);
193 out_message->state = json_object_get_int(state_key);
194 out_message->output = json_object_get_int(output_key);
195 // Clamp the value to on or off
196 out_message->state = out_message->state > 0 ? 1 : 0;
197
198cleanup:
199 if (msg) json_object_put(msg);
200 json_tokener_free(tok);
201
202 return out_message;
203}
204
205void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
206{
207 FILE *f;
208 char *basename;
209 int output_count;
210 char filename[255];
211 control_message *msg;
212
213 basename = "/proc/power/relay";
214 output_count = get_output_count();
215
216 msg = parse_control_message(message);
217 if (!msg) {
218 return;
219 }
220
221 if (msg->output > output_count || msg->output <= 0) {
222 fprintf(stderr, "Invalid output number: %i\n", msg->output);
223 goto cleanup;
224 }
225
226 fprintf(stderr, "Set output %i to %i\n", msg->output, msg->state);
227
228 memset(filename, 0, sizeof(filename));
229 snprintf(filename, strlen(basename) + 2, "%s%i", basename, msg->output);
230
231 f = fopen(filename, "w");
232 if (!f) {
233 fprintf(stderr, "Failed to open relay file %s\n", filename);
234 goto cleanup;
235 }
236
237 fprintf(f, "%i\n", msg->state);
238 fclose(f);
239
240cleanup:
241 free(msg);
242}
243
244void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags)
245{
246 char hostname[255];
247 char *topic;
248 char *prefix;
249
250 memset(hostname, 0, sizeof(hostname));
251 gethostname(hostname, 255);
252
253 prefix = "/mfi/devices";
254 topic = calloc(sizeof(char), strlen(prefix) + strlen(hostname) + 2);
255 sprintf(topic, "%s/%s", prefix, hostname);
256
257 fprintf(stderr, "Subscribed to topic %s\n", topic);
258
259 if (!result) {
260 mosquitto_subscribe(mosq, NULL, topic, 0);
261 } else {
262 fprintf(stderr, "%s\n", mosquitto_connack_string(result));
263 mosquitto_disconnect(mosq);
264 }
265}
266
267void get_primary_ip_address(char ip_address[15])
268{
269 int fd;
270 struct ifreq ifr;
271
272 fd = socket(AF_INET, SOCK_DGRAM, 0);
273 memcpy(ifr.ifr_name, "ath0", IFNAMSIZ-1);
274 ioctl(fd, SIOCGIFADDR, &ifr);
275 close(fd);
276
277 strncpy(ip_address, inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr), 15);
278}
279
280int client_id_generate(char **output)
281{
282 int len;
283 char hostname[256];
284
285 *output = calloc(sizeof(char), MOSQ_MQTT_ID_MAX_LENGTH);
286 if (!*output) {
287 fprintf(stderr, "Error: Out of memory. %s\n", strerror(errno));
288 mosquitto_lib_cleanup();
289 return 1;
290 }
291
292 memset(hostname, 0, sizeof(hostname));
293 gethostname(hostname, 255);
294
295 /* Clamp length to MQTT maximum client ID length */
296 len = strlen("mfi|-") + 6 + strlen(hostname);
297 if (len > MOSQ_MQTT_ID_MAX_LENGTH - 1) {
298 len = MOSQ_MQTT_ID_MAX_LENGTH - 1;
299 }
300
301 snprintf(*output, len, "mfi|%d-%s", getpid(), hostname);
302
303 return MOSQ_ERR_SUCCESS;
304}
305
306//void signal_handler(int signo, siginfo_t *info, void *context) {
307static void signal_handler(int signo) {
308 closing_time = 1;
309}
310
311void set_signal_handler() {
312 struct sigaction action;
313
314 action.sa_handler = signal_handler;
315 action.sa_flags = SA_SIGINFO | SA_RESTART;
316 //action.sa_sigaction = signal_handler;
317
318 //sigfillset(&action.sa_mask);
319
320 signal(SIGINT, SIG_IGN);
321
322 if (sigaction(SIGINT, &action, NULL) == -1) {
323 perror("Error connecting signal");
324 exit(-1);
325 }
326}
327
328void mask_sig() {
329 sigset_t mask;
330 sigemptyset(&mask);
331 sigaddset(&mask, SIGINT);
332 pthread_sigmask(SIG_BLOCK, &mask, NULL);
333}
334
335void * control_thread(struct mosquitto *mosq) {
336 mask_sig();
337 mosquitto_loop_forever(mosq, 1000*86400, 1);
338 return NULL;
339}
340
341void * pinger_thread(struct mosquitto *mosq) {
342 power_statistics **stats;
343 int report_count;
344 char *output2;
345
346 mask_sig();
347
348 do {
349 if (closing_time) {
350 break;
351 }
352
353 report_count = get_stats_all_outputs(&stats);
354 output2 = format_report_all_outputs(stats, report_count);
355
356 mosquitto_publish(mosq, NULL, "/mfi/reports", strlen(output2), output2, 0, false);
357
358 free(output2);
359 free(stats);
360
361 sleep(2);
362 } while(true);
363
364 return NULL;
365}
366
367// Stop all the blinking!
368void * light_management_thread(void *arg) {
369 FILE *light_fp;
370 FILE *freq_fp;
371
372 do {
373 if (closing_time) {
374 break;
375 }
376
377 light_fp = fopen("/proc/led/status", "w");
378 if (light_fp) {
379 fprintf(light_fp, "1\n");
380 fclose(light_fp);
381 }
382
383 freq_fp = fopen("/proc/led/freq", "w");
384 if (freq_fp) {
385 fprintf(freq_fp, "0\n");
386 fclose(freq_fp);
387 }
388
389 sleep(1);
390 } while(true);
391
392 return NULL;
393}
394
395// Doing this the "C way" is a real pain in the ass, just shell out and forget
396// about it
397void cleanup_crap_processes() {
398 // Otherwise init will continue to respawn them
399 system("sed -i "
400 "-e '/ubnt-websockets/s/^/#/' "
401 "-e '/telnetd/s/^/#/' "
402 "-e '/mca[-d]/s/^/#/' "
403 "-e '/lighttpd/s/^/#/' "
404 "/etc/inittab");
405
406 system("kill -HUP 1");
407
408 // Most of these kill cleanly but a few are stubborn so don't ask, tell.
409 system("pkill -9 ubnt-websockets");
410 system("pkill -9 lighttpd");
411 system("pkill upnpd");
412 system("pkill telnetd");
413 system("pkill mca-monitor");
414 system("pkill mcad");
415 system("pkill avahi-daemon");
416}
417
418int main(int argc, char *argv[])
419{
420 int rc;
421 char *id;
422 pthread_t pinger_thread_h, control_thread_h, light_management_thread_h;
423
424 cleanup_crap_processes();
425
426 set_signal_handler();
427 mosquitto_lib_init();
428
429 if (client_id_generate(&id)) {
430 return 1;
431 }
432
433 mosq = mosquitto_new(id, true, NULL);
434 if (!mosq) {
435 switch (errno) {
436 case ENOMEM:
437 fprintf(stderr, "Error: Out of memory.\n");
438 break;
439 case EINVAL:
440 fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
441 break;
442 }
443 mosquitto_lib_cleanup();
444 return 1;
445 }
446
447 int protocol_version = MQTT_PROTOCOL_V311;
448
449 mosquitto_max_inflight_messages_set(mosq, 20);
450 mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &protocol_version);
451 mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback);
452 mosquitto_message_callback_set(mosq, my_message_callback);
453
454 rc = mosquitto_connect(mosq, "172.16.0.191", 1883, 60);
455 if (rc > 0) {
456 if (rc == MOSQ_ERR_ERRNO) {
457 fprintf(stderr, "Error: %s\n", strerror(errno));
458 } else {
459 fprintf(stderr, "Unable to connect (%s).\n", mosquitto_strerror(rc));
460 }
461 mosquitto_lib_cleanup();
462 return 1;
463 }
464 fprintf(stderr, "Connected to broker\n");
465
466 pthread_create(&pinger_thread_h, NULL, (void * (*)(void *))pinger_thread, mosq);
467 pthread_create(&control_thread_h, NULL, (void * (*)(void *))control_thread, mosq);
468 pthread_create(&light_management_thread_h, NULL, (void * (*)(void *))light_management_thread, NULL);
469
470 do {
471 sleep(2);
472 if (closing_time) {
473 fprintf(stderr, "Shutting down\n");
474
475 mosquitto_disconnect(mosq);
476 pthread_join(pinger_thread_h, NULL);
477 pthread_join(control_thread_h, NULL);
478 break;
479 }
480 } while(true);
481
482 mosquitto_destroy(mosq);
483 mosquitto_lib_cleanup();
484
485 return 0;
486}