1/* 2 * libwebsockets - small server side websockets and web server implementation 3 * 4 * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com> 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining a copy 7 * of this software and associated documentation files (the "Software"), to 8 * deal in the Software without restriction, including without limitation the 9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10 * sell copies of the Software, and to permit persons to whom the Software is 11 * furnished to do so, subject to the following conditions: 12 * 13 * The above copyright notice and this permission notice shall be included in 14 * all copies or substantial portions of the Software. 15 * 16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22 * IN THE SOFTWARE. 23 * 24 * MQTT v5 25 * 26 * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html 27 * 28 * Control Packet structure 29 * 30 * - Always: 2+ byte: Fixed Hdr 31 * - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props 32 * - Required in some: variable: Payload 33 * 34 * For CONNECT, the props if present MUST be in the order [MQTT-3.1.3-1] 35 * 36 * - Client Identifier 37 * - Will Properties 38 * - Will Topic 39 * - Will Payload 40 * - User Name 41 * - Password 42 */ 43 44#include "private-lib-core.h" 45#include <string.h> 46#include <sys/types.h> 47#include <assert.h> 48 49typedef enum { 50 LMQPRS_AWAITING_CONNECT, 51 52} lws_mqtt_protocol_server_connstate_t; 53 54const char * const reason_names_g1[] = { 55 "Success / Normal disconnection / QoS0", 56 "QoS1", 57 "QoS2", 58 "Disconnect Will", 59 "No matching subscriber", 60 "No subscription existed", 61 "Continue authentication", 62 "Re-authenticate" 63}; 64 65const char * const reason_names_g2[] = { 66 "Unspecified error", 67 "Malformed packet", 68 "Protocol error", 69 "Implementation specific error", 70 "Unsupported protocol", 71 "Client ID invalid", 72 "Bad credentials", 73 "Not Authorized", 74 "Server Unavailable", 75 "Server Busy", 76 "Banned", 77 "Server Shutting Down", 78 "Bad Authentication Method", 79 "Keepalive Timeout", 80 "Session taken over", 81 "Topic Filter Invalid", 82 "Packet ID in use", 83 "Packet ID not found", 84 "Max RX Exceeded", 85 "Topic Alias Invalid", 86 "Packet too large", 87 "Ratelimit", 88 "Quota Exceeded", 89 "Administrative Action", 90 "Payload format invalid", 91 "Retain not supported", 92 "QoS not supported", 93 "Use another server", 94 "Server Moved", 95 "Shared subscriptions not supported", 96 "Connection rate exceeded", 97 "Maximum Connect Time", 98 "Subscription IDs not supported", 99 "Wildcard subscriptions not supported" 100}; 101 102#define LMQCP_WILL_PROPERTIES 0 103 104/* For each property, a bitmap describing which commands it is valid for */ 105 106static const uint16_t property_valid[] = { 107 [LMQPROP_PAYLOAD_FORMAT_INDICATOR] = (1 << LMQCP_PUBLISH) | 108 (1 << LMQCP_WILL_PROPERTIES), 109 [LMQPROP_MESSAGE_EXPIRY_INTERVAL] = (1 << LMQCP_PUBLISH) | 110 (1 << LMQCP_WILL_PROPERTIES), 111 [LMQPROP_CONTENT_TYPE] = (1 << LMQCP_PUBLISH) | 112 (1 << LMQCP_WILL_PROPERTIES), 113 [LMQPROP_RESPONSE_TOPIC] = (1 << LMQCP_PUBLISH) | 114 (1 << LMQCP_WILL_PROPERTIES), 115 [LMQPROP_CORRELATION_DATA] = (1 << LMQCP_PUBLISH) | 116 (1 << LMQCP_WILL_PROPERTIES), 117 [LMQPROP_SUBSCRIPTION_IDENTIFIER] = (1 << LMQCP_PUBLISH) | 118 (1 << LMQCP_CTOS_SUBSCRIBE), 119 [LMQPROP_SESSION_EXPIRY_INTERVAL] = (1 << LMQCP_CTOS_CONNECT) | 120 (1 << LMQCP_STOC_CONNACK) | 121 (1 << LMQCP_DISCONNECT), 122 [LMQPROP_ASSIGNED_CLIENT_IDENTIFIER] = (1 << LMQCP_STOC_CONNACK), 123 [LMQPROP_SERVER_KEEP_ALIVE] = (1 << LMQCP_STOC_CONNACK), 124 [LMQPROP_AUTHENTICATION_METHOD] = (1 << LMQCP_CTOS_CONNECT) | 125 (1 << LMQCP_STOC_CONNACK) | 126 (1 << LMQCP_AUTH), 127 [LMQPROP_AUTHENTICATION_DATA] = (1 << LMQCP_CTOS_CONNECT) | 128 (1 << LMQCP_STOC_CONNACK) | 129 (1 << LMQCP_AUTH), 130 [LMQPROP_REQUEST_PROBLEM_INFORMATION] = (1 << LMQCP_CTOS_CONNECT), 131 [LMQPROP_WILL_DELAY_INTERVAL] = (1 << LMQCP_WILL_PROPERTIES), 132 [LMQPROP_REQUEST_RESPONSE_INFORMATION] = (1 << LMQCP_CTOS_CONNECT), 133 [LMQPROP_RESPONSE_INFORMATION] = (1 << LMQCP_STOC_CONNACK), 134 [LMQPROP_SERVER_REFERENCE] = (1 << LMQCP_STOC_CONNACK) | 135 (1 << LMQCP_DISCONNECT), 136 [LMQPROP_REASON_STRING] = (1 << LMQCP_STOC_CONNACK) | 137 (1 << LMQCP_PUBACK) | 138 (1 << LMQCP_PUBREC) | 139 (1 << LMQCP_PUBREL) | 140 (1 << LMQCP_PUBCOMP) | 141 (1 << LMQCP_STOC_SUBACK) | 142 (1 << LMQCP_STOC_UNSUBACK) | 143 (1 << LMQCP_DISCONNECT) | 144 (1 << LMQCP_AUTH), 145 [LMQPROP_RECEIVE_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) | 146 (1 << LMQCP_STOC_CONNACK), 147 [LMQPROP_TOPIC_ALIAS_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) | 148 (1 << LMQCP_STOC_CONNACK), 149 [LMQPROP_TOPIC_ALIAS] = (1 << LMQCP_PUBLISH), 150 [LMQPROP_MAXIMUM_QOS] = (1 << LMQCP_STOC_CONNACK), 151 [LMQPROP_RETAIN_AVAILABLE] = (1 << LMQCP_STOC_CONNACK), 152 [LMQPROP_USER_PROPERTY] = (1 << LMQCP_CTOS_CONNECT) | 153 (1 << LMQCP_STOC_CONNACK) | 154 (1 << LMQCP_PUBLISH) | 155 (1 << LMQCP_WILL_PROPERTIES) | 156 (1 << LMQCP_PUBACK) | 157 (1 << LMQCP_PUBREC) | 158 (1 << LMQCP_PUBREL) | 159 (1 << LMQCP_PUBCOMP) | 160 (1 << LMQCP_CTOS_SUBSCRIBE) | 161 (1 << LMQCP_STOC_SUBACK) | 162 (1 << LMQCP_CTOS_UNSUBSCRIBE) | 163 (1 << LMQCP_STOC_UNSUBACK) | 164 (1 << LMQCP_DISCONNECT) | 165 (1 << LMQCP_AUTH), 166 [LMQPROP_MAXIMUM_PACKET_SIZE] = (1 << LMQCP_CTOS_CONNECT) | 167 (1 << LMQCP_STOC_CONNACK), 168 [LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK), 169 [LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL] = (1 << LMQCP_STOC_CONNACK), 170 [LMQPROP_SHARED_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK) 171}; 172 173 174/* 175 * For each command index, maps flags, id, qos and payload legality 176 * notice in most cases PUBLISH requires further processing 177 */ 178static const uint8_t map_flags[] = { 179 [LMQCP_RESERVED] = 0x00, 180 [LMQCP_CTOS_CONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 181 LMQCP_LUT_FLAG_PAYLOAD | 182 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 183 [LMQCP_STOC_CONNACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 184 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 185 [LMQCP_PUBLISH] = LMQCP_LUT_FLAG_PAYLOAD | /* option */ 186 LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00, 187 [LMQCP_PUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 188 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00, 189 [LMQCP_PUBREC] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 190 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00, 191 [LMQCP_PUBREL] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 192 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02, 193 [LMQCP_PUBCOMP] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 194 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00, 195 [LMQCP_CTOS_SUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 196 LMQCP_LUT_FLAG_PAYLOAD | 197 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02, 198 [LMQCP_STOC_SUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 199 LMQCP_LUT_FLAG_PAYLOAD | 200 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00, 201 [LMQCP_CTOS_UNSUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 202 LMQCP_LUT_FLAG_PAYLOAD | 203 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02, 204 [LMQCP_STOC_UNSUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 205 LMQCP_LUT_FLAG_PAYLOAD | 206 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 207 [LMQCP_CTOS_PINGREQ] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 208 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 209 [LMQCP_STOC_PINGRESP] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 210 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 211 [LMQCP_DISCONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 212 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 213 [LMQCP_AUTH] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 214 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 215}; 216 217static int 218lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed) 219{ 220 par->consumed += (unsigned int)consumed; 221 222 if (par->consumed > par->props_len) 223 return -1; 224 225 /* more properties coming */ 226 227 if (par->consumed < par->props_len) { 228 par->state = LMQCPP_PROP_ID_VBI; 229 return 0; 230 } 231 232 /* properties finished: are we headed for payload or idle? */ 233 234 if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) && 235 /* A PUBLISH packet MUST NOT contain a Packet Identifier if 236 * its QoS value is set to 0 [MQTT-2.2.1-2]. */ 237 (ctl_pkt_type(par) != LMQCP_PUBLISH || 238 (par->packet_type_flags & 6))) { 239 par->state = LMQCPP_PAYLOAD; 240 return 0; 241 } 242 243 par->state = LMQCPP_IDLE; 244 245 return 0; 246} 247 248static int 249lws_mqtt_set_client_established(struct lws *wsi) 250{ 251 lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED, 252 &role_ops_mqtt); 253 254 if (user_callback_handle_rxflow(wsi->a.protocol->callback, 255 wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED, 256 wsi->user_space, NULL, 0) < 0) { 257 lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__); 258 259 return -1; 260 } 261 /* 262 * If we made a new connection and got the ACK, our connection is 263 * definitely working in both directions at the moment 264 */ 265 lws_validity_confirmed(wsi); 266 267 /* clear connection timeout */ 268 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0); 269 270 return 0; 271} 272 273static lws_mqtt_validate_topic_return_t 274lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot) 275{ 276 size_t spos = 0; 277 const char *sub = topic; 278 int8_t slashes = 0; 279 lws_mqtt_validate_topic_return_t ret = LMVTR_VALID; 280 281 if (awsiot) { 282 if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN) 283 return LMVTR_FAILED_OVERSIZE; 284 if (topic[0] == '$') { 285 ret = LMVTR_VALID_SHADOW; 286 slashes = -3; 287 } 288 } else { 289 if (topiclen > LWS_MQTT_MAX_TOPICLEN) 290 return LMVTR_FAILED_OVERSIZE; 291 if (topic[0] == '$') 292 return LMVTR_FAILED_WILDCARD_FORMAT; 293 } 294 295 while (*sub != 0) { 296 if (sub[0] == '+') { 297 /* topic == "+foo" || "a/+foo" ? */ 298 if (spos > 0 && sub[-1] != '/') 299 return LMVTR_FAILED_WILDCARD_FORMAT; 300 301 /* topic == "foo+" or "foo+/a" ? */ 302 if (sub[1] != 0 && sub[1] != '/') 303 return LMVTR_FAILED_WILDCARD_FORMAT; 304 305 ret = LMVTR_VALID_WILDCARD; 306 } else if (sub[0] == '#') { 307 /* topic == "foo#" ? */ 308 if (spos > 0 && sub[-1] != '/') 309 return LMVTR_FAILED_WILDCARD_FORMAT; 310 311 /* topic == "#foo" ? */ 312 if (sub[1] != 0) 313 return LMVTR_FAILED_WILDCARD_FORMAT; 314 315 ret = LMVTR_VALID_WILDCARD; 316 } else if (sub[0] == '/') { 317 slashes++; 318 } 319 spos++; 320 sub++; 321 } 322 323 if (awsiot && (slashes < 0 || slashes > 7)) 324 return LMVTR_FAILED_SHADOW_FORMAT; 325 326 return ret; 327} 328 329static lws_mqtt_subs_t * 330lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic) 331{ 332 lws_mqtt_subs_t *mysub; 333 size_t topiclen = strlen(topic); 334 lws_mqtt_validate_topic_return_t flag; 335 336 flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot); 337 switch (flag) { 338 case LMVTR_FAILED_OVERSIZE: 339 lwsl_err("%s: Topic is too long\n", 340 __func__); 341 return NULL; 342 case LMVTR_FAILED_SHADOW_FORMAT: 343 case LMVTR_FAILED_WILDCARD_FORMAT: 344 lwsl_err("%s: Invalid topic format \"%s\"\n", 345 __func__, topic); 346 return NULL; 347 348 case LMVTR_VALID: 349 case LMVTR_VALID_WILDCARD: 350 case LMVTR_VALID_SHADOW: 351 mysub = lws_malloc(sizeof(*mysub) + topiclen + 1, "sub"); 352 if (!mysub) { 353 lwsl_err("%s: Error allocating mysub\n", 354 __func__); 355 return NULL; 356 } 357 mysub->wildcard = (flag == LMVTR_VALID_WILDCARD); 358 mysub->shadow = (flag == LMVTR_VALID_SHADOW); 359 break; 360 361 default: 362 lwsl_err("%s: Unknown flag - %d\n", 363 __func__, flag); 364 return NULL; 365 } 366 367 mysub->next = mqtt->subs_head; 368 mqtt->subs_head = mysub; 369 memcpy(mysub->topic, topic, strlen(topic) + 1); 370 mysub->ref_count = 1; 371 372 lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n", 373 __func__, mysub, mqtt); 374 375 return mysub; 376} 377 378static int 379lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt) 380{ 381 lws_mqtt_subs_t *s = mqtt->subs_head; 382 lws_mqtt_subs_t *temp = NULL; 383 384 385 lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n", 386 __func__, mqtt); 387 388 while (s && s->next) { 389 if (s->next->ref_count == 0) 390 break; 391 s = s->next; 392 } 393 394 if (s && s->next) { 395 temp = s->next; 396 lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n", 397 __func__, temp, mqtt); 398 s->next = temp->next; 399 lws_free(temp); 400 return 0; 401 } 402 return 1; 403} 404 405/* 406 * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or 407 * PUBREC came before the timeout period 408 */ 409 410static void 411lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul) 412{ 413 struct _lws_mqtt_related *mqtt = lws_container_of(sul, 414 struct _lws_mqtt_related, sul_qos_puback_pubrec_wait); 415 416 lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); 417 418 if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND, 419 mqtt->wsi->user_space, NULL, 0)) 420 lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); 421} 422 423static void 424lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul) 425{ 426 struct _lws_mqtt_related *mqtt = lws_container_of(sul, 427 struct _lws_mqtt_related, sul_unsuback_wait); 428 429 lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); 430 431 if (mqtt->wsi->a.protocol->callback(mqtt->wsi, 432 LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT, 433 mqtt->wsi->user_space, NULL, 0)) 434 lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); 435} 436 437static void 438lws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul) 439{ 440 struct _lws_mqtt_related *mqtt = lws_container_of(sul, 441 struct _lws_mqtt_related, sul_shadow_wait); 442 443 lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); 444 445 if (mqtt->wsi->a.protocol->callback(mqtt->wsi, 446 LWS_CALLBACK_MQTT_SHADOW_TIMEOUT, 447 mqtt->wsi->user_space, NULL, 0)) 448 lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); 449} 450 451void 452lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s) 453{ 454 lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s); 455 c->estate = s; 456} 457 458lws_mqtt_match_topic_return_t 459lws_mqtt_is_topic_matched(const char* sub, const char* pub) 460{ 461 const char *ppos = pub, *spos = sub; 462 463 if (!ppos || !spos) { 464 return LMMTR_TOPIC_MATCH_ERROR; 465 } 466 467 while (*spos) { 468 if (*ppos == '#' || *ppos == '+') { 469 lwsl_err("%s: PUBLISH to wildcard " 470 "topic \"%s\" not supported\n", 471 __func__, pub); 472 return LMMTR_TOPIC_MATCH_ERROR; 473 } 474 /* foo/+/bar == foo/xyz/bar ? */ 475 if (*spos == '+') { 476 /* Skip ahead */ 477 while (*ppos != '\0' && *ppos != '/') { 478 ppos++; 479 } 480 } else if (*spos == '#') { 481 return LMMTR_TOPIC_MATCH; 482 } else { 483 if (*ppos == '\0') { 484 /* foo/bar == foo/bar/# ? */ 485 if (!strncmp(spos, "/#", 2)) 486 return LMMTR_TOPIC_MATCH; 487 return LMMTR_TOPIC_NOMATCH; 488 /* Non-matching character */ 489 } else if (*ppos != *spos) { 490 return LMMTR_TOPIC_NOMATCH; 491 } 492 ppos++; 493 } 494 spos++; 495 } 496 497 if (*spos == '\0' && *ppos == '\0') 498 return LMMTR_TOPIC_MATCH; 499 500 return LMMTR_TOPIC_NOMATCH; 501} 502 503lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt, 504 const char* ptopic) { 505 lws_mqtt_subs_t *s = mqtt->subs_head; 506 507 while (s) { 508 /* SUB topic == PUB topic ? */ 509 /* foo/bar/xyz == foo/bar/xyz ? */ 510 if (!s->wildcard) { 511 if (!strcmp((const char*)s->topic, ptopic)) 512 return s; 513 } else { 514 if (lws_mqtt_is_topic_matched( 515 s->topic, ptopic) == LMMTR_TOPIC_MATCH) 516 return s; 517 } 518 519 s = s->next; 520 } 521 522 return NULL; 523} 524 525int 526_lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par, 527 const uint8_t *buf, size_t len) 528{ 529 struct lws *w; 530 int n; 531 532 if (par->flag_pending_send_reason_close) 533 return 0; 534 535 /* 536 * Stateful, fragmentation-immune parser 537 * 538 * Notice that len can always be 1 if under attack, even over tls if 539 * the server is compromised or malicious. 540 */ 541 542 while (len) { 543 lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len); 544 switch (par->state) { 545 case LMQCPP_IDLE: 546 par->packet_type_flags = *buf++; 547 len--; 548 549#if defined(LWS_WITH_CLIENT) 550 /* 551 * The case where we sent the connect, but we received 552 * something else before any CONNACK 553 */ 554 if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK && 555 par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) { 556 lwsl_notice("%s: server sent non-CONNACK\n", 557 __func__); 558 goto send_protocol_error_and_close; 559 } 560#endif /* LWS_WITH_CLIENT */ 561 562 n = map_flags[par->packet_type_flags >> 4]; 563 /* 564 * Where a flag bit is marked as “Reserved”, it is 565 * reserved for future use and MUST be set to the value 566 * listed [MQTT-2.1.3-1]. 567 */ 568 if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) && 569 ((par->packet_type_flags & 0x0f) != (n & 0x0f))) { 570 lwsl_notice("%s: %s: bad flags, 0x%02x mask 0x%02x (len %d)\n", 571 __func__, lws_wsi_tag(wsi), 572 par->packet_type_flags, n, (int)len + 1); 573 lwsl_hexdump_err(buf - 1, len + 1); 574 goto send_protocol_error_and_close; 575 } 576 577 lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n", 578 __func__, par->packet_type_flags >> 4, 579 par->packet_type_flags & 0xf); 580 581 /* allows us to know if a property that can only be 582 * given once, appears twice */ 583 memset(par->props_seen, 0, sizeof(par->props_seen)); 584 par->state = par->packet_type_flags & 0xf0; 585 break; 586 587 case LMQCPP_CONNECT_PACKET: 588 lwsl_debug("%s: received CONNECT pkt\n", __func__); 589 par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI; 590 lws_mqtt_vbi_init(&par->vbit); 591 break; 592 593 case LMQCPP_CONNECT_REMAINING_LEN_VBI: 594 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 595 case LMSPR_NEED_MORE: 596 break; 597 case LMSPR_COMPLETED: 598 par->cpkt_remlen = par->vbit.value; 599 n = map_flags[ctl_pkt_type(par)]; 600 lws_mqtt_str_init(&par->s_temp, par->temp, 601 sizeof(par->temp), 0); 602 par->state = LMQCPP_CONNECT_VH_PNAME; 603 break; 604 default: 605 lwsl_notice("%s: bad vbi\n", __func__); 606 goto send_protocol_error_and_close; 607 } 608 break; 609 610 case LMQCPP_CONNECT_VH_PNAME: 611 switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) { 612 case LMSPR_NEED_MORE: 613 break; 614 case LMSPR_COMPLETED: 615 if (par->s_temp.len != 4 || 616 memcmp(par->s_temp.buf, "MQTT", 617 par->s_temp.len)) { 618 lwsl_notice("%s: protocol name: %.*s\n", 619 __func__, par->s_temp.len, 620 par->s_temp.buf); 621 goto send_unsupp_connack_and_close; 622 } 623 par->state = LMQCPP_CONNECT_VH_PVERSION; 624 break; 625 default: 626 lwsl_notice("%s: bad protocol name\n", __func__); 627 goto send_protocol_error_and_close; 628 } 629 break; 630 631 case LMQCPP_CONNECT_VH_PVERSION: 632 par->conn_protocol_version = *buf++; 633 len--; 634 if (par->conn_protocol_version != 5) { 635 lwsl_info("%s: unsupported MQTT version %d\n", 636 __func__, par->conn_protocol_version); 637 goto send_unsupp_connack_and_close; 638 } 639 par->state = LMQCPP_CONNECT_VH_FLAGS; 640 break; 641 642 case LMQCPP_CONNECT_VH_FLAGS: 643 par->cpkt_flags = *buf++; 644 len--; 645 if (par->cpkt_flags & 1) { 646 /* 647 * The Server MUST validate that the reserved 648 * flag in the CONNECT packet is set to 0 649 * [MQTT-3.1.2-3]. 650 */ 651 par->reason = LMQCP_REASON_MALFORMED_PACKET; 652 goto send_reason_and_close; 653 } 654 /* 655 * conn_flags specifies the Will Properties that should 656 * appear in the payload section 657 */ 658 lws_mqtt_2byte_init(&par->vbit); 659 par->state = LMQCPP_CONNECT_VH_KEEPALIVE; 660 break; 661 662 case LMQCPP_CONNECT_VH_KEEPALIVE: 663 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 664 case LMSPR_NEED_MORE: 665 break; 666 case LMSPR_COMPLETED: 667 par->keepalive = (uint16_t)par->vbit.value; 668 lws_mqtt_vbi_init(&par->vbit); 669 par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN; 670 break; 671 default: 672 lwsl_notice("%s: ka bad vbi\n", __func__); 673 goto send_protocol_error_and_close; 674 } 675 break; 676 677 case LMQCPP_PINGRESP_ZERO: 678 len--; 679 /* second byte of PINGRESP must be zero */ 680 if (*buf++) 681 goto send_protocol_error_and_close; 682 goto cmd_completion; 683 684 case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN: 685 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 686 case LMSPR_NEED_MORE: 687 break; 688 case LMSPR_COMPLETED: 689 /* reset consumption counter */ 690 par->consumed = 0; 691 par->props_len = par->vbit.value; 692 lws_mqtt_vbi_init(&par->vbit); 693 par->state = LMQCPP_PROP_ID_VBI; 694 break; 695 default: 696 lwsl_notice("%s: connpr bad vbi\n", __func__); 697 goto send_protocol_error_and_close; 698 } 699 break; 700 701 /* PUBREC */ 702 case LMQCPP_PUBREC_PACKET: 703 lwsl_debug("%s: received PUBREC pkt\n", __func__); 704 lws_mqtt_vbi_init(&par->vbit); 705 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 706 case LMSPR_NEED_MORE: 707 break; 708 case LMSPR_COMPLETED: 709 par->cpkt_remlen = par->vbit.value; 710 lwsl_debug("%s: PUBREC pkt len = %d\n", 711 __func__, (int)par->cpkt_remlen); 712 if (par->cpkt_remlen < 2) 713 goto send_protocol_error_and_close; 714 par->state = LMQCPP_PUBREC_VH_PKT_ID; 715 break; 716 default: 717 lwsl_notice("%s: pubrec bad vbi\n", __func__); 718 goto send_protocol_error_and_close; 719 } 720 break; 721 722 case LMQCPP_PUBREC_VH_PKT_ID: 723 if (len < 2) { 724 lwsl_notice("%s: len breakage 3\n", __func__); 725 return -1; 726 } 727 728 par->cpkt_id = lws_ser_ru16be(buf); 729 wsi->mqtt->ack_pkt_id = par->cpkt_id; 730 buf += 2; 731 len -= 2; 732 par->cpkt_remlen -= 2; 733 par->n = 0; 734 735 goto cmd_completion; 736 737 /* PUBREL */ 738 case LMQCPP_PUBREL_PACKET: 739 lwsl_debug("%s: received PUBREL pkt\n", __func__); 740 lws_mqtt_vbi_init(&par->vbit); 741 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 742 case LMSPR_NEED_MORE: 743 break; 744 case LMSPR_COMPLETED: 745 par->cpkt_remlen = par->vbit.value; 746 lwsl_debug("%s: PUBREL pkt len = %d\n", 747 __func__, (int)par->cpkt_remlen); 748 if (par->cpkt_remlen < 2) 749 goto send_protocol_error_and_close; 750 par->state = LMQCPP_PUBREL_VH_PKT_ID; 751 break; 752 default: 753 lwsl_err("%s: pubrel bad vbi\n", __func__); 754 goto send_protocol_error_and_close; 755 } 756 break; 757 758 case LMQCPP_PUBREL_VH_PKT_ID: 759 if (len < 2) { 760 lwsl_notice("%s: len breakage 3\n", __func__); 761 return -1; 762 } 763 764 par->cpkt_id = lws_ser_ru16be(buf); 765 wsi->mqtt->ack_pkt_id = par->cpkt_id; 766 buf += 2; 767 len -= 2; 768 par->cpkt_remlen -= 2; 769 par->n = 0; 770 771 goto cmd_completion; 772 773 /* PUBCOMP */ 774 case LMQCPP_PUBCOMP_PACKET: 775 lwsl_debug("%s: received PUBCOMP pkt\n", __func__); 776 lws_mqtt_vbi_init(&par->vbit); 777 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 778 case LMSPR_NEED_MORE: 779 break; 780 case LMSPR_COMPLETED: 781 par->cpkt_remlen = par->vbit.value; 782 lwsl_debug("%s: PUBCOMP pkt len = %d\n", 783 __func__, (int)par->cpkt_remlen); 784 if (par->cpkt_remlen < 2) 785 goto send_protocol_error_and_close; 786 par->state = LMQCPP_PUBCOMP_VH_PKT_ID; 787 break; 788 default: 789 lwsl_err("%s: pubcmp bad vbi\n", __func__); 790 goto send_protocol_error_and_close; 791 } 792 break; 793 794 case LMQCPP_PUBCOMP_VH_PKT_ID: 795 if (len < 2) { 796 lwsl_notice("%s: len breakage 3\n", __func__); 797 return -1; 798 } 799 800 par->cpkt_id = lws_ser_ru16be(buf); 801 wsi->mqtt->ack_pkt_id = par->cpkt_id; 802 buf += 2; 803 len -= 2; 804 par->cpkt_remlen -= 2; 805 par->n = 0; 806 807 goto cmd_completion; 808 809 case LMQCPP_PUBLISH_PACKET: 810 if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) { 811 lwsl_notice("%s: Topic rx before subscribing\n", 812 __func__); 813 goto send_protocol_error_and_close; 814 } 815 lwsl_info("%s: received PUBLISH pkt\n", __func__); 816 par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI; 817 lws_mqtt_vbi_init(&par->vbit); 818 break; 819 case LMQCPP_PUBLISH_REMAINING_LEN_VBI: 820 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 821 case LMSPR_NEED_MORE: 822 break; 823 case LMSPR_COMPLETED: 824 par->cpkt_remlen = par->vbit.value; 825 lwsl_debug("%s: PUBLISH pkt len = %d\n", 826 __func__, (int)par->cpkt_remlen); 827 /* Move on to PUBLISH's variable header */ 828 par->state = LMQCPP_PUBLISH_VH_TOPIC; 829 break; 830 default: 831 lwsl_notice("%s: pubrem bad vbi\n", __func__); 832 goto send_protocol_error_and_close; 833 } 834 break; 835 836 case LMQCPP_PUBLISH_VH_TOPIC: 837 { 838 lws_mqtt_publish_param_t *pub = NULL; 839 840 if (len < 2) { 841 lwsl_notice("%s: topic too short\n", __func__); 842 return -1; 843 } 844 845 /* Topic len */ 846 par->n = lws_ser_ru16be(buf); 847 buf += 2; 848 len -= 2; 849 850 if (len < par->n) {/* the way this is written... */ 851 lwsl_notice("%s: len breakage\n", __func__); 852 return -1; 853 } 854 855 /* Invalid topic len */ 856 if (par->n == 0) { 857 lwsl_notice("%s: zero topic len\n", __func__); 858 par->reason = LMQCP_REASON_MALFORMED_PACKET; 859 goto send_reason_and_close; 860 } 861 lwsl_debug("%s: PUBLISH topic len %d\n", 862 __func__, (int)par->n); 863 assert(!wsi->mqtt->rx_cpkt_param); 864 wsi->mqtt->rx_cpkt_param = lws_zalloc( 865 sizeof(lws_mqtt_publish_param_t), "rx pub param"); 866 if (!wsi->mqtt->rx_cpkt_param) 867 goto oom; 868 pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param; 869 870 pub->topic_len = (uint16_t)par->n; 871 872 /* Topic Name */ 873 pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1, 874 "rx publish topic"); 875 if (!pub->topic) 876 goto oom; 877 lws_strncpy(pub->topic, (const char *)buf, 878 (size_t)pub->topic_len + 1); 879 buf += pub->topic_len; 880 len -= pub->topic_len; 881 882 /* Extract QoS Level from Fixed Header Flags */ 883 pub->qos = (lws_mqtt_qos_levels_t) 884 ((par->packet_type_flags >> 1) & 0x3); 885 886 pub->payload_pos = 0; 887 888 pub->payload_len = par->cpkt_remlen - 889 (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0)); 890 891 switch (pub->qos) { 892 case QOS0: 893 par->state = LMQCPP_PAYLOAD; 894 if (pub->payload_len == 0) 895 goto cmd_completion; 896 897 break; 898 case QOS1: 899 case QOS2: 900 par->state = LMQCPP_PUBLISH_VH_PKT_ID; 901 break; 902 default: 903 par->reason = LMQCP_REASON_MALFORMED_PACKET; 904 lws_free_set_NULL(pub->topic); 905 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param); 906 goto send_reason_and_close; 907 } 908 break; 909 } 910 case LMQCPP_PUBLISH_VH_PKT_ID: 911 { 912 lws_mqtt_publish_param_t *pub = 913 (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param; 914 915 if (len < 2) { 916 lwsl_notice("%s: len breakage 2\n", __func__); 917 return -1; 918 } 919 920 par->cpkt_id = lws_ser_ru16be(buf); 921 buf += 2; 922 len -= 2; 923 wsi->mqtt->peer_ack_pkt_id = par->cpkt_id; 924 lwsl_debug("%s: Packet ID %d\n", 925 __func__, (int)par->cpkt_id); 926 par->state = LMQCPP_PAYLOAD; 927 pub->payload_pos = 0; 928 pub->payload_len = par->cpkt_remlen - 929 (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0)); 930 if (pub->payload_len == 0) 931 goto cmd_completion; 932 933 break; 934 } 935 case LMQCPP_PAYLOAD: 936 { 937 lws_mqtt_publish_param_t *pub = 938 (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param; 939 if (pub == NULL) { 940 lwsl_err("%s: Uninitialized pub_param\n", 941 __func__); 942 goto send_protocol_error_and_close; 943 } 944 945 pub->payload = buf; 946 goto cmd_completion; 947 } 948 949 case LMQCPP_CONNACK_PACKET: 950 if (!lwsi_role_client(wsi)) { 951 lwsl_err("%s: CONNACK is only Server to Client", 952 __func__); 953 goto send_unsupp_connack_and_close; 954 } 955 956 lwsl_debug("%s: received CONNACK pkt\n", __func__); 957 lws_mqtt_vbi_init(&par->vbit); 958 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 959 case LMSPR_NEED_MORE: 960 break; 961 case LMSPR_COMPLETED: 962 par->cpkt_remlen = par->vbit.value; 963 lwsl_debug("%s: CONNACK pkt len = %d\n", 964 __func__, (int)par->cpkt_remlen); 965 if (par->cpkt_remlen != 2) 966 goto send_protocol_error_and_close; 967 968 par->state = LMQCPP_CONNACK_VH_FLAGS; 969 break; 970 default: 971 lwsl_notice("%s: connack bad vbi\n", __func__); 972 goto send_protocol_error_and_close; 973 } 974 break; 975 976 case LMQCPP_CONNACK_VH_FLAGS: 977 { 978 lws_mqttc_t *c = &wsi->mqtt->client; 979 par->cpkt_flags = *buf++; 980 len--; 981 982 if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) { 983 /* 984 * Byte 1 is the "Connect Acknowledge 985 * Flags". Bits 7-1 are reserved and 986 * MUST be set to 0. 987 */ 988 par->reason = LMQCP_REASON_MALFORMED_PACKET; 989 goto send_reason_and_close; 990 } 991 /* 992 * If the Server accepts a connection with 993 * CleanSession set to 1, the Server MUST set 994 * Session Present to 0 in the CONNACK packet 995 * in addition to setting a zero return code 996 * in the CONNACK packet [MQTT-3.2.2-1]. If 997 * the Server accepts a connection with 998 * CleanSession set to 0, the value set in 999 * Session Present depends on whether the 1000 * Server already has stored Session state for 1001 * the supplied client ID. If the Server has 1002 * stored Session state, it MUST set 1003 * SessionPresent to 1 in the CONNACK packet 1004 * [MQTT-3.2.2-2]. If the Server does not have 1005 * stored Session state, it MUST set Session 1006 * Present to 0 in the CONNACK packet. This is 1007 * in addition to setting a zero return code 1008 * in the CONNACK packet [MQTT-3.2.2-3]. 1009 */ 1010 if ((c->conn_flags & LMQCFT_CLEAN_START) && 1011 (par->cpkt_flags & LMQCFT_SESSION_PRESENT)) 1012 goto send_protocol_error_and_close; 1013 1014 wsi->mqtt->session_resumed = ((unsigned int)par->cpkt_flags & 1015 LMQCFT_SESSION_PRESENT); 1016 1017 /* Move on to Connect Return Code */ 1018 par->state = LMQCPP_CONNACK_VH_RETURN_CODE; 1019 break; 1020 } 1021 case LMQCPP_CONNACK_VH_RETURN_CODE: 1022 par->conn_rc = *buf++; 1023 len--; 1024 /* 1025 * If a server sends a CONNACK packet containing a 1026 * non-zero return code it MUST then close the Network 1027 * Connection [MQTT-3.2.2-5] 1028 */ 1029 switch (par->conn_rc) { 1030 case 0: 1031 goto cmd_completion; 1032 case 1: 1033 case 2: 1034 case 3: 1035 case 4: 1036 case 5: 1037 par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL + 1038 par->conn_rc - 1; 1039 goto send_reason_and_close; 1040 default: 1041 lwsl_notice("%s: bad connack retcode\n", __func__); 1042 goto send_protocol_error_and_close; 1043 } 1044 break; 1045 1046 /* SUBACK */ 1047 case LMQCPP_SUBACK_PACKET: 1048 if (!lwsi_role_client(wsi)) { 1049 lwsl_err("%s: SUBACK is only Server to Client", 1050 __func__); 1051 goto send_unsupp_connack_and_close; 1052 } 1053 1054 lwsl_debug("%s: received SUBACK pkt\n", __func__); 1055 lws_mqtt_vbi_init(&par->vbit); 1056 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1057 case LMSPR_NEED_MORE: 1058 break; 1059 case LMSPR_COMPLETED: 1060 par->cpkt_remlen = par->vbit.value; 1061 lwsl_debug("%s: SUBACK pkt len = %d\n", 1062 __func__, (int)par->cpkt_remlen); 1063 if (par->cpkt_remlen <= 2) 1064 goto send_protocol_error_and_close; 1065 par->state = LMQCPP_SUBACK_VH_PKT_ID; 1066 break; 1067 default: 1068 lwsl_notice("%s: suback bad vbi\n", __func__); 1069 goto send_protocol_error_and_close; 1070 } 1071 break; 1072 1073 case LMQCPP_SUBACK_VH_PKT_ID: 1074 1075 if (len < 2) { 1076 lwsl_notice("%s: len breakage 4\n", __func__); 1077 return -1; 1078 } 1079 1080 par->cpkt_id = lws_ser_ru16be(buf); 1081 wsi->mqtt->ack_pkt_id = par->cpkt_id; 1082 buf += 2; 1083 len -= 2; 1084 par->cpkt_remlen -= 2; 1085 par->n = 0; 1086 par->state = LMQCPP_SUBACK_PAYLOAD; 1087 *par->temp = 0; 1088 break; 1089 1090 case LMQCPP_SUBACK_PAYLOAD: 1091 { 1092 lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++; 1093 1094 len--; 1095 switch (qos) { 1096 case QOS0: 1097 case QOS1: 1098 case QOS2: 1099 break; 1100 case FAILURE_QOS_LEVEL: 1101 goto send_protocol_error_and_close; 1102 1103 default: 1104 par->reason = LMQCP_REASON_MALFORMED_PACKET; 1105 goto send_reason_and_close; 1106 } 1107 1108 if (++(par->n) == par->cpkt_remlen) { 1109 par->n = 0; 1110 goto cmd_completion; 1111 } 1112 1113 break; 1114 } 1115 1116 /* UNSUBACK */ 1117 case LMQCPP_UNSUBACK_PACKET: 1118 if (!lwsi_role_client(wsi)) { 1119 lwsl_err("%s: UNSUBACK is only Server to Client", 1120 __func__); 1121 goto send_unsupp_connack_and_close; 1122 } 1123 1124 lwsl_debug("%s: received UNSUBACK pkt\n", __func__); 1125 lws_mqtt_vbi_init(&par->vbit); 1126 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1127 case LMSPR_NEED_MORE: 1128 break; 1129 case LMSPR_COMPLETED: 1130 par->cpkt_remlen = par->vbit.value; 1131 lwsl_debug("%s: UNSUBACK pkt len = %d\n", 1132 __func__, (int)par->cpkt_remlen); 1133 if (par->cpkt_remlen < 2) 1134 goto send_protocol_error_and_close; 1135 par->state = LMQCPP_UNSUBACK_VH_PKT_ID; 1136 break; 1137 default: 1138 lwsl_notice("%s: unsuback bad vbi\n", __func__); 1139 goto send_protocol_error_and_close; 1140 } 1141 break; 1142 1143 case LMQCPP_UNSUBACK_VH_PKT_ID: 1144 1145 if (len < 2) { 1146 lwsl_notice("%s: len breakage 3\n", __func__); 1147 return -1; 1148 } 1149 1150 par->cpkt_id = lws_ser_ru16be(buf); 1151 wsi->mqtt->ack_pkt_id = par->cpkt_id; 1152 buf += 2; 1153 len -= 2; 1154 par->cpkt_remlen -= 2; 1155 par->n = 0; 1156 1157 goto cmd_completion; 1158 1159 case LMQCPP_PUBACK_PACKET: 1160 lws_mqtt_vbi_init(&par->vbit); 1161 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1162 case LMSPR_NEED_MORE: 1163 break; 1164 case LMSPR_COMPLETED: 1165 par->cpkt_remlen = par->vbit.value; 1166 lwsl_info("%s: PUBACK pkt len = %d\n", __func__, 1167 (int)par->cpkt_remlen); 1168 /* 1169 * must be 4 or more, with special case that 2 1170 * means success with no reason code or props 1171 */ 1172 if (par->cpkt_remlen <= 1 || 1173 par->cpkt_remlen == 3) 1174 goto send_protocol_error_and_close; 1175 1176 par->state = LMQCPP_PUBACK_VH_PKT_ID; 1177 par->fixed_seen[2] = par->fixed_seen[3] = 0; 1178 par->fixed = 0; 1179 par->n = 0; 1180 break; 1181 default: 1182 lwsl_notice("%s: puback bad vbi\n", __func__); 1183 goto send_protocol_error_and_close; 1184 } 1185 break; 1186 1187 case LMQCPP_PUBACK_VH_PKT_ID: 1188 /* 1189 * There are 3 fixed bytes and then a VBI for the 1190 * property section length 1191 */ 1192 par->fixed_seen[par->fixed++] = *buf++; 1193 if (len < par->cpkt_remlen - par->n) { 1194 lwsl_notice("%s: len breakage 4\n", __func__); 1195 return -1; 1196 } 1197 len--; 1198 par->n++; 1199 if (par->fixed == 2) 1200 par->cpkt_id = lws_ser_ru16be(par->fixed_seen); 1201 1202 if (par->fixed == 3) { 1203 lws_mqtt_vbi_init(&par->vbit); 1204 par->props_consumed = 0; 1205 par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI; 1206 } 1207 /* length of 2 is truncated packet and we completed it */ 1208 if (par->cpkt_remlen == par->fixed) 1209 goto cmd_completion; 1210 break; 1211 1212 case LMQCPP_PUBACK_PROPERTIES_LEN_VBI: 1213 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1214 case LMSPR_NEED_MORE: 1215 break; 1216 case LMSPR_COMPLETED: 1217 par->props_len = par->vbit.value; 1218 lwsl_info("%s: PUBACK props len = %d\n", 1219 __func__, (int)par->cpkt_remlen); 1220 /* 1221 * If there are no properties, this is a 1222 * command completion event in itself 1223 */ 1224 if (!par->props_len) 1225 goto cmd_completion; 1226 1227 /* 1228 * Otherwise consume the properties before 1229 * completing the command 1230 */ 1231 lws_mqtt_vbi_init(&par->vbit); 1232 par->state = LMQCPP_PUBACK_VH_PKT_ID; 1233 break; 1234 default: 1235 lwsl_notice("%s: puback pr bad vbi\n", __func__); 1236 goto send_protocol_error_and_close; 1237 } 1238 break; 1239 1240 case LMQCPP_EAT_PROPERTIES_AND_COMPLETE: 1241 /* 1242 * TODO: stash the props 1243 */ 1244 par->props_consumed++; 1245 len--; 1246 buf++; 1247 if (par->props_len != par->props_consumed) 1248 break; 1249 1250cmd_completion: 1251 /* 1252 * We come here when we understood we just processed 1253 * the last byte of a command packet, regardless of the 1254 * packet type 1255 */ 1256 par->state = LMQCPP_IDLE; 1257 1258 switch (par->packet_type_flags >> 4) { 1259 case LMQCP_STOC_CONNACK: 1260 lwsl_info("%s: cmd_completion: CONNACK\n", 1261 __func__); 1262 1263 /* 1264 * Getting the CONNACK means we are the first, 1265 * the nwsi, and we succeeded to create a new 1266 * network connection ourselves. 1267 * 1268 * Since others may join us sharing the nwsi, 1269 * and we may close while they still want to use 1270 * it, our wsi lifecycle alone can no longer 1271 * define the lifecycle of the nwsi... it means 1272 * we need to do a "magic trick" and instead of 1273 * being both the nwsi and act like a child 1274 * stream, create a new wsi to take over the 1275 * nwsi duties and turn our wsi into a child of 1276 * the nwsi with its own lifecycle. 1277 * 1278 * The nwsi gets a mostly empty wsi->nwsi used 1279 * to track already-subscribed topics globally 1280 * for the connection. 1281 */ 1282 1283 /* we were under SENT_CLIENT_HANDSHAKE timeout */ 1284 lws_set_timeout(wsi, 0, 0); 1285 1286 w = lws_create_new_server_wsi(wsi->a.vhost, 1287 wsi->tsi, "mqtt_sid1"); 1288 if (!w) { 1289 lwsl_notice("%s: sid 1 migrate failed\n", 1290 __func__); 1291 return -1; 1292 } 1293 1294 wsi->mux.highest_sid = 1; 1295 lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++); 1296 1297 wsi->mux_substream = 1; 1298 w->mux_substream = 1; 1299 w->client_mux_substream = 1; 1300 wsi->client_mux_migrated = 1; 1301 wsi->told_user_closed = 1; /* don't tell nwsi closed */ 1302 1303 lwsi_set_state(w, LRS_ESTABLISHED); 1304 lwsi_set_state(wsi, LRS_ESTABLISHED); 1305 lwsi_set_role(w, lwsi_role(wsi)); 1306 1307#if defined(LWS_WITH_CLIENT) 1308 w->flags = wsi->flags; 1309#endif 1310 1311 w->mqtt = wsi->mqtt; 1312 wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt"); 1313 if (!wsi->mqtt) 1314 return -1; 1315 w->mqtt->wsi = w; 1316 w->a.protocol = wsi->a.protocol; 1317 if (w->user_space && 1318 !w->user_space_externally_allocated) 1319 lws_free_set_NULL(w->user_space); 1320 w->user_space = wsi->user_space; 1321 wsi->user_space = NULL; 1322 w->user_space_externally_allocated = 1323 wsi->user_space_externally_allocated; 1324 if (lws_ensure_user_space(w)) 1325 goto bail1; 1326 w->a.opaque_user_data = wsi->a.opaque_user_data; 1327 wsi->a.opaque_user_data = NULL; 1328 w->stash = wsi->stash; 1329 wsi->stash = NULL; 1330 1331 lws_mux_mark_immortal(w); 1332 1333 lwsl_notice("%s: migrated nwsi %s to sid 1 %s\n", 1334 __func__, lws_wsi_tag(wsi), 1335 lws_wsi_tag(w)); 1336 1337 /* 1338 * It was the last thing we were waiting for 1339 * before we can be fully ESTABLISHED 1340 */ 1341 if (lws_mqtt_set_client_established(w)) { 1342 lwsl_notice("%s: set EST fail\n", __func__); 1343 return -1; 1344 } 1345 1346 /* get the ball rolling */ 1347 lws_validity_confirmed(wsi); 1348 1349 /* well, add the queued guys as children */ 1350 lws_wsi_mux_apply_queue(wsi); 1351 break; 1352 1353bail1: 1354 /* undo the insert */ 1355 wsi->mux.child_list = w->mux.sibling_list; 1356 wsi->mux.child_count--; 1357 1358 if (w->user_space) 1359 lws_free_set_NULL(w->user_space); 1360 w->a.vhost->protocols[0].callback(w, 1361 LWS_CALLBACK_WSI_DESTROY, 1362 NULL, NULL, 0); 1363 __lws_vhost_unbind_wsi(w); /* cx + vh lock */ 1364 lws_free(w); 1365 1366 return 0; 1367 1368 case LMQCP_PUBREC: 1369 lwsl_err("%s: cmd_completion: PUBREC\n", 1370 __func__); 1371 /* 1372 * Figure out which child asked for this 1373 */ 1374 n = 0; 1375 lws_start_foreach_ll(struct lws *, w, 1376 wsi->mux.child_list) { 1377 if (w->mqtt->unacked_publish && 1378 w->mqtt->ack_pkt_id == par->cpkt_id) { 1379 char requested_close = 0; 1380 1381 w->mqtt->unacked_publish = 0; 1382 w->mqtt->unacked_pubrel = 1; 1383 1384 if (user_callback_handle_rxflow( 1385 w->a.protocol->callback, 1386 w, LWS_CALLBACK_MQTT_ACK, 1387 w->user_space, NULL, 0) < 0) { 1388 lwsl_info("%s: MQTT_ACK requests close\n", 1389 __func__); 1390 requested_close = 1; 1391 } 1392 n = 1; 1393 1394 /* 1395 * We got an assertive PUBREC, 1396 * no need for timeout wait 1397 * any more 1398 */ 1399 lws_sul_cancel(&w->mqtt-> 1400 sul_qos_puback_pubrec_wait); 1401 1402 if (requested_close) { 1403 __lws_close_free_wsi(w, 1404 0, "ack cb"); 1405 break; 1406 } 1407 1408 break; 1409 } 1410 } lws_end_foreach_ll(w, mux.sibling_list); 1411 1412 if (!n) { 1413 lwsl_err("%s: unsolicited PUBREC\n", 1414 __func__); 1415 return -1; 1416 } 1417 wsi->mqtt->send_pubrel = 1; 1418 lws_callback_on_writable(wsi); 1419 break; 1420 1421 case LMQCP_PUBCOMP: 1422 lwsl_err("%s: cmd_completion: PUBCOMP\n", 1423 __func__); 1424 n = 0; 1425 lws_start_foreach_ll(struct lws *, w, 1426 wsi->mux.child_list) { 1427 if (w->mqtt->unacked_pubrel > 0 && 1428 w->mqtt->ack_pkt_id == par->cpkt_id) { 1429 w->mqtt->unacked_pubrel = 0; 1430 n = 1; 1431 } 1432 } lws_end_foreach_ll(w, mux.sibling_list); 1433 1434 if (!n) { 1435 lwsl_err("%s: unsolicited PUBCOMP\n", 1436 __func__); 1437 return -1; 1438 } 1439 1440 /* 1441 * If we published something and PUBCOMP arrived, 1442 * our connection is definitely working in both 1443 * directions at the moment. 1444 */ 1445 lws_validity_confirmed(wsi); 1446 break; 1447 1448 case LMQCP_PUBREL: 1449 lwsl_err("%s: cmd_completion: PUBREL\n", 1450 __func__); 1451 wsi->mqtt->send_pubcomp = 1; 1452 lws_callback_on_writable(wsi); 1453 break; 1454 1455 case LMQCP_PUBACK: 1456 lwsl_info("%s: cmd_completion: PUBACK\n", 1457 __func__); 1458 1459 /* 1460 * Figure out which child asked for this 1461 */ 1462 1463 n = 0; 1464 lws_start_foreach_ll(struct lws *, w, 1465 wsi->mux.child_list) { 1466 if (w->mqtt->unacked_publish && 1467 w->mqtt->ack_pkt_id == par->cpkt_id) { 1468 char requested_close = 0; 1469 1470 w->mqtt->unacked_publish = 0; 1471 if (user_callback_handle_rxflow( 1472 w->a.protocol->callback, 1473 w, LWS_CALLBACK_MQTT_ACK, 1474 w->user_space, NULL, 0) < 0) { 1475 lwsl_info("%s: MQTT_ACK requests close\n", 1476 __func__); 1477 requested_close = 1; 1478 } 1479 n = 1; 1480 1481 /* 1482 * We got an assertive PUBACK, 1483 * no need for ACK timeout wait 1484 * any more 1485 */ 1486 lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait); 1487 1488 if (requested_close) { 1489 __lws_close_free_wsi(w, 1490 0, "ack cb"); 1491 break; 1492 } 1493 1494 break; 1495 } 1496 } lws_end_foreach_ll(w, mux.sibling_list); 1497 1498 if (!n) { 1499 lwsl_err("%s: unsolicited PUBACK\n", 1500 __func__); 1501 return -1; 1502 } 1503 1504 /* 1505 * If we published something and it was acked, 1506 * our connection is definitely working in both 1507 * directions at the moment. 1508 */ 1509 lws_validity_confirmed(wsi); 1510 break; 1511 1512 case LMQCP_STOC_PINGRESP: 1513 lwsl_info("%s: cmd_completion: PINGRESP\n", 1514 __func__); 1515 /* 1516 * If we asked for a PINGRESP and it came, 1517 * our connection is definitely working in both 1518 * directions at the moment. 1519 */ 1520 lws_validity_confirmed(wsi); 1521 break; 1522 1523 case LMQCP_STOC_SUBACK: 1524 lwsl_info("%s: cmd_completion: SUBACK\n", 1525 __func__); 1526 1527 /* 1528 * Figure out which child asked for this 1529 */ 1530 1531 n = 0; 1532 lws_start_foreach_ll(struct lws *, w, 1533 wsi->mux.child_list) { 1534 if (w->mqtt->inside_subscribe && 1535 w->mqtt->ack_pkt_id == par->cpkt_id) { 1536 w->mqtt->inside_subscribe = 0; 1537 if (user_callback_handle_rxflow( 1538 w->a.protocol->callback, 1539 w, LWS_CALLBACK_MQTT_SUBSCRIBED, 1540 w->user_space, NULL, 0) < 0) { 1541 lwsl_err("%s: MQTT_SUBSCRIBE failed\n", 1542 __func__); 1543 return -1; 1544 } 1545 n = 1; 1546 break; 1547 } 1548 } lws_end_foreach_ll(w, mux.sibling_list); 1549 1550 if (!n) { 1551 lwsl_err("%s: unsolicited SUBACK\n", 1552 __func__); 1553 return -1; 1554 } 1555 1556 /* 1557 * If we subscribed to something and SUBACK came, 1558 * our connection is definitely working in both 1559 * directions at the moment. 1560 */ 1561 lws_validity_confirmed(wsi); 1562 1563 break; 1564 1565 case LMQCP_STOC_UNSUBACK: 1566 { 1567 char requested_close = 0; 1568 lwsl_info("%s: cmd_completion: UNSUBACK\n", 1569 __func__); 1570 /* 1571 * Figure out which child asked for this 1572 */ 1573 n = 0; 1574 lws_start_foreach_ll(struct lws *, w, 1575 wsi->mux.child_list) { 1576 if (w->mqtt->inside_unsubscribe && 1577 w->mqtt->ack_pkt_id == par->cpkt_id) { 1578 struct lws *nwsi = lws_get_network_wsi(w); 1579 1580 /* 1581 * No more subscribers left, 1582 * remove the topic from nwsi 1583 */ 1584 lws_mqtt_client_remove_subs(nwsi->mqtt); 1585 1586 w->mqtt->inside_unsubscribe = 0; 1587 if (user_callback_handle_rxflow( 1588 w->a.protocol->callback, 1589 w, LWS_CALLBACK_MQTT_UNSUBSCRIBED, 1590 w->user_space, NULL, 0) < 0) { 1591 lwsl_info("%s: MQTT_UNSUBACK requests close\n", 1592 __func__); 1593 requested_close = 1; 1594 } 1595 n = 1; 1596 1597 lws_sul_cancel(&w->mqtt->sul_unsuback_wait); 1598 if (requested_close) { 1599 __lws_close_free_wsi(w, 1600 0, "unsub ack cb"); 1601 break; 1602 } 1603 break; 1604 } 1605 } lws_end_foreach_ll(w, mux.sibling_list); 1606 1607 if (!n) { 1608 lwsl_err("%s: unsolicited UNSUBACK\n", 1609 __func__); 1610 return -1; 1611 } 1612 1613 1614 /* 1615 * If we unsubscribed to something and 1616 * UNSUBACK came, our connection is 1617 * definitely working in both 1618 * directions at the moment. 1619 */ 1620 lws_validity_confirmed(wsi); 1621 1622 break; 1623 } 1624 case LMQCP_PUBLISH: 1625 { 1626 lws_mqtt_publish_param_t *pub = 1627 (lws_mqtt_publish_param_t *) 1628 wsi->mqtt->rx_cpkt_param; 1629 size_t chunk; 1630 1631 if (pub == NULL) { 1632 lwsl_notice("%s: no pub\n", __func__); 1633 return -1; 1634 } 1635 1636 /* 1637 * RX PUBLISH is delivered to any children that 1638 * registered for the related topic 1639 */ 1640 1641 n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)]; 1642 1643 chunk = pub->payload_len - pub->payload_pos; 1644 if (chunk > len) 1645 chunk = len; 1646 1647 lws_start_foreach_ll(struct lws *, w, 1648 wsi->mux.child_list) { 1649 if (lws_mqtt_find_sub(w->mqtt, 1650 pub->topic)) 1651 if (w->a.protocol->callback( 1652 w, (enum lws_callback_reasons)n, 1653 w->user_space, 1654 (void *)pub, 1655 chunk)) { 1656 par->payload_consumed = 0; 1657 lws_free_set_NULL(pub->topic); 1658 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param); 1659 return 1; 1660 } 1661 } lws_end_foreach_ll(w, mux.sibling_list); 1662 1663 1664 pub->payload_pos += (uint32_t)chunk; 1665 len -= chunk; 1666 buf += chunk; 1667 1668 lwsl_debug("%s: post pos %d, plen %d, len %d\n", 1669 __func__, (int)pub->payload_pos, 1670 (int)pub->payload_len, (int)len); 1671 1672 if (pub->payload_pos != pub->payload_len) { 1673 /* 1674 * More chunks of the payload pending, 1675 * blocking this connection from doing 1676 * anything else 1677 */ 1678 par->state = LMQCPP_PAYLOAD; 1679 break; 1680 } 1681 1682 if (pub->qos == 1) { 1683 /* For QOS = 1, send out PUBACK */ 1684 wsi->mqtt->send_puback = 1; 1685 lws_callback_on_writable(wsi); 1686 } else if (pub->qos == 2) { 1687 /* For QOS = 2, send out PUBREC */ 1688 wsi->mqtt->send_pubrec = 1; 1689 lws_callback_on_writable(wsi); 1690 } 1691 1692 par->payload_consumed = 0; 1693 lws_free_set_NULL(pub->topic); 1694 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param); 1695 1696 break; 1697 } 1698 default: 1699 break; 1700 } 1701 1702 break; 1703 1704 1705 case LMQCPP_PROP_ID_VBI: 1706 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1707 case LMSPR_NEED_MORE: 1708 break; 1709 case LMSPR_COMPLETED: 1710 par->consumed = (uint32_t)((unsigned int)par->consumed + (unsigned int)(unsigned char)par->vbit.consumed); 1711 if (par->vbit.value > 1712 LWS_ARRAY_SIZE(property_valid)) { 1713 lwsl_notice("%s: undef prop id 0x%x\n", 1714 __func__, (int)par->vbit.value); 1715 goto send_protocol_error_and_close; 1716 } 1717 if (!(property_valid[par->vbit.value] & 1718 (1 << ctl_pkt_type(par)))) { 1719 lwsl_notice("%s: prop id 0x%x invalid for" 1720 " control pkt %d\n", __func__, 1721 (int)par->vbit.value, 1722 ctl_pkt_type(par)); 1723 goto send_protocol_error_and_close; 1724 } 1725 par->prop_id = par->vbit.value; 1726 par->flag_prop_multi = !!( 1727 par->props_seen[par->prop_id >> 3] & 1728 (1 << (par->prop_id & 7))); 1729 par->props_seen[par->prop_id >> 3] = 1730 (uint8_t)((par->props_seen[par->prop_id >> 3]) | (1 << (par->prop_id & 7))); 1731 /* 1732 * even if it's not a vbi property arg, 1733 * .consumed of this will be zero the first time 1734 */ 1735 lws_mqtt_vbi_init(&par->vbit); 1736 /* 1737 * if it's a string, next state must set the 1738 * destination and size limit itself. But 1739 * resetting it generically here lets it use 1740 * lws_mqtt_str_first() to understand it's the 1741 * first time around. 1742 */ 1743 lws_mqtt_str_init(&par->s_temp, NULL, 0, 0); 1744 1745 /* property arg state enums are so encoded */ 1746 par->state = 0x100 | par->vbit.value; 1747 break; 1748 default: 1749 lwsl_notice("%s: prop id bad vbi\n", __func__); 1750 goto send_protocol_error_and_close; 1751 } 1752 break; 1753 1754 /* 1755 * All possible property payloads... restricting which ones 1756 * can appear in which control packets is already done above 1757 * in LMQCPP_PROP_ID_VBI 1758 */ 1759 1760 case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE: 1761 case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE: 1762 case LMQCPP_PROP_MAXIMUM_QOS_1BYTE: 1763 case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE: 1764 case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE: 1765 case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE: 1766 case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE: 1767 case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */ 1768 if (par->flag_prop_multi) 1769 goto singular_prop_seen_twice; 1770 par->payload_format = *buf++; 1771 len--; 1772 if (lws_mqtt_pconsume(par, 1)) 1773 goto send_protocol_error_and_close; 1774 break; 1775 1776 case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE: 1777 case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE: 1778 case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE: 1779 case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE: 1780 if (par->flag_prop_multi) 1781 goto singular_prop_seen_twice; 1782 1783 if (lws_mqtt_mb_first(&par->vbit)) 1784 lws_mqtt_4byte_init(&par->vbit); 1785 1786 switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) { 1787 case LMSPR_NEED_MORE: 1788 break; 1789 case LMSPR_COMPLETED: 1790 if (lws_mqtt_pconsume(par, par->vbit.consumed)) 1791 goto send_protocol_error_and_close; 1792 break; 1793 default: 1794 goto send_protocol_error_and_close; 1795 } 1796 break; 1797 1798 case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE: 1799 case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE: 1800 case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE: 1801 case LMQCPP_PROP_TOPIC_ALIAS_2BYTE: 1802 if (par->flag_prop_multi) 1803 goto singular_prop_seen_twice; 1804 1805 if (lws_mqtt_mb_first(&par->vbit)) 1806 lws_mqtt_2byte_init(&par->vbit); 1807 1808 switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) { 1809 case LMSPR_NEED_MORE: 1810 break; 1811 case LMSPR_COMPLETED: 1812 if (lws_mqtt_pconsume(par, par->vbit.consumed)) 1813 goto send_protocol_error_and_close; 1814 break; 1815 default: 1816 goto send_protocol_error_and_close; 1817 } 1818 break; 1819 1820 case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S: 1821 case LMQCPP_PROP_AUTH_METHOD_UTF8S: 1822 case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S: 1823 case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S: 1824 case LMQCPP_PROP_RESPONSE_INFO_UTF8S: 1825 case LMQCPP_PROP_SERVER_REFERENCE_UTF8S: 1826 case LMQCPP_PROP_REASON_STRING_UTF8S: 1827 case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S: 1828 case LMQCPP_PROP_CONTENT_TYPE_UTF8S: 1829 if (par->flag_prop_multi) 1830 goto singular_prop_seen_twice; 1831 1832 if (lws_mqtt_str_first(&par->s_temp)) 1833 lws_mqtt_str_init(&par->s_temp, par->temp, 1834 sizeof(par->temp), 0); 1835 1836 switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) { 1837 case LMSPR_NEED_MORE: 1838 break; 1839 case LMSPR_COMPLETED: 1840 if (lws_mqtt_pconsume(par, par->s_temp.len)) 1841 goto send_protocol_error_and_close; 1842 break; 1843 1844 default: 1845 lwsl_info("%s: bad protocol name\n", __func__); 1846 goto send_protocol_error_and_close; 1847 } 1848 break; 1849 1850 case LMQCPP_PROP_SUBSCRIPTION_ID_VBI: 1851 1852 case LMQCPP_PROP_CORRELATION_BINDATA: 1853 case LMQCPP_PROP_AUTH_DATA_BINDATA: 1854 1855 /* TODO */ 1856 lwsl_err("%s: Unimplemented packet state 0x%x\n", 1857 __func__, par->state); 1858 return -1; 1859 } 1860 } 1861 1862 return 0; 1863 1864oom: 1865 lwsl_err("%s: OOM!\n", __func__); 1866 goto send_protocol_error_and_close; 1867 1868singular_prop_seen_twice: 1869 lwsl_info("%s: property appears twice\n", __func__); 1870 1871send_protocol_error_and_close: 1872 lwsl_notice("%s: peac\n", __func__); 1873 par->reason = LMQCP_REASON_PROTOCOL_ERROR; 1874 1875send_reason_and_close: 1876 lwsl_notice("%s: srac\n", __func__); 1877 par->flag_pending_send_reason_close = 1; 1878 goto ask; 1879 1880send_unsupp_connack_and_close: 1881 lwsl_notice("%s: unsupac\n", __func__); 1882 par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL; 1883 par->flag_pending_send_connack_close = 1; 1884 1885ask: 1886 /* Should we ask for clients? */ 1887 lws_callback_on_writable(wsi); 1888 1889 return -1; 1890} 1891 1892int 1893lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type, 1894 uint8_t dup, lws_mqtt_qos_levels_t qos, 1895 uint8_t retain) 1896{ 1897 lws_mqtt_fixed_hdr_t hdr; 1898 1899 hdr.bits = 0; 1900 hdr.flags.ctrl_pkt_type = ctrl_pkt_type & 0xf; 1901 1902 switch(ctrl_pkt_type) { 1903 case LMQCP_PUBLISH: 1904 hdr.flags.dup = !!dup; 1905 /* 1906 * A PUBLISH Packet MUST NOT have both QoS bits set to 1907 * 1. If a Server or Client receives a PUBLISH Packet 1908 * which has both QoS bits set to 1 it MUST close the 1909 * Network Connection [MQTT-3.3.1-4]. 1910 */ 1911 if (qos >= RESERVED_QOS_LEVEL) { 1912 lwsl_err("%s: Unsupport QoS level 0x%x\n", 1913 __func__, qos); 1914 return -1; 1915 } 1916 hdr.flags.qos = qos & 3; 1917 hdr.flags.retain = !!retain; 1918 break; 1919 1920 case LMQCP_CTOS_CONNECT: 1921 case LMQCP_STOC_CONNACK: 1922 case LMQCP_PUBACK: 1923 case LMQCP_PUBREC: 1924 case LMQCP_PUBCOMP: 1925 case LMQCP_STOC_SUBACK: 1926 case LMQCP_STOC_UNSUBACK: 1927 case LMQCP_CTOS_PINGREQ: 1928 case LMQCP_STOC_PINGRESP: 1929 case LMQCP_DISCONNECT: 1930 case LMQCP_AUTH: 1931 hdr.bits &= 0xf0; 1932 break; 1933 1934 /* 1935 * Bits 3,2,1 and 0 of the fixed header of the PUBREL, 1936 * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and 1937 * MUST be set to 0,0,1 and 0 respectively. The Server MUST 1938 * treat any other value as malformed and close the Network 1939 * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1]. 1940 */ 1941 case LMQCP_PUBREL: 1942 case LMQCP_CTOS_SUBSCRIBE: 1943 case LMQCP_CTOS_UNSUBSCRIBE: 1944 hdr.bits |= 0x02; 1945 break; 1946 1947 default: 1948 return -1; 1949 } 1950 1951 *p = hdr.bits; 1952 1953 return 0; 1954} 1955 1956int 1957lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub, 1958 const void *buf, uint32_t len, int is_complete) 1959{ 1960 struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi]; 1961 uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p; 1962 struct lws *nwsi = lws_get_network_wsi(wsi); 1963 lws_mqtt_str_t mqtt_vh_payload; 1964 uint32_t vh_len, rem_len; 1965 1966 assert(pub->topic); 1967 1968 lwsl_debug("%s: len = %d, is_complete = %d\n", 1969 __func__, (int)len, (int)is_complete); 1970 1971 if (lwsi_state(wsi) != LRS_ESTABLISHED) { 1972 lwsl_err("%s: %s: unknown state 0x%x\n", __func__, 1973 lws_wsi_tag(wsi), lwsi_state(wsi)); 1974 assert(0); 1975 return 1; 1976 } 1977 1978 if (wsi->mqtt->inside_payload) { 1979 /* 1980 * Headers are filled, we are sending 1981 * the payload - a buffer with LWS_PRE 1982 * in front it. 1983 */ 1984 start = (uint8_t *)buf; 1985 p = start + len; 1986 if (is_complete) 1987 wsi->mqtt->inside_payload = 0; 1988 goto do_write; 1989 } 1990 1991 start = b + LWS_PRE; 1992 p = start; 1993 /* 1994 * Fill headers and the first chunk of the 1995 * payload (if any) 1996 */ 1997 if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH, 1998 pub->dup, pub->qos, pub->retain)) { 1999 lwsl_err("%s: Failed to fill fixed header\n", __func__); 2000 return 1; 2001 } 2002 2003 /* 2004 * Topic len field + Topic len + Packet ID 2005 * (for QOS>0) + Payload len 2006 */ 2007 vh_len = (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0)); 2008 rem_len = vh_len + pub->payload_len; 2009 lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len); 2010 2011 /* Will the chunk of payload fit? */ 2012 if ((vh_len + len) >= 2013 (wsi->a.context->pt_serv_buf_size - LWS_PRE)) { 2014 lwsl_err("%s: Payload is too big\n", __func__); 2015 return 1; 2016 } 2017 2018 p += lws_mqtt_vbi_encode(rem_len, p); 2019 2020 /* Topic's Len */ 2021 lws_ser_wu16be(p, pub->topic_len); 2022 p += 2; 2023 2024 /* 2025 * Init lws_mqtt_str for "MQTT Variable 2026 * Headers + payload" (only the supplied 2027 * chuncked payload) 2028 */ 2029 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, 2030 (uint16_t)(unsigned int)(pub->topic_len + ((pub->qos) ? 2u : 0u) + len), 2031 0); 2032 2033 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2034 lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1); 2035 if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) { 2036 lwsl_err("%s: a\n", __func__); 2037 return 1; 2038 } 2039 2040 /* Packet ID */ 2041 if (pub->qos != QOS0) { 2042 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2043 if (!pub->dup) 2044 nwsi->mqtt->pkt_id++; 2045 wsi->mqtt->ack_pkt_id = pub->packet_id = nwsi->mqtt->pkt_id; 2046 lwsl_debug("%s: pkt_id = %d\n", __func__, 2047 (int)wsi->mqtt->ack_pkt_id); 2048 lws_ser_wu16be(p, pub->packet_id); 2049 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) { 2050 lwsl_err("%s: b\n", __func__); 2051 return 1; 2052 } 2053 } 2054 2055 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2056 memcpy(p, buf, len); 2057 if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len)) 2058 return 1; 2059 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2060 2061 if (!is_complete) 2062 nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1; 2063 2064do_write: 2065 2066 // lwsl_hexdump_err(start, lws_ptr_diff(p, start)); 2067 2068 if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) != 2069 lws_ptr_diff(p, start)) { 2070 lwsl_err("%s: write failed\n", __func__); 2071 return 1; 2072 } 2073 2074 if (!is_complete) { 2075 /* still some more chunks to come... */ 2076 lws_callback_on_writable(wsi); 2077 2078 return 0; 2079 } 2080 2081 wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0; 2082 2083 if (pub->qos != QOS0) 2084 wsi->mqtt->unacked_publish = 1; 2085 2086 /* this was the last part of the publish message */ 2087 2088 if (pub->qos == QOS0) { 2089 /* 2090 * There won't be any real PUBACK, act like we got one 2091 * so the user callback logic is the same for QoS0 or 2092 * QoS1 2093 */ 2094 if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK, 2095 wsi->user_space, NULL, 0)) { 2096 lwsl_err("%s: ACK callback exited\n", __func__); 2097 return 1; 2098 } 2099 } else if (pub->qos == QOS1 || pub->qos == QOS2) { 2100 /* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s, 2101 * we must RETRY the publish 2102 */ 2103 wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend; 2104 __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], 2105 &wsi->mqtt->sul_qos_puback_pubrec_wait, 2106 3 * LWS_USEC_PER_SEC); 2107 } 2108 2109 if (wsi->mqtt->inside_shadow) { 2110 wsi->mqtt->sul_shadow_wait.cb = lws_mqtt_shadow_timeout; 2111 __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], 2112 &wsi->mqtt->sul_shadow_wait, 2113 60 * LWS_USEC_PER_SEC); 2114 } 2115 2116 return 0; 2117} 2118 2119int 2120lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub) 2121{ 2122 struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi]; 2123 uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start; 2124 struct lws *nwsi = lws_get_network_wsi(wsi); 2125 lws_mqtt_str_t mqtt_vh_payload; 2126 uint8_t exists[8], extant; 2127 lws_mqtt_subs_t *mysub; 2128 uint32_t rem_len; 2129#if defined(_DEBUG) 2130 uint32_t tops; 2131#endif 2132 uint32_t n; 2133 2134 assert(sub->num_topics); 2135 assert(sub->num_topics < sizeof(exists)); 2136 2137 switch (lwsi_state(wsi)) { 2138 case LRS_ESTABLISHED: /* Protocol connection established */ 2139 if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE, 2140 0, 0, 0)) { 2141 lwsl_err("%s: Failed to fill fixed header\n", __func__); 2142 return 1; 2143 } 2144 2145 /* 2146 * The stream wants to subscribe to one or more topic, but 2147 * the shared nwsi may already be subscribed to some or all of 2148 * them from interactions with other streams. For those cases, 2149 * we filter them from the list the child wants until we just 2150 * have ones that are new to the nwsi. If nothing left, we just 2151 * synthesize the callback to the child as if SUBACK had come 2152 * and we're done, otherwise just ask the server for topics that 2153 * are new to the wsi. 2154 */ 2155 2156 extant = 0; 2157 memset(&exists, 0, sizeof(exists)); 2158 for (n = 0; n < sub->num_topics; n++) { 2159 lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n", 2160 __func__, (int)n, sub->topic[n].name); 2161 2162 mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name); 2163 if (mysub && mysub->ref_count) { 2164 mysub->ref_count++; /* another stream using it */ 2165 exists[n] = 1; 2166 extant++; 2167 } 2168 2169 /* 2170 * Attach the topic we're subscribing to, to wsi->mqtt 2171 */ 2172 if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) { 2173 lwsl_err("%s: create sub fail\n", __func__); 2174 return 1; 2175 } 2176 } 2177 2178 if (extant == sub->num_topics) { 2179 /* 2180 * It turns out there's nothing to do here, the nwsi has 2181 * already subscribed to all the topics this stream 2182 * wanted. Just tell it it can have them. 2183 */ 2184 lwsl_notice("%s: all topics already subscribed\n", __func__); 2185 if (user_callback_handle_rxflow( 2186 wsi->a.protocol->callback, 2187 wsi, LWS_CALLBACK_MQTT_SUBSCRIBED, 2188 wsi->user_space, NULL, 0) < 0) { 2189 lwsl_err("%s: MQTT_SUBSCRIBE failed\n", 2190 __func__); 2191 return -1; 2192 } 2193 2194 return 0; 2195 } 2196 2197#if defined(_DEBUG) 2198 /* 2199 * zero or more of the topics already existed, but not all, 2200 * so we must go to the server with a filtered list of the 2201 * new ones only 2202 */ 2203 2204 tops = sub->num_topics - extant; 2205#endif 2206 2207 /* 2208 * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics 2209 */ 2210 rem_len = 2; 2211 for (n = 0; n < sub->num_topics; n++) 2212 if (!exists[n]) 2213 rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1); 2214 2215 wsi->mqtt->sub_size = (uint16_t)rem_len; 2216 2217#if defined(_DEBUG) 2218 lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n", 2219 __func__, (int)tops, (int)rem_len); 2220#endif 2221 2222 p += lws_mqtt_vbi_encode(rem_len, p); 2223 2224 if ((rem_len + lws_ptr_diff_size_t(p, start)) >= 2225 wsi->a.context->pt_serv_buf_size) { 2226 lwsl_err("%s: Payload is too big\n", __func__); 2227 return 1; 2228 } 2229 2230 /* Init lws_mqtt_str */ 2231 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0); 2232 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2233 2234 /* Packet ID */ 2235 wsi->mqtt->ack_pkt_id = sub->packet_id = ++nwsi->mqtt->pkt_id; 2236 lwsl_debug("%s: pkt_id = %d\n", __func__, 2237 (int)sub->packet_id); 2238 lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id); 2239 2240 nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot; 2241 2242 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) 2243 return 1; 2244 2245 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2246 2247 for (n = 0; n < sub->num_topics; n++) { 2248 lwsl_info("%s: topics[%d] = %s\n", __func__, 2249 (int)n, sub->topic[n].name); 2250 2251 /* if the nwsi already has it, don't ask server for it */ 2252 if (exists[n]) { 2253 lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n", 2254 __func__, (int)n, sub->topic[n].name); 2255 continue; 2256 } 2257 2258 /* 2259 * Attach the topic we're subscribing to, to nwsi->mqtt 2260 * so we know the nwsi itself has a subscription to it 2261 */ 2262 2263 if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name)) 2264 return 1; 2265 2266 /* Topic's Len */ 2267 lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name)); 2268 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) 2269 return 1; 2270 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2271 2272 /* Topic Name */ 2273 lws_strncpy((char *)p, sub->topic[n].name, 2274 strlen(sub->topic[n].name) + 1); 2275 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2276 (int)strlen(sub->topic[n].name))) 2277 return 1; 2278 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2279 2280 /* QoS */ 2281 *p = (uint8_t)sub->topic[n].qos; 2282 if (lws_mqtt_str_advance(&mqtt_vh_payload, 1)) 2283 return 1; 2284 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2285 } 2286 break; 2287 2288 default: 2289 return 1; 2290 } 2291 2292 if (wsi->mqtt->inside_resume_session) 2293 return 0; 2294 2295 if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) != 2296 lws_ptr_diff(p, start)) 2297 return 1; 2298 2299 wsi->mqtt->inside_subscribe = 1; 2300 2301 return 0; 2302} 2303 2304int 2305lws_mqtt_client_send_unsubcribe(struct lws *wsi, 2306 const lws_mqtt_subscribe_param_t *unsub) 2307{ 2308 struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi]; 2309 uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start; 2310 struct lws *nwsi = lws_get_network_wsi(wsi); 2311 lws_mqtt_str_t mqtt_vh_payload; 2312 uint8_t send_unsub[8], orphaned; 2313 uint32_t rem_len, n; 2314 lws_mqtt_subs_t *mysub; 2315#if defined(_DEBUG) 2316 uint32_t tops; 2317#endif 2318 2319 lwsl_info("%s: Enter\n", __func__); 2320 2321 switch (lwsi_state(wsi)) { 2322 case LRS_ESTABLISHED: /* Protocol connection established */ 2323 orphaned = 0; 2324 memset(&send_unsub, 0, sizeof(send_unsub)); 2325 for (n = 0; n < unsub->num_topics; n++) { 2326 mysub = lws_mqtt_find_sub(nwsi->mqtt, 2327 unsub->topic[n].name); 2328 assert(mysub); 2329 2330 if (mysub && --mysub->ref_count == 0) { 2331 lwsl_notice("%s: Need to send UNSUB\n", __func__); 2332 send_unsub[n] = 1; 2333 orphaned++; 2334 } 2335 } 2336 2337 if (!orphaned) { 2338 /* 2339 * The nwsi still has other subscribers bound to the 2340 * topics. 2341 * 2342 * So, don't send UNSUB to server, and just fake the 2343 * UNSUB ACK event for the guy going away. 2344 */ 2345 lwsl_notice("%s: unsubscribed!\n", __func__); 2346 if (user_callback_handle_rxflow( 2347 wsi->a.protocol->callback, 2348 wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED, 2349 wsi->user_space, NULL, 0) < 0) { 2350 /* 2351 * We can't directly close here, because the 2352 * caller still has the wsi. Inform the 2353 * caller that we want to close 2354 */ 2355 2356 return 1; 2357 } 2358 2359 return 0; 2360 } 2361#if defined(_DEBUG) 2362 /* 2363 * one or more of the topics needs to be unsubscribed 2364 * from, so we must go to the server with a filtered 2365 * list of the new ones only 2366 */ 2367 2368 tops = orphaned; 2369#endif 2370 2371 if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE, 2372 0, 0, 0)) { 2373 lwsl_err("%s: Failed to fill fixed header\n", __func__); 2374 return 1; 2375 } 2376 2377 /* 2378 * Pid + (Topic len field + Topic len) x Num of Topics 2379 */ 2380 rem_len = 2; 2381 for (n = 0; n < unsub->num_topics; n++) 2382 if (send_unsub[n]) 2383 rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name)); 2384 2385 wsi->mqtt->sub_size = (uint16_t)rem_len; 2386 2387#if defined(_DEBUG) 2388 lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n", 2389 __func__, (int)tops, (int)rem_len); 2390#endif 2391 2392 p += lws_mqtt_vbi_encode(rem_len, p); 2393 2394 if ((rem_len + lws_ptr_diff_size_t(p, start)) >= 2395 wsi->a.context->pt_serv_buf_size) { 2396 lwsl_err("%s: Payload is too big\n", __func__); 2397 return 1; 2398 } 2399 2400 /* Init lws_mqtt_str */ 2401 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0); 2402 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2403 2404 /* Packet ID */ 2405 wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id; 2406 lwsl_debug("%s: pkt_id = %d\n", __func__, 2407 (int)wsi->mqtt->ack_pkt_id); 2408 lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id); 2409 2410 nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot; 2411 2412 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) 2413 return 1; 2414 2415 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2416 2417 for (n = 0; n < unsub->num_topics; n++) { 2418 lwsl_info("%s: topics[%d] = %s\n", __func__, 2419 (int)n, unsub->topic[n].name); 2420 2421 /* 2422 * Subscriber still bound to it, don't UBSUB 2423 * from the server 2424 */ 2425 if (!send_unsub[n]) 2426 continue; 2427 2428 /* Topic's Len */ 2429 lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name)); 2430 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) 2431 return 1; 2432 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2433 2434 /* Topic Name */ 2435 lws_strncpy((char *)p, unsub->topic[n].name, 2436 strlen(unsub->topic[n].name) + 1); 2437 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2438 (int)strlen(unsub->topic[n].name))) 2439 return 1; 2440 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2441 } 2442 break; 2443 2444 default: 2445 return 1; 2446 } 2447 2448 if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) != 2449 lws_ptr_diff(p, start)) 2450 return 1; 2451 2452 wsi->mqtt->inside_unsubscribe = 1; 2453 2454 wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout; 2455 __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], 2456 &wsi->mqtt->sul_unsuback_wait, 2457 3 * LWS_USEC_PER_SEC); 2458 2459 return 0; 2460} 2461 2462/* 2463 * This is called when child streams bind to an already-existing and compatible 2464 * MQTT stream 2465 */ 2466 2467struct lws * 2468lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi) 2469{ 2470 /* no more children allowed by parent? */ 2471 2472 if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) { 2473 lwsl_err("%s: reached concurrent stream limit\n", __func__); 2474 return NULL; 2475 } 2476 2477#if defined(LWS_WITH_CLIENT) 2478 wsi->client_mux_substream = 1; 2479#endif 2480 2481 lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid); 2482 2483 if (lws_ensure_user_space(wsi)) 2484 goto bail1; 2485 2486 lws_mqtt_set_client_established(wsi); 2487 lws_callback_on_writable(wsi); 2488 2489 return wsi; 2490 2491bail1: 2492 /* undo the insert */ 2493 parent_wsi->mux.child_list = wsi->mux.sibling_list; 2494 parent_wsi->mux.child_count--; 2495 2496 if (wsi->user_space) 2497 lws_free_set_NULL(wsi->user_space); 2498 2499 wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0); 2500 lws_free(wsi); 2501 2502 return NULL; 2503} 2504 2505