summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Crute <mike@crute.us>2019-11-25 23:00:42 -0800
committerMike Crute <mike@crute.us>2019-11-25 23:00:42 -0800
commitef4f00622d48d6d0b4a703f5a39bf8c355244af1 (patch)
tree1049be90f8e31a68b89d47a2ca12c03ea1bd71de
parent6b46779198fbf89c6688e5e9ddf12503ea29dbcd (diff)
downloadmfi_homekit-ef4f00622d48d6d0b4a703f5a39bf8c355244af1.tar.bz2
mfi_homekit-ef4f00622d48d6d0b4a703f5a39bf8c355244af1.tar.xz
mfi_homekit-ef4f00622d48d6d0b4a703f5a39bf8c355244af1.zip
Split apart mqtt agent
-rw-r--r--Makefile5
-rw-r--r--mfi-mqtt.c493
-rw-r--r--mfi-mqtt/Makefile12
-rwxr-xr-xmfi-mqtt/mfi-mqtt-mipsbin0 -> 448448 bytes
-rw-r--r--mfi-mqtt/mfi-mqtt.c269
-rw-r--r--mfi-mqtt/mfi-mqtt.obin0 -> 10304 bytes
-rw-r--r--mfi-mqtt/mosquitto.c82
-rw-r--r--mfi-mqtt/mosquitto.h4
-rw-r--r--mfi-mqtt/mosquitto.obin0 -> 4036 bytes
-rw-r--r--mfi-mqtt/reporting.c143
-rw-r--r--mfi-mqtt/reporting.h14
-rw-r--r--mfi-mqtt/reporting.obin0 -> 6892 bytes
12 files changed, 524 insertions, 498 deletions
diff --git a/Makefile b/Makefile
index b667d15..778afeb 100644
--- a/Makefile
+++ b/Makefile
@@ -10,8 +10,3 @@ docker:
10 cd docker; \ 10 cd docker; \
11 docker build -t docker.crute.me/mfi_homekit:latest .; \ 11 docker build -t docker.crute.me/mfi_homekit:latest .; \
12 docker push docker.crute.me/mfi_homekit:latest 12 docker push docker.crute.me/mfi_homekit:latest
13
14mfi-mqtt-mips: mfi-mqtt.c
15 $(BUILDROOT)/output/host/bin/mips-buildroot-linux-uclibc-gcc \
16 -Wall -o $@ $< \
17 -lpthread -lmosquitto -ljson-c -static
diff --git a/mfi-mqtt.c b/mfi-mqtt.c
deleted file mode 100644
index 0155039..0000000
--- a/mfi-mqtt.c
+++ /dev/null
@@ -1,493 +0,0 @@
1#include <assert.h>
2#include <errno.h>
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <time.h>
7#include <unistd.h>
8#include <signal.h>
9#include <pthread.h>
10
11#include <net/if.h>
12#include <sys/socket.h>
13#include <sys/ioctl.h>
14#include <arpa/inet.h>
15
16#include <json-c/json.h>
17#include <mosquitto.h>
18
19static volatile sig_atomic_t closing_time = 0;
20
21void get_primary_ip_address(char ip_address[15]);
22
23/* ================================== REPORTING ========================================= */
24typedef struct power_statistics {
25 int relay_num;
26 char *engaged;
27 float active_power;
28 float energy_sum;
29 float current_rms;
30 float voltage_rms;
31 float power_factor;
32} power_statistics;
33
34power_statistics * init_power_statistics(int relay_num) {
35 power_statistics *stats = malloc(sizeof(power_statistics));
36 stats->relay_num = relay_num;
37 stats->engaged = calloc(strlen("false") + 1, sizeof(char));
38 return stats;
39}
40
41void free_power_statistics(power_statistics *stats) {
42 free(stats->engaged);
43 free(stats);
44}
45
46bool power_statistics_is_engaged(power_statistics *stats) {
47 return strcmp(stats->engaged, "on") == 0;
48}
49
50int power_statistics_load_from_file(power_statistics *stats) {
51 FILE *fp;
52 char filename[12];
53
54 snprintf(filename, 12, "/dev/power%d", stats->relay_num);
55
56 fp = fopen(filename, "r");
57 if (!fp) {
58 return 1;
59 }
60
61 if (fscanf(fp, "%s %f\n %f\n %f\n %f\n %f\n",
62 stats->engaged,
63 &stats->active_power,
64 &stats->energy_sum,
65 &stats->current_rms,
66 &stats->voltage_rms,
67 &stats->power_factor) != 6) {
68 fclose(fp);
69 return 1;
70 }
71
72 fclose(fp);
73 return 0;
74}
75
76int get_output_count() {
77 return access("/dev/power8", F_OK) != -1 ? 8 : 3;
78}
79
80int get_stats_all_outputs(power_statistics ***out_stats) {
81 int i;
82 int output_count;
83 power_statistics *stat;
84 power_statistics **stats;
85
86 output_count = get_output_count();
87 stats = malloc(sizeof(power_statistics *) * (output_count + 1));
88
89 for (i = output_count; i > 0; i--) {
90 stat = init_power_statistics(i);
91
92 if (power_statistics_load_from_file(stat)) {
93 continue;
94 }
95
96 stats[i] = stat;
97 }
98
99 *out_stats = stats;
100
101 return output_count;
102}
103
104json_object * format_power_satistics_output_json(power_statistics *stats) {
105 json_object *top_object;
106
107 top_object = json_object_new_object();
108 json_object_object_add(top_object, "output", json_object_new_int(stats->relay_num));
109 json_object_object_add(top_object, "engaged", json_object_new_boolean(power_statistics_is_engaged(stats)));
110 json_object_object_add(top_object, "active_power", json_object_new_double(stats->active_power));
111 json_object_object_add(top_object, "energy_sum", json_object_new_double(stats->energy_sum));
112 json_object_object_add(top_object, "current_rms", json_object_new_double(stats->current_rms));
113 json_object_object_add(top_object, "voltage_rms", json_object_new_double(stats->voltage_rms));
114 json_object_object_add(top_object, "power_factor", json_object_new_double(stats->power_factor));
115
116 return top_object;
117}
118
119char * format_report_all_outputs(power_statistics **stats, int report_count) {
120 int i;
121 char *output;
122 char hostname[256];
123 char ip_address[15];
124 const char *tmp;
125 json_object *top_object, *report_array;
126
127 memset(hostname, 0, sizeof(hostname));
128 gethostname(hostname, 255);
129
130 memset(ip_address, 0, sizeof(ip_address));
131 get_primary_ip_address(ip_address);
132
133 report_array = json_object_new_array();
134 top_object = json_object_new_object();
135 json_object_object_add(top_object, "hostname", json_object_new_string(hostname));
136 json_object_object_add(top_object, "ip_address", json_object_new_string(ip_address));
137 json_object_object_add(top_object, "reports", report_array);
138
139 for (i = report_count; i > 0; i--) {
140 json_object_array_add(report_array, format_power_satistics_output_json(stats[i]));
141 free_power_statistics(stats[i]);
142 }
143
144 tmp = json_object_to_json_string_ext(top_object, JSON_C_TO_STRING_PLAIN);
145 output = strdup(tmp);
146 json_object_put(top_object);
147
148 return output;
149}
150/* ================================== REPORTING ========================================= */
151typedef struct control_message {
152 int output;
153 int state;
154} control_message;
155
156control_message * parse_control_message(const struct mosquitto_message *message) {
157 struct json_tokener *tok;
158 enum json_tokener_error jerr;
159 control_message *out_message = NULL;
160 json_object *msg, *state_key, *output_key;
161
162 tok = json_tokener_new();
163 msg = json_tokener_parse_ex(tok, message->payload, message->payloadlen);
164 jerr = json_tokener_get_error(tok);
165 if (jerr != json_tokener_success) {
166 fprintf(stderr, "Invalid message format: %s\n", json_tokener_error_desc(jerr));
167 goto cleanup;
168 }
169
170 output_key = json_object_object_get(msg, "output");
171 if (!output_key) {
172 fprintf(stderr, "Invalid message format: no output key\n");
173 goto cleanup;
174 }
175
176 state_key = json_object_object_get(msg, "state");
177 if (!state_key) {
178 fprintf(stderr, "Invalid message format: no state key\n");
179 goto cleanup;
180 }
181
182 out_message = calloc(sizeof(control_message), 1);
183 out_message->state = json_object_get_int(state_key);
184 out_message->output = json_object_get_int(output_key);
185 // Clamp the value to on or off
186 out_message->state = out_message->state > 0 ? 1 : 0;
187
188cleanup:
189 if (msg) json_object_put(msg);
190 json_tokener_free(tok);
191
192 return out_message;
193}
194
195void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
196{
197 FILE *f;
198 char *basename;
199 int output_count;
200 char filename[255];
201 control_message *msg;
202
203 basename = "/proc/power/relay";
204 output_count = get_output_count();
205
206 msg = parse_control_message(message);
207 if (!msg) {
208 return;
209 }
210
211 if (msg->output > output_count || msg->output <= 0) {
212 fprintf(stderr, "Invalid output number: %i\n", msg->output);
213 goto cleanup;
214 }
215
216 fprintf(stderr, "Set output %i to %i\n", msg->output, msg->state);
217
218 memset(filename, 0, sizeof(filename));
219 snprintf(filename, strlen(basename) + 2, "%s%i", basename, msg->output);
220
221 f = fopen(filename, "w");
222 if (!f) {
223 fprintf(stderr, "Failed to open relay file %s\n", filename);
224 goto cleanup;
225 }
226
227 fprintf(f, "%i\n", msg->state);
228 fclose(f);
229
230cleanup:
231 free(msg);
232}
233
234void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags)
235{
236 char hostname[255];
237 char *topic;
238 char *prefix;
239
240 memset(hostname, 0, sizeof(hostname));
241 gethostname(hostname, 255);
242
243 prefix = "/mfi/devices";
244 topic = calloc(sizeof(char), strlen(prefix) + strlen(hostname) + 2);
245 sprintf(topic, "%s/%s", prefix, hostname);
246
247 fprintf(stderr, "Subscribed to topic %s\n", topic);
248
249 if (!result) {
250 mosquitto_subscribe(mosq, NULL, topic, 0);
251 } else {
252 fprintf(stderr, "%s\n", mosquitto_connack_string(result));
253 mosquitto_disconnect(mosq);
254 }
255}
256
257void get_primary_ip_address(char ip_address[15])
258{
259 int fd;
260 struct ifreq ifr;
261
262 fd = socket(AF_INET, SOCK_DGRAM, 0);
263 memcpy(ifr.ifr_name, "ath0", IFNAMSIZ-1);
264 ioctl(fd, SIOCGIFADDR, &ifr);
265 close(fd);
266
267 strncpy(ip_address, inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr), 15);
268}
269
270int client_id_generate(char **output)
271{
272 int len;
273 char hostname[256];
274
275 *output = calloc(sizeof(char), MOSQ_MQTT_ID_MAX_LENGTH);
276 if (!*output) {
277 fprintf(stderr, "Error: Out of memory. %s\n", strerror(errno));
278 mosquitto_lib_cleanup();
279 return 1;
280 }
281
282 memset(hostname, 0, sizeof(hostname));
283 gethostname(hostname, 255);
284
285 /* Clamp length to MQTT maximum client ID length */
286 len = strlen("mfi|-") + 6 + strlen(hostname);
287 if (len > MOSQ_MQTT_ID_MAX_LENGTH - 1) {
288 len = MOSQ_MQTT_ID_MAX_LENGTH - 1;
289 }
290
291 snprintf(*output, len, "mfi|%d-%s", getpid(), hostname);
292
293 return MOSQ_ERR_SUCCESS;
294}
295
296//void signal_handler(int signo, siginfo_t *info, void *context) {
297static void signal_handler(int signo) {
298 closing_time = 1;
299}
300
301void set_signal_handler() {
302 struct sigaction action;
303
304 action.sa_handler = signal_handler;
305 action.sa_flags = SA_SIGINFO | SA_RESTART;
306 //action.sa_sigaction = signal_handler;
307
308 //sigfillset(&action.sa_mask);
309
310 signal(SIGINT, SIG_IGN);
311
312 if (sigaction(SIGINT, &action, NULL) == -1) {
313 perror("Error connecting signal");
314 exit(-1);
315 }
316}
317
318void mask_sig() {
319 sigset_t mask;
320 sigemptyset(&mask);
321 sigaddset(&mask, SIGINT);
322 pthread_sigmask(SIG_BLOCK, &mask, NULL);
323}
324
325void * control_thread(struct mosquitto *mosq) {
326 mask_sig();
327 mosquitto_loop_forever(mosq, 1000*86400, 1);
328 return NULL;
329}
330
331void * pinger_thread(struct mosquitto *mosq) {
332 power_statistics **stats;
333 int report_count;
334 char *output2;
335
336 mask_sig();
337
338 do {
339 if (closing_time) {
340 break;
341 }
342
343 report_count = get_stats_all_outputs(&stats);
344 output2 = format_report_all_outputs(stats, report_count);
345
346 mosquitto_publish(mosq, NULL, "/mfi/reports", strlen(output2), output2, 0, false);
347
348 free(output2);
349 free(stats);
350
351 sleep(2);
352 } while(true);
353
354 return NULL;
355}
356
357// Stop all the blinking!
358void * light_management_thread(void *arg) {
359 FILE *light_fp;
360 FILE *freq_fp;
361
362 do {
363 if (closing_time) {
364 break;
365 }
366
367 light_fp = fopen("/proc/led/status", "w");
368 if (light_fp) {
369 fprintf(light_fp, "1\n");
370 fclose(light_fp);
371 }
372
373 freq_fp = fopen("/proc/led/freq", "w");
374 if (freq_fp) {
375 fprintf(freq_fp, "0\n");
376 fclose(freq_fp);
377 }
378
379 sleep(1);
380 } while(true);
381
382 return NULL;
383}
384
385// Doing this the "C way" is a real pain in the ass, just shell out and forget
386// about it
387void cleanup_crap_processes() {
388 // Otherwise init will continue to respawn them
389 system("sed -i "
390 "-e '/ubnt-websockets/s/^/#/' "
391 "-e '/telnetd/s/^/#/' "
392 "-e '/mca[-d]/s/^/#/' "
393 "-e '/lighttpd/s/^/#/' "
394 "/etc/inittab");
395
396 system("kill -HUP 1");
397
398 // Most of these kill cleanly but a few are stubborn so don't ask, tell.
399 system("pkill -9 ubnt-websockets");
400 system("pkill -9 lighttpd");
401 system("pkill upnpd");
402 system("pkill telnetd");
403 system("pkill mca-monitor");
404 system("pkill mcad");
405 system("pkill avahi-daemon");
406}
407
408struct mosquitto * connect_to_broker(char *host, int port) {
409 int rc;
410 char *id;
411 struct mosquitto *mosq;
412
413 mosquitto_lib_init();
414
415 if (client_id_generate(&id)) {
416 return NULL;
417 }
418
419 mosq = mosquitto_new(id, true, NULL);
420 if (!mosq) {
421 switch (errno) {
422 case ENOMEM:
423 fprintf(stderr, "Error: Out of memory.\n");
424 break;
425 case EINVAL:
426 fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
427 break;
428 }
429 mosquitto_lib_cleanup();
430 return NULL;
431 }
432
433 int protocol_version = MQTT_PROTOCOL_V311;
434
435 mosquitto_max_inflight_messages_set(mosq, 20);
436 mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &protocol_version);
437 mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback);
438 mosquitto_message_callback_set(mosq, my_message_callback);
439
440 rc = mosquitto_connect(mosq, host, port, 60);
441 if (rc > 0) {
442 if (rc == MOSQ_ERR_ERRNO) {
443 fprintf(stderr, "Error: %s\n", strerror(errno));
444 } else {
445 fprintf(stderr, "Unable to connect (%s).\n", mosquitto_strerror(rc));
446 }
447 mosquitto_lib_cleanup();
448 return NULL;
449 }
450
451 return mosq;
452}
453
454void shutdown_broker(struct mosquitto *mosq) {
455 mosquitto_destroy(mosq);
456 mosquitto_lib_cleanup();
457}
458
459int main(int argc, char *argv[])
460{
461 struct mosquitto *mosq;
462 pthread_t pinger_thread_h, control_thread_h, light_management_thread_h;
463
464 cleanup_crap_processes();
465
466 set_signal_handler();
467
468 mosq = connect_to_broker("172.16.0.191", 1883);
469 if (!mosq) {
470 return 1;
471 }
472 fprintf(stderr, "Connected to broker\n");
473
474 pthread_create(&pinger_thread_h, NULL, (void * (*)(void *))pinger_thread, mosq);
475 pthread_create(&control_thread_h, NULL, (void * (*)(void *))control_thread, mosq);
476 pthread_create(&light_management_thread_h, NULL, (void * (*)(void *))light_management_thread, NULL);
477
478 do {
479 sleep(2);
480 if (closing_time) {
481 fprintf(stderr, "Shutting down\n");
482
483 mosquitto_disconnect(mosq);
484 pthread_join(pinger_thread_h, NULL);
485 pthread_join(control_thread_h, NULL);
486 break;
487 }
488 } while(true);
489
490 shutdown_broker(mosq);
491
492 return 0;
493}
diff --git a/mfi-mqtt/Makefile b/mfi-mqtt/Makefile
new file mode 100644
index 0000000..a30dfbd
--- /dev/null
+++ b/mfi-mqtt/Makefile
@@ -0,0 +1,12 @@
1BUILDROOT := $(HOME)/tmp/buildroot-2019.02.7
2MIPS_GCC := $(BUILDROOT)/output/host/bin/mips-buildroot-linux-uclibc-gcc
3
4mfi-mqtt-mips: reporting.o mosquitto.o mfi-mqtt.o
5 $(MIPS_GCC) -o $@ $^ -lpthread -lmosquitto -ljson-c -static
6
7%.o: %.c
8 $(MIPS_GCC) -Wall -c -o $@ $<
9
10.PHONY: clean
11clean:
12 rm -f *.o mfi-mqtt-mips
diff --git a/mfi-mqtt/mfi-mqtt-mips b/mfi-mqtt/mfi-mqtt-mips
new file mode 100755
index 0000000..cc6d8aa
--- /dev/null
+++ b/mfi-mqtt/mfi-mqtt-mips
Binary files differ
diff --git a/mfi-mqtt/mfi-mqtt.c b/mfi-mqtt/mfi-mqtt.c
new file mode 100644
index 0000000..25527bc
--- /dev/null
+++ b/mfi-mqtt/mfi-mqtt.c
@@ -0,0 +1,269 @@
1#include <errno.h>
2#include <stdio.h>
3#include <stdlib.h>
4#include <string.h>
5#include <time.h>
6#include <unistd.h>
7#include <signal.h>
8#include <pthread.h>
9
10#include <json-c/json.h>
11#include <mosquitto.h>
12
13#include "mosquitto.h"
14#include "reporting.h"
15
16static volatile sig_atomic_t closing_time = 0;
17
18typedef struct control_message {
19 int output;
20 int state;
21} control_message;
22
23control_message * parse_control_message(const struct mosquitto_message *message) {
24 struct json_tokener *tok;
25 enum json_tokener_error jerr;
26 control_message *out_message = NULL;
27 json_object *msg, *state_key, *output_key;
28
29 tok = json_tokener_new();
30 msg = json_tokener_parse_ex(tok, message->payload, message->payloadlen);
31 jerr = json_tokener_get_error(tok);
32 if (jerr != json_tokener_success) {
33 fprintf(stderr, "Invalid message format: %s\n", json_tokener_error_desc(jerr));
34 goto cleanup;
35 }
36
37 output_key = json_object_object_get(msg, "output");
38 if (!output_key) {
39 fprintf(stderr, "Invalid message format: no output key\n");
40 goto cleanup;
41 }
42
43 state_key = json_object_object_get(msg, "state");
44 if (!state_key) {
45 fprintf(stderr, "Invalid message format: no state key\n");
46 goto cleanup;
47 }
48
49 out_message = calloc(sizeof(control_message), 1);
50 out_message->state = json_object_get_int(state_key);
51 out_message->output = json_object_get_int(output_key);
52 // Clamp the value to on or off
53 out_message->state = out_message->state > 0 ? 1 : 0;
54
55cleanup:
56 if (msg) json_object_put(msg);
57 json_tokener_free(tok);
58
59 return out_message;
60}
61
62void set_relay(control_message *msg) {
63 FILE *f;
64 char *basename;
65 char filename[255];
66
67 basename = "/proc/power/relay";
68
69 memset(filename, 0, sizeof(filename));
70 snprintf(filename, strlen(basename) + 2, "%s%i", basename, msg->output);
71
72 f = fopen(filename, "w");
73 if (!f) {
74 fprintf(stderr, "Failed to open relay file %s\n", filename);
75 return;
76 }
77
78 fprintf(f, "%i\n", msg->state);
79 fclose(f);
80}
81
82void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
83{
84 control_message *msg;
85
86 msg = parse_control_message(message);
87 if (!msg) {
88 return;
89 }
90
91 if (msg->output > get_output_count() || msg->output <= 0) {
92 fprintf(stderr, "Invalid output number: %i\n", msg->output);
93 goto cleanup;
94 }
95
96 fprintf(stderr, "Set output %i to %i\n", msg->output, msg->state);
97 set_relay(msg);
98
99cleanup:
100 free(msg);
101}
102
103void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags)
104{
105 char *topic;
106 char *prefix;
107 char hostname[255];
108
109 memset(hostname, 0, sizeof(hostname));
110 gethostname(hostname, 255);
111
112 prefix = "/mfi/devices";
113 topic = calloc(sizeof(char), strlen(prefix) + strlen(hostname) + 2);
114 sprintf(topic, "%s/%s", prefix, hostname);
115
116 fprintf(stderr, "Subscribed to topic %s\n", topic);
117
118 if (!result) {
119 mosquitto_subscribe(mosq, NULL, topic, 0);
120 } else {
121 fprintf(stderr, "%s\n", mosquitto_connack_string(result));
122 mosquitto_disconnect(mosq);
123 }
124}
125
126static void signal_handler(int signo) {
127 closing_time = 1;
128}
129
130void set_signal_handler() {
131 struct sigaction action;
132
133 action.sa_handler = signal_handler;
134 action.sa_flags = SA_SIGINFO | SA_RESTART;
135
136 if (sigaction(SIGINT, &action, NULL) == -1) {
137 perror("Error connecting signal");
138 exit(-1);
139 }
140}
141
142void mask_sig() {
143 sigset_t mask;
144 sigemptyset(&mask);
145 sigaddset(&mask, SIGINT);
146 pthread_sigmask(SIG_BLOCK, &mask, NULL);
147}
148
149void * control_thread(struct mosquitto *mosq) {
150 mask_sig();
151 mosquitto_loop_forever(mosq, 1000*86400, 1);
152 return NULL;
153}
154
155void * pinger_thread(struct mosquitto *mosq) {
156 power_statistics **stats;
157 int report_count;
158 char *output2;
159
160 mask_sig();
161
162 do {
163 if (closing_time) {
164 break;
165 }
166
167 report_count = get_stats_all_outputs(&stats);
168 output2 = format_report_all_outputs(stats, report_count);
169
170 mosquitto_publish(mosq, NULL, "/mfi/reports", strlen(output2), output2, 0, false);
171
172 free(output2);
173 free(stats);
174
175 sleep(2);
176 } while(true);
177
178 return NULL;
179}
180
181// Stop all the blinking!
182void * light_management_thread(void *arg) {
183 FILE *light_fp;
184 FILE *freq_fp;
185
186 do {
187 if (closing_time) {
188 break;
189 }
190
191 light_fp = fopen("/proc/led/status", "w");
192 if (light_fp) {
193 fprintf(light_fp, "1\n");
194 fclose(light_fp);
195 }
196
197 freq_fp = fopen("/proc/led/freq", "w");
198 if (freq_fp) {
199 fprintf(freq_fp, "0\n");
200 fclose(freq_fp);
201 }
202
203 sleep(1);
204 } while(true);
205
206 return NULL;
207}
208
209// Doing this the "C way" is a real pain in the ass, just shell out and forget
210// about it
211void cleanup_crap_processes() {
212 // Otherwise init will continue to respawn them
213 system("sed -i "
214 "-e '/ubnt-websockets/s/^/#/' "
215 "-e '/telnetd/s/^/#/' "
216 "-e '/mca[-d]/s/^/#/' "
217 "-e '/lighttpd/s/^/#/' "
218 "/etc/inittab");
219
220 system("kill -HUP 1");
221
222 // Most of these kill cleanly but a few are stubborn so don't ask, tell.
223 system("pkill -9 ubnt-websockets");
224 system("pkill -9 lighttpd");
225 system("pkill upnpd");
226 system("pkill telnetd");
227 system("pkill mca-monitor");
228 system("pkill mcad");
229 system("pkill avahi-daemon");
230}
231
232int main(int argc, char *argv[])
233{
234 struct mosquitto *mosq;
235 pthread_t pinger_thread_h, control_thread_h, light_management_thread_h;
236
237 cleanup_crap_processes();
238
239 set_signal_handler();
240
241 mosq = connect_to_broker("172.16.0.191", 1883);
242 if (!mosq) {
243 return 1;
244 }
245 fprintf(stderr, "Connected to broker\n");
246
247 mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback);
248 mosquitto_message_callback_set(mosq, my_message_callback);
249
250 pthread_create(&pinger_thread_h, NULL, (void * (*)(void *))pinger_thread, mosq);
251 pthread_create(&control_thread_h, NULL, (void * (*)(void *))control_thread, mosq);
252 pthread_create(&light_management_thread_h, NULL, (void * (*)(void *))light_management_thread, NULL);
253
254 do {
255 sleep(2);
256 if (closing_time) {
257 fprintf(stderr, "Shutting down\n");
258
259 mosquitto_disconnect(mosq);
260 pthread_join(pinger_thread_h, NULL);
261 pthread_join(control_thread_h, NULL);
262 break;
263 }
264 } while(true);
265
266 shutdown_broker(mosq);
267
268 return 0;
269}
diff --git a/mfi-mqtt/mfi-mqtt.o b/mfi-mqtt/mfi-mqtt.o
new file mode 100644
index 0000000..d9ae533
--- /dev/null
+++ b/mfi-mqtt/mfi-mqtt.o
Binary files differ
diff --git a/mfi-mqtt/mosquitto.c b/mfi-mqtt/mosquitto.c
new file mode 100644
index 0000000..1f1a64e
--- /dev/null
+++ b/mfi-mqtt/mosquitto.c
@@ -0,0 +1,82 @@
1#include <stdio.h>
2#include <errno.h>
3#include <stdlib.h>
4#include <string.h>
5#include <unistd.h>
6
7#include "mosquitto.h"
8
9int client_id_generate(char **output)
10{
11 int len;
12 char hostname[256];
13
14 *output = calloc(sizeof(char), MOSQ_MQTT_ID_MAX_LENGTH);
15 if (!*output) {
16 fprintf(stderr, "Error: Out of memory. %s\n", strerror(errno));
17 mosquitto_lib_cleanup();
18 return 1;
19 }
20
21 memset(hostname, 0, sizeof(hostname));
22 gethostname(hostname, 255);
23
24 /* Clamp length to MQTT maximum client ID length */
25 len = strlen("mfi|-") + 6 + strlen(hostname);
26 if (len > MOSQ_MQTT_ID_MAX_LENGTH - 1) {
27 len = MOSQ_MQTT_ID_MAX_LENGTH - 1;
28 }
29
30 snprintf(*output, len, "mfi|%d-%s", getpid(), hostname);
31
32 return MOSQ_ERR_SUCCESS;
33}
34
35struct mosquitto * connect_to_broker(char *host, int port) {
36 int rc;
37 char *id;
38 struct mosquitto *mosq;
39
40 mosquitto_lib_init();
41
42 if (client_id_generate(&id)) {
43 return NULL;
44 }
45
46 mosq = mosquitto_new(id, true, NULL);
47 if (!mosq) {
48 switch (errno) {
49 case ENOMEM:
50 fprintf(stderr, "Error: Out of memory.\n");
51 break;
52 case EINVAL:
53 fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
54 break;
55 }
56 mosquitto_lib_cleanup();
57 return NULL;
58 }
59
60 int protocol_version = MQTT_PROTOCOL_V311;
61
62 mosquitto_max_inflight_messages_set(mosq, 20);
63 mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &protocol_version);
64
65 rc = mosquitto_connect(mosq, host, port, 60);
66 if (rc > 0) {
67 if (rc == MOSQ_ERR_ERRNO) {
68 fprintf(stderr, "Error: %s\n", strerror(errno));
69 } else {
70 fprintf(stderr, "Unable to connect (%s).\n", mosquitto_strerror(rc));
71 }
72 mosquitto_lib_cleanup();
73 return NULL;
74 }
75
76 return mosq;
77}
78
79void shutdown_broker(struct mosquitto *mosq) {
80 mosquitto_destroy(mosq);
81 mosquitto_lib_cleanup();
82}
diff --git a/mfi-mqtt/mosquitto.h b/mfi-mqtt/mosquitto.h
new file mode 100644
index 0000000..5698b7c
--- /dev/null
+++ b/mfi-mqtt/mosquitto.h
@@ -0,0 +1,4 @@
1#include <mosquitto.h>
2
3struct mosquitto * connect_to_broker(char *, int);
4void shutdown_broker(struct mosquitto *);
diff --git a/mfi-mqtt/mosquitto.o b/mfi-mqtt/mosquitto.o
new file mode 100644
index 0000000..9e9a78c
--- /dev/null
+++ b/mfi-mqtt/mosquitto.o
Binary files differ
diff --git a/mfi-mqtt/reporting.c b/mfi-mqtt/reporting.c
new file mode 100644
index 0000000..7b4131a
--- /dev/null
+++ b/mfi-mqtt/reporting.c
@@ -0,0 +1,143 @@
1#include <stdio.h>
2#include <stdlib.h>
3#include <string.h>
4#include <unistd.h>
5#include <stdbool.h>
6#include <json-c/json.h>
7
8#include <net/if.h>
9#include <sys/socket.h>
10#include <sys/ioctl.h>
11#include <arpa/inet.h>
12
13#include "reporting.h"
14
15power_statistics * init_power_statistics(int relay_num) {
16 power_statistics *stats = malloc(sizeof(power_statistics));
17 stats->relay_num = relay_num;
18 stats->engaged = calloc(strlen("false") + 1, sizeof(char));
19 return stats;
20}
21
22void free_power_statistics(power_statistics *stats) {
23 free(stats->engaged);
24 free(stats);
25}
26
27bool power_statistics_is_engaged(power_statistics *stats) {
28 return strcmp(stats->engaged, "on") == 0;
29}
30
31int power_statistics_load_from_file(power_statistics *stats) {
32 FILE *fp;
33 char filename[12];
34
35 snprintf(filename, 12, "/dev/power%d", stats->relay_num);
36
37 fp = fopen(filename, "r");
38 if (!fp) {
39 return 1;
40 }
41
42 if (fscanf(fp, "%s %f\n %f\n %f\n %f\n %f\n",
43 stats->engaged,
44 &stats->active_power,
45 &stats->energy_sum,
46 &stats->current_rms,
47 &stats->voltage_rms,
48 &stats->power_factor) != 6) {
49 fclose(fp);
50 return 1;
51 }
52
53 fclose(fp);
54 return 0;
55}
56
57int get_output_count() {
58 return access("/dev/power8", F_OK) != -1 ? 8 : 3;
59}
60
61int get_stats_all_outputs(power_statistics ***out_stats) {
62 int i;
63 int output_count;
64 power_statistics *stat;
65 power_statistics **stats;
66
67 output_count = get_output_count();
68 stats = malloc(sizeof(power_statistics *) * (output_count + 1));
69
70 for (i = output_count; i > 0; i--) {
71 stat = init_power_statistics(i);
72
73 if (power_statistics_load_from_file(stat)) {
74 continue;
75 }
76
77 stats[i] = stat;
78 }
79
80 *out_stats = stats;
81
82 return output_count;
83}
84
85json_object * format_power_satistics_output_json(power_statistics *stats) {
86 json_object *top_object;
87
88 top_object = json_object_new_object();
89 json_object_object_add(top_object, "output", json_object_new_int(stats->relay_num));
90 json_object_object_add(top_object, "engaged", json_object_new_boolean(power_statistics_is_engaged(stats)));
91 json_object_object_add(top_object, "active_power", json_object_new_double(stats->active_power));
92 json_object_object_add(top_object, "energy_sum", json_object_new_double(stats->energy_sum));
93 json_object_object_add(top_object, "current_rms", json_object_new_double(stats->current_rms));
94 json_object_object_add(top_object, "voltage_rms", json_object_new_double(stats->voltage_rms));
95 json_object_object_add(top_object, "power_factor", json_object_new_double(stats->power_factor));
96
97 return top_object;
98}
99
100char * format_report_all_outputs(power_statistics **stats, int report_count) {
101 int i;
102 char *output;
103 char hostname[256];
104 char ip_address[15];
105 const char *tmp;
106 json_object *top_object, *report_array;
107
108 memset(hostname, 0, sizeof(hostname));
109 gethostname(hostname, 255);
110
111 memset(ip_address, 0, sizeof(ip_address));
112 get_primary_ip_address(ip_address);
113
114 report_array = json_object_new_array();
115 top_object = json_object_new_object();
116 json_object_object_add(top_object, "hostname", json_object_new_string(hostname));
117 json_object_object_add(top_object, "ip_address", json_object_new_string(ip_address));
118 json_object_object_add(top_object, "reports", report_array);
119
120 for (i = report_count; i > 0; i--) {
121 json_object_array_add(report_array, format_power_satistics_output_json(stats[i]));
122 free_power_statistics(stats[i]);
123 }
124
125 tmp = json_object_to_json_string_ext(top_object, JSON_C_TO_STRING_PLAIN);
126 output = strdup(tmp);
127 json_object_put(top_object);
128
129 return output;
130}
131
132void get_primary_ip_address(char ip_address[15])
133{
134 int fd;
135 struct ifreq ifr;
136
137 fd = socket(AF_INET, SOCK_DGRAM, 0);
138 memcpy(ifr.ifr_name, "ath0", IFNAMSIZ-1);
139 ioctl(fd, SIOCGIFADDR, &ifr);
140 close(fd);
141
142 strncpy(ip_address, inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr), 15);
143}
diff --git a/mfi-mqtt/reporting.h b/mfi-mqtt/reporting.h
new file mode 100644
index 0000000..1ed0448
--- /dev/null
+++ b/mfi-mqtt/reporting.h
@@ -0,0 +1,14 @@
1typedef struct power_statistics {
2 int relay_num;
3 char *engaged;
4 float active_power;
5 float energy_sum;
6 float current_rms;
7 float voltage_rms;
8 float power_factor;
9} power_statistics;
10
11int get_output_count();
12void get_primary_ip_address(char[15]);
13int get_stats_all_outputs(power_statistics ***);
14char * format_report_all_outputs(power_statistics **, int);
diff --git a/mfi-mqtt/reporting.o b/mfi-mqtt/reporting.o
new file mode 100644
index 0000000..bd257a3
--- /dev/null
+++ b/mfi-mqtt/reporting.o
Binary files differ