summaryrefslogtreecommitdiff
path: root/mfi-mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'mfi-mqtt.c')
-rw-r--r--mfi-mqtt.c493
1 files changed, 0 insertions, 493 deletions
diff --git a/mfi-mqtt.c b/mfi-mqtt.c
deleted file mode 100644
index 0155039..0000000
--- a/mfi-mqtt.c
+++ /dev/null
@@ -1,493 +0,0 @@
1#include <assert.h>
2#include <errno.h>
3#include <stdio.h>
4#include <stdlib.h>
5#include <string.h>
6#include <time.h>
7#include <unistd.h>
8#include <signal.h>
9#include <pthread.h>
10
11#include <net/if.h>
12#include <sys/socket.h>
13#include <sys/ioctl.h>
14#include <arpa/inet.h>
15
16#include <json-c/json.h>
17#include <mosquitto.h>
18
19static volatile sig_atomic_t closing_time = 0;
20
21void get_primary_ip_address(char ip_address[15]);
22
23/* ================================== REPORTING ========================================= */
24typedef struct power_statistics {
25 int relay_num;
26 char *engaged;
27 float active_power;
28 float energy_sum;
29 float current_rms;
30 float voltage_rms;
31 float power_factor;
32} power_statistics;
33
34power_statistics * init_power_statistics(int relay_num) {
35 power_statistics *stats = malloc(sizeof(power_statistics));
36 stats->relay_num = relay_num;
37 stats->engaged = calloc(strlen("false") + 1, sizeof(char));
38 return stats;
39}
40
41void free_power_statistics(power_statistics *stats) {
42 free(stats->engaged);
43 free(stats);
44}
45
46bool power_statistics_is_engaged(power_statistics *stats) {
47 return strcmp(stats->engaged, "on") == 0;
48}
49
50int power_statistics_load_from_file(power_statistics *stats) {
51 FILE *fp;
52 char filename[12];
53
54 snprintf(filename, 12, "/dev/power%d", stats->relay_num);
55
56 fp = fopen(filename, "r");
57 if (!fp) {
58 return 1;
59 }
60
61 if (fscanf(fp, "%s %f\n %f\n %f\n %f\n %f\n",
62 stats->engaged,
63 &stats->active_power,
64 &stats->energy_sum,
65 &stats->current_rms,
66 &stats->voltage_rms,
67 &stats->power_factor) != 6) {
68 fclose(fp);
69 return 1;
70 }
71
72 fclose(fp);
73 return 0;
74}
75
76int get_output_count() {
77 return access("/dev/power8", F_OK) != -1 ? 8 : 3;
78}
79
80int get_stats_all_outputs(power_statistics ***out_stats) {
81 int i;
82 int output_count;
83 power_statistics *stat;
84 power_statistics **stats;
85
86 output_count = get_output_count();
87 stats = malloc(sizeof(power_statistics *) * (output_count + 1));
88
89 for (i = output_count; i > 0; i--) {
90 stat = init_power_statistics(i);
91
92 if (power_statistics_load_from_file(stat)) {
93 continue;
94 }
95
96 stats[i] = stat;
97 }
98
99 *out_stats = stats;
100
101 return output_count;
102}
103
104json_object * format_power_satistics_output_json(power_statistics *stats) {
105 json_object *top_object;
106
107 top_object = json_object_new_object();
108 json_object_object_add(top_object, "output", json_object_new_int(stats->relay_num));
109 json_object_object_add(top_object, "engaged", json_object_new_boolean(power_statistics_is_engaged(stats)));
110 json_object_object_add(top_object, "active_power", json_object_new_double(stats->active_power));
111 json_object_object_add(top_object, "energy_sum", json_object_new_double(stats->energy_sum));
112 json_object_object_add(top_object, "current_rms", json_object_new_double(stats->current_rms));
113 json_object_object_add(top_object, "voltage_rms", json_object_new_double(stats->voltage_rms));
114 json_object_object_add(top_object, "power_factor", json_object_new_double(stats->power_factor));
115
116 return top_object;
117}
118
119char * format_report_all_outputs(power_statistics **stats, int report_count) {
120 int i;
121 char *output;
122 char hostname[256];
123 char ip_address[15];
124 const char *tmp;
125 json_object *top_object, *report_array;
126
127 memset(hostname, 0, sizeof(hostname));
128 gethostname(hostname, 255);
129
130 memset(ip_address, 0, sizeof(ip_address));
131 get_primary_ip_address(ip_address);
132
133 report_array = json_object_new_array();
134 top_object = json_object_new_object();
135 json_object_object_add(top_object, "hostname", json_object_new_string(hostname));
136 json_object_object_add(top_object, "ip_address", json_object_new_string(ip_address));
137 json_object_object_add(top_object, "reports", report_array);
138
139 for (i = report_count; i > 0; i--) {
140 json_object_array_add(report_array, format_power_satistics_output_json(stats[i]));
141 free_power_statistics(stats[i]);
142 }
143
144 tmp = json_object_to_json_string_ext(top_object, JSON_C_TO_STRING_PLAIN);
145 output = strdup(tmp);
146 json_object_put(top_object);
147
148 return output;
149}
150/* ================================== REPORTING ========================================= */
151typedef struct control_message {
152 int output;
153 int state;
154} control_message;
155
156control_message * parse_control_message(const struct mosquitto_message *message) {
157 struct json_tokener *tok;
158 enum json_tokener_error jerr;
159 control_message *out_message = NULL;
160 json_object *msg, *state_key, *output_key;
161
162 tok = json_tokener_new();
163 msg = json_tokener_parse_ex(tok, message->payload, message->payloadlen);
164 jerr = json_tokener_get_error(tok);
165 if (jerr != json_tokener_success) {
166 fprintf(stderr, "Invalid message format: %s\n", json_tokener_error_desc(jerr));
167 goto cleanup;
168 }
169
170 output_key = json_object_object_get(msg, "output");
171 if (!output_key) {
172 fprintf(stderr, "Invalid message format: no output key\n");
173 goto cleanup;
174 }
175
176 state_key = json_object_object_get(msg, "state");
177 if (!state_key) {
178 fprintf(stderr, "Invalid message format: no state key\n");
179 goto cleanup;
180 }
181
182 out_message = calloc(sizeof(control_message), 1);
183 out_message->state = json_object_get_int(state_key);
184 out_message->output = json_object_get_int(output_key);
185 // Clamp the value to on or off
186 out_message->state = out_message->state > 0 ? 1 : 0;
187
188cleanup:
189 if (msg) json_object_put(msg);
190 json_tokener_free(tok);
191
192 return out_message;
193}
194
195void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
196{
197 FILE *f;
198 char *basename;
199 int output_count;
200 char filename[255];
201 control_message *msg;
202
203 basename = "/proc/power/relay";
204 output_count = get_output_count();
205
206 msg = parse_control_message(message);
207 if (!msg) {
208 return;
209 }
210
211 if (msg->output > output_count || msg->output <= 0) {
212 fprintf(stderr, "Invalid output number: %i\n", msg->output);
213 goto cleanup;
214 }
215
216 fprintf(stderr, "Set output %i to %i\n", msg->output, msg->state);
217
218 memset(filename, 0, sizeof(filename));
219 snprintf(filename, strlen(basename) + 2, "%s%i", basename, msg->output);
220
221 f = fopen(filename, "w");
222 if (!f) {
223 fprintf(stderr, "Failed to open relay file %s\n", filename);
224 goto cleanup;
225 }
226
227 fprintf(f, "%i\n", msg->state);
228 fclose(f);
229
230cleanup:
231 free(msg);
232}
233
234void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags)
235{
236 char hostname[255];
237 char *topic;
238 char *prefix;
239
240 memset(hostname, 0, sizeof(hostname));
241 gethostname(hostname, 255);
242
243 prefix = "/mfi/devices";
244 topic = calloc(sizeof(char), strlen(prefix) + strlen(hostname) + 2);
245 sprintf(topic, "%s/%s", prefix, hostname);
246
247 fprintf(stderr, "Subscribed to topic %s\n", topic);
248
249 if (!result) {
250 mosquitto_subscribe(mosq, NULL, topic, 0);
251 } else {
252 fprintf(stderr, "%s\n", mosquitto_connack_string(result));
253 mosquitto_disconnect(mosq);
254 }
255}
256
257void get_primary_ip_address(char ip_address[15])
258{
259 int fd;
260 struct ifreq ifr;
261
262 fd = socket(AF_INET, SOCK_DGRAM, 0);
263 memcpy(ifr.ifr_name, "ath0", IFNAMSIZ-1);
264 ioctl(fd, SIOCGIFADDR, &ifr);
265 close(fd);
266
267 strncpy(ip_address, inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr), 15);
268}
269
270int client_id_generate(char **output)
271{
272 int len;
273 char hostname[256];
274
275 *output = calloc(sizeof(char), MOSQ_MQTT_ID_MAX_LENGTH);
276 if (!*output) {
277 fprintf(stderr, "Error: Out of memory. %s\n", strerror(errno));
278 mosquitto_lib_cleanup();
279 return 1;
280 }
281
282 memset(hostname, 0, sizeof(hostname));
283 gethostname(hostname, 255);
284
285 /* Clamp length to MQTT maximum client ID length */
286 len = strlen("mfi|-") + 6 + strlen(hostname);
287 if (len > MOSQ_MQTT_ID_MAX_LENGTH - 1) {
288 len = MOSQ_MQTT_ID_MAX_LENGTH - 1;
289 }
290
291 snprintf(*output, len, "mfi|%d-%s", getpid(), hostname);
292
293 return MOSQ_ERR_SUCCESS;
294}
295
296//void signal_handler(int signo, siginfo_t *info, void *context) {
297static void signal_handler(int signo) {
298 closing_time = 1;
299}
300
301void set_signal_handler() {
302 struct sigaction action;
303
304 action.sa_handler = signal_handler;
305 action.sa_flags = SA_SIGINFO | SA_RESTART;
306 //action.sa_sigaction = signal_handler;
307
308 //sigfillset(&action.sa_mask);
309
310 signal(SIGINT, SIG_IGN);
311
312 if (sigaction(SIGINT, &action, NULL) == -1) {
313 perror("Error connecting signal");
314 exit(-1);
315 }
316}
317
318void mask_sig() {
319 sigset_t mask;
320 sigemptyset(&mask);
321 sigaddset(&mask, SIGINT);
322 pthread_sigmask(SIG_BLOCK, &mask, NULL);
323}
324
325void * control_thread(struct mosquitto *mosq) {
326 mask_sig();
327 mosquitto_loop_forever(mosq, 1000*86400, 1);
328 return NULL;
329}
330
331void * pinger_thread(struct mosquitto *mosq) {
332 power_statistics **stats;
333 int report_count;
334 char *output2;
335
336 mask_sig();
337
338 do {
339 if (closing_time) {
340 break;
341 }
342
343 report_count = get_stats_all_outputs(&stats);
344 output2 = format_report_all_outputs(stats, report_count);
345
346 mosquitto_publish(mosq, NULL, "/mfi/reports", strlen(output2), output2, 0, false);
347
348 free(output2);
349 free(stats);
350
351 sleep(2);
352 } while(true);
353
354 return NULL;
355}
356
357// Stop all the blinking!
358void * light_management_thread(void *arg) {
359 FILE *light_fp;
360 FILE *freq_fp;
361
362 do {
363 if (closing_time) {
364 break;
365 }
366
367 light_fp = fopen("/proc/led/status", "w");
368 if (light_fp) {
369 fprintf(light_fp, "1\n");
370 fclose(light_fp);
371 }
372
373 freq_fp = fopen("/proc/led/freq", "w");
374 if (freq_fp) {
375 fprintf(freq_fp, "0\n");
376 fclose(freq_fp);
377 }
378
379 sleep(1);
380 } while(true);
381
382 return NULL;
383}
384
385// Doing this the "C way" is a real pain in the ass, just shell out and forget
386// about it
387void cleanup_crap_processes() {
388 // Otherwise init will continue to respawn them
389 system("sed -i "
390 "-e '/ubnt-websockets/s/^/#/' "
391 "-e '/telnetd/s/^/#/' "
392 "-e '/mca[-d]/s/^/#/' "
393 "-e '/lighttpd/s/^/#/' "
394 "/etc/inittab");
395
396 system("kill -HUP 1");
397
398 // Most of these kill cleanly but a few are stubborn so don't ask, tell.
399 system("pkill -9 ubnt-websockets");
400 system("pkill -9 lighttpd");
401 system("pkill upnpd");
402 system("pkill telnetd");
403 system("pkill mca-monitor");
404 system("pkill mcad");
405 system("pkill avahi-daemon");
406}
407
408struct mosquitto * connect_to_broker(char *host, int port) {
409 int rc;
410 char *id;
411 struct mosquitto *mosq;
412
413 mosquitto_lib_init();
414
415 if (client_id_generate(&id)) {
416 return NULL;
417 }
418
419 mosq = mosquitto_new(id, true, NULL);
420 if (!mosq) {
421 switch (errno) {
422 case ENOMEM:
423 fprintf(stderr, "Error: Out of memory.\n");
424 break;
425 case EINVAL:
426 fprintf(stderr, "Error: Invalid id and/or clean_session.\n");
427 break;
428 }
429 mosquitto_lib_cleanup();
430 return NULL;
431 }
432
433 int protocol_version = MQTT_PROTOCOL_V311;
434
435 mosquitto_max_inflight_messages_set(mosq, 20);
436 mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &protocol_version);
437 mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback);
438 mosquitto_message_callback_set(mosq, my_message_callback);
439
440 rc = mosquitto_connect(mosq, host, port, 60);
441 if (rc > 0) {
442 if (rc == MOSQ_ERR_ERRNO) {
443 fprintf(stderr, "Error: %s\n", strerror(errno));
444 } else {
445 fprintf(stderr, "Unable to connect (%s).\n", mosquitto_strerror(rc));
446 }
447 mosquitto_lib_cleanup();
448 return NULL;
449 }
450
451 return mosq;
452}
453
454void shutdown_broker(struct mosquitto *mosq) {
455 mosquitto_destroy(mosq);
456 mosquitto_lib_cleanup();
457}
458
459int main(int argc, char *argv[])
460{
461 struct mosquitto *mosq;
462 pthread_t pinger_thread_h, control_thread_h, light_management_thread_h;
463
464 cleanup_crap_processes();
465
466 set_signal_handler();
467
468 mosq = connect_to_broker("172.16.0.191", 1883);
469 if (!mosq) {
470 return 1;
471 }
472 fprintf(stderr, "Connected to broker\n");
473
474 pthread_create(&pinger_thread_h, NULL, (void * (*)(void *))pinger_thread, mosq);
475 pthread_create(&control_thread_h, NULL, (void * (*)(void *))control_thread, mosq);
476 pthread_create(&light_management_thread_h, NULL, (void * (*)(void *))light_management_thread, NULL);
477
478 do {
479 sleep(2);
480 if (closing_time) {
481 fprintf(stderr, "Shutting down\n");
482
483 mosquitto_disconnect(mosq);
484 pthread_join(pinger_thread_h, NULL);
485 pthread_join(control_thread_h, NULL);
486 break;
487 }
488 } while(true);
489
490 shutdown_broker(mosq);
491
492 return 0;
493}