1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2019 - 2022 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25 #include <private-lib-core.h>
26
27 static void
secstream_mqtt_cleanup(lws_ss_handle_t *h)28 secstream_mqtt_cleanup(lws_ss_handle_t *h)
29 {
30 uint32_t i;
31
32 if (h->u.mqtt.heap_baggage) {
33 lws_free(h->u.mqtt.heap_baggage);
34 h->u.mqtt.heap_baggage = NULL;
35 }
36
37 if (h->u.mqtt.sub_info.topic) {
38 for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) {
39 if (h->u.mqtt.sub_info.topic[i].name) {
40 lws_free((void*)h->u.mqtt.sub_info.topic[i].name);
41 h->u.mqtt.sub_info.topic[i].name = NULL;
42 }
43 }
44 lws_free(h->u.mqtt.sub_info.topic);
45 h->u.mqtt.sub_info.topic = NULL;
46 }
47 lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
48 }
49
50 static int
secstream_mqtt_subscribe(struct lws *wsi)51 secstream_mqtt_subscribe(struct lws *wsi)
52 {
53 size_t used_in, used_out, topic_limit;
54 lws_strexp_t exp;
55 char* expbuf;
56 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
57
58 if (!h || !h->policy)
59 return -1;
60
61 if (h->policy->u.mqtt.aws_iot)
62 topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
63 else
64 topic_limit = LWS_MQTT_MAX_TOPICLEN;
65
66 if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe)
67 return 0;
68
69 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL,
70 topic_limit);
71 /*
72 * Expand with no output first to calculate the size of
73 * expanded string then, allocate new buffer and expand
74 * again with the buffer
75 */
76 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
77 strlen(h->policy->u.mqtt.subscribe), &used_in,
78 &used_out) != LSTRX_DONE) {
79 lwsl_err(
80 "%s, failed to expand MQTT subscribe"
81 " topic with no output\n",
82 __func__);
83 return 1;
84 }
85
86 expbuf = lws_malloc(used_out + 1, __func__);
87 if (!expbuf) {
88 lwsl_err(
89 "%s, failed to allocate MQTT subscribe"
90 "topic",
91 __func__);
92 return 1;
93 }
94
95 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
96 used_out + 1);
97
98 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
99 strlen(h->policy->u.mqtt.subscribe), &used_in,
100 &used_out) != LSTRX_DONE) {
101 lwsl_err("%s, failed to expand MQTT subscribe topic\n",
102 __func__);
103 lws_free(expbuf);
104 return 1;
105 }
106 lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
107 h->u.mqtt.sub_top.name = expbuf;
108
109 /*
110 * The policy says to subscribe to something, and we
111 * haven't done it yet. Do it using the pre-prepared
112 * string-substituted version of the policy string.
113 */
114
115 lwsl_notice("%s: subscribing %s\n", __func__,
116 h->u.mqtt.sub_top.name);
117
118 h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
119 memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
120 h->u.mqtt.sub_info.num_topics = 1;
121 h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
122 h->u.mqtt.sub_info.topic =
123 lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__);
124 h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
125 h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
126
127 if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
128 lwsl_notice("%s: unable to subscribe", __func__);
129 lws_free(expbuf);
130 h->u.mqtt.sub_top.name = NULL;
131 return -1;
132 }
133 lws_free(expbuf);
134 h->u.mqtt.sub_top.name = NULL;
135
136 /* Expect a SUBACK */
137 if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
138 lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
139 return -1;
140 }
141 return 0;
142 }
143
144 static int
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len, uint32_t payload_len, const char* topic, lws_mqtt_qos_levels_t qos, uint8_t retain, uint8_t dup, int f)145 secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len,
146 uint32_t payload_len, const char* topic,
147 lws_mqtt_qos_levels_t qos, uint8_t retain, uint8_t dup,
148 int f)
149 {
150 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
151 size_t used_in, used_out, topic_limit;
152 lws_strexp_t exp;
153 char *expbuf;
154 lws_mqtt_publish_param_t mqpp;
155
156 if (h->policy->u.mqtt.aws_iot)
157 topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
158 else
159 topic_limit = LWS_MQTT_MAX_TOPICLEN;
160
161 memset(&mqpp, 0, sizeof(mqpp));
162
163 lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
164 topic_limit);
165
166 if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
167 &used_out) != LSTRX_DONE) {
168 lwsl_err("%s, failed to expand MQTT publish"
169 " topic with no output\n", __func__);
170 return 1;
171 }
172 expbuf = lws_malloc(used_out + 1, __func__);
173 if (!expbuf) {
174 lwsl_err("%s, failed to allocate MQTT publish topic",
175 __func__);
176 return 1;
177 }
178
179 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
180 used_out + 1);
181
182 if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
183 &used_out) != LSTRX_DONE) {
184 lws_free(expbuf);
185 return 1;
186 }
187 lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
188 mqpp.topic = (char *)expbuf;
189
190 mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
191 mqpp.packet_id = (uint16_t)(h->txord - 1);
192 mqpp.qos = qos;
193 mqpp.retain = !!retain;
194 mqpp.payload = buf;
195 mqpp.dup = !!dup;
196 if (payload_len)
197 mqpp.payload_len = payload_len;
198 else
199 mqpp.payload_len = (uint32_t)buf_len;
200
201 lwsl_notice("%s: payload len %d\n", __func__,
202 (int)mqpp.payload_len);
203
204 if (lws_mqtt_client_send_publish(wsi, &mqpp,
205 (const char *)buf,
206 (uint32_t)buf_len,
207 f & LWSSS_FLAG_EOM)) {
208 lwsl_notice("%s: failed to publish\n", __func__);
209 lws_free(expbuf);
210 return -1;
211 }
212 lws_free(expbuf);
213
214 if ((mqpp.qos == QOS1 || mqpp.qos == QOS2) && buf_len > 0) {
215 if (lws_buflist_append_segment(&h->u.mqtt.buflist_unacked,
216 buf, buf_len) < 0) {
217 lwsl_notice("%s: failed to store unacked\n", __func__);
218 return -1;
219 }
220 }
221
222 return 0;
223 }
224
225 static int
secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen)226 secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) {
227 lws_strexp_t exp;
228 size_t used_in, used_out = 0;
229 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
230
231 if (h->policy->u.mqtt.birth_message) {
232 lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
233 (char *)buf, buflen);
234 if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
235 strlen(h->policy->u.mqtt.birth_message),
236 &used_in, &used_out) != LSTRX_DONE) {
237 return 1;
238 }
239 }
240 wsi->mqtt->inside_birth = 1;
241 return secstream_mqtt_publish(wsi, buf,
242 used_out, 0, h->policy->u.mqtt.birth_topic,
243 h->policy->u.mqtt.birth_qos,
244 h->policy->u.mqtt.birth_retain, 0,
245 LWSSS_FLAG_EOM);
246 }
247
248 static int
secstream_mqtt_resend(struct lws *wsi, uint8_t *buf)249 secstream_mqtt_resend(struct lws *wsi, uint8_t *buf) {
250 uint8_t *buffered;
251 size_t len;
252 int f = 0, r;
253 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
254
255 len = lws_buflist_next_segment_len(&h->u.mqtt.buflist_unacked,
256 &buffered);
257
258 if (h->u.mqtt.unacked_size <= len)
259 f |= LWSSS_FLAG_EOM;
260
261 if (!len) {
262 /* when the message does not have payload */
263 buffered = buf;
264 } else {
265 h->u.mqtt.unacked_size -= (uint32_t)len;
266 }
267
268 if (wsi->mqtt->inside_birth) {
269 r = secstream_mqtt_publish(wsi, buffered, len, 0,
270 h->policy->u.mqtt.birth_topic,
271 h->policy->u.mqtt.birth_qos,
272 h->policy->u.mqtt.birth_retain,
273 1, f);
274 } else {
275 r = secstream_mqtt_publish(wsi, buffered, len,
276 (uint32_t)h->writeable_len,
277 h->policy->u.mqtt.topic,
278 h->policy->u.mqtt.qos,
279 h->policy->u.mqtt.retain, 1, f);
280 }
281 if (len)
282 lws_buflist_use_segment(&h->u.mqtt.buflist_unacked, len);
283
284 if (r) {
285 lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
286 h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
287
288 if (wsi->mqtt->inside_birth) {
289 lwsl_err("%s: %s: failed to send Birth\n", __func__,
290 lws_ss_tag(h));
291 return -1;
292 } else {
293 r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
294 if (r != LWSSSSRET_OK)
295 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
296 }
297 }
298 return 0;
299 }
300
301 static char *
expand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len)302 expand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len)
303 {
304 lws_strexp_t exp;
305 char *expbuf = NULL;
306 size_t used_in = 0, used_out = 0, post_len = 0;
307
308 memset(&exp, 0, sizeof(exp));
309
310 if (post)
311 post_len = strlen(post);
312
313 if (post_len > max_len)
314 return NULL;
315
316 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, NULL,
317 max_len - post_len);
318
319 if (lws_strexp_expand(&exp, str, strlen(str), &used_in,
320 &used_out) != LSTRX_DONE) {
321 lwsl_err("%s, failed to expand %s", __func__, str);
322
323 return NULL;
324 }
325
326 expbuf = lws_malloc(used_out + 1 + post_len, __func__);
327 if (!expbuf) {
328 lwsl_err("%s, failed to allocate str_exp for %s", __func__, str);
329
330 return NULL;
331 }
332
333 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
334 used_out + 1 + post_len);
335
336 if (lws_strexp_expand(&exp, str, strlen(str), &used_in,
337 &used_out) != LSTRX_DONE) {
338 lwsl_err("%s, failed to expand str_exp %s\n", __func__, str);
339 lws_free(expbuf);
340
341 return NULL;
342 }
343 if (post)
344 strcat(expbuf, post);
345
346 return expbuf;
347 }
348
349 static lws_mqtt_match_topic_return_t
secstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic)350 secstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic)
351 {
352 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
353 const char *match[] = { LWS_MQTT_SHADOW_UNNAMED_TOPIC_MATCH,
354 LWS_MQTT_SHADOW_NAMED_TOPIC_MATCH };
355 char *expbuf = NULL;
356 unsigned int i = 0;
357 lws_mqtt_match_topic_return_t ret = LMMTR_TOPIC_NOMATCH;
358
359 if (!topic)
360 return LMMTR_TOPIC_MATCH_ERROR;
361
362 expbuf = expand_metadata(h, topic, NULL, LWS_MQTT_MAX_AWSIOT_TOPICLEN);
363 if (!expbuf) {
364 lwsl_wsi_warn(wsi, "Failed to expand Shadow topic");
365
366 return LMMTR_TOPIC_MATCH_ERROR;
367 }
368 for (i = 0; i < (sizeof(match) / sizeof(match[0])); i++) {
369 if (lws_mqtt_is_topic_matched(
370 match[i], expbuf) == LMMTR_TOPIC_MATCH) {
371 ret = LMMTR_TOPIC_MATCH;
372 break;
373 }
374 }
375 lws_free(expbuf);
376
377 return ret;
378 }
379
380 static void
secstream_mqtt_shadow_cleanup(struct lws *wsi)381 secstream_mqtt_shadow_cleanup(struct lws *wsi)
382 {
383 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
384 uint32_t i = 0;
385
386 for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++)
387 lws_free((void *)h->u.mqtt.shadow_sub.topic[i].name);
388
389 h->u.mqtt.shadow_sub.num_topics = 0;
390
391 if (h->u.mqtt.shadow_sub.topic) {
392 lws_free(h->u.mqtt.shadow_sub.topic);
393 h->u.mqtt.shadow_sub.topic = NULL;
394 }
395 }
396
397 static lws_ss_state_return_t
secstream_mqtt_shadow_unsubscribe(struct lws *wsi)398 secstream_mqtt_shadow_unsubscribe(struct lws *wsi)
399 {
400 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
401
402 if (h->u.mqtt.shadow_sub.num_topics == 0) {
403 wsi->mqtt->send_shadow_unsubscribe = 0;
404 wsi->mqtt->inside_shadow = 0;
405 wsi->mqtt->done_shadow_subscribe = 0;
406
407 return LWSSSSRET_OK;
408 }
409
410 if (lws_mqtt_client_send_unsubcribe(wsi, &h->u.mqtt.shadow_sub)) {
411 lwsl_wsi_err(wsi, "Failed to send MQTT unsubsribe");
412
413 return LWSSSSRET_DISCONNECT_ME;
414 }
415 /* Expect a UNSUBACK */
416 if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
417 lwsl_wsi_err(wsi, "Unable to set LWS_POLLIN");
418
419 return LWSSSSRET_DISCONNECT_ME;
420 }
421 wsi->mqtt->send_shadow_unsubscribe = 0;
422
423 return LWSSSSRET_OK;
424 }
425
426 static int
secstream_mqtt_shadow_subscribe(struct lws *wsi)427 secstream_mqtt_shadow_subscribe(struct lws *wsi)
428 {
429 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
430 char* expbuf = NULL;
431 const char *suffixes[] = { LWS_MQTT_SHADOW_RESP_ACCEPTED_STR,
432 LWS_MQTT_SHADOW_RESP_REJECTED_STR };
433 unsigned int i, suffixes_len = sizeof(suffixes) / sizeof(suffixes[0]);
434
435 if (!h->policy->u.mqtt.topic || wsi->mqtt->inside_shadow)
436 return 0;
437
438 if (h->u.mqtt.shadow_sub.num_topics > 0)
439 secstream_mqtt_shadow_cleanup(wsi);
440
441 memset(&h->u.mqtt.shadow_sub, 0, sizeof(lws_mqtt_subscribe_param_t));
442 h->u.mqtt.shadow_sub.topic = lws_malloc(
443 sizeof(lws_mqtt_topic_elem_t) * suffixes_len, __func__);
444 if (!h->u.mqtt.shadow_sub.topic) {
445 lwsl_ss_err(h, "Failed to allocate Shadow topics");
446 return -1;
447 }
448 h->u.mqtt.shadow_sub.num_topics = suffixes_len;
449 for (i = 0; i < suffixes_len; i++) {
450 expbuf = expand_metadata(h, h->policy->u.mqtt.topic, suffixes[i],
451 LWS_MQTT_MAX_AWSIOT_TOPICLEN);
452 if (!expbuf) {
453 lwsl_ss_err(h, "Failed to allocate Shadow topic");
454 secstream_mqtt_shadow_cleanup(wsi);
455
456 return -1;
457 }
458 h->u.mqtt.shadow_sub.topic[i].name = expbuf;
459 h->u.mqtt.shadow_sub.topic[i].qos = h->policy->u.mqtt.qos;
460 }
461 h->u.mqtt.shadow_sub.packet_id = (uint16_t)(h->txord - 1);
462
463 if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.shadow_sub)) {
464 lwsl_wsi_notice(wsi, "Unable to subscribe Shadow topics");
465
466 return 0;
467 }
468
469 /* Expect a SUBACK */
470 if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
471 lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
472 return -1;
473 }
474 wsi->mqtt->inside_shadow = 1;
475
476 return 0;
477 }
478
479 static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)480 secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
481 void *in, size_t len)
482 {
483 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
484 size_t used_in = 0, used_out = 0, topic_len = 0;
485 lws_mqtt_publish_param_t *pmqpp = NULL;
486 lws_ss_state_return_t r = LWSSSSRET_OK;
487 uint8_t buf[LWS_PRE + 1400];
488 size_t buflen = sizeof(buf) - LWS_PRE;
489 lws_ss_metadata_t *omd = NULL;
490 char *sub_topic = NULL;
491 lws_strexp_t exp;
492 int f = 0;
493
494 memset(buf, 0, sizeof(buf));
495 memset(&exp, 0, sizeof(exp));
496
497 switch (reason) {
498
499 /* because we are protocols[0] ... */
500 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
501 lwsl_info("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
502 in ? (char *)in : "(null)");
503 if (!h)
504 break;
505
506 #if defined(LWS_WITH_CONMON)
507 lws_conmon_ss_json(h);
508 #endif
509
510 r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
511 h->wsi = NULL;
512
513 secstream_mqtt_cleanup(h);
514
515 if (r == LWSSSSRET_DESTROY_ME)
516 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
517
518 r = lws_ss_backoff(h);
519 if (r != LWSSSSRET_OK)
520 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
521
522 break;
523
524 case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
525 if (!h)
526 break;
527 lws_sul_cancel(&h->sul_timeout);
528 #if defined(LWS_WITH_CONMON)
529 lws_conmon_ss_json(h);
530 #endif
531 if (h->ss_dangling_connected)
532 r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
533 else
534 r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
535 if (h->wsi)
536 lws_set_opaque_user_data(h->wsi, NULL);
537 h->wsi = NULL;
538
539 secstream_mqtt_cleanup(h);
540
541 if (r)
542 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
543
544 if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) &&
545 !h->txn_ok && !wsi->a.context->being_destroyed) {
546 r = lws_ss_backoff(h);
547 if (r != LWSSSSRET_OK)
548 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
549 }
550 break;
551
552 case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
553 /*
554 * Make sure the handle wsi points to the stream wsi not the
555 * original nwsi, in the case it was migrated
556 */
557 h->wsi = wsi;
558 h->retry = 0;
559 h->seqstate = SSSEQ_CONNECTED;
560
561 if (h->policy->u.mqtt.birth_topic &&
562 !wsi->mqtt->done_birth) {
563 struct lws *nwsi = lws_get_network_wsi(wsi);
564 lws_start_foreach_ll(struct lws *, w, nwsi->mux.child_list) {
565 if (w != wsi &&
566 (w->mqtt->done_birth || w->mqtt->inside_birth)) {
567 /*
568 * If any Birth was sent out or
569 * is pending on other stream,
570 * skip sending Birth.
571 */
572 wsi->mqtt->done_birth = 1;
573 break;
574 }
575 } lws_end_foreach_ll(w, mux.sibling_list);
576 }
577
578 if (!h->policy->u.mqtt.subscribe ||
579 !h->policy->u.mqtt.subscribe[0]) {
580 /*
581 * If subscribe is empty in the policy, then,
582 * skip sending SUBSCRIBE and signal the user
583 * application.
584 */
585 wsi->mqtt->done_subscribe = 1;
586 } else if (!h->policy->u.mqtt.clean_start &&
587 wsi->mqtt->session_resumed) {
588 wsi->mqtt->inside_resume_session = 1;
589 /*
590 * If the previous session is resumed and Server has
591 * stored session, then, do not subscribe.
592 */
593 if (!secstream_mqtt_subscribe(wsi))
594 wsi->mqtt->done_subscribe = 1;
595 wsi->mqtt->inside_resume_session = 0;
596 } else if (h->policy->u.mqtt.subscribe &&
597 !wsi->mqtt->done_subscribe) {
598 /*
599 * If a subscribe is pending on the stream, then make
600 * sure the SUBSCRIBE is done before signaling the
601 * user application.
602 */
603 lws_callback_on_writable(wsi);
604 break;
605 }
606
607 if (h->policy->u.mqtt.birth_topic &&
608 !wsi->mqtt->done_birth) {
609 /*
610 * If a Birth is pending on the stream, then make
611 * sure the Birth is done before signaling the
612 * user application.
613 */
614 lws_callback_on_writable(wsi);
615 break;
616 }
617 lws_sul_cancel(&h->sul);
618 #if defined(LWS_WITH_SYS_METRICS)
619 /*
620 * If any hanging caliper measurement, dump it, and free any tags
621 */
622 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
623 #endif
624 r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
625 if (r != LWSSSSRET_OK)
626 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
627 if (h->policy->u.mqtt.topic)
628 lws_callback_on_writable(wsi);
629 break;
630
631 case LWS_CALLBACK_MQTT_CLIENT_RX:
632 // lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: read %d\n", (int)len);
633 if (!h || !h->info.rx)
634 return 0;
635
636 pmqpp = (lws_mqtt_publish_param_t *)in;
637
638 f = 0;
639 if (!pmqpp->payload_pos)
640 f |= LWSSS_FLAG_SOM;
641 if (pmqpp->payload_pos + len == pmqpp->payload_len)
642 f |= LWSSS_FLAG_EOM;
643
644 h->subseq = 1;
645
646 if (wsi->mqtt->inside_shadow) {
647 /*
648 * When Shadow is used, the stream receives multiple
649 * topics including Shadow response, set received
650 * topic on the metadata
651 */
652 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata,
653 NULL, (size_t)-1);
654
655 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
656 strlen(h->policy->u.mqtt.subscribe),
657 &used_in, &used_out) != LSTRX_DONE) {
658 lwsl_err("%s, failed to expand subscribe topic",
659 __func__);
660 return -1;
661 }
662 omd = lws_ss_get_handle_metadata(h, exp.name);
663
664 if (!omd) {
665 lwsl_err("%s, failed to find metadata for subscribe",
666 __func__);
667 return -1;
668 }
669 sub_topic = omd->value__may_own_heap;
670 topic_len = omd->length;
671
672 _lws_ss_set_metadata(omd, exp.name,
673 (const void *)pmqpp->topic,
674 pmqpp->topic_len);
675 }
676
677 r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload,
678 len, f);
679
680 if (wsi->mqtt->inside_shadow)
681 _lws_ss_set_metadata(omd, exp.name, &sub_topic,
682 topic_len);
683
684 if (r != LWSSSSRET_OK)
685 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
686
687 if (wsi->mqtt->inside_shadow) {
688 size_t acc_n = strlen(LWS_MQTT_SHADOW_RESP_ACCEPTED_STR);
689 size_t rej_n = strlen(LWS_MQTT_SHADOW_RESP_REJECTED_STR);
690 uint32_t i;
691
692 for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) {
693 /*
694 * received response ('/accepted' or 'rejected')
695 * and clean up Shadow operation
696 */
697 if (strncmp(h->u.mqtt.shadow_sub.topic[i].name,
698 pmqpp->topic, pmqpp->topic_len) ||
699 (strlen(pmqpp->topic) < acc_n ||
700 strlen(pmqpp->topic) < rej_n))
701 continue;
702
703 if (!strcmp(pmqpp->topic +
704 (strlen(pmqpp->topic) - acc_n),
705 LWS_MQTT_SHADOW_RESP_ACCEPTED_STR) ||
706 !strcmp(pmqpp->topic +
707 (strlen(pmqpp->topic) - rej_n),
708 LWS_MQTT_SHADOW_RESP_REJECTED_STR)) {
709 lws_sul_cancel(&wsi->mqtt->sul_shadow_wait);
710 wsi->mqtt->send_shadow_unsubscribe = 1;
711 lws_callback_on_writable(wsi);
712
713 return 0;
714 }
715 }
716 }
717 return 0; /* don't passthru */
718
719 case LWS_CALLBACK_MQTT_SUBSCRIBED:
720 if (wsi->mqtt->inside_shadow) {
721 wsi->mqtt->done_shadow_subscribe = 1;
722 lws_callback_on_writable(wsi);
723
724 return 0;
725 }
726 /*
727 * Stream demanded a subscribe without a Birth while connecting, once
728 * done notify CONNECTED event to the application.
729 */
730 if (!wsi->mqtt->done_subscribe && !h->policy->u.mqtt.birth_topic) {
731 lws_sul_cancel(&h->sul);
732 r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
733 if (r != LWSSSSRET_OK)
734 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
735 }
736 wsi->mqtt->done_subscribe = 1;
737 lws_callback_on_writable(wsi);
738 break;
739
740 case LWS_CALLBACK_MQTT_ACK:
741 lws_sul_cancel(&h->sul_timeout);
742 if (h->u.mqtt.send_unacked) {
743 lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
744 h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
745 }
746
747 if (wsi->mqtt->inside_birth) {
748 /*
749 * Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify
750 * CONNECTED event to the application.
751 */
752 wsi->mqtt->inside_birth = 0;
753 wsi->mqtt->done_birth = 1;
754 r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
755 if (r != LWSSSSRET_OK)
756 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
757 lws_callback_on_writable(wsi);
758 break;
759 }
760 r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
761 if (r != LWSSSSRET_OK)
762 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
763 break;
764
765 case LWS_CALLBACK_MQTT_RESEND:
766 lws_sul_cancel(&h->sul_timeout);
767 if (h->u.mqtt.retry_count++ < LWS_MQTT_MAX_PUBLISH_RETRY) {
768 h->u.mqtt.unacked_size =
769 (uint32_t)lws_buflist_total_len(&h->u.mqtt.buflist_unacked);
770 if (h->u.mqtt.unacked_size) {
771 lwsl_notice("%s: %s: resend unacked message (%d/%d) \n",
772 __func__, lws_ss_tag(h),
773 h->u.mqtt.retry_count,
774 LWS_MQTT_MAX_PUBLISH_RETRY);
775 h->u.mqtt.send_unacked = 1;
776 lws_callback_on_writable(wsi);
777 break;
778 }
779 }
780
781 lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
782 h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
783
784 if (wsi->mqtt->inside_birth) {
785 lwsl_err("%s: %s: failed to send Birth\n", __func__,
786 lws_ss_tag(h));
787 return -1;
788 }
789
790 r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
791 if (r != LWSSSSRET_OK)
792 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
793 break;
794
795 case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
796 {
797 if (!h || !h->info.tx)
798 return 0;
799 lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h));
800
801 if (h->seqstate != SSSEQ_CONNECTED) {
802 lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
803 break;
804 }
805
806 if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
807 return secstream_mqtt_subscribe(wsi);
808
809 if (h->u.mqtt.send_unacked)
810 return secstream_mqtt_resend(wsi, buf + LWS_PRE);
811
812 if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic)
813 return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen);
814
815 if (h->policy->u.mqtt.aws_iot) {
816 if (secstream_mqtt_is_shadow_matched(wsi,
817 h->policy->u.mqtt.topic) == LMMTR_TOPIC_MATCH) {
818 if (!wsi->mqtt->done_shadow_subscribe)
819 return secstream_mqtt_shadow_subscribe(wsi);
820 if (wsi->mqtt->send_shadow_unsubscribe)
821 return secstream_mqtt_shadow_unsubscribe(wsi);
822 }
823 }
824
825 r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
826 &buflen, &f);
827
828 if (r == LWSSSSRET_TX_DONT_SEND) {
829 if (wsi->mqtt->done_shadow_subscribe) {
830 return secstream_mqtt_shadow_unsubscribe(wsi);
831 }
832 return 0;
833 }
834
835 if (r == LWSSSSRET_DISCONNECT_ME) {
836 lws_mqtt_subscribe_param_t lmsp;
837 if (h->u.mqtt.sub_info.num_topics) {
838 lmsp.num_topics = h->u.mqtt.sub_info.num_topics;
839 lmsp.topic = h->u.mqtt.sub_info.topic;
840 lmsp.packet_id = (uint16_t)(h->txord - 1);
841 if (lws_mqtt_client_send_unsubcribe(wsi,
842 &lmsp)) {
843 lwsl_err("%s, failed to send"
844 " MQTT unsubsribe", __func__);
845 return -1;
846 }
847 return 0;
848 }
849 }
850
851 if (r < 0)
852 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
853
854 if (secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
855 (uint32_t)h->writeable_len,
856 h->policy->u.mqtt.topic,
857 h->policy->u.mqtt.qos,
858 h->policy->u.mqtt.retain, 0, f) != 0) {
859 r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
860 if (r != LWSSSSRET_OK)
861 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
862 }
863 return 0;
864 }
865
866 case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
867 {
868 struct lws *nwsi = lws_get_network_wsi(wsi);
869
870 if (wsi->mqtt->inside_shadow) {
871 secstream_mqtt_shadow_cleanup(wsi);
872 wsi->mqtt->inside_shadow = 0;
873 wsi->mqtt->done_shadow_subscribe = 0;
874 break;
875 }
876 if (nwsi && (nwsi->mux.child_count == 1))
877 lws_mqtt_client_send_disconnect(nwsi);
878 return -1;
879 }
880
881 case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT:
882 if (!wsi->mqtt)
883 return -1;
884
885 if (wsi->mqtt->inside_shadow) {
886 secstream_mqtt_shadow_cleanup(wsi);
887 wsi->mqtt->inside_shadow = 0;
888 wsi->mqtt->done_shadow_subscribe = 0;
889 lwsl_warn("%s: %s: Unsubscribe (Shadow) timeout.\n",
890 __func__, lws_ss_tag(h));
891 break;
892 }
893
894 if (wsi->mqtt->inside_unsubscribe) {
895 lwsl_warn("%s: %s: Unsubscribe timeout.\n", __func__,
896 lws_ss_tag(h));
897 return -1;
898 }
899 break;
900
901 case LWS_CALLBACK_MQTT_SHADOW_TIMEOUT:
902 if (!wsi->mqtt)
903 return -1;
904
905 if (wsi->mqtt->inside_shadow) {
906 lwsl_warn("%s: %s: Shadow timeout.\n", __func__,
907 lws_ss_tag(h));
908 wsi->mqtt->send_shadow_unsubscribe = 1;
909 lws_callback_on_writable(wsi);
910 }
911 break;
912
913 default:
914 break;
915 }
916
917 return lws_callback_http_dummy(wsi, reason, user, in, len);
918 }
919
920 const struct lws_protocols protocol_secstream_mqtt = {
921 "lws-secstream-mqtt",
922 secstream_mqtt,
923 0, 0, 0, NULL, 0
924 };
925 /*
926 * Munge connect info according to protocol-specific considerations... this
927 * usually means interpreting aux in a protocol-specific way and using the
928 * pieces at connection setup time, eg, http url pieces.
929 *
930 * len bytes of buf can be used for things with scope until after the actual
931 * connect.
932 *
933 * For ws, protocol aux is <url path>;<ws subprotocol name>
934 */
935
936 enum {
937 SSCMM_STRSUB_WILL_TOPIC,
938 SSCMM_STRSUB_WILL_MESSAGE,
939 SSCMM_STRSUB_SUBSCRIBE,
940 SSCMM_STRSUB_TOPIC,
941 SSCMM_STRSUB_BIRTH_TOPIC,
942 SSCMM_STRSUB_BIRTH_MESSAGE
943 };
944
945 static int
secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, struct lws_client_connect_info *i, union lws_ss_contemp *ct)946 secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
947 struct lws_client_connect_info *i,
948 union lws_ss_contemp *ct)
949 {
950 const char *sources[6] = {
951 /* we're going to string-substitute these before use */
952 h->policy->u.mqtt.will_topic,
953 h->policy->u.mqtt.will_message,
954 h->policy->u.mqtt.subscribe,
955 h->policy->u.mqtt.topic,
956 h->policy->u.mqtt.birth_topic,
957 h->policy->u.mqtt.birth_message
958 };
959 size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0;
960 lws_strexp_t exp;
961 char *ps[6];
962 uint8_t *p = NULL;
963 int n = -1;
964 size_t blen;
965 lws_system_blob_t *b = NULL;
966
967 memset(&ct->ccp, 0, sizeof(ct->ccp));
968 b = lws_system_get_blob(i->context,
969 LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID, 0);
970
971 /* If LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID is set */
972 if (b && (blen = lws_system_blob_get_size(b))) {
973 if (blen > LWS_MQTT_MAX_CIDLEN) {
974 lwsl_err("%s - Client ID too long.\n",
975 __func__);
976 return -1;
977 }
978 p = (uint8_t *)lws_zalloc(blen+1, __func__);
979 if (!p)
980 return -1;
981 n = lws_system_blob_get(b, p, &blen, 0);
982 if (n) {
983 ct->ccp.client_id = NULL;
984 } else {
985 ct->ccp.client_id = (const char *)p;
986 lwsl_notice("%s - Client ID = %s\n",
987 __func__, ct->ccp.client_id);
988 }
989 } else {
990 /* Default (Random) client ID */
991 ct->ccp.client_id = NULL;
992 }
993
994 b = lws_system_get_blob(i->context,
995 LWS_SYSBLOB_TYPE_MQTT_USERNAME, 0);
996
997 /* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */
998 if (b && (blen = lws_system_blob_get_size(b))) {
999 p = (uint8_t *)lws_zalloc(blen+1, __func__);
1000 if (!p)
1001 return -1;
1002 n = lws_system_blob_get(b, p, &blen, 0);
1003 if (n) {
1004 ct->ccp.username = NULL;
1005 } else {
1006 ct->ccp.username = (const char *)p;
1007 lwsl_notice("%s - Username ID = %s\n",
1008 __func__, ct->ccp.username);
1009 }
1010 }
1011
1012 b = lws_system_get_blob(i->context,
1013 LWS_SYSBLOB_TYPE_MQTT_PASSWORD, 0);
1014
1015 /* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */
1016 if (b && (blen = lws_system_blob_get_size(b))) {
1017 p = (uint8_t *)lws_zalloc(blen+1, __func__);
1018 if (!p)
1019 return -1;
1020 n = lws_system_blob_get(b, p, &blen, 0);
1021 if (n) {
1022 ct->ccp.password = NULL;
1023 } else {
1024 ct->ccp.password = (const char *)p;
1025 lwsl_notice("%s - Password ID = %s\n",
1026 __func__, ct->ccp.password);
1027 }
1028 }
1029
1030 ct->ccp.keep_alive = h->policy->u.mqtt.keep_alive;
1031 ct->ccp.clean_start = (h->policy->u.mqtt.clean_start & 1u);
1032 ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos;
1033 ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain;
1034 ct->ccp.birth_param.qos = h->policy->u.mqtt.birth_qos;
1035 ct->ccp.birth_param.retain = h->policy->u.mqtt.birth_retain;
1036 ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot;
1037 h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos;
1038
1039 /*
1040 * We're going to string-substitute several of these parameters, which
1041 * have unknown, possibly large size. And, as their usage is deferred
1042 * inside the asynchronous lifetime of the MQTT connection, they need
1043 * to live on the heap.
1044 *
1045 * Notice these allocations at h->u.mqtt.heap_baggage belong to the
1046 * underlying MQTT stream lifetime, not the logical SS lifetime, and
1047 * are destroyed if present at connection error or close of the
1048 * underlying connection.
1049 *
1050 *
1051 * First, compute the length of each without producing strsubst output,
1052 * and keep a running total.
1053 */
1054
1055 for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
1056 if (!sources[n])
1057 continue;
1058
1059 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
1060 NULL, (size_t)-1);
1061 if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
1062 &used_in, &olen[n]) != LSTRX_DONE) {
1063 lwsl_err("%s: failed to subsitute %s\n", __func__,
1064 sources[n]);
1065 return 1;
1066 }
1067 tot += olen[n] + 1;
1068 }
1069
1070 /*
1071 * Then, allocate enough space on the heap for the total of the
1072 * substituted results
1073 */
1074
1075 h->u.mqtt.heap_baggage = lws_malloc(tot, __func__);
1076 if (!h->u.mqtt.heap_baggage)
1077 return 1;
1078
1079 /*
1080 * Finally, issue the subsitutions one after the other into the single
1081 * allocated result buffer and prepare pointers into them
1082 */
1083
1084 p = h->u.mqtt.heap_baggage;
1085 for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
1086 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
1087 (char *)p, (size_t)-1);
1088 if (!sources[n]) {
1089 ps[n] = NULL;
1090 continue;
1091 }
1092 ps[n] = (char *)p;
1093 if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
1094 &used_in, &olen[n]) != LSTRX_DONE)
1095 return 1;
1096
1097 p += olen[n] + 1;
1098 }
1099
1100 /*
1101 * Point the guys who want the substituted content at the substituted
1102 * strings
1103 */
1104
1105 ct->ccp.will_param.topic = ps[SSCMM_STRSUB_WILL_TOPIC];
1106 ct->ccp.will_param.message = ps[SSCMM_STRSUB_WILL_MESSAGE];
1107 h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE];
1108 h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE];
1109 h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC];
1110 ct->ccp.birth_param.topic = ps[SSCMM_STRSUB_BIRTH_TOPIC];
1111 ct->ccp.birth_param.message = ps[SSCMM_STRSUB_BIRTH_MESSAGE];
1112
1113 i->method = "MQTT";
1114 i->mqtt_cp = &ct->ccp;
1115
1116 i->alpn = "x-amzn-mqtt-ca";
1117
1118 /* share connections where possible */
1119 i->ssl_connection |= LCCSCF_PIPELINE;
1120
1121 return 0;
1122 }
1123
1124 const struct ss_pcols ss_pcol_mqtt = {
1125 "MQTT",
1126 "x-amzn-mqtt-ca", //"mqtt/3.1.1",
1127 &protocol_secstream_mqtt,
1128 secstream_connect_munge_mqtt,
1129 NULL, NULL
1130 };
1131