1/*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 *
24 * MQTT v5
25 *
26 * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
27 *
28 * Control Packet structure
29 *
30 *  - Always:           2+ byte:  Fixed Hdr
31 *  - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
32 *  - Required in some: variable: Payload
33 *
34 * For CONNECT, the props if present MUST be in the order [MQTT-3.1.3-1]
35 *
36 *  - Client Identifier
37 *  - Will Properties
38 *  - Will Topic
39 *  - Will Payload
40 *  - User Name
41 *  - Password
42 */
43
44#include "private-lib-core.h"
45#include <string.h>
46#include <sys/types.h>
47#include <assert.h>
48
49typedef enum {
50	LMQPRS_AWAITING_CONNECT,
51
52} lws_mqtt_protocol_server_connstate_t;
53
54const char * const reason_names_g1[] = {
55	"Success / Normal disconnection / QoS0",
56	"QoS1",
57	"QoS2",
58	"Disconnect Will",
59	"No matching subscriber",
60	"No subscription existed",
61	"Continue authentication",
62	"Re-authenticate"
63};
64
65const char * const reason_names_g2[] = {
66	"Unspecified error",
67	"Malformed packet",
68	"Protocol error",
69	"Implementation specific error",
70	"Unsupported protocol",
71	"Client ID invalid",
72	"Bad credentials",
73	"Not Authorized",
74	"Server Unavailable",
75	"Server Busy",
76	"Banned",
77	"Server Shutting Down",
78	"Bad Authentication Method",
79	"Keepalive Timeout",
80	"Session taken over",
81	"Topic Filter Invalid",
82	"Packet ID in use",
83	"Packet ID not found",
84	"Max RX Exceeded",
85	"Topic Alias Invalid",
86	"Packet too large",
87	"Ratelimit",
88	"Quota Exceeded",
89	"Administrative Action",
90	"Payload format invalid",
91	"Retain not supported",
92	"QoS not supported",
93	"Use another server",
94	"Server Moved",
95	"Shared subscriptions not supported",
96	"Connection rate exceeded",
97	"Maximum Connect Time",
98	"Subscription IDs not supported",
99	"Wildcard subscriptions not supported"
100};
101
102#define LMQCP_WILL_PROPERTIES 0
103
104/* For each property, a bitmap describing which commands it is valid for */
105
106static const uint16_t property_valid[] = {
107	[LMQPROP_PAYLOAD_FORMAT_INDICATOR]	= (1 << LMQCP_PUBLISH) |
108						  (1 << LMQCP_WILL_PROPERTIES),
109	[LMQPROP_MESSAGE_EXPIRY_INTERVAL]	= (1 << LMQCP_PUBLISH) |
110						  (1 << LMQCP_WILL_PROPERTIES),
111	[LMQPROP_CONTENT_TYPE]			= (1 << LMQCP_PUBLISH) |
112						  (1 << LMQCP_WILL_PROPERTIES),
113	[LMQPROP_RESPONSE_TOPIC]		= (1 << LMQCP_PUBLISH) |
114						  (1 << LMQCP_WILL_PROPERTIES),
115	[LMQPROP_CORRELATION_DATA]		= (1 << LMQCP_PUBLISH) |
116						  (1 << LMQCP_WILL_PROPERTIES),
117	[LMQPROP_SUBSCRIPTION_IDENTIFIER]	= (1 << LMQCP_PUBLISH) |
118						  (1 << LMQCP_CTOS_SUBSCRIBE),
119	[LMQPROP_SESSION_EXPIRY_INTERVAL]	= (1 << LMQCP_CTOS_CONNECT) |
120						  (1 << LMQCP_STOC_CONNACK) |
121						  (1 << LMQCP_DISCONNECT),
122	[LMQPROP_ASSIGNED_CLIENT_IDENTIFIER]	= (1 << LMQCP_STOC_CONNACK),
123	[LMQPROP_SERVER_KEEP_ALIVE]		= (1 << LMQCP_STOC_CONNACK),
124	[LMQPROP_AUTHENTICATION_METHOD]		= (1 << LMQCP_CTOS_CONNECT) |
125						  (1 << LMQCP_STOC_CONNACK) |
126						  (1 << LMQCP_AUTH),
127	[LMQPROP_AUTHENTICATION_DATA]		= (1 << LMQCP_CTOS_CONNECT) |
128						  (1 << LMQCP_STOC_CONNACK) |
129						  (1 << LMQCP_AUTH),
130	[LMQPROP_REQUEST_PROBLEM_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
131	[LMQPROP_WILL_DELAY_INTERVAL]		= (1 << LMQCP_WILL_PROPERTIES),
132	[LMQPROP_REQUEST_RESPONSE_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
133	[LMQPROP_RESPONSE_INFORMATION]		= (1 << LMQCP_STOC_CONNACK),
134	[LMQPROP_SERVER_REFERENCE]		= (1 << LMQCP_STOC_CONNACK) |
135						  (1 << LMQCP_DISCONNECT),
136	[LMQPROP_REASON_STRING]			= (1 << LMQCP_STOC_CONNACK) |
137						  (1 << LMQCP_PUBACK) |
138						  (1 << LMQCP_PUBREC) |
139						  (1 << LMQCP_PUBREL) |
140						  (1 << LMQCP_PUBCOMP) |
141						  (1 << LMQCP_STOC_SUBACK) |
142						  (1 << LMQCP_STOC_UNSUBACK) |
143						  (1 << LMQCP_DISCONNECT) |
144						  (1 << LMQCP_AUTH),
145	[LMQPROP_RECEIVE_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
146						  (1 << LMQCP_STOC_CONNACK),
147	[LMQPROP_TOPIC_ALIAS_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
148						  (1 << LMQCP_STOC_CONNACK),
149	[LMQPROP_TOPIC_ALIAS]			= (1 << LMQCP_PUBLISH),
150	[LMQPROP_MAXIMUM_QOS]			= (1 << LMQCP_STOC_CONNACK),
151	[LMQPROP_RETAIN_AVAILABLE]		= (1 << LMQCP_STOC_CONNACK),
152	[LMQPROP_USER_PROPERTY]			= (1 << LMQCP_CTOS_CONNECT) |
153						  (1 << LMQCP_STOC_CONNACK) |
154						  (1 << LMQCP_PUBLISH) |
155						  (1 << LMQCP_WILL_PROPERTIES) |
156						  (1 << LMQCP_PUBACK) |
157						  (1 << LMQCP_PUBREC) |
158						  (1 << LMQCP_PUBREL) |
159						  (1 << LMQCP_PUBCOMP) |
160						  (1 << LMQCP_CTOS_SUBSCRIBE) |
161						  (1 << LMQCP_STOC_SUBACK) |
162						  (1 << LMQCP_CTOS_UNSUBSCRIBE) |
163						  (1 << LMQCP_STOC_UNSUBACK) |
164						  (1 << LMQCP_DISCONNECT) |
165						  (1 << LMQCP_AUTH),
166	[LMQPROP_MAXIMUM_PACKET_SIZE]		= (1 << LMQCP_CTOS_CONNECT) |
167						  (1 << LMQCP_STOC_CONNACK),
168	[LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
169	[LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
170	[LMQPROP_SHARED_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK)
171};
172
173
174/*
175 * For each command index, maps flags, id, qos and payload legality
176 * notice in most cases PUBLISH requires further processing
177 */
178static const uint8_t map_flags[] = {
179	[LMQCP_RESERVED]		= 0x00,
180	[LMQCP_CTOS_CONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
181					  LMQCP_LUT_FLAG_PAYLOAD |
182					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
183	[LMQCP_STOC_CONNACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
184					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
185	[LMQCP_PUBLISH]			= LMQCP_LUT_FLAG_PAYLOAD | /* option */
186					  LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
187	[LMQCP_PUBACK]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
188					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
189	[LMQCP_PUBREC]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
190					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
191	[LMQCP_PUBREL]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
192					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
193	[LMQCP_PUBCOMP]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
194					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
195	[LMQCP_CTOS_SUBSCRIBE]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
196					  LMQCP_LUT_FLAG_PAYLOAD |
197					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
198	[LMQCP_STOC_SUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
199					  LMQCP_LUT_FLAG_PAYLOAD |
200					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
201	[LMQCP_CTOS_UNSUBSCRIBE]	= LMQCP_LUT_FLAG_RESERVED_FLAGS |
202					  LMQCP_LUT_FLAG_PAYLOAD |
203					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
204	[LMQCP_STOC_UNSUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
205					  LMQCP_LUT_FLAG_PAYLOAD |
206					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
207	[LMQCP_CTOS_PINGREQ]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
208					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
209	[LMQCP_STOC_PINGRESP]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
210					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
211	[LMQCP_DISCONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
212					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
213	[LMQCP_AUTH]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
214					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
215};
216
217static int
218lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
219{
220	par->consumed += (unsigned int)consumed;
221
222	if (par->consumed > par->props_len)
223		return -1;
224
225	/* more properties coming */
226
227	if (par->consumed < par->props_len) {
228		par->state = LMQCPP_PROP_ID_VBI;
229		return 0;
230	}
231
232	/* properties finished: are we headed for payload or idle? */
233
234	if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
235		/* A PUBLISH packet MUST NOT contain a Packet Identifier if
236		 * its QoS value is set to 0 [MQTT-2.2.1-2]. */
237	    (ctl_pkt_type(par) != LMQCP_PUBLISH ||
238	     (par->packet_type_flags & 6))) {
239		par->state = LMQCPP_PAYLOAD;
240		return 0;
241	}
242
243	par->state = LMQCPP_IDLE;
244
245	return 0;
246}
247
248static int
249lws_mqtt_set_client_established(struct lws *wsi)
250{
251	lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
252			    &role_ops_mqtt);
253
254	if (user_callback_handle_rxflow(wsi->a.protocol->callback,
255					wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
256					wsi->user_space, NULL, 0) < 0) {
257		lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
258
259		return -1;
260	}
261	/*
262	 * If we made a new connection and got the ACK, our connection is
263	 * definitely working in both directions at the moment
264	 */
265	lws_validity_confirmed(wsi);
266
267	/* clear connection timeout */
268	lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
269
270	return 0;
271}
272
273static lws_mqtt_validate_topic_return_t
274lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)
275{
276	size_t spos = 0;
277	const char *sub = topic;
278	int8_t slashes = 0;
279	lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
280
281	if (awsiot) {
282		if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN)
283			return LMVTR_FAILED_OVERSIZE;
284		if (topic[0] == '$') {
285			ret = LMVTR_VALID_SHADOW;
286			slashes = -3;
287		}
288	} else {
289		if (topiclen > LWS_MQTT_MAX_TOPICLEN)
290			return LMVTR_FAILED_OVERSIZE;
291		if (topic[0] == '$')
292			return LMVTR_FAILED_WILDCARD_FORMAT;
293	}
294
295	while (*sub != 0) {
296		if (sub[0] == '+') {
297			/* topic == "+foo" || "a/+foo" ? */
298			if (spos > 0 && sub[-1] != '/')
299				return LMVTR_FAILED_WILDCARD_FORMAT;
300
301			/* topic == "foo+" or "foo+/a" ? */
302			if (sub[1] != 0 && sub[1] != '/')
303				return LMVTR_FAILED_WILDCARD_FORMAT;
304
305			ret = LMVTR_VALID_WILDCARD;
306		} else if (sub[0] == '#') {
307			/* topic == "foo#" ? */
308			if (spos > 0 && sub[-1] != '/')
309				return LMVTR_FAILED_WILDCARD_FORMAT;
310
311			/* topic == "#foo" ? */
312			if (sub[1] != 0)
313				return LMVTR_FAILED_WILDCARD_FORMAT;
314
315			ret = LMVTR_VALID_WILDCARD;
316		} else if (sub[0] == '/') {
317			slashes++;
318		}
319		spos++;
320		sub++;
321	}
322
323	if (awsiot && (slashes < 0 || slashes > 7))
324		return LMVTR_FAILED_SHADOW_FORMAT;
325
326	return ret;
327}
328
329static lws_mqtt_subs_t *
330lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
331{
332	lws_mqtt_subs_t *mysub;
333	size_t topiclen = strlen(topic);
334	lws_mqtt_validate_topic_return_t flag;
335
336	flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot);
337	switch (flag) {
338	case LMVTR_FAILED_OVERSIZE:
339		lwsl_err("%s: Topic is too long\n",
340			 __func__);
341		return NULL;
342	case LMVTR_FAILED_SHADOW_FORMAT:
343	case LMVTR_FAILED_WILDCARD_FORMAT:
344		lwsl_err("%s: Invalid topic format \"%s\"\n",
345			 __func__, topic);
346		return NULL;
347
348	case LMVTR_VALID:
349	case LMVTR_VALID_WILDCARD:
350	case LMVTR_VALID_SHADOW:
351		mysub = lws_malloc(sizeof(*mysub) + topiclen + 1, "sub");
352		if (!mysub) {
353			lwsl_err("%s: Error allocating mysub\n",
354				 __func__);
355			return NULL;
356		}
357		mysub->wildcard = (flag == LMVTR_VALID_WILDCARD);
358		mysub->shadow = (flag == LMVTR_VALID_SHADOW);
359		break;
360
361	default:
362		lwsl_err("%s: Unknown flag - %d\n",
363			 __func__, flag);
364		return NULL;
365	}
366
367	mysub->next = mqtt->subs_head;
368	mqtt->subs_head = mysub;
369	memcpy(mysub->topic, topic, strlen(topic) + 1);
370	mysub->ref_count = 1;
371
372	lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
373		  __func__, mysub, mqtt);
374
375	return mysub;
376}
377
378static int
379lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
380{
381	lws_mqtt_subs_t *s = mqtt->subs_head;
382	lws_mqtt_subs_t *temp = NULL;
383
384
385	lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
386		  __func__, mqtt);
387
388	while (s && s->next) {
389		if (s->next->ref_count == 0)
390			break;
391		s = s->next;
392	}
393
394	if (s && s->next) {
395		temp = s->next;
396		lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
397			  __func__, temp, mqtt);
398		s->next = temp->next;
399		lws_free(temp);
400		return 0;
401	}
402	return 1;
403}
404
405/*
406 * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or
407 * PUBREC came before the timeout period
408 */
409
410static void
411lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
412{
413	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
414			struct _lws_mqtt_related, sul_qos_puback_pubrec_wait);
415
416	lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
417
418	if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
419				mqtt->wsi->user_space, NULL, 0))
420		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
421}
422
423static void
424lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)
425{
426	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
427			struct _lws_mqtt_related, sul_unsuback_wait);
428
429	lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
430
431	if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
432					   LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT,
433					   mqtt->wsi->user_space, NULL, 0))
434		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
435}
436
437static void
438lws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul)
439{
440	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
441			struct _lws_mqtt_related, sul_shadow_wait);
442
443	lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
444
445	if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
446					    LWS_CALLBACK_MQTT_SHADOW_TIMEOUT,
447					    mqtt->wsi->user_space, NULL, 0))
448		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
449}
450
451void
452lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
453{
454	lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
455	c->estate = s;
456}
457
458lws_mqtt_match_topic_return_t
459lws_mqtt_is_topic_matched(const char* sub, const char* pub)
460{
461	const char *ppos = pub, *spos = sub;
462
463	if (!ppos || !spos) {
464		return LMMTR_TOPIC_MATCH_ERROR;
465	}
466
467	while (*spos) {
468		if (*ppos == '#' || *ppos == '+') {
469			lwsl_err("%s: PUBLISH to wildcard "
470				 "topic \"%s\" not supported\n",
471				 __func__, pub);
472			return LMMTR_TOPIC_MATCH_ERROR;
473		}
474		/* foo/+/bar == foo/xyz/bar ? */
475		if (*spos == '+') {
476			/* Skip ahead */
477			while (*ppos != '\0' && *ppos != '/') {
478				ppos++;
479			}
480		} else if (*spos == '#') {
481			return LMMTR_TOPIC_MATCH;
482		} else {
483			if (*ppos == '\0') {
484				/* foo/bar == foo/bar/# ? */
485				if (!strncmp(spos, "/#", 2))
486					return LMMTR_TOPIC_MATCH;
487				return LMMTR_TOPIC_NOMATCH;
488			/* Non-matching character */
489			} else if (*ppos != *spos) {
490				return LMMTR_TOPIC_NOMATCH;
491			}
492			ppos++;
493		}
494		spos++;
495	}
496
497	if (*spos == '\0' && *ppos == '\0')
498		return LMMTR_TOPIC_MATCH;
499
500	return LMMTR_TOPIC_NOMATCH;
501}
502
503lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
504				   const char* ptopic) {
505	lws_mqtt_subs_t *s = mqtt->subs_head;
506
507	while (s) {
508		/*  SUB topic  ==   PUB topic  ? */
509		/* foo/bar/xyz ==  foo/bar/xyz ? */
510		if (!s->wildcard) {
511			if (!strcmp((const char*)s->topic, ptopic))
512				return s;
513		} else {
514			if (lws_mqtt_is_topic_matched(
515			    s->topic, ptopic) == LMMTR_TOPIC_MATCH)
516				return s;
517		}
518
519		s = s->next;
520	}
521
522	return NULL;
523}
524
525int
526_lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
527		    const uint8_t *buf, size_t len)
528{
529	struct lws *w;
530	int n;
531
532	if (par->flag_pending_send_reason_close)
533		return 0;
534
535	/*
536	 * Stateful, fragmentation-immune parser
537	 *
538	 * Notice that len can always be 1 if under attack, even over tls if
539	 * the server is compromised or malicious.
540	 */
541
542	while (len) {
543		lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
544		switch (par->state) {
545		case LMQCPP_IDLE:
546			par->packet_type_flags = *buf++;
547			len--;
548
549#if defined(LWS_WITH_CLIENT)
550			/*
551			 * The case where we sent the connect, but we received
552			 * something else before any CONNACK
553			 */
554			if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
555			    par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
556				lwsl_notice("%s: server sent non-CONNACK\n",
557						__func__);
558				goto send_protocol_error_and_close;
559			}
560#endif /* LWS_WITH_CLIENT */
561
562			n = map_flags[par->packet_type_flags >> 4];
563			/*
564			 *  Where a flag bit is marked as “Reserved”, it is
565			 *  reserved for future use and MUST be set to the value
566			 *  listed [MQTT-2.1.3-1].
567			 */
568			if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
569			    ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
570				lwsl_notice("%s: %s: bad flags, 0x%02x mask 0x%02x (len %d)\n",
571					    __func__, lws_wsi_tag(wsi),
572					    par->packet_type_flags, n, (int)len + 1);
573				lwsl_hexdump_err(buf - 1, len + 1);
574				goto send_protocol_error_and_close;
575			}
576
577			lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
578				   __func__, par->packet_type_flags >> 4,
579				   par->packet_type_flags & 0xf);
580
581			/* allows us to know if a property that can only be
582			 * given once, appears twice */
583			memset(par->props_seen, 0, sizeof(par->props_seen));
584			par->state = par->packet_type_flags & 0xf0;
585			break;
586
587		case LMQCPP_CONNECT_PACKET:
588			lwsl_debug("%s: received CONNECT pkt\n", __func__);
589			par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
590			lws_mqtt_vbi_init(&par->vbit);
591			break;
592
593		case LMQCPP_CONNECT_REMAINING_LEN_VBI:
594			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
595			case LMSPR_NEED_MORE:
596				break;
597			case LMSPR_COMPLETED:
598				par->cpkt_remlen = par->vbit.value;
599				n = map_flags[ctl_pkt_type(par)];
600				lws_mqtt_str_init(&par->s_temp, par->temp,
601						  sizeof(par->temp), 0);
602				par->state = LMQCPP_CONNECT_VH_PNAME;
603				break;
604			default:
605				lwsl_notice("%s: bad vbi\n", __func__);
606				goto send_protocol_error_and_close;
607			}
608			break;
609
610		case LMQCPP_CONNECT_VH_PNAME:
611			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
612			case LMSPR_NEED_MORE:
613				break;
614			case LMSPR_COMPLETED:
615				if (par->s_temp.len != 4 ||
616				    memcmp(par->s_temp.buf, "MQTT",
617					   par->s_temp.len)) {
618					lwsl_notice("%s: protocol name: %.*s\n",
619						  __func__, par->s_temp.len,
620						  par->s_temp.buf);
621					goto send_unsupp_connack_and_close;
622				}
623				par->state = LMQCPP_CONNECT_VH_PVERSION;
624				break;
625			default:
626				lwsl_notice("%s: bad protocol name\n", __func__);
627				goto send_protocol_error_and_close;
628			}
629			break;
630
631		case LMQCPP_CONNECT_VH_PVERSION:
632			par->conn_protocol_version = *buf++;
633			len--;
634			if (par->conn_protocol_version != 5) {
635				lwsl_info("%s: unsupported MQTT version %d\n",
636					  __func__, par->conn_protocol_version);
637				goto send_unsupp_connack_and_close;
638			}
639			par->state = LMQCPP_CONNECT_VH_FLAGS;
640			break;
641
642		case LMQCPP_CONNECT_VH_FLAGS:
643			par->cpkt_flags = *buf++;
644			len--;
645			if (par->cpkt_flags & 1) {
646				/*
647				 * The Server MUST validate that the reserved
648				 * flag in the CONNECT packet is set to 0
649				 * [MQTT-3.1.2-3].
650				 */
651				par->reason = LMQCP_REASON_MALFORMED_PACKET;
652				goto send_reason_and_close;
653			}
654			/*
655			 * conn_flags specifies the Will Properties that should
656			 * appear in the payload section
657			 */
658			lws_mqtt_2byte_init(&par->vbit);
659			par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
660			break;
661
662		case LMQCPP_CONNECT_VH_KEEPALIVE:
663			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
664			case LMSPR_NEED_MORE:
665				break;
666			case LMSPR_COMPLETED:
667				par->keepalive = (uint16_t)par->vbit.value;
668				lws_mqtt_vbi_init(&par->vbit);
669				par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
670				break;
671			default:
672				lwsl_notice("%s: ka bad vbi\n", __func__);
673				goto send_protocol_error_and_close;
674			}
675			break;
676
677		case LMQCPP_PINGRESP_ZERO:
678			len--;
679			/* second byte of PINGRESP must be zero */
680			if (*buf++)
681				goto send_protocol_error_and_close;
682			goto cmd_completion;
683
684		case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
685			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
686			case LMSPR_NEED_MORE:
687				break;
688			case LMSPR_COMPLETED:
689				/* reset consumption counter */
690				par->consumed = 0;
691				par->props_len = par->vbit.value;
692				lws_mqtt_vbi_init(&par->vbit);
693				par->state = LMQCPP_PROP_ID_VBI;
694				break;
695			default:
696				lwsl_notice("%s: connpr bad vbi\n", __func__);
697				goto send_protocol_error_and_close;
698			}
699			break;
700
701		/* PUBREC */
702		case LMQCPP_PUBREC_PACKET:
703			lwsl_debug("%s: received PUBREC pkt\n", __func__);
704			lws_mqtt_vbi_init(&par->vbit);
705			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
706			case LMSPR_NEED_MORE:
707				break;
708			case LMSPR_COMPLETED:
709				par->cpkt_remlen = par->vbit.value;
710				lwsl_debug("%s: PUBREC pkt len = %d\n",
711					   __func__, (int)par->cpkt_remlen);
712				if (par->cpkt_remlen < 2)
713					goto send_protocol_error_and_close;
714				par->state = LMQCPP_PUBREC_VH_PKT_ID;
715				break;
716			default:
717				lwsl_notice("%s: pubrec bad vbi\n", __func__);
718				goto send_protocol_error_and_close;
719			}
720			break;
721
722		case LMQCPP_PUBREC_VH_PKT_ID:
723			if (len < 2) {
724				lwsl_notice("%s: len breakage 3\n", __func__);
725				return -1;
726			}
727
728			par->cpkt_id = lws_ser_ru16be(buf);
729			wsi->mqtt->ack_pkt_id = par->cpkt_id;
730			buf += 2;
731			len -= 2;
732			par->cpkt_remlen -= 2;
733			par->n = 0;
734
735			goto cmd_completion;
736
737		/* PUBREL */
738		case LMQCPP_PUBREL_PACKET:
739			lwsl_debug("%s: received PUBREL pkt\n", __func__);
740			lws_mqtt_vbi_init(&par->vbit);
741			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
742			case LMSPR_NEED_MORE:
743				break;
744			case LMSPR_COMPLETED:
745				par->cpkt_remlen = par->vbit.value;
746				lwsl_debug("%s: PUBREL pkt len = %d\n",
747					   __func__, (int)par->cpkt_remlen);
748				if (par->cpkt_remlen < 2)
749					goto send_protocol_error_and_close;
750				par->state = LMQCPP_PUBREL_VH_PKT_ID;
751				break;
752			default:
753				lwsl_err("%s: pubrel bad vbi\n", __func__);
754				goto send_protocol_error_and_close;
755			}
756			break;
757
758		case LMQCPP_PUBREL_VH_PKT_ID:
759			if (len < 2) {
760				lwsl_notice("%s: len breakage 3\n", __func__);
761				return -1;
762			}
763
764			par->cpkt_id = lws_ser_ru16be(buf);
765			wsi->mqtt->ack_pkt_id = par->cpkt_id;
766			buf += 2;
767			len -= 2;
768			par->cpkt_remlen -= 2;
769			par->n = 0;
770
771			goto cmd_completion;
772
773		/* PUBCOMP */
774		case LMQCPP_PUBCOMP_PACKET:
775			lwsl_debug("%s: received PUBCOMP pkt\n", __func__);
776			lws_mqtt_vbi_init(&par->vbit);
777			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
778			case LMSPR_NEED_MORE:
779				break;
780			case LMSPR_COMPLETED:
781				par->cpkt_remlen = par->vbit.value;
782				lwsl_debug("%s: PUBCOMP pkt len = %d\n",
783					   __func__, (int)par->cpkt_remlen);
784				if (par->cpkt_remlen < 2)
785					goto send_protocol_error_and_close;
786				par->state = LMQCPP_PUBCOMP_VH_PKT_ID;
787				break;
788			default:
789				lwsl_err("%s: pubcmp bad vbi\n", __func__);
790				goto send_protocol_error_and_close;
791			}
792			break;
793
794		case LMQCPP_PUBCOMP_VH_PKT_ID:
795			if (len < 2) {
796				lwsl_notice("%s: len breakage 3\n", __func__);
797				return -1;
798			}
799
800			par->cpkt_id = lws_ser_ru16be(buf);
801			wsi->mqtt->ack_pkt_id = par->cpkt_id;
802			buf += 2;
803			len -= 2;
804			par->cpkt_remlen -= 2;
805			par->n = 0;
806
807			goto cmd_completion;
808
809		case LMQCPP_PUBLISH_PACKET:
810			if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
811				lwsl_notice("%s: Topic rx before subscribing\n",
812					    __func__);
813				goto send_protocol_error_and_close;
814			}
815			lwsl_info("%s: received PUBLISH pkt\n", __func__);
816			par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
817			lws_mqtt_vbi_init(&par->vbit);
818			break;
819		case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
820			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
821			case LMSPR_NEED_MORE:
822				break;
823			case LMSPR_COMPLETED:
824				par->cpkt_remlen = par->vbit.value;
825				lwsl_debug("%s: PUBLISH pkt len = %d\n",
826					   __func__, (int)par->cpkt_remlen);
827				/* Move on to PUBLISH's variable header */
828				par->state = LMQCPP_PUBLISH_VH_TOPIC;
829				break;
830			default:
831				lwsl_notice("%s: pubrem bad vbi\n", __func__);
832				goto send_protocol_error_and_close;
833			}
834			break;
835
836		case LMQCPP_PUBLISH_VH_TOPIC:
837		{
838			lws_mqtt_publish_param_t *pub = NULL;
839
840			if (len < 2) {
841				lwsl_notice("%s: topic too short\n", __func__);
842				return -1;
843			}
844
845			/* Topic len */
846			par->n = lws_ser_ru16be(buf);
847			buf += 2;
848			len -= 2;
849
850			if (len < par->n) {/* the way this is written... */
851				lwsl_notice("%s: len breakage\n", __func__);
852				return -1;
853			}
854
855			/* Invalid topic len */
856			if (par->n == 0) {
857				lwsl_notice("%s: zero topic len\n", __func__);
858				par->reason = LMQCP_REASON_MALFORMED_PACKET;
859				goto send_reason_and_close;
860			}
861			lwsl_debug("%s: PUBLISH topic len %d\n",
862				   __func__, (int)par->n);
863			assert(!wsi->mqtt->rx_cpkt_param);
864			wsi->mqtt->rx_cpkt_param = lws_zalloc(
865				sizeof(lws_mqtt_publish_param_t), "rx pub param");
866			if (!wsi->mqtt->rx_cpkt_param)
867				goto oom;
868			pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
869
870			pub->topic_len = (uint16_t)par->n;
871
872			/* Topic Name */
873			pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
874							"rx publish topic");
875			if (!pub->topic)
876				goto oom;
877			lws_strncpy(pub->topic, (const char *)buf,
878				    (size_t)pub->topic_len + 1);
879			buf += pub->topic_len;
880			len -= pub->topic_len;
881
882			/* Extract QoS Level from Fixed Header Flags */
883			pub->qos = (lws_mqtt_qos_levels_t)
884					((par->packet_type_flags >> 1) & 0x3);
885
886			pub->payload_pos = 0;
887
888			pub->payload_len = par->cpkt_remlen -
889				(unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
890
891			switch (pub->qos) {
892			case QOS0:
893				par->state = LMQCPP_PAYLOAD;
894				if (pub->payload_len == 0)
895					goto cmd_completion;
896
897				break;
898			case QOS1:
899			case QOS2:
900				par->state = LMQCPP_PUBLISH_VH_PKT_ID;
901				break;
902			default:
903				par->reason = LMQCP_REASON_MALFORMED_PACKET;
904				lws_free_set_NULL(pub->topic);
905				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
906				goto send_reason_and_close;
907			}
908			break;
909		}
910		case LMQCPP_PUBLISH_VH_PKT_ID:
911		{
912			lws_mqtt_publish_param_t *pub =
913				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
914
915			if (len < 2) {
916				lwsl_notice("%s: len breakage 2\n", __func__);
917				return -1;
918			}
919
920			par->cpkt_id = lws_ser_ru16be(buf);
921			buf += 2;
922			len -= 2;
923			wsi->mqtt->peer_ack_pkt_id = par->cpkt_id;
924			lwsl_debug("%s: Packet ID %d\n",
925					__func__, (int)par->cpkt_id);
926			par->state = LMQCPP_PAYLOAD;
927			pub->payload_pos = 0;
928			pub->payload_len = par->cpkt_remlen -
929				(unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
930			if (pub->payload_len == 0)
931				goto cmd_completion;
932
933			break;
934		}
935		case LMQCPP_PAYLOAD:
936		{
937			lws_mqtt_publish_param_t *pub =
938				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
939			if (pub == NULL) {
940				lwsl_err("%s: Uninitialized pub_param\n",
941						__func__);
942				goto send_protocol_error_and_close;
943			}
944
945			pub->payload = buf;
946			goto cmd_completion;
947		}
948
949		case LMQCPP_CONNACK_PACKET:
950			if (!lwsi_role_client(wsi)) {
951				lwsl_err("%s: CONNACK is only Server to Client",
952						__func__);
953				goto send_unsupp_connack_and_close;
954			}
955
956			lwsl_debug("%s: received CONNACK pkt\n", __func__);
957			lws_mqtt_vbi_init(&par->vbit);
958			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
959			case LMSPR_NEED_MORE:
960				break;
961			case LMSPR_COMPLETED:
962				par->cpkt_remlen = par->vbit.value;
963				lwsl_debug("%s: CONNACK pkt len = %d\n",
964					   __func__, (int)par->cpkt_remlen);
965				if (par->cpkt_remlen != 2)
966					goto send_protocol_error_and_close;
967
968				par->state = LMQCPP_CONNACK_VH_FLAGS;
969				break;
970			default:
971				lwsl_notice("%s: connack bad vbi\n", __func__);
972				goto send_protocol_error_and_close;
973			}
974			break;
975
976		case LMQCPP_CONNACK_VH_FLAGS:
977		{
978			lws_mqttc_t *c = &wsi->mqtt->client;
979			par->cpkt_flags = *buf++;
980			len--;
981
982			if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
983				/*
984				 * Byte 1 is the "Connect Acknowledge
985				 * Flags". Bits 7-1 are reserved and
986				 * MUST be set to 0.
987				 */
988				par->reason = LMQCP_REASON_MALFORMED_PACKET;
989				goto send_reason_and_close;
990			}
991			/*
992			 * If the Server accepts a connection with
993			 * CleanSession set to 1, the Server MUST set
994			 * Session Present to 0 in the CONNACK packet
995			 * in addition to setting a zero return code
996			 * in the CONNACK packet [MQTT-3.2.2-1]. If
997			 * the Server accepts a connection with
998			 * CleanSession set to 0, the value set in
999			 * Session Present depends on whether the
1000			 * Server already has stored Session state for
1001			 * the supplied client ID. If the Server has
1002			 * stored Session state, it MUST set
1003			 * SessionPresent to 1 in the CONNACK packet
1004			 * [MQTT-3.2.2-2]. If the Server does not have
1005			 * stored Session state, it MUST set Session
1006			 * Present to 0 in the CONNACK packet. This is
1007			 * in addition to setting a zero return code
1008			 * in the CONNACK packet [MQTT-3.2.2-3].
1009			 */
1010			if ((c->conn_flags & LMQCFT_CLEAN_START) &&
1011			    (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
1012				goto send_protocol_error_and_close;
1013
1014			wsi->mqtt->session_resumed = ((unsigned int)par->cpkt_flags &
1015						      LMQCFT_SESSION_PRESENT);
1016
1017			/* Move on to Connect Return Code */
1018			par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
1019			break;
1020		}
1021		case LMQCPP_CONNACK_VH_RETURN_CODE:
1022			par->conn_rc = *buf++;
1023			len--;
1024			/*
1025			 * If a server sends a CONNACK packet containing a
1026			 * non-zero return code it MUST then close the Network
1027			 * Connection [MQTT-3.2.2-5]
1028			 */
1029			switch (par->conn_rc) {
1030			case 0:
1031				goto cmd_completion;
1032			case 1:
1033			case 2:
1034			case 3:
1035			case 4:
1036			case 5:
1037				par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
1038						par->conn_rc - 1;
1039				goto send_reason_and_close;
1040			default:
1041				lwsl_notice("%s: bad connack retcode\n", __func__);
1042				goto send_protocol_error_and_close;
1043			}
1044			break;
1045
1046		/* SUBACK */
1047		case LMQCPP_SUBACK_PACKET:
1048			if (!lwsi_role_client(wsi)) {
1049				lwsl_err("%s: SUBACK is only Server to Client",
1050						__func__);
1051				goto send_unsupp_connack_and_close;
1052			}
1053
1054			lwsl_debug("%s: received SUBACK pkt\n", __func__);
1055			lws_mqtt_vbi_init(&par->vbit);
1056			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1057			case LMSPR_NEED_MORE:
1058				break;
1059			case LMSPR_COMPLETED:
1060				par->cpkt_remlen = par->vbit.value;
1061				lwsl_debug("%s: SUBACK pkt len = %d\n",
1062					   __func__, (int)par->cpkt_remlen);
1063				if (par->cpkt_remlen <= 2)
1064					goto send_protocol_error_and_close;
1065				par->state = LMQCPP_SUBACK_VH_PKT_ID;
1066				break;
1067			default:
1068				lwsl_notice("%s: suback bad vbi\n", __func__);
1069				goto send_protocol_error_and_close;
1070			}
1071			break;
1072
1073		case LMQCPP_SUBACK_VH_PKT_ID:
1074
1075			if (len < 2) {
1076				lwsl_notice("%s: len breakage 4\n", __func__);
1077				return -1;
1078			}
1079
1080			par->cpkt_id = lws_ser_ru16be(buf);
1081			wsi->mqtt->ack_pkt_id = par->cpkt_id;
1082			buf += 2;
1083			len -= 2;
1084			par->cpkt_remlen -= 2;
1085			par->n = 0;
1086			par->state = LMQCPP_SUBACK_PAYLOAD;
1087			*par->temp = 0;
1088			break;
1089
1090		case LMQCPP_SUBACK_PAYLOAD:
1091		{
1092			lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
1093
1094			len--;
1095			switch (qos) {
1096				case QOS0:
1097				case QOS1:
1098				case QOS2:
1099					break;
1100				case FAILURE_QOS_LEVEL:
1101					goto send_protocol_error_and_close;
1102
1103				default:
1104					par->reason = LMQCP_REASON_MALFORMED_PACKET;
1105					goto send_reason_and_close;
1106			}
1107
1108			if (++(par->n) == par->cpkt_remlen) {
1109				par->n = 0;
1110				goto cmd_completion;
1111			}
1112
1113			break;
1114		}
1115
1116		/* UNSUBACK */
1117		case LMQCPP_UNSUBACK_PACKET:
1118			if (!lwsi_role_client(wsi)) {
1119				lwsl_err("%s: UNSUBACK is only Server to Client",
1120						__func__);
1121				goto send_unsupp_connack_and_close;
1122			}
1123
1124			lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
1125			lws_mqtt_vbi_init(&par->vbit);
1126			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1127			case LMSPR_NEED_MORE:
1128				break;
1129			case LMSPR_COMPLETED:
1130				par->cpkt_remlen = par->vbit.value;
1131				lwsl_debug("%s: UNSUBACK pkt len = %d\n",
1132					   __func__, (int)par->cpkt_remlen);
1133				if (par->cpkt_remlen < 2)
1134					goto send_protocol_error_and_close;
1135				par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
1136				break;
1137			default:
1138				lwsl_notice("%s: unsuback bad vbi\n", __func__);
1139				goto send_protocol_error_and_close;
1140			}
1141			break;
1142
1143		case LMQCPP_UNSUBACK_VH_PKT_ID:
1144
1145			if (len < 2) {
1146				lwsl_notice("%s: len breakage 3\n", __func__);
1147				return -1;
1148			}
1149
1150			par->cpkt_id = lws_ser_ru16be(buf);
1151			wsi->mqtt->ack_pkt_id = par->cpkt_id;
1152			buf += 2;
1153			len -= 2;
1154			par->cpkt_remlen -= 2;
1155			par->n = 0;
1156
1157			goto cmd_completion;
1158
1159		case LMQCPP_PUBACK_PACKET:
1160			lws_mqtt_vbi_init(&par->vbit);
1161			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1162			case LMSPR_NEED_MORE:
1163				break;
1164			case LMSPR_COMPLETED:
1165				par->cpkt_remlen = par->vbit.value;
1166				lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
1167					  (int)par->cpkt_remlen);
1168				/*
1169				 * must be 4 or more, with special case that 2
1170				 * means success with no reason code or props
1171				 */
1172				if (par->cpkt_remlen <= 1 ||
1173				    par->cpkt_remlen == 3)
1174					goto send_protocol_error_and_close;
1175
1176				par->state = LMQCPP_PUBACK_VH_PKT_ID;
1177				par->fixed_seen[2] = par->fixed_seen[3] = 0;
1178				par->fixed = 0;
1179				par->n = 0;
1180				break;
1181			default:
1182				lwsl_notice("%s: puback bad vbi\n", __func__);
1183				goto send_protocol_error_and_close;
1184			}
1185			break;
1186
1187		case LMQCPP_PUBACK_VH_PKT_ID:
1188			/*
1189			 * There are 3 fixed bytes and then a VBI for the
1190			 * property section length
1191			 */
1192			par->fixed_seen[par->fixed++] = *buf++;
1193			if (len < par->cpkt_remlen - par->n) {
1194				lwsl_notice("%s: len breakage 4\n", __func__);
1195				return -1;
1196			}
1197			len--;
1198			par->n++;
1199			if (par->fixed == 2)
1200				par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
1201
1202			if (par->fixed == 3) {
1203				lws_mqtt_vbi_init(&par->vbit);
1204				par->props_consumed = 0;
1205				par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
1206			}
1207			/* length of 2 is truncated packet and we completed it */
1208			if (par->cpkt_remlen == par->fixed)
1209				goto cmd_completion;
1210			break;
1211
1212		case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
1213			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1214			case LMSPR_NEED_MORE:
1215				break;
1216			case LMSPR_COMPLETED:
1217				par->props_len = par->vbit.value;
1218				lwsl_info("%s: PUBACK props len = %d\n",
1219					  __func__, (int)par->cpkt_remlen);
1220				/*
1221				 * If there are no properties, this is a
1222				 * command completion event in itself
1223				 */
1224				if (!par->props_len)
1225					goto cmd_completion;
1226
1227				/*
1228				 * Otherwise consume the properties before
1229				 * completing the command
1230				 */
1231				lws_mqtt_vbi_init(&par->vbit);
1232				par->state = LMQCPP_PUBACK_VH_PKT_ID;
1233				break;
1234			default:
1235				lwsl_notice("%s: puback pr bad vbi\n", __func__);
1236				goto send_protocol_error_and_close;
1237			}
1238			break;
1239
1240		case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
1241			/*
1242			 * TODO: stash the props
1243			 */
1244			par->props_consumed++;
1245			len--;
1246			buf++;
1247			if (par->props_len != par->props_consumed)
1248				break;
1249
1250cmd_completion:
1251			/*
1252			 * We come here when we understood we just processed
1253			 * the last byte of a command packet, regardless of the
1254			 * packet type
1255			 */
1256			par->state = LMQCPP_IDLE;
1257
1258			switch (par->packet_type_flags >> 4) {
1259			case LMQCP_STOC_CONNACK:
1260				lwsl_info("%s: cmd_completion: CONNACK\n",
1261					  __func__);
1262
1263				/*
1264				 * Getting the CONNACK means we are the first,
1265				 * the nwsi, and we succeeded to create a new
1266				 * network connection ourselves.
1267				 *
1268				 * Since others may join us sharing the nwsi,
1269				 * and we may close while they still want to use
1270				 * it, our wsi lifecycle alone can no longer
1271				 * define the lifecycle of the nwsi... it means
1272				 * we need to do a "magic trick" and instead of
1273				 * being both the nwsi and act like a child
1274				 * stream, create a new wsi to take over the
1275				 * nwsi duties and turn our wsi into a child of
1276				 * the nwsi with its own lifecycle.
1277				 *
1278				 * The nwsi gets a mostly empty wsi->nwsi used
1279				 * to track already-subscribed topics globally
1280				 * for the connection.
1281				 */
1282
1283				/* we were under SENT_CLIENT_HANDSHAKE timeout */
1284				lws_set_timeout(wsi, 0, 0);
1285
1286				w = lws_create_new_server_wsi(wsi->a.vhost,
1287							      wsi->tsi, "mqtt_sid1");
1288				if (!w) {
1289					lwsl_notice("%s: sid 1 migrate failed\n",
1290							__func__);
1291					return -1;
1292				}
1293
1294				wsi->mux.highest_sid = 1;
1295				lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
1296
1297				wsi->mux_substream = 1;
1298				w->mux_substream = 1;
1299				w->client_mux_substream = 1;
1300				wsi->client_mux_migrated = 1;
1301				wsi->told_user_closed = 1; /* don't tell nwsi closed */
1302
1303				lwsi_set_state(w, LRS_ESTABLISHED);
1304				lwsi_set_state(wsi, LRS_ESTABLISHED);
1305				lwsi_set_role(w, lwsi_role(wsi));
1306
1307#if defined(LWS_WITH_CLIENT)
1308				w->flags = wsi->flags;
1309#endif
1310
1311				w->mqtt = wsi->mqtt;
1312				wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
1313				if (!wsi->mqtt)
1314					return -1;
1315				w->mqtt->wsi = w;
1316				w->a.protocol = wsi->a.protocol;
1317				if (w->user_space &&
1318				    !w->user_space_externally_allocated)
1319					lws_free_set_NULL(w->user_space);
1320				w->user_space = wsi->user_space;
1321				wsi->user_space = NULL;
1322				w->user_space_externally_allocated =
1323					wsi->user_space_externally_allocated;
1324				if (lws_ensure_user_space(w))
1325					goto bail1;
1326				w->a.opaque_user_data = wsi->a.opaque_user_data;
1327				wsi->a.opaque_user_data = NULL;
1328				w->stash = wsi->stash;
1329				wsi->stash = NULL;
1330
1331				lws_mux_mark_immortal(w);
1332
1333				lwsl_notice("%s: migrated nwsi %s to sid 1 %s\n",
1334						__func__, lws_wsi_tag(wsi),
1335						lws_wsi_tag(w));
1336
1337				/*
1338				 * It was the last thing we were waiting for
1339				 * before we can be fully ESTABLISHED
1340				 */
1341				if (lws_mqtt_set_client_established(w)) {
1342					lwsl_notice("%s: set EST fail\n", __func__);
1343					return -1;
1344				}
1345
1346				/* get the ball rolling */
1347				lws_validity_confirmed(wsi);
1348
1349				/* well, add the queued guys as children */
1350				lws_wsi_mux_apply_queue(wsi);
1351				break;
1352
1353bail1:
1354				/* undo the insert */
1355				wsi->mux.child_list = w->mux.sibling_list;
1356				wsi->mux.child_count--;
1357
1358				if (w->user_space)
1359					lws_free_set_NULL(w->user_space);
1360				w->a.vhost->protocols[0].callback(w,
1361							LWS_CALLBACK_WSI_DESTROY,
1362							NULL, NULL, 0);
1363				__lws_vhost_unbind_wsi(w); /* cx + vh lock */
1364				lws_free(w);
1365
1366				return 0;
1367
1368			case LMQCP_PUBREC:
1369				lwsl_err("%s: cmd_completion: PUBREC\n",
1370						__func__);
1371				/*
1372				 * Figure out which child asked for this
1373				 */
1374				n = 0;
1375				lws_start_foreach_ll(struct lws *, w,
1376						     wsi->mux.child_list) {
1377					if (w->mqtt->unacked_publish &&
1378					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1379						char requested_close = 0;
1380
1381						w->mqtt->unacked_publish = 0;
1382						w->mqtt->unacked_pubrel = 1;
1383
1384						if (user_callback_handle_rxflow(
1385							    w->a.protocol->callback,
1386							    w, LWS_CALLBACK_MQTT_ACK,
1387							    w->user_space, NULL, 0) < 0) {
1388							lwsl_info("%s: MQTT_ACK requests close\n",
1389								 __func__);
1390							requested_close = 1;
1391						}
1392						n = 1;
1393
1394						/*
1395						 * We got an assertive PUBREC,
1396						 * no need for timeout wait
1397						 * any more
1398						 */
1399						lws_sul_cancel(&w->mqtt->
1400							  sul_qos_puback_pubrec_wait);
1401
1402						if (requested_close) {
1403							__lws_close_free_wsi(w,
1404								0, "ack cb");
1405							break;
1406						}
1407
1408						break;
1409					}
1410				} lws_end_foreach_ll(w, mux.sibling_list);
1411
1412				if (!n) {
1413					lwsl_err("%s: unsolicited PUBREC\n",
1414							__func__);
1415					return -1;
1416				}
1417				wsi->mqtt->send_pubrel = 1;
1418				lws_callback_on_writable(wsi);
1419				break;
1420
1421			case LMQCP_PUBCOMP:
1422				lwsl_err("%s: cmd_completion: PUBCOMP\n",
1423						__func__);
1424				n = 0;
1425				lws_start_foreach_ll(struct lws *, w,
1426						     wsi->mux.child_list) {
1427					if (w->mqtt->unacked_pubrel > 0 &&
1428					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1429						w->mqtt->unacked_pubrel = 0;
1430						n = 1;
1431					}
1432				} lws_end_foreach_ll(w, mux.sibling_list);
1433
1434				if (!n) {
1435					lwsl_err("%s: unsolicited PUBCOMP\n",
1436							__func__);
1437					return -1;
1438				}
1439
1440				/*
1441				 * If we published something and PUBCOMP arrived,
1442				 * our connection is definitely working in both
1443				 * directions at the moment.
1444				 */
1445				lws_validity_confirmed(wsi);
1446				break;
1447
1448			case LMQCP_PUBREL:
1449				lwsl_err("%s: cmd_completion: PUBREL\n",
1450						__func__);
1451				wsi->mqtt->send_pubcomp = 1;
1452				lws_callback_on_writable(wsi);
1453				break;
1454
1455			case LMQCP_PUBACK:
1456				lwsl_info("%s: cmd_completion: PUBACK\n",
1457						__func__);
1458
1459				/*
1460				 * Figure out which child asked for this
1461				 */
1462
1463				n = 0;
1464				lws_start_foreach_ll(struct lws *, w,
1465						      wsi->mux.child_list) {
1466					if (w->mqtt->unacked_publish &&
1467					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1468						char requested_close = 0;
1469
1470						w->mqtt->unacked_publish = 0;
1471						if (user_callback_handle_rxflow(
1472							    w->a.protocol->callback,
1473							    w, LWS_CALLBACK_MQTT_ACK,
1474							    w->user_space, NULL, 0) < 0) {
1475							lwsl_info("%s: MQTT_ACK requests close\n",
1476								 __func__);
1477							requested_close = 1;
1478						}
1479						n = 1;
1480
1481						/*
1482						 * We got an assertive PUBACK,
1483						 * no need for ACK timeout wait
1484						 * any more
1485						 */
1486						lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait);
1487
1488						if (requested_close) {
1489							__lws_close_free_wsi(w,
1490								0, "ack cb");
1491							break;
1492						}
1493
1494						break;
1495					}
1496				} lws_end_foreach_ll(w, mux.sibling_list);
1497
1498				if (!n) {
1499					lwsl_err("%s: unsolicited PUBACK\n",
1500							__func__);
1501					return -1;
1502				}
1503
1504				/*
1505				 * If we published something and it was acked,
1506				 * our connection is definitely working in both
1507				 * directions at the moment.
1508				 */
1509				lws_validity_confirmed(wsi);
1510				break;
1511
1512			case LMQCP_STOC_PINGRESP:
1513				lwsl_info("%s: cmd_completion: PINGRESP\n",
1514						__func__);
1515				/*
1516				 * If we asked for a PINGRESP and it came,
1517				 * our connection is definitely working in both
1518				 * directions at the moment.
1519				 */
1520				lws_validity_confirmed(wsi);
1521				break;
1522
1523			case LMQCP_STOC_SUBACK:
1524				lwsl_info("%s: cmd_completion: SUBACK\n",
1525						__func__);
1526
1527				/*
1528				 * Figure out which child asked for this
1529				 */
1530
1531				n = 0;
1532				lws_start_foreach_ll(struct lws *, w,
1533						      wsi->mux.child_list) {
1534					if (w->mqtt->inside_subscribe &&
1535					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1536						w->mqtt->inside_subscribe = 0;
1537						if (user_callback_handle_rxflow(
1538							    w->a.protocol->callback,
1539							    w, LWS_CALLBACK_MQTT_SUBSCRIBED,
1540							    w->user_space, NULL, 0) < 0) {
1541							lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1542								 __func__);
1543							return -1;
1544						}
1545						n = 1;
1546						break;
1547					}
1548				} lws_end_foreach_ll(w, mux.sibling_list);
1549
1550				if (!n) {
1551					lwsl_err("%s: unsolicited SUBACK\n",
1552							__func__);
1553					return -1;
1554				}
1555
1556				/*
1557				 * If we subscribed to something and SUBACK came,
1558				 * our connection is definitely working in both
1559				 * directions at the moment.
1560				 */
1561				lws_validity_confirmed(wsi);
1562
1563				break;
1564
1565			case LMQCP_STOC_UNSUBACK:
1566			{
1567				char requested_close = 0;
1568				lwsl_info("%s: cmd_completion: UNSUBACK\n",
1569						__func__);
1570				/*
1571				 * Figure out which child asked for this
1572				 */
1573				n = 0;
1574				lws_start_foreach_ll(struct lws *, w,
1575						      wsi->mux.child_list) {
1576					if (w->mqtt->inside_unsubscribe &&
1577					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1578						struct lws *nwsi = lws_get_network_wsi(w);
1579
1580						/*
1581						 * No more subscribers left,
1582						 * remove the topic from nwsi
1583						 */
1584						lws_mqtt_client_remove_subs(nwsi->mqtt);
1585
1586						w->mqtt->inside_unsubscribe = 0;
1587						if (user_callback_handle_rxflow(
1588							    w->a.protocol->callback,
1589							    w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1590							    w->user_space, NULL, 0) < 0) {
1591							lwsl_info("%s: MQTT_UNSUBACK requests close\n",
1592								 __func__);
1593							requested_close = 1;
1594						}
1595						n = 1;
1596
1597						lws_sul_cancel(&w->mqtt->sul_unsuback_wait);
1598						if (requested_close) {
1599							__lws_close_free_wsi(w,
1600									     0, "unsub ack cb");
1601							break;
1602						}
1603						break;
1604					}
1605				} lws_end_foreach_ll(w, mux.sibling_list);
1606
1607				if (!n) {
1608					lwsl_err("%s: unsolicited UNSUBACK\n",
1609							__func__);
1610					return -1;
1611				}
1612
1613
1614				/*
1615				 * If we unsubscribed to something and
1616				 * UNSUBACK came, our connection is
1617				 * definitely working in both
1618				 * directions at the moment.
1619				 */
1620				lws_validity_confirmed(wsi);
1621
1622				break;
1623			}
1624			case LMQCP_PUBLISH:
1625			{
1626				lws_mqtt_publish_param_t *pub =
1627						(lws_mqtt_publish_param_t *)
1628							wsi->mqtt->rx_cpkt_param;
1629				size_t chunk;
1630
1631				if (pub == NULL) {
1632					lwsl_notice("%s: no pub\n", __func__);
1633					return -1;
1634				}
1635
1636				/*
1637				 * RX PUBLISH is delivered to any children that
1638				 * registered for the related topic
1639				 */
1640
1641				n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
1642
1643				chunk = pub->payload_len - pub->payload_pos;
1644				if (chunk > len)
1645					chunk = len;
1646
1647				lws_start_foreach_ll(struct lws *, w,
1648						      wsi->mux.child_list) {
1649					if (lws_mqtt_find_sub(w->mqtt,
1650							      pub->topic))
1651						if (w->a.protocol->callback(
1652							    w, (enum lws_callback_reasons)n,
1653							    w->user_space,
1654							    (void *)pub,
1655							    chunk)) {
1656								par->payload_consumed = 0;
1657								lws_free_set_NULL(pub->topic);
1658								lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1659								return 1;
1660							}
1661				} lws_end_foreach_ll(w, mux.sibling_list);
1662
1663
1664				pub->payload_pos += (uint32_t)chunk;
1665				len -= chunk;
1666				buf += chunk;
1667
1668				lwsl_debug("%s: post pos %d, plen %d, len %d\n",
1669					    __func__, (int)pub->payload_pos,
1670					    (int)pub->payload_len, (int)len);
1671
1672				if (pub->payload_pos != pub->payload_len) {
1673					/*
1674					 * More chunks of the payload pending,
1675					 * blocking this connection from doing
1676					 * anything else
1677					 */
1678					par->state = LMQCPP_PAYLOAD;
1679					break;
1680				}
1681
1682				if (pub->qos == 1) {
1683				/* For QOS = 1, send out PUBACK */
1684					wsi->mqtt->send_puback = 1;
1685					lws_callback_on_writable(wsi);
1686				} else if (pub->qos == 2) {
1687				/* For QOS = 2, send out PUBREC */
1688					wsi->mqtt->send_pubrec = 1;
1689					lws_callback_on_writable(wsi);
1690				}
1691
1692				par->payload_consumed = 0;
1693				lws_free_set_NULL(pub->topic);
1694				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1695
1696				break;
1697			}
1698			default:
1699				break;
1700			}
1701
1702			break;
1703
1704
1705		case LMQCPP_PROP_ID_VBI:
1706			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1707			case LMSPR_NEED_MORE:
1708				break;
1709			case LMSPR_COMPLETED:
1710				par->consumed = (uint32_t)((unsigned int)par->consumed + (unsigned int)(unsigned char)par->vbit.consumed);
1711				if (par->vbit.value >
1712				    LWS_ARRAY_SIZE(property_valid)) {
1713					lwsl_notice("%s: undef prop id 0x%x\n",
1714						  __func__, (int)par->vbit.value);
1715					goto send_protocol_error_and_close;
1716				}
1717				if (!(property_valid[par->vbit.value] &
1718					(1 << ctl_pkt_type(par)))) {
1719					lwsl_notice("%s: prop id 0x%x invalid for"
1720						  " control pkt %d\n", __func__,
1721						  (int)par->vbit.value,
1722						  ctl_pkt_type(par));
1723					goto send_protocol_error_and_close;
1724				}
1725				par->prop_id = par->vbit.value;
1726				par->flag_prop_multi = !!(
1727					par->props_seen[par->prop_id >> 3] &
1728					(1 << (par->prop_id & 7)));
1729				par->props_seen[par->prop_id >> 3] =
1730						(uint8_t)((par->props_seen[par->prop_id >> 3]) | (1 << (par->prop_id & 7)));
1731				/*
1732				 *  even if it's not a vbi property arg,
1733				 * .consumed of this will be zero the first time
1734				 */
1735				lws_mqtt_vbi_init(&par->vbit);
1736				/*
1737				 * if it's a string, next state must set the
1738				 * destination and size limit itself.  But
1739				 * resetting it generically here lets it use
1740				 * lws_mqtt_str_first() to understand it's the
1741				 * first time around.
1742				 */
1743				 lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
1744
1745				/* property arg state enums are so encoded */
1746				par->state = 0x100 | par->vbit.value;
1747				break;
1748			default:
1749				lwsl_notice("%s: prop id bad vbi\n", __func__);
1750				goto send_protocol_error_and_close;
1751			}
1752			break;
1753
1754		/*
1755		 * All possible property payloads... restricting which ones
1756		 * can appear in which control packets is already done above
1757		 * in LMQCPP_PROP_ID_VBI
1758		 */
1759
1760		case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
1761		case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
1762		case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
1763		case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
1764		case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
1765		case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
1766		case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
1767		case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
1768			if (par->flag_prop_multi)
1769				goto singular_prop_seen_twice;
1770			par->payload_format = *buf++;
1771			len--;
1772			if (lws_mqtt_pconsume(par, 1))
1773				goto send_protocol_error_and_close;
1774			break;
1775
1776		case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
1777		case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
1778		case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
1779		case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
1780			if (par->flag_prop_multi)
1781				goto singular_prop_seen_twice;
1782
1783			if (lws_mqtt_mb_first(&par->vbit))
1784				lws_mqtt_4byte_init(&par->vbit);
1785
1786			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1787			case LMSPR_NEED_MORE:
1788				break;
1789			case LMSPR_COMPLETED:
1790				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1791					goto send_protocol_error_and_close;
1792				break;
1793			default:
1794				goto send_protocol_error_and_close;
1795			}
1796			break;
1797
1798		case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
1799		case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
1800		case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
1801		case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
1802			if (par->flag_prop_multi)
1803				goto singular_prop_seen_twice;
1804
1805			if (lws_mqtt_mb_first(&par->vbit))
1806				lws_mqtt_2byte_init(&par->vbit);
1807
1808			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1809			case LMSPR_NEED_MORE:
1810				break;
1811			case LMSPR_COMPLETED:
1812				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1813					goto send_protocol_error_and_close;
1814				break;
1815			default:
1816				goto send_protocol_error_and_close;
1817			}
1818			break;
1819
1820		case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
1821		case LMQCPP_PROP_AUTH_METHOD_UTF8S:
1822		case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
1823		case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
1824		case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
1825		case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
1826		case LMQCPP_PROP_REASON_STRING_UTF8S:
1827		case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
1828		case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
1829			if (par->flag_prop_multi)
1830				goto singular_prop_seen_twice;
1831
1832			if (lws_mqtt_str_first(&par->s_temp))
1833				lws_mqtt_str_init(&par->s_temp, par->temp,
1834						  sizeof(par->temp), 0);
1835
1836			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
1837			case LMSPR_NEED_MORE:
1838				break;
1839			case LMSPR_COMPLETED:
1840				if (lws_mqtt_pconsume(par, par->s_temp.len))
1841					goto send_protocol_error_and_close;
1842				break;
1843
1844			default:
1845				lwsl_info("%s: bad protocol name\n", __func__);
1846				goto send_protocol_error_and_close;
1847			}
1848			break;
1849
1850		case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
1851
1852		case LMQCPP_PROP_CORRELATION_BINDATA:
1853		case LMQCPP_PROP_AUTH_DATA_BINDATA:
1854
1855		/* TODO */
1856			lwsl_err("%s: Unimplemented packet state 0x%x\n",
1857					__func__, par->state);
1858			return -1;
1859		}
1860	}
1861
1862	return 0;
1863
1864oom:
1865	lwsl_err("%s: OOM!\n", __func__);
1866	goto send_protocol_error_and_close;
1867
1868singular_prop_seen_twice:
1869	lwsl_info("%s: property appears twice\n", __func__);
1870
1871send_protocol_error_and_close:
1872	lwsl_notice("%s: peac\n", __func__);
1873	par->reason = LMQCP_REASON_PROTOCOL_ERROR;
1874
1875send_reason_and_close:
1876	lwsl_notice("%s: srac\n", __func__);
1877	par->flag_pending_send_reason_close = 1;
1878	goto ask;
1879
1880send_unsupp_connack_and_close:
1881	lwsl_notice("%s: unsupac\n", __func__);
1882	par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
1883	par->flag_pending_send_connack_close = 1;
1884
1885ask:
1886	/* Should we ask for clients? */
1887	lws_callback_on_writable(wsi);
1888
1889	return -1;
1890}
1891
1892int
1893lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
1894			   uint8_t dup, lws_mqtt_qos_levels_t qos,
1895			   uint8_t retain)
1896{
1897	lws_mqtt_fixed_hdr_t hdr;
1898
1899	hdr.bits = 0;
1900	hdr.flags.ctrl_pkt_type = ctrl_pkt_type & 0xf;
1901
1902	switch(ctrl_pkt_type) {
1903	case LMQCP_PUBLISH:
1904		hdr.flags.dup = !!dup;
1905		/*
1906		 * A PUBLISH Packet MUST NOT have both QoS bits set to
1907		 * 1. If a Server or Client receives a PUBLISH Packet
1908		 * which has both QoS bits set to 1 it MUST close the
1909		 * Network Connection [MQTT-3.3.1-4].
1910		 */
1911		if (qos >= RESERVED_QOS_LEVEL) {
1912			lwsl_err("%s: Unsupport QoS level 0x%x\n",
1913				 __func__, qos);
1914			return -1;
1915		}
1916		hdr.flags.qos = qos & 3;
1917		hdr.flags.retain = !!retain;
1918		break;
1919
1920	case LMQCP_CTOS_CONNECT:
1921	case LMQCP_STOC_CONNACK:
1922	case LMQCP_PUBACK:
1923	case LMQCP_PUBREC:
1924	case LMQCP_PUBCOMP:
1925	case LMQCP_STOC_SUBACK:
1926	case LMQCP_STOC_UNSUBACK:
1927	case LMQCP_CTOS_PINGREQ:
1928	case LMQCP_STOC_PINGRESP:
1929	case LMQCP_DISCONNECT:
1930	case LMQCP_AUTH:
1931		hdr.bits &= 0xf0;
1932		break;
1933
1934	/*
1935	 * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
1936	 * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
1937	 * MUST be set to 0,0,1 and 0 respectively. The Server MUST
1938	 * treat any other value as malformed and close the Network
1939	 * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
1940	 */
1941	case LMQCP_PUBREL:
1942	case LMQCP_CTOS_SUBSCRIBE:
1943	case LMQCP_CTOS_UNSUBSCRIBE:
1944		hdr.bits |= 0x02;
1945		break;
1946
1947	default:
1948		return -1;
1949	}
1950
1951	*p = hdr.bits;
1952
1953	return 0;
1954}
1955
1956int
1957lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
1958			     const void *buf, uint32_t len, int is_complete)
1959{
1960	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
1961	uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
1962	struct lws *nwsi = lws_get_network_wsi(wsi);
1963	lws_mqtt_str_t mqtt_vh_payload;
1964	uint32_t vh_len, rem_len;
1965
1966	assert(pub->topic);
1967
1968	lwsl_debug("%s: len = %d, is_complete = %d\n",
1969		   __func__, (int)len, (int)is_complete);
1970
1971	if (lwsi_state(wsi) != LRS_ESTABLISHED) {
1972		lwsl_err("%s: %s: unknown state 0x%x\n", __func__,
1973				lws_wsi_tag(wsi), lwsi_state(wsi));
1974		assert(0);
1975		return 1;
1976	}
1977
1978	if (wsi->mqtt->inside_payload) {
1979		/*
1980		 * Headers are filled, we are sending
1981		 * the payload - a buffer with LWS_PRE
1982		 * in front it.
1983		 */
1984		start = (uint8_t *)buf;
1985		p = start + len;
1986		if (is_complete)
1987			wsi->mqtt->inside_payload = 0;
1988		goto do_write;
1989	}
1990
1991	start = b + LWS_PRE;
1992	p = start;
1993	/*
1994	 * Fill headers and the first chunk of the
1995	 * payload (if any)
1996	 */
1997	if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
1998				       pub->dup, pub->qos, pub->retain)) {
1999		lwsl_err("%s: Failed to fill fixed header\n", __func__);
2000		return 1;
2001	}
2002
2003	/*
2004	 * Topic len field + Topic len + Packet ID
2005	 * (for QOS>0) + Payload len
2006	 */
2007	vh_len = (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
2008	rem_len = vh_len + pub->payload_len;
2009	lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
2010
2011	/* Will the chunk of payload fit? */
2012	if ((vh_len + len) >=
2013	    (wsi->a.context->pt_serv_buf_size - LWS_PRE)) {
2014		lwsl_err("%s: Payload is too big\n", __func__);
2015		return 1;
2016	}
2017
2018	p += lws_mqtt_vbi_encode(rem_len, p);
2019
2020	/* Topic's Len */
2021	lws_ser_wu16be(p, pub->topic_len);
2022	p += 2;
2023
2024	/*
2025	 * Init lws_mqtt_str for "MQTT Variable
2026	 * Headers + payload" (only the supplied
2027	 * chuncked payload)
2028	 */
2029	lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
2030			  (uint16_t)(unsigned int)(pub->topic_len + ((pub->qos) ? 2u : 0u) + len),
2031			  0);
2032
2033	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2034	lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
2035	if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
2036		lwsl_err("%s: a\n", __func__);
2037		return 1;
2038	}
2039
2040	/* Packet ID */
2041	if (pub->qos != QOS0) {
2042		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2043		if (!pub->dup)
2044			nwsi->mqtt->pkt_id++;
2045		wsi->mqtt->ack_pkt_id = pub->packet_id = nwsi->mqtt->pkt_id;
2046		lwsl_debug("%s: pkt_id = %d\n", __func__,
2047			   (int)wsi->mqtt->ack_pkt_id);
2048		lws_ser_wu16be(p, pub->packet_id);
2049		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
2050			lwsl_err("%s: b\n", __func__);
2051			return 1;
2052		}
2053	}
2054
2055	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2056	memcpy(p, buf, len);
2057	if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
2058		return 1;
2059	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2060
2061	if (!is_complete)
2062		nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
2063
2064do_write:
2065
2066	// lwsl_hexdump_err(start, lws_ptr_diff(p, start));
2067
2068	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2069			lws_ptr_diff(p, start)) {
2070		lwsl_err("%s: write failed\n", __func__);
2071		return 1;
2072	}
2073
2074	if (!is_complete) {
2075		/* still some more chunks to come... */
2076		lws_callback_on_writable(wsi);
2077
2078		return 0;
2079	}
2080
2081	wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
2082
2083	if (pub->qos != QOS0)
2084		wsi->mqtt->unacked_publish = 1;
2085
2086	/* this was the last part of the publish message */
2087
2088	if (pub->qos == QOS0) {
2089		/*
2090		 * There won't be any real PUBACK, act like we got one
2091		 * so the user callback logic is the same for QoS0 or
2092		 * QoS1
2093		 */
2094		if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
2095					    wsi->user_space, NULL, 0)) {
2096			lwsl_err("%s: ACK callback exited\n", __func__);
2097			return 1;
2098		}
2099	} else if (pub->qos == QOS1 || pub->qos == QOS2) {
2100		/* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s,
2101		 * we must RETRY the publish
2102		 */
2103		wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend;
2104		__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2105				    &wsi->mqtt->sul_qos_puback_pubrec_wait,
2106				    3 * LWS_USEC_PER_SEC);
2107	}
2108
2109	if (wsi->mqtt->inside_shadow) {
2110		wsi->mqtt->sul_shadow_wait.cb = lws_mqtt_shadow_timeout;
2111		__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2112				    &wsi->mqtt->sul_shadow_wait,
2113				    60 * LWS_USEC_PER_SEC);
2114	}
2115
2116	return 0;
2117}
2118
2119int
2120lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
2121{
2122	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2123	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2124	struct lws *nwsi = lws_get_network_wsi(wsi);
2125	lws_mqtt_str_t mqtt_vh_payload;
2126	uint8_t exists[8], extant;
2127	lws_mqtt_subs_t *mysub;
2128	uint32_t rem_len;
2129#if defined(_DEBUG)
2130	uint32_t tops;
2131#endif
2132	uint32_t n;
2133
2134	assert(sub->num_topics);
2135	assert(sub->num_topics < sizeof(exists));
2136
2137	switch (lwsi_state(wsi)) {
2138	case LRS_ESTABLISHED: /* Protocol connection established */
2139		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
2140					       0, 0, 0)) {
2141			lwsl_err("%s: Failed to fill fixed header\n", __func__);
2142			return 1;
2143		}
2144
2145		/*
2146		 * The stream wants to subscribe to one or more topic, but
2147		 * the shared nwsi may already be subscribed to some or all of
2148		 * them from interactions with other streams.  For those cases,
2149		 * we filter them from the list the child wants until we just
2150		 * have ones that are new to the nwsi.  If nothing left, we just
2151		 * synthesize the callback to the child as if SUBACK had come
2152		 * and we're done, otherwise just ask the server for topics that
2153		 * are new to the wsi.
2154		 */
2155
2156		extant = 0;
2157		memset(&exists, 0, sizeof(exists));
2158		for (n = 0; n < sub->num_topics; n++) {
2159			lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
2160				  __func__, (int)n, sub->topic[n].name);
2161
2162			mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
2163			if (mysub && mysub->ref_count) {
2164				mysub->ref_count++; /* another stream using it */
2165				exists[n] = 1;
2166				extant++;
2167			}
2168
2169			/*
2170			 * Attach the topic we're subscribing to, to wsi->mqtt
2171			 */
2172			if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
2173				lwsl_err("%s: create sub fail\n", __func__);
2174				return 1;
2175			}
2176		}
2177
2178		if (extant == sub->num_topics) {
2179			/*
2180			 * It turns out there's nothing to do here, the nwsi has
2181			 * already subscribed to all the topics this stream
2182			 * wanted.  Just tell it it can have them.
2183			 */
2184			lwsl_notice("%s: all topics already subscribed\n", __func__);
2185			if (user_callback_handle_rxflow(
2186				    wsi->a.protocol->callback,
2187				    wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
2188				    wsi->user_space, NULL, 0) < 0) {
2189				lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
2190					 __func__);
2191				return -1;
2192			}
2193
2194			return 0;
2195		}
2196
2197#if defined(_DEBUG)
2198		/*
2199		 * zero or more of the topics already existed, but not all,
2200		 * so we must go to the server with a filtered list of the
2201		 * new ones only
2202		 */
2203
2204		tops = sub->num_topics - extant;
2205#endif
2206
2207		/*
2208		 * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
2209		 */
2210		rem_len = 2;
2211		for (n = 0; n < sub->num_topics; n++)
2212			if (!exists[n])
2213				rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
2214
2215		wsi->mqtt->sub_size = (uint16_t)rem_len;
2216
2217#if defined(_DEBUG)
2218		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2219			   __func__, (int)tops, (int)rem_len);
2220#endif
2221
2222		p += lws_mqtt_vbi_encode(rem_len, p);
2223
2224		if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2225					       wsi->a.context->pt_serv_buf_size) {
2226			lwsl_err("%s: Payload is too big\n", __func__);
2227			return 1;
2228		}
2229
2230		/* Init lws_mqtt_str */
2231		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2232		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2233
2234		/* Packet ID */
2235		wsi->mqtt->ack_pkt_id = sub->packet_id = ++nwsi->mqtt->pkt_id;
2236		lwsl_debug("%s: pkt_id = %d\n", __func__,
2237			   (int)sub->packet_id);
2238		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2239
2240		nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot;
2241
2242		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2243			return 1;
2244
2245		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2246
2247		for (n = 0; n < sub->num_topics; n++) {
2248			lwsl_info("%s: topics[%d] = %s\n", __func__,
2249				   (int)n, sub->topic[n].name);
2250
2251			/* if the nwsi already has it, don't ask server for it */
2252			if (exists[n]) {
2253				lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
2254					    __func__, (int)n, sub->topic[n].name);
2255				continue;
2256			}
2257
2258			/*
2259			 * Attach the topic we're subscribing to, to nwsi->mqtt
2260			 * so we know the nwsi itself has a subscription to it
2261			 */
2262
2263			if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
2264				return 1;
2265
2266			/* Topic's Len */
2267			lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
2268			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2269				return 1;
2270			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2271
2272			/* Topic Name */
2273			lws_strncpy((char *)p, sub->topic[n].name,
2274				    strlen(sub->topic[n].name) + 1);
2275			if (lws_mqtt_str_advance(&mqtt_vh_payload,
2276						 (int)strlen(sub->topic[n].name)))
2277				return 1;
2278			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2279
2280			/* QoS */
2281			*p = (uint8_t)sub->topic[n].qos;
2282			if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
2283				return 1;
2284			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2285		}
2286		break;
2287
2288	default:
2289		return 1;
2290	}
2291
2292	if (wsi->mqtt->inside_resume_session)
2293		return 0;
2294
2295	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2296					lws_ptr_diff(p, start))
2297		return 1;
2298
2299	wsi->mqtt->inside_subscribe = 1;
2300
2301	return 0;
2302}
2303
2304int
2305lws_mqtt_client_send_unsubcribe(struct lws *wsi,
2306				const lws_mqtt_subscribe_param_t *unsub)
2307{
2308	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2309	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2310	struct lws *nwsi = lws_get_network_wsi(wsi);
2311	lws_mqtt_str_t mqtt_vh_payload;
2312	uint8_t send_unsub[8], orphaned;
2313	uint32_t rem_len, n;
2314	lws_mqtt_subs_t *mysub;
2315#if defined(_DEBUG)
2316	uint32_t tops;
2317#endif
2318
2319	lwsl_info("%s: Enter\n", __func__);
2320
2321	switch (lwsi_state(wsi)) {
2322	case LRS_ESTABLISHED: /* Protocol connection established */
2323		orphaned = 0;
2324		memset(&send_unsub, 0, sizeof(send_unsub));
2325		for (n = 0; n < unsub->num_topics; n++) {
2326			mysub = lws_mqtt_find_sub(nwsi->mqtt,
2327						  unsub->topic[n].name);
2328			assert(mysub);
2329
2330			if (mysub && --mysub->ref_count == 0) {
2331				lwsl_notice("%s: Need to send UNSUB\n", __func__);
2332				send_unsub[n] = 1;
2333				orphaned++;
2334			}
2335		}
2336
2337		if (!orphaned) {
2338			/*
2339			 * The nwsi still has other subscribers bound to the
2340			 * topics.
2341			 *
2342			 * So, don't send UNSUB to server, and just fake the
2343			 * UNSUB ACK event for the guy going away.
2344			 */
2345			lwsl_notice("%s: unsubscribed!\n", __func__);
2346			if (user_callback_handle_rxflow(
2347				    wsi->a.protocol->callback,
2348				    wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
2349				    wsi->user_space, NULL, 0) < 0) {
2350				/*
2351				 * We can't directly close here, because the
2352				 * caller still has the wsi.  Inform the
2353				 * caller that we want to close
2354				 */
2355
2356				return 1;
2357			}
2358
2359			return 0;
2360		}
2361#if defined(_DEBUG)
2362		/*
2363		 * one or more of the topics needs to be unsubscribed
2364		 * from, so we must go to the server with a filtered
2365		 * list of the new ones only
2366		 */
2367
2368		tops = orphaned;
2369#endif
2370
2371		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
2372					       0, 0, 0)) {
2373			lwsl_err("%s: Failed to fill fixed header\n", __func__);
2374			return 1;
2375		}
2376
2377		/*
2378		 * Pid + (Topic len field + Topic len) x Num of Topics
2379		 */
2380		rem_len = 2;
2381		for (n = 0; n < unsub->num_topics; n++)
2382			if (send_unsub[n])
2383				rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
2384
2385		wsi->mqtt->sub_size = (uint16_t)rem_len;
2386
2387#if defined(_DEBUG)
2388		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2389			   __func__, (int)tops, (int)rem_len);
2390#endif
2391
2392		p += lws_mqtt_vbi_encode(rem_len, p);
2393
2394		if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2395					       wsi->a.context->pt_serv_buf_size) {
2396			lwsl_err("%s: Payload is too big\n", __func__);
2397			return 1;
2398		}
2399
2400		/* Init lws_mqtt_str */
2401		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2402		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2403
2404		/* Packet ID */
2405		wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
2406		lwsl_debug("%s: pkt_id = %d\n", __func__,
2407			   (int)wsi->mqtt->ack_pkt_id);
2408		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2409
2410		nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot;
2411
2412		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2413			return 1;
2414
2415		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2416
2417		for (n = 0; n < unsub->num_topics; n++) {
2418			lwsl_info("%s: topics[%d] = %s\n", __func__,
2419				   (int)n, unsub->topic[n].name);
2420
2421			/*
2422			 * Subscriber still bound to it, don't UBSUB
2423			 * from the server
2424			 */
2425			if (!send_unsub[n])
2426				continue;
2427
2428			/* Topic's Len */
2429			lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
2430			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2431				return 1;
2432			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2433
2434			/* Topic Name */
2435			lws_strncpy((char *)p, unsub->topic[n].name,
2436				    strlen(unsub->topic[n].name) + 1);
2437			if (lws_mqtt_str_advance(&mqtt_vh_payload,
2438						 (int)strlen(unsub->topic[n].name)))
2439				return 1;
2440			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2441		}
2442		break;
2443
2444	default:
2445		return 1;
2446	}
2447
2448	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2449					lws_ptr_diff(p, start))
2450		return 1;
2451
2452	wsi->mqtt->inside_unsubscribe = 1;
2453
2454	wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout;
2455	__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2456			    &wsi->mqtt->sul_unsuback_wait,
2457			    3 * LWS_USEC_PER_SEC);
2458
2459	return 0;
2460}
2461
2462/*
2463 * This is called when child streams bind to an already-existing and compatible
2464 * MQTT stream
2465 */
2466
2467struct lws *
2468lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
2469{
2470	/* no more children allowed by parent? */
2471
2472	if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
2473		lwsl_err("%s: reached concurrent stream limit\n", __func__);
2474		return NULL;
2475	}
2476
2477#if defined(LWS_WITH_CLIENT)
2478	wsi->client_mux_substream = 1;
2479#endif
2480
2481	lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
2482
2483	if (lws_ensure_user_space(wsi))
2484		goto bail1;
2485
2486	lws_mqtt_set_client_established(wsi);
2487	lws_callback_on_writable(wsi);
2488
2489	return wsi;
2490
2491bail1:
2492	/* undo the insert */
2493	parent_wsi->mux.child_list = wsi->mux.sibling_list;
2494	parent_wsi->mux.child_count--;
2495
2496	if (wsi->user_space)
2497		lws_free_set_NULL(wsi->user_space);
2498
2499	wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
2500	lws_free(wsi);
2501
2502	return NULL;
2503}
2504
2505