/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */
24 
25 #ifndef _PRIVATE_LIB_ROLES_MQTT
26 #define _PRIVATE_LIB_ROLES_MQTT 1
27 
/* MQTT role ops object; only declared here, the definition lives elsewhere */
extern struct lws_role_ops role_ops_mqtt;

/*
 * Evaluates nonzero when wsi->role_ops points at role_ops_mqtt.
 *
 * The argument is parenthesized so that a pointer expression such as
 * `p + 1` can be passed without the -> binding to only part of it.
 */
#define lwsi_role_mqtt(wsi) ((wsi)->role_ops == &role_ops_mqtt)
31 
#define LWS_MQTT_MAX_CHILDREN 8 /* max child streams on same parent */

/*
 * Flag bits for the control packet lookup table (LUT).
 *
 * The two packet-id bits (0x20, 0x40) are selected together by
 * LMQCP_LUT_FLAG_PACKET_ID_MASK; LMQCP_LUT_FLAG_PACKET_ID_NONE is the value
 * of that field when neither bit is set.
 */
#define LMQCP_LUT_FLAG_RESERVED_FLAGS  0x10
#define LMQCP_LUT_FLAG_PACKET_ID_NONE  0x00
#define LMQCP_LUT_FLAG_PACKET_ID_HAS   0x20
#define LMQCP_LUT_FLAG_PACKET_ID_QOS12 0x40
#define LMQCP_LUT_FLAG_PACKET_ID_MASK  0x60
#define LMQCP_LUT_FLAG_PAYLOAD         0x80 /* payload req (publish = opt) */
40 
/*
 * Evaluates nonzero only when s is a non-NULL pointer to an object whose
 * len is nonzero, whose buf is non-NULL and whose first buf byte is nonzero.
 *
 * NOTE: s is evaluated up to four times, so do not pass an expression with
 * side effects.
 */
#define lws_mqtt_str_is_not_empty(s) ( ((s)) &&		\
				       ((s))->len &&	\
				       ((s))->buf &&	\
				       *((s))->buf )
45 
/*
 * Timing / retry limits.  The two time values are expressed as multiples of
 * LWS_US_PER_SEC, which is defined outside this header.
 */
#define LWS_MQTT_RESPONSE_TIMEOUT      (3 * LWS_US_PER_SEC)
#define LWS_MQTT_RETRY_CEILING         (60 * LWS_US_PER_SEC)
#define LWS_MQTT_MAX_PUBLISH_RETRY     (3)
49 
/*
 * Result codes for the stateful primitive parsers declared in this header:
 * values >= 0 report progress, values < 0 are failures.
 */
typedef enum {
	LMSPR_COMPLETED			=  0,
	LMSPR_NEED_MORE			=  1,

	LMSPR_FAILED_OOM		= -1,
	LMSPR_FAILED_OVERSIZE		= -2,
	LMSPR_FAILED_FORMAT		= -3,
	LMSPR_FAILED_ALREADY_COMPLETED	= -4,
} lws_mqtt_stateful_primitive_return_t;
59 
/*
 * State for the stateful integer decoders (variable byte integer, and the
 * fixed 2-byte / 4-byte forms).
 */
typedef struct {
	uint32_t value;	/* decoded result, valid once the parse completed */
	char budget;	/* NOTE(review): presumably bytes still allowed - confirm */
	char consumed;	/* zero until input has been consumed, see below */
} lws_mqtt_vbi;

/*
 * Returns nonzero while nothing has been consumed into vbi yet, ie,
 * vbi->consumed is still 0.
 *
 * works for vbi, 2-byte and 4-byte fixed length
 */
static inline int
lws_mqtt_mb_first(lws_mqtt_vbi *vbi)
{
	return !vbi->consumed;
}
69 
/*
 * Encode value as an MQTT variable byte integer into buf.
 *
 * NOTE(review): the implementation is not in this header; the meaning of the
 * int result and the required size of buf should be confirmed there.
 */
int
lws_mqtt_vbi_encode(uint32_t value, void *buf);
72 
/*
 * Decode is done statefully on an arbitrary amount of input data (which may
 * be one byte).  It's like this so it can continue seamlessly if a buffer ends
 * partway through the primitive, and the api matches the bulk binary data case.
 *
 * VBI decode:
 *
 * Initialize the lws_mqtt_vbi state by calling lws_mqtt_vbi_init() on it, then
 * feed lws_mqtt_vbi_r() bytes to decode.
 *
 * Returns <0 for error, LMSPR_COMPLETED if done (vbi->value is valid), or
 * LMSPR_NEED_MORE if more calls to lws_mqtt_vbi_r() with subsequent bytes
 * are needed.
 *
 * *in and *len are updated accordingly.
 *
 * 2-byte and 4-byte decode:
 *
 * Initialize the lws_mqtt_vbi state by calling lws_mqtt_2byte_init() or
 * lws_mqtt_4byte_init() on it, then feed lws_mqtt_mb_parse() bytes
 * to decode.
 *
 * Returns <0 for error, LMSPR_COMPLETED if done (vbi->value is valid), or
 * LMSPR_NEED_MORE if more calls to lws_mqtt_mb_parse() with subsequent
 * bytes are needed.
 *
 * *in and *len are updated accordingly.
 */
101 
102 void
103 lws_mqtt_vbi_init(lws_mqtt_vbi *vbi);
104 
105 void
106 lws_mqtt_2byte_init(lws_mqtt_vbi *vbi);
107 
108 void
109 lws_mqtt_4byte_init(lws_mqtt_vbi *vbi);
110 
111 lws_mqtt_stateful_primitive_return_t
112 lws_mqtt_vbi_r(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len);
113 
114 lws_mqtt_stateful_primitive_return_t
115 lws_mqtt_mb_parse(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len);
116 
/*
 * Length-counted string / binary data object, which also carries the state
 * needed to parse it statefully.
 */
struct lws_mqtt_str_st {
	uint8_t		*buf;
	uint16_t	len;

	uint16_t	limit; /* it's cheaper to add the state here than
				* the pointer to point to it elsewhere */
	uint16_t	pos;
	char		len_valid;
	char		needs_freeing;
};

/*
 * Returns nonzero when s has neither a buffer nor a position yet, ie,
 * s->buf is NULL and s->pos is 0.
 */
static inline int
lws_mqtt_str_first(struct lws_mqtt_str_st *s)
{
	return !s->buf && !s->pos;
}
130 
131 
132 lws_mqtt_stateful_primitive_return_t
133 lws_mqtt_str_parse(struct lws_mqtt_str_st *bd, const uint8_t **in, size_t *len);
134 
135 typedef enum {
136 	LMQCPP_IDLE,
137 
138 	/* receive packet type part of fixed header took us out of idle... */
139 	LMQCPP_CONNECT_PACKET = LMQCP_CTOS_CONNECT << 4,
140 	LMQCPP_CONNECT_REMAINING_LEN_VBI,
141 	LMQCPP_CONNECT_VH_PNAME,
142 	LMQCPP_CONNECT_VH_PVERSION,
143 	LMQCPP_CONNECT_VH_FLAGS,
144 	LMQCPP_CONNECT_VH_KEEPALIVE,
145 	LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN,
146 
147 	LMQCPP_CONNACK_PACKET = LMQCP_STOC_CONNACK << 4,
148 	LMQCPP_CONNACK_VH_FLAGS,
149 	LMQCPP_CONNACK_VH_RETURN_CODE,
150 
151 	LMQCPP_PUBLISH_PACKET = LMQCP_PUBLISH << 4,
152 	LMQCPP_PUBLISH_REMAINING_LEN_VBI,
153 	LMQCPP_PUBLISH_VH_TOPIC,
154 	LMQCPP_PUBLISH_VH_PKT_ID,
155 
156 	LMQCPP_PUBACK_PACKET = LMQCP_PUBACK << 4,
157 	LMQCPP_PUBACK_VH_PKT_ID,
158 	LMQCPP_PUBACK_PROPERTIES_LEN_VBI,
159 
160 	LMQCPP_PUBREC_PACKET = LMQCP_PUBREC << 4,
161 	LMQCPP_PUBREC_VH_PKT_ID,
162 
163 	LMQCPP_PUBREL_PACKET = LMQCP_PUBREL << 4,
164 	LMQCPP_PUBREL_VH_PKT_ID,
165 
166 	LMQCPP_PUBCOMP_PACKET = LMQCP_PUBCOMP << 4,
167 	LMQCPP_PUBCOMP_VH_PKT_ID,
168 
169 	LMQCPP_SUBACK_PACKET = LMQCP_STOC_SUBACK << 4,
170 	LMQCPP_SUBACK_VH_PKT_ID,
171 	LMQCPP_SUBACK_PAYLOAD,
172 
173 	LMQCPP_UNSUBACK_PACKET = LMQCP_STOC_UNSUBACK << 4,
174 	LMQCPP_UNSUBACK_VH_PKT_ID,
175 
176 	LMQCPP_PINGRESP_ZERO = LMQCP_STOC_PINGRESP << 4,
177 
178 	LMQCPP_PAYLOAD,
179 
180 	LMQCPP_EAT_PROPERTIES_AND_COMPLETE,
181 
182 	LMQCPP_PROP_ID_VBI,
183 
184 	/* all possible property payloads */
185 
186 	/* 3.3.2.3.2 */
187 	LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE			= 0x101,
188 
189 	LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE				= 0x102,
190 
191 	LMQCPP_PROP_CONTENT_TYPE_UTF8S					= 0x103,
192 
193 	LMQCPP_PROP_RESPONSE_TOPIC_UTF8S				= 0x108,
194 
195 	LMQCPP_PROP_CORRELATION_BINDATA					= 0x109,
196 
197 	LMQCPP_PROP_SUBSCRIPTION_ID_VBI					= 0x10b,
198 
199 	LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE			= 0x111,
200 
201 	LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S				= 0x112,
202 
203 	LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE				= 0x113,
204 
205 	LMQCPP_PROP_AUTH_METHOD_UTF8S					= 0x115,
206 
207 	LMQCPP_PROP_AUTH_DATA_BINDATA					= 0x116,
208 
209 	LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE				= 0x117,
210 
211 	LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE				= 0x118,
212 
213 	LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE				= 0x119,
214 
215 	LMQCPP_PROP_RESPONSE_INFO_UTF8S					= 0x11a,
216 
217 	LMQCPP_PROP_SERVER_REFERENCE_UTF8S				= 0x11c,
218 
219 	LMQCPP_PROP_REASON_STRING_UTF8S					= 0x11f,
220 
221 	LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE				= 0x121,
222 
223 	LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE					= 0x122,
224 
225 	LMQCPP_PROP_TOPIC_ALIAS_2BYTE					= 0x123,
226 
227 	LMQCPP_PROP_MAXIMUM_QOS_1BYTE					= 0x124,
228 
229 	LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE				= 0x125,
230 
231 	LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S				= 0x126,
232 	LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S				= 0x226,
233 
234 	LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE				= 0x127,
235 
236 	LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE		= 0x128,
237 
238 	LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE		= 0x129,
239 
240 	LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE			= 0x12a,
241 
242 } lws_mqtt_packet_parse_state_t;
243 
/*
 * the states an MQTT connection can be in
 *
 * Values are implicit, counting up from LGSMQTT_UNKNOWN == 0 in the order
 * listed.
 */

typedef enum {
	LGSMQTT_UNKNOWN,
	LGSMQTT_IDLE,
	LGSMQTT_TRANSPORT_CONNECTED,

	LGSMQTT_SENT_CONNECT,
	LGSMQTT_ESTABLISHED,

	LGSMQTT_SENT_SUBSCRIBE,
	LGSMQTT_SUBSCRIBED,

} lwsgs_mqtt_states_t;
260 
261 typedef struct lws_mqtt_parser_st {
262 	/* struct lws_mqtt_str_st s_content_type; */
263 	lws_mqtt_packet_parse_state_t state;
264 	lws_mqtt_vbi vbit;
265 
266 	lws_mqtt_reason_t reason;
267 
268 	lws_mqtt_str_t s_temp;
269 
270 	uint8_t fixed_seen[4];
271 	uint8_t props_seen[8];
272 
273 	uint8_t cpkt_flags;
274 	uint32_t cpkt_remlen;
275 
276 	uint32_t props_len;
277 	uint32_t consumed;
278 	uint32_t prop_id;
279 	uint32_t props_consumed;
280 	uint32_t payload_consumed;
281 
282 	uint16_t keepalive;
283 	uint16_t cpkt_id;
284 	uint32_t n;
285 
286 	uint8_t temp[32];
287 	uint8_t conn_rc;
288 	uint8_t payload_format;
289 	uint8_t packet_type_flags;
290 	uint8_t conn_protocol_version;
291 	uint8_t fixed;
292 
293 	uint8_t flag_pending_send_connack_close:1;
294 	uint8_t flag_pending_send_reason_close:1;
295 	uint8_t flag_prop_multi:1;
296 	uint8_t flag_server:1;
297 
298 } lws_mqtt_parser_t;
299 
/*
 * Topic validation results: values >= 0 are the valid classifications,
 * values < 0 are failures.
 */
typedef enum {
	LMVTR_VALID				=  0,
	LMVTR_VALID_WILDCARD			=  1,
	LMVTR_VALID_SHADOW			=  2,

	LMVTR_FAILED_OVERSIZE			= -1,
	LMVTR_FAILED_WILDCARD_FORMAT		= -2,
	LMVTR_FAILED_SHADOW_FORMAT		= -3,
} lws_mqtt_validate_topic_return_t;
309 
/* Topic match results: 0 = no match, 1 = match, -1 = error */
typedef enum {
	LMMTR_TOPIC_NOMATCH			= 0,
	LMMTR_TOPIC_MATCH			= 1,

	LMMTR_TOPIC_MATCH_ERROR			= -1
} lws_mqtt_match_topic_return_t;
316 
/*
 * One subscription, kept on a singly-linked list through next.  The topic
 * string is stored in the flexible array member at the end of the object.
 */
typedef struct lws_mqtt_subs {
	struct lws_mqtt_subs	*next;

	uint8_t			ref_count; /* number of children referencing */

	/* Flags */
	uint8_t			wildcard:1;
	uint8_t			shadow:1;

	/* subscription name + NUL overallocated here */
	char			topic[];
} lws_mqtt_subs_t;
329 
330 typedef struct lws_mqtts {
331 	lws_mqtt_parser_t	par;
332 	lwsgs_mqtt_states_t	estate;
333 	struct lws_dll2		active_session_list_head;
334 	struct lws_dll2		limbo_session_list_head;
335 } lws_mqtts_t;
336 
337 typedef struct lws_mqttc {
338 	lws_mqtt_parser_t	par;
339 	lwsgs_mqtt_states_t	estate;
340 	struct lws_mqtt_str_st	*id;
341 	struct lws_mqtt_str_st	*username;
342 	struct lws_mqtt_str_st	*password;
343 	struct {
344 		struct lws_mqtt_str_st	*topic;
345 		struct lws_mqtt_str_st	*message;
346 		lws_mqtt_qos_levels_t qos;
347 		uint8_t		retain;
348 	} will;
349 	uint16_t		keep_alive_secs;
350 	uint16_t			conn_flags;
351 	uint8_t			aws_iot;
352 } lws_mqttc_t;
353 
354 struct _lws_mqtt_related {
355 	lws_mqttc_t		client;
356 	lws_sorted_usec_list_t	sul_qos_puback_pubrec_wait; /* QoS1 puback or QoS2 pubrec wait TO */
357 	lws_sorted_usec_list_t	sul_qos1_puback_wait; /* QoS1 puback wait TO */
358 	lws_sorted_usec_list_t	sul_unsuback_wait; /* unsuback wait TO */
359 	lws_sorted_usec_list_t	sul_qos2_pubrec_wait; /* QoS2 pubrec wait TO */
360 	lws_sorted_usec_list_t	sul_shadow_wait; /* Device Shadow wait TO */
361 	struct lws		*wsi; /**< so sul can use lws_container_of */
362 	lws_mqtt_subs_t		*subs_head; /**< Linked-list of heap-allocated subscription objects */
363 	void			*rx_cpkt_param;
364 	uint16_t		pkt_id;
365 	uint16_t		ack_pkt_id;
366 	uint16_t		peer_ack_pkt_id;
367 	uint16_t		sub_size;
368 
369 #if defined(LWS_WITH_CLIENT)
370 	uint8_t 		send_pingreq:1;
371 	uint8_t			session_resumed:1;
372 #endif
373 	uint8_t			inside_payload:1;
374 	uint8_t			inside_subscribe:1;
375 	uint8_t			inside_unsubscribe:1;
376 	uint8_t			inside_birth:1;
377 	uint8_t			inside_resume_session:1;
378 	uint8_t 		send_puback:1;
379 	uint8_t 		send_pubrel:1;
380 	uint8_t 		send_pubrec:1;
381 	uint8_t 		send_pubcomp:1;
382 	uint8_t			unacked_publish:1;
383 	uint8_t			unacked_pubrel:1;
384 
385 	uint8_t			done_subscribe:1;
386 	uint8_t			done_birth:1;
387 	uint8_t			inside_shadow:1;
388 	uint8_t			done_shadow_subscribe:1;
389 	uint8_t			send_shadow_unsubscribe:1;
390 };
391 
392 /*
393  * New sessions are created by starting CONNECT.  If the ClientID sent
394  * by the client matches a different, extant session, then the
395  * existing one is taken over and the new one created for duration of
396  * CONNECT processing is destroyed.
397  *
398  * On the server side, bearing in mind multiple simultaneous,
399  * fragmented CONNECTs may be interleaved ongoing, all state and
400  * parsing temps for a session must live in the session object.
401  */
402 
403 struct lws_mqtt_endpoint_st;
404 
405 typedef struct lws_mqtts_session_st {
406 	struct lws_dll2 session_list;
407 
408 } lws_mqtts_session_t;
409 
/*
 * Control packet type held in the top 4 bits of x->packet_type_flags.
 *
 * The argument is parenthesized so that a pointer expression such as
 * `p + 1` can be passed without the -> binding to only part of it.
 */
#define ctl_pkt_type(x) ((x)->packet_type_flags >> 4)
411 
412 
413 void
414 lws_mqttc_state_transition(lws_mqttc_t *ep, lwsgs_mqtt_states_t s);
415 
416 int
417 _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
418 		    const uint8_t *buf, size_t len);
419 
420 int
421 lws_mqtt_client_socket_service(struct lws *wsi, struct lws_pollfd *pollfd,
422 			       struct lws *wsi_conn);
423 
424 int
425 lws_create_client_mqtt_object(const struct lws_client_connect_info *i,
426 			      struct lws *wsi);
427 
428 struct lws *
429 lws_mqtt_client_send_connect(struct lws *wsi);
430 
431 struct lws *
432 lws_mqtt_client_send_disconnect(struct lws *wsi);
433 
434 int
435 lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
436 			   uint8_t dup, lws_mqtt_qos_levels_t qos,
437 			   uint8_t retain);
438 
439 struct lws *
440 lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi);
441 
442 lws_mqtt_subs_t *
443 lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic);
444 
445 lws_mqtt_match_topic_return_t
446 lws_mqtt_is_topic_matched(const char* sub, const char* pub);
447 
448 #endif /* _PRIVATE_LIB_ROLES_MQTT */
449 
450