1d4afb5ceSopenharmony_ci/*
2d4afb5ceSopenharmony_ci * libwebsockets - small server side websockets and web server implementation
3d4afb5ceSopenharmony_ci *
4d4afb5ceSopenharmony_ci * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
5d4afb5ceSopenharmony_ci *
6d4afb5ceSopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy
7d4afb5ceSopenharmony_ci * of this software and associated documentation files (the "Software"), to
8d4afb5ceSopenharmony_ci * deal in the Software without restriction, including without limitation the
9d4afb5ceSopenharmony_ci * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10d4afb5ceSopenharmony_ci * sell copies of the Software, and to permit persons to whom the Software is
11d4afb5ceSopenharmony_ci * furnished to do so, subject to the following conditions:
12d4afb5ceSopenharmony_ci *
13d4afb5ceSopenharmony_ci * The above copyright notice and this permission notice shall be included in
14d4afb5ceSopenharmony_ci * all copies or substantial portions of the Software.
15d4afb5ceSopenharmony_ci *
16d4afb5ceSopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17d4afb5ceSopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18d4afb5ceSopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19d4afb5ceSopenharmony_ci * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20d4afb5ceSopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21d4afb5ceSopenharmony_ci * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22d4afb5ceSopenharmony_ci * IN THE SOFTWARE.
23d4afb5ceSopenharmony_ci *
24d4afb5ceSopenharmony_ci * MQTT v5
25d4afb5ceSopenharmony_ci *
26d4afb5ceSopenharmony_ci * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
27d4afb5ceSopenharmony_ci *
28d4afb5ceSopenharmony_ci * Control Packet structure
29d4afb5ceSopenharmony_ci *
30d4afb5ceSopenharmony_ci *  - Always:           2+ byte:  Fixed Hdr
31d4afb5ceSopenharmony_ci *  - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
32d4afb5ceSopenharmony_ci *  - Required in some: variable: Payload
33d4afb5ceSopenharmony_ci *
34d4afb5ceSopenharmony_ci * For CONNECT, the props if present MUST be in the order [MQTT-3.1.3-1]
35d4afb5ceSopenharmony_ci *
36d4afb5ceSopenharmony_ci *  - Client Identifier
37d4afb5ceSopenharmony_ci *  - Will Properties
38d4afb5ceSopenharmony_ci *  - Will Topic
39d4afb5ceSopenharmony_ci *  - Will Payload
40d4afb5ceSopenharmony_ci *  - User Name
41d4afb5ceSopenharmony_ci *  - Password
42d4afb5ceSopenharmony_ci */
43d4afb5ceSopenharmony_ci
44d4afb5ceSopenharmony_ci#include "private-lib-core.h"
45d4afb5ceSopenharmony_ci#include <string.h>
46d4afb5ceSopenharmony_ci#include <sys/types.h>
47d4afb5ceSopenharmony_ci#include <assert.h>
48d4afb5ceSopenharmony_ci
49d4afb5ceSopenharmony_citypedef enum {
50d4afb5ceSopenharmony_ci	LMQPRS_AWAITING_CONNECT,
51d4afb5ceSopenharmony_ci
52d4afb5ceSopenharmony_ci} lws_mqtt_protocol_server_connstate_t;
53d4afb5ceSopenharmony_ci
54d4afb5ceSopenharmony_ciconst char * const reason_names_g1[] = {
55d4afb5ceSopenharmony_ci	"Success / Normal disconnection / QoS0",
56d4afb5ceSopenharmony_ci	"QoS1",
57d4afb5ceSopenharmony_ci	"QoS2",
58d4afb5ceSopenharmony_ci	"Disconnect Will",
59d4afb5ceSopenharmony_ci	"No matching subscriber",
60d4afb5ceSopenharmony_ci	"No subscription existed",
61d4afb5ceSopenharmony_ci	"Continue authentication",
62d4afb5ceSopenharmony_ci	"Re-authenticate"
63d4afb5ceSopenharmony_ci};
64d4afb5ceSopenharmony_ci
65d4afb5ceSopenharmony_ciconst char * const reason_names_g2[] = {
66d4afb5ceSopenharmony_ci	"Unspecified error",
67d4afb5ceSopenharmony_ci	"Malformed packet",
68d4afb5ceSopenharmony_ci	"Protocol error",
69d4afb5ceSopenharmony_ci	"Implementation specific error",
70d4afb5ceSopenharmony_ci	"Unsupported protocol",
71d4afb5ceSopenharmony_ci	"Client ID invalid",
72d4afb5ceSopenharmony_ci	"Bad credentials",
73d4afb5ceSopenharmony_ci	"Not Authorized",
74d4afb5ceSopenharmony_ci	"Server Unavailable",
75d4afb5ceSopenharmony_ci	"Server Busy",
76d4afb5ceSopenharmony_ci	"Banned",
77d4afb5ceSopenharmony_ci	"Server Shutting Down",
78d4afb5ceSopenharmony_ci	"Bad Authentication Method",
79d4afb5ceSopenharmony_ci	"Keepalive Timeout",
80d4afb5ceSopenharmony_ci	"Session taken over",
81d4afb5ceSopenharmony_ci	"Topic Filter Invalid",
82d4afb5ceSopenharmony_ci	"Packet ID in use",
83d4afb5ceSopenharmony_ci	"Packet ID not found",
84d4afb5ceSopenharmony_ci	"Max RX Exceeded",
85d4afb5ceSopenharmony_ci	"Topic Alias Invalid",
86d4afb5ceSopenharmony_ci	"Packet too large",
87d4afb5ceSopenharmony_ci	"Ratelimit",
88d4afb5ceSopenharmony_ci	"Quota Exceeded",
89d4afb5ceSopenharmony_ci	"Administrative Action",
90d4afb5ceSopenharmony_ci	"Payload format invalid",
91d4afb5ceSopenharmony_ci	"Retain not supported",
92d4afb5ceSopenharmony_ci	"QoS not supported",
93d4afb5ceSopenharmony_ci	"Use another server",
94d4afb5ceSopenharmony_ci	"Server Moved",
95d4afb5ceSopenharmony_ci	"Shared subscriptions not supported",
96d4afb5ceSopenharmony_ci	"Connection rate exceeded",
97d4afb5ceSopenharmony_ci	"Maximum Connect Time",
98d4afb5ceSopenharmony_ci	"Subscription IDs not supported",
99d4afb5ceSopenharmony_ci	"Wildcard subscriptions not supported"
100d4afb5ceSopenharmony_ci};
101d4afb5ceSopenharmony_ci
102d4afb5ceSopenharmony_ci#define LMQCP_WILL_PROPERTIES 0
103d4afb5ceSopenharmony_ci
104d4afb5ceSopenharmony_ci/* For each property, a bitmap describing which commands it is valid for */
105d4afb5ceSopenharmony_ci
106d4afb5ceSopenharmony_cistatic const uint16_t property_valid[] = {
107d4afb5ceSopenharmony_ci	[LMQPROP_PAYLOAD_FORMAT_INDICATOR]	= (1 << LMQCP_PUBLISH) |
108d4afb5ceSopenharmony_ci						  (1 << LMQCP_WILL_PROPERTIES),
109d4afb5ceSopenharmony_ci	[LMQPROP_MESSAGE_EXPIRY_INTERVAL]	= (1 << LMQCP_PUBLISH) |
110d4afb5ceSopenharmony_ci						  (1 << LMQCP_WILL_PROPERTIES),
111d4afb5ceSopenharmony_ci	[LMQPROP_CONTENT_TYPE]			= (1 << LMQCP_PUBLISH) |
112d4afb5ceSopenharmony_ci						  (1 << LMQCP_WILL_PROPERTIES),
113d4afb5ceSopenharmony_ci	[LMQPROP_RESPONSE_TOPIC]		= (1 << LMQCP_PUBLISH) |
114d4afb5ceSopenharmony_ci						  (1 << LMQCP_WILL_PROPERTIES),
115d4afb5ceSopenharmony_ci	[LMQPROP_CORRELATION_DATA]		= (1 << LMQCP_PUBLISH) |
116d4afb5ceSopenharmony_ci						  (1 << LMQCP_WILL_PROPERTIES),
117d4afb5ceSopenharmony_ci	[LMQPROP_SUBSCRIPTION_IDENTIFIER]	= (1 << LMQCP_PUBLISH) |
118d4afb5ceSopenharmony_ci						  (1 << LMQCP_CTOS_SUBSCRIBE),
119d4afb5ceSopenharmony_ci	[LMQPROP_SESSION_EXPIRY_INTERVAL]	= (1 << LMQCP_CTOS_CONNECT) |
120d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_CONNACK) |
121d4afb5ceSopenharmony_ci						  (1 << LMQCP_DISCONNECT),
122d4afb5ceSopenharmony_ci	[LMQPROP_ASSIGNED_CLIENT_IDENTIFIER]	= (1 << LMQCP_STOC_CONNACK),
123d4afb5ceSopenharmony_ci	[LMQPROP_SERVER_KEEP_ALIVE]		= (1 << LMQCP_STOC_CONNACK),
124d4afb5ceSopenharmony_ci	[LMQPROP_AUTHENTICATION_METHOD]		= (1 << LMQCP_CTOS_CONNECT) |
125d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_CONNACK) |
126d4afb5ceSopenharmony_ci						  (1 << LMQCP_AUTH),
127d4afb5ceSopenharmony_ci	[LMQPROP_AUTHENTICATION_DATA]		= (1 << LMQCP_CTOS_CONNECT) |
128d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_CONNACK) |
129d4afb5ceSopenharmony_ci						  (1 << LMQCP_AUTH),
130d4afb5ceSopenharmony_ci	[LMQPROP_REQUEST_PROBLEM_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
131d4afb5ceSopenharmony_ci	[LMQPROP_WILL_DELAY_INTERVAL]		= (1 << LMQCP_WILL_PROPERTIES),
132d4afb5ceSopenharmony_ci	[LMQPROP_REQUEST_RESPONSE_INFORMATION]	= (1 << LMQCP_CTOS_CONNECT),
133d4afb5ceSopenharmony_ci	[LMQPROP_RESPONSE_INFORMATION]		= (1 << LMQCP_STOC_CONNACK),
134d4afb5ceSopenharmony_ci	[LMQPROP_SERVER_REFERENCE]		= (1 << LMQCP_STOC_CONNACK) |
135d4afb5ceSopenharmony_ci						  (1 << LMQCP_DISCONNECT),
136d4afb5ceSopenharmony_ci	[LMQPROP_REASON_STRING]			= (1 << LMQCP_STOC_CONNACK) |
137d4afb5ceSopenharmony_ci						  (1 << LMQCP_PUBACK) |
138d4afb5ceSopenharmony_ci						  (1 << LMQCP_PUBREC) |
139d4afb5ceSopenharmony_ci						  (1 << LMQCP_PUBREL) |
140d4afb5ceSopenharmony_ci						  (1 << LMQCP_PUBCOMP) |
141d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_SUBACK) |
142d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_UNSUBACK) |
143d4afb5ceSopenharmony_ci						  (1 << LMQCP_DISCONNECT) |
144d4afb5ceSopenharmony_ci						  (1 << LMQCP_AUTH),
145d4afb5ceSopenharmony_ci	[LMQPROP_RECEIVE_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
146d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_CONNACK),
147d4afb5ceSopenharmony_ci	[LMQPROP_TOPIC_ALIAS_MAXIMUM]		= (1 << LMQCP_CTOS_CONNECT) |
148d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_CONNACK),
149d4afb5ceSopenharmony_ci	[LMQPROP_TOPIC_ALIAS]			= (1 << LMQCP_PUBLISH),
150d4afb5ceSopenharmony_ci	[LMQPROP_MAXIMUM_QOS]			= (1 << LMQCP_STOC_CONNACK),
151d4afb5ceSopenharmony_ci	[LMQPROP_RETAIN_AVAILABLE]		= (1 << LMQCP_STOC_CONNACK),
152d4afb5ceSopenharmony_ci	[LMQPROP_USER_PROPERTY]			= (1 << LMQCP_CTOS_CONNECT) |
153d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_CONNACK) |
154d4afb5ceSopenharmony_ci						  (1 << LMQCP_PUBLISH) |
155d4afb5ceSopenharmony_ci						  (1 << LMQCP_WILL_PROPERTIES) |
156d4afb5ceSopenharmony_ci						  (1 << LMQCP_PUBACK) |
157d4afb5ceSopenharmony_ci						  (1 << LMQCP_PUBREC) |
158d4afb5ceSopenharmony_ci						  (1 << LMQCP_PUBREL) |
159d4afb5ceSopenharmony_ci						  (1 << LMQCP_PUBCOMP) |
160d4afb5ceSopenharmony_ci						  (1 << LMQCP_CTOS_SUBSCRIBE) |
161d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_SUBACK) |
162d4afb5ceSopenharmony_ci						  (1 << LMQCP_CTOS_UNSUBSCRIBE) |
163d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_UNSUBACK) |
164d4afb5ceSopenharmony_ci						  (1 << LMQCP_DISCONNECT) |
165d4afb5ceSopenharmony_ci						  (1 << LMQCP_AUTH),
166d4afb5ceSopenharmony_ci	[LMQPROP_MAXIMUM_PACKET_SIZE]		= (1 << LMQCP_CTOS_CONNECT) |
167d4afb5ceSopenharmony_ci						  (1 << LMQCP_STOC_CONNACK),
168d4afb5ceSopenharmony_ci	[LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
169d4afb5ceSopenharmony_ci	[LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL]	= (1 << LMQCP_STOC_CONNACK),
170d4afb5ceSopenharmony_ci	[LMQPROP_SHARED_SUBSCRIPTION_AVAIL]	= (1 << LMQCP_STOC_CONNACK)
171d4afb5ceSopenharmony_ci};
172d4afb5ceSopenharmony_ci
173d4afb5ceSopenharmony_ci
174d4afb5ceSopenharmony_ci/*
175d4afb5ceSopenharmony_ci * For each command index, maps flags, id, qos and payload legality
176d4afb5ceSopenharmony_ci * notice in most cases PUBLISH requires further processing
177d4afb5ceSopenharmony_ci */
178d4afb5ceSopenharmony_cistatic const uint8_t map_flags[] = {
179d4afb5ceSopenharmony_ci	[LMQCP_RESERVED]		= 0x00,
180d4afb5ceSopenharmony_ci	[LMQCP_CTOS_CONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
181d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PAYLOAD |
182d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
183d4afb5ceSopenharmony_ci	[LMQCP_STOC_CONNACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
184d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
185d4afb5ceSopenharmony_ci	[LMQCP_PUBLISH]			= LMQCP_LUT_FLAG_PAYLOAD | /* option */
186d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
187d4afb5ceSopenharmony_ci	[LMQCP_PUBACK]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
188d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
189d4afb5ceSopenharmony_ci	[LMQCP_PUBREC]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
190d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
191d4afb5ceSopenharmony_ci	[LMQCP_PUBREL]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
192d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
193d4afb5ceSopenharmony_ci	[LMQCP_PUBCOMP]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
194d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
195d4afb5ceSopenharmony_ci	[LMQCP_CTOS_SUBSCRIBE]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
196d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PAYLOAD |
197d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
198d4afb5ceSopenharmony_ci	[LMQCP_STOC_SUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
199d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PAYLOAD |
200d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
201d4afb5ceSopenharmony_ci	[LMQCP_CTOS_UNSUBSCRIBE]	= LMQCP_LUT_FLAG_RESERVED_FLAGS |
202d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PAYLOAD |
203d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
204d4afb5ceSopenharmony_ci	[LMQCP_STOC_UNSUBACK]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
205d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PAYLOAD |
206d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
207d4afb5ceSopenharmony_ci	[LMQCP_CTOS_PINGREQ]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
208d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
209d4afb5ceSopenharmony_ci	[LMQCP_STOC_PINGRESP]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
210d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
211d4afb5ceSopenharmony_ci	[LMQCP_DISCONNECT]		= LMQCP_LUT_FLAG_RESERVED_FLAGS |
212d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
213d4afb5ceSopenharmony_ci	[LMQCP_AUTH]			= LMQCP_LUT_FLAG_RESERVED_FLAGS |
214d4afb5ceSopenharmony_ci					  LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
215d4afb5ceSopenharmony_ci};
216d4afb5ceSopenharmony_ci
217d4afb5ceSopenharmony_cistatic int
218d4afb5ceSopenharmony_cilws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
219d4afb5ceSopenharmony_ci{
220d4afb5ceSopenharmony_ci	par->consumed += (unsigned int)consumed;
221d4afb5ceSopenharmony_ci
222d4afb5ceSopenharmony_ci	if (par->consumed > par->props_len)
223d4afb5ceSopenharmony_ci		return -1;
224d4afb5ceSopenharmony_ci
225d4afb5ceSopenharmony_ci	/* more properties coming */
226d4afb5ceSopenharmony_ci
227d4afb5ceSopenharmony_ci	if (par->consumed < par->props_len) {
228d4afb5ceSopenharmony_ci		par->state = LMQCPP_PROP_ID_VBI;
229d4afb5ceSopenharmony_ci		return 0;
230d4afb5ceSopenharmony_ci	}
231d4afb5ceSopenharmony_ci
232d4afb5ceSopenharmony_ci	/* properties finished: are we headed for payload or idle? */
233d4afb5ceSopenharmony_ci
234d4afb5ceSopenharmony_ci	if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
235d4afb5ceSopenharmony_ci		/* A PUBLISH packet MUST NOT contain a Packet Identifier if
236d4afb5ceSopenharmony_ci		 * its QoS value is set to 0 [MQTT-2.2.1-2]. */
237d4afb5ceSopenharmony_ci	    (ctl_pkt_type(par) != LMQCP_PUBLISH ||
238d4afb5ceSopenharmony_ci	     (par->packet_type_flags & 6))) {
239d4afb5ceSopenharmony_ci		par->state = LMQCPP_PAYLOAD;
240d4afb5ceSopenharmony_ci		return 0;
241d4afb5ceSopenharmony_ci	}
242d4afb5ceSopenharmony_ci
243d4afb5ceSopenharmony_ci	par->state = LMQCPP_IDLE;
244d4afb5ceSopenharmony_ci
245d4afb5ceSopenharmony_ci	return 0;
246d4afb5ceSopenharmony_ci}
247d4afb5ceSopenharmony_ci
248d4afb5ceSopenharmony_cistatic int
249d4afb5ceSopenharmony_cilws_mqtt_set_client_established(struct lws *wsi)
250d4afb5ceSopenharmony_ci{
251d4afb5ceSopenharmony_ci	lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
252d4afb5ceSopenharmony_ci			    &role_ops_mqtt);
253d4afb5ceSopenharmony_ci
254d4afb5ceSopenharmony_ci	if (user_callback_handle_rxflow(wsi->a.protocol->callback,
255d4afb5ceSopenharmony_ci					wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
256d4afb5ceSopenharmony_ci					wsi->user_space, NULL, 0) < 0) {
257d4afb5ceSopenharmony_ci		lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
258d4afb5ceSopenharmony_ci
259d4afb5ceSopenharmony_ci		return -1;
260d4afb5ceSopenharmony_ci	}
261d4afb5ceSopenharmony_ci	/*
262d4afb5ceSopenharmony_ci	 * If we made a new connection and got the ACK, our connection is
263d4afb5ceSopenharmony_ci	 * definitely working in both directions at the moment
264d4afb5ceSopenharmony_ci	 */
265d4afb5ceSopenharmony_ci	lws_validity_confirmed(wsi);
266d4afb5ceSopenharmony_ci
267d4afb5ceSopenharmony_ci	/* clear connection timeout */
268d4afb5ceSopenharmony_ci	lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
269d4afb5ceSopenharmony_ci
270d4afb5ceSopenharmony_ci	return 0;
271d4afb5ceSopenharmony_ci}
272d4afb5ceSopenharmony_ci
273d4afb5ceSopenharmony_cistatic lws_mqtt_validate_topic_return_t
274d4afb5ceSopenharmony_cilws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)
275d4afb5ceSopenharmony_ci{
276d4afb5ceSopenharmony_ci	size_t spos = 0;
277d4afb5ceSopenharmony_ci	const char *sub = topic;
278d4afb5ceSopenharmony_ci	int8_t slashes = 0;
279d4afb5ceSopenharmony_ci	lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
280d4afb5ceSopenharmony_ci
281d4afb5ceSopenharmony_ci	if (awsiot) {
282d4afb5ceSopenharmony_ci		if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN)
283d4afb5ceSopenharmony_ci			return LMVTR_FAILED_OVERSIZE;
284d4afb5ceSopenharmony_ci		if (topic[0] == '$') {
285d4afb5ceSopenharmony_ci			ret = LMVTR_VALID_SHADOW;
286d4afb5ceSopenharmony_ci			slashes = -3;
287d4afb5ceSopenharmony_ci		}
288d4afb5ceSopenharmony_ci	} else {
289d4afb5ceSopenharmony_ci		if (topiclen > LWS_MQTT_MAX_TOPICLEN)
290d4afb5ceSopenharmony_ci			return LMVTR_FAILED_OVERSIZE;
291d4afb5ceSopenharmony_ci		if (topic[0] == '$')
292d4afb5ceSopenharmony_ci			return LMVTR_FAILED_WILDCARD_FORMAT;
293d4afb5ceSopenharmony_ci	}
294d4afb5ceSopenharmony_ci
295d4afb5ceSopenharmony_ci	while (*sub != 0) {
296d4afb5ceSopenharmony_ci		if (sub[0] == '+') {
297d4afb5ceSopenharmony_ci			/* topic == "+foo" || "a/+foo" ? */
298d4afb5ceSopenharmony_ci			if (spos > 0 && sub[-1] != '/')
299d4afb5ceSopenharmony_ci				return LMVTR_FAILED_WILDCARD_FORMAT;
300d4afb5ceSopenharmony_ci
301d4afb5ceSopenharmony_ci			/* topic == "foo+" or "foo+/a" ? */
302d4afb5ceSopenharmony_ci			if (sub[1] != 0 && sub[1] != '/')
303d4afb5ceSopenharmony_ci				return LMVTR_FAILED_WILDCARD_FORMAT;
304d4afb5ceSopenharmony_ci
305d4afb5ceSopenharmony_ci			ret = LMVTR_VALID_WILDCARD;
306d4afb5ceSopenharmony_ci		} else if (sub[0] == '#') {
307d4afb5ceSopenharmony_ci			/* topic == "foo#" ? */
308d4afb5ceSopenharmony_ci			if (spos > 0 && sub[-1] != '/')
309d4afb5ceSopenharmony_ci				return LMVTR_FAILED_WILDCARD_FORMAT;
310d4afb5ceSopenharmony_ci
311d4afb5ceSopenharmony_ci			/* topic == "#foo" ? */
312d4afb5ceSopenharmony_ci			if (sub[1] != 0)
313d4afb5ceSopenharmony_ci				return LMVTR_FAILED_WILDCARD_FORMAT;
314d4afb5ceSopenharmony_ci
315d4afb5ceSopenharmony_ci			ret = LMVTR_VALID_WILDCARD;
316d4afb5ceSopenharmony_ci		} else if (sub[0] == '/') {
317d4afb5ceSopenharmony_ci			slashes++;
318d4afb5ceSopenharmony_ci		}
319d4afb5ceSopenharmony_ci		spos++;
320d4afb5ceSopenharmony_ci		sub++;
321d4afb5ceSopenharmony_ci	}
322d4afb5ceSopenharmony_ci
323d4afb5ceSopenharmony_ci	if (awsiot && (slashes < 0 || slashes > 7))
324d4afb5ceSopenharmony_ci		return LMVTR_FAILED_SHADOW_FORMAT;
325d4afb5ceSopenharmony_ci
326d4afb5ceSopenharmony_ci	return ret;
327d4afb5ceSopenharmony_ci}
328d4afb5ceSopenharmony_ci
329d4afb5ceSopenharmony_cistatic lws_mqtt_subs_t *
330d4afb5ceSopenharmony_cilws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
331d4afb5ceSopenharmony_ci{
332d4afb5ceSopenharmony_ci	lws_mqtt_subs_t *mysub;
333d4afb5ceSopenharmony_ci	size_t topiclen = strlen(topic);
334d4afb5ceSopenharmony_ci	lws_mqtt_validate_topic_return_t flag;
335d4afb5ceSopenharmony_ci
336d4afb5ceSopenharmony_ci	flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot);
337d4afb5ceSopenharmony_ci	switch (flag) {
338d4afb5ceSopenharmony_ci	case LMVTR_FAILED_OVERSIZE:
339d4afb5ceSopenharmony_ci		lwsl_err("%s: Topic is too long\n",
340d4afb5ceSopenharmony_ci			 __func__);
341d4afb5ceSopenharmony_ci		return NULL;
342d4afb5ceSopenharmony_ci	case LMVTR_FAILED_SHADOW_FORMAT:
343d4afb5ceSopenharmony_ci	case LMVTR_FAILED_WILDCARD_FORMAT:
344d4afb5ceSopenharmony_ci		lwsl_err("%s: Invalid topic format \"%s\"\n",
345d4afb5ceSopenharmony_ci			 __func__, topic);
346d4afb5ceSopenharmony_ci		return NULL;
347d4afb5ceSopenharmony_ci
348d4afb5ceSopenharmony_ci	case LMVTR_VALID:
349d4afb5ceSopenharmony_ci	case LMVTR_VALID_WILDCARD:
350d4afb5ceSopenharmony_ci	case LMVTR_VALID_SHADOW:
351d4afb5ceSopenharmony_ci		mysub = lws_malloc(sizeof(*mysub) + topiclen + 1, "sub");
352d4afb5ceSopenharmony_ci		if (!mysub) {
353d4afb5ceSopenharmony_ci			lwsl_err("%s: Error allocating mysub\n",
354d4afb5ceSopenharmony_ci				 __func__);
355d4afb5ceSopenharmony_ci			return NULL;
356d4afb5ceSopenharmony_ci		}
357d4afb5ceSopenharmony_ci		mysub->wildcard = (flag == LMVTR_VALID_WILDCARD);
358d4afb5ceSopenharmony_ci		mysub->shadow = (flag == LMVTR_VALID_SHADOW);
359d4afb5ceSopenharmony_ci		break;
360d4afb5ceSopenharmony_ci
361d4afb5ceSopenharmony_ci	default:
362d4afb5ceSopenharmony_ci		lwsl_err("%s: Unknown flag - %d\n",
363d4afb5ceSopenharmony_ci			 __func__, flag);
364d4afb5ceSopenharmony_ci		return NULL;
365d4afb5ceSopenharmony_ci	}
366d4afb5ceSopenharmony_ci
367d4afb5ceSopenharmony_ci	mysub->next = mqtt->subs_head;
368d4afb5ceSopenharmony_ci	mqtt->subs_head = mysub;
369d4afb5ceSopenharmony_ci	memcpy(mysub->topic, topic, strlen(topic) + 1);
370d4afb5ceSopenharmony_ci	mysub->ref_count = 1;
371d4afb5ceSopenharmony_ci
372d4afb5ceSopenharmony_ci	lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
373d4afb5ceSopenharmony_ci		  __func__, mysub, mqtt);
374d4afb5ceSopenharmony_ci
375d4afb5ceSopenharmony_ci	return mysub;
376d4afb5ceSopenharmony_ci}
377d4afb5ceSopenharmony_ci
378d4afb5ceSopenharmony_cistatic int
379d4afb5ceSopenharmony_cilws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
380d4afb5ceSopenharmony_ci{
381d4afb5ceSopenharmony_ci	lws_mqtt_subs_t *s = mqtt->subs_head;
382d4afb5ceSopenharmony_ci	lws_mqtt_subs_t *temp = NULL;
383d4afb5ceSopenharmony_ci
384d4afb5ceSopenharmony_ci
385d4afb5ceSopenharmony_ci	lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
386d4afb5ceSopenharmony_ci		  __func__, mqtt);
387d4afb5ceSopenharmony_ci
388d4afb5ceSopenharmony_ci	while (s && s->next) {
389d4afb5ceSopenharmony_ci		if (s->next->ref_count == 0)
390d4afb5ceSopenharmony_ci			break;
391d4afb5ceSopenharmony_ci		s = s->next;
392d4afb5ceSopenharmony_ci	}
393d4afb5ceSopenharmony_ci
394d4afb5ceSopenharmony_ci	if (s && s->next) {
395d4afb5ceSopenharmony_ci		temp = s->next;
396d4afb5ceSopenharmony_ci		lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
397d4afb5ceSopenharmony_ci			  __func__, temp, mqtt);
398d4afb5ceSopenharmony_ci		s->next = temp->next;
399d4afb5ceSopenharmony_ci		lws_free(temp);
400d4afb5ceSopenharmony_ci		return 0;
401d4afb5ceSopenharmony_ci	}
402d4afb5ceSopenharmony_ci	return 1;
403d4afb5ceSopenharmony_ci}
404d4afb5ceSopenharmony_ci
405d4afb5ceSopenharmony_ci/*
406d4afb5ceSopenharmony_ci * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or
407d4afb5ceSopenharmony_ci * PUBREC came before the timeout period
408d4afb5ceSopenharmony_ci */
409d4afb5ceSopenharmony_ci
410d4afb5ceSopenharmony_cistatic void
411d4afb5ceSopenharmony_cilws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
412d4afb5ceSopenharmony_ci{
413d4afb5ceSopenharmony_ci	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
414d4afb5ceSopenharmony_ci			struct _lws_mqtt_related, sul_qos_puback_pubrec_wait);
415d4afb5ceSopenharmony_ci
416d4afb5ceSopenharmony_ci	lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
417d4afb5ceSopenharmony_ci
418d4afb5ceSopenharmony_ci	if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
419d4afb5ceSopenharmony_ci				mqtt->wsi->user_space, NULL, 0))
420d4afb5ceSopenharmony_ci		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
421d4afb5ceSopenharmony_ci}
422d4afb5ceSopenharmony_ci
423d4afb5ceSopenharmony_cistatic void
424d4afb5ceSopenharmony_cilws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)
425d4afb5ceSopenharmony_ci{
426d4afb5ceSopenharmony_ci	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
427d4afb5ceSopenharmony_ci			struct _lws_mqtt_related, sul_unsuback_wait);
428d4afb5ceSopenharmony_ci
429d4afb5ceSopenharmony_ci	lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
430d4afb5ceSopenharmony_ci
431d4afb5ceSopenharmony_ci	if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
432d4afb5ceSopenharmony_ci					   LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT,
433d4afb5ceSopenharmony_ci					   mqtt->wsi->user_space, NULL, 0))
434d4afb5ceSopenharmony_ci		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
435d4afb5ceSopenharmony_ci}
436d4afb5ceSopenharmony_ci
437d4afb5ceSopenharmony_cistatic void
438d4afb5ceSopenharmony_cilws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul)
439d4afb5ceSopenharmony_ci{
440d4afb5ceSopenharmony_ci	struct _lws_mqtt_related *mqtt = lws_container_of(sul,
441d4afb5ceSopenharmony_ci			struct _lws_mqtt_related, sul_shadow_wait);
442d4afb5ceSopenharmony_ci
443d4afb5ceSopenharmony_ci	lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
444d4afb5ceSopenharmony_ci
445d4afb5ceSopenharmony_ci	if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
446d4afb5ceSopenharmony_ci					    LWS_CALLBACK_MQTT_SHADOW_TIMEOUT,
447d4afb5ceSopenharmony_ci					    mqtt->wsi->user_space, NULL, 0))
448d4afb5ceSopenharmony_ci		lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
449d4afb5ceSopenharmony_ci}
450d4afb5ceSopenharmony_ci
451d4afb5ceSopenharmony_civoid
452d4afb5ceSopenharmony_cilws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
453d4afb5ceSopenharmony_ci{
454d4afb5ceSopenharmony_ci	lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
455d4afb5ceSopenharmony_ci	c->estate = s;
456d4afb5ceSopenharmony_ci}
457d4afb5ceSopenharmony_ci
458d4afb5ceSopenharmony_cilws_mqtt_match_topic_return_t
459d4afb5ceSopenharmony_cilws_mqtt_is_topic_matched(const char* sub, const char* pub)
460d4afb5ceSopenharmony_ci{
461d4afb5ceSopenharmony_ci	const char *ppos = pub, *spos = sub;
462d4afb5ceSopenharmony_ci
463d4afb5ceSopenharmony_ci	if (!ppos || !spos) {
464d4afb5ceSopenharmony_ci		return LMMTR_TOPIC_MATCH_ERROR;
465d4afb5ceSopenharmony_ci	}
466d4afb5ceSopenharmony_ci
467d4afb5ceSopenharmony_ci	while (*spos) {
468d4afb5ceSopenharmony_ci		if (*ppos == '#' || *ppos == '+') {
469d4afb5ceSopenharmony_ci			lwsl_err("%s: PUBLISH to wildcard "
470d4afb5ceSopenharmony_ci				 "topic \"%s\" not supported\n",
471d4afb5ceSopenharmony_ci				 __func__, pub);
472d4afb5ceSopenharmony_ci			return LMMTR_TOPIC_MATCH_ERROR;
473d4afb5ceSopenharmony_ci		}
474d4afb5ceSopenharmony_ci		/* foo/+/bar == foo/xyz/bar ? */
475d4afb5ceSopenharmony_ci		if (*spos == '+') {
476d4afb5ceSopenharmony_ci			/* Skip ahead */
477d4afb5ceSopenharmony_ci			while (*ppos != '\0' && *ppos != '/') {
478d4afb5ceSopenharmony_ci				ppos++;
479d4afb5ceSopenharmony_ci			}
480d4afb5ceSopenharmony_ci		} else if (*spos == '#') {
481d4afb5ceSopenharmony_ci			return LMMTR_TOPIC_MATCH;
482d4afb5ceSopenharmony_ci		} else {
483d4afb5ceSopenharmony_ci			if (*ppos == '\0') {
484d4afb5ceSopenharmony_ci				/* foo/bar == foo/bar/# ? */
485d4afb5ceSopenharmony_ci				if (!strncmp(spos, "/#", 2))
486d4afb5ceSopenharmony_ci					return LMMTR_TOPIC_MATCH;
487d4afb5ceSopenharmony_ci				return LMMTR_TOPIC_NOMATCH;
488d4afb5ceSopenharmony_ci			/* Non-matching character */
489d4afb5ceSopenharmony_ci			} else if (*ppos != *spos) {
490d4afb5ceSopenharmony_ci				return LMMTR_TOPIC_NOMATCH;
491d4afb5ceSopenharmony_ci			}
492d4afb5ceSopenharmony_ci			ppos++;
493d4afb5ceSopenharmony_ci		}
494d4afb5ceSopenharmony_ci		spos++;
495d4afb5ceSopenharmony_ci	}
496d4afb5ceSopenharmony_ci
497d4afb5ceSopenharmony_ci	if (*spos == '\0' && *ppos == '\0')
498d4afb5ceSopenharmony_ci		return LMMTR_TOPIC_MATCH;
499d4afb5ceSopenharmony_ci
500d4afb5ceSopenharmony_ci	return LMMTR_TOPIC_NOMATCH;
501d4afb5ceSopenharmony_ci}
502d4afb5ceSopenharmony_ci
503d4afb5ceSopenharmony_cilws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
504d4afb5ceSopenharmony_ci				   const char* ptopic) {
505d4afb5ceSopenharmony_ci	lws_mqtt_subs_t *s = mqtt->subs_head;
506d4afb5ceSopenharmony_ci
507d4afb5ceSopenharmony_ci	while (s) {
508d4afb5ceSopenharmony_ci		/*  SUB topic  ==   PUB topic  ? */
509d4afb5ceSopenharmony_ci		/* foo/bar/xyz ==  foo/bar/xyz ? */
510d4afb5ceSopenharmony_ci		if (!s->wildcard) {
511d4afb5ceSopenharmony_ci			if (!strcmp((const char*)s->topic, ptopic))
512d4afb5ceSopenharmony_ci				return s;
513d4afb5ceSopenharmony_ci		} else {
514d4afb5ceSopenharmony_ci			if (lws_mqtt_is_topic_matched(
515d4afb5ceSopenharmony_ci			    s->topic, ptopic) == LMMTR_TOPIC_MATCH)
516d4afb5ceSopenharmony_ci				return s;
517d4afb5ceSopenharmony_ci		}
518d4afb5ceSopenharmony_ci
519d4afb5ceSopenharmony_ci		s = s->next;
520d4afb5ceSopenharmony_ci	}
521d4afb5ceSopenharmony_ci
522d4afb5ceSopenharmony_ci	return NULL;
523d4afb5ceSopenharmony_ci}
524d4afb5ceSopenharmony_ci
525d4afb5ceSopenharmony_ciint
526d4afb5ceSopenharmony_ci_lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
527d4afb5ceSopenharmony_ci		    const uint8_t *buf, size_t len)
528d4afb5ceSopenharmony_ci{
529d4afb5ceSopenharmony_ci	struct lws *w;
530d4afb5ceSopenharmony_ci	int n;
531d4afb5ceSopenharmony_ci
532d4afb5ceSopenharmony_ci	if (par->flag_pending_send_reason_close)
533d4afb5ceSopenharmony_ci		return 0;
534d4afb5ceSopenharmony_ci
535d4afb5ceSopenharmony_ci	/*
536d4afb5ceSopenharmony_ci	 * Stateful, fragmentation-immune parser
537d4afb5ceSopenharmony_ci	 *
538d4afb5ceSopenharmony_ci	 * Notice that len can always be 1 if under attack, even over tls if
539d4afb5ceSopenharmony_ci	 * the server is compromised or malicious.
540d4afb5ceSopenharmony_ci	 */
541d4afb5ceSopenharmony_ci
542d4afb5ceSopenharmony_ci	while (len) {
543d4afb5ceSopenharmony_ci		lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
544d4afb5ceSopenharmony_ci		switch (par->state) {
545d4afb5ceSopenharmony_ci		case LMQCPP_IDLE:
546d4afb5ceSopenharmony_ci			par->packet_type_flags = *buf++;
547d4afb5ceSopenharmony_ci			len--;
548d4afb5ceSopenharmony_ci
549d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT)
550d4afb5ceSopenharmony_ci			/*
551d4afb5ceSopenharmony_ci			 * The case where we sent the connect, but we received
552d4afb5ceSopenharmony_ci			 * something else before any CONNACK
553d4afb5ceSopenharmony_ci			 */
554d4afb5ceSopenharmony_ci			if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
555d4afb5ceSopenharmony_ci			    par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
556d4afb5ceSopenharmony_ci				lwsl_notice("%s: server sent non-CONNACK\n",
557d4afb5ceSopenharmony_ci						__func__);
558d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
559d4afb5ceSopenharmony_ci			}
560d4afb5ceSopenharmony_ci#endif /* LWS_WITH_CLIENT */
561d4afb5ceSopenharmony_ci
562d4afb5ceSopenharmony_ci			n = map_flags[par->packet_type_flags >> 4];
563d4afb5ceSopenharmony_ci			/*
564d4afb5ceSopenharmony_ci			 *  Where a flag bit is marked as “Reserved”, it is
565d4afb5ceSopenharmony_ci			 *  reserved for future use and MUST be set to the value
566d4afb5ceSopenharmony_ci			 *  listed [MQTT-2.1.3-1].
567d4afb5ceSopenharmony_ci			 */
568d4afb5ceSopenharmony_ci			if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
569d4afb5ceSopenharmony_ci			    ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
570d4afb5ceSopenharmony_ci				lwsl_notice("%s: %s: bad flags, 0x%02x mask 0x%02x (len %d)\n",
571d4afb5ceSopenharmony_ci					    __func__, lws_wsi_tag(wsi),
572d4afb5ceSopenharmony_ci					    par->packet_type_flags, n, (int)len + 1);
573d4afb5ceSopenharmony_ci				lwsl_hexdump_err(buf - 1, len + 1);
574d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
575d4afb5ceSopenharmony_ci			}
576d4afb5ceSopenharmony_ci
577d4afb5ceSopenharmony_ci			lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
578d4afb5ceSopenharmony_ci				   __func__, par->packet_type_flags >> 4,
579d4afb5ceSopenharmony_ci				   par->packet_type_flags & 0xf);
580d4afb5ceSopenharmony_ci
581d4afb5ceSopenharmony_ci			/* allows us to know if a property that can only be
582d4afb5ceSopenharmony_ci			 * given once, appears twice */
583d4afb5ceSopenharmony_ci			memset(par->props_seen, 0, sizeof(par->props_seen));
584d4afb5ceSopenharmony_ci			par->state = par->packet_type_flags & 0xf0;
585d4afb5ceSopenharmony_ci			break;
586d4afb5ceSopenharmony_ci
587d4afb5ceSopenharmony_ci		case LMQCPP_CONNECT_PACKET:
588d4afb5ceSopenharmony_ci			lwsl_debug("%s: received CONNECT pkt\n", __func__);
589d4afb5ceSopenharmony_ci			par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
590d4afb5ceSopenharmony_ci			lws_mqtt_vbi_init(&par->vbit);
591d4afb5ceSopenharmony_ci			break;
592d4afb5ceSopenharmony_ci
593d4afb5ceSopenharmony_ci		case LMQCPP_CONNECT_REMAINING_LEN_VBI:
594d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
595d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
596d4afb5ceSopenharmony_ci				break;
597d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
598d4afb5ceSopenharmony_ci				par->cpkt_remlen = par->vbit.value;
599d4afb5ceSopenharmony_ci				n = map_flags[ctl_pkt_type(par)];
600d4afb5ceSopenharmony_ci				lws_mqtt_str_init(&par->s_temp, par->temp,
601d4afb5ceSopenharmony_ci						  sizeof(par->temp), 0);
602d4afb5ceSopenharmony_ci				par->state = LMQCPP_CONNECT_VH_PNAME;
603d4afb5ceSopenharmony_ci				break;
604d4afb5ceSopenharmony_ci			default:
605d4afb5ceSopenharmony_ci				lwsl_notice("%s: bad vbi\n", __func__);
606d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
607d4afb5ceSopenharmony_ci			}
608d4afb5ceSopenharmony_ci			break;
609d4afb5ceSopenharmony_ci
610d4afb5ceSopenharmony_ci		case LMQCPP_CONNECT_VH_PNAME:
611d4afb5ceSopenharmony_ci			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
612d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
613d4afb5ceSopenharmony_ci				break;
614d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
615d4afb5ceSopenharmony_ci				if (par->s_temp.len != 4 ||
616d4afb5ceSopenharmony_ci				    memcmp(par->s_temp.buf, "MQTT",
617d4afb5ceSopenharmony_ci					   par->s_temp.len)) {
618d4afb5ceSopenharmony_ci					lwsl_notice("%s: protocol name: %.*s\n",
619d4afb5ceSopenharmony_ci						  __func__, par->s_temp.len,
620d4afb5ceSopenharmony_ci						  par->s_temp.buf);
621d4afb5ceSopenharmony_ci					goto send_unsupp_connack_and_close;
622d4afb5ceSopenharmony_ci				}
623d4afb5ceSopenharmony_ci				par->state = LMQCPP_CONNECT_VH_PVERSION;
624d4afb5ceSopenharmony_ci				break;
625d4afb5ceSopenharmony_ci			default:
626d4afb5ceSopenharmony_ci				lwsl_notice("%s: bad protocol name\n", __func__);
627d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
628d4afb5ceSopenharmony_ci			}
629d4afb5ceSopenharmony_ci			break;
630d4afb5ceSopenharmony_ci
631d4afb5ceSopenharmony_ci		case LMQCPP_CONNECT_VH_PVERSION:
632d4afb5ceSopenharmony_ci			par->conn_protocol_version = *buf++;
633d4afb5ceSopenharmony_ci			len--;
634d4afb5ceSopenharmony_ci			if (par->conn_protocol_version != 5) {
635d4afb5ceSopenharmony_ci				lwsl_info("%s: unsupported MQTT version %d\n",
636d4afb5ceSopenharmony_ci					  __func__, par->conn_protocol_version);
637d4afb5ceSopenharmony_ci				goto send_unsupp_connack_and_close;
638d4afb5ceSopenharmony_ci			}
639d4afb5ceSopenharmony_ci			par->state = LMQCPP_CONNECT_VH_FLAGS;
640d4afb5ceSopenharmony_ci			break;
641d4afb5ceSopenharmony_ci
642d4afb5ceSopenharmony_ci		case LMQCPP_CONNECT_VH_FLAGS:
643d4afb5ceSopenharmony_ci			par->cpkt_flags = *buf++;
644d4afb5ceSopenharmony_ci			len--;
645d4afb5ceSopenharmony_ci			if (par->cpkt_flags & 1) {
646d4afb5ceSopenharmony_ci				/*
647d4afb5ceSopenharmony_ci				 * The Server MUST validate that the reserved
648d4afb5ceSopenharmony_ci				 * flag in the CONNECT packet is set to 0
649d4afb5ceSopenharmony_ci				 * [MQTT-3.1.2-3].
650d4afb5ceSopenharmony_ci				 */
651d4afb5ceSopenharmony_ci				par->reason = LMQCP_REASON_MALFORMED_PACKET;
652d4afb5ceSopenharmony_ci				goto send_reason_and_close;
653d4afb5ceSopenharmony_ci			}
654d4afb5ceSopenharmony_ci			/*
655d4afb5ceSopenharmony_ci			 * conn_flags specifies the Will Properties that should
656d4afb5ceSopenharmony_ci			 * appear in the payload section
657d4afb5ceSopenharmony_ci			 */
658d4afb5ceSopenharmony_ci			lws_mqtt_2byte_init(&par->vbit);
659d4afb5ceSopenharmony_ci			par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
660d4afb5ceSopenharmony_ci			break;
661d4afb5ceSopenharmony_ci
662d4afb5ceSopenharmony_ci		case LMQCPP_CONNECT_VH_KEEPALIVE:
663d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
664d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
665d4afb5ceSopenharmony_ci				break;
666d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
667d4afb5ceSopenharmony_ci				par->keepalive = (uint16_t)par->vbit.value;
668d4afb5ceSopenharmony_ci				lws_mqtt_vbi_init(&par->vbit);
669d4afb5ceSopenharmony_ci				par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
670d4afb5ceSopenharmony_ci				break;
671d4afb5ceSopenharmony_ci			default:
672d4afb5ceSopenharmony_ci				lwsl_notice("%s: ka bad vbi\n", __func__);
673d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
674d4afb5ceSopenharmony_ci			}
675d4afb5ceSopenharmony_ci			break;
676d4afb5ceSopenharmony_ci
677d4afb5ceSopenharmony_ci		case LMQCPP_PINGRESP_ZERO:
678d4afb5ceSopenharmony_ci			len--;
679d4afb5ceSopenharmony_ci			/* second byte of PINGRESP must be zero */
680d4afb5ceSopenharmony_ci			if (*buf++)
681d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
682d4afb5ceSopenharmony_ci			goto cmd_completion;
683d4afb5ceSopenharmony_ci
684d4afb5ceSopenharmony_ci		case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
685d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
686d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
687d4afb5ceSopenharmony_ci				break;
688d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
689d4afb5ceSopenharmony_ci				/* reset consumption counter */
690d4afb5ceSopenharmony_ci				par->consumed = 0;
691d4afb5ceSopenharmony_ci				par->props_len = par->vbit.value;
692d4afb5ceSopenharmony_ci				lws_mqtt_vbi_init(&par->vbit);
693d4afb5ceSopenharmony_ci				par->state = LMQCPP_PROP_ID_VBI;
694d4afb5ceSopenharmony_ci				break;
695d4afb5ceSopenharmony_ci			default:
696d4afb5ceSopenharmony_ci				lwsl_notice("%s: connpr bad vbi\n", __func__);
697d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
698d4afb5ceSopenharmony_ci			}
699d4afb5ceSopenharmony_ci			break;
700d4afb5ceSopenharmony_ci
701d4afb5ceSopenharmony_ci		/* PUBREC */
702d4afb5ceSopenharmony_ci		case LMQCPP_PUBREC_PACKET:
703d4afb5ceSopenharmony_ci			lwsl_debug("%s: received PUBREC pkt\n", __func__);
704d4afb5ceSopenharmony_ci			lws_mqtt_vbi_init(&par->vbit);
705d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
706d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
707d4afb5ceSopenharmony_ci				break;
708d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
709d4afb5ceSopenharmony_ci				par->cpkt_remlen = par->vbit.value;
710d4afb5ceSopenharmony_ci				lwsl_debug("%s: PUBREC pkt len = %d\n",
711d4afb5ceSopenharmony_ci					   __func__, (int)par->cpkt_remlen);
712d4afb5ceSopenharmony_ci				if (par->cpkt_remlen < 2)
713d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
714d4afb5ceSopenharmony_ci				par->state = LMQCPP_PUBREC_VH_PKT_ID;
715d4afb5ceSopenharmony_ci				break;
716d4afb5ceSopenharmony_ci			default:
717d4afb5ceSopenharmony_ci				lwsl_notice("%s: pubrec bad vbi\n", __func__);
718d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
719d4afb5ceSopenharmony_ci			}
720d4afb5ceSopenharmony_ci			break;
721d4afb5ceSopenharmony_ci
722d4afb5ceSopenharmony_ci		case LMQCPP_PUBREC_VH_PKT_ID:
723d4afb5ceSopenharmony_ci			if (len < 2) {
724d4afb5ceSopenharmony_ci				lwsl_notice("%s: len breakage 3\n", __func__);
725d4afb5ceSopenharmony_ci				return -1;
726d4afb5ceSopenharmony_ci			}
727d4afb5ceSopenharmony_ci
728d4afb5ceSopenharmony_ci			par->cpkt_id = lws_ser_ru16be(buf);
729d4afb5ceSopenharmony_ci			wsi->mqtt->ack_pkt_id = par->cpkt_id;
730d4afb5ceSopenharmony_ci			buf += 2;
731d4afb5ceSopenharmony_ci			len -= 2;
732d4afb5ceSopenharmony_ci			par->cpkt_remlen -= 2;
733d4afb5ceSopenharmony_ci			par->n = 0;
734d4afb5ceSopenharmony_ci
735d4afb5ceSopenharmony_ci			goto cmd_completion;
736d4afb5ceSopenharmony_ci
737d4afb5ceSopenharmony_ci		/* PUBREL */
738d4afb5ceSopenharmony_ci		case LMQCPP_PUBREL_PACKET:
739d4afb5ceSopenharmony_ci			lwsl_debug("%s: received PUBREL pkt\n", __func__);
740d4afb5ceSopenharmony_ci			lws_mqtt_vbi_init(&par->vbit);
741d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
742d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
743d4afb5ceSopenharmony_ci				break;
744d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
745d4afb5ceSopenharmony_ci				par->cpkt_remlen = par->vbit.value;
746d4afb5ceSopenharmony_ci				lwsl_debug("%s: PUBREL pkt len = %d\n",
747d4afb5ceSopenharmony_ci					   __func__, (int)par->cpkt_remlen);
748d4afb5ceSopenharmony_ci				if (par->cpkt_remlen < 2)
749d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
750d4afb5ceSopenharmony_ci				par->state = LMQCPP_PUBREL_VH_PKT_ID;
751d4afb5ceSopenharmony_ci				break;
752d4afb5ceSopenharmony_ci			default:
753d4afb5ceSopenharmony_ci				lwsl_err("%s: pubrel bad vbi\n", __func__);
754d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
755d4afb5ceSopenharmony_ci			}
756d4afb5ceSopenharmony_ci			break;
757d4afb5ceSopenharmony_ci
758d4afb5ceSopenharmony_ci		case LMQCPP_PUBREL_VH_PKT_ID:
759d4afb5ceSopenharmony_ci			if (len < 2) {
760d4afb5ceSopenharmony_ci				lwsl_notice("%s: len breakage 3\n", __func__);
761d4afb5ceSopenharmony_ci				return -1;
762d4afb5ceSopenharmony_ci			}
763d4afb5ceSopenharmony_ci
764d4afb5ceSopenharmony_ci			par->cpkt_id = lws_ser_ru16be(buf);
765d4afb5ceSopenharmony_ci			wsi->mqtt->ack_pkt_id = par->cpkt_id;
766d4afb5ceSopenharmony_ci			buf += 2;
767d4afb5ceSopenharmony_ci			len -= 2;
768d4afb5ceSopenharmony_ci			par->cpkt_remlen -= 2;
769d4afb5ceSopenharmony_ci			par->n = 0;
770d4afb5ceSopenharmony_ci
771d4afb5ceSopenharmony_ci			goto cmd_completion;
772d4afb5ceSopenharmony_ci
773d4afb5ceSopenharmony_ci		/* PUBCOMP */
774d4afb5ceSopenharmony_ci		case LMQCPP_PUBCOMP_PACKET:
775d4afb5ceSopenharmony_ci			lwsl_debug("%s: received PUBCOMP pkt\n", __func__);
776d4afb5ceSopenharmony_ci			lws_mqtt_vbi_init(&par->vbit);
777d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
778d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
779d4afb5ceSopenharmony_ci				break;
780d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
781d4afb5ceSopenharmony_ci				par->cpkt_remlen = par->vbit.value;
782d4afb5ceSopenharmony_ci				lwsl_debug("%s: PUBCOMP pkt len = %d\n",
783d4afb5ceSopenharmony_ci					   __func__, (int)par->cpkt_remlen);
784d4afb5ceSopenharmony_ci				if (par->cpkt_remlen < 2)
785d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
786d4afb5ceSopenharmony_ci				par->state = LMQCPP_PUBCOMP_VH_PKT_ID;
787d4afb5ceSopenharmony_ci				break;
788d4afb5ceSopenharmony_ci			default:
789d4afb5ceSopenharmony_ci				lwsl_err("%s: pubcmp bad vbi\n", __func__);
790d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
791d4afb5ceSopenharmony_ci			}
792d4afb5ceSopenharmony_ci			break;
793d4afb5ceSopenharmony_ci
794d4afb5ceSopenharmony_ci		case LMQCPP_PUBCOMP_VH_PKT_ID:
795d4afb5ceSopenharmony_ci			if (len < 2) {
796d4afb5ceSopenharmony_ci				lwsl_notice("%s: len breakage 3\n", __func__);
797d4afb5ceSopenharmony_ci				return -1;
798d4afb5ceSopenharmony_ci			}
799d4afb5ceSopenharmony_ci
800d4afb5ceSopenharmony_ci			par->cpkt_id = lws_ser_ru16be(buf);
801d4afb5ceSopenharmony_ci			wsi->mqtt->ack_pkt_id = par->cpkt_id;
802d4afb5ceSopenharmony_ci			buf += 2;
803d4afb5ceSopenharmony_ci			len -= 2;
804d4afb5ceSopenharmony_ci			par->cpkt_remlen -= 2;
805d4afb5ceSopenharmony_ci			par->n = 0;
806d4afb5ceSopenharmony_ci
807d4afb5ceSopenharmony_ci			goto cmd_completion;
808d4afb5ceSopenharmony_ci
809d4afb5ceSopenharmony_ci		case LMQCPP_PUBLISH_PACKET:
810d4afb5ceSopenharmony_ci			if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
811d4afb5ceSopenharmony_ci				lwsl_notice("%s: Topic rx before subscribing\n",
812d4afb5ceSopenharmony_ci					    __func__);
813d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
814d4afb5ceSopenharmony_ci			}
815d4afb5ceSopenharmony_ci			lwsl_info("%s: received PUBLISH pkt\n", __func__);
816d4afb5ceSopenharmony_ci			par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
817d4afb5ceSopenharmony_ci			lws_mqtt_vbi_init(&par->vbit);
818d4afb5ceSopenharmony_ci			break;
819d4afb5ceSopenharmony_ci		case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
820d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
821d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
822d4afb5ceSopenharmony_ci				break;
823d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
824d4afb5ceSopenharmony_ci				par->cpkt_remlen = par->vbit.value;
825d4afb5ceSopenharmony_ci				lwsl_debug("%s: PUBLISH pkt len = %d\n",
826d4afb5ceSopenharmony_ci					   __func__, (int)par->cpkt_remlen);
827d4afb5ceSopenharmony_ci				/* Move on to PUBLISH's variable header */
828d4afb5ceSopenharmony_ci				par->state = LMQCPP_PUBLISH_VH_TOPIC;
829d4afb5ceSopenharmony_ci				break;
830d4afb5ceSopenharmony_ci			default:
831d4afb5ceSopenharmony_ci				lwsl_notice("%s: pubrem bad vbi\n", __func__);
832d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
833d4afb5ceSopenharmony_ci			}
834d4afb5ceSopenharmony_ci			break;
835d4afb5ceSopenharmony_ci
836d4afb5ceSopenharmony_ci		case LMQCPP_PUBLISH_VH_TOPIC:
837d4afb5ceSopenharmony_ci		{
838d4afb5ceSopenharmony_ci			lws_mqtt_publish_param_t *pub = NULL;
839d4afb5ceSopenharmony_ci
840d4afb5ceSopenharmony_ci			if (len < 2) {
841d4afb5ceSopenharmony_ci				lwsl_notice("%s: topic too short\n", __func__);
842d4afb5ceSopenharmony_ci				return -1;
843d4afb5ceSopenharmony_ci			}
844d4afb5ceSopenharmony_ci
845d4afb5ceSopenharmony_ci			/* Topic len */
846d4afb5ceSopenharmony_ci			par->n = lws_ser_ru16be(buf);
847d4afb5ceSopenharmony_ci			buf += 2;
848d4afb5ceSopenharmony_ci			len -= 2;
849d4afb5ceSopenharmony_ci
850d4afb5ceSopenharmony_ci			if (len < par->n) {/* the way this is written... */
851d4afb5ceSopenharmony_ci				lwsl_notice("%s: len breakage\n", __func__);
852d4afb5ceSopenharmony_ci				return -1;
853d4afb5ceSopenharmony_ci			}
854d4afb5ceSopenharmony_ci
855d4afb5ceSopenharmony_ci			/* Invalid topic len */
856d4afb5ceSopenharmony_ci			if (par->n == 0) {
857d4afb5ceSopenharmony_ci				lwsl_notice("%s: zero topic len\n", __func__);
858d4afb5ceSopenharmony_ci				par->reason = LMQCP_REASON_MALFORMED_PACKET;
859d4afb5ceSopenharmony_ci				goto send_reason_and_close;
860d4afb5ceSopenharmony_ci			}
861d4afb5ceSopenharmony_ci			lwsl_debug("%s: PUBLISH topic len %d\n",
862d4afb5ceSopenharmony_ci				   __func__, (int)par->n);
863d4afb5ceSopenharmony_ci			assert(!wsi->mqtt->rx_cpkt_param);
864d4afb5ceSopenharmony_ci			wsi->mqtt->rx_cpkt_param = lws_zalloc(
865d4afb5ceSopenharmony_ci				sizeof(lws_mqtt_publish_param_t), "rx pub param");
866d4afb5ceSopenharmony_ci			if (!wsi->mqtt->rx_cpkt_param)
867d4afb5ceSopenharmony_ci				goto oom;
868d4afb5ceSopenharmony_ci			pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
869d4afb5ceSopenharmony_ci
870d4afb5ceSopenharmony_ci			pub->topic_len = (uint16_t)par->n;
871d4afb5ceSopenharmony_ci
872d4afb5ceSopenharmony_ci			/* Topic Name */
873d4afb5ceSopenharmony_ci			pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
874d4afb5ceSopenharmony_ci							"rx publish topic");
875d4afb5ceSopenharmony_ci			if (!pub->topic)
876d4afb5ceSopenharmony_ci				goto oom;
877d4afb5ceSopenharmony_ci			lws_strncpy(pub->topic, (const char *)buf,
878d4afb5ceSopenharmony_ci				    (size_t)pub->topic_len + 1);
879d4afb5ceSopenharmony_ci			buf += pub->topic_len;
880d4afb5ceSopenharmony_ci			len -= pub->topic_len;
881d4afb5ceSopenharmony_ci
882d4afb5ceSopenharmony_ci			/* Extract QoS Level from Fixed Header Flags */
883d4afb5ceSopenharmony_ci			pub->qos = (lws_mqtt_qos_levels_t)
884d4afb5ceSopenharmony_ci					((par->packet_type_flags >> 1) & 0x3);
885d4afb5ceSopenharmony_ci
886d4afb5ceSopenharmony_ci			pub->payload_pos = 0;
887d4afb5ceSopenharmony_ci
888d4afb5ceSopenharmony_ci			pub->payload_len = par->cpkt_remlen -
889d4afb5ceSopenharmony_ci				(unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
890d4afb5ceSopenharmony_ci
891d4afb5ceSopenharmony_ci			switch (pub->qos) {
892d4afb5ceSopenharmony_ci			case QOS0:
893d4afb5ceSopenharmony_ci				par->state = LMQCPP_PAYLOAD;
894d4afb5ceSopenharmony_ci				if (pub->payload_len == 0)
895d4afb5ceSopenharmony_ci					goto cmd_completion;
896d4afb5ceSopenharmony_ci
897d4afb5ceSopenharmony_ci				break;
898d4afb5ceSopenharmony_ci			case QOS1:
899d4afb5ceSopenharmony_ci			case QOS2:
900d4afb5ceSopenharmony_ci				par->state = LMQCPP_PUBLISH_VH_PKT_ID;
901d4afb5ceSopenharmony_ci				break;
902d4afb5ceSopenharmony_ci			default:
903d4afb5ceSopenharmony_ci				par->reason = LMQCP_REASON_MALFORMED_PACKET;
904d4afb5ceSopenharmony_ci				lws_free_set_NULL(pub->topic);
905d4afb5ceSopenharmony_ci				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
906d4afb5ceSopenharmony_ci				goto send_reason_and_close;
907d4afb5ceSopenharmony_ci			}
908d4afb5ceSopenharmony_ci			break;
909d4afb5ceSopenharmony_ci		}
910d4afb5ceSopenharmony_ci		case LMQCPP_PUBLISH_VH_PKT_ID:
911d4afb5ceSopenharmony_ci		{
912d4afb5ceSopenharmony_ci			lws_mqtt_publish_param_t *pub =
913d4afb5ceSopenharmony_ci				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
914d4afb5ceSopenharmony_ci
915d4afb5ceSopenharmony_ci			if (len < 2) {
916d4afb5ceSopenharmony_ci				lwsl_notice("%s: len breakage 2\n", __func__);
917d4afb5ceSopenharmony_ci				return -1;
918d4afb5ceSopenharmony_ci			}
919d4afb5ceSopenharmony_ci
920d4afb5ceSopenharmony_ci			par->cpkt_id = lws_ser_ru16be(buf);
921d4afb5ceSopenharmony_ci			buf += 2;
922d4afb5ceSopenharmony_ci			len -= 2;
923d4afb5ceSopenharmony_ci			wsi->mqtt->peer_ack_pkt_id = par->cpkt_id;
924d4afb5ceSopenharmony_ci			lwsl_debug("%s: Packet ID %d\n",
925d4afb5ceSopenharmony_ci					__func__, (int)par->cpkt_id);
926d4afb5ceSopenharmony_ci			par->state = LMQCPP_PAYLOAD;
927d4afb5ceSopenharmony_ci			pub->payload_pos = 0;
928d4afb5ceSopenharmony_ci			pub->payload_len = par->cpkt_remlen -
929d4afb5ceSopenharmony_ci				(unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
930d4afb5ceSopenharmony_ci			if (pub->payload_len == 0)
931d4afb5ceSopenharmony_ci				goto cmd_completion;
932d4afb5ceSopenharmony_ci
933d4afb5ceSopenharmony_ci			break;
934d4afb5ceSopenharmony_ci		}
935d4afb5ceSopenharmony_ci		case LMQCPP_PAYLOAD:
936d4afb5ceSopenharmony_ci		{
937d4afb5ceSopenharmony_ci			lws_mqtt_publish_param_t *pub =
938d4afb5ceSopenharmony_ci				(lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
939d4afb5ceSopenharmony_ci			if (pub == NULL) {
940d4afb5ceSopenharmony_ci				lwsl_err("%s: Uninitialized pub_param\n",
941d4afb5ceSopenharmony_ci						__func__);
942d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
943d4afb5ceSopenharmony_ci			}
944d4afb5ceSopenharmony_ci
945d4afb5ceSopenharmony_ci			pub->payload = buf;
946d4afb5ceSopenharmony_ci			goto cmd_completion;
947d4afb5ceSopenharmony_ci		}
948d4afb5ceSopenharmony_ci
949d4afb5ceSopenharmony_ci		case LMQCPP_CONNACK_PACKET:
950d4afb5ceSopenharmony_ci			if (!lwsi_role_client(wsi)) {
951d4afb5ceSopenharmony_ci				lwsl_err("%s: CONNACK is only Server to Client",
952d4afb5ceSopenharmony_ci						__func__);
953d4afb5ceSopenharmony_ci				goto send_unsupp_connack_and_close;
954d4afb5ceSopenharmony_ci			}
955d4afb5ceSopenharmony_ci
956d4afb5ceSopenharmony_ci			lwsl_debug("%s: received CONNACK pkt\n", __func__);
957d4afb5ceSopenharmony_ci			lws_mqtt_vbi_init(&par->vbit);
958d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
959d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
960d4afb5ceSopenharmony_ci				break;
961d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
962d4afb5ceSopenharmony_ci				par->cpkt_remlen = par->vbit.value;
963d4afb5ceSopenharmony_ci				lwsl_debug("%s: CONNACK pkt len = %d\n",
964d4afb5ceSopenharmony_ci					   __func__, (int)par->cpkt_remlen);
965d4afb5ceSopenharmony_ci				if (par->cpkt_remlen != 2)
966d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
967d4afb5ceSopenharmony_ci
968d4afb5ceSopenharmony_ci				par->state = LMQCPP_CONNACK_VH_FLAGS;
969d4afb5ceSopenharmony_ci				break;
970d4afb5ceSopenharmony_ci			default:
971d4afb5ceSopenharmony_ci				lwsl_notice("%s: connack bad vbi\n", __func__);
972d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
973d4afb5ceSopenharmony_ci			}
974d4afb5ceSopenharmony_ci			break;
975d4afb5ceSopenharmony_ci
976d4afb5ceSopenharmony_ci		case LMQCPP_CONNACK_VH_FLAGS:
977d4afb5ceSopenharmony_ci		{
978d4afb5ceSopenharmony_ci			lws_mqttc_t *c = &wsi->mqtt->client;
979d4afb5ceSopenharmony_ci			par->cpkt_flags = *buf++;
980d4afb5ceSopenharmony_ci			len--;
981d4afb5ceSopenharmony_ci
982d4afb5ceSopenharmony_ci			if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
983d4afb5ceSopenharmony_ci				/*
984d4afb5ceSopenharmony_ci				 * Byte 1 is the "Connect Acknowledge
985d4afb5ceSopenharmony_ci				 * Flags". Bits 7-1 are reserved and
986d4afb5ceSopenharmony_ci				 * MUST be set to 0.
987d4afb5ceSopenharmony_ci				 */
988d4afb5ceSopenharmony_ci				par->reason = LMQCP_REASON_MALFORMED_PACKET;
989d4afb5ceSopenharmony_ci				goto send_reason_and_close;
990d4afb5ceSopenharmony_ci			}
991d4afb5ceSopenharmony_ci			/*
992d4afb5ceSopenharmony_ci			 * If the Server accepts a connection with
993d4afb5ceSopenharmony_ci			 * CleanSession set to 1, the Server MUST set
994d4afb5ceSopenharmony_ci			 * Session Present to 0 in the CONNACK packet
995d4afb5ceSopenharmony_ci			 * in addition to setting a zero return code
996d4afb5ceSopenharmony_ci			 * in the CONNACK packet [MQTT-3.2.2-1]. If
997d4afb5ceSopenharmony_ci			 * the Server accepts a connection with
998d4afb5ceSopenharmony_ci			 * CleanSession set to 0, the value set in
999d4afb5ceSopenharmony_ci			 * Session Present depends on whether the
1000d4afb5ceSopenharmony_ci			 * Server already has stored Session state for
1001d4afb5ceSopenharmony_ci			 * the supplied client ID. If the Server has
1002d4afb5ceSopenharmony_ci			 * stored Session state, it MUST set
1003d4afb5ceSopenharmony_ci			 * SessionPresent to 1 in the CONNACK packet
1004d4afb5ceSopenharmony_ci			 * [MQTT-3.2.2-2]. If the Server does not have
1005d4afb5ceSopenharmony_ci			 * stored Session state, it MUST set Session
1006d4afb5ceSopenharmony_ci			 * Present to 0 in the CONNACK packet. This is
1007d4afb5ceSopenharmony_ci			 * in addition to setting a zero return code
1008d4afb5ceSopenharmony_ci			 * in the CONNACK packet [MQTT-3.2.2-3].
1009d4afb5ceSopenharmony_ci			 */
1010d4afb5ceSopenharmony_ci			if ((c->conn_flags & LMQCFT_CLEAN_START) &&
1011d4afb5ceSopenharmony_ci			    (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
1012d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1013d4afb5ceSopenharmony_ci
1014d4afb5ceSopenharmony_ci			wsi->mqtt->session_resumed = ((unsigned int)par->cpkt_flags &
1015d4afb5ceSopenharmony_ci						      LMQCFT_SESSION_PRESENT);
1016d4afb5ceSopenharmony_ci
1017d4afb5ceSopenharmony_ci			/* Move on to Connect Return Code */
1018d4afb5ceSopenharmony_ci			par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
1019d4afb5ceSopenharmony_ci			break;
1020d4afb5ceSopenharmony_ci		}
1021d4afb5ceSopenharmony_ci		case LMQCPP_CONNACK_VH_RETURN_CODE:
1022d4afb5ceSopenharmony_ci			par->conn_rc = *buf++;
1023d4afb5ceSopenharmony_ci			len--;
1024d4afb5ceSopenharmony_ci			/*
1025d4afb5ceSopenharmony_ci			 * If a server sends a CONNACK packet containing a
1026d4afb5ceSopenharmony_ci			 * non-zero return code it MUST then close the Network
1027d4afb5ceSopenharmony_ci			 * Connection [MQTT-3.2.2-5]
1028d4afb5ceSopenharmony_ci			 */
1029d4afb5ceSopenharmony_ci			switch (par->conn_rc) {
1030d4afb5ceSopenharmony_ci			case 0:
1031d4afb5ceSopenharmony_ci				goto cmd_completion;
1032d4afb5ceSopenharmony_ci			case 1:
1033d4afb5ceSopenharmony_ci			case 2:
1034d4afb5ceSopenharmony_ci			case 3:
1035d4afb5ceSopenharmony_ci			case 4:
1036d4afb5ceSopenharmony_ci			case 5:
1037d4afb5ceSopenharmony_ci				par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
1038d4afb5ceSopenharmony_ci						par->conn_rc - 1;
1039d4afb5ceSopenharmony_ci				goto send_reason_and_close;
1040d4afb5ceSopenharmony_ci			default:
1041d4afb5ceSopenharmony_ci				lwsl_notice("%s: bad connack retcode\n", __func__);
1042d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1043d4afb5ceSopenharmony_ci			}
1044d4afb5ceSopenharmony_ci			break;
1045d4afb5ceSopenharmony_ci
1046d4afb5ceSopenharmony_ci		/* SUBACK */
1047d4afb5ceSopenharmony_ci		case LMQCPP_SUBACK_PACKET:
1048d4afb5ceSopenharmony_ci			if (!lwsi_role_client(wsi)) {
1049d4afb5ceSopenharmony_ci				lwsl_err("%s: SUBACK is only Server to Client",
1050d4afb5ceSopenharmony_ci						__func__);
1051d4afb5ceSopenharmony_ci				goto send_unsupp_connack_and_close;
1052d4afb5ceSopenharmony_ci			}
1053d4afb5ceSopenharmony_ci
1054d4afb5ceSopenharmony_ci			lwsl_debug("%s: received SUBACK pkt\n", __func__);
1055d4afb5ceSopenharmony_ci			lws_mqtt_vbi_init(&par->vbit);
1056d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1057d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
1058d4afb5ceSopenharmony_ci				break;
1059d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
1060d4afb5ceSopenharmony_ci				par->cpkt_remlen = par->vbit.value;
1061d4afb5ceSopenharmony_ci				lwsl_debug("%s: SUBACK pkt len = %d\n",
1062d4afb5ceSopenharmony_ci					   __func__, (int)par->cpkt_remlen);
1063d4afb5ceSopenharmony_ci				if (par->cpkt_remlen <= 2)
1064d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
1065d4afb5ceSopenharmony_ci				par->state = LMQCPP_SUBACK_VH_PKT_ID;
1066d4afb5ceSopenharmony_ci				break;
1067d4afb5ceSopenharmony_ci			default:
1068d4afb5ceSopenharmony_ci				lwsl_notice("%s: suback bad vbi\n", __func__);
1069d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1070d4afb5ceSopenharmony_ci			}
1071d4afb5ceSopenharmony_ci			break;
1072d4afb5ceSopenharmony_ci
1073d4afb5ceSopenharmony_ci		case LMQCPP_SUBACK_VH_PKT_ID:
1074d4afb5ceSopenharmony_ci
1075d4afb5ceSopenharmony_ci			if (len < 2) {
1076d4afb5ceSopenharmony_ci				lwsl_notice("%s: len breakage 4\n", __func__);
1077d4afb5ceSopenharmony_ci				return -1;
1078d4afb5ceSopenharmony_ci			}
1079d4afb5ceSopenharmony_ci
1080d4afb5ceSopenharmony_ci			par->cpkt_id = lws_ser_ru16be(buf);
1081d4afb5ceSopenharmony_ci			wsi->mqtt->ack_pkt_id = par->cpkt_id;
1082d4afb5ceSopenharmony_ci			buf += 2;
1083d4afb5ceSopenharmony_ci			len -= 2;
1084d4afb5ceSopenharmony_ci			par->cpkt_remlen -= 2;
1085d4afb5ceSopenharmony_ci			par->n = 0;
1086d4afb5ceSopenharmony_ci			par->state = LMQCPP_SUBACK_PAYLOAD;
1087d4afb5ceSopenharmony_ci			*par->temp = 0;
1088d4afb5ceSopenharmony_ci			break;
1089d4afb5ceSopenharmony_ci
1090d4afb5ceSopenharmony_ci		case LMQCPP_SUBACK_PAYLOAD:
1091d4afb5ceSopenharmony_ci		{
1092d4afb5ceSopenharmony_ci			lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
1093d4afb5ceSopenharmony_ci
1094d4afb5ceSopenharmony_ci			len--;
1095d4afb5ceSopenharmony_ci			switch (qos) {
1096d4afb5ceSopenharmony_ci				case QOS0:
1097d4afb5ceSopenharmony_ci				case QOS1:
1098d4afb5ceSopenharmony_ci				case QOS2:
1099d4afb5ceSopenharmony_ci					break;
1100d4afb5ceSopenharmony_ci				case FAILURE_QOS_LEVEL:
1101d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
1102d4afb5ceSopenharmony_ci
1103d4afb5ceSopenharmony_ci				default:
1104d4afb5ceSopenharmony_ci					par->reason = LMQCP_REASON_MALFORMED_PACKET;
1105d4afb5ceSopenharmony_ci					goto send_reason_and_close;
1106d4afb5ceSopenharmony_ci			}
1107d4afb5ceSopenharmony_ci
1108d4afb5ceSopenharmony_ci			if (++(par->n) == par->cpkt_remlen) {
1109d4afb5ceSopenharmony_ci				par->n = 0;
1110d4afb5ceSopenharmony_ci				goto cmd_completion;
1111d4afb5ceSopenharmony_ci			}
1112d4afb5ceSopenharmony_ci
1113d4afb5ceSopenharmony_ci			break;
1114d4afb5ceSopenharmony_ci		}
1115d4afb5ceSopenharmony_ci
1116d4afb5ceSopenharmony_ci		/* UNSUBACK */
1117d4afb5ceSopenharmony_ci		case LMQCPP_UNSUBACK_PACKET:
1118d4afb5ceSopenharmony_ci			if (!lwsi_role_client(wsi)) {
1119d4afb5ceSopenharmony_ci				lwsl_err("%s: UNSUBACK is only Server to Client",
1120d4afb5ceSopenharmony_ci						__func__);
1121d4afb5ceSopenharmony_ci				goto send_unsupp_connack_and_close;
1122d4afb5ceSopenharmony_ci			}
1123d4afb5ceSopenharmony_ci
1124d4afb5ceSopenharmony_ci			lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
1125d4afb5ceSopenharmony_ci			lws_mqtt_vbi_init(&par->vbit);
1126d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1127d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
1128d4afb5ceSopenharmony_ci				break;
1129d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
1130d4afb5ceSopenharmony_ci				par->cpkt_remlen = par->vbit.value;
1131d4afb5ceSopenharmony_ci				lwsl_debug("%s: UNSUBACK pkt len = %d\n",
1132d4afb5ceSopenharmony_ci					   __func__, (int)par->cpkt_remlen);
1133d4afb5ceSopenharmony_ci				if (par->cpkt_remlen < 2)
1134d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
1135d4afb5ceSopenharmony_ci				par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
1136d4afb5ceSopenharmony_ci				break;
1137d4afb5ceSopenharmony_ci			default:
1138d4afb5ceSopenharmony_ci				lwsl_notice("%s: unsuback bad vbi\n", __func__);
1139d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1140d4afb5ceSopenharmony_ci			}
1141d4afb5ceSopenharmony_ci			break;
1142d4afb5ceSopenharmony_ci
1143d4afb5ceSopenharmony_ci		case LMQCPP_UNSUBACK_VH_PKT_ID:
1144d4afb5ceSopenharmony_ci
1145d4afb5ceSopenharmony_ci			if (len < 2) {
1146d4afb5ceSopenharmony_ci				lwsl_notice("%s: len breakage 3\n", __func__);
1147d4afb5ceSopenharmony_ci				return -1;
1148d4afb5ceSopenharmony_ci			}
1149d4afb5ceSopenharmony_ci
1150d4afb5ceSopenharmony_ci			par->cpkt_id = lws_ser_ru16be(buf);
1151d4afb5ceSopenharmony_ci			wsi->mqtt->ack_pkt_id = par->cpkt_id;
1152d4afb5ceSopenharmony_ci			buf += 2;
1153d4afb5ceSopenharmony_ci			len -= 2;
1154d4afb5ceSopenharmony_ci			par->cpkt_remlen -= 2;
1155d4afb5ceSopenharmony_ci			par->n = 0;
1156d4afb5ceSopenharmony_ci
1157d4afb5ceSopenharmony_ci			goto cmd_completion;
1158d4afb5ceSopenharmony_ci
1159d4afb5ceSopenharmony_ci		case LMQCPP_PUBACK_PACKET:
1160d4afb5ceSopenharmony_ci			lws_mqtt_vbi_init(&par->vbit);
1161d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1162d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
1163d4afb5ceSopenharmony_ci				break;
1164d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
1165d4afb5ceSopenharmony_ci				par->cpkt_remlen = par->vbit.value;
1166d4afb5ceSopenharmony_ci				lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
1167d4afb5ceSopenharmony_ci					  (int)par->cpkt_remlen);
1168d4afb5ceSopenharmony_ci				/*
1169d4afb5ceSopenharmony_ci				 * must be 4 or more, with special case that 2
1170d4afb5ceSopenharmony_ci				 * means success with no reason code or props
1171d4afb5ceSopenharmony_ci				 */
1172d4afb5ceSopenharmony_ci				if (par->cpkt_remlen <= 1 ||
1173d4afb5ceSopenharmony_ci				    par->cpkt_remlen == 3)
1174d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
1175d4afb5ceSopenharmony_ci
1176d4afb5ceSopenharmony_ci				par->state = LMQCPP_PUBACK_VH_PKT_ID;
1177d4afb5ceSopenharmony_ci				par->fixed_seen[2] = par->fixed_seen[3] = 0;
1178d4afb5ceSopenharmony_ci				par->fixed = 0;
1179d4afb5ceSopenharmony_ci				par->n = 0;
1180d4afb5ceSopenharmony_ci				break;
1181d4afb5ceSopenharmony_ci			default:
1182d4afb5ceSopenharmony_ci				lwsl_notice("%s: puback bad vbi\n", __func__);
1183d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1184d4afb5ceSopenharmony_ci			}
1185d4afb5ceSopenharmony_ci			break;
1186d4afb5ceSopenharmony_ci
1187d4afb5ceSopenharmony_ci		case LMQCPP_PUBACK_VH_PKT_ID:
1188d4afb5ceSopenharmony_ci			/*
1189d4afb5ceSopenharmony_ci			 * There are 3 fixed bytes and then a VBI for the
1190d4afb5ceSopenharmony_ci			 * property section length
1191d4afb5ceSopenharmony_ci			 */
1192d4afb5ceSopenharmony_ci			par->fixed_seen[par->fixed++] = *buf++;
1193d4afb5ceSopenharmony_ci			if (len < par->cpkt_remlen - par->n) {
1194d4afb5ceSopenharmony_ci				lwsl_notice("%s: len breakage 4\n", __func__);
1195d4afb5ceSopenharmony_ci				return -1;
1196d4afb5ceSopenharmony_ci			}
1197d4afb5ceSopenharmony_ci			len--;
1198d4afb5ceSopenharmony_ci			par->n++;
1199d4afb5ceSopenharmony_ci			if (par->fixed == 2)
1200d4afb5ceSopenharmony_ci				par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
1201d4afb5ceSopenharmony_ci
1202d4afb5ceSopenharmony_ci			if (par->fixed == 3) {
1203d4afb5ceSopenharmony_ci				lws_mqtt_vbi_init(&par->vbit);
1204d4afb5ceSopenharmony_ci				par->props_consumed = 0;
1205d4afb5ceSopenharmony_ci				par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
1206d4afb5ceSopenharmony_ci			}
1207d4afb5ceSopenharmony_ci			/* length of 2 is truncated packet and we completed it */
1208d4afb5ceSopenharmony_ci			if (par->cpkt_remlen == par->fixed)
1209d4afb5ceSopenharmony_ci				goto cmd_completion;
1210d4afb5ceSopenharmony_ci			break;
1211d4afb5ceSopenharmony_ci
1212d4afb5ceSopenharmony_ci		case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
1213d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1214d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
1215d4afb5ceSopenharmony_ci				break;
1216d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
1217d4afb5ceSopenharmony_ci				par->props_len = par->vbit.value;
1218d4afb5ceSopenharmony_ci				lwsl_info("%s: PUBACK props len = %d\n",
1219d4afb5ceSopenharmony_ci					  __func__, (int)par->cpkt_remlen);
1220d4afb5ceSopenharmony_ci				/*
1221d4afb5ceSopenharmony_ci				 * If there are no properties, this is a
1222d4afb5ceSopenharmony_ci				 * command completion event in itself
1223d4afb5ceSopenharmony_ci				 */
1224d4afb5ceSopenharmony_ci				if (!par->props_len)
1225d4afb5ceSopenharmony_ci					goto cmd_completion;
1226d4afb5ceSopenharmony_ci
1227d4afb5ceSopenharmony_ci				/*
1228d4afb5ceSopenharmony_ci				 * Otherwise consume the properties before
1229d4afb5ceSopenharmony_ci				 * completing the command
1230d4afb5ceSopenharmony_ci				 */
1231d4afb5ceSopenharmony_ci				lws_mqtt_vbi_init(&par->vbit);
1232d4afb5ceSopenharmony_ci				par->state = LMQCPP_PUBACK_VH_PKT_ID;
1233d4afb5ceSopenharmony_ci				break;
1234d4afb5ceSopenharmony_ci			default:
1235d4afb5ceSopenharmony_ci				lwsl_notice("%s: puback pr bad vbi\n", __func__);
1236d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1237d4afb5ceSopenharmony_ci			}
1238d4afb5ceSopenharmony_ci			break;
1239d4afb5ceSopenharmony_ci
1240d4afb5ceSopenharmony_ci		case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
1241d4afb5ceSopenharmony_ci			/*
1242d4afb5ceSopenharmony_ci			 * TODO: stash the props
1243d4afb5ceSopenharmony_ci			 */
1244d4afb5ceSopenharmony_ci			par->props_consumed++;
1245d4afb5ceSopenharmony_ci			len--;
1246d4afb5ceSopenharmony_ci			buf++;
1247d4afb5ceSopenharmony_ci			if (par->props_len != par->props_consumed)
1248d4afb5ceSopenharmony_ci				break;
1249d4afb5ceSopenharmony_ci
1250d4afb5ceSopenharmony_cicmd_completion:
1251d4afb5ceSopenharmony_ci			/*
1252d4afb5ceSopenharmony_ci			 * We come here when we understood we just processed
1253d4afb5ceSopenharmony_ci			 * the last byte of a command packet, regardless of the
1254d4afb5ceSopenharmony_ci			 * packet type
1255d4afb5ceSopenharmony_ci			 */
1256d4afb5ceSopenharmony_ci			par->state = LMQCPP_IDLE;
1257d4afb5ceSopenharmony_ci
1258d4afb5ceSopenharmony_ci			switch (par->packet_type_flags >> 4) {
1259d4afb5ceSopenharmony_ci			case LMQCP_STOC_CONNACK:
1260d4afb5ceSopenharmony_ci				lwsl_info("%s: cmd_completion: CONNACK\n",
1261d4afb5ceSopenharmony_ci					  __func__);
1262d4afb5ceSopenharmony_ci
1263d4afb5ceSopenharmony_ci				/*
1264d4afb5ceSopenharmony_ci				 * Getting the CONNACK means we are the first,
1265d4afb5ceSopenharmony_ci				 * the nwsi, and we succeeded to create a new
1266d4afb5ceSopenharmony_ci				 * network connection ourselves.
1267d4afb5ceSopenharmony_ci				 *
1268d4afb5ceSopenharmony_ci				 * Since others may join us sharing the nwsi,
1269d4afb5ceSopenharmony_ci				 * and we may close while they still want to use
1270d4afb5ceSopenharmony_ci				 * it, our wsi lifecycle alone can no longer
1271d4afb5ceSopenharmony_ci				 * define the lifecycle of the nwsi... it means
1272d4afb5ceSopenharmony_ci				 * we need to do a "magic trick" and instead of
1273d4afb5ceSopenharmony_ci				 * being both the nwsi and act like a child
1274d4afb5ceSopenharmony_ci				 * stream, create a new wsi to take over the
1275d4afb5ceSopenharmony_ci				 * nwsi duties and turn our wsi into a child of
1276d4afb5ceSopenharmony_ci				 * the nwsi with its own lifecycle.
1277d4afb5ceSopenharmony_ci				 *
1278d4afb5ceSopenharmony_ci				 * The nwsi gets a mostly empty wsi->nwsi used
1279d4afb5ceSopenharmony_ci				 * to track already-subscribed topics globally
1280d4afb5ceSopenharmony_ci				 * for the connection.
1281d4afb5ceSopenharmony_ci				 */
1282d4afb5ceSopenharmony_ci
1283d4afb5ceSopenharmony_ci				/* we were under SENT_CLIENT_HANDSHAKE timeout */
1284d4afb5ceSopenharmony_ci				lws_set_timeout(wsi, 0, 0);
1285d4afb5ceSopenharmony_ci
1286d4afb5ceSopenharmony_ci				w = lws_create_new_server_wsi(wsi->a.vhost,
1287d4afb5ceSopenharmony_ci							      wsi->tsi, "mqtt_sid1");
1288d4afb5ceSopenharmony_ci				if (!w) {
1289d4afb5ceSopenharmony_ci					lwsl_notice("%s: sid 1 migrate failed\n",
1290d4afb5ceSopenharmony_ci							__func__);
1291d4afb5ceSopenharmony_ci					return -1;
1292d4afb5ceSopenharmony_ci				}
1293d4afb5ceSopenharmony_ci
1294d4afb5ceSopenharmony_ci				wsi->mux.highest_sid = 1;
1295d4afb5ceSopenharmony_ci				lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
1296d4afb5ceSopenharmony_ci
1297d4afb5ceSopenharmony_ci				wsi->mux_substream = 1;
1298d4afb5ceSopenharmony_ci				w->mux_substream = 1;
1299d4afb5ceSopenharmony_ci				w->client_mux_substream = 1;
1300d4afb5ceSopenharmony_ci				wsi->client_mux_migrated = 1;
1301d4afb5ceSopenharmony_ci				wsi->told_user_closed = 1; /* don't tell nwsi closed */
1302d4afb5ceSopenharmony_ci
1303d4afb5ceSopenharmony_ci				lwsi_set_state(w, LRS_ESTABLISHED);
1304d4afb5ceSopenharmony_ci				lwsi_set_state(wsi, LRS_ESTABLISHED);
1305d4afb5ceSopenharmony_ci				lwsi_set_role(w, lwsi_role(wsi));
1306d4afb5ceSopenharmony_ci
1307d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT)
1308d4afb5ceSopenharmony_ci				w->flags = wsi->flags;
1309d4afb5ceSopenharmony_ci#endif
1310d4afb5ceSopenharmony_ci
1311d4afb5ceSopenharmony_ci				w->mqtt = wsi->mqtt;
1312d4afb5ceSopenharmony_ci				wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
1313d4afb5ceSopenharmony_ci				if (!wsi->mqtt)
1314d4afb5ceSopenharmony_ci					return -1;
1315d4afb5ceSopenharmony_ci				w->mqtt->wsi = w;
1316d4afb5ceSopenharmony_ci				w->a.protocol = wsi->a.protocol;
1317d4afb5ceSopenharmony_ci				if (w->user_space &&
1318d4afb5ceSopenharmony_ci				    !w->user_space_externally_allocated)
1319d4afb5ceSopenharmony_ci					lws_free_set_NULL(w->user_space);
1320d4afb5ceSopenharmony_ci				w->user_space = wsi->user_space;
1321d4afb5ceSopenharmony_ci				wsi->user_space = NULL;
1322d4afb5ceSopenharmony_ci				w->user_space_externally_allocated =
1323d4afb5ceSopenharmony_ci					wsi->user_space_externally_allocated;
1324d4afb5ceSopenharmony_ci				if (lws_ensure_user_space(w))
1325d4afb5ceSopenharmony_ci					goto bail1;
1326d4afb5ceSopenharmony_ci				w->a.opaque_user_data = wsi->a.opaque_user_data;
1327d4afb5ceSopenharmony_ci				wsi->a.opaque_user_data = NULL;
1328d4afb5ceSopenharmony_ci				w->stash = wsi->stash;
1329d4afb5ceSopenharmony_ci				wsi->stash = NULL;
1330d4afb5ceSopenharmony_ci
1331d4afb5ceSopenharmony_ci				lws_mux_mark_immortal(w);
1332d4afb5ceSopenharmony_ci
1333d4afb5ceSopenharmony_ci				lwsl_notice("%s: migrated nwsi %s to sid 1 %s\n",
1334d4afb5ceSopenharmony_ci						__func__, lws_wsi_tag(wsi),
1335d4afb5ceSopenharmony_ci						lws_wsi_tag(w));
1336d4afb5ceSopenharmony_ci
1337d4afb5ceSopenharmony_ci				/*
1338d4afb5ceSopenharmony_ci				 * It was the last thing we were waiting for
1339d4afb5ceSopenharmony_ci				 * before we can be fully ESTABLISHED
1340d4afb5ceSopenharmony_ci				 */
1341d4afb5ceSopenharmony_ci				if (lws_mqtt_set_client_established(w)) {
1342d4afb5ceSopenharmony_ci					lwsl_notice("%s: set EST fail\n", __func__);
1343d4afb5ceSopenharmony_ci					return -1;
1344d4afb5ceSopenharmony_ci				}
1345d4afb5ceSopenharmony_ci
1346d4afb5ceSopenharmony_ci				/* get the ball rolling */
1347d4afb5ceSopenharmony_ci				lws_validity_confirmed(wsi);
1348d4afb5ceSopenharmony_ci
1349d4afb5ceSopenharmony_ci				/* well, add the queued guys as children */
1350d4afb5ceSopenharmony_ci				lws_wsi_mux_apply_queue(wsi);
1351d4afb5ceSopenharmony_ci				break;
1352d4afb5ceSopenharmony_ci
1353d4afb5ceSopenharmony_cibail1:
1354d4afb5ceSopenharmony_ci				/* undo the insert */
1355d4afb5ceSopenharmony_ci				wsi->mux.child_list = w->mux.sibling_list;
1356d4afb5ceSopenharmony_ci				wsi->mux.child_count--;
1357d4afb5ceSopenharmony_ci
1358d4afb5ceSopenharmony_ci				if (w->user_space)
1359d4afb5ceSopenharmony_ci					lws_free_set_NULL(w->user_space);
1360d4afb5ceSopenharmony_ci				w->a.vhost->protocols[0].callback(w,
1361d4afb5ceSopenharmony_ci							LWS_CALLBACK_WSI_DESTROY,
1362d4afb5ceSopenharmony_ci							NULL, NULL, 0);
1363d4afb5ceSopenharmony_ci				__lws_vhost_unbind_wsi(w); /* cx + vh lock */
1364d4afb5ceSopenharmony_ci				lws_free(w);
1365d4afb5ceSopenharmony_ci
1366d4afb5ceSopenharmony_ci				return 0;
1367d4afb5ceSopenharmony_ci
1368d4afb5ceSopenharmony_ci			case LMQCP_PUBREC:
1369d4afb5ceSopenharmony_ci				lwsl_err("%s: cmd_completion: PUBREC\n",
1370d4afb5ceSopenharmony_ci						__func__);
1371d4afb5ceSopenharmony_ci				/*
1372d4afb5ceSopenharmony_ci				 * Figure out which child asked for this
1373d4afb5ceSopenharmony_ci				 */
1374d4afb5ceSopenharmony_ci				n = 0;
1375d4afb5ceSopenharmony_ci				lws_start_foreach_ll(struct lws *, w,
1376d4afb5ceSopenharmony_ci						     wsi->mux.child_list) {
1377d4afb5ceSopenharmony_ci					if (w->mqtt->unacked_publish &&
1378d4afb5ceSopenharmony_ci					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1379d4afb5ceSopenharmony_ci						char requested_close = 0;
1380d4afb5ceSopenharmony_ci
1381d4afb5ceSopenharmony_ci						w->mqtt->unacked_publish = 0;
1382d4afb5ceSopenharmony_ci						w->mqtt->unacked_pubrel = 1;
1383d4afb5ceSopenharmony_ci
1384d4afb5ceSopenharmony_ci						if (user_callback_handle_rxflow(
1385d4afb5ceSopenharmony_ci							    w->a.protocol->callback,
1386d4afb5ceSopenharmony_ci							    w, LWS_CALLBACK_MQTT_ACK,
1387d4afb5ceSopenharmony_ci							    w->user_space, NULL, 0) < 0) {
1388d4afb5ceSopenharmony_ci							lwsl_info("%s: MQTT_ACK requests close\n",
1389d4afb5ceSopenharmony_ci								 __func__);
1390d4afb5ceSopenharmony_ci							requested_close = 1;
1391d4afb5ceSopenharmony_ci						}
1392d4afb5ceSopenharmony_ci						n = 1;
1393d4afb5ceSopenharmony_ci
1394d4afb5ceSopenharmony_ci						/*
1395d4afb5ceSopenharmony_ci						 * We got an assertive PUBREC,
1396d4afb5ceSopenharmony_ci						 * no need for timeout wait
1397d4afb5ceSopenharmony_ci						 * any more
1398d4afb5ceSopenharmony_ci						 */
1399d4afb5ceSopenharmony_ci						lws_sul_cancel(&w->mqtt->
1400d4afb5ceSopenharmony_ci							  sul_qos_puback_pubrec_wait);
1401d4afb5ceSopenharmony_ci
1402d4afb5ceSopenharmony_ci						if (requested_close) {
1403d4afb5ceSopenharmony_ci							__lws_close_free_wsi(w,
1404d4afb5ceSopenharmony_ci								0, "ack cb");
1405d4afb5ceSopenharmony_ci							break;
1406d4afb5ceSopenharmony_ci						}
1407d4afb5ceSopenharmony_ci
1408d4afb5ceSopenharmony_ci						break;
1409d4afb5ceSopenharmony_ci					}
1410d4afb5ceSopenharmony_ci				} lws_end_foreach_ll(w, mux.sibling_list);
1411d4afb5ceSopenharmony_ci
1412d4afb5ceSopenharmony_ci				if (!n) {
1413d4afb5ceSopenharmony_ci					lwsl_err("%s: unsolicited PUBREC\n",
1414d4afb5ceSopenharmony_ci							__func__);
1415d4afb5ceSopenharmony_ci					return -1;
1416d4afb5ceSopenharmony_ci				}
1417d4afb5ceSopenharmony_ci				wsi->mqtt->send_pubrel = 1;
1418d4afb5ceSopenharmony_ci				lws_callback_on_writable(wsi);
1419d4afb5ceSopenharmony_ci				break;
1420d4afb5ceSopenharmony_ci
1421d4afb5ceSopenharmony_ci			case LMQCP_PUBCOMP:
1422d4afb5ceSopenharmony_ci				lwsl_err("%s: cmd_completion: PUBCOMP\n",
1423d4afb5ceSopenharmony_ci						__func__);
1424d4afb5ceSopenharmony_ci				n = 0;
1425d4afb5ceSopenharmony_ci				lws_start_foreach_ll(struct lws *, w,
1426d4afb5ceSopenharmony_ci						     wsi->mux.child_list) {
1427d4afb5ceSopenharmony_ci					if (w->mqtt->unacked_pubrel > 0 &&
1428d4afb5ceSopenharmony_ci					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1429d4afb5ceSopenharmony_ci						w->mqtt->unacked_pubrel = 0;
1430d4afb5ceSopenharmony_ci						n = 1;
1431d4afb5ceSopenharmony_ci					}
1432d4afb5ceSopenharmony_ci				} lws_end_foreach_ll(w, mux.sibling_list);
1433d4afb5ceSopenharmony_ci
1434d4afb5ceSopenharmony_ci				if (!n) {
1435d4afb5ceSopenharmony_ci					lwsl_err("%s: unsolicited PUBCOMP\n",
1436d4afb5ceSopenharmony_ci							__func__);
1437d4afb5ceSopenharmony_ci					return -1;
1438d4afb5ceSopenharmony_ci				}
1439d4afb5ceSopenharmony_ci
1440d4afb5ceSopenharmony_ci				/*
1441d4afb5ceSopenharmony_ci				 * If we published something and PUBCOMP arrived,
1442d4afb5ceSopenharmony_ci				 * our connection is definitely working in both
1443d4afb5ceSopenharmony_ci				 * directions at the moment.
1444d4afb5ceSopenharmony_ci				 */
1445d4afb5ceSopenharmony_ci				lws_validity_confirmed(wsi);
1446d4afb5ceSopenharmony_ci				break;
1447d4afb5ceSopenharmony_ci
1448d4afb5ceSopenharmony_ci			case LMQCP_PUBREL:
1449d4afb5ceSopenharmony_ci				lwsl_err("%s: cmd_completion: PUBREL\n",
1450d4afb5ceSopenharmony_ci						__func__);
1451d4afb5ceSopenharmony_ci				wsi->mqtt->send_pubcomp = 1;
1452d4afb5ceSopenharmony_ci				lws_callback_on_writable(wsi);
1453d4afb5ceSopenharmony_ci				break;
1454d4afb5ceSopenharmony_ci
1455d4afb5ceSopenharmony_ci			case LMQCP_PUBACK:
1456d4afb5ceSopenharmony_ci				lwsl_info("%s: cmd_completion: PUBACK\n",
1457d4afb5ceSopenharmony_ci						__func__);
1458d4afb5ceSopenharmony_ci
1459d4afb5ceSopenharmony_ci				/*
1460d4afb5ceSopenharmony_ci				 * Figure out which child asked for this
1461d4afb5ceSopenharmony_ci				 */
1462d4afb5ceSopenharmony_ci
1463d4afb5ceSopenharmony_ci				n = 0;
1464d4afb5ceSopenharmony_ci				lws_start_foreach_ll(struct lws *, w,
1465d4afb5ceSopenharmony_ci						      wsi->mux.child_list) {
1466d4afb5ceSopenharmony_ci					if (w->mqtt->unacked_publish &&
1467d4afb5ceSopenharmony_ci					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1468d4afb5ceSopenharmony_ci						char requested_close = 0;
1469d4afb5ceSopenharmony_ci
1470d4afb5ceSopenharmony_ci						w->mqtt->unacked_publish = 0;
1471d4afb5ceSopenharmony_ci						if (user_callback_handle_rxflow(
1472d4afb5ceSopenharmony_ci							    w->a.protocol->callback,
1473d4afb5ceSopenharmony_ci							    w, LWS_CALLBACK_MQTT_ACK,
1474d4afb5ceSopenharmony_ci							    w->user_space, NULL, 0) < 0) {
1475d4afb5ceSopenharmony_ci							lwsl_info("%s: MQTT_ACK requests close\n",
1476d4afb5ceSopenharmony_ci								 __func__);
1477d4afb5ceSopenharmony_ci							requested_close = 1;
1478d4afb5ceSopenharmony_ci						}
1479d4afb5ceSopenharmony_ci						n = 1;
1480d4afb5ceSopenharmony_ci
1481d4afb5ceSopenharmony_ci						/*
1482d4afb5ceSopenharmony_ci						 * We got an assertive PUBACK,
1483d4afb5ceSopenharmony_ci						 * no need for ACK timeout wait
1484d4afb5ceSopenharmony_ci						 * any more
1485d4afb5ceSopenharmony_ci						 */
1486d4afb5ceSopenharmony_ci						lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait);
1487d4afb5ceSopenharmony_ci
1488d4afb5ceSopenharmony_ci						if (requested_close) {
1489d4afb5ceSopenharmony_ci							__lws_close_free_wsi(w,
1490d4afb5ceSopenharmony_ci								0, "ack cb");
1491d4afb5ceSopenharmony_ci							break;
1492d4afb5ceSopenharmony_ci						}
1493d4afb5ceSopenharmony_ci
1494d4afb5ceSopenharmony_ci						break;
1495d4afb5ceSopenharmony_ci					}
1496d4afb5ceSopenharmony_ci				} lws_end_foreach_ll(w, mux.sibling_list);
1497d4afb5ceSopenharmony_ci
1498d4afb5ceSopenharmony_ci				if (!n) {
1499d4afb5ceSopenharmony_ci					lwsl_err("%s: unsolicited PUBACK\n",
1500d4afb5ceSopenharmony_ci							__func__);
1501d4afb5ceSopenharmony_ci					return -1;
1502d4afb5ceSopenharmony_ci				}
1503d4afb5ceSopenharmony_ci
1504d4afb5ceSopenharmony_ci				/*
1505d4afb5ceSopenharmony_ci				 * If we published something and it was acked,
1506d4afb5ceSopenharmony_ci				 * our connection is definitely working in both
1507d4afb5ceSopenharmony_ci				 * directions at the moment.
1508d4afb5ceSopenharmony_ci				 */
1509d4afb5ceSopenharmony_ci				lws_validity_confirmed(wsi);
1510d4afb5ceSopenharmony_ci				break;
1511d4afb5ceSopenharmony_ci
1512d4afb5ceSopenharmony_ci			case LMQCP_STOC_PINGRESP:
1513d4afb5ceSopenharmony_ci				lwsl_info("%s: cmd_completion: PINGRESP\n",
1514d4afb5ceSopenharmony_ci						__func__);
1515d4afb5ceSopenharmony_ci				/*
1516d4afb5ceSopenharmony_ci				 * If we asked for a PINGRESP and it came,
1517d4afb5ceSopenharmony_ci				 * our connection is definitely working in both
1518d4afb5ceSopenharmony_ci				 * directions at the moment.
1519d4afb5ceSopenharmony_ci				 */
1520d4afb5ceSopenharmony_ci				lws_validity_confirmed(wsi);
1521d4afb5ceSopenharmony_ci				break;
1522d4afb5ceSopenharmony_ci
1523d4afb5ceSopenharmony_ci			case LMQCP_STOC_SUBACK:
1524d4afb5ceSopenharmony_ci				lwsl_info("%s: cmd_completion: SUBACK\n",
1525d4afb5ceSopenharmony_ci						__func__);
1526d4afb5ceSopenharmony_ci
1527d4afb5ceSopenharmony_ci				/*
1528d4afb5ceSopenharmony_ci				 * Figure out which child asked for this
1529d4afb5ceSopenharmony_ci				 */
1530d4afb5ceSopenharmony_ci
1531d4afb5ceSopenharmony_ci				n = 0;
1532d4afb5ceSopenharmony_ci				lws_start_foreach_ll(struct lws *, w,
1533d4afb5ceSopenharmony_ci						      wsi->mux.child_list) {
1534d4afb5ceSopenharmony_ci					if (w->mqtt->inside_subscribe &&
1535d4afb5ceSopenharmony_ci					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1536d4afb5ceSopenharmony_ci						w->mqtt->inside_subscribe = 0;
1537d4afb5ceSopenharmony_ci						if (user_callback_handle_rxflow(
1538d4afb5ceSopenharmony_ci							    w->a.protocol->callback,
1539d4afb5ceSopenharmony_ci							    w, LWS_CALLBACK_MQTT_SUBSCRIBED,
1540d4afb5ceSopenharmony_ci							    w->user_space, NULL, 0) < 0) {
1541d4afb5ceSopenharmony_ci							lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
1542d4afb5ceSopenharmony_ci								 __func__);
1543d4afb5ceSopenharmony_ci							return -1;
1544d4afb5ceSopenharmony_ci						}
1545d4afb5ceSopenharmony_ci						n = 1;
1546d4afb5ceSopenharmony_ci						break;
1547d4afb5ceSopenharmony_ci					}
1548d4afb5ceSopenharmony_ci				} lws_end_foreach_ll(w, mux.sibling_list);
1549d4afb5ceSopenharmony_ci
1550d4afb5ceSopenharmony_ci				if (!n) {
1551d4afb5ceSopenharmony_ci					lwsl_err("%s: unsolicited SUBACK\n",
1552d4afb5ceSopenharmony_ci							__func__);
1553d4afb5ceSopenharmony_ci					return -1;
1554d4afb5ceSopenharmony_ci				}
1555d4afb5ceSopenharmony_ci
1556d4afb5ceSopenharmony_ci				/*
1557d4afb5ceSopenharmony_ci				 * If we subscribed to something and SUBACK came,
1558d4afb5ceSopenharmony_ci				 * our connection is definitely working in both
1559d4afb5ceSopenharmony_ci				 * directions at the moment.
1560d4afb5ceSopenharmony_ci				 */
1561d4afb5ceSopenharmony_ci				lws_validity_confirmed(wsi);
1562d4afb5ceSopenharmony_ci
1563d4afb5ceSopenharmony_ci				break;
1564d4afb5ceSopenharmony_ci
1565d4afb5ceSopenharmony_ci			case LMQCP_STOC_UNSUBACK:
1566d4afb5ceSopenharmony_ci			{
1567d4afb5ceSopenharmony_ci				char requested_close = 0;
1568d4afb5ceSopenharmony_ci				lwsl_info("%s: cmd_completion: UNSUBACK\n",
1569d4afb5ceSopenharmony_ci						__func__);
1570d4afb5ceSopenharmony_ci				/*
1571d4afb5ceSopenharmony_ci				 * Figure out which child asked for this
1572d4afb5ceSopenharmony_ci				 */
1573d4afb5ceSopenharmony_ci				n = 0;
1574d4afb5ceSopenharmony_ci				lws_start_foreach_ll(struct lws *, w,
1575d4afb5ceSopenharmony_ci						      wsi->mux.child_list) {
1576d4afb5ceSopenharmony_ci					if (w->mqtt->inside_unsubscribe &&
1577d4afb5ceSopenharmony_ci					    w->mqtt->ack_pkt_id == par->cpkt_id) {
1578d4afb5ceSopenharmony_ci						struct lws *nwsi = lws_get_network_wsi(w);
1579d4afb5ceSopenharmony_ci
1580d4afb5ceSopenharmony_ci						/*
1581d4afb5ceSopenharmony_ci						 * No more subscribers left,
1582d4afb5ceSopenharmony_ci						 * remove the topic from nwsi
1583d4afb5ceSopenharmony_ci						 */
1584d4afb5ceSopenharmony_ci						lws_mqtt_client_remove_subs(nwsi->mqtt);
1585d4afb5ceSopenharmony_ci
1586d4afb5ceSopenharmony_ci						w->mqtt->inside_unsubscribe = 0;
1587d4afb5ceSopenharmony_ci						if (user_callback_handle_rxflow(
1588d4afb5ceSopenharmony_ci							    w->a.protocol->callback,
1589d4afb5ceSopenharmony_ci							    w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
1590d4afb5ceSopenharmony_ci							    w->user_space, NULL, 0) < 0) {
1591d4afb5ceSopenharmony_ci							lwsl_info("%s: MQTT_UNSUBACK requests close\n",
1592d4afb5ceSopenharmony_ci								 __func__);
1593d4afb5ceSopenharmony_ci							requested_close = 1;
1594d4afb5ceSopenharmony_ci						}
1595d4afb5ceSopenharmony_ci						n = 1;
1596d4afb5ceSopenharmony_ci
1597d4afb5ceSopenharmony_ci						lws_sul_cancel(&w->mqtt->sul_unsuback_wait);
1598d4afb5ceSopenharmony_ci						if (requested_close) {
1599d4afb5ceSopenharmony_ci							__lws_close_free_wsi(w,
1600d4afb5ceSopenharmony_ci									     0, "unsub ack cb");
1601d4afb5ceSopenharmony_ci							break;
1602d4afb5ceSopenharmony_ci						}
1603d4afb5ceSopenharmony_ci						break;
1604d4afb5ceSopenharmony_ci					}
1605d4afb5ceSopenharmony_ci				} lws_end_foreach_ll(w, mux.sibling_list);
1606d4afb5ceSopenharmony_ci
1607d4afb5ceSopenharmony_ci				if (!n) {
1608d4afb5ceSopenharmony_ci					lwsl_err("%s: unsolicited UNSUBACK\n",
1609d4afb5ceSopenharmony_ci							__func__);
1610d4afb5ceSopenharmony_ci					return -1;
1611d4afb5ceSopenharmony_ci				}
1612d4afb5ceSopenharmony_ci
1613d4afb5ceSopenharmony_ci
1614d4afb5ceSopenharmony_ci				/*
1615d4afb5ceSopenharmony_ci				 * If we unsubscribed to something and
1616d4afb5ceSopenharmony_ci				 * UNSUBACK came, our connection is
1617d4afb5ceSopenharmony_ci				 * definitely working in both
1618d4afb5ceSopenharmony_ci				 * directions at the moment.
1619d4afb5ceSopenharmony_ci				 */
1620d4afb5ceSopenharmony_ci				lws_validity_confirmed(wsi);
1621d4afb5ceSopenharmony_ci
1622d4afb5ceSopenharmony_ci				break;
1623d4afb5ceSopenharmony_ci			}
1624d4afb5ceSopenharmony_ci			case LMQCP_PUBLISH:
1625d4afb5ceSopenharmony_ci			{
1626d4afb5ceSopenharmony_ci				lws_mqtt_publish_param_t *pub =
1627d4afb5ceSopenharmony_ci						(lws_mqtt_publish_param_t *)
1628d4afb5ceSopenharmony_ci							wsi->mqtt->rx_cpkt_param;
1629d4afb5ceSopenharmony_ci				size_t chunk;
1630d4afb5ceSopenharmony_ci
1631d4afb5ceSopenharmony_ci				if (pub == NULL) {
1632d4afb5ceSopenharmony_ci					lwsl_notice("%s: no pub\n", __func__);
1633d4afb5ceSopenharmony_ci					return -1;
1634d4afb5ceSopenharmony_ci				}
1635d4afb5ceSopenharmony_ci
1636d4afb5ceSopenharmony_ci				/*
1637d4afb5ceSopenharmony_ci				 * RX PUBLISH is delivered to any children that
1638d4afb5ceSopenharmony_ci				 * registered for the related topic
1639d4afb5ceSopenharmony_ci				 */
1640d4afb5ceSopenharmony_ci
1641d4afb5ceSopenharmony_ci				n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
1642d4afb5ceSopenharmony_ci
1643d4afb5ceSopenharmony_ci				chunk = pub->payload_len - pub->payload_pos;
1644d4afb5ceSopenharmony_ci				if (chunk > len)
1645d4afb5ceSopenharmony_ci					chunk = len;
1646d4afb5ceSopenharmony_ci
1647d4afb5ceSopenharmony_ci				lws_start_foreach_ll(struct lws *, w,
1648d4afb5ceSopenharmony_ci						      wsi->mux.child_list) {
1649d4afb5ceSopenharmony_ci					if (lws_mqtt_find_sub(w->mqtt,
1650d4afb5ceSopenharmony_ci							      pub->topic))
1651d4afb5ceSopenharmony_ci						if (w->a.protocol->callback(
1652d4afb5ceSopenharmony_ci							    w, (enum lws_callback_reasons)n,
1653d4afb5ceSopenharmony_ci							    w->user_space,
1654d4afb5ceSopenharmony_ci							    (void *)pub,
1655d4afb5ceSopenharmony_ci							    chunk)) {
1656d4afb5ceSopenharmony_ci								par->payload_consumed = 0;
1657d4afb5ceSopenharmony_ci								lws_free_set_NULL(pub->topic);
1658d4afb5ceSopenharmony_ci								lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1659d4afb5ceSopenharmony_ci								return 1;
1660d4afb5ceSopenharmony_ci							}
1661d4afb5ceSopenharmony_ci				} lws_end_foreach_ll(w, mux.sibling_list);
1662d4afb5ceSopenharmony_ci
1663d4afb5ceSopenharmony_ci
1664d4afb5ceSopenharmony_ci				pub->payload_pos += (uint32_t)chunk;
1665d4afb5ceSopenharmony_ci				len -= chunk;
1666d4afb5ceSopenharmony_ci				buf += chunk;
1667d4afb5ceSopenharmony_ci
1668d4afb5ceSopenharmony_ci				lwsl_debug("%s: post pos %d, plen %d, len %d\n",
1669d4afb5ceSopenharmony_ci					    __func__, (int)pub->payload_pos,
1670d4afb5ceSopenharmony_ci					    (int)pub->payload_len, (int)len);
1671d4afb5ceSopenharmony_ci
1672d4afb5ceSopenharmony_ci				if (pub->payload_pos != pub->payload_len) {
1673d4afb5ceSopenharmony_ci					/*
1674d4afb5ceSopenharmony_ci					 * More chunks of the payload pending,
1675d4afb5ceSopenharmony_ci					 * blocking this connection from doing
1676d4afb5ceSopenharmony_ci					 * anything else
1677d4afb5ceSopenharmony_ci					 */
1678d4afb5ceSopenharmony_ci					par->state = LMQCPP_PAYLOAD;
1679d4afb5ceSopenharmony_ci					break;
1680d4afb5ceSopenharmony_ci				}
1681d4afb5ceSopenharmony_ci
1682d4afb5ceSopenharmony_ci				if (pub->qos == 1) {
1683d4afb5ceSopenharmony_ci				/* For QOS = 1, send out PUBACK */
1684d4afb5ceSopenharmony_ci					wsi->mqtt->send_puback = 1;
1685d4afb5ceSopenharmony_ci					lws_callback_on_writable(wsi);
1686d4afb5ceSopenharmony_ci				} else if (pub->qos == 2) {
1687d4afb5ceSopenharmony_ci				/* For QOS = 2, send out PUBREC */
1688d4afb5ceSopenharmony_ci					wsi->mqtt->send_pubrec = 1;
1689d4afb5ceSopenharmony_ci					lws_callback_on_writable(wsi);
1690d4afb5ceSopenharmony_ci				}
1691d4afb5ceSopenharmony_ci
1692d4afb5ceSopenharmony_ci				par->payload_consumed = 0;
1693d4afb5ceSopenharmony_ci				lws_free_set_NULL(pub->topic);
1694d4afb5ceSopenharmony_ci				lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
1695d4afb5ceSopenharmony_ci
1696d4afb5ceSopenharmony_ci				break;
1697d4afb5ceSopenharmony_ci			}
1698d4afb5ceSopenharmony_ci			default:
1699d4afb5ceSopenharmony_ci				break;
1700d4afb5ceSopenharmony_ci			}
1701d4afb5ceSopenharmony_ci
1702d4afb5ceSopenharmony_ci			break;
1703d4afb5ceSopenharmony_ci
1704d4afb5ceSopenharmony_ci
1705d4afb5ceSopenharmony_ci		case LMQCPP_PROP_ID_VBI:
1706d4afb5ceSopenharmony_ci			switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
1707d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
1708d4afb5ceSopenharmony_ci				break;
1709d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
1710d4afb5ceSopenharmony_ci				par->consumed = (uint32_t)((unsigned int)par->consumed + (unsigned int)(unsigned char)par->vbit.consumed);
1711d4afb5ceSopenharmony_ci				if (par->vbit.value >
1712d4afb5ceSopenharmony_ci				    LWS_ARRAY_SIZE(property_valid)) {
1713d4afb5ceSopenharmony_ci					lwsl_notice("%s: undef prop id 0x%x\n",
1714d4afb5ceSopenharmony_ci						  __func__, (int)par->vbit.value);
1715d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
1716d4afb5ceSopenharmony_ci				}
1717d4afb5ceSopenharmony_ci				if (!(property_valid[par->vbit.value] &
1718d4afb5ceSopenharmony_ci					(1 << ctl_pkt_type(par)))) {
1719d4afb5ceSopenharmony_ci					lwsl_notice("%s: prop id 0x%x invalid for"
1720d4afb5ceSopenharmony_ci						  " control pkt %d\n", __func__,
1721d4afb5ceSopenharmony_ci						  (int)par->vbit.value,
1722d4afb5ceSopenharmony_ci						  ctl_pkt_type(par));
1723d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
1724d4afb5ceSopenharmony_ci				}
1725d4afb5ceSopenharmony_ci				par->prop_id = par->vbit.value;
1726d4afb5ceSopenharmony_ci				par->flag_prop_multi = !!(
1727d4afb5ceSopenharmony_ci					par->props_seen[par->prop_id >> 3] &
1728d4afb5ceSopenharmony_ci					(1 << (par->prop_id & 7)));
1729d4afb5ceSopenharmony_ci				par->props_seen[par->prop_id >> 3] =
1730d4afb5ceSopenharmony_ci						(uint8_t)((par->props_seen[par->prop_id >> 3]) | (1 << (par->prop_id & 7)));
1731d4afb5ceSopenharmony_ci				/*
1732d4afb5ceSopenharmony_ci				 *  even if it's not a vbi property arg,
1733d4afb5ceSopenharmony_ci				 * .consumed of this will be zero the first time
1734d4afb5ceSopenharmony_ci				 */
1735d4afb5ceSopenharmony_ci				lws_mqtt_vbi_init(&par->vbit);
1736d4afb5ceSopenharmony_ci				/*
1737d4afb5ceSopenharmony_ci				 * if it's a string, next state must set the
1738d4afb5ceSopenharmony_ci				 * destination and size limit itself.  But
1739d4afb5ceSopenharmony_ci				 * resetting it generically here lets it use
1740d4afb5ceSopenharmony_ci				 * lws_mqtt_str_first() to understand it's the
1741d4afb5ceSopenharmony_ci				 * first time around.
1742d4afb5ceSopenharmony_ci				 */
1743d4afb5ceSopenharmony_ci				 lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
1744d4afb5ceSopenharmony_ci
1745d4afb5ceSopenharmony_ci				/* property arg state enums are so encoded */
1746d4afb5ceSopenharmony_ci				par->state = 0x100 | par->vbit.value;
1747d4afb5ceSopenharmony_ci				break;
1748d4afb5ceSopenharmony_ci			default:
1749d4afb5ceSopenharmony_ci				lwsl_notice("%s: prop id bad vbi\n", __func__);
1750d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1751d4afb5ceSopenharmony_ci			}
1752d4afb5ceSopenharmony_ci			break;
1753d4afb5ceSopenharmony_ci
1754d4afb5ceSopenharmony_ci		/*
1755d4afb5ceSopenharmony_ci		 * All possible property payloads... restricting which ones
1756d4afb5ceSopenharmony_ci		 * can appear in which control packets is already done above
1757d4afb5ceSopenharmony_ci		 * in LMQCPP_PROP_ID_VBI
1758d4afb5ceSopenharmony_ci		 */
1759d4afb5ceSopenharmony_ci
1760d4afb5ceSopenharmony_ci		case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
1761d4afb5ceSopenharmony_ci		case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
1762d4afb5ceSopenharmony_ci		case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
1763d4afb5ceSopenharmony_ci		case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
1764d4afb5ceSopenharmony_ci		case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
1765d4afb5ceSopenharmony_ci		case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
1766d4afb5ceSopenharmony_ci		case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
1767d4afb5ceSopenharmony_ci		case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
1768d4afb5ceSopenharmony_ci			if (par->flag_prop_multi)
1769d4afb5ceSopenharmony_ci				goto singular_prop_seen_twice;
1770d4afb5ceSopenharmony_ci			par->payload_format = *buf++;
1771d4afb5ceSopenharmony_ci			len--;
1772d4afb5ceSopenharmony_ci			if (lws_mqtt_pconsume(par, 1))
1773d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1774d4afb5ceSopenharmony_ci			break;
1775d4afb5ceSopenharmony_ci
1776d4afb5ceSopenharmony_ci		case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
1777d4afb5ceSopenharmony_ci		case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
1778d4afb5ceSopenharmony_ci		case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
1779d4afb5ceSopenharmony_ci		case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
1780d4afb5ceSopenharmony_ci			if (par->flag_prop_multi)
1781d4afb5ceSopenharmony_ci				goto singular_prop_seen_twice;
1782d4afb5ceSopenharmony_ci
1783d4afb5ceSopenharmony_ci			if (lws_mqtt_mb_first(&par->vbit))
1784d4afb5ceSopenharmony_ci				lws_mqtt_4byte_init(&par->vbit);
1785d4afb5ceSopenharmony_ci
1786d4afb5ceSopenharmony_ci			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1787d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
1788d4afb5ceSopenharmony_ci				break;
1789d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
1790d4afb5ceSopenharmony_ci				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1791d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
1792d4afb5ceSopenharmony_ci				break;
1793d4afb5ceSopenharmony_ci			default:
1794d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1795d4afb5ceSopenharmony_ci			}
1796d4afb5ceSopenharmony_ci			break;
1797d4afb5ceSopenharmony_ci
1798d4afb5ceSopenharmony_ci		case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
1799d4afb5ceSopenharmony_ci		case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
1800d4afb5ceSopenharmony_ci		case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
1801d4afb5ceSopenharmony_ci		case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
1802d4afb5ceSopenharmony_ci			if (par->flag_prop_multi)
1803d4afb5ceSopenharmony_ci				goto singular_prop_seen_twice;
1804d4afb5ceSopenharmony_ci
1805d4afb5ceSopenharmony_ci			if (lws_mqtt_mb_first(&par->vbit))
1806d4afb5ceSopenharmony_ci				lws_mqtt_2byte_init(&par->vbit);
1807d4afb5ceSopenharmony_ci
1808d4afb5ceSopenharmony_ci			switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
1809d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
1810d4afb5ceSopenharmony_ci				break;
1811d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
1812d4afb5ceSopenharmony_ci				if (lws_mqtt_pconsume(par, par->vbit.consumed))
1813d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
1814d4afb5ceSopenharmony_ci				break;
1815d4afb5ceSopenharmony_ci			default:
1816d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1817d4afb5ceSopenharmony_ci			}
1818d4afb5ceSopenharmony_ci			break;
1819d4afb5ceSopenharmony_ci
1820d4afb5ceSopenharmony_ci		case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
1821d4afb5ceSopenharmony_ci		case LMQCPP_PROP_AUTH_METHOD_UTF8S:
1822d4afb5ceSopenharmony_ci		case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
1823d4afb5ceSopenharmony_ci		case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
1824d4afb5ceSopenharmony_ci		case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
1825d4afb5ceSopenharmony_ci		case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
1826d4afb5ceSopenharmony_ci		case LMQCPP_PROP_REASON_STRING_UTF8S:
1827d4afb5ceSopenharmony_ci		case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
1828d4afb5ceSopenharmony_ci		case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
1829d4afb5ceSopenharmony_ci			if (par->flag_prop_multi)
1830d4afb5ceSopenharmony_ci				goto singular_prop_seen_twice;
1831d4afb5ceSopenharmony_ci
1832d4afb5ceSopenharmony_ci			if (lws_mqtt_str_first(&par->s_temp))
1833d4afb5ceSopenharmony_ci				lws_mqtt_str_init(&par->s_temp, par->temp,
1834d4afb5ceSopenharmony_ci						  sizeof(par->temp), 0);
1835d4afb5ceSopenharmony_ci
1836d4afb5ceSopenharmony_ci			switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
1837d4afb5ceSopenharmony_ci			case LMSPR_NEED_MORE:
1838d4afb5ceSopenharmony_ci				break;
1839d4afb5ceSopenharmony_ci			case LMSPR_COMPLETED:
1840d4afb5ceSopenharmony_ci				if (lws_mqtt_pconsume(par, par->s_temp.len))
1841d4afb5ceSopenharmony_ci					goto send_protocol_error_and_close;
1842d4afb5ceSopenharmony_ci				break;
1843d4afb5ceSopenharmony_ci
1844d4afb5ceSopenharmony_ci			default:
1845d4afb5ceSopenharmony_ci				lwsl_info("%s: bad protocol name\n", __func__);
1846d4afb5ceSopenharmony_ci				goto send_protocol_error_and_close;
1847d4afb5ceSopenharmony_ci			}
1848d4afb5ceSopenharmony_ci			break;
1849d4afb5ceSopenharmony_ci
1850d4afb5ceSopenharmony_ci		case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
1851d4afb5ceSopenharmony_ci
1852d4afb5ceSopenharmony_ci		case LMQCPP_PROP_CORRELATION_BINDATA:
1853d4afb5ceSopenharmony_ci		case LMQCPP_PROP_AUTH_DATA_BINDATA:
1854d4afb5ceSopenharmony_ci
1855d4afb5ceSopenharmony_ci		/* TODO */
1856d4afb5ceSopenharmony_ci			lwsl_err("%s: Unimplemented packet state 0x%x\n",
1857d4afb5ceSopenharmony_ci					__func__, par->state);
1858d4afb5ceSopenharmony_ci			return -1;
1859d4afb5ceSopenharmony_ci		}
1860d4afb5ceSopenharmony_ci	}
1861d4afb5ceSopenharmony_ci
1862d4afb5ceSopenharmony_ci	return 0;
1863d4afb5ceSopenharmony_ci
1864d4afb5ceSopenharmony_cioom:
1865d4afb5ceSopenharmony_ci	lwsl_err("%s: OOM!\n", __func__);
1866d4afb5ceSopenharmony_ci	goto send_protocol_error_and_close;
1867d4afb5ceSopenharmony_ci
1868d4afb5ceSopenharmony_cisingular_prop_seen_twice:
1869d4afb5ceSopenharmony_ci	lwsl_info("%s: property appears twice\n", __func__);
1870d4afb5ceSopenharmony_ci
1871d4afb5ceSopenharmony_cisend_protocol_error_and_close:
1872d4afb5ceSopenharmony_ci	lwsl_notice("%s: peac\n", __func__);
1873d4afb5ceSopenharmony_ci	par->reason = LMQCP_REASON_PROTOCOL_ERROR;
1874d4afb5ceSopenharmony_ci
1875d4afb5ceSopenharmony_cisend_reason_and_close:
1876d4afb5ceSopenharmony_ci	lwsl_notice("%s: srac\n", __func__);
1877d4afb5ceSopenharmony_ci	par->flag_pending_send_reason_close = 1;
1878d4afb5ceSopenharmony_ci	goto ask;
1879d4afb5ceSopenharmony_ci
1880d4afb5ceSopenharmony_cisend_unsupp_connack_and_close:
1881d4afb5ceSopenharmony_ci	lwsl_notice("%s: unsupac\n", __func__);
1882d4afb5ceSopenharmony_ci	par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
1883d4afb5ceSopenharmony_ci	par->flag_pending_send_connack_close = 1;
1884d4afb5ceSopenharmony_ci
1885d4afb5ceSopenharmony_ciask:
1886d4afb5ceSopenharmony_ci	/* Should we ask for clients? */
1887d4afb5ceSopenharmony_ci	lws_callback_on_writable(wsi);
1888d4afb5ceSopenharmony_ci
1889d4afb5ceSopenharmony_ci	return -1;
1890d4afb5ceSopenharmony_ci}
1891d4afb5ceSopenharmony_ci
1892d4afb5ceSopenharmony_ciint
1893d4afb5ceSopenharmony_cilws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
1894d4afb5ceSopenharmony_ci			   uint8_t dup, lws_mqtt_qos_levels_t qos,
1895d4afb5ceSopenharmony_ci			   uint8_t retain)
1896d4afb5ceSopenharmony_ci{
1897d4afb5ceSopenharmony_ci	lws_mqtt_fixed_hdr_t hdr;
1898d4afb5ceSopenharmony_ci
1899d4afb5ceSopenharmony_ci	hdr.bits = 0;
1900d4afb5ceSopenharmony_ci	hdr.flags.ctrl_pkt_type = ctrl_pkt_type & 0xf;
1901d4afb5ceSopenharmony_ci
1902d4afb5ceSopenharmony_ci	switch(ctrl_pkt_type) {
1903d4afb5ceSopenharmony_ci	case LMQCP_PUBLISH:
1904d4afb5ceSopenharmony_ci		hdr.flags.dup = !!dup;
1905d4afb5ceSopenharmony_ci		/*
1906d4afb5ceSopenharmony_ci		 * A PUBLISH Packet MUST NOT have both QoS bits set to
1907d4afb5ceSopenharmony_ci		 * 1. If a Server or Client receives a PUBLISH Packet
1908d4afb5ceSopenharmony_ci		 * which has both QoS bits set to 1 it MUST close the
1909d4afb5ceSopenharmony_ci		 * Network Connection [MQTT-3.3.1-4].
1910d4afb5ceSopenharmony_ci		 */
1911d4afb5ceSopenharmony_ci		if (qos >= RESERVED_QOS_LEVEL) {
1912d4afb5ceSopenharmony_ci			lwsl_err("%s: Unsupport QoS level 0x%x\n",
1913d4afb5ceSopenharmony_ci				 __func__, qos);
1914d4afb5ceSopenharmony_ci			return -1;
1915d4afb5ceSopenharmony_ci		}
1916d4afb5ceSopenharmony_ci		hdr.flags.qos = qos & 3;
1917d4afb5ceSopenharmony_ci		hdr.flags.retain = !!retain;
1918d4afb5ceSopenharmony_ci		break;
1919d4afb5ceSopenharmony_ci
1920d4afb5ceSopenharmony_ci	case LMQCP_CTOS_CONNECT:
1921d4afb5ceSopenharmony_ci	case LMQCP_STOC_CONNACK:
1922d4afb5ceSopenharmony_ci	case LMQCP_PUBACK:
1923d4afb5ceSopenharmony_ci	case LMQCP_PUBREC:
1924d4afb5ceSopenharmony_ci	case LMQCP_PUBCOMP:
1925d4afb5ceSopenharmony_ci	case LMQCP_STOC_SUBACK:
1926d4afb5ceSopenharmony_ci	case LMQCP_STOC_UNSUBACK:
1927d4afb5ceSopenharmony_ci	case LMQCP_CTOS_PINGREQ:
1928d4afb5ceSopenharmony_ci	case LMQCP_STOC_PINGRESP:
1929d4afb5ceSopenharmony_ci	case LMQCP_DISCONNECT:
1930d4afb5ceSopenharmony_ci	case LMQCP_AUTH:
1931d4afb5ceSopenharmony_ci		hdr.bits &= 0xf0;
1932d4afb5ceSopenharmony_ci		break;
1933d4afb5ceSopenharmony_ci
1934d4afb5ceSopenharmony_ci	/*
1935d4afb5ceSopenharmony_ci	 * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
1936d4afb5ceSopenharmony_ci	 * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
1937d4afb5ceSopenharmony_ci	 * MUST be set to 0,0,1 and 0 respectively. The Server MUST
1938d4afb5ceSopenharmony_ci	 * treat any other value as malformed and close the Network
1939d4afb5ceSopenharmony_ci	 * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
1940d4afb5ceSopenharmony_ci	 */
1941d4afb5ceSopenharmony_ci	case LMQCP_PUBREL:
1942d4afb5ceSopenharmony_ci	case LMQCP_CTOS_SUBSCRIBE:
1943d4afb5ceSopenharmony_ci	case LMQCP_CTOS_UNSUBSCRIBE:
1944d4afb5ceSopenharmony_ci		hdr.bits |= 0x02;
1945d4afb5ceSopenharmony_ci		break;
1946d4afb5ceSopenharmony_ci
1947d4afb5ceSopenharmony_ci	default:
1948d4afb5ceSopenharmony_ci		return -1;
1949d4afb5ceSopenharmony_ci	}
1950d4afb5ceSopenharmony_ci
1951d4afb5ceSopenharmony_ci	*p = hdr.bits;
1952d4afb5ceSopenharmony_ci
1953d4afb5ceSopenharmony_ci	return 0;
1954d4afb5ceSopenharmony_ci}
1955d4afb5ceSopenharmony_ci
1956d4afb5ceSopenharmony_ciint
1957d4afb5ceSopenharmony_cilws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
1958d4afb5ceSopenharmony_ci			     const void *buf, uint32_t len, int is_complete)
1959d4afb5ceSopenharmony_ci{
1960d4afb5ceSopenharmony_ci	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
1961d4afb5ceSopenharmony_ci	uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
1962d4afb5ceSopenharmony_ci	struct lws *nwsi = lws_get_network_wsi(wsi);
1963d4afb5ceSopenharmony_ci	lws_mqtt_str_t mqtt_vh_payload;
1964d4afb5ceSopenharmony_ci	uint32_t vh_len, rem_len;
1965d4afb5ceSopenharmony_ci
1966d4afb5ceSopenharmony_ci	assert(pub->topic);
1967d4afb5ceSopenharmony_ci
1968d4afb5ceSopenharmony_ci	lwsl_debug("%s: len = %d, is_complete = %d\n",
1969d4afb5ceSopenharmony_ci		   __func__, (int)len, (int)is_complete);
1970d4afb5ceSopenharmony_ci
1971d4afb5ceSopenharmony_ci	if (lwsi_state(wsi) != LRS_ESTABLISHED) {
1972d4afb5ceSopenharmony_ci		lwsl_err("%s: %s: unknown state 0x%x\n", __func__,
1973d4afb5ceSopenharmony_ci				lws_wsi_tag(wsi), lwsi_state(wsi));
1974d4afb5ceSopenharmony_ci		assert(0);
1975d4afb5ceSopenharmony_ci		return 1;
1976d4afb5ceSopenharmony_ci	}
1977d4afb5ceSopenharmony_ci
1978d4afb5ceSopenharmony_ci	if (wsi->mqtt->inside_payload) {
1979d4afb5ceSopenharmony_ci		/*
1980d4afb5ceSopenharmony_ci		 * Headers are filled, we are sending
1981d4afb5ceSopenharmony_ci		 * the payload - a buffer with LWS_PRE
1982d4afb5ceSopenharmony_ci		 * in front it.
1983d4afb5ceSopenharmony_ci		 */
1984d4afb5ceSopenharmony_ci		start = (uint8_t *)buf;
1985d4afb5ceSopenharmony_ci		p = start + len;
1986d4afb5ceSopenharmony_ci		if (is_complete)
1987d4afb5ceSopenharmony_ci			wsi->mqtt->inside_payload = 0;
1988d4afb5ceSopenharmony_ci		goto do_write;
1989d4afb5ceSopenharmony_ci	}
1990d4afb5ceSopenharmony_ci
1991d4afb5ceSopenharmony_ci	start = b + LWS_PRE;
1992d4afb5ceSopenharmony_ci	p = start;
1993d4afb5ceSopenharmony_ci	/*
1994d4afb5ceSopenharmony_ci	 * Fill headers and the first chunk of the
1995d4afb5ceSopenharmony_ci	 * payload (if any)
1996d4afb5ceSopenharmony_ci	 */
1997d4afb5ceSopenharmony_ci	if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
1998d4afb5ceSopenharmony_ci				       pub->dup, pub->qos, pub->retain)) {
1999d4afb5ceSopenharmony_ci		lwsl_err("%s: Failed to fill fixed header\n", __func__);
2000d4afb5ceSopenharmony_ci		return 1;
2001d4afb5ceSopenharmony_ci	}
2002d4afb5ceSopenharmony_ci
2003d4afb5ceSopenharmony_ci	/*
2004d4afb5ceSopenharmony_ci	 * Topic len field + Topic len + Packet ID
2005d4afb5ceSopenharmony_ci	 * (for QOS>0) + Payload len
2006d4afb5ceSopenharmony_ci	 */
2007d4afb5ceSopenharmony_ci	vh_len = (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0));
2008d4afb5ceSopenharmony_ci	rem_len = vh_len + pub->payload_len;
2009d4afb5ceSopenharmony_ci	lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
2010d4afb5ceSopenharmony_ci
2011d4afb5ceSopenharmony_ci	/* Will the chunk of payload fit? */
2012d4afb5ceSopenharmony_ci	if ((vh_len + len) >=
2013d4afb5ceSopenharmony_ci	    (wsi->a.context->pt_serv_buf_size - LWS_PRE)) {
2014d4afb5ceSopenharmony_ci		lwsl_err("%s: Payload is too big\n", __func__);
2015d4afb5ceSopenharmony_ci		return 1;
2016d4afb5ceSopenharmony_ci	}
2017d4afb5ceSopenharmony_ci
2018d4afb5ceSopenharmony_ci	p += lws_mqtt_vbi_encode(rem_len, p);
2019d4afb5ceSopenharmony_ci
2020d4afb5ceSopenharmony_ci	/* Topic's Len */
2021d4afb5ceSopenharmony_ci	lws_ser_wu16be(p, pub->topic_len);
2022d4afb5ceSopenharmony_ci	p += 2;
2023d4afb5ceSopenharmony_ci
2024d4afb5ceSopenharmony_ci	/*
2025d4afb5ceSopenharmony_ci	 * Init lws_mqtt_str for "MQTT Variable
2026d4afb5ceSopenharmony_ci	 * Headers + payload" (only the supplied
2027d4afb5ceSopenharmony_ci	 * chuncked payload)
2028d4afb5ceSopenharmony_ci	 */
2029d4afb5ceSopenharmony_ci	lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
2030d4afb5ceSopenharmony_ci			  (uint16_t)(unsigned int)(pub->topic_len + ((pub->qos) ? 2u : 0u) + len),
2031d4afb5ceSopenharmony_ci			  0);
2032d4afb5ceSopenharmony_ci
2033d4afb5ceSopenharmony_ci	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2034d4afb5ceSopenharmony_ci	lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
2035d4afb5ceSopenharmony_ci	if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
2036d4afb5ceSopenharmony_ci		lwsl_err("%s: a\n", __func__);
2037d4afb5ceSopenharmony_ci		return 1;
2038d4afb5ceSopenharmony_ci	}
2039d4afb5ceSopenharmony_ci
2040d4afb5ceSopenharmony_ci	/* Packet ID */
2041d4afb5ceSopenharmony_ci	if (pub->qos != QOS0) {
2042d4afb5ceSopenharmony_ci		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2043d4afb5ceSopenharmony_ci		if (!pub->dup)
2044d4afb5ceSopenharmony_ci			nwsi->mqtt->pkt_id++;
2045d4afb5ceSopenharmony_ci		wsi->mqtt->ack_pkt_id = pub->packet_id = nwsi->mqtt->pkt_id;
2046d4afb5ceSopenharmony_ci		lwsl_debug("%s: pkt_id = %d\n", __func__,
2047d4afb5ceSopenharmony_ci			   (int)wsi->mqtt->ack_pkt_id);
2048d4afb5ceSopenharmony_ci		lws_ser_wu16be(p, pub->packet_id);
2049d4afb5ceSopenharmony_ci		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
2050d4afb5ceSopenharmony_ci			lwsl_err("%s: b\n", __func__);
2051d4afb5ceSopenharmony_ci			return 1;
2052d4afb5ceSopenharmony_ci		}
2053d4afb5ceSopenharmony_ci	}
2054d4afb5ceSopenharmony_ci
2055d4afb5ceSopenharmony_ci	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2056d4afb5ceSopenharmony_ci	memcpy(p, buf, len);
2057d4afb5ceSopenharmony_ci	if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
2058d4afb5ceSopenharmony_ci		return 1;
2059d4afb5ceSopenharmony_ci	p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2060d4afb5ceSopenharmony_ci
2061d4afb5ceSopenharmony_ci	if (!is_complete)
2062d4afb5ceSopenharmony_ci		nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
2063d4afb5ceSopenharmony_ci
2064d4afb5ceSopenharmony_cido_write:
2065d4afb5ceSopenharmony_ci
2066d4afb5ceSopenharmony_ci	// lwsl_hexdump_err(start, lws_ptr_diff(p, start));
2067d4afb5ceSopenharmony_ci
2068d4afb5ceSopenharmony_ci	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2069d4afb5ceSopenharmony_ci			lws_ptr_diff(p, start)) {
2070d4afb5ceSopenharmony_ci		lwsl_err("%s: write failed\n", __func__);
2071d4afb5ceSopenharmony_ci		return 1;
2072d4afb5ceSopenharmony_ci	}
2073d4afb5ceSopenharmony_ci
2074d4afb5ceSopenharmony_ci	if (!is_complete) {
2075d4afb5ceSopenharmony_ci		/* still some more chunks to come... */
2076d4afb5ceSopenharmony_ci		lws_callback_on_writable(wsi);
2077d4afb5ceSopenharmony_ci
2078d4afb5ceSopenharmony_ci		return 0;
2079d4afb5ceSopenharmony_ci	}
2080d4afb5ceSopenharmony_ci
2081d4afb5ceSopenharmony_ci	wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
2082d4afb5ceSopenharmony_ci
2083d4afb5ceSopenharmony_ci	if (pub->qos != QOS0)
2084d4afb5ceSopenharmony_ci		wsi->mqtt->unacked_publish = 1;
2085d4afb5ceSopenharmony_ci
2086d4afb5ceSopenharmony_ci	/* this was the last part of the publish message */
2087d4afb5ceSopenharmony_ci
2088d4afb5ceSopenharmony_ci	if (pub->qos == QOS0) {
2089d4afb5ceSopenharmony_ci		/*
2090d4afb5ceSopenharmony_ci		 * There won't be any real PUBACK, act like we got one
2091d4afb5ceSopenharmony_ci		 * so the user callback logic is the same for QoS0 or
2092d4afb5ceSopenharmony_ci		 * QoS1
2093d4afb5ceSopenharmony_ci		 */
2094d4afb5ceSopenharmony_ci		if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
2095d4afb5ceSopenharmony_ci					    wsi->user_space, NULL, 0)) {
2096d4afb5ceSopenharmony_ci			lwsl_err("%s: ACK callback exited\n", __func__);
2097d4afb5ceSopenharmony_ci			return 1;
2098d4afb5ceSopenharmony_ci		}
2099d4afb5ceSopenharmony_ci	} else if (pub->qos == QOS1 || pub->qos == QOS2) {
2100d4afb5ceSopenharmony_ci		/* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s,
2101d4afb5ceSopenharmony_ci		 * we must RETRY the publish
2102d4afb5ceSopenharmony_ci		 */
2103d4afb5ceSopenharmony_ci		wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend;
2104d4afb5ceSopenharmony_ci		__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2105d4afb5ceSopenharmony_ci				    &wsi->mqtt->sul_qos_puback_pubrec_wait,
2106d4afb5ceSopenharmony_ci				    3 * LWS_USEC_PER_SEC);
2107d4afb5ceSopenharmony_ci	}
2108d4afb5ceSopenharmony_ci
2109d4afb5ceSopenharmony_ci	if (wsi->mqtt->inside_shadow) {
2110d4afb5ceSopenharmony_ci		wsi->mqtt->sul_shadow_wait.cb = lws_mqtt_shadow_timeout;
2111d4afb5ceSopenharmony_ci		__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2112d4afb5ceSopenharmony_ci				    &wsi->mqtt->sul_shadow_wait,
2113d4afb5ceSopenharmony_ci				    60 * LWS_USEC_PER_SEC);
2114d4afb5ceSopenharmony_ci	}
2115d4afb5ceSopenharmony_ci
2116d4afb5ceSopenharmony_ci	return 0;
2117d4afb5ceSopenharmony_ci}
2118d4afb5ceSopenharmony_ci
2119d4afb5ceSopenharmony_ciint
2120d4afb5ceSopenharmony_cilws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
2121d4afb5ceSopenharmony_ci{
2122d4afb5ceSopenharmony_ci	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2123d4afb5ceSopenharmony_ci	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2124d4afb5ceSopenharmony_ci	struct lws *nwsi = lws_get_network_wsi(wsi);
2125d4afb5ceSopenharmony_ci	lws_mqtt_str_t mqtt_vh_payload;
2126d4afb5ceSopenharmony_ci	uint8_t exists[8], extant;
2127d4afb5ceSopenharmony_ci	lws_mqtt_subs_t *mysub;
2128d4afb5ceSopenharmony_ci	uint32_t rem_len;
2129d4afb5ceSopenharmony_ci#if defined(_DEBUG)
2130d4afb5ceSopenharmony_ci	uint32_t tops;
2131d4afb5ceSopenharmony_ci#endif
2132d4afb5ceSopenharmony_ci	uint32_t n;
2133d4afb5ceSopenharmony_ci
2134d4afb5ceSopenharmony_ci	assert(sub->num_topics);
2135d4afb5ceSopenharmony_ci	assert(sub->num_topics < sizeof(exists));
2136d4afb5ceSopenharmony_ci
2137d4afb5ceSopenharmony_ci	switch (lwsi_state(wsi)) {
2138d4afb5ceSopenharmony_ci	case LRS_ESTABLISHED: /* Protocol connection established */
2139d4afb5ceSopenharmony_ci		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
2140d4afb5ceSopenharmony_ci					       0, 0, 0)) {
2141d4afb5ceSopenharmony_ci			lwsl_err("%s: Failed to fill fixed header\n", __func__);
2142d4afb5ceSopenharmony_ci			return 1;
2143d4afb5ceSopenharmony_ci		}
2144d4afb5ceSopenharmony_ci
2145d4afb5ceSopenharmony_ci		/*
2146d4afb5ceSopenharmony_ci		 * The stream wants to subscribe to one or more topic, but
2147d4afb5ceSopenharmony_ci		 * the shared nwsi may already be subscribed to some or all of
2148d4afb5ceSopenharmony_ci		 * them from interactions with other streams.  For those cases,
2149d4afb5ceSopenharmony_ci		 * we filter them from the list the child wants until we just
2150d4afb5ceSopenharmony_ci		 * have ones that are new to the nwsi.  If nothing left, we just
2151d4afb5ceSopenharmony_ci		 * synthesize the callback to the child as if SUBACK had come
2152d4afb5ceSopenharmony_ci		 * and we're done, otherwise just ask the server for topics that
2153d4afb5ceSopenharmony_ci		 * are new to the wsi.
2154d4afb5ceSopenharmony_ci		 */
2155d4afb5ceSopenharmony_ci
2156d4afb5ceSopenharmony_ci		extant = 0;
2157d4afb5ceSopenharmony_ci		memset(&exists, 0, sizeof(exists));
2158d4afb5ceSopenharmony_ci		for (n = 0; n < sub->num_topics; n++) {
2159d4afb5ceSopenharmony_ci			lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
2160d4afb5ceSopenharmony_ci				  __func__, (int)n, sub->topic[n].name);
2161d4afb5ceSopenharmony_ci
2162d4afb5ceSopenharmony_ci			mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
2163d4afb5ceSopenharmony_ci			if (mysub && mysub->ref_count) {
2164d4afb5ceSopenharmony_ci				mysub->ref_count++; /* another stream using it */
2165d4afb5ceSopenharmony_ci				exists[n] = 1;
2166d4afb5ceSopenharmony_ci				extant++;
2167d4afb5ceSopenharmony_ci			}
2168d4afb5ceSopenharmony_ci
2169d4afb5ceSopenharmony_ci			/*
2170d4afb5ceSopenharmony_ci			 * Attach the topic we're subscribing to, to wsi->mqtt
2171d4afb5ceSopenharmony_ci			 */
2172d4afb5ceSopenharmony_ci			if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
2173d4afb5ceSopenharmony_ci				lwsl_err("%s: create sub fail\n", __func__);
2174d4afb5ceSopenharmony_ci				return 1;
2175d4afb5ceSopenharmony_ci			}
2176d4afb5ceSopenharmony_ci		}
2177d4afb5ceSopenharmony_ci
2178d4afb5ceSopenharmony_ci		if (extant == sub->num_topics) {
2179d4afb5ceSopenharmony_ci			/*
2180d4afb5ceSopenharmony_ci			 * It turns out there's nothing to do here, the nwsi has
2181d4afb5ceSopenharmony_ci			 * already subscribed to all the topics this stream
2182d4afb5ceSopenharmony_ci			 * wanted.  Just tell it it can have them.
2183d4afb5ceSopenharmony_ci			 */
2184d4afb5ceSopenharmony_ci			lwsl_notice("%s: all topics already subscribed\n", __func__);
2185d4afb5ceSopenharmony_ci			if (user_callback_handle_rxflow(
2186d4afb5ceSopenharmony_ci				    wsi->a.protocol->callback,
2187d4afb5ceSopenharmony_ci				    wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
2188d4afb5ceSopenharmony_ci				    wsi->user_space, NULL, 0) < 0) {
2189d4afb5ceSopenharmony_ci				lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
2190d4afb5ceSopenharmony_ci					 __func__);
2191d4afb5ceSopenharmony_ci				return -1;
2192d4afb5ceSopenharmony_ci			}
2193d4afb5ceSopenharmony_ci
2194d4afb5ceSopenharmony_ci			return 0;
2195d4afb5ceSopenharmony_ci		}
2196d4afb5ceSopenharmony_ci
2197d4afb5ceSopenharmony_ci#if defined(_DEBUG)
2198d4afb5ceSopenharmony_ci		/*
2199d4afb5ceSopenharmony_ci		 * zero or more of the topics already existed, but not all,
2200d4afb5ceSopenharmony_ci		 * so we must go to the server with a filtered list of the
2201d4afb5ceSopenharmony_ci		 * new ones only
2202d4afb5ceSopenharmony_ci		 */
2203d4afb5ceSopenharmony_ci
2204d4afb5ceSopenharmony_ci		tops = sub->num_topics - extant;
2205d4afb5ceSopenharmony_ci#endif
2206d4afb5ceSopenharmony_ci
2207d4afb5ceSopenharmony_ci		/*
2208d4afb5ceSopenharmony_ci		 * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
2209d4afb5ceSopenharmony_ci		 */
2210d4afb5ceSopenharmony_ci		rem_len = 2;
2211d4afb5ceSopenharmony_ci		for (n = 0; n < sub->num_topics; n++)
2212d4afb5ceSopenharmony_ci			if (!exists[n])
2213d4afb5ceSopenharmony_ci				rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
2214d4afb5ceSopenharmony_ci
2215d4afb5ceSopenharmony_ci		wsi->mqtt->sub_size = (uint16_t)rem_len;
2216d4afb5ceSopenharmony_ci
2217d4afb5ceSopenharmony_ci#if defined(_DEBUG)
2218d4afb5ceSopenharmony_ci		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2219d4afb5ceSopenharmony_ci			   __func__, (int)tops, (int)rem_len);
2220d4afb5ceSopenharmony_ci#endif
2221d4afb5ceSopenharmony_ci
2222d4afb5ceSopenharmony_ci		p += lws_mqtt_vbi_encode(rem_len, p);
2223d4afb5ceSopenharmony_ci
2224d4afb5ceSopenharmony_ci		if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2225d4afb5ceSopenharmony_ci					       wsi->a.context->pt_serv_buf_size) {
2226d4afb5ceSopenharmony_ci			lwsl_err("%s: Payload is too big\n", __func__);
2227d4afb5ceSopenharmony_ci			return 1;
2228d4afb5ceSopenharmony_ci		}
2229d4afb5ceSopenharmony_ci
2230d4afb5ceSopenharmony_ci		/* Init lws_mqtt_str */
2231d4afb5ceSopenharmony_ci		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2232d4afb5ceSopenharmony_ci		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2233d4afb5ceSopenharmony_ci
2234d4afb5ceSopenharmony_ci		/* Packet ID */
2235d4afb5ceSopenharmony_ci		wsi->mqtt->ack_pkt_id = sub->packet_id = ++nwsi->mqtt->pkt_id;
2236d4afb5ceSopenharmony_ci		lwsl_debug("%s: pkt_id = %d\n", __func__,
2237d4afb5ceSopenharmony_ci			   (int)sub->packet_id);
2238d4afb5ceSopenharmony_ci		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2239d4afb5ceSopenharmony_ci
2240d4afb5ceSopenharmony_ci		nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot;
2241d4afb5ceSopenharmony_ci
2242d4afb5ceSopenharmony_ci		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2243d4afb5ceSopenharmony_ci			return 1;
2244d4afb5ceSopenharmony_ci
2245d4afb5ceSopenharmony_ci		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2246d4afb5ceSopenharmony_ci
2247d4afb5ceSopenharmony_ci		for (n = 0; n < sub->num_topics; n++) {
2248d4afb5ceSopenharmony_ci			lwsl_info("%s: topics[%d] = %s\n", __func__,
2249d4afb5ceSopenharmony_ci				   (int)n, sub->topic[n].name);
2250d4afb5ceSopenharmony_ci
2251d4afb5ceSopenharmony_ci			/* if the nwsi already has it, don't ask server for it */
2252d4afb5ceSopenharmony_ci			if (exists[n]) {
2253d4afb5ceSopenharmony_ci				lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
2254d4afb5ceSopenharmony_ci					    __func__, (int)n, sub->topic[n].name);
2255d4afb5ceSopenharmony_ci				continue;
2256d4afb5ceSopenharmony_ci			}
2257d4afb5ceSopenharmony_ci
2258d4afb5ceSopenharmony_ci			/*
2259d4afb5ceSopenharmony_ci			 * Attach the topic we're subscribing to, to nwsi->mqtt
2260d4afb5ceSopenharmony_ci			 * so we know the nwsi itself has a subscription to it
2261d4afb5ceSopenharmony_ci			 */
2262d4afb5ceSopenharmony_ci
2263d4afb5ceSopenharmony_ci			if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
2264d4afb5ceSopenharmony_ci				return 1;
2265d4afb5ceSopenharmony_ci
2266d4afb5ceSopenharmony_ci			/* Topic's Len */
2267d4afb5ceSopenharmony_ci			lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
2268d4afb5ceSopenharmony_ci			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2269d4afb5ceSopenharmony_ci				return 1;
2270d4afb5ceSopenharmony_ci			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2271d4afb5ceSopenharmony_ci
2272d4afb5ceSopenharmony_ci			/* Topic Name */
2273d4afb5ceSopenharmony_ci			lws_strncpy((char *)p, sub->topic[n].name,
2274d4afb5ceSopenharmony_ci				    strlen(sub->topic[n].name) + 1);
2275d4afb5ceSopenharmony_ci			if (lws_mqtt_str_advance(&mqtt_vh_payload,
2276d4afb5ceSopenharmony_ci						 (int)strlen(sub->topic[n].name)))
2277d4afb5ceSopenharmony_ci				return 1;
2278d4afb5ceSopenharmony_ci			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2279d4afb5ceSopenharmony_ci
2280d4afb5ceSopenharmony_ci			/* QoS */
2281d4afb5ceSopenharmony_ci			*p = (uint8_t)sub->topic[n].qos;
2282d4afb5ceSopenharmony_ci			if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
2283d4afb5ceSopenharmony_ci				return 1;
2284d4afb5ceSopenharmony_ci			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2285d4afb5ceSopenharmony_ci		}
2286d4afb5ceSopenharmony_ci		break;
2287d4afb5ceSopenharmony_ci
2288d4afb5ceSopenharmony_ci	default:
2289d4afb5ceSopenharmony_ci		return 1;
2290d4afb5ceSopenharmony_ci	}
2291d4afb5ceSopenharmony_ci
2292d4afb5ceSopenharmony_ci	if (wsi->mqtt->inside_resume_session)
2293d4afb5ceSopenharmony_ci		return 0;
2294d4afb5ceSopenharmony_ci
2295d4afb5ceSopenharmony_ci	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2296d4afb5ceSopenharmony_ci					lws_ptr_diff(p, start))
2297d4afb5ceSopenharmony_ci		return 1;
2298d4afb5ceSopenharmony_ci
2299d4afb5ceSopenharmony_ci	wsi->mqtt->inside_subscribe = 1;
2300d4afb5ceSopenharmony_ci
2301d4afb5ceSopenharmony_ci	return 0;
2302d4afb5ceSopenharmony_ci}
2303d4afb5ceSopenharmony_ci
2304d4afb5ceSopenharmony_ciint
2305d4afb5ceSopenharmony_cilws_mqtt_client_send_unsubcribe(struct lws *wsi,
2306d4afb5ceSopenharmony_ci				const lws_mqtt_subscribe_param_t *unsub)
2307d4afb5ceSopenharmony_ci{
2308d4afb5ceSopenharmony_ci	struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
2309d4afb5ceSopenharmony_ci	uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
2310d4afb5ceSopenharmony_ci	struct lws *nwsi = lws_get_network_wsi(wsi);
2311d4afb5ceSopenharmony_ci	lws_mqtt_str_t mqtt_vh_payload;
2312d4afb5ceSopenharmony_ci	uint8_t send_unsub[8], orphaned;
2313d4afb5ceSopenharmony_ci	uint32_t rem_len, n;
2314d4afb5ceSopenharmony_ci	lws_mqtt_subs_t *mysub;
2315d4afb5ceSopenharmony_ci#if defined(_DEBUG)
2316d4afb5ceSopenharmony_ci	uint32_t tops;
2317d4afb5ceSopenharmony_ci#endif
2318d4afb5ceSopenharmony_ci
2319d4afb5ceSopenharmony_ci	lwsl_info("%s: Enter\n", __func__);
2320d4afb5ceSopenharmony_ci
2321d4afb5ceSopenharmony_ci	switch (lwsi_state(wsi)) {
2322d4afb5ceSopenharmony_ci	case LRS_ESTABLISHED: /* Protocol connection established */
2323d4afb5ceSopenharmony_ci		orphaned = 0;
2324d4afb5ceSopenharmony_ci		memset(&send_unsub, 0, sizeof(send_unsub));
2325d4afb5ceSopenharmony_ci		for (n = 0; n < unsub->num_topics; n++) {
2326d4afb5ceSopenharmony_ci			mysub = lws_mqtt_find_sub(nwsi->mqtt,
2327d4afb5ceSopenharmony_ci						  unsub->topic[n].name);
2328d4afb5ceSopenharmony_ci			assert(mysub);
2329d4afb5ceSopenharmony_ci
2330d4afb5ceSopenharmony_ci			if (mysub && --mysub->ref_count == 0) {
2331d4afb5ceSopenharmony_ci				lwsl_notice("%s: Need to send UNSUB\n", __func__);
2332d4afb5ceSopenharmony_ci				send_unsub[n] = 1;
2333d4afb5ceSopenharmony_ci				orphaned++;
2334d4afb5ceSopenharmony_ci			}
2335d4afb5ceSopenharmony_ci		}
2336d4afb5ceSopenharmony_ci
2337d4afb5ceSopenharmony_ci		if (!orphaned) {
2338d4afb5ceSopenharmony_ci			/*
2339d4afb5ceSopenharmony_ci			 * The nwsi still has other subscribers bound to the
2340d4afb5ceSopenharmony_ci			 * topics.
2341d4afb5ceSopenharmony_ci			 *
2342d4afb5ceSopenharmony_ci			 * So, don't send UNSUB to server, and just fake the
2343d4afb5ceSopenharmony_ci			 * UNSUB ACK event for the guy going away.
2344d4afb5ceSopenharmony_ci			 */
2345d4afb5ceSopenharmony_ci			lwsl_notice("%s: unsubscribed!\n", __func__);
2346d4afb5ceSopenharmony_ci			if (user_callback_handle_rxflow(
2347d4afb5ceSopenharmony_ci				    wsi->a.protocol->callback,
2348d4afb5ceSopenharmony_ci				    wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
2349d4afb5ceSopenharmony_ci				    wsi->user_space, NULL, 0) < 0) {
2350d4afb5ceSopenharmony_ci				/*
2351d4afb5ceSopenharmony_ci				 * We can't directly close here, because the
2352d4afb5ceSopenharmony_ci				 * caller still has the wsi.  Inform the
2353d4afb5ceSopenharmony_ci				 * caller that we want to close
2354d4afb5ceSopenharmony_ci				 */
2355d4afb5ceSopenharmony_ci
2356d4afb5ceSopenharmony_ci				return 1;
2357d4afb5ceSopenharmony_ci			}
2358d4afb5ceSopenharmony_ci
2359d4afb5ceSopenharmony_ci			return 0;
2360d4afb5ceSopenharmony_ci		}
2361d4afb5ceSopenharmony_ci#if defined(_DEBUG)
2362d4afb5ceSopenharmony_ci		/*
2363d4afb5ceSopenharmony_ci		 * one or more of the topics needs to be unsubscribed
2364d4afb5ceSopenharmony_ci		 * from, so we must go to the server with a filtered
2365d4afb5ceSopenharmony_ci		 * list of the new ones only
2366d4afb5ceSopenharmony_ci		 */
2367d4afb5ceSopenharmony_ci
2368d4afb5ceSopenharmony_ci		tops = orphaned;
2369d4afb5ceSopenharmony_ci#endif
2370d4afb5ceSopenharmony_ci
2371d4afb5ceSopenharmony_ci		if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
2372d4afb5ceSopenharmony_ci					       0, 0, 0)) {
2373d4afb5ceSopenharmony_ci			lwsl_err("%s: Failed to fill fixed header\n", __func__);
2374d4afb5ceSopenharmony_ci			return 1;
2375d4afb5ceSopenharmony_ci		}
2376d4afb5ceSopenharmony_ci
2377d4afb5ceSopenharmony_ci		/*
2378d4afb5ceSopenharmony_ci		 * Pid + (Topic len field + Topic len) x Num of Topics
2379d4afb5ceSopenharmony_ci		 */
2380d4afb5ceSopenharmony_ci		rem_len = 2;
2381d4afb5ceSopenharmony_ci		for (n = 0; n < unsub->num_topics; n++)
2382d4afb5ceSopenharmony_ci			if (send_unsub[n])
2383d4afb5ceSopenharmony_ci				rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
2384d4afb5ceSopenharmony_ci
2385d4afb5ceSopenharmony_ci		wsi->mqtt->sub_size = (uint16_t)rem_len;
2386d4afb5ceSopenharmony_ci
2387d4afb5ceSopenharmony_ci#if defined(_DEBUG)
2388d4afb5ceSopenharmony_ci		lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
2389d4afb5ceSopenharmony_ci			   __func__, (int)tops, (int)rem_len);
2390d4afb5ceSopenharmony_ci#endif
2391d4afb5ceSopenharmony_ci
2392d4afb5ceSopenharmony_ci		p += lws_mqtt_vbi_encode(rem_len, p);
2393d4afb5ceSopenharmony_ci
2394d4afb5ceSopenharmony_ci		if ((rem_len + lws_ptr_diff_size_t(p, start)) >=
2395d4afb5ceSopenharmony_ci					       wsi->a.context->pt_serv_buf_size) {
2396d4afb5ceSopenharmony_ci			lwsl_err("%s: Payload is too big\n", __func__);
2397d4afb5ceSopenharmony_ci			return 1;
2398d4afb5ceSopenharmony_ci		}
2399d4afb5ceSopenharmony_ci
2400d4afb5ceSopenharmony_ci		/* Init lws_mqtt_str */
2401d4afb5ceSopenharmony_ci		lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0);
2402d4afb5ceSopenharmony_ci		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2403d4afb5ceSopenharmony_ci
2404d4afb5ceSopenharmony_ci		/* Packet ID */
2405d4afb5ceSopenharmony_ci		wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
2406d4afb5ceSopenharmony_ci		lwsl_debug("%s: pkt_id = %d\n", __func__,
2407d4afb5ceSopenharmony_ci			   (int)wsi->mqtt->ack_pkt_id);
2408d4afb5ceSopenharmony_ci		lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
2409d4afb5ceSopenharmony_ci
2410d4afb5ceSopenharmony_ci		nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot;
2411d4afb5ceSopenharmony_ci
2412d4afb5ceSopenharmony_ci		if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2413d4afb5ceSopenharmony_ci			return 1;
2414d4afb5ceSopenharmony_ci
2415d4afb5ceSopenharmony_ci		p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2416d4afb5ceSopenharmony_ci
2417d4afb5ceSopenharmony_ci		for (n = 0; n < unsub->num_topics; n++) {
2418d4afb5ceSopenharmony_ci			lwsl_info("%s: topics[%d] = %s\n", __func__,
2419d4afb5ceSopenharmony_ci				   (int)n, unsub->topic[n].name);
2420d4afb5ceSopenharmony_ci
2421d4afb5ceSopenharmony_ci			/*
2422d4afb5ceSopenharmony_ci			 * Subscriber still bound to it, don't UBSUB
2423d4afb5ceSopenharmony_ci			 * from the server
2424d4afb5ceSopenharmony_ci			 */
2425d4afb5ceSopenharmony_ci			if (!send_unsub[n])
2426d4afb5ceSopenharmony_ci				continue;
2427d4afb5ceSopenharmony_ci
2428d4afb5ceSopenharmony_ci			/* Topic's Len */
2429d4afb5ceSopenharmony_ci			lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
2430d4afb5ceSopenharmony_ci			if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
2431d4afb5ceSopenharmony_ci				return 1;
2432d4afb5ceSopenharmony_ci			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2433d4afb5ceSopenharmony_ci
2434d4afb5ceSopenharmony_ci			/* Topic Name */
2435d4afb5ceSopenharmony_ci			lws_strncpy((char *)p, unsub->topic[n].name,
2436d4afb5ceSopenharmony_ci				    strlen(unsub->topic[n].name) + 1);
2437d4afb5ceSopenharmony_ci			if (lws_mqtt_str_advance(&mqtt_vh_payload,
2438d4afb5ceSopenharmony_ci						 (int)strlen(unsub->topic[n].name)))
2439d4afb5ceSopenharmony_ci				return 1;
2440d4afb5ceSopenharmony_ci			p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
2441d4afb5ceSopenharmony_ci		}
2442d4afb5ceSopenharmony_ci		break;
2443d4afb5ceSopenharmony_ci
2444d4afb5ceSopenharmony_ci	default:
2445d4afb5ceSopenharmony_ci		return 1;
2446d4afb5ceSopenharmony_ci	}
2447d4afb5ceSopenharmony_ci
2448d4afb5ceSopenharmony_ci	if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
2449d4afb5ceSopenharmony_ci					lws_ptr_diff(p, start))
2450d4afb5ceSopenharmony_ci		return 1;
2451d4afb5ceSopenharmony_ci
2452d4afb5ceSopenharmony_ci	wsi->mqtt->inside_unsubscribe = 1;
2453d4afb5ceSopenharmony_ci
2454d4afb5ceSopenharmony_ci	wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout;
2455d4afb5ceSopenharmony_ci	__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
2456d4afb5ceSopenharmony_ci			    &wsi->mqtt->sul_unsuback_wait,
2457d4afb5ceSopenharmony_ci			    3 * LWS_USEC_PER_SEC);
2458d4afb5ceSopenharmony_ci
2459d4afb5ceSopenharmony_ci	return 0;
2460d4afb5ceSopenharmony_ci}
2461d4afb5ceSopenharmony_ci
2462d4afb5ceSopenharmony_ci/*
2463d4afb5ceSopenharmony_ci * This is called when child streams bind to an already-existing and compatible
2464d4afb5ceSopenharmony_ci * MQTT stream
2465d4afb5ceSopenharmony_ci */
2466d4afb5ceSopenharmony_ci
2467d4afb5ceSopenharmony_cistruct lws *
2468d4afb5ceSopenharmony_cilws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
2469d4afb5ceSopenharmony_ci{
2470d4afb5ceSopenharmony_ci	/* no more children allowed by parent? */
2471d4afb5ceSopenharmony_ci
2472d4afb5ceSopenharmony_ci	if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
2473d4afb5ceSopenharmony_ci		lwsl_err("%s: reached concurrent stream limit\n", __func__);
2474d4afb5ceSopenharmony_ci		return NULL;
2475d4afb5ceSopenharmony_ci	}
2476d4afb5ceSopenharmony_ci
2477d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT)
2478d4afb5ceSopenharmony_ci	wsi->client_mux_substream = 1;
2479d4afb5ceSopenharmony_ci#endif
2480d4afb5ceSopenharmony_ci
2481d4afb5ceSopenharmony_ci	lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
2482d4afb5ceSopenharmony_ci
2483d4afb5ceSopenharmony_ci	if (lws_ensure_user_space(wsi))
2484d4afb5ceSopenharmony_ci		goto bail1;
2485d4afb5ceSopenharmony_ci
2486d4afb5ceSopenharmony_ci	lws_mqtt_set_client_established(wsi);
2487d4afb5ceSopenharmony_ci	lws_callback_on_writable(wsi);
2488d4afb5ceSopenharmony_ci
2489d4afb5ceSopenharmony_ci	return wsi;
2490d4afb5ceSopenharmony_ci
2491d4afb5ceSopenharmony_cibail1:
2492d4afb5ceSopenharmony_ci	/* undo the insert */
2493d4afb5ceSopenharmony_ci	parent_wsi->mux.child_list = wsi->mux.sibling_list;
2494d4afb5ceSopenharmony_ci	parent_wsi->mux.child_count--;
2495d4afb5ceSopenharmony_ci
2496d4afb5ceSopenharmony_ci	if (wsi->user_space)
2497d4afb5ceSopenharmony_ci		lws_free_set_NULL(wsi->user_space);
2498d4afb5ceSopenharmony_ci
2499d4afb5ceSopenharmony_ci	wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
2500d4afb5ceSopenharmony_ci	lws_free(wsi);
2501d4afb5ceSopenharmony_ci
2502d4afb5ceSopenharmony_ci	return NULL;
2503d4afb5ceSopenharmony_ci}
2504d4afb5ceSopenharmony_ci
2505