1/* 2 * libwebsockets - small server side websockets and web server implementation 3 * 4 * Copyright (C) 2019 - 2022 Andy Green <andy@warmcat.com> 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining a copy 7 * of this software and associated documentation files (the "Software"), to 8 * deal in the Software without restriction, including without limitation the 9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10 * sell copies of the Software, and to permit persons to whom the Software is 11 * furnished to do so, subject to the following conditions: 12 * 13 * The above copyright notice and this permission notice shall be included in 14 * all copies or substantial portions of the Software. 15 * 16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22 * IN THE SOFTWARE. 23 */ 24 25#include <private-lib-core.h> 26 27static void 28secstream_mqtt_cleanup(lws_ss_handle_t *h) 29{ 30 uint32_t i; 31 32 if (h->u.mqtt.heap_baggage) { 33 lws_free(h->u.mqtt.heap_baggage); 34 h->u.mqtt.heap_baggage = NULL; 35 } 36 37 if (h->u.mqtt.sub_info.topic) { 38 for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) { 39 if (h->u.mqtt.sub_info.topic[i].name) { 40 lws_free((void*)h->u.mqtt.sub_info.topic[i].name); 41 h->u.mqtt.sub_info.topic[i].name = NULL; 42 } 43 } 44 lws_free(h->u.mqtt.sub_info.topic); 45 h->u.mqtt.sub_info.topic = NULL; 46 } 47 lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); 48} 49 50static int 51secstream_mqtt_subscribe(struct lws *wsi) 52{ 53 size_t used_in, used_out, topic_limit; 54 lws_strexp_t exp; 55 char* expbuf; 56 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 57 58 if (!h || !h->policy) 59 return -1; 60 61 if (h->policy->u.mqtt.aws_iot) 62 topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN; 63 else 64 topic_limit = LWS_MQTT_MAX_TOPICLEN; 65 66 if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe) 67 return 0; 68 69 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL, 70 topic_limit); 71 /* 72 * Expand with no output first to calculate the size of 73 * expanded string then, allocate new buffer and expand 74 * again with the buffer 75 */ 76 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, 77 strlen(h->policy->u.mqtt.subscribe), &used_in, 78 &used_out) != LSTRX_DONE) { 79 lwsl_err( 80 "%s, failed to expand MQTT subscribe" 81 " topic with no output\n", 82 __func__); 83 return 1; 84 } 85 86 expbuf = lws_malloc(used_out + 1, __func__); 87 if (!expbuf) { 88 lwsl_err( 89 "%s, failed to allocate MQTT subscribe" 90 "topic", 91 __func__); 92 return 1; 93 } 94 95 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf, 96 used_out + 1); 97 98 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, 99 strlen(h->policy->u.mqtt.subscribe), &used_in, 100 &used_out) != LSTRX_DONE) { 101 lwsl_err("%s, failed to expand MQTT subscribe topic\n", 102 __func__); 103 lws_free(expbuf); 104 return 1; 105 } 106 lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); 107 h->u.mqtt.sub_top.name = expbuf; 108 109 /* 110 * The policy says to subscribe to something, and we 111 * haven't done it yet. Do it using the pre-prepared 112 * string-substituted version of the policy string. 113 */ 114 115 lwsl_notice("%s: subscribing %s\n", __func__, 116 h->u.mqtt.sub_top.name); 117 118 h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos; 119 memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info)); 120 h->u.mqtt.sub_info.num_topics = 1; 121 h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top; 122 h->u.mqtt.sub_info.topic = 123 lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__); 124 h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf); 125 h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos; 126 127 if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) { 128 lwsl_notice("%s: unable to subscribe", __func__); 129 lws_free(expbuf); 130 h->u.mqtt.sub_top.name = NULL; 131 return -1; 132 } 133 lws_free(expbuf); 134 h->u.mqtt.sub_top.name = NULL; 135 136 /* Expect a SUBACK */ 137 if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { 138 lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); 139 return -1; 140 } 141 return 0; 142} 143 144static int 145secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len, 146 uint32_t payload_len, const char* topic, 147 lws_mqtt_qos_levels_t qos, uint8_t retain, uint8_t dup, 148 int f) 149{ 150 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 151 size_t used_in, used_out, topic_limit; 152 lws_strexp_t exp; 153 char *expbuf; 154 lws_mqtt_publish_param_t mqpp; 155 156 if (h->policy->u.mqtt.aws_iot) 157 topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN; 158 else 159 topic_limit = LWS_MQTT_MAX_TOPICLEN; 160 161 memset(&mqpp, 0, sizeof(mqpp)); 162 163 lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL, 164 topic_limit); 165 166 if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in, 167 &used_out) != LSTRX_DONE) { 168 lwsl_err("%s, failed to expand MQTT publish" 169 " topic with no output\n", __func__); 170 return 1; 171 } 172 expbuf = lws_malloc(used_out + 1, __func__); 173 if (!expbuf) { 174 lwsl_err("%s, failed to allocate MQTT publish topic", 175 __func__); 176 return 1; 177 } 178 179 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, 180 used_out + 1); 181 182 if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in, 183 &used_out) != LSTRX_DONE) { 184 lws_free(expbuf); 185 return 1; 186 } 187 lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); 188 mqpp.topic = (char *)expbuf; 189 190 mqpp.topic_len = (uint16_t)strlen(mqpp.topic); 191 mqpp.packet_id = (uint16_t)(h->txord - 1); 192 mqpp.qos = qos; 193 mqpp.retain = !!retain; 194 mqpp.payload = buf; 195 mqpp.dup = !!dup; 196 if (payload_len) 197 mqpp.payload_len = payload_len; 198 else 199 mqpp.payload_len = (uint32_t)buf_len; 200 201 lwsl_notice("%s: payload len %d\n", __func__, 202 (int)mqpp.payload_len); 203 204 if (lws_mqtt_client_send_publish(wsi, &mqpp, 205 (const char *)buf, 206 (uint32_t)buf_len, 207 f & LWSSS_FLAG_EOM)) { 208 lwsl_notice("%s: failed to publish\n", __func__); 209 lws_free(expbuf); 210 return -1; 211 } 212 lws_free(expbuf); 213 214 if ((mqpp.qos == QOS1 || mqpp.qos == QOS2) && buf_len > 0) { 215 if (lws_buflist_append_segment(&h->u.mqtt.buflist_unacked, 216 buf, buf_len) < 0) { 217 lwsl_notice("%s: failed to store unacked\n", __func__); 218 return -1; 219 } 220 } 221 222 return 0; 223} 224 225static int 226secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) { 227 lws_strexp_t exp; 228 size_t used_in, used_out = 0; 229 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 230 231 if (h->policy->u.mqtt.birth_message) { 232 lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, 233 (char *)buf, buflen); 234 if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message, 235 strlen(h->policy->u.mqtt.birth_message), 236 &used_in, &used_out) != LSTRX_DONE) { 237 return 1; 238 } 239 } 240 wsi->mqtt->inside_birth = 1; 241 return secstream_mqtt_publish(wsi, buf, 242 used_out, 0, h->policy->u.mqtt.birth_topic, 243 h->policy->u.mqtt.birth_qos, 244 h->policy->u.mqtt.birth_retain, 0, 245 LWSSS_FLAG_EOM); 246} 247 248static int 249secstream_mqtt_resend(struct lws *wsi, uint8_t *buf) { 250 uint8_t *buffered; 251 size_t len; 252 int f = 0, r; 253 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 254 255 len = lws_buflist_next_segment_len(&h->u.mqtt.buflist_unacked, 256 &buffered); 257 258 if (h->u.mqtt.unacked_size <= len) 259 f |= LWSSS_FLAG_EOM; 260 261 if (!len) { 262 /* when the message does not have payload */ 263 buffered = buf; 264 } else { 265 h->u.mqtt.unacked_size -= (uint32_t)len; 266 } 267 268 if (wsi->mqtt->inside_birth) { 269 r = secstream_mqtt_publish(wsi, buffered, len, 0, 270 h->policy->u.mqtt.birth_topic, 271 h->policy->u.mqtt.birth_qos, 272 h->policy->u.mqtt.birth_retain, 273 1, f); 274 } else { 275 r = secstream_mqtt_publish(wsi, buffered, len, 276 (uint32_t)h->writeable_len, 277 h->policy->u.mqtt.topic, 278 h->policy->u.mqtt.qos, 279 h->policy->u.mqtt.retain, 1, f); 280 } 281 if (len) 282 lws_buflist_use_segment(&h->u.mqtt.buflist_unacked, len); 283 284 if (r) { 285 lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); 286 h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0; 287 288 if (wsi->mqtt->inside_birth) { 289 lwsl_err("%s: %s: failed to send Birth\n", __func__, 290 lws_ss_tag(h)); 291 return -1; 292 } else { 293 r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); 294 if (r != LWSSSSRET_OK) 295 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 296 } 297 } 298 return 0; 299} 300 301static char * 302expand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len) 303{ 304 lws_strexp_t exp; 305 char *expbuf = NULL; 306 size_t used_in = 0, used_out = 0, post_len = 0; 307 308 memset(&exp, 0, sizeof(exp)); 309 310 if (post) 311 post_len = strlen(post); 312 313 if (post_len > max_len) 314 return NULL; 315 316 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, NULL, 317 max_len - post_len); 318 319 if (lws_strexp_expand(&exp, str, strlen(str), &used_in, 320 &used_out) != LSTRX_DONE) { 321 lwsl_err("%s, failed to expand %s", __func__, str); 322 323 return NULL; 324 } 325 326 expbuf = lws_malloc(used_out + 1 + post_len, __func__); 327 if (!expbuf) { 328 lwsl_err("%s, failed to allocate str_exp for %s", __func__, str); 329 330 return NULL; 331 } 332 333 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf, 334 used_out + 1 + post_len); 335 336 if (lws_strexp_expand(&exp, str, strlen(str), &used_in, 337 &used_out) != LSTRX_DONE) { 338 lwsl_err("%s, failed to expand str_exp %s\n", __func__, str); 339 lws_free(expbuf); 340 341 return NULL; 342 } 343 if (post) 344 strcat(expbuf, post); 345 346 return expbuf; 347} 348 349static lws_mqtt_match_topic_return_t 350secstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic) 351{ 352 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 353 const char *match[] = { LWS_MQTT_SHADOW_UNNAMED_TOPIC_MATCH, 354 LWS_MQTT_SHADOW_NAMED_TOPIC_MATCH }; 355 char *expbuf = NULL; 356 unsigned int i = 0; 357 lws_mqtt_match_topic_return_t ret = LMMTR_TOPIC_NOMATCH; 358 359 if (!topic) 360 return LMMTR_TOPIC_MATCH_ERROR; 361 362 expbuf = expand_metadata(h, topic, NULL, LWS_MQTT_MAX_AWSIOT_TOPICLEN); 363 if (!expbuf) { 364 lwsl_wsi_warn(wsi, "Failed to expand Shadow topic"); 365 366 return LMMTR_TOPIC_MATCH_ERROR; 367 } 368 for (i = 0; i < (sizeof(match) / sizeof(match[0])); i++) { 369 if (lws_mqtt_is_topic_matched( 370 match[i], expbuf) == LMMTR_TOPIC_MATCH) { 371 ret = LMMTR_TOPIC_MATCH; 372 break; 373 } 374 } 375 lws_free(expbuf); 376 377 return ret; 378} 379 380static void 381secstream_mqtt_shadow_cleanup(struct lws *wsi) 382{ 383 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 384 uint32_t i = 0; 385 386 for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) 387 lws_free((void *)h->u.mqtt.shadow_sub.topic[i].name); 388 389 h->u.mqtt.shadow_sub.num_topics = 0; 390 391 if (h->u.mqtt.shadow_sub.topic) { 392 lws_free(h->u.mqtt.shadow_sub.topic); 393 h->u.mqtt.shadow_sub.topic = NULL; 394 } 395} 396 397static lws_ss_state_return_t 398secstream_mqtt_shadow_unsubscribe(struct lws *wsi) 399{ 400 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 401 402 if (h->u.mqtt.shadow_sub.num_topics == 0) { 403 wsi->mqtt->send_shadow_unsubscribe = 0; 404 wsi->mqtt->inside_shadow = 0; 405 wsi->mqtt->done_shadow_subscribe = 0; 406 407 return LWSSSSRET_OK; 408 } 409 410 if (lws_mqtt_client_send_unsubcribe(wsi, &h->u.mqtt.shadow_sub)) { 411 lwsl_wsi_err(wsi, "Failed to send MQTT unsubsribe"); 412 413 return LWSSSSRET_DISCONNECT_ME; 414 } 415 /* Expect a UNSUBACK */ 416 if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { 417 lwsl_wsi_err(wsi, "Unable to set LWS_POLLIN"); 418 419 return LWSSSSRET_DISCONNECT_ME; 420 } 421 wsi->mqtt->send_shadow_unsubscribe = 0; 422 423 return LWSSSSRET_OK; 424} 425 426static int 427secstream_mqtt_shadow_subscribe(struct lws *wsi) 428{ 429 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 430 char* expbuf = NULL; 431 const char *suffixes[] = { LWS_MQTT_SHADOW_RESP_ACCEPTED_STR, 432 LWS_MQTT_SHADOW_RESP_REJECTED_STR }; 433 unsigned int i, suffixes_len = sizeof(suffixes) / sizeof(suffixes[0]); 434 435 if (!h->policy->u.mqtt.topic || wsi->mqtt->inside_shadow) 436 return 0; 437 438 if (h->u.mqtt.shadow_sub.num_topics > 0) 439 secstream_mqtt_shadow_cleanup(wsi); 440 441 memset(&h->u.mqtt.shadow_sub, 0, sizeof(lws_mqtt_subscribe_param_t)); 442 h->u.mqtt.shadow_sub.topic = lws_malloc( 443 sizeof(lws_mqtt_topic_elem_t) * suffixes_len, __func__); 444 if (!h->u.mqtt.shadow_sub.topic) { 445 lwsl_ss_err(h, "Failed to allocate Shadow topics"); 446 return -1; 447 } 448 h->u.mqtt.shadow_sub.num_topics = suffixes_len; 449 for (i = 0; i < suffixes_len; i++) { 450 expbuf = expand_metadata(h, h->policy->u.mqtt.topic, suffixes[i], 451 LWS_MQTT_MAX_AWSIOT_TOPICLEN); 452 if (!expbuf) { 453 lwsl_ss_err(h, "Failed to allocate Shadow topic"); 454 secstream_mqtt_shadow_cleanup(wsi); 455 456 return -1; 457 } 458 h->u.mqtt.shadow_sub.topic[i].name = expbuf; 459 h->u.mqtt.shadow_sub.topic[i].qos = h->policy->u.mqtt.qos; 460 } 461 h->u.mqtt.shadow_sub.packet_id = (uint16_t)(h->txord - 1); 462 463 if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.shadow_sub)) { 464 lwsl_wsi_notice(wsi, "Unable to subscribe Shadow topics"); 465 466 return 0; 467 } 468 469 /* Expect a SUBACK */ 470 if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { 471 lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); 472 return -1; 473 } 474 wsi->mqtt->inside_shadow = 1; 475 476 return 0; 477} 478 479static int 480secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, 481 void *in, size_t len) 482{ 483 lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 484 size_t used_in = 0, used_out = 0, topic_len = 0; 485 lws_mqtt_publish_param_t *pmqpp = NULL; 486 lws_ss_state_return_t r = LWSSSSRET_OK; 487 uint8_t buf[LWS_PRE + 1400]; 488 size_t buflen = sizeof(buf) - LWS_PRE; 489 lws_ss_metadata_t *omd = NULL; 490 char *sub_topic = NULL; 491 lws_strexp_t exp; 492 int f = 0; 493 494 memset(buf, 0, sizeof(buf)); 495 memset(&exp, 0, sizeof(exp)); 496 497 switch (reason) { 498 499 /* because we are protocols[0] ... */ 500 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: 501 lwsl_info("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__, 502 in ? (char *)in : "(null)"); 503 if (!h) 504 break; 505 506#if defined(LWS_WITH_CONMON) 507 lws_conmon_ss_json(h); 508#endif 509 510 r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE); 511 h->wsi = NULL; 512 513 secstream_mqtt_cleanup(h); 514 515 if (r == LWSSSSRET_DESTROY_ME) 516 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 517 518 r = lws_ss_backoff(h); 519 if (r != LWSSSSRET_OK) 520 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 521 522 break; 523 524 case LWS_CALLBACK_MQTT_CLIENT_CLOSED: 525 if (!h) 526 break; 527 lws_sul_cancel(&h->sul_timeout); 528#if defined(LWS_WITH_CONMON) 529 lws_conmon_ss_json(h); 530#endif 531 if (h->ss_dangling_connected) 532 r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED); 533 else 534 r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE); 535 if (h->wsi) 536 lws_set_opaque_user_data(h->wsi, NULL); 537 h->wsi = NULL; 538 539 secstream_mqtt_cleanup(h); 540 541 if (r) 542 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 543 544 if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) && 545 !h->txn_ok && !wsi->a.context->being_destroyed) { 546 r = lws_ss_backoff(h); 547 if (r != LWSSSSRET_OK) 548 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 549 } 550 break; 551 552 case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED: 553 /* 554 * Make sure the handle wsi points to the stream wsi not the 555 * original nwsi, in the case it was migrated 556 */ 557 h->wsi = wsi; 558 h->retry = 0; 559 h->seqstate = SSSEQ_CONNECTED; 560 561 if (h->policy->u.mqtt.birth_topic && 562 !wsi->mqtt->done_birth) { 563 struct lws *nwsi = lws_get_network_wsi(wsi); 564 lws_start_foreach_ll(struct lws *, w, nwsi->mux.child_list) { 565 if (w != wsi && 566 (w->mqtt->done_birth || w->mqtt->inside_birth)) { 567 /* 568 * If any Birth was sent out or 569 * is pending on other stream, 570 * skip sending Birth. 571 */ 572 wsi->mqtt->done_birth = 1; 573 break; 574 } 575 } lws_end_foreach_ll(w, mux.sibling_list); 576 } 577 578 if (!h->policy->u.mqtt.subscribe || 579 !h->policy->u.mqtt.subscribe[0]) { 580 /* 581 * If subscribe is empty in the policy, then, 582 * skip sending SUBSCRIBE and signal the user 583 * application. 584 */ 585 wsi->mqtt->done_subscribe = 1; 586 } else if (!h->policy->u.mqtt.clean_start && 587 wsi->mqtt->session_resumed) { 588 wsi->mqtt->inside_resume_session = 1; 589 /* 590 * If the previous session is resumed and Server has 591 * stored session, then, do not subscribe. 592 */ 593 if (!secstream_mqtt_subscribe(wsi)) 594 wsi->mqtt->done_subscribe = 1; 595 wsi->mqtt->inside_resume_session = 0; 596 } else if (h->policy->u.mqtt.subscribe && 597 !wsi->mqtt->done_subscribe) { 598 /* 599 * If a subscribe is pending on the stream, then make 600 * sure the SUBSCRIBE is done before signaling the 601 * user application. 602 */ 603 lws_callback_on_writable(wsi); 604 break; 605 } 606 607 if (h->policy->u.mqtt.birth_topic && 608 !wsi->mqtt->done_birth) { 609 /* 610 * If a Birth is pending on the stream, then make 611 * sure the Birth is done before signaling the 612 * user application. 613 */ 614 lws_callback_on_writable(wsi); 615 break; 616 } 617 lws_sul_cancel(&h->sul); 618#if defined(LWS_WITH_SYS_METRICS) 619 /* 620 * If any hanging caliper measurement, dump it, and free any tags 621 */ 622 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL); 623#endif 624 r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); 625 if (r != LWSSSSRET_OK) 626 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 627 if (h->policy->u.mqtt.topic) 628 lws_callback_on_writable(wsi); 629 break; 630 631 case LWS_CALLBACK_MQTT_CLIENT_RX: 632 // lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: read %d\n", (int)len); 633 if (!h || !h->info.rx) 634 return 0; 635 636 pmqpp = (lws_mqtt_publish_param_t *)in; 637 638 f = 0; 639 if (!pmqpp->payload_pos) 640 f |= LWSSS_FLAG_SOM; 641 if (pmqpp->payload_pos + len == pmqpp->payload_len) 642 f |= LWSSS_FLAG_EOM; 643 644 h->subseq = 1; 645 646 if (wsi->mqtt->inside_shadow) { 647 /* 648 * When Shadow is used, the stream receives multiple 649 * topics including Shadow response, set received 650 * topic on the metadata 651 */ 652 lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, 653 NULL, (size_t)-1); 654 655 if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, 656 strlen(h->policy->u.mqtt.subscribe), 657 &used_in, &used_out) != LSTRX_DONE) { 658 lwsl_err("%s, failed to expand subscribe topic", 659 __func__); 660 return -1; 661 } 662 omd = lws_ss_get_handle_metadata(h, exp.name); 663 664 if (!omd) { 665 lwsl_err("%s, failed to find metadata for subscribe", 666 __func__); 667 return -1; 668 } 669 sub_topic = omd->value__may_own_heap; 670 topic_len = omd->length; 671 672 _lws_ss_set_metadata(omd, exp.name, 673 (const void *)pmqpp->topic, 674 pmqpp->topic_len); 675 } 676 677 r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload, 678 len, f); 679 680 if (wsi->mqtt->inside_shadow) 681 _lws_ss_set_metadata(omd, exp.name, &sub_topic, 682 topic_len); 683 684 if (r != LWSSSSRET_OK) 685 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 686 687 if (wsi->mqtt->inside_shadow) { 688 size_t acc_n = strlen(LWS_MQTT_SHADOW_RESP_ACCEPTED_STR); 689 size_t rej_n = strlen(LWS_MQTT_SHADOW_RESP_REJECTED_STR); 690 uint32_t i; 691 692 for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) { 693 /* 694 * received response ('/accepted' or 'rejected') 695 * and clean up Shadow operation 696 */ 697 if (strncmp(h->u.mqtt.shadow_sub.topic[i].name, 698 pmqpp->topic, pmqpp->topic_len) || 699 (strlen(pmqpp->topic) < acc_n || 700 strlen(pmqpp->topic) < rej_n)) 701 continue; 702 703 if (!strcmp(pmqpp->topic + 704 (strlen(pmqpp->topic) - acc_n), 705 LWS_MQTT_SHADOW_RESP_ACCEPTED_STR) || 706 !strcmp(pmqpp->topic + 707 (strlen(pmqpp->topic) - rej_n), 708 LWS_MQTT_SHADOW_RESP_REJECTED_STR)) { 709 lws_sul_cancel(&wsi->mqtt->sul_shadow_wait); 710 wsi->mqtt->send_shadow_unsubscribe = 1; 711 lws_callback_on_writable(wsi); 712 713 return 0; 714 } 715 } 716 } 717 return 0; /* don't passthru */ 718 719 case LWS_CALLBACK_MQTT_SUBSCRIBED: 720 if (wsi->mqtt->inside_shadow) { 721 wsi->mqtt->done_shadow_subscribe = 1; 722 lws_callback_on_writable(wsi); 723 724 return 0; 725 } 726 /* 727 * Stream demanded a subscribe without a Birth while connecting, once 728 * done notify CONNECTED event to the application. 729 */ 730 if (!wsi->mqtt->done_subscribe && !h->policy->u.mqtt.birth_topic) { 731 lws_sul_cancel(&h->sul); 732 r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); 733 if (r != LWSSSSRET_OK) 734 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 735 } 736 wsi->mqtt->done_subscribe = 1; 737 lws_callback_on_writable(wsi); 738 break; 739 740 case LWS_CALLBACK_MQTT_ACK: 741 lws_sul_cancel(&h->sul_timeout); 742 if (h->u.mqtt.send_unacked) { 743 lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); 744 h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0; 745 } 746 747 if (wsi->mqtt->inside_birth) { 748 /* 749 * Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify 750 * CONNECTED event to the application. 751 */ 752 wsi->mqtt->inside_birth = 0; 753 wsi->mqtt->done_birth = 1; 754 r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); 755 if (r != LWSSSSRET_OK) 756 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 757 lws_callback_on_writable(wsi); 758 break; 759 } 760 r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE); 761 if (r != LWSSSSRET_OK) 762 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 763 break; 764 765 case LWS_CALLBACK_MQTT_RESEND: 766 lws_sul_cancel(&h->sul_timeout); 767 if (h->u.mqtt.retry_count++ < LWS_MQTT_MAX_PUBLISH_RETRY) { 768 h->u.mqtt.unacked_size = 769 (uint32_t)lws_buflist_total_len(&h->u.mqtt.buflist_unacked); 770 if (h->u.mqtt.unacked_size) { 771 lwsl_notice("%s: %s: resend unacked message (%d/%d) \n", 772 __func__, lws_ss_tag(h), 773 h->u.mqtt.retry_count, 774 LWS_MQTT_MAX_PUBLISH_RETRY); 775 h->u.mqtt.send_unacked = 1; 776 lws_callback_on_writable(wsi); 777 break; 778 } 779 } 780 781 lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); 782 h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0; 783 784 if (wsi->mqtt->inside_birth) { 785 lwsl_err("%s: %s: failed to send Birth\n", __func__, 786 lws_ss_tag(h)); 787 return -1; 788 } 789 790 r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); 791 if (r != LWSSSSRET_OK) 792 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 793 break; 794 795 case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: 796 { 797 if (!h || !h->info.tx) 798 return 0; 799 lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h)); 800 801 if (h->seqstate != SSSEQ_CONNECTED) { 802 lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate); 803 break; 804 } 805 806 if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe) 807 return secstream_mqtt_subscribe(wsi); 808 809 if (h->u.mqtt.send_unacked) 810 return secstream_mqtt_resend(wsi, buf + LWS_PRE); 811 812 if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) 813 return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen); 814 815 if (h->policy->u.mqtt.aws_iot) { 816 if (secstream_mqtt_is_shadow_matched(wsi, 817 h->policy->u.mqtt.topic) == LMMTR_TOPIC_MATCH) { 818 if (!wsi->mqtt->done_shadow_subscribe) 819 return secstream_mqtt_shadow_subscribe(wsi); 820 if (wsi->mqtt->send_shadow_unsubscribe) 821 return secstream_mqtt_shadow_unsubscribe(wsi); 822 } 823 } 824 825 r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, 826 &buflen, &f); 827 828 if (r == LWSSSSRET_TX_DONT_SEND) { 829 if (wsi->mqtt->done_shadow_subscribe) { 830 return secstream_mqtt_shadow_unsubscribe(wsi); 831 } 832 return 0; 833 } 834 835 if (r == LWSSSSRET_DISCONNECT_ME) { 836 lws_mqtt_subscribe_param_t lmsp; 837 if (h->u.mqtt.sub_info.num_topics) { 838 lmsp.num_topics = h->u.mqtt.sub_info.num_topics; 839 lmsp.topic = h->u.mqtt.sub_info.topic; 840 lmsp.packet_id = (uint16_t)(h->txord - 1); 841 if (lws_mqtt_client_send_unsubcribe(wsi, 842 &lmsp)) { 843 lwsl_err("%s, failed to send" 844 " MQTT unsubsribe", __func__); 845 return -1; 846 } 847 return 0; 848 } 849 } 850 851 if (r < 0) 852 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 853 854 if (secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen, 855 (uint32_t)h->writeable_len, 856 h->policy->u.mqtt.topic, 857 h->policy->u.mqtt.qos, 858 h->policy->u.mqtt.retain, 0, f) != 0) { 859 r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); 860 if (r != LWSSSSRET_OK) 861 return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 862 } 863 return 0; 864 } 865 866 case LWS_CALLBACK_MQTT_UNSUBSCRIBED: 867 { 868 struct lws *nwsi = lws_get_network_wsi(wsi); 869 870 if (wsi->mqtt->inside_shadow) { 871 secstream_mqtt_shadow_cleanup(wsi); 872 wsi->mqtt->inside_shadow = 0; 873 wsi->mqtt->done_shadow_subscribe = 0; 874 break; 875 } 876 if (nwsi && (nwsi->mux.child_count == 1)) 877 lws_mqtt_client_send_disconnect(nwsi); 878 return -1; 879 } 880 881 case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT: 882 if (!wsi->mqtt) 883 return -1; 884 885 if (wsi->mqtt->inside_shadow) { 886 secstream_mqtt_shadow_cleanup(wsi); 887 wsi->mqtt->inside_shadow = 0; 888 wsi->mqtt->done_shadow_subscribe = 0; 889 lwsl_warn("%s: %s: Unsubscribe (Shadow) timeout.\n", 890 __func__, lws_ss_tag(h)); 891 break; 892 } 893 894 if (wsi->mqtt->inside_unsubscribe) { 895 lwsl_warn("%s: %s: Unsubscribe timeout.\n", __func__, 896 lws_ss_tag(h)); 897 return -1; 898 } 899 break; 900 901 case LWS_CALLBACK_MQTT_SHADOW_TIMEOUT: 902 if (!wsi->mqtt) 903 return -1; 904 905 if (wsi->mqtt->inside_shadow) { 906 lwsl_warn("%s: %s: Shadow timeout.\n", __func__, 907 lws_ss_tag(h)); 908 wsi->mqtt->send_shadow_unsubscribe = 1; 909 lws_callback_on_writable(wsi); 910 } 911 break; 912 913 default: 914 break; 915 } 916 917 return lws_callback_http_dummy(wsi, reason, user, in, len); 918} 919 920const struct lws_protocols protocol_secstream_mqtt = { 921 "lws-secstream-mqtt", 922 secstream_mqtt, 923 0, 0, 0, NULL, 0 924}; 925/* 926 * Munge connect info according to protocol-specific considerations... this 927 * usually means interpreting aux in a protocol-specific way and using the 928 * pieces at connection setup time, eg, http url pieces. 929 * 930 * len bytes of buf can be used for things with scope until after the actual 931 * connect. 932 * 933 * For ws, protocol aux is <url path>;<ws subprotocol name> 934 */ 935 936enum { 937 SSCMM_STRSUB_WILL_TOPIC, 938 SSCMM_STRSUB_WILL_MESSAGE, 939 SSCMM_STRSUB_SUBSCRIBE, 940 SSCMM_STRSUB_TOPIC, 941 SSCMM_STRSUB_BIRTH_TOPIC, 942 SSCMM_STRSUB_BIRTH_MESSAGE 943}; 944 945static int 946secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, 947 struct lws_client_connect_info *i, 948 union lws_ss_contemp *ct) 949{ 950 const char *sources[6] = { 951 /* we're going to string-substitute these before use */ 952 h->policy->u.mqtt.will_topic, 953 h->policy->u.mqtt.will_message, 954 h->policy->u.mqtt.subscribe, 955 h->policy->u.mqtt.topic, 956 h->policy->u.mqtt.birth_topic, 957 h->policy->u.mqtt.birth_message 958 }; 959 size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0; 960 lws_strexp_t exp; 961 char *ps[6]; 962 uint8_t *p = NULL; 963 int n = -1; 964 size_t blen; 965 lws_system_blob_t *b = NULL; 966 967 memset(&ct->ccp, 0, sizeof(ct->ccp)); 968 b = lws_system_get_blob(i->context, 969 LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID, 0); 970 971 /* If LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID is set */ 972 if (b && (blen = lws_system_blob_get_size(b))) { 973 if (blen > LWS_MQTT_MAX_CIDLEN) { 974 lwsl_err("%s - Client ID too long.\n", 975 __func__); 976 return -1; 977 } 978 p = (uint8_t *)lws_zalloc(blen+1, __func__); 979 if (!p) 980 return -1; 981 n = lws_system_blob_get(b, p, &blen, 0); 982 if (n) { 983 ct->ccp.client_id = NULL; 984 } else { 985 ct->ccp.client_id = (const char *)p; 986 lwsl_notice("%s - Client ID = %s\n", 987 __func__, ct->ccp.client_id); 988 } 989 } else { 990 /* Default (Random) client ID */ 991 ct->ccp.client_id = NULL; 992 } 993 994 b = lws_system_get_blob(i->context, 995 LWS_SYSBLOB_TYPE_MQTT_USERNAME, 0); 996 997 /* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */ 998 if (b && (blen = lws_system_blob_get_size(b))) { 999 p = (uint8_t *)lws_zalloc(blen+1, __func__); 1000 if (!p) 1001 return -1; 1002 n = lws_system_blob_get(b, p, &blen, 0); 1003 if (n) { 1004 ct->ccp.username = NULL; 1005 } else { 1006 ct->ccp.username = (const char *)p; 1007 lwsl_notice("%s - Username ID = %s\n", 1008 __func__, ct->ccp.username); 1009 } 1010 } 1011 1012 b = lws_system_get_blob(i->context, 1013 LWS_SYSBLOB_TYPE_MQTT_PASSWORD, 0); 1014 1015 /* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */ 1016 if (b && (blen = lws_system_blob_get_size(b))) { 1017 p = (uint8_t *)lws_zalloc(blen+1, __func__); 1018 if (!p) 1019 return -1; 1020 n = lws_system_blob_get(b, p, &blen, 0); 1021 if (n) { 1022 ct->ccp.password = NULL; 1023 } else { 1024 ct->ccp.password = (const char *)p; 1025 lwsl_notice("%s - Password ID = %s\n", 1026 __func__, ct->ccp.password); 1027 } 1028 } 1029 1030 ct->ccp.keep_alive = h->policy->u.mqtt.keep_alive; 1031 ct->ccp.clean_start = (h->policy->u.mqtt.clean_start & 1u); 1032 ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos; 1033 ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain; 1034 ct->ccp.birth_param.qos = h->policy->u.mqtt.birth_qos; 1035 ct->ccp.birth_param.retain = h->policy->u.mqtt.birth_retain; 1036 ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot; 1037 h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos; 1038 1039 /* 1040 * We're going to string-substitute several of these parameters, which 1041 * have unknown, possibly large size. And, as their usage is deferred 1042 * inside the asynchronous lifetime of the MQTT connection, they need 1043 * to live on the heap. 1044 * 1045 * Notice these allocations at h->u.mqtt.heap_baggage belong to the 1046 * underlying MQTT stream lifetime, not the logical SS lifetime, and 1047 * are destroyed if present at connection error or close of the 1048 * underlying connection. 1049 * 1050 * 1051 * First, compute the length of each without producing strsubst output, 1052 * and keep a running total. 1053 */ 1054 1055 for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) { 1056 if (!sources[n]) 1057 continue; 1058 1059 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, 1060 NULL, (size_t)-1); 1061 if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]), 1062 &used_in, &olen[n]) != LSTRX_DONE) { 1063 lwsl_err("%s: failed to subsitute %s\n", __func__, 1064 sources[n]); 1065 return 1; 1066 } 1067 tot += olen[n] + 1; 1068 } 1069 1070 /* 1071 * Then, allocate enough space on the heap for the total of the 1072 * substituted results 1073 */ 1074 1075 h->u.mqtt.heap_baggage = lws_malloc(tot, __func__); 1076 if (!h->u.mqtt.heap_baggage) 1077 return 1; 1078 1079 /* 1080 * Finally, issue the subsitutions one after the other into the single 1081 * allocated result buffer and prepare pointers into them 1082 */ 1083 1084 p = h->u.mqtt.heap_baggage; 1085 for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) { 1086 lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, 1087 (char *)p, (size_t)-1); 1088 if (!sources[n]) { 1089 ps[n] = NULL; 1090 continue; 1091 } 1092 ps[n] = (char *)p; 1093 if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]), 1094 &used_in, &olen[n]) != LSTRX_DONE) 1095 return 1; 1096 1097 p += olen[n] + 1; 1098 } 1099 1100 /* 1101 * Point the guys who want the substituted content at the substituted 1102 * strings 1103 */ 1104 1105 ct->ccp.will_param.topic = ps[SSCMM_STRSUB_WILL_TOPIC]; 1106 ct->ccp.will_param.message = ps[SSCMM_STRSUB_WILL_MESSAGE]; 1107 h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE]; 1108 h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE]; 1109 h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC]; 1110 ct->ccp.birth_param.topic = ps[SSCMM_STRSUB_BIRTH_TOPIC]; 1111 ct->ccp.birth_param.message = ps[SSCMM_STRSUB_BIRTH_MESSAGE]; 1112 1113 i->method = "MQTT"; 1114 i->mqtt_cp = &ct->ccp; 1115 1116 i->alpn = "x-amzn-mqtt-ca"; 1117 1118 /* share connections where possible */ 1119 i->ssl_connection |= LCCSCF_PIPELINE; 1120 1121 return 0; 1122} 1123 1124const struct ss_pcols ss_pcol_mqtt = { 1125 "MQTT", 1126 "x-amzn-mqtt-ca", //"mqtt/3.1.1", 1127 &protocol_secstream_mqtt, 1128 secstream_connect_munge_mqtt, 1129 NULL, NULL 1130}; 1131