1 /*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 *
24 * MQTT v5
25 *
26 * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
27 *
28 * Control Packet structure
29 *
30 * - Always: 2+ byte: Fixed Hdr
31 * - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
32 * - Required in some: variable: Payload
33 *
34 * For CONNECT, the props if present MUST be in the order [MQTT-3.1.3-1]
35 *
36 * - Client Identifier
37 * - Will Properties
38 * - Will Topic
39 * - Will Payload
40 * - User Name
41 * - Password
42 */
43
44 #include "private-lib-core.h"
45 #include <string.h>
46 #include <sys/types.h>
47 #include <assert.h>
48
49 typedef enum {
50 LMQPRS_AWAITING_CONNECT,
51
52 } lws_mqtt_protocol_server_connstate_t;
53
54 const char * const reason_names_g1[] = {
55 "Success / Normal disconnection / QoS0",
56 "QoS1",
57 "QoS2",
58 "Disconnect Will",
59 "No matching subscriber",
60 "No subscription existed",
61 "Continue authentication",
62 "Re-authenticate"
63 };
64
65 const char * const reason_names_g2[] = {
66 "Unspecified error",
67 "Malformed packet",
68 "Protocol error",
69 "Implementation specific error",
70 "Unsupported protocol",
71 "Client ID invalid",
72 "Bad credentials",
73 "Not Authorized",
74 "Server Unavailable",
75 "Server Busy",
76 "Banned",
77 "Server Shutting Down",
78 "Bad Authentication Method",
79 "Keepalive Timeout",
80 "Session taken over",
81 "Topic Filter Invalid",
82 "Packet ID in use",
83 "Packet ID not found",
84 "Max RX Exceeded",
85 "Topic Alias Invalid",
86 "Packet too large",
87 "Ratelimit",
88 "Quota Exceeded",
89 "Administrative Action",
90 "Payload format invalid",
91 "Retain not supported",
92 "QoS not supported",
93 "Use another server",
94 "Server Moved",
95 "Shared subscriptions not supported",
96 "Connection rate exceeded",
97 "Maximum Connect Time",
98 "Subscription IDs not supported",
99 "Wildcard subscriptions not supported"
100 };
101
102 #define LMQCP_WILL_PROPERTIES 0
103
104 /* For each property, a bitmap describing which commands it is valid for */
105
106 static const uint16_t property_valid[] = {
107 [LMQPROP_PAYLOAD_FORMAT_INDICATOR] = (1 << LMQCP_PUBLISH) |
108 (1 << LMQCP_WILL_PROPERTIES),
109 [LMQPROP_MESSAGE_EXPIRY_INTERVAL] = (1 << LMQCP_PUBLISH) |
110 (1 << LMQCP_WILL_PROPERTIES),
111 [LMQPROP_CONTENT_TYPE] = (1 << LMQCP_PUBLISH) |
112 (1 << LMQCP_WILL_PROPERTIES),
113 [LMQPROP_RESPONSE_TOPIC] = (1 << LMQCP_PUBLISH) |
114 (1 << LMQCP_WILL_PROPERTIES),
115 [LMQPROP_CORRELATION_DATA] = (1 << LMQCP_PUBLISH) |
116 (1 << LMQCP_WILL_PROPERTIES),
117 [LMQPROP_SUBSCRIPTION_IDENTIFIER] = (1 << LMQCP_PUBLISH) |
118 (1 << LMQCP_CTOS_SUBSCRIBE),
119 [LMQPROP_SESSION_EXPIRY_INTERVAL] = (1 << LMQCP_CTOS_CONNECT) |
120 (1 << LMQCP_STOC_CONNACK) |
121 (1 << LMQCP_DISCONNECT),
122 [LMQPROP_ASSIGNED_CLIENT_IDENTIFIER] = (1 << LMQCP_STOC_CONNACK),
123 [LMQPROP_SERVER_KEEP_ALIVE] = (1 << LMQCP_STOC_CONNACK),
124 [LMQPROP_AUTHENTICATION_METHOD] = (1 << LMQCP_CTOS_CONNECT) |
125 (1 << LMQCP_STOC_CONNACK) |
126 (1 << LMQCP_AUTH),
127 [LMQPROP_AUTHENTICATION_DATA] = (1 << LMQCP_CTOS_CONNECT) |
128 (1 << LMQCP_STOC_CONNACK) |
129 (1 << LMQCP_AUTH),
130 [LMQPROP_REQUEST_PROBLEM_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
131 [LMQPROP_WILL_DELAY_INTERVAL] = (1 << LMQCP_WILL_PROPERTIES),
132 [LMQPROP_REQUEST_RESPONSE_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
133 [LMQPROP_RESPONSE_INFORMATION] = (1 << LMQCP_STOC_CONNACK),
134 [LMQPROP_SERVER_REFERENCE] = (1 << LMQCP_STOC_CONNACK) |
135 (1 << LMQCP_DISCONNECT),
136 [LMQPROP_REASON_STRING] = (1 << LMQCP_STOC_CONNACK) |
137 (1 << LMQCP_PUBACK) |
138 (1 << LMQCP_PUBREC) |
139 (1 << LMQCP_PUBREL) |
140 (1 << LMQCP_PUBCOMP) |
141 (1 << LMQCP_STOC_SUBACK) |
142 (1 << LMQCP_STOC_UNSUBACK) |
143 (1 << LMQCP_DISCONNECT) |
144 (1 << LMQCP_AUTH),
145 [LMQPROP_RECEIVE_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
146 (1 << LMQCP_STOC_CONNACK),
147 [LMQPROP_TOPIC_ALIAS_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
148 (1 << LMQCP_STOC_CONNACK),
149 [LMQPROP_TOPIC_ALIAS] = (1 << LMQCP_PUBLISH),
150 [LMQPROP_MAXIMUM_QOS] = (1 << LMQCP_STOC_CONNACK),
151 [LMQPROP_RETAIN_AVAILABLE] = (1 << LMQCP_STOC_CONNACK),
152 [LMQPROP_USER_PROPERTY] = (1 << LMQCP_CTOS_CONNECT) |
153 (1 << LMQCP_STOC_CONNACK) |
154 (1 << LMQCP_PUBLISH) |
155 (1 << LMQCP_WILL_PROPERTIES) |
156 (1 << LMQCP_PUBACK) |
157 (1 << LMQCP_PUBREC) |
158 (1 << LMQCP_PUBREL) |
159 (1 << LMQCP_PUBCOMP) |
160 (1 << LMQCP_CTOS_SUBSCRIBE) |
161 (1 << LMQCP_STOC_SUBACK) |
162 (1 << LMQCP_CTOS_UNSUBSCRIBE) |
163 (1 << LMQCP_STOC_UNSUBACK) |
164 (1 << LMQCP_DISCONNECT) |
165 (1 << LMQCP_AUTH),
166 [LMQPROP_MAXIMUM_PACKET_SIZE] = (1 << LMQCP_CTOS_CONNECT) |
167 (1 << LMQCP_STOC_CONNACK),
168 [LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK),
169 [LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL] = (1 << LMQCP_STOC_CONNACK),
170 [LMQPROP_SHARED_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK)
171 };
172
173
174 /*
175 * For each command index, maps flags, id, qos and payload legality
176 * notice in most cases PUBLISH requires further processing
177 */
178 static const uint8_t map_flags[] = {
179 [LMQCP_RESERVED] = 0x00,
180 [LMQCP_CTOS_CONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
181 LMQCP_LUT_FLAG_PAYLOAD |
182 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
183 [LMQCP_STOC_CONNACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
184 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
185 [LMQCP_PUBLISH] = LMQCP_LUT_FLAG_PAYLOAD | /* option */
186 LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
187 [LMQCP_PUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
188 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
189 [LMQCP_PUBREC] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
190 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
191 [LMQCP_PUBREL] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
192 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
193 [LMQCP_PUBCOMP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
194 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
195 [LMQCP_CTOS_SUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
196 LMQCP_LUT_FLAG_PAYLOAD |
197 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
198 [LMQCP_STOC_SUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
199 LMQCP_LUT_FLAG_PAYLOAD |
200 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
201 [LMQCP_CTOS_UNSUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
202 LMQCP_LUT_FLAG_PAYLOAD |
203 LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
204 [LMQCP_STOC_UNSUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
205 LMQCP_LUT_FLAG_PAYLOAD |
206 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
207 [LMQCP_CTOS_PINGREQ] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
208 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
209 [LMQCP_STOC_PINGRESP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
210 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
211 [LMQCP_DISCONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
212 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
213 [LMQCP_AUTH] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
214 LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
215 };
216
217 static int
lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)218 lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
219 {
220 par->consumed += (unsigned int)consumed;
221
222 if (par->consumed > par->props_len)
223 return -1;
224
225 /* more properties coming */
226
227 if (par->consumed < par->props_len) {
228 par->state = LMQCPP_PROP_ID_VBI;
229 return 0;
230 }
231
232 /* properties finished: are we headed for payload or idle? */
233
234 if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
235 /* A PUBLISH packet MUST NOT contain a Packet Identifier if
236 * its QoS value is set to 0 [MQTT-2.2.1-2]. */
237 (ctl_pkt_type(par) != LMQCP_PUBLISH ||
238 (par->packet_type_flags & 6))) {
239 par->state = LMQCPP_PAYLOAD;
240 return 0;
241 }
242
243 par->state = LMQCPP_IDLE;
244
245 return 0;
246 }
247
248 static int
lws_mqtt_set_client_established(struct lws *wsi)249 lws_mqtt_set_client_established(struct lws *wsi)
250 {
251 lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
252 &role_ops_mqtt);
253
254 if (user_callback_handle_rxflow(wsi->a.protocol->callback,
255 wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
256 wsi->user_space, NULL, 0) < 0) {
257 lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
258
259 return -1;
260 }
261 /*
262 * If we made a new connection and got the ACK, our connection is
263 * definitely working in both directions at the moment
264 */
265 lws_validity_confirmed(wsi);
266
267 /* clear connection timeout */
268 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
269
270 return 0;
271 }
272
273 static lws_mqtt_validate_topic_return_t
lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)274 lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)
275 {
276 size_t spos = 0;
277 const char *sub = topic;
278 int8_t slashes = 0;
279 lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
280
281 if (awsiot) {
282 if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN)
283 return LMVTR_FAILED_OVERSIZE;
284 if (topic[0] == '$') {
285 ret = LMVTR_VALID_SHADOW;
286 slashes = -3;
287 }
288 } else {
289 if (topiclen > LWS_MQTT_MAX_TOPICLEN)
290 return LMVTR_FAILED_OVERSIZE;
291 if (topic[0] == '$')
292 return LMVTR_FAILED_WILDCARD_FORMAT;
293 }
294
295 while (*sub != 0) {
296 if (sub[0] == '+') {
297 /* topic == "+foo" || "a/+foo" ? */
298 if (spos > 0 && sub[-1] != '/')
299 return LMVTR_FAILED_WILDCARD_FORMAT;
300
301 /* topic == "foo+" or "foo+/a" ? */
302 if (sub[1] != 0 && sub[1] != '/')
303 return LMVTR_FAILED_WILDCARD_FORMAT;
304
305 ret = LMVTR_VALID_WILDCARD;
306 } else if (sub[0] == '#') {
307 /* topic == "foo#" ? */
308 if (spos > 0 && sub[-1] != '/')
309 return LMVTR_FAILED_WILDCARD_FORMAT;
310
311 /* topic == "#foo" ? */
312 if (sub[1] != 0)
313 return LMVTR_FAILED_WILDCARD_FORMAT;
314
315 ret = LMVTR_VALID_WILDCARD;
316 } else if (sub[0] == '/') {
317 slashes++;
318 }
319 spos++;
320 sub++;
321 }
322
323 if (awsiot && (slashes < 0 || slashes > 7))
324 return LMVTR_FAILED_SHADOW_FORMAT;
325
326 return ret;
327 }
328
329 static lws_mqtt_subs_t *
lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)330 lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
331 {
332 lws_mqtt_subs_t *mysub;
333 size_t topiclen = strlen(topic);
334 lws_mqtt_validate_topic_return_t flag;
335
336 flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot);
337 switch (flag) {
338 case LMVTR_FAILED_OVERSIZE:
339 lwsl_err("%s: Topic is too long\n",
340 __func__);
341 return NULL;
342 case LMVTR_FAILED_SHADOW_FORMAT:
343 case LMVTR_FAILED_WILDCARD_FORMAT:
344 lwsl_err("%s: Invalid topic format \"%s\"\n",
345 __func__, topic);
346 return NULL;
347
348 case LMVTR_VALID:
349 case LMVTR_VALID_WILDCARD:
350 case LMVTR_VALID_SHADOW:
351 mysub = lws_malloc(sizeof(*mysub) + topiclen + 1, "sub");
352 if (!mysub) {
353 lwsl_err("%s: Error allocating mysub\n",
354 __func__);
355 return NULL;
356 }
357 mysub->wildcard = (flag == LMVTR_VALID_WILDCARD);
358 mysub->shadow = (flag == LMVTR_VALID_SHADOW);
359 break;
360
361 default:
362 lwsl_err("%s: Unknown flag - %d\n",
363 __func__, flag);
364 return NULL;
365 }
366
367 mysub->next = mqtt->subs_head;
368 mqtt->subs_head = mysub;
369 memcpy(mysub->topic, topic, strlen(topic) + 1);
370 mysub->ref_count = 1;
371
372 lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
373 __func__, mysub, mqtt);
374
375 return mysub;
376 }
377
378 static int
lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)379 lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
380 {
381 lws_mqtt_subs_t *s = mqtt->subs_head;
382 lws_mqtt_subs_t *temp = NULL;
383
384
385 lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
386 __func__, mqtt);
387
388 while (s && s->next) {
389 if (s->next->ref_count == 0)
390 break;
391 s = s->next;
392 }
393
394 if (s && s->next) {
395 temp = s->next;
396 lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
397 __func__, temp, mqtt);
398 s->next = temp->next;
399 lws_free(temp);
400 return 0;
401 }
402 return 1;
403 }
404
405 /*
406 * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or
407 * PUBREC came before the timeout period
408 */
409
410 static void
lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)411 lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
412 {
413 struct _lws_mqtt_related *mqtt = lws_container_of(sul,
414 struct _lws_mqtt_related, sul_qos_puback_pubrec_wait);
415
416 lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
417
418 if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
419 mqtt->wsi->user_space, NULL, 0))
420 lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
421 }
422
423 static void
lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)424 lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)
425 {
426 struct _lws_mqtt_related *mqtt = lws_container_of(sul,
427 struct _lws_mqtt_related, sul_unsuback_wait);
428
429 lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
430
431 if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
432 LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT,
433 mqtt->wsi->user_space, NULL, 0))
434 lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
435 }
436
437 static void
lws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul)438 lws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul)
439 {
440 struct _lws_mqtt_related *mqtt = lws_container_of(sul,
441 struct _lws_mqtt_related, sul_shadow_wait);
442
443 lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
444
445 if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
446 LWS_CALLBACK_MQTT_SHADOW_TIMEOUT,
447 mqtt->wsi->user_space, NULL, 0))
448 lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
449 }
450
451 void
lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)452 lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
453 {
454 lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
455 c->estate = s;
456 }
457
458 lws_mqtt_match_topic_return_t
lws_mqtt_is_topic_matched(const char* sub, const char* pub)459 lws_mqtt_is_topic_matched(const char* sub, const char* pub)
460 {
461 const char *ppos = pub, *spos = sub;
462
463 if (!ppos || !spos) {
464 return LMMTR_TOPIC_MATCH_ERROR;
465 }
466
467 while (*spos) {
468 if (*ppos == '#' || *ppos == '+') {
469 lwsl_err("%s: PUBLISH to wildcard "
470 "topic \"%s\" not supported\n",
471 __func__, pub);
472 return LMMTR_TOPIC_MATCH_ERROR;
473 }
474 /* foo/+/bar == foo/xyz/bar ? */
475 if (*spos == '+') {
476 /* Skip ahead */
477 while (*ppos != '\0' && *ppos != '/') {
478 ppos++;
479 }
480 } else if (*spos == '#') {
481 return LMMTR_TOPIC_MATCH;
482 } else {
483 if (*ppos == '\0') {
484 /* foo/bar == foo/bar/# ? */
485 if (!strncmp(spos, "/#", 2))
486 return LMMTR_TOPIC_MATCH;
487 return LMMTR_TOPIC_NOMATCH;
488 /* Non-matching character */
489 } else if (*ppos != *spos) {
490 return LMMTR_TOPIC_NOMATCH;
491 }
492 ppos++;
493 }
494 spos++;
495 }
496
497 if (*spos == '\0' && *ppos == '\0')
498 return LMMTR_TOPIC_MATCH;
499
500 return LMMTR_TOPIC_NOMATCH;
501 }
502
lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt, const char* ptopic)503 lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
504 const char* ptopic) {
505 lws_mqtt_subs_t *s = mqtt->subs_head;
506
507 while (s) {
508 /* SUB topic == PUB topic ? */
509 /* foo/bar/xyz == foo/bar/xyz ? */
510 if (!s->wildcard) {
511 if (!strcmp((const char*)s->topic, ptopic))
512 return s;
513 } else {
514 if (lws_mqtt_is_topic_matched(
515 s->topic, ptopic) == LMMTR_TOPIC_MATCH)
516 return s;
517 }
518
519 s = s->next;
520 }
521
522 return NULL;
523 }
524
525 int
_lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par, const uint8_t *buf, size_t len)526 _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
527 const uint8_t *buf, size_t len)
528 {
529 struct lws *w;
530 int n;
531
532 if (par->flag_pending_send_reason_close)
533 return 0;
534
535 /*
536 * Stateful, fragmentation-immune parser
537 *
538 * Notice that len can always be 1 if under attack, even over tls if
539 * the server is compromised or malicious.
540 */
541
542 while (len) {
543 lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
544 switch (par->state) {
545 case LMQCPP_IDLE:
546 par->packet_type_flags = *buf++;
547 len--;
548
549 #if defined(LWS_WITH_CLIENT)
550 /*
551 * The case where we sent the connect, but we received
552 * something else before any CONNACK
553 */
554 if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
555 par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
556 lwsl_notice("%s: server sent non-CONNACK\n",
557 __func__);
558 goto send_protocol_error_and_close;
559 }
560 #endif /* LWS_WITH_CLIENT */
561
562 n = map_flags[par->packet_type_flags >> 4];
563 /*
564 * Where a flag bit is marked as “Reserved”, it is
565 * reserved for future use and MUST be set to the value
566 * listed [MQTT-2.1.3-1].
567 */
568 if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
569 ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
570 lwsl_notice("%s: %s: bad flags, 0x%02x mask 0x%02x (len %d)\n",
571 __func__, lws_wsi_tag(wsi),
572 par->packet_type_flags, n, (int)len + 1);
573 lwsl_hexdump_err(buf - 1, len + 1);
574 goto send_protocol_error_and_close;
575 }
576
577 lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
578 __func__, par->packet_type_flags >> 4,
579 par->packet_type_flags & 0xf);
580
581 /* allows us to know if a property that can only be
582 * given once, appears twice */
583 memset(par->props_seen, 0, sizeof(par->props_seen));
584 par->state = par->packet_type_flags & 0xf0;
585 break;
586
587 case LMQCPP_CONNECT_PACKET:
588 lwsl_debug("%s: received CONNECT pkt\n", __func__);
589 par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
590 lws_mqtt_vbi_init(&par->vbit);
591 break;
592
593 case LMQCPP_CONNECT_REMAINING_LEN_VBI:
594 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
595 case LMSPR_NEED_MORE:
596 break;
597 case LMSPR_COMPLETED:
598 par->cpkt_remlen = par->vbit.value;
599 n = map_flags[ctl_pkt_type(par)];
600 lws_mqtt_str_init(&par->s_temp, par->temp,
601 sizeof(par->temp), 0);
602 par->state = LMQCPP_CONNECT_VH_PNAME;
603 break;
604 default:
605 lwsl_notice("%s: bad vbi\n", __func__);
606 goto send_protocol_error_and_close;
607 }
608 break;
609
610 case LMQCPP_CONNECT_VH_PNAME:
611 switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
612 case LMSPR_NEED_MORE:
613 break;
614 case LMSPR_COMPLETED:
615 if (par->s_temp.len != 4 ||
616 memcmp(par->s_temp.buf, "MQTT",
617 par->s_temp.len)) {
618 lwsl_notice("%s: protocol name: %.*s\n",
619 __func__, par->s_temp.len,
620 par->s_temp.buf);
621 goto send_unsupp_connack_and_close;
622 }
623 par->state = LMQCPP_CONNECT_VH_PVERSION;
624 break;
625 default:
626 lwsl_notice("%s: bad protocol name\n", __func__);
627 goto send_protocol_error_and_close;
628 }
629 break;
630
631 case LMQCPP_CONNECT_VH_PVERSION:
632 par->conn_protocol_version = *buf++;
633 len--;
634 if (par->conn_protocol_version != 5) {
635 lwsl_info("%s: unsupported MQTT version %d\n",
636 __func__, par->conn_protocol_version);
637 goto send_unsupp_connack_and_close;
638 }
639 par->state = LMQCPP_CONNECT_VH_FLAGS;
640 break;
641
642 case LMQCPP_CONNECT_VH_FLAGS:
643 par->cpkt_flags = *buf++;
644 len--;
645 if (par->cpkt_flags & 1) {
646 /*
647 * The Server MUST validate that the reserved
648 * flag in the CONNECT packet is set to 0
649 * [MQTT-3.1.2-3].
650 */
651 par->reason = LMQCP_REASON_MALFORMED_PACKET;
652 goto send_reason_and_close;
653 }
654 /*
655 * conn_flags specifies the Will Properties that should
656 * appear in the payload section
657 */
658 lws_mqtt_2byte_init(&par->vbit);
659 par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
660 break;
661
662 case LMQCPP_CONNECT_VH_KEEPALIVE:
663 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
664 case LMSPR_NEED_MORE:
665 break;
666 case LMSPR_COMPLETED:
667 par->keepalive = (uint16_t)par->vbit.value;
668 lws_mqtt_vbi_init(&par->vbit);
669 par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
670 break;
671 default:
672 lwsl_notice("%s: ka bad vbi\n", __func__);
673 goto send_protocol_error_and_close;
674 }
675 break;
676
677 case LMQCPP_PINGRESP_ZERO:
678 len--;
679 /* second byte of PINGRESP must be zero */
680 if (*buf++)
681 goto send_protocol_error_and_close;
682 goto cmd_completion;
683
684 case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
685 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
686 case LMSPR_NEED_MORE:
687 break;
688 case LMSPR_COMPLETED:
689 /* reset consumption counter */
690 par->consumed = 0;
691 par->props_len = par->vbit.value;
692 lws_mqtt_vbi_init(&par->vbit);
693 par->state = LMQCPP_PROP_ID_VBI;
694 break;
695 default:
696 lwsl_notice("%s: connpr bad vbi\n", __func__);
697 goto send_protocol_error_and_close;
698 }
699 break;
700
701 /* PUBREC */
702 case LMQCPP_PUBREC_PACKET:
703 lwsl_debug("%s: received PUBREC pkt\n", __func__);
704 lws_mqtt_vbi_init(&par->vbit);
705 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
706 case LMSPR_NEED_MORE:
707 break;
708 case LMSPR_COMPLETED:
709 par->cpkt_remlen = par->vbit.value;
710 lwsl_debug("%s: PUBREC pkt len = %d\n",
711 __func__, (int)par->cpkt_remlen);
712 if (par->cpkt_remlen < 2)
713 goto send_protocol_error_and_close;
714 par->state = LMQCPP_PUBREC_VH_PKT_ID;
715 break;
716 default:
717 lwsl_notice("%s: pubrec bad vbi\n", __func__);
718 goto send_protocol_error_and_close;
719 }
720 break;
721
722 case LMQCPP_PUBREC_VH_PKT_ID:
723 if (len < 2) {
724 lwsl_notice("%s: len breakage 3\n", __func__);
725 return -1;
726 }
727
728 par->cpkt_id = lws_ser_ru16be(buf);
729 wsi->mqtt->ack_pkt_id = par->cpkt_id;
730 buf += 2;
731 len -= 2;
732 par->cpkt_remlen -= 2;
733 par->n = 0;
734
735 goto cmd_completion;
736
737 /* PUBREL */
738 case LMQCPP_PUBREL_PACKET:
739 lwsl_debug("%s: received PUBREL pkt\n", __func__);
740 lws_mqtt_vbi_init(&par->vbit);
741 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
742 case LMSPR_NEED_MORE:
743 break;
744 case LMSPR_COMPLETED:
745 par->cpkt_remlen = par->vbit.value;
746 lwsl_debug("%s: PUBREL pkt len = %d\n",
747 __func__, (int)par->cpkt_remlen);
748 if (par->cpkt_remlen < 2)
749 goto send_protocol_error_and_close;
750 par->state = LMQCPP_PUBREL_VH_PKT_ID;
751 break;
752 default:
753 lwsl_err("%s: pubrel bad vbi\n", __func__);
754 goto send_protocol_error_and_close;
755 }
756 break;
757
758 case LMQCPP_PUBREL_VH_PKT_ID:
759 if (len < 2) {
760 lwsl_notice("%s: len breakage 3\n", __func__);
761 return -1;
762 }
763
764 par->cpkt_id = lws_ser_ru16be(buf);
765 wsi->mqtt->ack_pkt_id = par->cpkt_id;
766 buf += 2;
767 len -= 2;
768 par->cpkt_remlen -= 2;
769 par->n = 0;
770
771 goto cmd_completion;
772
773 /* PUBCOMP */
774 case LMQCPP_PUBCOMP_PACKET:
775 lwsl_debug("%s: received PUBCOMP pkt\n", __func__);
776 lws_mqtt_vbi_init(&par->vbit);
777 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
778 case LMSPR_NEED_MORE:
779 break;
780 case LMSPR_COMPLETED:
781 par->cpkt_remlen = par->vbit.value;
782 lwsl_debug("%s: PUBCOMP pkt len = %d\n",
783 __func__, (int)par->cpkt_remlen);
784 if (par->cpkt_remlen < 2)
785 goto send_protocol_error_and_close;
786 par->state = LMQCPP_PUBCOMP_VH_PKT_ID;
787 break;
788 default:
789 lwsl_err("%s: pubcmp bad vbi\n", __func__);
790 goto send_protocol_error_and_close;
791 }
792 break;
793
794 case LMQCPP_PUBCOMP_VH_PKT_ID:
795 if (len < 2) {
796 lwsl_notice("%s: len breakage 3\n", __func__);
797 return -1;
798 }
799
800 par->cpkt_id = lws_ser_ru16be(buf);
801 wsi->mqtt->ack_pkt_id = par->cpkt_id;
802 buf += 2;
803 len -= 2;
804 par->cpkt_remlen -= 2;
805 par->n = 0;
806
807 goto cmd_completion;
808
809 case LMQCPP_PUBLISH_PACKET:
810 if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
811 lwsl_notice("%s: Topic rx before subscribing\n",
812 __func__);
813 goto send_protocol_error_and_close;
814 }
815 lwsl_info("%s: received PUBLISH pkt\n", __func__);
816 par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
817 lws_mqtt_vbi_init(&par->vbit);
818 break;
819 case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
820 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
821 case LMSPR_NEED_MORE:
822 break;
823 case LMSPR_COMPLETED:
824 par->cpkt_remlen = par->vbit.value;
825 lwsl_debug("%s: PUBLISH pkt len = %d\n",
826 __func__, (int)par->cpkt_remlen);
827 /* Move on to PUBLISH's variable header */
828 par->state = LMQCPP_PUBLISH_VH_TOPIC;
829 break;
830 default:
831 lwsl_notice("%s: pubrem bad vbi\n", __func__);
832 goto send_protocol_error_and_close;
833 }
834 break;
835
836 case LMQCPP_PUBLISH_VH_TOPIC:
837 {
838 lws_mqtt_publish_param_t *pub = NULL;
839
840 if (len < 2) {
841 lwsl_notice("%s: topic too short\n", __func__);
842 return -1;
843 }
844
845 /* Topic len */
846 par->n = lws_ser_ru16be(buf);
847 buf += 2;
848 len -= 2;
849
850 if (len < par->n) {/* the way this is written... */
851 lwsl_notice("%s: len breakage\n", __func__);
852 return -1;
853 }
854
855 /* Invalid topic len */
856 if (par->n == 0) {
857 lwsl_notice("%s: zero topic len\n", __func__);
858 par->reason = LMQCP_REASON_MALFORMED_PACKET;
859 goto send_reason_and_close;
860 }
861 lwsl_debug("%s: PUBLISH topic len %d\n",
862 __func__, (int)par->n);
863 assert(!wsi->mqtt->rx_cpkt_param);
864 wsi->mqtt->rx_cpkt_param = lws_zalloc(
865 sizeof(lws_mqtt_publish_param_t), "rx pub param");
866 if (!wsi->mqtt->rx_cpkt_param)
867 goto oom;
868 pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
869
870 pub->topic_len = (uint16_t)par->n;
871
872 /* Topic Name */
873 pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
874 "rx publish topic");
875 if (!pub->topic)
876 goto oom;
877 lws_strncpy(pub->topic, (const char *)buf,
878 (size_t)pub->topic_len + 1);
879 buf += pub->topic_len;
880 len -= pub->topic_len;
881
882 /* Extract QoS Level from Fixed Header Flags */
883 pub->qos = (lws_mqtt_qos_levels_t)
884 ((par->packet_type_flags >> 1) & 0x3);
885
886 pub->payload_pos = 0;
887
888 pub->payload_len = par->cpkt_remlen -
889 (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
890
891 switch (pub->qos) {
892 case QOS0:
893 par->state = LMQCPP_PAYLOAD;
894 if (pub->payload_len == 0)
895 goto cmd_completion;
896
897 break;
898 case QOS1:
899 case QOS2:
900 par->state = LMQCPP_PUBLISH_VH_PKT_ID;
901 break;
902 default:
903 par->reason = LMQCP_REASON_MALFORMED_PACKET;
904 lws_free_set_NULL(pub->topic);
905 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
906 goto send_reason_and_close;
907 }
908 break;
909 }
910 case LMQCPP_PUBLISH_VH_PKT_ID:
911 {
912 lws_mqtt_publish_param_t *pub =
913 (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
914
915 if (len < 2) {
916 lwsl_notice("%s: len breakage 2\n", __func__);
917 return -1;
918 }
919
920 par->cpkt_id = lws_ser_ru16be(buf);
921 buf += 2;
922 len -= 2;
923 wsi->mqtt->peer_ack_pkt_id = par->cpkt_id;
924 lwsl_debug("%s: Packet ID %d\n",
925 __func__, (int)par->cpkt_id);
926 par->state = LMQCPP_PAYLOAD;
927 pub->payload_pos = 0;
928 pub->payload_len = par->cpkt_remlen -
929 (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
930 if (pub->payload_len == 0)
931 goto cmd_completion;
932
933 break;
934 }
935 case LMQCPP_PAYLOAD:
936 {
937 lws_mqtt_publish_param_t *pub =
938 (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
939 if (pub == NULL) {
940 lwsl_err("%s: Uninitialized pub_param\n",
941 __func__);
942 goto send_protocol_error_and_close;
943 }
944
945 pub->payload = buf;
946 goto cmd_completion;
947 }
948
949 case LMQCPP_CONNACK_PACKET:
950 if (!lwsi_role_client(wsi)) {
951 lwsl_err("%s: CONNACK is only Server to Client",
952 __func__);
953 goto send_unsupp_connack_and_close;
954 }
955
956 lwsl_debug("%s: received CONNACK pkt\n", __func__);
957 lws_mqtt_vbi_init(&par->vbit);
958 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
959 case LMSPR_NEED_MORE:
960 break;
961 case LMSPR_COMPLETED:
962 par->cpkt_remlen = par->vbit.value;
963 lwsl_debug("%s: CONNACK pkt len = %d\n",
964 __func__, (int)par->cpkt_remlen);
965 if (par->cpkt_remlen != 2)
966 goto send_protocol_error_and_close;
967
968 par->state = LMQCPP_CONNACK_VH_FLAGS;
969 break;
970 default:
971 lwsl_notice("%s: connack bad vbi\n", __func__);
972 goto send_protocol_error_and_close;
973 }
974 break;
975
976 case LMQCPP_CONNACK_VH_FLAGS:
977 {
978 lws_mqttc_t *c = &wsi->mqtt->client;
979 par->cpkt_flags = *buf++;
980 len--;
981
982 if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
983 /*
984 * Byte 1 is the "Connect Acknowledge
985 * Flags". Bits 7-1 are reserved and
986 * MUST be set to 0.
987 */
988 par->reason = LMQCP_REASON_MALFORMED_PACKET;
989 goto send_reason_and_close;
990 }
991 /*
992 * If the Server accepts a connection with
993 * CleanSession set to 1, the Server MUST set
994 * Session Present to 0 in the CONNACK packet
995 * in addition to setting a zero return code
996 * in the CONNACK packet [MQTT-3.2.2-1]. If
997 * the Server accepts a connection with
998 * CleanSession set to 0, the value set in
999 * Session Present depends on whether the
1000 * Server already has stored Session state for
1001 * the supplied client ID. If the Server has
1002 * stored Session state, it MUST set
1003 * SessionPresent to 1 in the CONNACK packet
1004 * [MQTT-3.2.2-2]. If the Server does not have
1005 * stored Session state, it MUST set Session
1006 * Present to 0 in the CONNACK packet. This is
1007 * in addition to setting a zero return code
1008 * in the CONNACK packet [MQTT-3.2.2-3].
1009 */
1010 if ((c->conn_flags & LMQCFT_CLEAN_START) &&
1011 (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
1012 goto send_protocol_error_and_close;
1013
1014 wsi->mqtt->session_resumed = ((unsigned int)par->cpkt_flags &
1015 LMQCFT_SESSION_PRESENT);
1016
1017 /* Move on to Connect Return Code */
1018 par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
1019 break;
1020 }
1021 case LMQCPP_CONNACK_VH_RETURN_CODE:
1022 par->conn_rc = *buf++;
1023 len--;
1024 /*
1025 * If a server sends a CONNACK packet containing a
1026 * non-zero return code it MUST then close the Network
1027 * Connection [MQTT-3.2.2-5]
1028 */
1029 switch (par->conn_rc) {
1030 case 0:
1031 goto cmd_completion;
1032 case 1:
1033 case 2:
1034 case 3:
1035 case 4:
1036 case 5:
1037 par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
1038 par->conn_rc - 1;
1039 goto send_reason_and_close;
1040 default:
1041 lwsl_notice("%s: bad connack retcode\n", __func__);
1042 goto send_protocol_error_and_close;
1043 }
1044 break;
1045
1046 /* SUBACK */
1047 case LMQCPP_SUBACK_PACKET:
1048 if (!lwsi_role_client(wsi)) {
1049 lwsl_err("%s: SUBACK is only Server to Client",
1050 __func__);
1051 goto send_unsupp_connack_and_close;
1052 }
1053
1054 lwsl_debug("%s: received SUBACK pkt\n", __func__);
1055 lws_mqtt_vbi_init(&par->vbit);
1056 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1057 case LMSPR_NEED_MORE:
1058 break;
1059 case LMSPR_COMPLETED:
1060 par->cpkt_remlen = par->vbit.value;
1061 lwsl_debug("%s: SUBACK pkt len = %d\n",
1062 __func__, (int)par->cpkt_remlen);
1063 if (par->cpkt_remlen <= 2)
1064 goto send_protocol_error_and_close;
1065 par->state = LMQCPP_SUBACK_VH_PKT_ID;
1066 break;
1067 default:
1068 lwsl_notice("%s: suback bad vbi\n", __func__);
1069 goto send_protocol_error_and_close;
1070 }
1071 break;
1072
1073 case LMQCPP_SUBACK_VH_PKT_ID:
1074
1075 if (len < 2) {
1076 lwsl_notice("%s: len breakage 4\n", __func__);
1077 return -1;
1078 }
1079
1080 par->cpkt_id = lws_ser_ru16be(buf);
1081 wsi->mqtt->ack_pkt_id = par->cpkt_id;
1082 buf += 2;
1083 len -= 2;
1084 par->cpkt_remlen -= 2;
1085 par->n = 0;
1086 par->state = LMQCPP_SUBACK_PAYLOAD;
1087 *par->temp = 0;
1088 break;
1089
1090 case LMQCPP_SUBACK_PAYLOAD:
1091 {
1092 lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
1093
1094 len--;
1095 switch (qos) {
1096 case QOS0:
1097 case QOS1:
1098 case QOS2:
1099 break;
1100 case FAILURE_QOS_LEVEL:
1101 goto send_protocol_error_and_close;
1102
1103 default:
1104 par->reason = LMQCP_REASON_MALFORMED_PACKET;
1105 goto send_reason_and_close;
1106 }
1107
1108 if (++(par->n) == par->cpkt_remlen) {
1109 par->n = 0;
1110 goto cmd_completion;
1111 }
1112
1113 break;
1114 }
1115
1116 /* UNSUBACK */
1117 case LMQCPP_UNSUBACK_PACKET:
1118 if (!lwsi_role_client(wsi)) {
1119 lwsl_err("%s: UNSUBACK is only Server to Client",
1120 __func__);
1121 goto send_unsupp_connack_and_close;
1122 }
1123
1124 lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
1125 lws_mqtt_vbi_init(&par->vbit);
1126 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1127 case LMSPR_NEED_MORE:
1128 break;
1129 case LMSPR_COMPLETED:
1130 par->cpkt_remlen = par->vbit.value;
1131 lwsl_debug("%s: UNSUBACK pkt len = %d\n",
1132 __func__, (int)par->cpkt_remlen);
1133 if (par->cpkt_remlen < 2)
1134 goto send_protocol_error_and_close;
1135 par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
1136 break;
1137 default:
1138 lwsl_notice("%s: unsuback bad vbi\n", __func__);
1139 goto send_protocol_error_and_close;
1140 }
1141 break;
1142
1143 case LMQCPP_UNSUBACK_VH_PKT_ID:
1144
1145 if (len < 2) {
1146 lwsl_notice("%s: len breakage 3\n", __func__);
1147 return -1;
1148 }
1149
1150 par->cpkt_id = lws_ser_ru16be(buf);
1151 wsi->mqtt->ack_pkt_id = par->cpkt_id;
1152 buf += 2;
1153 len -= 2;
1154 par->cpkt_remlen -= 2;
1155 par->n = 0;
1156
1157 goto cmd_completion;
1158
1159 case LMQCPP_PUBACK_PACKET:
1160 lws_mqtt_vbi_init(&par->vbit);
1161 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1162 case LMSPR_NEED_MORE:
1163 break;
1164 case LMSPR_COMPLETED:
1165 par->cpkt_remlen = par->vbit.value;
1166 lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
1167 (int)par->cpkt_remlen);
1168 /*
1169 * must be 4 or more, with special case that 2
1170 * means success with no reason code or props
1171 */
1172 if (par->cpkt_remlen <= 1 ||
1173 par->cpkt_remlen == 3)
1174 goto send_protocol_error_and_close;
1175
1176 par->state = LMQCPP_PUBACK_VH_PKT_ID;
1177 par->fixed_seen[2] = par->fixed_seen[3] = 0;
1178 par->fixed = 0;
1179 par->n = 0;
1180 break;
1181 default:
1182 lwsl_notice("%s: puback bad vbi\n", __func__);
1183 goto send_protocol_error_and_close;
1184 }
1185 break;
1186
1187 case LMQCPP_PUBACK_VH_PKT_ID:
1188 /*
1189 * There are 3 fixed bytes and then a VBI for the
1190 * property section length
1191 */
1192 par->fixed_seen[par->fixed++] = *buf++;
1193 if (len < par->cpkt_remlen - par->n) {
1194 lwsl_notice("%s: len breakage 4\n", __func__);
1195 return -1;
1196 }
1197 len--;
1198 par->n++;
1199 if (par->fixed == 2)
1200 par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
1201
1202 if (par->fixed == 3) {
1203 lws_mqtt_vbi_init(&par->vbit);
1204 par->props_consumed = 0;
1205 par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
1206 }
1207 /* length of 2 is truncated packet and we completed it */
1208 if (par->cpkt_remlen == par->fixed)
1209 goto cmd_completion;
1210 break;
1211
1212 case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
1213 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1214 case LMSPR_NEED_MORE:
1215 break;
1216 case LMSPR_COMPLETED:
1217 par->props_len = par->vbit.value;
1218 lwsl_info("%s: PUBACK props len = %d\n",
1219 __func__, (int)par->cpkt_remlen);
1220 /*
1221 * If there are no properties, this is a
1222 * command completion event in itself
1223 */
1224 if (!par->props_len)
1225 goto cmd_completion;
1226
1227 /*
1228 * Otherwise consume the properties before
1229 * completing the command
1230 */
1231 lws_mqtt_vbi_init(&par->vbit);
1232 par->state = LMQCPP_PUBACK_VH_PKT_ID;
1233 break;
1234 default:
1235 lwsl_notice("%s: puback pr bad vbi\n", __func__);
1236 goto send_protocol_error_and_close;
1237 }
1238 break;
1239
1240 case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
1241 /*
1242 * TODO: stash the props
1243 */
1244 par->props_consumed++;
1245 len--;
1246 buf++;
1247 if (par->props_len != par->props_consumed)
1248 break;
1249
1250 cmd_completion:
1251 /*
1252 * We come here when we understood we just processed
1253 * the last byte of a command packet, regardless of the
1254 * packet type
1255 */
1256 par->state = LMQCPP_IDLE;
1257
1258 switch (par->packet_type_flags >> 4) {
1259 case LMQCP_STOC_CONNACK:
1260 lwsl_info("%s: cmd_completion: CONNACK\n",
1261 __func__);
1262
1263 /*
1264 * Getting the CONNACK means we are the first,
1265 * the nwsi, and we succeeded to create a new
1266 * network connection ourselves.
1267 *
1268 * Since others may join us sharing the nwsi,
1269 * and we may close while they still want to use
1270 * it, our wsi lifecycle alone can no longer
1271 * define the lifecycle of the nwsi... it means
1272 * we need to do a "magic trick" and instead of
1273 * being both the nwsi and act like a child
1274 * stream, create a new wsi to take over the
1275 * nwsi duties and turn our wsi into a child of
1276 * the nwsi with its own lifecycle.
1277 *
1278 * The nwsi gets a mostly empty wsi->nwsi used
1279 * to track already-subscribed topics globally
1280 * for the connection.
1281 */
1282
1283 /* we were under SENT_CLIENT_HANDSHAKE timeout */
1284 lws_set_timeout(wsi, 0, 0);
1285
1286 w = lws_create_new_server_wsi(wsi->a.vhost,
1287 wsi->tsi, "mqtt_sid1");
1288 if (!w) {
1289 lwsl_notice("%s: sid 1 migrate failed\n",
1290 __func__);
1291 return -1;
1292 }
1293
1294 wsi->mux.highest_sid = 1;
1295 lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
1296
1297 wsi->mux_substream = 1;
1298 w->mux_substream = 1;
1299 w->client_mux_substream = 1;
1300 wsi->client_mux_migrated = 1;
1301 wsi->told_user_closed = 1; /* don't tell nwsi closed */
1302
1303 lwsi_set_state(w, LRS_ESTABLISHED);
1304 lwsi_set_state(wsi, LRS_ESTABLISHED);
1305 lwsi_set_role(w, lwsi_role(wsi));
1306
1307 #if defined(LWS_WITH_CLIENT)
1308 w->flags = wsi->flags;
1309 #endif
1310
1311 w->mqtt = wsi->mqtt;
1312 wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
1313 if (!wsi->mqtt)
1314 return -1;
1315 w->mqtt->wsi = w;
1316 w->a.protocol = wsi->a.protocol;
1317 if (w->user_space &&
1318 !w->user_space_externally_allocated)
1319 lws_free_set_NULL(w->user_space);
1320 w->user_space = wsi->user_space;
1321 wsi->user_space = NULL;
1322 w->user_space_externally_allocated =
1323 wsi->user_space_externally_allocated;
1324 if (lws_ensure_user_space(w))
1325 goto bail1;
1326 w->a.opaque_user_data = wsi->a.opaque_user_data;
1327 wsi->a.opaque_user_data = NULL;
1328 w->stash = wsi->stash;
1329 wsi->stash = NULL;
1330
1331 lws_mux_mark_immortal(w);
1332
1333 lwsl_notice("%s: migrated nwsi %s to sid 1 %s\n",
1334 __func__, lws_wsi_tag(wsi),
1335 lws_wsi_tag(w));
1336
1337 /*
1338 * It was the last thing we were waiting for
1339 * before we can be fully ESTABLISHED
1340 */
1341 if (lws_mqtt_set_client_established(w)) {
1342 lwsl_notice("%s: set EST fail\n", __func__);
1343 return -1;
1344 }
1345
1346 /* get the ball rolling */
1347 lws_validity_confirmed(wsi);
1348
1349 /* well, add the queued guys as children */
1350 lws_wsi_mux_apply_queue(wsi);
1351 break;
1352
1353 bail1:
1354 /* undo the insert */
1355 wsi->mux.child_list = w->mux.sibling_list;
1356 wsi->mux.child_count--;
1357
1358 if (w->user_space)
1359 lws_free_set_NULL(w->user_space);
1360 w->a.vhost->protocols[0].callback(w,
1361 LWS_CALLBACK_WSI_DESTROY,
1362 NULL, NULL, 0);
1363 __lws_vhost_unbind_wsi(w); /* cx + vh lock */
1364 lws_free(w);
1365
1366 return 0;
1367
1368 case LMQCP_PUBREC:
1369 lwsl_err("%s: cmd_completion: PUBREC\n",
1370 __func__);
1371 /*
1372 * Figure out which child asked for this
1373 */
1374 n = 0;
1375 lws_start_foreach_ll(struct lws *, w,
1376 wsi->mux.child_list) {
1377 if (w->mqtt->unacked_publish &&
1378 w->mqtt->ack_pkt_id == par->cpkt_id) {
1379 char requested_close = 0;
1380
1381 w->mqtt->unacked_publish = 0;
1382 w->mqtt->unacked_pubrel = 1;
1383
1384 if (user_callback_handle_rxflow(
1385 w->a.protocol->callback,
1386 w, LWS_CALLBACK_MQTT_ACK,
1387 w->user_space, NULL, 0) < 0) {
1388 lwsl_info("%s: MQTT_ACK requests close\n",
1389 __func__);
1390 requested_close = 1;
1391 }
1392 n = 1;
1393
1394 /*
1395 * We got an assertive PUBREC,
1396 * no need for timeout wait
1397 * any more
1398 */
1399 lws_sul_cancel(&w->mqtt->
1400 sul_qos_puback_pubrec_wait);
1401
1402 if (requested_close) {
1403 __lws_close_free_wsi(w,
1404 0, "ack cb");
1405 break;
1406 }
1407
1408 break;
1409 }
1410 } lws_end_foreach_ll(w, mux.sibling_list);
1411
1412 if (!n) {
1413 lwsl_err("%s: unsolicited PUBREC\n",
1414 __func__);
1415 return -1;
1416 }
1417 wsi->mqtt->send_pubrel = 1;
1418 lws_callback_on_writable(wsi);
1419 break;
1420
1421 case LMQCP_PUBCOMP:
1422 lwsl_err("%s: cmd_completion: PUBCOMP\n",
1423 __func__);
1424 n = 0;
1425 lws_start_foreach_ll(struct lws *, w,
1426 wsi->mux.child_list) {
1427 if (w->mqtt->unacked_pubrel > 0 &&
1428 w->mqtt->ack_pkt_id == par->cpkt_id) {
1429 w->mqtt->unacked_pubrel = 0;
1430 n = 1;
1431 }
1432 } lws_end_foreach_ll(w, mux.sibling_list);
1433
1434 if (!n) {
1435 lwsl_err("%s: unsolicited PUBCOMP\n",
1436 __func__);
1437 return -1;
1438 }
1439
1440 /*
1441 * If we published something and PUBCOMP arrived,
1442 * our connection is definitely working in both
1443 * directions at the moment.
1444 */
1445 lws_validity_confirmed(wsi);
1446 break;
1447
1448 case LMQCP_PUBREL:
1449 lwsl_err("%s: cmd_completion: PUBREL\n",
1450 __func__);
1451 wsi->mqtt->send_pubcomp = 1;
1452 lws_callback_on_writable(wsi);
1453 break;
1454
1455 case LMQCP_PUBACK:
1456 lwsl_info("%s: cmd_completion: PUBACK\n",
1457 __func__);
1458
1459 /*
1460 * Figure out which child asked for this
1461 */
1462
1463 n = 0;
1464 lws_start_foreach_ll(struct lws *, w,
1465 wsi->mux.child_list) {
1466 if (w->mqtt->unacked_publish &&
1467 w->mqtt->ack_pkt_id == par->cpkt_id) {
1468 char requested_close = 0;
1469
1470 w->mqtt->unacked_publish = 0;
1471 if (user_callback_handle_rxflow(
1472 w->a.protocol->callback,
1473 w, LWS_CALLBACK_MQTT_ACK,
1474 w->user_space, NULL, 0) < 0) {
1475 lwsl_info("%s: MQTT_ACK requests close\n",
1476 __func__);
1477 requested_close = 1;
1478 }
1479 n = 1;
1480
1481 /*
1482 * We got an assertive PUBACK,
1483 * no need for ACK timeout wait
1484 * any more
1485 */
1486 lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait);
1487
1488 if (requested_close) {
1489 __lws_close_free_wsi(w,
1490 0, "ack cb");
1491 break;
1492 }
1493
1494 break;
1495 }
1496 } lws_end_foreach_ll(w, mux.sibling_list);
1497
1498 if (!n) {
1499 lwsl_err("%s: unsolicited PUBACK\n",
1500 __func__);
1501 return -1;
1502 }
1503
1504 /*
1505 * If we published something and it was acked,
1506 * our connection is definitely working in both
1507 * directions at the moment.
1508 */
1509 lws_validity_confirmed(wsi);
1510 break;
1511
1512 case LMQCP_STOC_PINGRESP:
1513 lwsl_info("%s: cmd_completion: PINGRESP\n",
1514 __func__);
1515 /*
1516 * If we asked for a PINGRESP and it came,
1517 * our connection is definitely working in both
1518 * directions at the moment.
1519 */
1520 lws_validity_confirmed(wsi);
1521 break;
1522
1523 case LMQCP_STOC_SUBACK:
1524 lwsl_info("%s: cmd_completion: SUBACK\n",
1525 __func__);
1526
1527 /*
1528 * Figure out which child asked for this
1529 */
1530
1531 n = 0;
1532 lws_start_foreach_ll(struct lws *, w,
1533 wsi->mux.child_list) {
1534 if (w->mqtt->inside_subscribe &&
1535 w->mqtt->ack_pkt_id == par->cpkt_id) {
1536 w->mqtt->inside_subscribe = 0;
1537 if (user_callback_handle_rxflow(
1538 w->a.protocol->callback,
1539 w, LWS_CALLBACK_MQTT_SUBSCRIBED,
1540 w->user_space, NULL, 0) < 0) {
1541 lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1542 __func__);
1543 return -1;
1544 }
1545 n = 1;
1546 break;
1547 }
1548 } lws_end_foreach_ll(w, mux.sibling_list);
1549
1550 if (!n) {
1551 lwsl_err("%s: unsolicited SUBACK\n",
1552 __func__);
1553 return -1;
1554 }
1555
1556 /*
1557 * If we subscribed to something and SUBACK came,
1558 * our connection is definitely working in both
1559 * directions at the moment.
1560 */
1561 lws_validity_confirmed(wsi);
1562
1563 break;
1564
1565 case LMQCP_STOC_UNSUBACK:
1566 {
1567 char requested_close = 0;
1568 lwsl_info("%s: cmd_completion: UNSUBACK\n",
1569 __func__);
1570 /*
1571 * Figure out which child asked for this
1572 */
1573 n = 0;
1574 lws_start_foreach_ll(struct lws *, w,
1575 wsi->mux.child_list) {
1576 if (w->mqtt->inside_unsubscribe &&
1577 w->mqtt->ack_pkt_id == par->cpkt_id) {
1578 struct lws *nwsi = lws_get_network_wsi(w);
1579
1580 /*
1581 * No more subscribers left,
1582 * remove the topic from nwsi
1583 */
1584 lws_mqtt_client_remove_subs(nwsi->mqtt);
1585
1586 w->mqtt->inside_unsubscribe = 0;
1587 if (user_callback_handle_rxflow(
1588 w->a.protocol->callback,
1589 w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1590 w->user_space, NULL, 0) < 0) {
1591 lwsl_info("%s: MQTT_UNSUBACK requests close\n",
1592 __func__);
1593 requested_close = 1;
1594 }
1595 n = 1;
1596
1597 lws_sul_cancel(&w->mqtt->sul_unsuback_wait);
1598 if (requested_close) {
1599 __lws_close_free_wsi(w,
1600 0, "unsub ack cb");
1601 break;
1602 }
1603 break;
1604 }
1605 } lws_end_foreach_ll(w, mux.sibling_list);
1606
1607 if (!n) {
1608 lwsl_err("%s: unsolicited UNSUBACK\n",
1609 __func__);
1610 return -1;
1611 }
1612
1613
1614 /*
1615 * If we unsubscribed to something and
1616 * UNSUBACK came, our connection is
1617 * definitely working in both
1618 * directions at the moment.
1619 */
1620 lws_validity_confirmed(wsi);
1621
1622 break;
1623 }
1624 case LMQCP_PUBLISH:
1625 {
1626 lws_mqtt_publish_param_t *pub =
1627 (lws_mqtt_publish_param_t *)
1628 wsi->mqtt->rx_cpkt_param;
1629 size_t chunk;
1630
1631 if (pub == NULL) {
1632 lwsl_notice("%s: no pub\n", __func__);
1633 return -1;
1634 }
1635
1636 /*
1637 * RX PUBLISH is delivered to any children that
1638 * registered for the related topic
1639 */
1640
1641 n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
1642
1643 chunk = pub->payload_len - pub->payload_pos;
1644 if (chunk > len)
1645 chunk = len;
1646
1647 lws_start_foreach_ll(struct lws *, w,
1648 wsi->mux.child_list) {
1649 if (lws_mqtt_find_sub(w->mqtt,
1650 pub->topic))
1651 if (w->a.protocol->callback(
1652 w, (enum lws_callback_reasons)n,
1653 w->user_space,
1654 (void *)pub,
1655 chunk)) {
1656 par->payload_consumed = 0;
1657 lws_free_set_NULL(pub->topic);
1658 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1659 return 1;
1660 }
1661 } lws_end_foreach_ll(w, mux.sibling_list);
1662
1663
1664 pub->payload_pos += (uint32_t)chunk;
1665 len -= chunk;
1666 buf += chunk;
1667
1668 lwsl_debug("%s: post pos %d, plen %d, len %d\n",
1669 __func__, (int)pub->payload_pos,
1670 (int)pub->payload_len, (int)len);
1671
1672 if (pub->payload_pos != pub->payload_len) {
1673 /*
1674 * More chunks of the payload pending,
1675 * blocking this connection from doing
1676 * anything else
1677 */
1678 par->state = LMQCPP_PAYLOAD;
1679 break;
1680 }
1681
1682 if (pub->qos == 1) {
1683 /* For QOS = 1, send out PUBACK */
1684 wsi->mqtt->send_puback = 1;
1685 lws_callback_on_writable(wsi);
1686 } else if (pub->qos == 2) {
1687 /* For QOS = 2, send out PUBREC */
1688 wsi->mqtt->send_pubrec = 1;
1689 lws_callback_on_writable(wsi);
1690 }
1691
1692 par->payload_consumed = 0;
1693 lws_free_set_NULL(pub->topic);
1694 lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1695
1696 break;
1697 }
1698 default:
1699 break;
1700 }
1701
1702 break;
1703
1704
1705 case LMQCPP_PROP_ID_VBI:
1706 switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1707 case LMSPR_NEED_MORE:
1708 break;
1709 case LMSPR_COMPLETED:
1710 par->consumed = (uint32_t)((unsigned int)par->consumed + (unsigned int)(unsigned char)par->vbit.consumed);
1711 if (par->vbit.value >
1712 LWS_ARRAY_SIZE(property_valid)) {
1713 lwsl_notice("%s: undef prop id 0x%x\n",
1714 __func__, (int)par->vbit.value);
1715 goto send_protocol_error_and_close;
1716 }
1717 if (!(property_valid[par->vbit.value] &
1718 (1 << ctl_pkt_type(par)))) {
1719 lwsl_notice("%s: prop id 0x%x invalid for"
1720 " control pkt %d\n", __func__,
1721 (int)par->vbit.value,
1722 ctl_pkt_type(par));
1723 goto send_protocol_error_and_close;
1724 }
1725 par->prop_id = par->vbit.value;
1726 par->flag_prop_multi = !!(
1727 par->props_seen[par->prop_id >> 3] &
1728 (1 << (par->prop_id & 7)));
1729 par->props_seen[par->prop_id >> 3] =
1730 (uint8_t)((par->props_seen[par->prop_id >> 3]) | (1 << (par->prop_id & 7)));
1731 /*
1732 * even if it's not a vbi property arg,
1733 * .consumed of this will be zero the first time
1734 */
1735 lws_mqtt_vbi_init(&par->vbit);
1736 /*
1737 * if it's a string, next state must set the
1738 * destination and size limit itself. But
1739 * resetting it generically here lets it use
1740 * lws_mqtt_str_first() to understand it's the
1741 * first time around.
1742 */
1743 lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
1744
1745 /* property arg state enums are so encoded */
1746 par->state = 0x100 | par->vbit.value;
1747 break;
1748 default:
1749 lwsl_notice("%s: prop id bad vbi\n", __func__);
1750 goto send_protocol_error_and_close;
1751 }
1752 break;
1753
1754 /*
1755 * All possible property payloads... restricting which ones
1756 * can appear in which control packets is already done above
1757 * in LMQCPP_PROP_ID_VBI
1758 */
1759
1760 case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
1761 case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
1762 case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
1763 case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
1764 case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
1765 case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
1766 case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
1767 case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
1768 if (par->flag_prop_multi)
1769 goto singular_prop_seen_twice;
1770 par->payload_format = *buf++;
1771 len--;
1772 if (lws_mqtt_pconsume(par, 1))
1773 goto send_protocol_error_and_close;
1774 break;
1775
1776 case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
1777 case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
1778 case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
1779 case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
1780 if (par->flag_prop_multi)
1781 goto singular_prop_seen_twice;
1782
1783 if (lws_mqtt_mb_first(&par->vbit))
1784 lws_mqtt_4byte_init(&par->vbit);
1785
1786 switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1787 case LMSPR_NEED_MORE:
1788 break;
1789 case LMSPR_COMPLETED:
1790 if (lws_mqtt_pconsume(par, par->vbit.consumed))
1791 goto send_protocol_error_and_close;
1792 break;
1793 default:
1794 goto send_protocol_error_and_close;
1795 }
1796 break;
1797
1798 case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
1799 case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
1800 case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
1801 case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
1802 if (par->flag_prop_multi)
1803 goto singular_prop_seen_twice;
1804
1805 if (lws_mqtt_mb_first(&par->vbit))
1806 lws_mqtt_2byte_init(&par->vbit);
1807
1808 switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1809 case LMSPR_NEED_MORE:
1810 break;
1811 case LMSPR_COMPLETED:
1812 if (lws_mqtt_pconsume(par, par->vbit.consumed))
1813 goto send_protocol_error_and_close;
1814 break;
1815 default:
1816 goto send_protocol_error_and_close;
1817 }
1818 break;
1819
1820 case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
1821 case LMQCPP_PROP_AUTH_METHOD_UTF8S:
1822 case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
1823 case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
1824 case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
1825 case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
1826 case LMQCPP_PROP_REASON_STRING_UTF8S:
1827 case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
1828 case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
1829 if (par->flag_prop_multi)
1830 goto singular_prop_seen_twice;
1831
1832 if (lws_mqtt_str_first(&par->s_temp))
1833 lws_mqtt_str_init(&par->s_temp, par->temp,
1834 sizeof(par->temp), 0);
1835
1836 switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
1837 case LMSPR_NEED_MORE:
1838 break;
1839 case LMSPR_COMPLETED:
1840 if (lws_mqtt_pconsume(par, par->s_temp.len))
1841 goto send_protocol_error_and_close;
1842 break;
1843
1844 default:
1845 lwsl_info("%s: bad protocol name\n", __func__);
1846 goto send_protocol_error_and_close;
1847 }
1848 break;
1849
1850 case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
1851
1852 case LMQCPP_PROP_CORRELATION_BINDATA:
1853 case LMQCPP_PROP_AUTH_DATA_BINDATA:
1854
1855 /* TODO */
1856 lwsl_err("%s: Unimplemented packet state 0x%x\n",
1857 __func__, par->state);
1858 return -1;
1859 }
1860 }
1861
1862 return 0;
1863
1864 oom:
1865 lwsl_err("%s: OOM!\n", __func__);
1866 goto send_protocol_error_and_close;
1867
1868 singular_prop_seen_twice:
1869 lwsl_info("%s: property appears twice\n", __func__);
1870
1871 send_protocol_error_and_close:
1872 lwsl_notice("%s: peac\n", __func__);
1873 par->reason = LMQCP_REASON_PROTOCOL_ERROR;
1874
1875 send_reason_and_close:
1876 lwsl_notice("%s: srac\n", __func__);
1877 par->flag_pending_send_reason_close = 1;
1878 goto ask;
1879
1880 send_unsupp_connack_and_close:
1881 lwsl_notice("%s: unsupac\n", __func__);
1882 par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
1883 par->flag_pending_send_connack_close = 1;
1884
1885 ask:
1886 /* Should we ask for clients? */
1887 lws_callback_on_writable(wsi);
1888
1889 return -1;
1890 }
1891
1892 int
lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type, uint8_t dup, lws_mqtt_qos_levels_t qos, uint8_t retain)1893 lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
1894 uint8_t dup, lws_mqtt_qos_levels_t qos,
1895 uint8_t retain)
1896 {
1897 lws_mqtt_fixed_hdr_t hdr;
1898
1899 hdr.bits = 0;
1900 hdr.flags.ctrl_pkt_type = ctrl_pkt_type & 0xf;
1901
1902 switch(ctrl_pkt_type) {
1903 case LMQCP_PUBLISH:
1904 hdr.flags.dup = !!dup;
1905 /*
1906 * A PUBLISH Packet MUST NOT have both QoS bits set to
1907 * 1. If a Server or Client receives a PUBLISH Packet
1908 * which has both QoS bits set to 1 it MUST close the
1909 * Network Connection [MQTT-3.3.1-4].
1910 */
1911 if (qos >= RESERVED_QOS_LEVEL) {
1912 lwsl_err("%s: Unsupport QoS level 0x%x\n",
1913 __func__, qos);
1914 return -1;
1915 }
1916 hdr.flags.qos = qos & 3;
1917 hdr.flags.retain = !!retain;
1918 break;
1919
1920 case LMQCP_CTOS_CONNECT:
1921 case LMQCP_STOC_CONNACK:
1922 case LMQCP_PUBACK:
1923 case LMQCP_PUBREC:
1924 case LMQCP_PUBCOMP:
1925 case LMQCP_STOC_SUBACK:
1926 case LMQCP_STOC_UNSUBACK:
1927 case LMQCP_CTOS_PINGREQ:
1928 case LMQCP_STOC_PINGRESP:
1929 case LMQCP_DISCONNECT:
1930 case LMQCP_AUTH:
1931 hdr.bits &= 0xf0;
1932 break;
1933
1934 /*
1935 * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
1936 * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
1937 * MUST be set to 0,0,1 and 0 respectively. The Server MUST
1938 * treat any other value as malformed and close the Network
1939 * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
1940 */
1941 case LMQCP_PUBREL:
1942 case LMQCP_CTOS_SUBSCRIBE:
1943 case LMQCP_CTOS_UNSUBSCRIBE:
1944 hdr.bits |= 0x02;
1945 break;
1946
1947 default:
1948 return -1;
1949 }
1950
1951 *p = hdr.bits;
1952
1953 return 0;
1954 }
1955
1956 int
lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub, const void *buf, uint32_t len, int is_complete)1957 lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
1958 const void *buf, uint32_t len, int is_complete)
1959 {
1960 struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
1961 uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
1962 struct lws *nwsi = lws_get_network_wsi(wsi);
1963 lws_mqtt_str_t mqtt_vh_payload;
1964 uint32_t vh_len, rem_len;
1965
1966 assert(pub->topic);
1967
1968 lwsl_debug("%s: len = %d, is_complete = %d\n",
1969 __func__, (int)len, (int)is_complete);
1970
1971 if (lwsi_state(wsi) != LRS_ESTABLISHED) {
1972 lwsl_err("%s: %s: unknown state 0x%x\n", __func__,
1973 lws_wsi_tag(wsi), lwsi_state(wsi));
1974 assert(0);
1975 return 1;
1976 }
1977
1978 if (wsi->mqtt->inside_payload) {
1979 /*
1980 * Headers are filled, we are sending
1981 * the payload - a buffer with LWS_PRE
1982 * in front it.
1983 */
1984 start = (uint8_t *)buf;
1985 p = start + len;
1986 if (is_complete)
1987 wsi->mqtt->inside_payload = 0;
1988 goto do_write;
1989 }
1990
1991 start = b + LWS_PRE;
1992 p = start;
1993 /*
1994 * Fill headers and the first chunk of the
1995 * payload (if any)
1996 */
1997 if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
1998 pub->dup, pub->qos, pub->retain)) {
1999 lwsl_err("%s: Failed to fill fixed header\n", __func__);
2000 return 1;
2001 }
2002
2003 /*
2004 * Topic len field + Topic len + Packet ID
2005 * (for QOS>0) + Payload len
2006 */
2007 vh_len = (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
2008 rem_len = vh_len + pub->payload_len;
2009 lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
2010
2011 /* Will the chunk of payload fit? */
2012 if ((vh_len + len) >=
2013 (wsi->a.context->pt_serv_buf_size - LWS_PRE)) {
2014 lwsl_err("%s: Payload is too big\n", __func__);
2015 return 1;
2016 }
2017
2018 p += lws_mqtt_vbi_encode(rem_len, p);
2019
2020 /* Topic's Len */
2021 lws_ser_wu16be(p, pub->topic_len);
2022 p += 2;
2023
2024 /*
2025 * Init lws_mqtt_str for "MQTT Variable
2026 * Headers + payload" (only the supplied
2027 * chuncked payload)
2028 */
2029 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
2030 (uint16_t)(unsigned int)(pub->topic_len + ((pub->qos) ? 2u : 0u) + len),
2031 0);
2032
2033 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2034 lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
2035 if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
2036 lwsl_err("%s: a\n", __func__);
2037 return 1;
2038 }
2039
2040 /* Packet ID */
2041 if (pub->qos != QOS0) {
2042 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2043 if (!pub->dup)
2044 nwsi->mqtt->pkt_id++;
2045 wsi->mqtt->ack_pkt_id = pub->packet_id = nwsi->mqtt->pkt_id;
2046 lwsl_debug("%s: pkt_id = %d\n", __func__,
2047 (int)wsi->mqtt->ack_pkt_id);
2048 lws_ser_wu16be(p, pub->packet_id);
2049 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
2050 lwsl_err("%s: b\n", __func__);
2051 return 1;
2052 }
2053 }
2054
2055 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2056 memcpy(p, buf, len);
2057 if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
2058 return 1;
2059 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2060
2061 if (!is_complete)
2062 nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
2063
2064 do_write:
2065
2066 // lwsl_hexdump_err(start, lws_ptr_diff(p, start));
2067
2068 if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2069 lws_ptr_diff(p, start)) {
2070 lwsl_err("%s: write failed\n", __func__);
2071 return 1;
2072 }
2073
2074 if (!is_complete) {
2075 /* still some more chunks to come... */
2076 lws_callback_on_writable(wsi);
2077
2078 return 0;
2079 }
2080
2081 wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
2082
2083 if (pub->qos != QOS0)
2084 wsi->mqtt->unacked_publish = 1;
2085
2086 /* this was the last part of the publish message */
2087
2088 if (pub->qos == QOS0) {
2089 /*
2090 * There won't be any real PUBACK, act like we got one
2091 * so the user callback logic is the same for QoS0 or
2092 * QoS1
2093 */
2094 if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
2095 wsi->user_space, NULL, 0)) {
2096 lwsl_err("%s: ACK callback exited\n", __func__);
2097 return 1;
2098 }
2099 } else if (pub->qos == QOS1 || pub->qos == QOS2) {
2100 /* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s,
2101 * we must RETRY the publish
2102 */
2103 wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend;
2104 __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2105 &wsi->mqtt->sul_qos_puback_pubrec_wait,
2106 3 * LWS_USEC_PER_SEC);
2107 }
2108
2109 if (wsi->mqtt->inside_shadow) {
2110 wsi->mqtt->sul_shadow_wait.cb = lws_mqtt_shadow_timeout;
2111 __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2112 &wsi->mqtt->sul_shadow_wait,
2113 60 * LWS_USEC_PER_SEC);
2114 }
2115
2116 return 0;
2117 }
2118
2119 int
lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)2120 lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
2121 {
2122 struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2123 uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2124 struct lws *nwsi = lws_get_network_wsi(wsi);
2125 lws_mqtt_str_t mqtt_vh_payload;
2126 uint8_t exists[8], extant;
2127 lws_mqtt_subs_t *mysub;
2128 uint32_t rem_len;
2129 #if defined(_DEBUG)
2130 uint32_t tops;
2131 #endif
2132 uint32_t n;
2133
2134 assert(sub->num_topics);
2135 assert(sub->num_topics < sizeof(exists));
2136
2137 switch (lwsi_state(wsi)) {
2138 case LRS_ESTABLISHED: /* Protocol connection established */
2139 if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
2140 0, 0, 0)) {
2141 lwsl_err("%s: Failed to fill fixed header\n", __func__);
2142 return 1;
2143 }
2144
2145 /*
2146 * The stream wants to subscribe to one or more topic, but
2147 * the shared nwsi may already be subscribed to some or all of
2148 * them from interactions with other streams. For those cases,
2149 * we filter them from the list the child wants until we just
2150 * have ones that are new to the nwsi. If nothing left, we just
2151 * synthesize the callback to the child as if SUBACK had come
2152 * and we're done, otherwise just ask the server for topics that
2153 * are new to the wsi.
2154 */
2155
2156 extant = 0;
2157 memset(&exists, 0, sizeof(exists));
2158 for (n = 0; n < sub->num_topics; n++) {
2159 lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
2160 __func__, (int)n, sub->topic[n].name);
2161
2162 mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
2163 if (mysub && mysub->ref_count) {
2164 mysub->ref_count++; /* another stream using it */
2165 exists[n] = 1;
2166 extant++;
2167 }
2168
2169 /*
2170 * Attach the topic we're subscribing to, to wsi->mqtt
2171 */
2172 if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
2173 lwsl_err("%s: create sub fail\n", __func__);
2174 return 1;
2175 }
2176 }
2177
2178 if (extant == sub->num_topics) {
2179 /*
2180 * It turns out there's nothing to do here, the nwsi has
2181 * already subscribed to all the topics this stream
2182 * wanted. Just tell it it can have them.
2183 */
2184 lwsl_notice("%s: all topics already subscribed\n", __func__);
2185 if (user_callback_handle_rxflow(
2186 wsi->a.protocol->callback,
2187 wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
2188 wsi->user_space, NULL, 0) < 0) {
2189 lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
2190 __func__);
2191 return -1;
2192 }
2193
2194 return 0;
2195 }
2196
2197 #if defined(_DEBUG)
2198 /*
2199 * zero or more of the topics already existed, but not all,
2200 * so we must go to the server with a filtered list of the
2201 * new ones only
2202 */
2203
2204 tops = sub->num_topics - extant;
2205 #endif
2206
2207 /*
2208 * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
2209 */
2210 rem_len = 2;
2211 for (n = 0; n < sub->num_topics; n++)
2212 if (!exists[n])
2213 rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
2214
2215 wsi->mqtt->sub_size = (uint16_t)rem_len;
2216
2217 #if defined(_DEBUG)
2218 lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2219 __func__, (int)tops, (int)rem_len);
2220 #endif
2221
2222 p += lws_mqtt_vbi_encode(rem_len, p);
2223
2224 if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2225 wsi->a.context->pt_serv_buf_size) {
2226 lwsl_err("%s: Payload is too big\n", __func__);
2227 return 1;
2228 }
2229
2230 /* Init lws_mqtt_str */
2231 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2232 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2233
2234 /* Packet ID */
2235 wsi->mqtt->ack_pkt_id = sub->packet_id = ++nwsi->mqtt->pkt_id;
2236 lwsl_debug("%s: pkt_id = %d\n", __func__,
2237 (int)sub->packet_id);
2238 lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2239
2240 nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot;
2241
2242 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2243 return 1;
2244
2245 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2246
2247 for (n = 0; n < sub->num_topics; n++) {
2248 lwsl_info("%s: topics[%d] = %s\n", __func__,
2249 (int)n, sub->topic[n].name);
2250
2251 /* if the nwsi already has it, don't ask server for it */
2252 if (exists[n]) {
2253 lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
2254 __func__, (int)n, sub->topic[n].name);
2255 continue;
2256 }
2257
2258 /*
2259 * Attach the topic we're subscribing to, to nwsi->mqtt
2260 * so we know the nwsi itself has a subscription to it
2261 */
2262
2263 if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
2264 return 1;
2265
2266 /* Topic's Len */
2267 lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
2268 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2269 return 1;
2270 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2271
2272 /* Topic Name */
2273 lws_strncpy((char *)p, sub->topic[n].name,
2274 strlen(sub->topic[n].name) + 1);
2275 if (lws_mqtt_str_advance(&mqtt_vh_payload,
2276 (int)strlen(sub->topic[n].name)))
2277 return 1;
2278 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2279
2280 /* QoS */
2281 *p = (uint8_t)sub->topic[n].qos;
2282 if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
2283 return 1;
2284 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2285 }
2286 break;
2287
2288 default:
2289 return 1;
2290 }
2291
2292 if (wsi->mqtt->inside_resume_session)
2293 return 0;
2294
2295 if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2296 lws_ptr_diff(p, start))
2297 return 1;
2298
2299 wsi->mqtt->inside_subscribe = 1;
2300
2301 return 0;
2302 }
2303
2304 int
lws_mqtt_client_send_unsubcribe(struct lws *wsi, const lws_mqtt_subscribe_param_t *unsub)2305 lws_mqtt_client_send_unsubcribe(struct lws *wsi,
2306 const lws_mqtt_subscribe_param_t *unsub)
2307 {
2308 struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2309 uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2310 struct lws *nwsi = lws_get_network_wsi(wsi);
2311 lws_mqtt_str_t mqtt_vh_payload;
2312 uint8_t send_unsub[8], orphaned;
2313 uint32_t rem_len, n;
2314 lws_mqtt_subs_t *mysub;
2315 #if defined(_DEBUG)
2316 uint32_t tops;
2317 #endif
2318
2319 lwsl_info("%s: Enter\n", __func__);
2320
2321 switch (lwsi_state(wsi)) {
2322 case LRS_ESTABLISHED: /* Protocol connection established */
2323 orphaned = 0;
2324 memset(&send_unsub, 0, sizeof(send_unsub));
2325 for (n = 0; n < unsub->num_topics; n++) {
2326 mysub = lws_mqtt_find_sub(nwsi->mqtt,
2327 unsub->topic[n].name);
2328 assert(mysub);
2329
2330 if (mysub && --mysub->ref_count == 0) {
2331 lwsl_notice("%s: Need to send UNSUB\n", __func__);
2332 send_unsub[n] = 1;
2333 orphaned++;
2334 }
2335 }
2336
2337 if (!orphaned) {
2338 /*
2339 * The nwsi still has other subscribers bound to the
2340 * topics.
2341 *
2342 * So, don't send UNSUB to server, and just fake the
2343 * UNSUB ACK event for the guy going away.
2344 */
2345 lwsl_notice("%s: unsubscribed!\n", __func__);
2346 if (user_callback_handle_rxflow(
2347 wsi->a.protocol->callback,
2348 wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
2349 wsi->user_space, NULL, 0) < 0) {
2350 /*
2351 * We can't directly close here, because the
2352 * caller still has the wsi. Inform the
2353 * caller that we want to close
2354 */
2355
2356 return 1;
2357 }
2358
2359 return 0;
2360 }
2361 #if defined(_DEBUG)
2362 /*
2363 * one or more of the topics needs to be unsubscribed
2364 * from, so we must go to the server with a filtered
2365 * list of the new ones only
2366 */
2367
2368 tops = orphaned;
2369 #endif
2370
2371 if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
2372 0, 0, 0)) {
2373 lwsl_err("%s: Failed to fill fixed header\n", __func__);
2374 return 1;
2375 }
2376
2377 /*
2378 * Pid + (Topic len field + Topic len) x Num of Topics
2379 */
2380 rem_len = 2;
2381 for (n = 0; n < unsub->num_topics; n++)
2382 if (send_unsub[n])
2383 rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
2384
2385 wsi->mqtt->sub_size = (uint16_t)rem_len;
2386
2387 #if defined(_DEBUG)
2388 lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2389 __func__, (int)tops, (int)rem_len);
2390 #endif
2391
2392 p += lws_mqtt_vbi_encode(rem_len, p);
2393
2394 if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2395 wsi->a.context->pt_serv_buf_size) {
2396 lwsl_err("%s: Payload is too big\n", __func__);
2397 return 1;
2398 }
2399
2400 /* Init lws_mqtt_str */
2401 lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2402 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2403
2404 /* Packet ID */
2405 wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
2406 lwsl_debug("%s: pkt_id = %d\n", __func__,
2407 (int)wsi->mqtt->ack_pkt_id);
2408 lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2409
2410 nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot;
2411
2412 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2413 return 1;
2414
2415 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2416
2417 for (n = 0; n < unsub->num_topics; n++) {
2418 lwsl_info("%s: topics[%d] = %s\n", __func__,
2419 (int)n, unsub->topic[n].name);
2420
2421 /*
2422 * Subscriber still bound to it, don't UBSUB
2423 * from the server
2424 */
2425 if (!send_unsub[n])
2426 continue;
2427
2428 /* Topic's Len */
2429 lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
2430 if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2431 return 1;
2432 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2433
2434 /* Topic Name */
2435 lws_strncpy((char *)p, unsub->topic[n].name,
2436 strlen(unsub->topic[n].name) + 1);
2437 if (lws_mqtt_str_advance(&mqtt_vh_payload,
2438 (int)strlen(unsub->topic[n].name)))
2439 return 1;
2440 p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2441 }
2442 break;
2443
2444 default:
2445 return 1;
2446 }
2447
2448 if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2449 lws_ptr_diff(p, start))
2450 return 1;
2451
2452 wsi->mqtt->inside_unsubscribe = 1;
2453
2454 wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout;
2455 __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2456 &wsi->mqtt->sul_unsuback_wait,
2457 3 * LWS_USEC_PER_SEC);
2458
2459 return 0;
2460 }
2461
2462 /*
2463 * This is called when child streams bind to an already-existing and compatible
2464 * MQTT stream
2465 */
2466
2467 struct lws *
lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)2468 lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
2469 {
2470 /* no more children allowed by parent? */
2471
2472 if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
2473 lwsl_err("%s: reached concurrent stream limit\n", __func__);
2474 return NULL;
2475 }
2476
2477 #if defined(LWS_WITH_CLIENT)
2478 wsi->client_mux_substream = 1;
2479 #endif
2480
2481 lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
2482
2483 if (lws_ensure_user_space(wsi))
2484 goto bail1;
2485
2486 lws_mqtt_set_client_established(wsi);
2487 lws_callback_on_writable(wsi);
2488
2489 return wsi;
2490
2491 bail1:
2492 /* undo the insert */
2493 parent_wsi->mux.child_list = wsi->mux.sibling_list;
2494 parent_wsi->mux.child_count--;
2495
2496 if (wsi->user_space)
2497 lws_free_set_NULL(wsi->user_space);
2498
2499 wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
2500 lws_free(wsi);
2501
2502 return NULL;
2503 }
2504
2505