1 /*
2  * libwebsockets - small server side websockets and web server implementation
3  *
4  * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  *
24  * MQTT v5
25  *
26  * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
27  *
28  * Control Packet structure
29  *
30  *  - Always:           2+ byte:  Fixed Hdr
31  *  - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
32  *  - Required in some: variable: Payload
33  *
34  * For CONNECT, the props if present MUST be in the order [MQTT-3.1.3-1]
35  *
36  *  - Client Identifier
37  *  - Will Properties
38  *  - Will Topic
39  *  - Will Payload
40  *  - User Name
41  *  - Password
42  */
43 
44 #include "private-lib-core.h"
45 #include <string.h>
46 #include <sys/types.h>
47 #include <assert.h>
48 
/*
 * Connection-state enum (going by its name, for the server side of the
 * protocol).  Only one state is defined here: awaiting CONNECT.
 */
typedef enum {
	LMQPRS_AWAITING_CONNECT,

} lws_mqtt_protocol_server_connstate_t;
53 
/*
 * Human-readable names for the first group of MQTT reason codes: success
 * and the other non-error outcomes.
 *
 * NOTE(review): in MQTT v5 the codes that go with these names are not
 * contiguous (0x00, 0x01, 0x02, 0x04, 0x10, 0x11, 0x18, 0x19), so this
 * array is presumably not indexed directly by the raw reason code --
 * confirm against the users of the table, which are not in this part of
 * the file.
 */
const char * const reason_names_g1[] = {
	"Success / Normal disconnection / QoS0",
	"QoS1",
	"QoS2",
	"Disconnect Will",
	"No matching subscriber",
	"No subscription existed",
	"Continue authentication",
	"Re-authenticate"
};
64 
/*
 * Human-readable names for the second group of MQTT reason codes: the
 * error outcomes, which in MQTT v5 start at 0x80 ("Unspecified error").
 *
 * NOTE(review): MQTT v5 defines "Topic Name invalid" (0x90) between
 * "Topic Filter invalid" (0x8F) and "Packet Identifier in use" (0x91), and
 * there is no entry for it here.  If this array is indexed by
 * (reason code - 0x80), every name from "Packet ID in use" onwards is off
 * by one -- confirm against the users of the table, which are not in this
 * part of the file.
 */
const char * const reason_names_g2[] = {
	"Unspecified error",
	"Malformed packet",
	"Protocol error",
	"Implementation specific error",
	"Unsupported protocol",
	"Client ID invalid",
	"Bad credentials",
	"Not Authorized",
	"Server Unavailable",
	"Server Busy",
	"Banned",
	"Server Shutting Down",
	"Bad Authentication Method",
	"Keepalive Timeout",
	"Session taken over",
	"Topic Filter Invalid",
	"Packet ID in use",
	"Packet ID not found",
	"Max RX Exceeded",
	"Topic Alias Invalid",
	"Packet too large",
	"Ratelimit",
	"Quota Exceeded",
	"Administrative Action",
	"Payload format invalid",
	"Retain not supported",
	"QoS not supported",
	"Use another server",
	"Server Moved",
	"Shared subscriptions not supported",
	"Connection rate exceeded",
	"Maximum Connect Time",
	"Subscription IDs not supported",
	"Wildcard subscriptions not supported"
};
101 
102 #define LMQCP_WILL_PROPERTIES 0
103 
/*
 * For each property, a bitmap describing which commands it is valid for.
 *
 * The table is indexed by property identifier (LMQPROP_*).  Each set bit
 * (1 << LMQCP_*) names one control packet type that may carry the
 * property.  LMQCP_WILL_PROPERTIES is #defined as 0 above, so bit 0 stands
 * for "allowed inside the Will Properties of a CONNECT".
 */

static const uint16_t property_valid[] = {
	[LMQPROP_PAYLOAD_FORMAT_INDICATOR]	= (1 << LMQCP_PUBLISH) |
						  (1 << LMQCP_WILL_PROPERTIES),
	[LMQPROP_MESSAGE_EXPIRY_INTERVAL]	= (1 << LMQCP_PUBLISH) |
						  (1 << LMQCP_WILL_PROPERTIES),
	[LMQPROP_CONTENT_TYPE]			= (1 << LMQCP_PUBLISH) |
						  (1 << LMQCP_WILL_PROPERTIES),
	[LMQPROP_RESPONSE_TOPIC]		= (1 << LMQCP_PUBLISH) |
						  (1 << LMQCP_WILL_PROPERTIES),
	[LMQPROP_CORRELATION_DATA]		= (1 << LMQCP_PUBLISH) |
						  (1 << LMQCP_WILL_PROPERTIES),
	[LMQPROP_SUBSCRIPTION_IDENTIFIER]	= (1 << LMQCP_PUBLISH) |
						  (1 << LMQCP_CTOS_SUBSCRIBE),
	[LMQPROP_SESSION_EXPIRY_INTERVAL]	= (1 << LMQCP_CTOS_CONNECT) |
						  (1 << LMQCP_STOC_CONNACK) |
						  (1 << LMQCP_DISCONNECT),
	[LMQPROP_ASSIGNED_CLIENT_IDENTIFIER]	= (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_SERVER_KEEP_ALIVE]		= (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_AUTHENTICATION_METHOD]		= (1 << LMQCP_CTOS_CONNECT) |
						  (1 << LMQCP_STOC_CONNACK) |
						  (1 << LMQCP_AUTH),
	[LMQPROP_AUTHENTICATION_DATA]		= (1 << LMQCP_CTOS_CONNECT) |
						  (1 << LMQCP_STOC_CONNACK) |
						  (1 << LMQCP_AUTH),
	[LMQPROP_REQUEST_PROBLEM_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
	[LMQPROP_WILL_DELAY_INTERVAL]		= (1 << LMQCP_WILL_PROPERTIES),
	[LMQPROP_REQUEST_RESPONSE_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
	[LMQPROP_RESPONSE_INFORMATION]		= (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_SERVER_REFERENCE]		= (1 << LMQCP_STOC_CONNACK) |
						  (1 << LMQCP_DISCONNECT),
	[LMQPROP_REASON_STRING]			= (1 << LMQCP_STOC_CONNACK) |
						  (1 << LMQCP_PUBACK) |
						  (1 << LMQCP_PUBREC) |
						  (1 << LMQCP_PUBREL) |
						  (1 << LMQCP_PUBCOMP) |
						  (1 << LMQCP_STOC_SUBACK) |
						  (1 << LMQCP_STOC_UNSUBACK) |
						  (1 << LMQCP_DISCONNECT) |
						  (1 << LMQCP_AUTH),
	[LMQPROP_RECEIVE_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
						  (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_TOPIC_ALIAS_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
						  (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_TOPIC_ALIAS]			= (1 << LMQCP_PUBLISH),
	[LMQPROP_MAXIMUM_QOS]			= (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_RETAIN_AVAILABLE]		= (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_USER_PROPERTY]			= (1 << LMQCP_CTOS_CONNECT) |
						  (1 << LMQCP_STOC_CONNACK) |
						  (1 << LMQCP_PUBLISH) |
						  (1 << LMQCP_WILL_PROPERTIES) |
						  (1 << LMQCP_PUBACK) |
						  (1 << LMQCP_PUBREC) |
						  (1 << LMQCP_PUBREL) |
						  (1 << LMQCP_PUBCOMP) |
						  (1 << LMQCP_CTOS_SUBSCRIBE) |
						  (1 << LMQCP_STOC_SUBACK) |
						  (1 << LMQCP_CTOS_UNSUBSCRIBE) |
						  (1 << LMQCP_STOC_UNSUBACK) |
						  (1 << LMQCP_DISCONNECT) |
						  (1 << LMQCP_AUTH),
	[LMQPROP_MAXIMUM_PACKET_SIZE]		= (1 << LMQCP_CTOS_CONNECT) |
						  (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
	[LMQPROP_SHARED_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK)
};
172 
173 
/*
 * Per control packet type (index is LMQCP_*): legality of the fixed-header
 * flags, packet id, QoS and payload.  Note that in most cases PUBLISH
 * requires further processing.
 *
 * Each entry ORs LMQCP_LUT_FLAG_* bits with a value in the low nibble.
 * When LMQCP_LUT_FLAG_RESERVED_FLAGS is set, the rx parser requires the low
 * nibble of the received first byte to equal that low-nibble value, and
 * otherwise rejects the packet as a protocol error.
 *
 * NOTE(review): UNSUBACK is marked LMQCP_LUT_FLAG_PACKET_ID_NONE, whereas
 * the MQTT spec gives UNSUBACK a Packet Identifier like SUBACK -- confirm
 * how the PACKET_ID flags are consumed before relying on this entry.
 */
static const uint8_t map_flags[] = {
	[LMQCP_RESERVED]		= 0x00,
	[LMQCP_CTOS_CONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PAYLOAD |
					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
	[LMQCP_STOC_CONNACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
	[LMQCP_PUBLISH]			= LMQCP_LUT_FLAG_PAYLOAD | /* option */
					  LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
	[LMQCP_PUBACK]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
	[LMQCP_PUBREC]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
	[LMQCP_PUBREL]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
	[LMQCP_PUBCOMP]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
	[LMQCP_CTOS_SUBSCRIBE]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PAYLOAD |
					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
	[LMQCP_STOC_SUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PAYLOAD |
					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
	[LMQCP_CTOS_UNSUBSCRIBE]	= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PAYLOAD |
					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
	[LMQCP_STOC_UNSUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PAYLOAD |
					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
	[LMQCP_CTOS_PINGREQ]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
	[LMQCP_STOC_PINGRESP]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
	[LMQCP_DISCONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
	[LMQCP_AUTH]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
};
216 
217 static int
lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)218 lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
219 {
220 	par->consumed += (unsigned int)consumed;
221 
222 	if (par->consumed > par->props_len)
223 		return -1;
224 
225 	/* more properties coming */
226 
227 	if (par->consumed < par->props_len) {
228 		par->state = LMQCPP_PROP_ID_VBI;
229 		return 0;
230 	}
231 
232 	/* properties finished: are we headed for payload or idle? */
233 
234 	if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
235 		/* A PUBLISH packet MUST NOT contain a Packet Identifier if
236 		 * its QoS value is set to 0 [MQTT-2.2.1-2]. */
237 	    (ctl_pkt_type(par) != LMQCP_PUBLISH ||
238 	     (par->packet_type_flags & 6))) {
239 		par->state = LMQCPP_PAYLOAD;
240 		return 0;
241 	}
242 
243 	par->state = LMQCPP_IDLE;
244 
245 	return 0;
246 }
247 
248 static int
lws_mqtt_set_client_established(struct lws *wsi)249 lws_mqtt_set_client_established(struct lws *wsi)
250 {
251 	lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
252 			    &role_ops_mqtt);
253 
254 	if (user_callback_handle_rxflow(wsi->a.protocol->callback,
255 					wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
256 					wsi->user_space, NULL, 0) < 0) {
257 		lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
258 
259 		return -1;
260 	}
261 	/*
262 	 * If we made a new connection and got the ACK, our connection is
263 	 * definitely working in both directions at the moment
264 	 */
265 	lws_validity_confirmed(wsi);
266 
267 	/* clear connection timeout */
268 	lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
269 
270 	return 0;
271 }
272 
273 static lws_mqtt_validate_topic_return_t
lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)274 lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)
275 {
276 	size_t spos = 0;
277 	const char *sub = topic;
278 	int8_t slashes = 0;
279 	lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
280 
281 	if (awsiot) {
282 		if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN)
283 			return LMVTR_FAILED_OVERSIZE;
284 		if (topic[0] == '$') {
285 			ret = LMVTR_VALID_SHADOW;
286 			slashes = -3;
287 		}
288 	} else {
289 		if (topiclen > LWS_MQTT_MAX_TOPICLEN)
290 			return LMVTR_FAILED_OVERSIZE;
291 		if (topic[0] == '$')
292 			return LMVTR_FAILED_WILDCARD_FORMAT;
293 	}
294 
295 	while (*sub != 0) {
296 		if (sub[0] == '+') {
297 			/* topic == "+foo" || "a/+foo" ? */
298 			if (spos > 0 && sub[-1] != '/')
299 				return LMVTR_FAILED_WILDCARD_FORMAT;
300 
301 			/* topic == "foo+" or "foo+/a" ? */
302 			if (sub[1] != 0 && sub[1] != '/')
303 				return LMVTR_FAILED_WILDCARD_FORMAT;
304 
305 			ret = LMVTR_VALID_WILDCARD;
306 		} else if (sub[0] == '#') {
307 			/* topic == "foo#" ? */
308 			if (spos > 0 && sub[-1] != '/')
309 				return LMVTR_FAILED_WILDCARD_FORMAT;
310 
311 			/* topic == "#foo" ? */
312 			if (sub[1] != 0)
313 				return LMVTR_FAILED_WILDCARD_FORMAT;
314 
315 			ret = LMVTR_VALID_WILDCARD;
316 		} else if (sub[0] == '/') {
317 			slashes++;
318 		}
319 		spos++;
320 		sub++;
321 	}
322 
323 	if (awsiot && (slashes < 0 || slashes > 7))
324 		return LMVTR_FAILED_SHADOW_FORMAT;
325 
326 	return ret;
327 }
328 
329 static lws_mqtt_subs_t *
lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)330 lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
331 {
332 	lws_mqtt_subs_t *mysub;
333 	size_t topiclen = strlen(topic);
334 	lws_mqtt_validate_topic_return_t flag;
335 
336 	flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot);
337 	switch (flag) {
338 	case LMVTR_FAILED_OVERSIZE:
339 		lwsl_err("%s: Topic is too long\n",
340 			 __func__);
341 		return NULL;
342 	case LMVTR_FAILED_SHADOW_FORMAT:
343 	case LMVTR_FAILED_WILDCARD_FORMAT:
344 		lwsl_err("%s: Invalid topic format \"%s\"\n",
345 			 __func__, topic);
346 		return NULL;
347 
348 	case LMVTR_VALID:
349 	case LMVTR_VALID_WILDCARD:
350 	case LMVTR_VALID_SHADOW:
351 		mysub = lws_malloc(sizeof(*mysub) + topiclen + 1, "sub");
352 		if (!mysub) {
353 			lwsl_err("%s: Error allocating mysub\n",
354 				 __func__);
355 			return NULL;
356 		}
357 		mysub->wildcard = (flag == LMVTR_VALID_WILDCARD);
358 		mysub->shadow = (flag == LMVTR_VALID_SHADOW);
359 		break;
360 
361 	default:
362 		lwsl_err("%s: Unknown flag - %d\n",
363 			 __func__, flag);
364 		return NULL;
365 	}
366 
367 	mysub->next = mqtt->subs_head;
368 	mqtt->subs_head = mysub;
369 	memcpy(mysub->topic, topic, strlen(topic) + 1);
370 	mysub->ref_count = 1;
371 
372 	lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
373 		  __func__, mysub, mqtt);
374 
375 	return mysub;
376 }
377 
378 static int
lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)379 lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
380 {
381 	lws_mqtt_subs_t *s = mqtt->subs_head;
382 	lws_mqtt_subs_t *temp = NULL;
383 
384 
385 	lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
386 		  __func__, mqtt);
387 
388 	while (s && s->next) {
389 		if (s->next->ref_count == 0)
390 			break;
391 		s = s->next;
392 	}
393 
394 	if (s && s->next) {
395 		temp = s->next;
396 		lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
397 			  __func__, temp, mqtt);
398 		s->next = temp->next;
399 		lws_free(temp);
400 		return 0;
401 	}
402 	return 1;
403 }
404 
405 /*
406  * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or
407  * PUBREC came before the timeout period
408  */
409 
410 static void
lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)411 lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
412 {
413 	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
414 			struct _lws_mqtt_related, sul_qos_puback_pubrec_wait);
415 
416 	lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
417 
418 	if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
419 				mqtt->wsi->user_space, NULL, 0))
420 		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
421 }
422 
423 static void
lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)424 lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)
425 {
426 	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
427 			struct _lws_mqtt_related, sul_unsuback_wait);
428 
429 	lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
430 
431 	if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
432 					   LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT,
433 					   mqtt->wsi->user_space, NULL, 0))
434 		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
435 }
436 
437 static void
lws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul)438 lws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul)
439 {
440 	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
441 			struct _lws_mqtt_related, sul_shadow_wait);
442 
443 	lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
444 
445 	if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
446 					    LWS_CALLBACK_MQTT_SHADOW_TIMEOUT,
447 					    mqtt->wsi->user_space, NULL, 0))
448 		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
449 }
450 
/*
 * Set the client's state (c->estate) to s, logging the old and new values
 * at debug level first.
 */
void
lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
{
	lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
	c->estate = s;
}
457 
458 lws_mqtt_match_topic_return_t
lws_mqtt_is_topic_matched(const char* sub, const char* pub)459 lws_mqtt_is_topic_matched(const char* sub, const char* pub)
460 {
461 	const char *ppos = pub, *spos = sub;
462 
463 	if (!ppos || !spos) {
464 		return LMMTR_TOPIC_MATCH_ERROR;
465 	}
466 
467 	while (*spos) {
468 		if (*ppos == '#' || *ppos == '+') {
469 			lwsl_err("%s: PUBLISH to wildcard "
470 				 "topic \"%s\" not supported\n",
471 				 __func__, pub);
472 			return LMMTR_TOPIC_MATCH_ERROR;
473 		}
474 		/* foo/+/bar == foo/xyz/bar ? */
475 		if (*spos == '+') {
476 			/* Skip ahead */
477 			while (*ppos != '\0' && *ppos != '/') {
478 				ppos++;
479 			}
480 		} else if (*spos == '#') {
481 			return LMMTR_TOPIC_MATCH;
482 		} else {
483 			if (*ppos == '\0') {
484 				/* foo/bar == foo/bar/# ? */
485 				if (!strncmp(spos, "/#", 2))
486 					return LMMTR_TOPIC_MATCH;
487 				return LMMTR_TOPIC_NOMATCH;
488 			/* Non-matching character */
489 			} else if (*ppos != *spos) {
490 				return LMMTR_TOPIC_NOMATCH;
491 			}
492 			ppos++;
493 		}
494 		spos++;
495 	}
496 
497 	if (*spos == '\0' && *ppos == '\0')
498 		return LMMTR_TOPIC_MATCH;
499 
500 	return LMMTR_TOPIC_NOMATCH;
501 }
502 
lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt, const char* ptopic)503 lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
504 				   const char* ptopic) {
505 	lws_mqtt_subs_t *s = mqtt->subs_head;
506 
507 	while (s) {
508 		/*  SUB topic  ==   PUB topic  ? */
509 		/* foo/bar/xyz ==  foo/bar/xyz ? */
510 		if (!s->wildcard) {
511 			if (!strcmp((const char*)s->topic, ptopic))
512 				return s;
513 		} else {
514 			if (lws_mqtt_is_topic_matched(
515 			    s->topic, ptopic) == LMMTR_TOPIC_MATCH)
516 				return s;
517 		}
518 
519 		s = s->next;
520 	}
521 
522 	return NULL;
523 }
524 
525 int
_lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par, const uint8_t *buf, size_t len)526 _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
527 		    const uint8_t *buf, size_t len)
528 {
529 	struct lws *w;
530 	int n;
531 
532 	if (par->flag_pending_send_reason_close)
533 		return 0;
534 
535 	/*
536 	 * Stateful, fragmentation-immune parser
537 	 *
538 	 * Notice that len can always be 1 if under attack, even over tls if
539 	 * the server is compromised or malicious.
540 	 */
541 
542 	while (len) {
543 		lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
544 		switch (par->state) {
545 		case LMQCPP_IDLE:
546 			par->packet_type_flags = *buf++;
547 			len--;
548 
549 #if defined(LWS_WITH_CLIENT)
550 			/*
551 			 * The case where we sent the connect, but we received
552 			 * something else before any CONNACK
553 			 */
554 			if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
555 			    par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
556 				lwsl_notice("%s: server sent non-CONNACK\n",
557 						__func__);
558 				goto send_protocol_error_and_close;
559 			}
560 #endif /* LWS_WITH_CLIENT */
561 
562 			n = map_flags[par->packet_type_flags >> 4];
563 			/*
564 			 *  Where a flag bit is marked as “Reserved”, it is
565 			 *  reserved for future use and MUST be set to the value
566 			 *  listed [MQTT-2.1.3-1].
567 			 */
568 			if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
569 			    ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
570 				lwsl_notice("%s: %s: bad flags, 0x%02x mask 0x%02x (len %d)\n",
571 					    __func__, lws_wsi_tag(wsi),
572 					    par->packet_type_flags, n, (int)len + 1);
573 				lwsl_hexdump_err(buf - 1, len + 1);
574 				goto send_protocol_error_and_close;
575 			}
576 
577 			lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
578 				   __func__, par->packet_type_flags >> 4,
579 				   par->packet_type_flags & 0xf);
580 
581 			/* allows us to know if a property that can only be
582 			 * given once, appears twice */
583 			memset(par->props_seen, 0, sizeof(par->props_seen));
584 			par->state = par->packet_type_flags & 0xf0;
585 			break;
586 
587 		case LMQCPP_CONNECT_PACKET:
588 			lwsl_debug("%s: received CONNECT pkt\n", __func__);
589 			par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
590 			lws_mqtt_vbi_init(&par->vbit);
591 			break;
592 
593 		case LMQCPP_CONNECT_REMAINING_LEN_VBI:
594 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
595 			case LMSPR_NEED_MORE:
596 				break;
597 			case LMSPR_COMPLETED:
598 				par->cpkt_remlen = par->vbit.value;
599 				n = map_flags[ctl_pkt_type(par)];
600 				lws_mqtt_str_init(&par->s_temp, par->temp,
601 						  sizeof(par->temp), 0);
602 				par->state = LMQCPP_CONNECT_VH_PNAME;
603 				break;
604 			default:
605 				lwsl_notice("%s: bad vbi\n", __func__);
606 				goto send_protocol_error_and_close;
607 			}
608 			break;
609 
610 		case LMQCPP_CONNECT_VH_PNAME:
611 			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
612 			case LMSPR_NEED_MORE:
613 				break;
614 			case LMSPR_COMPLETED:
615 				if (par->s_temp.len != 4 ||
616 				    memcmp(par->s_temp.buf, "MQTT",
617 					   par->s_temp.len)) {
618 					lwsl_notice("%s: protocol name: %.*s\n",
619 						  __func__, par->s_temp.len,
620 						  par->s_temp.buf);
621 					goto send_unsupp_connack_and_close;
622 				}
623 				par->state = LMQCPP_CONNECT_VH_PVERSION;
624 				break;
625 			default:
626 				lwsl_notice("%s: bad protocol name\n", __func__);
627 				goto send_protocol_error_and_close;
628 			}
629 			break;
630 
631 		case LMQCPP_CONNECT_VH_PVERSION:
632 			par->conn_protocol_version = *buf++;
633 			len--;
634 			if (par->conn_protocol_version != 5) {
635 				lwsl_info("%s: unsupported MQTT version %d\n",
636 					  __func__, par->conn_protocol_version);
637 				goto send_unsupp_connack_and_close;
638 			}
639 			par->state = LMQCPP_CONNECT_VH_FLAGS;
640 			break;
641 
642 		case LMQCPP_CONNECT_VH_FLAGS:
643 			par->cpkt_flags = *buf++;
644 			len--;
645 			if (par->cpkt_flags & 1) {
646 				/*
647 				 * The Server MUST validate that the reserved
648 				 * flag in the CONNECT packet is set to 0
649 				 * [MQTT-3.1.2-3].
650 				 */
651 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
652 				goto send_reason_and_close;
653 			}
654 			/*
655 			 * conn_flags specifies the Will Properties that should
656 			 * appear in the payload section
657 			 */
658 			lws_mqtt_2byte_init(&par->vbit);
659 			par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
660 			break;
661 
662 		case LMQCPP_CONNECT_VH_KEEPALIVE:
663 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
664 			case LMSPR_NEED_MORE:
665 				break;
666 			case LMSPR_COMPLETED:
667 				par->keepalive = (uint16_t)par->vbit.value;
668 				lws_mqtt_vbi_init(&par->vbit);
669 				par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
670 				break;
671 			default:
672 				lwsl_notice("%s: ka bad vbi\n", __func__);
673 				goto send_protocol_error_and_close;
674 			}
675 			break;
676 
677 		case LMQCPP_PINGRESP_ZERO:
678 			len--;
679 			/* second byte of PINGRESP must be zero */
680 			if (*buf++)
681 				goto send_protocol_error_and_close;
682 			goto cmd_completion;
683 
684 		case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
685 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
686 			case LMSPR_NEED_MORE:
687 				break;
688 			case LMSPR_COMPLETED:
689 				/* reset consumption counter */
690 				par->consumed = 0;
691 				par->props_len = par->vbit.value;
692 				lws_mqtt_vbi_init(&par->vbit);
693 				par->state = LMQCPP_PROP_ID_VBI;
694 				break;
695 			default:
696 				lwsl_notice("%s: connpr bad vbi\n", __func__);
697 				goto send_protocol_error_and_close;
698 			}
699 			break;
700 
701 		/* PUBREC */
702 		case LMQCPP_PUBREC_PACKET:
703 			lwsl_debug("%s: received PUBREC pkt\n", __func__);
704 			lws_mqtt_vbi_init(&par->vbit);
705 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
706 			case LMSPR_NEED_MORE:
707 				break;
708 			case LMSPR_COMPLETED:
709 				par->cpkt_remlen = par->vbit.value;
710 				lwsl_debug("%s: PUBREC pkt len = %d\n",
711 					   __func__, (int)par->cpkt_remlen);
712 				if (par->cpkt_remlen < 2)
713 					goto send_protocol_error_and_close;
714 				par->state = LMQCPP_PUBREC_VH_PKT_ID;
715 				break;
716 			default:
717 				lwsl_notice("%s: pubrec bad vbi\n", __func__);
718 				goto send_protocol_error_and_close;
719 			}
720 			break;
721 
722 		case LMQCPP_PUBREC_VH_PKT_ID:
723 			if (len < 2) {
724 				lwsl_notice("%s: len breakage 3\n", __func__);
725 				return -1;
726 			}
727 
728 			par->cpkt_id = lws_ser_ru16be(buf);
729 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
730 			buf += 2;
731 			len -= 2;
732 			par->cpkt_remlen -= 2;
733 			par->n = 0;
734 
735 			goto cmd_completion;
736 
737 		/* PUBREL */
738 		case LMQCPP_PUBREL_PACKET:
739 			lwsl_debug("%s: received PUBREL pkt\n", __func__);
740 			lws_mqtt_vbi_init(&par->vbit);
741 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
742 			case LMSPR_NEED_MORE:
743 				break;
744 			case LMSPR_COMPLETED:
745 				par->cpkt_remlen = par->vbit.value;
746 				lwsl_debug("%s: PUBREL pkt len = %d\n",
747 					   __func__, (int)par->cpkt_remlen);
748 				if (par->cpkt_remlen < 2)
749 					goto send_protocol_error_and_close;
750 				par->state = LMQCPP_PUBREL_VH_PKT_ID;
751 				break;
752 			default:
753 				lwsl_err("%s: pubrel bad vbi\n", __func__);
754 				goto send_protocol_error_and_close;
755 			}
756 			break;
757 
758 		case LMQCPP_PUBREL_VH_PKT_ID:
759 			if (len < 2) {
760 				lwsl_notice("%s: len breakage 3\n", __func__);
761 				return -1;
762 			}
763 
764 			par->cpkt_id = lws_ser_ru16be(buf);
765 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
766 			buf += 2;
767 			len -= 2;
768 			par->cpkt_remlen -= 2;
769 			par->n = 0;
770 
771 			goto cmd_completion;
772 
773 		/* PUBCOMP */
774 		case LMQCPP_PUBCOMP_PACKET:
775 			lwsl_debug("%s: received PUBCOMP pkt\n", __func__);
776 			lws_mqtt_vbi_init(&par->vbit);
777 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
778 			case LMSPR_NEED_MORE:
779 				break;
780 			case LMSPR_COMPLETED:
781 				par->cpkt_remlen = par->vbit.value;
782 				lwsl_debug("%s: PUBCOMP pkt len = %d\n",
783 					   __func__, (int)par->cpkt_remlen);
784 				if (par->cpkt_remlen < 2)
785 					goto send_protocol_error_and_close;
786 				par->state = LMQCPP_PUBCOMP_VH_PKT_ID;
787 				break;
788 			default:
789 				lwsl_err("%s: pubcmp bad vbi\n", __func__);
790 				goto send_protocol_error_and_close;
791 			}
792 			break;
793 
794 		case LMQCPP_PUBCOMP_VH_PKT_ID:
795 			if (len < 2) {
796 				lwsl_notice("%s: len breakage 3\n", __func__);
797 				return -1;
798 			}
799 
800 			par->cpkt_id = lws_ser_ru16be(buf);
801 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
802 			buf += 2;
803 			len -= 2;
804 			par->cpkt_remlen -= 2;
805 			par->n = 0;
806 
807 			goto cmd_completion;
808 
809 		case LMQCPP_PUBLISH_PACKET:
810 			if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
811 				lwsl_notice("%s: Topic rx before subscribing\n",
812 					    __func__);
813 				goto send_protocol_error_and_close;
814 			}
815 			lwsl_info("%s: received PUBLISH pkt\n", __func__);
816 			par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
817 			lws_mqtt_vbi_init(&par->vbit);
818 			break;
819 		case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
820 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
821 			case LMSPR_NEED_MORE:
822 				break;
823 			case LMSPR_COMPLETED:
824 				par->cpkt_remlen = par->vbit.value;
825 				lwsl_debug("%s: PUBLISH pkt len = %d\n",
826 					   __func__, (int)par->cpkt_remlen);
827 				/* Move on to PUBLISH's variable header */
828 				par->state = LMQCPP_PUBLISH_VH_TOPIC;
829 				break;
830 			default:
831 				lwsl_notice("%s: pubrem bad vbi\n", __func__);
832 				goto send_protocol_error_and_close;
833 			}
834 			break;
835 
836 		case LMQCPP_PUBLISH_VH_TOPIC:
837 		{
838 			lws_mqtt_publish_param_t *pub = NULL;
839 
840 			if (len < 2) {
841 				lwsl_notice("%s: topic too short\n", __func__);
842 				return -1;
843 			}
844 
845 			/* Topic len */
846 			par->n = lws_ser_ru16be(buf);
847 			buf += 2;
848 			len -= 2;
849 
850 			if (len < par->n) {/* the way this is written... */
851 				lwsl_notice("%s: len breakage\n", __func__);
852 				return -1;
853 			}
854 
855 			/* Invalid topic len */
856 			if (par->n == 0) {
857 				lwsl_notice("%s: zero topic len\n", __func__);
858 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
859 				goto send_reason_and_close;
860 			}
861 			lwsl_debug("%s: PUBLISH topic len %d\n",
862 				   __func__, (int)par->n);
863 			assert(!wsi->mqtt->rx_cpkt_param);
864 			wsi->mqtt->rx_cpkt_param = lws_zalloc(
865 				sizeof(lws_mqtt_publish_param_t), "rx pub param");
866 			if (!wsi->mqtt->rx_cpkt_param)
867 				goto oom;
868 			pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
869 
870 			pub->topic_len = (uint16_t)par->n;
871 
872 			/* Topic Name */
873 			pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
874 							"rx publish topic");
875 			if (!pub->topic)
876 				goto oom;
877 			lws_strncpy(pub->topic, (const char *)buf,
878 				    (size_t)pub->topic_len + 1);
879 			buf += pub->topic_len;
880 			len -= pub->topic_len;
881 
882 			/* Extract QoS Level from Fixed Header Flags */
883 			pub->qos = (lws_mqtt_qos_levels_t)
884 					((par->packet_type_flags >> 1) & 0x3);
885 
886 			pub->payload_pos = 0;
887 
888 			pub->payload_len = par->cpkt_remlen -
889 				(unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
890 
891 			switch (pub->qos) {
892 			case QOS0:
893 				par->state = LMQCPP_PAYLOAD;
894 				if (pub->payload_len == 0)
895 					goto cmd_completion;
896 
897 				break;
898 			case QOS1:
899 			case QOS2:
900 				par->state = LMQCPP_PUBLISH_VH_PKT_ID;
901 				break;
902 			default:
903 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
904 				lws_free_set_NULL(pub->topic);
905 				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
906 				goto send_reason_and_close;
907 			}
908 			break;
909 		}
910 		case LMQCPP_PUBLISH_VH_PKT_ID:
911 		{
912 			lws_mqtt_publish_param_t *pub =
913 				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
914 
915 			if (len < 2) {
916 				lwsl_notice("%s: len breakage 2\n", __func__);
917 				return -1;
918 			}
919 
920 			par->cpkt_id = lws_ser_ru16be(buf);
921 			buf += 2;
922 			len -= 2;
923 			wsi->mqtt->peer_ack_pkt_id = par->cpkt_id;
924 			lwsl_debug("%s: Packet ID %d\n",
925 					__func__, (int)par->cpkt_id);
926 			par->state = LMQCPP_PAYLOAD;
927 			pub->payload_pos = 0;
928 			pub->payload_len = par->cpkt_remlen -
929 				(unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
930 			if (pub->payload_len == 0)
931 				goto cmd_completion;
932 
933 			break;
934 		}
935 		case LMQCPP_PAYLOAD:
936 		{
937 			lws_mqtt_publish_param_t *pub =
938 				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
939 			if (pub == NULL) {
940 				lwsl_err("%s: Uninitialized pub_param\n",
941 						__func__);
942 				goto send_protocol_error_and_close;
943 			}
944 
945 			pub->payload = buf;
946 			goto cmd_completion;
947 		}
948 
949 		case LMQCPP_CONNACK_PACKET:
950 			if (!lwsi_role_client(wsi)) {
951 				lwsl_err("%s: CONNACK is only Server to Client",
952 						__func__);
953 				goto send_unsupp_connack_and_close;
954 			}
955 
956 			lwsl_debug("%s: received CONNACK pkt\n", __func__);
957 			lws_mqtt_vbi_init(&par->vbit);
958 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
959 			case LMSPR_NEED_MORE:
960 				break;
961 			case LMSPR_COMPLETED:
962 				par->cpkt_remlen = par->vbit.value;
963 				lwsl_debug("%s: CONNACK pkt len = %d\n",
964 					   __func__, (int)par->cpkt_remlen);
965 				if (par->cpkt_remlen != 2)
966 					goto send_protocol_error_and_close;
967 
968 				par->state = LMQCPP_CONNACK_VH_FLAGS;
969 				break;
970 			default:
971 				lwsl_notice("%s: connack bad vbi\n", __func__);
972 				goto send_protocol_error_and_close;
973 			}
974 			break;
975 
976 		case LMQCPP_CONNACK_VH_FLAGS:
977 		{
978 			lws_mqttc_t *c = &wsi->mqtt->client;
979 			par->cpkt_flags = *buf++;
980 			len--;
981 
982 			if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
983 				/*
984 				 * Byte 1 is the "Connect Acknowledge
985 				 * Flags". Bits 7-1 are reserved and
986 				 * MUST be set to 0.
987 				 */
988 				par->reason = LMQCP_REASON_MALFORMED_PACKET;
989 				goto send_reason_and_close;
990 			}
991 			/*
992 			 * If the Server accepts a connection with
993 			 * CleanSession set to 1, the Server MUST set
994 			 * Session Present to 0 in the CONNACK packet
995 			 * in addition to setting a zero return code
996 			 * in the CONNACK packet [MQTT-3.2.2-1]. If
997 			 * the Server accepts a connection with
998 			 * CleanSession set to 0, the value set in
999 			 * Session Present depends on whether the
1000 			 * Server already has stored Session state for
1001 			 * the supplied client ID. If the Server has
1002 			 * stored Session state, it MUST set
1003 			 * SessionPresent to 1 in the CONNACK packet
1004 			 * [MQTT-3.2.2-2]. If the Server does not have
1005 			 * stored Session state, it MUST set Session
1006 			 * Present to 0 in the CONNACK packet. This is
1007 			 * in addition to setting a zero return code
1008 			 * in the CONNACK packet [MQTT-3.2.2-3].
1009 			 */
1010 			if ((c->conn_flags & LMQCFT_CLEAN_START) &&
1011 			    (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
1012 				goto send_protocol_error_and_close;
1013 
1014 			wsi->mqtt->session_resumed = ((unsigned int)par->cpkt_flags &
1015 						      LMQCFT_SESSION_PRESENT);
1016 
1017 			/* Move on to Connect Return Code */
1018 			par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
1019 			break;
1020 		}
1021 		case LMQCPP_CONNACK_VH_RETURN_CODE:
1022 			par->conn_rc = *buf++;
1023 			len--;
1024 			/*
1025 			 * If a server sends a CONNACK packet containing a
1026 			 * non-zero return code it MUST then close the Network
1027 			 * Connection [MQTT-3.2.2-5]
1028 			 */
1029 			switch (par->conn_rc) {
1030 			case 0:
1031 				goto cmd_completion;
1032 			case 1:
1033 			case 2:
1034 			case 3:
1035 			case 4:
1036 			case 5:
1037 				par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
1038 						par->conn_rc - 1;
1039 				goto send_reason_and_close;
1040 			default:
1041 				lwsl_notice("%s: bad connack retcode\n", __func__);
1042 				goto send_protocol_error_and_close;
1043 			}
1044 			break;
1045 
1046 		/* SUBACK */
1047 		case LMQCPP_SUBACK_PACKET:
1048 			if (!lwsi_role_client(wsi)) {
1049 				lwsl_err("%s: SUBACK is only Server to Client",
1050 						__func__);
1051 				goto send_unsupp_connack_and_close;
1052 			}
1053 
1054 			lwsl_debug("%s: received SUBACK pkt\n", __func__);
1055 			lws_mqtt_vbi_init(&par->vbit);
1056 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1057 			case LMSPR_NEED_MORE:
1058 				break;
1059 			case LMSPR_COMPLETED:
1060 				par->cpkt_remlen = par->vbit.value;
1061 				lwsl_debug("%s: SUBACK pkt len = %d\n",
1062 					   __func__, (int)par->cpkt_remlen);
1063 				if (par->cpkt_remlen <= 2)
1064 					goto send_protocol_error_and_close;
1065 				par->state = LMQCPP_SUBACK_VH_PKT_ID;
1066 				break;
1067 			default:
1068 				lwsl_notice("%s: suback bad vbi\n", __func__);
1069 				goto send_protocol_error_and_close;
1070 			}
1071 			break;
1072 
1073 		case LMQCPP_SUBACK_VH_PKT_ID:
1074 
1075 			if (len < 2) {
1076 				lwsl_notice("%s: len breakage 4\n", __func__);
1077 				return -1;
1078 			}
1079 
1080 			par->cpkt_id = lws_ser_ru16be(buf);
1081 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
1082 			buf += 2;
1083 			len -= 2;
1084 			par->cpkt_remlen -= 2;
1085 			par->n = 0;
1086 			par->state = LMQCPP_SUBACK_PAYLOAD;
1087 			*par->temp = 0;
1088 			break;
1089 
1090 		case LMQCPP_SUBACK_PAYLOAD:
1091 		{
1092 			lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
1093 
1094 			len--;
1095 			switch (qos) {
1096 				case QOS0:
1097 				case QOS1:
1098 				case QOS2:
1099 					break;
1100 				case FAILURE_QOS_LEVEL:
1101 					goto send_protocol_error_and_close;
1102 
1103 				default:
1104 					par->reason = LMQCP_REASON_MALFORMED_PACKET;
1105 					goto send_reason_and_close;
1106 			}
1107 
1108 			if (++(par->n) == par->cpkt_remlen) {
1109 				par->n = 0;
1110 				goto cmd_completion;
1111 			}
1112 
1113 			break;
1114 		}
1115 
1116 		/* UNSUBACK */
1117 		case LMQCPP_UNSUBACK_PACKET:
1118 			if (!lwsi_role_client(wsi)) {
1119 				lwsl_err("%s: UNSUBACK is only Server to Client",
1120 						__func__);
1121 				goto send_unsupp_connack_and_close;
1122 			}
1123 
1124 			lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
1125 			lws_mqtt_vbi_init(&par->vbit);
1126 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1127 			case LMSPR_NEED_MORE:
1128 				break;
1129 			case LMSPR_COMPLETED:
1130 				par->cpkt_remlen = par->vbit.value;
1131 				lwsl_debug("%s: UNSUBACK pkt len = %d\n",
1132 					   __func__, (int)par->cpkt_remlen);
1133 				if (par->cpkt_remlen < 2)
1134 					goto send_protocol_error_and_close;
1135 				par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
1136 				break;
1137 			default:
1138 				lwsl_notice("%s: unsuback bad vbi\n", __func__);
1139 				goto send_protocol_error_and_close;
1140 			}
1141 			break;
1142 
1143 		case LMQCPP_UNSUBACK_VH_PKT_ID:
1144 
1145 			if (len < 2) {
1146 				lwsl_notice("%s: len breakage 3\n", __func__);
1147 				return -1;
1148 			}
1149 
1150 			par->cpkt_id = lws_ser_ru16be(buf);
1151 			wsi->mqtt->ack_pkt_id = par->cpkt_id;
1152 			buf += 2;
1153 			len -= 2;
1154 			par->cpkt_remlen -= 2;
1155 			par->n = 0;
1156 
1157 			goto cmd_completion;
1158 
1159 		case LMQCPP_PUBACK_PACKET:
1160 			lws_mqtt_vbi_init(&par->vbit);
1161 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1162 			case LMSPR_NEED_MORE:
1163 				break;
1164 			case LMSPR_COMPLETED:
1165 				par->cpkt_remlen = par->vbit.value;
1166 				lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
1167 					  (int)par->cpkt_remlen);
1168 				/*
1169 				 * must be 4 or more, with special case that 2
1170 				 * means success with no reason code or props
1171 				 */
1172 				if (par->cpkt_remlen <= 1 ||
1173 				    par->cpkt_remlen == 3)
1174 					goto send_protocol_error_and_close;
1175 
1176 				par->state = LMQCPP_PUBACK_VH_PKT_ID;
1177 				par->fixed_seen[2] = par->fixed_seen[3] = 0;
1178 				par->fixed = 0;
1179 				par->n = 0;
1180 				break;
1181 			default:
1182 				lwsl_notice("%s: puback bad vbi\n", __func__);
1183 				goto send_protocol_error_and_close;
1184 			}
1185 			break;
1186 
1187 		case LMQCPP_PUBACK_VH_PKT_ID:
1188 			/*
1189 			 * There are 3 fixed bytes and then a VBI for the
1190 			 * property section length
1191 			 */
1192 			par->fixed_seen[par->fixed++] = *buf++;
1193 			if (len < par->cpkt_remlen - par->n) {
1194 				lwsl_notice("%s: len breakage 4\n", __func__);
1195 				return -1;
1196 			}
1197 			len--;
1198 			par->n++;
1199 			if (par->fixed == 2)
1200 				par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
1201 
1202 			if (par->fixed == 3) {
1203 				lws_mqtt_vbi_init(&par->vbit);
1204 				par->props_consumed = 0;
1205 				par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
1206 			}
1207 			/* length of 2 is truncated packet and we completed it */
1208 			if (par->cpkt_remlen == par->fixed)
1209 				goto cmd_completion;
1210 			break;
1211 
1212 		case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
1213 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1214 			case LMSPR_NEED_MORE:
1215 				break;
1216 			case LMSPR_COMPLETED:
1217 				par->props_len = par->vbit.value;
1218 				lwsl_info("%s: PUBACK props len = %d\n",
1219 					  __func__, (int)par->cpkt_remlen);
1220 				/*
1221 				 * If there are no properties, this is a
1222 				 * command completion event in itself
1223 				 */
1224 				if (!par->props_len)
1225 					goto cmd_completion;
1226 
1227 				/*
1228 				 * Otherwise consume the properties before
1229 				 * completing the command
1230 				 */
1231 				lws_mqtt_vbi_init(&par->vbit);
1232 				par->state = LMQCPP_PUBACK_VH_PKT_ID;
1233 				break;
1234 			default:
1235 				lwsl_notice("%s: puback pr bad vbi\n", __func__);
1236 				goto send_protocol_error_and_close;
1237 			}
1238 			break;
1239 
1240 		case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
1241 			/*
1242 			 * TODO: stash the props
1243 			 */
1244 			par->props_consumed++;
1245 			len--;
1246 			buf++;
1247 			if (par->props_len != par->props_consumed)
1248 				break;
1249 
1250 cmd_completion:
1251 			/*
1252 			 * We come here when we understood we just processed
1253 			 * the last byte of a command packet, regardless of the
1254 			 * packet type
1255 			 */
1256 			par->state = LMQCPP_IDLE;
1257 
1258 			switch (par->packet_type_flags >> 4) {
1259 			case LMQCP_STOC_CONNACK:
1260 				lwsl_info("%s: cmd_completion: CONNACK\n",
1261 					  __func__);
1262 
1263 				/*
1264 				 * Getting the CONNACK means we are the first,
1265 				 * the nwsi, and we succeeded to create a new
1266 				 * network connection ourselves.
1267 				 *
1268 				 * Since others may join us sharing the nwsi,
1269 				 * and we may close while they still want to use
1270 				 * it, our wsi lifecycle alone can no longer
1271 				 * define the lifecycle of the nwsi... it means
1272 				 * we need to do a "magic trick" and instead of
1273 				 * being both the nwsi and act like a child
1274 				 * stream, create a new wsi to take over the
1275 				 * nwsi duties and turn our wsi into a child of
1276 				 * the nwsi with its own lifecycle.
1277 				 *
1278 				 * The nwsi gets a mostly empty wsi->nwsi used
1279 				 * to track already-subscribed topics globally
1280 				 * for the connection.
1281 				 */
1282 
1283 				/* we were under SENT_CLIENT_HANDSHAKE timeout */
1284 				lws_set_timeout(wsi, 0, 0);
1285 
1286 				w = lws_create_new_server_wsi(wsi->a.vhost,
1287 							      wsi->tsi, "mqtt_sid1");
1288 				if (!w) {
1289 					lwsl_notice("%s: sid 1 migrate failed\n",
1290 							__func__);
1291 					return -1;
1292 				}
1293 
1294 				wsi->mux.highest_sid = 1;
1295 				lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
1296 
1297 				wsi->mux_substream = 1;
1298 				w->mux_substream = 1;
1299 				w->client_mux_substream = 1;
1300 				wsi->client_mux_migrated = 1;
1301 				wsi->told_user_closed = 1; /* don't tell nwsi closed */
1302 
1303 				lwsi_set_state(w, LRS_ESTABLISHED);
1304 				lwsi_set_state(wsi, LRS_ESTABLISHED);
1305 				lwsi_set_role(w, lwsi_role(wsi));
1306 
1307 #if defined(LWS_WITH_CLIENT)
1308 				w->flags = wsi->flags;
1309 #endif
1310 
1311 				w->mqtt = wsi->mqtt;
1312 				wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
1313 				if (!wsi->mqtt)
1314 					return -1;
1315 				w->mqtt->wsi = w;
1316 				w->a.protocol = wsi->a.protocol;
1317 				if (w->user_space &&
1318 				    !w->user_space_externally_allocated)
1319 					lws_free_set_NULL(w->user_space);
1320 				w->user_space = wsi->user_space;
1321 				wsi->user_space = NULL;
1322 				w->user_space_externally_allocated =
1323 					wsi->user_space_externally_allocated;
1324 				if (lws_ensure_user_space(w))
1325 					goto bail1;
1326 				w->a.opaque_user_data = wsi->a.opaque_user_data;
1327 				wsi->a.opaque_user_data = NULL;
1328 				w->stash = wsi->stash;
1329 				wsi->stash = NULL;
1330 
1331 				lws_mux_mark_immortal(w);
1332 
1333 				lwsl_notice("%s: migrated nwsi %s to sid 1 %s\n",
1334 						__func__, lws_wsi_tag(wsi),
1335 						lws_wsi_tag(w));
1336 
1337 				/*
1338 				 * It was the last thing we were waiting for
1339 				 * before we can be fully ESTABLISHED
1340 				 */
1341 				if (lws_mqtt_set_client_established(w)) {
1342 					lwsl_notice("%s: set EST fail\n", __func__);
1343 					return -1;
1344 				}
1345 
1346 				/* get the ball rolling */
1347 				lws_validity_confirmed(wsi);
1348 
1349 				/* well, add the queued guys as children */
1350 				lws_wsi_mux_apply_queue(wsi);
1351 				break;
1352 
1353 bail1:
1354 				/* undo the insert */
1355 				wsi->mux.child_list = w->mux.sibling_list;
1356 				wsi->mux.child_count--;
1357 
1358 				if (w->user_space)
1359 					lws_free_set_NULL(w->user_space);
1360 				w->a.vhost->protocols[0].callback(w,
1361 							LWS_CALLBACK_WSI_DESTROY,
1362 							NULL, NULL, 0);
1363 				__lws_vhost_unbind_wsi(w); /* cx + vh lock */
1364 				lws_free(w);
1365 
1366 				return 0;
1367 
1368 			case LMQCP_PUBREC:
1369 				lwsl_err("%s: cmd_completion: PUBREC\n",
1370 						__func__);
1371 				/*
1372 				 * Figure out which child asked for this
1373 				 */
1374 				n = 0;
1375 				lws_start_foreach_ll(struct lws *, w,
1376 						     wsi->mux.child_list) {
1377 					if (w->mqtt->unacked_publish &&
1378 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1379 						char requested_close = 0;
1380 
1381 						w->mqtt->unacked_publish = 0;
1382 						w->mqtt->unacked_pubrel = 1;
1383 
1384 						if (user_callback_handle_rxflow(
1385 							    w->a.protocol->callback,
1386 							    w, LWS_CALLBACK_MQTT_ACK,
1387 							    w->user_space, NULL, 0) < 0) {
1388 							lwsl_info("%s: MQTT_ACK requests close\n",
1389 								 __func__);
1390 							requested_close = 1;
1391 						}
1392 						n = 1;
1393 
1394 						/*
1395 						 * We got an assertive PUBREC,
1396 						 * no need for timeout wait
1397 						 * any more
1398 						 */
1399 						lws_sul_cancel(&w->mqtt->
1400 							  sul_qos_puback_pubrec_wait);
1401 
1402 						if (requested_close) {
1403 							__lws_close_free_wsi(w,
1404 								0, "ack cb");
1405 							break;
1406 						}
1407 
1408 						break;
1409 					}
1410 				} lws_end_foreach_ll(w, mux.sibling_list);
1411 
1412 				if (!n) {
1413 					lwsl_err("%s: unsolicited PUBREC\n",
1414 							__func__);
1415 					return -1;
1416 				}
1417 				wsi->mqtt->send_pubrel = 1;
1418 				lws_callback_on_writable(wsi);
1419 				break;
1420 
1421 			case LMQCP_PUBCOMP:
1422 				lwsl_err("%s: cmd_completion: PUBCOMP\n",
1423 						__func__);
1424 				n = 0;
1425 				lws_start_foreach_ll(struct lws *, w,
1426 						     wsi->mux.child_list) {
1427 					if (w->mqtt->unacked_pubrel > 0 &&
1428 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1429 						w->mqtt->unacked_pubrel = 0;
1430 						n = 1;
1431 					}
1432 				} lws_end_foreach_ll(w, mux.sibling_list);
1433 
1434 				if (!n) {
1435 					lwsl_err("%s: unsolicited PUBCOMP\n",
1436 							__func__);
1437 					return -1;
1438 				}
1439 
1440 				/*
1441 				 * If we published something and PUBCOMP arrived,
1442 				 * our connection is definitely working in both
1443 				 * directions at the moment.
1444 				 */
1445 				lws_validity_confirmed(wsi);
1446 				break;
1447 
1448 			case LMQCP_PUBREL:
1449 				lwsl_err("%s: cmd_completion: PUBREL\n",
1450 						__func__);
1451 				wsi->mqtt->send_pubcomp = 1;
1452 				lws_callback_on_writable(wsi);
1453 				break;
1454 
1455 			case LMQCP_PUBACK:
1456 				lwsl_info("%s: cmd_completion: PUBACK\n",
1457 						__func__);
1458 
1459 				/*
1460 				 * Figure out which child asked for this
1461 				 */
1462 
1463 				n = 0;
1464 				lws_start_foreach_ll(struct lws *, w,
1465 						      wsi->mux.child_list) {
1466 					if (w->mqtt->unacked_publish &&
1467 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1468 						char requested_close = 0;
1469 
1470 						w->mqtt->unacked_publish = 0;
1471 						if (user_callback_handle_rxflow(
1472 							    w->a.protocol->callback,
1473 							    w, LWS_CALLBACK_MQTT_ACK,
1474 							    w->user_space, NULL, 0) < 0) {
1475 							lwsl_info("%s: MQTT_ACK requests close\n",
1476 								 __func__);
1477 							requested_close = 1;
1478 						}
1479 						n = 1;
1480 
1481 						/*
1482 						 * We got an assertive PUBACK,
1483 						 * no need for ACK timeout wait
1484 						 * any more
1485 						 */
1486 						lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait);
1487 
1488 						if (requested_close) {
1489 							__lws_close_free_wsi(w,
1490 								0, "ack cb");
1491 							break;
1492 						}
1493 
1494 						break;
1495 					}
1496 				} lws_end_foreach_ll(w, mux.sibling_list);
1497 
1498 				if (!n) {
1499 					lwsl_err("%s: unsolicited PUBACK\n",
1500 							__func__);
1501 					return -1;
1502 				}
1503 
1504 				/*
1505 				 * If we published something and it was acked,
1506 				 * our connection is definitely working in both
1507 				 * directions at the moment.
1508 				 */
1509 				lws_validity_confirmed(wsi);
1510 				break;
1511 
1512 			case LMQCP_STOC_PINGRESP:
1513 				lwsl_info("%s: cmd_completion: PINGRESP\n",
1514 						__func__);
1515 				/*
1516 				 * If we asked for a PINGRESP and it came,
1517 				 * our connection is definitely working in both
1518 				 * directions at the moment.
1519 				 */
1520 				lws_validity_confirmed(wsi);
1521 				break;
1522 
1523 			case LMQCP_STOC_SUBACK:
1524 				lwsl_info("%s: cmd_completion: SUBACK\n",
1525 						__func__);
1526 
1527 				/*
1528 				 * Figure out which child asked for this
1529 				 */
1530 
1531 				n = 0;
1532 				lws_start_foreach_ll(struct lws *, w,
1533 						      wsi->mux.child_list) {
1534 					if (w->mqtt->inside_subscribe &&
1535 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1536 						w->mqtt->inside_subscribe = 0;
1537 						if (user_callback_handle_rxflow(
1538 							    w->a.protocol->callback,
1539 							    w, LWS_CALLBACK_MQTT_SUBSCRIBED,
1540 							    w->user_space, NULL, 0) < 0) {
1541 							lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1542 								 __func__);
1543 							return -1;
1544 						}
1545 						n = 1;
1546 						break;
1547 					}
1548 				} lws_end_foreach_ll(w, mux.sibling_list);
1549 
1550 				if (!n) {
1551 					lwsl_err("%s: unsolicited SUBACK\n",
1552 							__func__);
1553 					return -1;
1554 				}
1555 
1556 				/*
1557 				 * If we subscribed to something and SUBACK came,
1558 				 * our connection is definitely working in both
1559 				 * directions at the moment.
1560 				 */
1561 				lws_validity_confirmed(wsi);
1562 
1563 				break;
1564 
1565 			case LMQCP_STOC_UNSUBACK:
1566 			{
1567 				char requested_close = 0;
1568 				lwsl_info("%s: cmd_completion: UNSUBACK\n",
1569 						__func__);
1570 				/*
1571 				 * Figure out which child asked for this
1572 				 */
1573 				n = 0;
1574 				lws_start_foreach_ll(struct lws *, w,
1575 						      wsi->mux.child_list) {
1576 					if (w->mqtt->inside_unsubscribe &&
1577 					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1578 						struct lws *nwsi = lws_get_network_wsi(w);
1579 
1580 						/*
1581 						 * No more subscribers left,
1582 						 * remove the topic from nwsi
1583 						 */
1584 						lws_mqtt_client_remove_subs(nwsi->mqtt);
1585 
1586 						w->mqtt->inside_unsubscribe = 0;
1587 						if (user_callback_handle_rxflow(
1588 							    w->a.protocol->callback,
1589 							    w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1590 							    w->user_space, NULL, 0) < 0) {
1591 							lwsl_info("%s: MQTT_UNSUBACK requests close\n",
1592 								 __func__);
1593 							requested_close = 1;
1594 						}
1595 						n = 1;
1596 
1597 						lws_sul_cancel(&w->mqtt->sul_unsuback_wait);
1598 						if (requested_close) {
1599 							__lws_close_free_wsi(w,
1600 									     0, "unsub ack cb");
1601 							break;
1602 						}
1603 						break;
1604 					}
1605 				} lws_end_foreach_ll(w, mux.sibling_list);
1606 
1607 				if (!n) {
1608 					lwsl_err("%s: unsolicited UNSUBACK\n",
1609 							__func__);
1610 					return -1;
1611 				}
1612 
1613 
1614 				/*
1615 				 * If we unsubscribed to something and
1616 				 * UNSUBACK came, our connection is
1617 				 * definitely working in both
1618 				 * directions at the moment.
1619 				 */
1620 				lws_validity_confirmed(wsi);
1621 
1622 				break;
1623 			}
1624 			case LMQCP_PUBLISH:
1625 			{
1626 				lws_mqtt_publish_param_t *pub =
1627 						(lws_mqtt_publish_param_t *)
1628 							wsi->mqtt->rx_cpkt_param;
1629 				size_t chunk;
1630 
1631 				if (pub == NULL) {
1632 					lwsl_notice("%s: no pub\n", __func__);
1633 					return -1;
1634 				}
1635 
1636 				/*
1637 				 * RX PUBLISH is delivered to any children that
1638 				 * registered for the related topic
1639 				 */
1640 
1641 				n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
1642 
1643 				chunk = pub->payload_len - pub->payload_pos;
1644 				if (chunk > len)
1645 					chunk = len;
1646 
1647 				lws_start_foreach_ll(struct lws *, w,
1648 						      wsi->mux.child_list) {
1649 					if (lws_mqtt_find_sub(w->mqtt,
1650 							      pub->topic))
1651 						if (w->a.protocol->callback(
1652 							    w, (enum lws_callback_reasons)n,
1653 							    w->user_space,
1654 							    (void *)pub,
1655 							    chunk)) {
1656 								par->payload_consumed = 0;
1657 								lws_free_set_NULL(pub->topic);
1658 								lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1659 								return 1;
1660 							}
1661 				} lws_end_foreach_ll(w, mux.sibling_list);
1662 
1663 
1664 				pub->payload_pos += (uint32_t)chunk;
1665 				len -= chunk;
1666 				buf += chunk;
1667 
1668 				lwsl_debug("%s: post pos %d, plen %d, len %d\n",
1669 					    __func__, (int)pub->payload_pos,
1670 					    (int)pub->payload_len, (int)len);
1671 
1672 				if (pub->payload_pos != pub->payload_len) {
1673 					/*
1674 					 * More chunks of the payload pending,
1675 					 * blocking this connection from doing
1676 					 * anything else
1677 					 */
1678 					par->state = LMQCPP_PAYLOAD;
1679 					break;
1680 				}
1681 
1682 				if (pub->qos == 1) {
1683 				/* For QOS = 1, send out PUBACK */
1684 					wsi->mqtt->send_puback = 1;
1685 					lws_callback_on_writable(wsi);
1686 				} else if (pub->qos == 2) {
1687 				/* For QOS = 2, send out PUBREC */
1688 					wsi->mqtt->send_pubrec = 1;
1689 					lws_callback_on_writable(wsi);
1690 				}
1691 
1692 				par->payload_consumed = 0;
1693 				lws_free_set_NULL(pub->topic);
1694 				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1695 
1696 				break;
1697 			}
1698 			default:
1699 				break;
1700 			}
1701 
1702 			break;
1703 
1704 
1705 		case LMQCPP_PROP_ID_VBI:
1706 			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1707 			case LMSPR_NEED_MORE:
1708 				break;
1709 			case LMSPR_COMPLETED:
1710 				par->consumed = (uint32_t)((unsigned int)par->consumed + (unsigned int)(unsigned char)par->vbit.consumed);
1711 				if (par->vbit.value >
1712 				    LWS_ARRAY_SIZE(property_valid)) {
1713 					lwsl_notice("%s: undef prop id 0x%x\n",
1714 						  __func__, (int)par->vbit.value);
1715 					goto send_protocol_error_and_close;
1716 				}
1717 				if (!(property_valid[par->vbit.value] &
1718 					(1 << ctl_pkt_type(par)))) {
1719 					lwsl_notice("%s: prop id 0x%x invalid for"
1720 						  " control pkt %d\n", __func__,
1721 						  (int)par->vbit.value,
1722 						  ctl_pkt_type(par));
1723 					goto send_protocol_error_and_close;
1724 				}
1725 				par->prop_id = par->vbit.value;
1726 				par->flag_prop_multi = !!(
1727 					par->props_seen[par->prop_id >> 3] &
1728 					(1 << (par->prop_id & 7)));
1729 				par->props_seen[par->prop_id >> 3] =
1730 						(uint8_t)((par->props_seen[par->prop_id >> 3]) | (1 << (par->prop_id & 7)));
1731 				/*
1732 				 *  even if it's not a vbi property arg,
1733 				 * .consumed of this will be zero the first time
1734 				 */
1735 				lws_mqtt_vbi_init(&par->vbit);
1736 				/*
1737 				 * if it's a string, next state must set the
1738 				 * destination and size limit itself.  But
1739 				 * resetting it generically here lets it use
1740 				 * lws_mqtt_str_first() to understand it's the
1741 				 * first time around.
1742 				 */
1743 				 lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
1744 
1745 				/* property arg state enums are so encoded */
1746 				par->state = 0x100 | par->vbit.value;
1747 				break;
1748 			default:
1749 				lwsl_notice("%s: prop id bad vbi\n", __func__);
1750 				goto send_protocol_error_and_close;
1751 			}
1752 			break;
1753 
1754 		/*
1755 		 * All possible property payloads... restricting which ones
1756 		 * can appear in which control packets is already done above
1757 		 * in LMQCPP_PROP_ID_VBI
1758 		 */
1759 
1760 		case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
1761 		case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
1762 		case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
1763 		case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
1764 		case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
1765 		case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
1766 		case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
1767 		case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
1768 			if (par->flag_prop_multi)
1769 				goto singular_prop_seen_twice;
1770 			par->payload_format = *buf++;
1771 			len--;
1772 			if (lws_mqtt_pconsume(par, 1))
1773 				goto send_protocol_error_and_close;
1774 			break;
1775 
1776 		case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
1777 		case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
1778 		case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
1779 		case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
1780 			if (par->flag_prop_multi)
1781 				goto singular_prop_seen_twice;
1782 
1783 			if (lws_mqtt_mb_first(&par->vbit))
1784 				lws_mqtt_4byte_init(&par->vbit);
1785 
1786 			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1787 			case LMSPR_NEED_MORE:
1788 				break;
1789 			case LMSPR_COMPLETED:
1790 				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1791 					goto send_protocol_error_and_close;
1792 				break;
1793 			default:
1794 				goto send_protocol_error_and_close;
1795 			}
1796 			break;
1797 
1798 		case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
1799 		case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
1800 		case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
1801 		case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
1802 			if (par->flag_prop_multi)
1803 				goto singular_prop_seen_twice;
1804 
1805 			if (lws_mqtt_mb_first(&par->vbit))
1806 				lws_mqtt_2byte_init(&par->vbit);
1807 
1808 			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1809 			case LMSPR_NEED_MORE:
1810 				break;
1811 			case LMSPR_COMPLETED:
1812 				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1813 					goto send_protocol_error_and_close;
1814 				break;
1815 			default:
1816 				goto send_protocol_error_and_close;
1817 			}
1818 			break;
1819 
1820 		case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
1821 		case LMQCPP_PROP_AUTH_METHOD_UTF8S:
1822 		case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
1823 		case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
1824 		case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
1825 		case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
1826 		case LMQCPP_PROP_REASON_STRING_UTF8S:
1827 		case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
1828 		case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
1829 			if (par->flag_prop_multi)
1830 				goto singular_prop_seen_twice;
1831 
1832 			if (lws_mqtt_str_first(&par->s_temp))
1833 				lws_mqtt_str_init(&par->s_temp, par->temp,
1834 						  sizeof(par->temp), 0);
1835 
1836 			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
1837 			case LMSPR_NEED_MORE:
1838 				break;
1839 			case LMSPR_COMPLETED:
1840 				if (lws_mqtt_pconsume(par, par->s_temp.len))
1841 					goto send_protocol_error_and_close;
1842 				break;
1843 
1844 			default:
1845 				lwsl_info("%s: bad protocol name\n", __func__);
1846 				goto send_protocol_error_and_close;
1847 			}
1848 			break;
1849 
1850 		case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
1851 
1852 		case LMQCPP_PROP_CORRELATION_BINDATA:
1853 		case LMQCPP_PROP_AUTH_DATA_BINDATA:
1854 
1855 		/* TODO */
1856 			lwsl_err("%s: Unimplemented packet state 0x%x\n",
1857 					__func__, par->state);
1858 			return -1;
1859 		}
1860 	}
1861 
1862 	return 0;
1863 
1864 oom:
1865 	lwsl_err("%s: OOM!\n", __func__);
1866 	goto send_protocol_error_and_close;
1867 
1868 singular_prop_seen_twice:
1869 	lwsl_info("%s: property appears twice\n", __func__);
1870 
1871 send_protocol_error_and_close:
1872 	lwsl_notice("%s: peac\n", __func__);
1873 	par->reason = LMQCP_REASON_PROTOCOL_ERROR;
1874 
1875 send_reason_and_close:
1876 	lwsl_notice("%s: srac\n", __func__);
1877 	par->flag_pending_send_reason_close = 1;
1878 	goto ask;
1879 
1880 send_unsupp_connack_and_close:
1881 	lwsl_notice("%s: unsupac\n", __func__);
1882 	par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
1883 	par->flag_pending_send_connack_close = 1;
1884 
1885 ask:
1886 	/* Should we ask for clients? */
1887 	lws_callback_on_writable(wsi);
1888 
1889 	return -1;
1890 }
1891 
1892 int
lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type, uint8_t dup, lws_mqtt_qos_levels_t qos, uint8_t retain)1893 lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
1894 			   uint8_t dup, lws_mqtt_qos_levels_t qos,
1895 			   uint8_t retain)
1896 {
1897 	lws_mqtt_fixed_hdr_t hdr;
1898 
1899 	hdr.bits = 0;
1900 	hdr.flags.ctrl_pkt_type = ctrl_pkt_type & 0xf;
1901 
1902 	switch(ctrl_pkt_type) {
1903 	case LMQCP_PUBLISH:
1904 		hdr.flags.dup = !!dup;
1905 		/*
1906 		 * A PUBLISH Packet MUST NOT have both QoS bits set to
1907 		 * 1. If a Server or Client receives a PUBLISH Packet
1908 		 * which has both QoS bits set to 1 it MUST close the
1909 		 * Network Connection [MQTT-3.3.1-4].
1910 		 */
1911 		if (qos >= RESERVED_QOS_LEVEL) {
1912 			lwsl_err("%s: Unsupport QoS level 0x%x\n",
1913 				 __func__, qos);
1914 			return -1;
1915 		}
1916 		hdr.flags.qos = qos & 3;
1917 		hdr.flags.retain = !!retain;
1918 		break;
1919 
1920 	case LMQCP_CTOS_CONNECT:
1921 	case LMQCP_STOC_CONNACK:
1922 	case LMQCP_PUBACK:
1923 	case LMQCP_PUBREC:
1924 	case LMQCP_PUBCOMP:
1925 	case LMQCP_STOC_SUBACK:
1926 	case LMQCP_STOC_UNSUBACK:
1927 	case LMQCP_CTOS_PINGREQ:
1928 	case LMQCP_STOC_PINGRESP:
1929 	case LMQCP_DISCONNECT:
1930 	case LMQCP_AUTH:
1931 		hdr.bits &= 0xf0;
1932 		break;
1933 
1934 	/*
1935 	 * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
1936 	 * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
1937 	 * MUST be set to 0,0,1 and 0 respectively. The Server MUST
1938 	 * treat any other value as malformed and close the Network
1939 	 * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
1940 	 */
1941 	case LMQCP_PUBREL:
1942 	case LMQCP_CTOS_SUBSCRIBE:
1943 	case LMQCP_CTOS_UNSUBSCRIBE:
1944 		hdr.bits |= 0x02;
1945 		break;
1946 
1947 	default:
1948 		return -1;
1949 	}
1950 
1951 	*p = hdr.bits;
1952 
1953 	return 0;
1954 }
1955 
1956 int
lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub, const void *buf, uint32_t len, int is_complete)1957 lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
1958 			     const void *buf, uint32_t len, int is_complete)
1959 {
1960 	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
1961 	uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
1962 	struct lws *nwsi = lws_get_network_wsi(wsi);
1963 	lws_mqtt_str_t mqtt_vh_payload;
1964 	uint32_t vh_len, rem_len;
1965 
1966 	assert(pub->topic);
1967 
1968 	lwsl_debug("%s: len = %d, is_complete = %d\n",
1969 		   __func__, (int)len, (int)is_complete);
1970 
1971 	if (lwsi_state(wsi) != LRS_ESTABLISHED) {
1972 		lwsl_err("%s: %s: unknown state 0x%x\n", __func__,
1973 				lws_wsi_tag(wsi), lwsi_state(wsi));
1974 		assert(0);
1975 		return 1;
1976 	}
1977 
1978 	if (wsi->mqtt->inside_payload) {
1979 		/*
1980 		 * Headers are filled, we are sending
1981 		 * the payload - a buffer with LWS_PRE
1982 		 * in front it.
1983 		 */
1984 		start = (uint8_t *)buf;
1985 		p = start + len;
1986 		if (is_complete)
1987 			wsi->mqtt->inside_payload = 0;
1988 		goto do_write;
1989 	}
1990 
1991 	start = b + LWS_PRE;
1992 	p = start;
1993 	/*
1994 	 * Fill headers and the first chunk of the
1995 	 * payload (if any)
1996 	 */
1997 	if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
1998 				       pub->dup, pub->qos, pub->retain)) {
1999 		lwsl_err("%s: Failed to fill fixed header\n", __func__);
2000 		return 1;
2001 	}
2002 
2003 	/*
2004 	 * Topic len field + Topic len + Packet ID
2005 	 * (for QOS>0) + Payload len
2006 	 */
2007 	vh_len = (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
2008 	rem_len = vh_len + pub->payload_len;
2009 	lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
2010 
2011 	/* Will the chunk of payload fit? */
2012 	if ((vh_len + len) >=
2013 	    (wsi->a.context->pt_serv_buf_size - LWS_PRE)) {
2014 		lwsl_err("%s: Payload is too big\n", __func__);
2015 		return 1;
2016 	}
2017 
2018 	p += lws_mqtt_vbi_encode(rem_len, p);
2019 
2020 	/* Topic's Len */
2021 	lws_ser_wu16be(p, pub->topic_len);
2022 	p += 2;
2023 
2024 	/*
2025 	 * Init lws_mqtt_str for "MQTT Variable
2026 	 * Headers + payload" (only the supplied
2027 	 * chuncked payload)
2028 	 */
2029 	lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
2030 			  (uint16_t)(unsigned int)(pub->topic_len + ((pub->qos) ? 2u : 0u) + len),
2031 			  0);
2032 
2033 	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2034 	lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
2035 	if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
2036 		lwsl_err("%s: a\n", __func__);
2037 		return 1;
2038 	}
2039 
2040 	/* Packet ID */
2041 	if (pub->qos != QOS0) {
2042 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2043 		if (!pub->dup)
2044 			nwsi->mqtt->pkt_id++;
2045 		wsi->mqtt->ack_pkt_id = pub->packet_id = nwsi->mqtt->pkt_id;
2046 		lwsl_debug("%s: pkt_id = %d\n", __func__,
2047 			   (int)wsi->mqtt->ack_pkt_id);
2048 		lws_ser_wu16be(p, pub->packet_id);
2049 		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
2050 			lwsl_err("%s: b\n", __func__);
2051 			return 1;
2052 		}
2053 	}
2054 
2055 	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2056 	memcpy(p, buf, len);
2057 	if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
2058 		return 1;
2059 	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2060 
2061 	if (!is_complete)
2062 		nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
2063 
2064 do_write:
2065 
2066 	// lwsl_hexdump_err(start, lws_ptr_diff(p, start));
2067 
2068 	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2069 			lws_ptr_diff(p, start)) {
2070 		lwsl_err("%s: write failed\n", __func__);
2071 		return 1;
2072 	}
2073 
2074 	if (!is_complete) {
2075 		/* still some more chunks to come... */
2076 		lws_callback_on_writable(wsi);
2077 
2078 		return 0;
2079 	}
2080 
2081 	wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
2082 
2083 	if (pub->qos != QOS0)
2084 		wsi->mqtt->unacked_publish = 1;
2085 
2086 	/* this was the last part of the publish message */
2087 
2088 	if (pub->qos == QOS0) {
2089 		/*
2090 		 * There won't be any real PUBACK, act like we got one
2091 		 * so the user callback logic is the same for QoS0 or
2092 		 * QoS1
2093 		 */
2094 		if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
2095 					    wsi->user_space, NULL, 0)) {
2096 			lwsl_err("%s: ACK callback exited\n", __func__);
2097 			return 1;
2098 		}
2099 	} else if (pub->qos == QOS1 || pub->qos == QOS2) {
2100 		/* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s,
2101 		 * we must RETRY the publish
2102 		 */
2103 		wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend;
2104 		__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2105 				    &wsi->mqtt->sul_qos_puback_pubrec_wait,
2106 				    3 * LWS_USEC_PER_SEC);
2107 	}
2108 
2109 	if (wsi->mqtt->inside_shadow) {
2110 		wsi->mqtt->sul_shadow_wait.cb = lws_mqtt_shadow_timeout;
2111 		__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2112 				    &wsi->mqtt->sul_shadow_wait,
2113 				    60 * LWS_USEC_PER_SEC);
2114 	}
2115 
2116 	return 0;
2117 }
2118 
2119 int
lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)2120 lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
2121 {
2122 	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2123 	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2124 	struct lws *nwsi = lws_get_network_wsi(wsi);
2125 	lws_mqtt_str_t mqtt_vh_payload;
2126 	uint8_t exists[8], extant;
2127 	lws_mqtt_subs_t *mysub;
2128 	uint32_t rem_len;
2129 #if defined(_DEBUG)
2130 	uint32_t tops;
2131 #endif
2132 	uint32_t n;
2133 
2134 	assert(sub->num_topics);
2135 	assert(sub->num_topics < sizeof(exists));
2136 
2137 	switch (lwsi_state(wsi)) {
2138 	case LRS_ESTABLISHED: /* Protocol connection established */
2139 		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
2140 					       0, 0, 0)) {
2141 			lwsl_err("%s: Failed to fill fixed header\n", __func__);
2142 			return 1;
2143 		}
2144 
2145 		/*
2146 		 * The stream wants to subscribe to one or more topic, but
2147 		 * the shared nwsi may already be subscribed to some or all of
2148 		 * them from interactions with other streams.  For those cases,
2149 		 * we filter them from the list the child wants until we just
2150 		 * have ones that are new to the nwsi.  If nothing left, we just
2151 		 * synthesize the callback to the child as if SUBACK had come
2152 		 * and we're done, otherwise just ask the server for topics that
2153 		 * are new to the wsi.
2154 		 */
2155 
2156 		extant = 0;
2157 		memset(&exists, 0, sizeof(exists));
2158 		for (n = 0; n < sub->num_topics; n++) {
2159 			lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
2160 				  __func__, (int)n, sub->topic[n].name);
2161 
2162 			mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
2163 			if (mysub && mysub->ref_count) {
2164 				mysub->ref_count++; /* another stream using it */
2165 				exists[n] = 1;
2166 				extant++;
2167 			}
2168 
2169 			/*
2170 			 * Attach the topic we're subscribing to, to wsi->mqtt
2171 			 */
2172 			if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
2173 				lwsl_err("%s: create sub fail\n", __func__);
2174 				return 1;
2175 			}
2176 		}
2177 
2178 		if (extant == sub->num_topics) {
2179 			/*
2180 			 * It turns out there's nothing to do here, the nwsi has
2181 			 * already subscribed to all the topics this stream
2182 			 * wanted.  Just tell it it can have them.
2183 			 */
2184 			lwsl_notice("%s: all topics already subscribed\n", __func__);
2185 			if (user_callback_handle_rxflow(
2186 				    wsi->a.protocol->callback,
2187 				    wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
2188 				    wsi->user_space, NULL, 0) < 0) {
2189 				lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
2190 					 __func__);
2191 				return -1;
2192 			}
2193 
2194 			return 0;
2195 		}
2196 
2197 #if defined(_DEBUG)
2198 		/*
2199 		 * zero or more of the topics already existed, but not all,
2200 		 * so we must go to the server with a filtered list of the
2201 		 * new ones only
2202 		 */
2203 
2204 		tops = sub->num_topics - extant;
2205 #endif
2206 
2207 		/*
2208 		 * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
2209 		 */
2210 		rem_len = 2;
2211 		for (n = 0; n < sub->num_topics; n++)
2212 			if (!exists[n])
2213 				rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
2214 
2215 		wsi->mqtt->sub_size = (uint16_t)rem_len;
2216 
2217 #if defined(_DEBUG)
2218 		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2219 			   __func__, (int)tops, (int)rem_len);
2220 #endif
2221 
2222 		p += lws_mqtt_vbi_encode(rem_len, p);
2223 
2224 		if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2225 					       wsi->a.context->pt_serv_buf_size) {
2226 			lwsl_err("%s: Payload is too big\n", __func__);
2227 			return 1;
2228 		}
2229 
2230 		/* Init lws_mqtt_str */
2231 		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2232 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2233 
2234 		/* Packet ID */
2235 		wsi->mqtt->ack_pkt_id = sub->packet_id = ++nwsi->mqtt->pkt_id;
2236 		lwsl_debug("%s: pkt_id = %d\n", __func__,
2237 			   (int)sub->packet_id);
2238 		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2239 
2240 		nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot;
2241 
2242 		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2243 			return 1;
2244 
2245 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2246 
2247 		for (n = 0; n < sub->num_topics; n++) {
2248 			lwsl_info("%s: topics[%d] = %s\n", __func__,
2249 				   (int)n, sub->topic[n].name);
2250 
2251 			/* if the nwsi already has it, don't ask server for it */
2252 			if (exists[n]) {
2253 				lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
2254 					    __func__, (int)n, sub->topic[n].name);
2255 				continue;
2256 			}
2257 
2258 			/*
2259 			 * Attach the topic we're subscribing to, to nwsi->mqtt
2260 			 * so we know the nwsi itself has a subscription to it
2261 			 */
2262 
2263 			if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
2264 				return 1;
2265 
2266 			/* Topic's Len */
2267 			lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
2268 			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2269 				return 1;
2270 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2271 
2272 			/* Topic Name */
2273 			lws_strncpy((char *)p, sub->topic[n].name,
2274 				    strlen(sub->topic[n].name) + 1);
2275 			if (lws_mqtt_str_advance(&mqtt_vh_payload,
2276 						 (int)strlen(sub->topic[n].name)))
2277 				return 1;
2278 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2279 
2280 			/* QoS */
2281 			*p = (uint8_t)sub->topic[n].qos;
2282 			if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
2283 				return 1;
2284 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2285 		}
2286 		break;
2287 
2288 	default:
2289 		return 1;
2290 	}
2291 
2292 	if (wsi->mqtt->inside_resume_session)
2293 		return 0;
2294 
2295 	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2296 					lws_ptr_diff(p, start))
2297 		return 1;
2298 
2299 	wsi->mqtt->inside_subscribe = 1;
2300 
2301 	return 0;
2302 }
2303 
2304 int
lws_mqtt_client_send_unsubcribe(struct lws *wsi, const lws_mqtt_subscribe_param_t *unsub)2305 lws_mqtt_client_send_unsubcribe(struct lws *wsi,
2306 				const lws_mqtt_subscribe_param_t *unsub)
2307 {
2308 	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2309 	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2310 	struct lws *nwsi = lws_get_network_wsi(wsi);
2311 	lws_mqtt_str_t mqtt_vh_payload;
2312 	uint8_t send_unsub[8], orphaned;
2313 	uint32_t rem_len, n;
2314 	lws_mqtt_subs_t *mysub;
2315 #if defined(_DEBUG)
2316 	uint32_t tops;
2317 #endif
2318 
2319 	lwsl_info("%s: Enter\n", __func__);
2320 
2321 	switch (lwsi_state(wsi)) {
2322 	case LRS_ESTABLISHED: /* Protocol connection established */
2323 		orphaned = 0;
2324 		memset(&send_unsub, 0, sizeof(send_unsub));
2325 		for (n = 0; n < unsub->num_topics; n++) {
2326 			mysub = lws_mqtt_find_sub(nwsi->mqtt,
2327 						  unsub->topic[n].name);
2328 			assert(mysub);
2329 
2330 			if (mysub && --mysub->ref_count == 0) {
2331 				lwsl_notice("%s: Need to send UNSUB\n", __func__);
2332 				send_unsub[n] = 1;
2333 				orphaned++;
2334 			}
2335 		}
2336 
2337 		if (!orphaned) {
2338 			/*
2339 			 * The nwsi still has other subscribers bound to the
2340 			 * topics.
2341 			 *
2342 			 * So, don't send UNSUB to server, and just fake the
2343 			 * UNSUB ACK event for the guy going away.
2344 			 */
2345 			lwsl_notice("%s: unsubscribed!\n", __func__);
2346 			if (user_callback_handle_rxflow(
2347 				    wsi->a.protocol->callback,
2348 				    wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
2349 				    wsi->user_space, NULL, 0) < 0) {
2350 				/*
2351 				 * We can't directly close here, because the
2352 				 * caller still has the wsi.  Inform the
2353 				 * caller that we want to close
2354 				 */
2355 
2356 				return 1;
2357 			}
2358 
2359 			return 0;
2360 		}
2361 #if defined(_DEBUG)
2362 		/*
2363 		 * one or more of the topics needs to be unsubscribed
2364 		 * from, so we must go to the server with a filtered
2365 		 * list of the new ones only
2366 		 */
2367 
2368 		tops = orphaned;
2369 #endif
2370 
2371 		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
2372 					       0, 0, 0)) {
2373 			lwsl_err("%s: Failed to fill fixed header\n", __func__);
2374 			return 1;
2375 		}
2376 
2377 		/*
2378 		 * Pid + (Topic len field + Topic len) x Num of Topics
2379 		 */
2380 		rem_len = 2;
2381 		for (n = 0; n < unsub->num_topics; n++)
2382 			if (send_unsub[n])
2383 				rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
2384 
2385 		wsi->mqtt->sub_size = (uint16_t)rem_len;
2386 
2387 #if defined(_DEBUG)
2388 		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2389 			   __func__, (int)tops, (int)rem_len);
2390 #endif
2391 
2392 		p += lws_mqtt_vbi_encode(rem_len, p);
2393 
2394 		if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2395 					       wsi->a.context->pt_serv_buf_size) {
2396 			lwsl_err("%s: Payload is too big\n", __func__);
2397 			return 1;
2398 		}
2399 
2400 		/* Init lws_mqtt_str */
2401 		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2402 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2403 
2404 		/* Packet ID */
2405 		wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
2406 		lwsl_debug("%s: pkt_id = %d\n", __func__,
2407 			   (int)wsi->mqtt->ack_pkt_id);
2408 		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2409 
2410 		nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot;
2411 
2412 		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2413 			return 1;
2414 
2415 		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2416 
2417 		for (n = 0; n < unsub->num_topics; n++) {
2418 			lwsl_info("%s: topics[%d] = %s\n", __func__,
2419 				   (int)n, unsub->topic[n].name);
2420 
2421 			/*
2422 			 * Subscriber still bound to it, don't UBSUB
2423 			 * from the server
2424 			 */
2425 			if (!send_unsub[n])
2426 				continue;
2427 
2428 			/* Topic's Len */
2429 			lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
2430 			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2431 				return 1;
2432 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2433 
2434 			/* Topic Name */
2435 			lws_strncpy((char *)p, unsub->topic[n].name,
2436 				    strlen(unsub->topic[n].name) + 1);
2437 			if (lws_mqtt_str_advance(&mqtt_vh_payload,
2438 						 (int)strlen(unsub->topic[n].name)))
2439 				return 1;
2440 			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2441 		}
2442 		break;
2443 
2444 	default:
2445 		return 1;
2446 	}
2447 
2448 	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2449 					lws_ptr_diff(p, start))
2450 		return 1;
2451 
2452 	wsi->mqtt->inside_unsubscribe = 1;
2453 
2454 	wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout;
2455 	__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2456 			    &wsi->mqtt->sul_unsuback_wait,
2457 			    3 * LWS_USEC_PER_SEC);
2458 
2459 	return 0;
2460 }
2461 
2462 /*
2463  * This is called when child streams bind to an already-existing and compatible
2464  * MQTT stream
2465  */
2466 
2467 struct lws *
lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)2468 lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
2469 {
2470 	/* no more children allowed by parent? */
2471 
2472 	if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
2473 		lwsl_err("%s: reached concurrent stream limit\n", __func__);
2474 		return NULL;
2475 	}
2476 
2477 #if defined(LWS_WITH_CLIENT)
2478 	wsi->client_mux_substream = 1;
2479 #endif
2480 
2481 	lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
2482 
2483 	if (lws_ensure_user_space(wsi))
2484 		goto bail1;
2485 
2486 	lws_mqtt_set_client_established(wsi);
2487 	lws_callback_on_writable(wsi);
2488 
2489 	return wsi;
2490 
2491 bail1:
2492 	/* undo the insert */
2493 	parent_wsi->mux.child_list = wsi->mux.sibling_list;
2494 	parent_wsi->mux.child_count--;
2495 
2496 	if (wsi->user_space)
2497 		lws_free_set_NULL(wsi->user_space);
2498 
2499 	wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
2500 	lws_free(wsi);
2501 
2502 	return NULL;
2503 }
2504 
2505