diff options
Diffstat (limited to 'mfi-mqtt/mfi-mqtt.c')
-rw-r--r-- | mfi-mqtt/mfi-mqtt.c | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/mfi-mqtt/mfi-mqtt.c b/mfi-mqtt/mfi-mqtt.c new file mode 100644 index 0000000..25527bc --- /dev/null +++ b/mfi-mqtt/mfi-mqtt.c | |||
@@ -0,0 +1,269 @@ | |||
1 | #include <errno.h> | ||
2 | #include <stdio.h> | ||
3 | #include <stdlib.h> | ||
4 | #include <string.h> | ||
5 | #include <time.h> | ||
6 | #include <unistd.h> | ||
7 | #include <signal.h> | ||
8 | #include <pthread.h> | ||
9 | |||
10 | #include <json-c/json.h> | ||
11 | #include <mosquitto.h> | ||
12 | |||
13 | #include "mosquitto.h" | ||
14 | #include "reporting.h" | ||
15 | |||
16 | static volatile sig_atomic_t closing_time = 0; | ||
17 | |||
18 | typedef struct control_message { | ||
19 | int output; | ||
20 | int state; | ||
21 | } control_message; | ||
22 | |||
23 | control_message * parse_control_message(const struct mosquitto_message *message) { | ||
24 | struct json_tokener *tok; | ||
25 | enum json_tokener_error jerr; | ||
26 | control_message *out_message = NULL; | ||
27 | json_object *msg, *state_key, *output_key; | ||
28 | |||
29 | tok = json_tokener_new(); | ||
30 | msg = json_tokener_parse_ex(tok, message->payload, message->payloadlen); | ||
31 | jerr = json_tokener_get_error(tok); | ||
32 | if (jerr != json_tokener_success) { | ||
33 | fprintf(stderr, "Invalid message format: %s\n", json_tokener_error_desc(jerr)); | ||
34 | goto cleanup; | ||
35 | } | ||
36 | |||
37 | output_key = json_object_object_get(msg, "output"); | ||
38 | if (!output_key) { | ||
39 | fprintf(stderr, "Invalid message format: no output key\n"); | ||
40 | goto cleanup; | ||
41 | } | ||
42 | |||
43 | state_key = json_object_object_get(msg, "state"); | ||
44 | if (!state_key) { | ||
45 | fprintf(stderr, "Invalid message format: no state key\n"); | ||
46 | goto cleanup; | ||
47 | } | ||
48 | |||
49 | out_message = calloc(sizeof(control_message), 1); | ||
50 | out_message->state = json_object_get_int(state_key); | ||
51 | out_message->output = json_object_get_int(output_key); | ||
52 | // Clamp the value to on or off | ||
53 | out_message->state = out_message->state > 0 ? 1 : 0; | ||
54 | |||
55 | cleanup: | ||
56 | if (msg) json_object_put(msg); | ||
57 | json_tokener_free(tok); | ||
58 | |||
59 | return out_message; | ||
60 | } | ||
61 | |||
62 | void set_relay(control_message *msg) { | ||
63 | FILE *f; | ||
64 | char *basename; | ||
65 | char filename[255]; | ||
66 | |||
67 | basename = "/proc/power/relay"; | ||
68 | |||
69 | memset(filename, 0, sizeof(filename)); | ||
70 | snprintf(filename, strlen(basename) + 2, "%s%i", basename, msg->output); | ||
71 | |||
72 | f = fopen(filename, "w"); | ||
73 | if (!f) { | ||
74 | fprintf(stderr, "Failed to open relay file %s\n", filename); | ||
75 | return; | ||
76 | } | ||
77 | |||
78 | fprintf(f, "%i\n", msg->state); | ||
79 | fclose(f); | ||
80 | } | ||
81 | |||
82 | void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) | ||
83 | { | ||
84 | control_message *msg; | ||
85 | |||
86 | msg = parse_control_message(message); | ||
87 | if (!msg) { | ||
88 | return; | ||
89 | } | ||
90 | |||
91 | if (msg->output > get_output_count() || msg->output <= 0) { | ||
92 | fprintf(stderr, "Invalid output number: %i\n", msg->output); | ||
93 | goto cleanup; | ||
94 | } | ||
95 | |||
96 | fprintf(stderr, "Set output %i to %i\n", msg->output, msg->state); | ||
97 | set_relay(msg); | ||
98 | |||
99 | cleanup: | ||
100 | free(msg); | ||
101 | } | ||
102 | |||
103 | void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags) | ||
104 | { | ||
105 | char *topic; | ||
106 | char *prefix; | ||
107 | char hostname[255]; | ||
108 | |||
109 | memset(hostname, 0, sizeof(hostname)); | ||
110 | gethostname(hostname, 255); | ||
111 | |||
112 | prefix = "/mfi/devices"; | ||
113 | topic = calloc(sizeof(char), strlen(prefix) + strlen(hostname) + 2); | ||
114 | sprintf(topic, "%s/%s", prefix, hostname); | ||
115 | |||
116 | fprintf(stderr, "Subscribed to topic %s\n", topic); | ||
117 | |||
118 | if (!result) { | ||
119 | mosquitto_subscribe(mosq, NULL, topic, 0); | ||
120 | } else { | ||
121 | fprintf(stderr, "%s\n", mosquitto_connack_string(result)); | ||
122 | mosquitto_disconnect(mosq); | ||
123 | } | ||
124 | } | ||
125 | |||
126 | static void signal_handler(int signo) { | ||
127 | closing_time = 1; | ||
128 | } | ||
129 | |||
130 | void set_signal_handler() { | ||
131 | struct sigaction action; | ||
132 | |||
133 | action.sa_handler = signal_handler; | ||
134 | action.sa_flags = SA_SIGINFO | SA_RESTART; | ||
135 | |||
136 | if (sigaction(SIGINT, &action, NULL) == -1) { | ||
137 | perror("Error connecting signal"); | ||
138 | exit(-1); | ||
139 | } | ||
140 | } | ||
141 | |||
142 | void mask_sig() { | ||
143 | sigset_t mask; | ||
144 | sigemptyset(&mask); | ||
145 | sigaddset(&mask, SIGINT); | ||
146 | pthread_sigmask(SIG_BLOCK, &mask, NULL); | ||
147 | } | ||
148 | |||
149 | void * control_thread(struct mosquitto *mosq) { | ||
150 | mask_sig(); | ||
151 | mosquitto_loop_forever(mosq, 1000*86400, 1); | ||
152 | return NULL; | ||
153 | } | ||
154 | |||
155 | void * pinger_thread(struct mosquitto *mosq) { | ||
156 | power_statistics **stats; | ||
157 | int report_count; | ||
158 | char *output2; | ||
159 | |||
160 | mask_sig(); | ||
161 | |||
162 | do { | ||
163 | if (closing_time) { | ||
164 | break; | ||
165 | } | ||
166 | |||
167 | report_count = get_stats_all_outputs(&stats); | ||
168 | output2 = format_report_all_outputs(stats, report_count); | ||
169 | |||
170 | mosquitto_publish(mosq, NULL, "/mfi/reports", strlen(output2), output2, 0, false); | ||
171 | |||
172 | free(output2); | ||
173 | free(stats); | ||
174 | |||
175 | sleep(2); | ||
176 | } while(true); | ||
177 | |||
178 | return NULL; | ||
179 | } | ||
180 | |||
181 | // Stop all the blinking! | ||
182 | void * light_management_thread(void *arg) { | ||
183 | FILE *light_fp; | ||
184 | FILE *freq_fp; | ||
185 | |||
186 | do { | ||
187 | if (closing_time) { | ||
188 | break; | ||
189 | } | ||
190 | |||
191 | light_fp = fopen("/proc/led/status", "w"); | ||
192 | if (light_fp) { | ||
193 | fprintf(light_fp, "1\n"); | ||
194 | fclose(light_fp); | ||
195 | } | ||
196 | |||
197 | freq_fp = fopen("/proc/led/freq", "w"); | ||
198 | if (freq_fp) { | ||
199 | fprintf(freq_fp, "0\n"); | ||
200 | fclose(freq_fp); | ||
201 | } | ||
202 | |||
203 | sleep(1); | ||
204 | } while(true); | ||
205 | |||
206 | return NULL; | ||
207 | } | ||
208 | |||
209 | // Doing this the "C way" is a real pain in the ass, just shell out and forget | ||
210 | // about it | ||
211 | void cleanup_crap_processes() { | ||
212 | // Otherwise init will continue to respawn them | ||
213 | system("sed -i " | ||
214 | "-e '/ubnt-websockets/s/^/#/' " | ||
215 | "-e '/telnetd/s/^/#/' " | ||
216 | "-e '/mca[-d]/s/^/#/' " | ||
217 | "-e '/lighttpd/s/^/#/' " | ||
218 | "/etc/inittab"); | ||
219 | |||
220 | system("kill -HUP 1"); | ||
221 | |||
222 | // Most of these kill cleanly but a few are stubborn so don't ask, tell. | ||
223 | system("pkill -9 ubnt-websockets"); | ||
224 | system("pkill -9 lighttpd"); | ||
225 | system("pkill upnpd"); | ||
226 | system("pkill telnetd"); | ||
227 | system("pkill mca-monitor"); | ||
228 | system("pkill mcad"); | ||
229 | system("pkill avahi-daemon"); | ||
230 | } | ||
231 | |||
232 | int main(int argc, char *argv[]) | ||
233 | { | ||
234 | struct mosquitto *mosq; | ||
235 | pthread_t pinger_thread_h, control_thread_h, light_management_thread_h; | ||
236 | |||
237 | cleanup_crap_processes(); | ||
238 | |||
239 | set_signal_handler(); | ||
240 | |||
241 | mosq = connect_to_broker("172.16.0.191", 1883); | ||
242 | if (!mosq) { | ||
243 | return 1; | ||
244 | } | ||
245 | fprintf(stderr, "Connected to broker\n"); | ||
246 | |||
247 | mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback); | ||
248 | mosquitto_message_callback_set(mosq, my_message_callback); | ||
249 | |||
250 | pthread_create(&pinger_thread_h, NULL, (void * (*)(void *))pinger_thread, mosq); | ||
251 | pthread_create(&control_thread_h, NULL, (void * (*)(void *))control_thread, mosq); | ||
252 | pthread_create(&light_management_thread_h, NULL, (void * (*)(void *))light_management_thread, NULL); | ||
253 | |||
254 | do { | ||
255 | sleep(2); | ||
256 | if (closing_time) { | ||
257 | fprintf(stderr, "Shutting down\n"); | ||
258 | |||
259 | mosquitto_disconnect(mosq); | ||
260 | pthread_join(pinger_thread_h, NULL); | ||
261 | pthread_join(control_thread_h, NULL); | ||
262 | break; | ||
263 | } | ||
264 | } while(true); | ||
265 | |||
266 | shutdown_broker(mosq); | ||
267 | |||
268 | return 0; | ||
269 | } | ||