From ef4f00622d48d6d0b4a703f5a39bf8c355244af1 Mon Sep 17 00:00:00 2001 From: Mike Crute Date: Mon, 25 Nov 2019 23:00:42 -0800 Subject: Split apart mqtt agent --- Makefile | 5 - mfi-mqtt.c | 493 ------------------------------------------------- mfi-mqtt/Makefile | 12 ++ mfi-mqtt/mfi-mqtt-mips | Bin 0 -> 448448 bytes mfi-mqtt/mfi-mqtt.c | 269 +++++++++++++++++++++++++++ mfi-mqtt/mfi-mqtt.o | Bin 0 -> 10304 bytes mfi-mqtt/mosquitto.c | 82 ++++++++ mfi-mqtt/mosquitto.h | 4 + mfi-mqtt/mosquitto.o | Bin 0 -> 4036 bytes mfi-mqtt/reporting.c | 143 ++++++++++++++ mfi-mqtt/reporting.h | 14 ++ mfi-mqtt/reporting.o | Bin 0 -> 6892 bytes 12 files changed, 524 insertions(+), 498 deletions(-) delete mode 100644 mfi-mqtt.c create mode 100644 mfi-mqtt/Makefile create mode 100755 mfi-mqtt/mfi-mqtt-mips create mode 100644 mfi-mqtt/mfi-mqtt.c create mode 100644 mfi-mqtt/mfi-mqtt.o create mode 100644 mfi-mqtt/mosquitto.c create mode 100644 mfi-mqtt/mosquitto.h create mode 100644 mfi-mqtt/mosquitto.o create mode 100644 mfi-mqtt/reporting.c create mode 100644 mfi-mqtt/reporting.h create mode 100644 mfi-mqtt/reporting.o diff --git a/Makefile b/Makefile index b667d15..778afeb 100644 --- a/Makefile +++ b/Makefile @@ -10,8 +10,3 @@ docker: cd docker; \ docker build -t docker.crute.me/mfi_homekit:latest .; \ docker push docker.crute.me/mfi_homekit:latest - -mfi-mqtt-mips: mfi-mqtt.c - $(BUILDROOT)/output/host/bin/mips-buildroot-linux-uclibc-gcc \ - -Wall -o $@ $< \ - -lpthread -lmosquitto -ljson-c -static diff --git a/mfi-mqtt.c b/mfi-mqtt.c deleted file mode 100644 index 0155039..0000000 --- a/mfi-mqtt.c +++ /dev/null @@ -1,493 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include -#include - -static volatile sig_atomic_t closing_time = 0; - -void get_primary_ip_address(char ip_address[15]); - -/* ================================== REPORTING ========================================= */ -typedef struct power_statistics { - int relay_num; - char *engaged; - float active_power; - float energy_sum; - float current_rms; - float voltage_rms; - float power_factor; -} power_statistics; - -power_statistics * init_power_statistics(int relay_num) { - power_statistics *stats = malloc(sizeof(power_statistics)); - stats->relay_num = relay_num; - stats->engaged = calloc(strlen("false") + 1, sizeof(char)); - return stats; -} - -void free_power_statistics(power_statistics *stats) { - free(stats->engaged); - free(stats); -} - -bool power_statistics_is_engaged(power_statistics *stats) { - return strcmp(stats->engaged, "on") == 0; -} - -int power_statistics_load_from_file(power_statistics *stats) { - FILE *fp; - char filename[12]; - - snprintf(filename, 12, "/dev/power%d", stats->relay_num); - - fp = fopen(filename, "r"); - if (!fp) { - return 1; - } - - if (fscanf(fp, "%s %f\n %f\n %f\n %f\n %f\n", - stats->engaged, - &stats->active_power, - &stats->energy_sum, - &stats->current_rms, - &stats->voltage_rms, - &stats->power_factor) != 6) { - fclose(fp); - return 1; - } - - fclose(fp); - return 0; -} - -int get_output_count() { - return access("/dev/power8", F_OK) != -1 ? 8 : 3; -} - -int get_stats_all_outputs(power_statistics ***out_stats) { - int i; - int output_count; - power_statistics *stat; - power_statistics **stats; - - output_count = get_output_count(); - stats = malloc(sizeof(power_statistics *) * (output_count + 1)); - - for (i = output_count; i > 0; i--) { - stat = init_power_statistics(i); - - if (power_statistics_load_from_file(stat)) { - continue; - } - - stats[i] = stat; - } - - *out_stats = stats; - - return output_count; -} - -json_object * format_power_satistics_output_json(power_statistics *stats) { - json_object *top_object; - - top_object = json_object_new_object(); - json_object_object_add(top_object, "output", json_object_new_int(stats->relay_num)); - json_object_object_add(top_object, "engaged", json_object_new_boolean(power_statistics_is_engaged(stats))); - json_object_object_add(top_object, "active_power", json_object_new_double(stats->active_power)); - json_object_object_add(top_object, "energy_sum", json_object_new_double(stats->energy_sum)); - json_object_object_add(top_object, "current_rms", json_object_new_double(stats->current_rms)); - json_object_object_add(top_object, "voltage_rms", json_object_new_double(stats->voltage_rms)); - json_object_object_add(top_object, "power_factor", json_object_new_double(stats->power_factor)); - - return top_object; -} - -char * format_report_all_outputs(power_statistics **stats, int report_count) { - int i; - char *output; - char hostname[256]; - char ip_address[15]; - const char *tmp; - json_object *top_object, *report_array; - - memset(hostname, 0, sizeof(hostname)); - gethostname(hostname, 255); - - memset(ip_address, 0, sizeof(ip_address)); - get_primary_ip_address(ip_address); - - report_array = json_object_new_array(); - top_object = json_object_new_object(); - json_object_object_add(top_object, "hostname", json_object_new_string(hostname)); - json_object_object_add(top_object, "ip_address", json_object_new_string(ip_address)); - json_object_object_add(top_object, "reports", report_array); - - for (i = report_count; i > 0; i--) { - json_object_array_add(report_array, format_power_satistics_output_json(stats[i])); - free_power_statistics(stats[i]); - } - - tmp = json_object_to_json_string_ext(top_object, JSON_C_TO_STRING_PLAIN); - output = strdup(tmp); - json_object_put(top_object); - - return output; -} -/* ================================== REPORTING ========================================= */ -typedef struct control_message { - int output; - int state; -} control_message; - -control_message * parse_control_message(const struct mosquitto_message *message) { - struct json_tokener *tok; - enum json_tokener_error jerr; - control_message *out_message = NULL; - json_object *msg, *state_key, *output_key; - - tok = json_tokener_new(); - msg = json_tokener_parse_ex(tok, message->payload, message->payloadlen); - jerr = json_tokener_get_error(tok); - if (jerr != json_tokener_success) { - fprintf(stderr, "Invalid message format: %s\n", json_tokener_error_desc(jerr)); - goto cleanup; - } - - output_key = json_object_object_get(msg, "output"); - if (!output_key) { - fprintf(stderr, "Invalid message format: no output key\n"); - goto cleanup; - } - - state_key = json_object_object_get(msg, "state"); - if (!state_key) { - fprintf(stderr, "Invalid message format: no state key\n"); - goto cleanup; - } - - out_message = calloc(sizeof(control_message), 1); - out_message->state = json_object_get_int(state_key); - out_message->output = json_object_get_int(output_key); - // Clamp the value to on or off - out_message->state = out_message->state > 0 ? 1 : 0; - -cleanup: - if (msg) json_object_put(msg); - json_tokener_free(tok); - - return out_message; -} - -void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) -{ - FILE *f; - char *basename; - int output_count; - char filename[255]; - control_message *msg; - - basename = "/proc/power/relay"; - output_count = get_output_count(); - - msg = parse_control_message(message); - if (!msg) { - return; - } - - if (msg->output > output_count || msg->output <= 0) { - fprintf(stderr, "Invalid output number: %i\n", msg->output); - goto cleanup; - } - - fprintf(stderr, "Set output %i to %i\n", msg->output, msg->state); - - memset(filename, 0, sizeof(filename)); - snprintf(filename, strlen(basename) + 2, "%s%i", basename, msg->output); - - f = fopen(filename, "w"); - if (!f) { - fprintf(stderr, "Failed to open relay file %s\n", filename); - goto cleanup; - } - - fprintf(f, "%i\n", msg->state); - fclose(f); - -cleanup: - free(msg); -} - -void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags) -{ - char hostname[255]; - char *topic; - char *prefix; - - memset(hostname, 0, sizeof(hostname)); - gethostname(hostname, 255); - - prefix = "/mfi/devices"; - topic = calloc(sizeof(char), strlen(prefix) + strlen(hostname) + 2); - sprintf(topic, "%s/%s", prefix, hostname); - - fprintf(stderr, "Subscribed to topic %s\n", topic); - - if (!result) { - mosquitto_subscribe(mosq, NULL, topic, 0); - } else { - fprintf(stderr, "%s\n", mosquitto_connack_string(result)); - mosquitto_disconnect(mosq); - } -} - -void get_primary_ip_address(char ip_address[15]) -{ - int fd; - struct ifreq ifr; - - fd = socket(AF_INET, SOCK_DGRAM, 0); - memcpy(ifr.ifr_name, "ath0", IFNAMSIZ-1); - ioctl(fd, SIOCGIFADDR, &ifr); - close(fd); - - strncpy(ip_address, inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr), 15); -} - -int client_id_generate(char **output) -{ - int len; - char hostname[256]; - - *output = calloc(sizeof(char), MOSQ_MQTT_ID_MAX_LENGTH); - if (!*output) { - fprintf(stderr, "Error: Out of memory. %s\n", strerror(errno)); - mosquitto_lib_cleanup(); - return 1; - } - - memset(hostname, 0, sizeof(hostname)); - gethostname(hostname, 255); - - /* Clamp length to MQTT maximum client ID length */ - len = strlen("mfi|-") + 6 + strlen(hostname); - if (len > MOSQ_MQTT_ID_MAX_LENGTH - 1) { - len = MOSQ_MQTT_ID_MAX_LENGTH - 1; - } - - snprintf(*output, len, "mfi|%d-%s", getpid(), hostname); - - return MOSQ_ERR_SUCCESS; -} - -//void signal_handler(int signo, siginfo_t *info, void *context) { -static void signal_handler(int signo) { - closing_time = 1; -} - -void set_signal_handler() { - struct sigaction action; - - action.sa_handler = signal_handler; - action.sa_flags = SA_SIGINFO | SA_RESTART; - //action.sa_sigaction = signal_handler; - - //sigfillset(&action.sa_mask); - - signal(SIGINT, SIG_IGN); - - if (sigaction(SIGINT, &action, NULL) == -1) { - perror("Error connecting signal"); - exit(-1); - } -} - -void mask_sig() { - sigset_t mask; - sigemptyset(&mask); - sigaddset(&mask, SIGINT); - pthread_sigmask(SIG_BLOCK, &mask, NULL); -} - -void * control_thread(struct mosquitto *mosq) { - mask_sig(); - mosquitto_loop_forever(mosq, 1000*86400, 1); - return NULL; -} - -void * pinger_thread(struct mosquitto *mosq) { - power_statistics **stats; - int report_count; - char *output2; - - mask_sig(); - - do { - if (closing_time) { - break; - } - - report_count = get_stats_all_outputs(&stats); - output2 = format_report_all_outputs(stats, report_count); - - mosquitto_publish(mosq, NULL, "/mfi/reports", strlen(output2), output2, 0, false); - - free(output2); - free(stats); - - sleep(2); - } while(true); - - return NULL; -} - -// Stop all the blinking! -void * light_management_thread(void *arg) { - FILE *light_fp; - FILE *freq_fp; - - do { - if (closing_time) { - break; - } - - light_fp = fopen("/proc/led/status", "w"); - if (light_fp) { - fprintf(light_fp, "1\n"); - fclose(light_fp); - } - - freq_fp = fopen("/proc/led/freq", "w"); - if (freq_fp) { - fprintf(freq_fp, "0\n"); - fclose(freq_fp); - } - - sleep(1); - } while(true); - - return NULL; -} - -// Doing this the "C way" is a real pain in the ass, just shell out and forget -// about it -void cleanup_crap_processes() { - // Otherwise init will continue to respawn them - system("sed -i " - "-e '/ubnt-websockets/s/^/#/' " - "-e '/telnetd/s/^/#/' " - "-e '/mca[-d]/s/^/#/' " - "-e '/lighttpd/s/^/#/' " - "/etc/inittab"); - - system("kill -HUP 1"); - - // Most of these kill cleanly but a few are stubborn so don't ask, tell. - system("pkill -9 ubnt-websockets"); - system("pkill -9 lighttpd"); - system("pkill upnpd"); - system("pkill telnetd"); - system("pkill mca-monitor"); - system("pkill mcad"); - system("pkill avahi-daemon"); -} - -struct mosquitto * connect_to_broker(char *host, int port) { - int rc; - char *id; - struct mosquitto *mosq; - - mosquitto_lib_init(); - - if (client_id_generate(&id)) { - return NULL; - } - - mosq = mosquitto_new(id, true, NULL); - if (!mosq) { - switch (errno) { - case ENOMEM: - fprintf(stderr, "Error: Out of memory.\n"); - break; - case EINVAL: - fprintf(stderr, "Error: Invalid id and/or clean_session.\n"); - break; - } - mosquitto_lib_cleanup(); - return NULL; - } - - int protocol_version = MQTT_PROTOCOL_V311; - - mosquitto_max_inflight_messages_set(mosq, 20); - mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &protocol_version); - mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback); - mosquitto_message_callback_set(mosq, my_message_callback); - - rc = mosquitto_connect(mosq, host, port, 60); - if (rc > 0) { - if (rc == MOSQ_ERR_ERRNO) { - fprintf(stderr, "Error: %s\n", strerror(errno)); - } else { - fprintf(stderr, "Unable to connect (%s).\n", mosquitto_strerror(rc)); - } - mosquitto_lib_cleanup(); - return NULL; - } - - return mosq; -} - -void shutdown_broker(struct mosquitto *mosq) { - mosquitto_destroy(mosq); - mosquitto_lib_cleanup(); -} - -int main(int argc, char *argv[]) -{ - struct mosquitto *mosq; - pthread_t pinger_thread_h, control_thread_h, light_management_thread_h; - - cleanup_crap_processes(); - - set_signal_handler(); - - mosq = connect_to_broker("172.16.0.191", 1883); - if (!mosq) { - return 1; - } - fprintf(stderr, "Connected to broker\n"); - - pthread_create(&pinger_thread_h, NULL, (void * (*)(void *))pinger_thread, mosq); - pthread_create(&control_thread_h, NULL, (void * (*)(void *))control_thread, mosq); - pthread_create(&light_management_thread_h, NULL, (void * (*)(void *))light_management_thread, NULL); - - do { - sleep(2); - if (closing_time) { - fprintf(stderr, "Shutting down\n"); - - mosquitto_disconnect(mosq); - pthread_join(pinger_thread_h, NULL); - pthread_join(control_thread_h, NULL); - break; - } - } while(true); - - shutdown_broker(mosq); - - return 0; -} diff --git a/mfi-mqtt/Makefile b/mfi-mqtt/Makefile new file mode 100644 index 0000000..a30dfbd --- /dev/null +++ b/mfi-mqtt/Makefile @@ -0,0 +1,12 @@ +BUILDROOT := $(HOME)/tmp/buildroot-2019.02.7 +MIPS_GCC := $(BUILDROOT)/output/host/bin/mips-buildroot-linux-uclibc-gcc + +mfi-mqtt-mips: reporting.o mosquitto.o mfi-mqtt.o + $(MIPS_GCC) -o $@ $^ -lpthread -lmosquitto -ljson-c -static + +%.o: %.c + $(MIPS_GCC) -Wall -c -o $@ $< + +.PHONY: clean +clean: + rm -f *.o mfi-mqtt-mips diff --git a/mfi-mqtt/mfi-mqtt-mips b/mfi-mqtt/mfi-mqtt-mips new file mode 100755 index 0000000..cc6d8aa Binary files /dev/null and b/mfi-mqtt/mfi-mqtt-mips differ diff --git a/mfi-mqtt/mfi-mqtt.c b/mfi-mqtt/mfi-mqtt.c new file mode 100644 index 0000000..25527bc --- /dev/null +++ b/mfi-mqtt/mfi-mqtt.c @@ -0,0 +1,269 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "mosquitto.h" +#include "reporting.h" + +static volatile sig_atomic_t closing_time = 0; + +typedef struct control_message { + int output; + int state; +} control_message; + +control_message * parse_control_message(const struct mosquitto_message *message) { + struct json_tokener *tok; + enum json_tokener_error jerr; + control_message *out_message = NULL; + json_object *msg, *state_key, *output_key; + + tok = json_tokener_new(); + msg = json_tokener_parse_ex(tok, message->payload, message->payloadlen); + jerr = json_tokener_get_error(tok); + if (jerr != json_tokener_success) { + fprintf(stderr, "Invalid message format: %s\n", json_tokener_error_desc(jerr)); + goto cleanup; + } + + output_key = json_object_object_get(msg, "output"); + if (!output_key) { + fprintf(stderr, "Invalid message format: no output key\n"); + goto cleanup; + } + + state_key = json_object_object_get(msg, "state"); + if (!state_key) { + fprintf(stderr, "Invalid message format: no state key\n"); + goto cleanup; + } + + out_message = calloc(sizeof(control_message), 1); + out_message->state = json_object_get_int(state_key); + out_message->output = json_object_get_int(output_key); + // Clamp the value to on or off + out_message->state = out_message->state > 0 ? 1 : 0; + +cleanup: + if (msg) json_object_put(msg); + json_tokener_free(tok); + + return out_message; +} + +void set_relay(control_message *msg) { + FILE *f; + char *basename; + char filename[255]; + + basename = "/proc/power/relay"; + + memset(filename, 0, sizeof(filename)); + snprintf(filename, strlen(basename) + 2, "%s%i", basename, msg->output); + + f = fopen(filename, "w"); + if (!f) { + fprintf(stderr, "Failed to open relay file %s\n", filename); + return; + } + + fprintf(f, "%i\n", msg->state); + fclose(f); +} + +void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) +{ + control_message *msg; + + msg = parse_control_message(message); + if (!msg) { + return; + } + + if (msg->output > get_output_count() || msg->output <= 0) { + fprintf(stderr, "Invalid output number: %i\n", msg->output); + goto cleanup; + } + + fprintf(stderr, "Set output %i to %i\n", msg->output, msg->state); + set_relay(msg); + +cleanup: + free(msg); +} + +void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags) +{ + char *topic; + char *prefix; + char hostname[255]; + + memset(hostname, 0, sizeof(hostname)); + gethostname(hostname, 255); + + prefix = "/mfi/devices"; + topic = calloc(sizeof(char), strlen(prefix) + strlen(hostname) + 2); + sprintf(topic, "%s/%s", prefix, hostname); + + fprintf(stderr, "Subscribed to topic %s\n", topic); + + if (!result) { + mosquitto_subscribe(mosq, NULL, topic, 0); + } else { + fprintf(stderr, "%s\n", mosquitto_connack_string(result)); + mosquitto_disconnect(mosq); + } +} + +static void signal_handler(int signo) { + closing_time = 1; +} + +void set_signal_handler() { + struct sigaction action; + + action.sa_handler = signal_handler; + action.sa_flags = SA_SIGINFO | SA_RESTART; + + if (sigaction(SIGINT, &action, NULL) == -1) { + perror("Error connecting signal"); + exit(-1); + } +} + +void mask_sig() { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + pthread_sigmask(SIG_BLOCK, &mask, NULL); +} + +void * control_thread(struct mosquitto *mosq) { + mask_sig(); + mosquitto_loop_forever(mosq, 1000*86400, 1); + return NULL; +} + +void * pinger_thread(struct mosquitto *mosq) { + power_statistics **stats; + int report_count; + char *output2; + + mask_sig(); + + do { + if (closing_time) { + break; + } + + report_count = get_stats_all_outputs(&stats); + output2 = format_report_all_outputs(stats, report_count); + + mosquitto_publish(mosq, NULL, "/mfi/reports", strlen(output2), output2, 0, false); + + free(output2); + free(stats); + + sleep(2); + } while(true); + + return NULL; +} + +// Stop all the blinking! +void * light_management_thread(void *arg) { + FILE *light_fp; + FILE *freq_fp; + + do { + if (closing_time) { + break; + } + + light_fp = fopen("/proc/led/status", "w"); + if (light_fp) { + fprintf(light_fp, "1\n"); + fclose(light_fp); + } + + freq_fp = fopen("/proc/led/freq", "w"); + if (freq_fp) { + fprintf(freq_fp, "0\n"); + fclose(freq_fp); + } + + sleep(1); + } while(true); + + return NULL; +} + +// Doing this the "C way" is a real pain in the ass, just shell out and forget +// about it +void cleanup_crap_processes() { + // Otherwise init will continue to respawn them + system("sed -i " + "-e '/ubnt-websockets/s/^/#/' " + "-e '/telnetd/s/^/#/' " + "-e '/mca[-d]/s/^/#/' " + "-e '/lighttpd/s/^/#/' " + "/etc/inittab"); + + system("kill -HUP 1"); + + // Most of these kill cleanly but a few are stubborn so don't ask, tell. + system("pkill -9 ubnt-websockets"); + system("pkill -9 lighttpd"); + system("pkill upnpd"); + system("pkill telnetd"); + system("pkill mca-monitor"); + system("pkill mcad"); + system("pkill avahi-daemon"); +} + +int main(int argc, char *argv[]) +{ + struct mosquitto *mosq; + pthread_t pinger_thread_h, control_thread_h, light_management_thread_h; + + cleanup_crap_processes(); + + set_signal_handler(); + + mosq = connect_to_broker("172.16.0.191", 1883); + if (!mosq) { + return 1; + } + fprintf(stderr, "Connected to broker\n"); + + mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback); + mosquitto_message_callback_set(mosq, my_message_callback); + + pthread_create(&pinger_thread_h, NULL, (void * (*)(void *))pinger_thread, mosq); + pthread_create(&control_thread_h, NULL, (void * (*)(void *))control_thread, mosq); + pthread_create(&light_management_thread_h, NULL, (void * (*)(void *))light_management_thread, NULL); + + do { + sleep(2); + if (closing_time) { + fprintf(stderr, "Shutting down\n"); + + mosquitto_disconnect(mosq); + pthread_join(pinger_thread_h, NULL); + pthread_join(control_thread_h, NULL); + break; + } + } while(true); + + shutdown_broker(mosq); + + return 0; +} diff --git a/mfi-mqtt/mfi-mqtt.o b/mfi-mqtt/mfi-mqtt.o new file mode 100644 index 0000000..d9ae533 Binary files /dev/null and b/mfi-mqtt/mfi-mqtt.o differ diff --git a/mfi-mqtt/mosquitto.c b/mfi-mqtt/mosquitto.c new file mode 100644 index 0000000..1f1a64e --- /dev/null +++ b/mfi-mqtt/mosquitto.c @@ -0,0 +1,82 @@ +#include +#include +#include +#include +#include + +#include "mosquitto.h" + +int client_id_generate(char **output) +{ + int len; + char hostname[256]; + + *output = calloc(sizeof(char), MOSQ_MQTT_ID_MAX_LENGTH); + if (!*output) { + fprintf(stderr, "Error: Out of memory. %s\n", strerror(errno)); + mosquitto_lib_cleanup(); + return 1; + } + + memset(hostname, 0, sizeof(hostname)); + gethostname(hostname, 255); + + /* Clamp length to MQTT maximum client ID length */ + len = strlen("mfi|-") + 6 + strlen(hostname); + if (len > MOSQ_MQTT_ID_MAX_LENGTH - 1) { + len = MOSQ_MQTT_ID_MAX_LENGTH - 1; + } + + snprintf(*output, len, "mfi|%d-%s", getpid(), hostname); + + return MOSQ_ERR_SUCCESS; +} + +struct mosquitto * connect_to_broker(char *host, int port) { + int rc; + char *id; + struct mosquitto *mosq; + + mosquitto_lib_init(); + + if (client_id_generate(&id)) { + return NULL; + } + + mosq = mosquitto_new(id, true, NULL); + if (!mosq) { + switch (errno) { + case ENOMEM: + fprintf(stderr, "Error: Out of memory.\n"); + break; + case EINVAL: + fprintf(stderr, "Error: Invalid id and/or clean_session.\n"); + break; + } + mosquitto_lib_cleanup(); + return NULL; + } + + int protocol_version = MQTT_PROTOCOL_V311; + + mosquitto_max_inflight_messages_set(mosq, 20); + mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, &protocol_version); + + rc = mosquitto_connect(mosq, host, port, 60); + if (rc > 0) { + if (rc == MOSQ_ERR_ERRNO) { + fprintf(stderr, "Error: %s\n", strerror(errno)); + } else { + fprintf(stderr, "Unable to connect (%s).\n", mosquitto_strerror(rc)); + } + mosquitto_lib_cleanup(); + return NULL; + } + + return mosq; +} + +void shutdown_broker(struct mosquitto *mosq) { + mosquitto_destroy(mosq); + mosquitto_lib_cleanup(); +} diff --git a/mfi-mqtt/mosquitto.h b/mfi-mqtt/mosquitto.h new file mode 100644 index 0000000..5698b7c --- /dev/null +++ b/mfi-mqtt/mosquitto.h @@ -0,0 +1,4 @@ +#include + +struct mosquitto * connect_to_broker(char *, int); +void shutdown_broker(struct mosquitto *); diff --git a/mfi-mqtt/mosquitto.o b/mfi-mqtt/mosquitto.o new file mode 100644 index 0000000..9e9a78c Binary files /dev/null and b/mfi-mqtt/mosquitto.o differ diff --git a/mfi-mqtt/reporting.c b/mfi-mqtt/reporting.c new file mode 100644 index 0000000..7b4131a --- /dev/null +++ b/mfi-mqtt/reporting.c @@ -0,0 +1,143 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "reporting.h" + +power_statistics * init_power_statistics(int relay_num) { + power_statistics *stats = malloc(sizeof(power_statistics)); + stats->relay_num = relay_num; + stats->engaged = calloc(strlen("false") + 1, sizeof(char)); + return stats; +} + +void free_power_statistics(power_statistics *stats) { + free(stats->engaged); + free(stats); +} + +bool power_statistics_is_engaged(power_statistics *stats) { + return strcmp(stats->engaged, "on") == 0; +} + +int power_statistics_load_from_file(power_statistics *stats) { + FILE *fp; + char filename[12]; + + snprintf(filename, 12, "/dev/power%d", stats->relay_num); + + fp = fopen(filename, "r"); + if (!fp) { + return 1; + } + + if (fscanf(fp, "%s %f\n %f\n %f\n %f\n %f\n", + stats->engaged, + &stats->active_power, + &stats->energy_sum, + &stats->current_rms, + &stats->voltage_rms, + &stats->power_factor) != 6) { + fclose(fp); + return 1; + } + + fclose(fp); + return 0; +} + +int get_output_count() { + return access("/dev/power8", F_OK) != -1 ? 8 : 3; +} + +int get_stats_all_outputs(power_statistics ***out_stats) { + int i; + int output_count; + power_statistics *stat; + power_statistics **stats; + + output_count = get_output_count(); + stats = malloc(sizeof(power_statistics *) * (output_count + 1)); + + for (i = output_count; i > 0; i--) { + stat = init_power_statistics(i); + + if (power_statistics_load_from_file(stat)) { + continue; + } + + stats[i] = stat; + } + + *out_stats = stats; + + return output_count; +} + +json_object * format_power_satistics_output_json(power_statistics *stats) { + json_object *top_object; + + top_object = json_object_new_object(); + json_object_object_add(top_object, "output", json_object_new_int(stats->relay_num)); + json_object_object_add(top_object, "engaged", json_object_new_boolean(power_statistics_is_engaged(stats))); + json_object_object_add(top_object, "active_power", json_object_new_double(stats->active_power)); + json_object_object_add(top_object, "energy_sum", json_object_new_double(stats->energy_sum)); + json_object_object_add(top_object, "current_rms", json_object_new_double(stats->current_rms)); + json_object_object_add(top_object, "voltage_rms", json_object_new_double(stats->voltage_rms)); + json_object_object_add(top_object, "power_factor", json_object_new_double(stats->power_factor)); + + return top_object; +} + +char * format_report_all_outputs(power_statistics **stats, int report_count) { + int i; + char *output; + char hostname[256]; + char ip_address[15]; + const char *tmp; + json_object *top_object, *report_array; + + memset(hostname, 0, sizeof(hostname)); + gethostname(hostname, 255); + + memset(ip_address, 0, sizeof(ip_address)); + get_primary_ip_address(ip_address); + + report_array = json_object_new_array(); + top_object = json_object_new_object(); + json_object_object_add(top_object, "hostname", json_object_new_string(hostname)); + json_object_object_add(top_object, "ip_address", json_object_new_string(ip_address)); + json_object_object_add(top_object, "reports", report_array); + + for (i = report_count; i > 0; i--) { + json_object_array_add(report_array, format_power_satistics_output_json(stats[i])); + free_power_statistics(stats[i]); + } + + tmp = json_object_to_json_string_ext(top_object, JSON_C_TO_STRING_PLAIN); + output = strdup(tmp); + json_object_put(top_object); + + return output; +} + +void get_primary_ip_address(char ip_address[15]) +{ + int fd; + struct ifreq ifr; + + fd = socket(AF_INET, SOCK_DGRAM, 0); + memcpy(ifr.ifr_name, "ath0", IFNAMSIZ-1); + ioctl(fd, SIOCGIFADDR, &ifr); + close(fd); + + strncpy(ip_address, inet_ntoa(((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr), 15); +} diff --git a/mfi-mqtt/reporting.h b/mfi-mqtt/reporting.h new file mode 100644 index 0000000..1ed0448 --- /dev/null +++ b/mfi-mqtt/reporting.h @@ -0,0 +1,14 @@ +typedef struct power_statistics { + int relay_num; + char *engaged; + float active_power; + float energy_sum; + float current_rms; + float voltage_rms; + float power_factor; +} power_statistics; + +int get_output_count(); +void get_primary_ip_address(char[15]); +int get_stats_all_outputs(power_statistics ***); +char * format_report_all_outputs(power_statistics **, int); diff --git a/mfi-mqtt/reporting.o b/mfi-mqtt/reporting.o new file mode 100644 index 0000000..bd257a3 Binary files /dev/null and b/mfi-mqtt/reporting.o differ -- cgit v1.2.3