/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */
24
25#include "private-lib-core.h"
26
27static int
28rops_handle_POLLIN_mqtt(struct lws_context_per_thread *pt, struct lws *wsi,
29			   struct lws_pollfd *pollfd)
30{
31	unsigned int pending = 0;
32	struct lws_tokens ebuf;
33	int n = 0;
34	char buffered = 0;
35
36	lwsl_debug("%s: wsistate 0x%x, %s pollout %d\n", __func__,
37		   (unsigned int)wsi->wsistate,  wsi->a.protocol->name,
38		   pollfd->revents);
39
40	/*
41	 * After the CONNACK and nwsi establishment, the first logical
42	 * stream is migrated out of the nwsi to be child sid 1, and the
43	 * nwsi no longer has a wsi->mqtt of its own.
44	 *
45	 * RX events on the nwsi must be converted to events seen or not
46	 * seen by one or more child streams.
47	 *
48	 * SUBACK - reflected to child stream that asked for it
49	 * PUBACK - routed to child that did the related publish
50	 */
51
52	ebuf.token = NULL;
53	ebuf.len = 0;
54
55	if (lwsi_state(wsi) != LRS_ESTABLISHED) {
56#if defined(LWS_WITH_CLIENT)
57
58		if (lwsi_state(wsi) == LRS_WAITING_SSL &&
59		    ((pollfd->revents & LWS_POLLOUT)) &&
60		    lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
61			lwsl_info("failed at set pollfd\n");
62			return LWS_HPI_RET_PLEASE_CLOSE_ME;
63		}
64
65		if ((pollfd->revents & LWS_POLLOUT) &&
66		    lws_handle_POLLOUT_event(wsi, pollfd)) {
67			lwsl_debug("POLLOUT event closed it\n");
68			return LWS_HPI_RET_PLEASE_CLOSE_ME;
69		}
70
71		n = lws_mqtt_client_socket_service(wsi, pollfd, NULL);
72		if (n)
73			return LWS_HPI_RET_WSI_ALREADY_DIED;
74#endif
75		return LWS_HPI_RET_HANDLED;
76	}
77
78	/* 1: something requested a callback when it was OK to write */
79
80	if ((pollfd->revents & LWS_POLLOUT) &&
81	    lwsi_state_can_handle_POLLOUT(wsi) &&
82	    lws_handle_POLLOUT_event(wsi, pollfd)) {
83		if (lwsi_state(wsi) == LRS_RETURNED_CLOSE)
84			lwsi_set_state(wsi, LRS_FLUSHING_BEFORE_CLOSE);
85
86		return LWS_HPI_RET_PLEASE_CLOSE_ME;
87	}
88
89	/* 3: buflist needs to be drained
90	 */
91read:
92	// lws_buflist_describe(&wsi->buflist, wsi, __func__);
93	ebuf.len = (int)lws_buflist_next_segment_len(&wsi->buflist, &ebuf.token);
94	if (ebuf.len) {
95		lwsl_info("draining buflist (len %d)\n", ebuf.len);
96		buffered = 1;
97		goto drain;
98	}
99
100	if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
101		return LWS_HPI_RET_HANDLED;
102
103	/* if (lws_is_flowcontrolled(wsi)) { */
104	/*	lwsl_info("%s: %p should be rxflow (bm 0x%x)..\n", */
105	/*		    __func__, wsi, wsi->rxflow_bitmap); */
106	/*	return LWS_HPI_RET_HANDLED; */
107	/* } */
108
109	if (!(lwsi_role_client(wsi) && lwsi_state(wsi) != LRS_ESTABLISHED)) {
110		/*
111		 * In case we are going to react to this rx by scheduling
112		 * writes, we need to restrict the amount of rx to the size
113		 * the protocol reported for rx buffer.
114		 *
115		 * Otherwise we get a situation we have to absorb possibly a
116		 * lot of reads before we get a chance to drain them by writing
117		 * them, eg, with echo type tests in autobahn.
118		 */
119
120		buffered = 0;
121		ebuf.token = pt->serv_buf;
122		ebuf.len = (int)wsi->a.context->pt_serv_buf_size;
123
124		if ((unsigned int)ebuf.len > wsi->a.context->pt_serv_buf_size)
125			ebuf.len = (int)wsi->a.context->pt_serv_buf_size;
126
127		if ((int)pending > ebuf.len)
128			pending = (unsigned int)ebuf.len;
129
130		ebuf.len = lws_ssl_capable_read(wsi, ebuf.token,
131						pending ? pending :
132						(unsigned int)ebuf.len);
133		switch (ebuf.len) {
134		case 0:
135			lwsl_info("%s: zero length read\n",
136				  __func__);
137			return LWS_HPI_RET_PLEASE_CLOSE_ME;
138		case LWS_SSL_CAPABLE_MORE_SERVICE:
139			lwsl_info("SSL Capable more service\n");
140			return LWS_HPI_RET_HANDLED;
141		case LWS_SSL_CAPABLE_ERROR:
142			lwsl_info("%s: LWS_SSL_CAPABLE_ERROR\n",
143					__func__);
144			return LWS_HPI_RET_PLEASE_CLOSE_ME;
145		}
146
147		/*
148		 * coverity thinks ssl_capable_read() may read over
149		 * 2GB.  Dissuade it...
150		 */
151		ebuf.len &= 0x7fffffff;
152	}
153
154drain:
155	/* service incoming data */
156	//lws_buflist_describe(&wsi->buflist, wsi, __func__);
157	if (ebuf.len) {
158		n = lws_read_mqtt(wsi, ebuf.token, (unsigned int)ebuf.len);
159		if (n < 0) {
160			lwsl_notice("%s: lws_read_mqtt returned %d\n",
161					__func__, n);
162			/* we closed wsi */
163			goto fail;
164                }
165		// lws_buflist_describe(&wsi->buflist, wsi, __func__);
166		lwsl_debug("%s: consuming %d / %d\n", __func__, n, ebuf.len);
167		if (lws_buflist_aware_finished_consuming(wsi, &ebuf, ebuf.len,
168							 buffered, __func__))
169			return LWS_HPI_RET_PLEASE_CLOSE_ME;
170	}
171
172	ebuf.token = NULL;
173	ebuf.len = 0;
174
175	pending = (unsigned int)lws_ssl_pending(wsi);
176	if (pending) {
177		pending = pending > wsi->a.context->pt_serv_buf_size ?
178			wsi->a.context->pt_serv_buf_size : pending;
179		goto read;
180	}
181
182	if (buffered && /* were draining, now nothing left */
183	    !lws_buflist_next_segment_len(&wsi->buflist, NULL)) {
184		lwsl_info("%s: %s flow buf: drained\n", __func__, lws_wsi_tag(wsi));
185		/* having drained the rxflow buffer, can rearm POLLIN */
186#if !defined(LWS_WITH_SERVER)
187		n =
188#endif
189		__lws_rx_flow_control(wsi);
190		/* n ignored, needed for NO_SERVER case */
191	}
192
193	/* n = 0 */
194	return LWS_HPI_RET_HANDLED;
195
196fail:
197	lwsl_err("%s: Failed, bailing\n", __func__);
198	lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "mqtt svc fail");
199
200	return LWS_HPI_RET_WSI_ALREADY_DIED;
201}
202
#if 0 /* defined(LWS_WITH_SERVER) */

/*
 * Adoption-time role binding for MQTT (currently compiled out).
 *
 * Only a plain, non-http socket adoption that is not the "finish" phase is
 * claimed.  The wsi is moved to the mqtt role, starting in LRS_SSL_INIT when
 * the adoption allows tls and LRS_ESTABLISHED otherwise, and then bound to a
 * protocol.
 *
 * Returns 0 for "no match" and 1 for "bound".
 */
static int
rops_adoption_bind_mqtt(struct lws *wsi, int type, const char *vh_prot_name)
{
	const struct lws_protocols *prot;

	/* no http but socket... must be mqtt */
	if (type & LWS_ADOPT_HTTP)
		return 0; /* no match */
	if (!(type & LWS_ADOPT_SOCKET))
		return 0; /* no match */
	if (type & _LWS_ADOPT_FINISH)
		return 0; /* no match */

	lws_role_transition(wsi, 0,
			    (type & LWS_ADOPT_ALLOW_SSL) ? LRS_SSL_INIT :
							   LRS_ESTABLISHED,
			    &role_ops_mqtt);

	/*
	 * With a vhost protocol name the wsi's existing protocol is kept,
	 * otherwise the vhost's designated mqtt protocol is used (this is
	 * the only time he will transition).
	 */
	if (vh_prot_name)
		prot = wsi->a.protocol;
	else
		prot = &wsi->a.vhost->protocols[
					wsi->a.vhost->mqtt_protocol_index];

	lws_bind_protocol(wsi, prot, __func__);

	return 1; /* bound */
}
#endif
227
228static int
229rops_client_bind_mqtt(struct lws *wsi, const struct lws_client_connect_info *i)
230{
231	lwsl_debug("%s: i = %p\n", __func__, i);
232	if (!i) {
233
234		/* finalize */
235
236		if (!wsi->user_space && wsi->stash->cis[CIS_METHOD])
237			if (lws_ensure_user_space(wsi))
238				return 1;
239
240		if (!wsi->stash->cis[CIS_METHOD] && !wsi->stash->cis[CIS_ALPN])
241			wsi->stash->cis[CIS_ALPN] = "x-amzn-mqtt-ca";
242
243		/* if we went on the ah waiting list, it's ok, we can
244		 * wait.
245		 *
246		 * When we do get the ah, now or later, he will end up
247		 * at lws_http_client_connect_via_info2().
248		 */
249#if defined(LWS_WITH_CLIENT)
250		if (lws_header_table_attach(wsi, 0) < 0)
251			/*
252			 * if we failed here, the connection is already closed
253			 * and freed.
254			 */
255			return -1;
256#else
257		if (lws_header_table_attach(wsi, 0))
258			return 0;
259#endif
260		return 0;
261	}
262
263	/* if a recognized mqtt method, bind to it */
264	if (strcmp(i->method, "MQTT"))
265		return 0; /* no match */
266
267	if (lws_create_client_mqtt_object(i, wsi))
268		return 1;
269
270	lws_role_transition(wsi, LWSIFR_CLIENT, LRS_UNCONNECTED,
271				&role_ops_mqtt);
272	return 1; /* matched */
273}
274
275static int
276rops_handle_POLLOUT_mqtt(struct lws *wsi)
277{
278	struct lws **wsi2;
279
280	lwsl_debug("%s\n", __func__);
281
282#if defined(LWS_WITH_CLIENT)
283	if (wsi->mqtt && wsi->mqtt->send_pingreq && !wsi->mqtt->inside_payload) {
284		uint8_t buf[LWS_PRE + 2];
285
286		/*
287		 * We are swallowing this POLLOUT in order to send a PINGREQ
288		 * autonomously
289		 */
290
291		wsi->mqtt->send_pingreq = 0;
292
293		lwsl_notice("%s: issuing PINGREQ\n", __func__);
294
295		buf[LWS_PRE] = LMQCP_CTOS_PINGREQ << 4;
296		buf[LWS_PRE + 1] = 0;
297
298		if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 2,
299			      LWS_WRITE_BINARY) != 2)
300			return LWS_HP_RET_BAIL_DIE;
301
302		return LWS_HP_RET_BAIL_OK;
303	}
304#endif
305	if (wsi->mqtt && !wsi->mqtt->inside_payload &&
306	    (wsi->mqtt->send_pubrec || wsi->mqtt->send_pubrel ||
307	     wsi->mqtt->send_pubcomp)) {
308		uint8_t buf[LWS_PRE + 4];
309		/* Remaining len = 2 */
310		buf[LWS_PRE + 1] = 2;
311		if (wsi->mqtt->send_pubrec) {
312			lwsl_notice("%s: issuing PUBREC for pkt id: %d\n",
313				    __func__, wsi->mqtt->peer_ack_pkt_id);
314			buf[LWS_PRE] = LMQCP_PUBREC << 4 | 0x2;
315			/* Packet ID */
316			lws_ser_wu16be(&buf[LWS_PRE + 2],
317				       wsi->mqtt->peer_ack_pkt_id);
318			wsi->mqtt->send_pubrec = 0;
319		} else if (wsi->mqtt->send_pubrel) {
320			lwsl_notice("%s: issuing PUBREL for pkt id: %d\n",
321				    __func__, wsi->mqtt->ack_pkt_id);
322			buf[LWS_PRE] = LMQCP_PUBREL << 4 | 0x2;
323			lws_ser_wu16be(&buf[LWS_PRE + 2],
324				       wsi->mqtt->ack_pkt_id);
325			wsi->mqtt->send_pubrel = 0;
326		} else {
327			lwsl_notice("%s: issuing PUBCOMP for pkt id: %d\n",
328				    __func__, wsi->mqtt->peer_ack_pkt_id);
329			buf[LWS_PRE] = LMQCP_PUBCOMP << 4 | 0x2;
330			lws_ser_wu16be(&buf[LWS_PRE + 2],
331				       wsi->mqtt->peer_ack_pkt_id);
332			wsi->mqtt->send_pubcomp = 0;
333		}
334		if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
335			      LWS_WRITE_BINARY) != 4)
336			return LWS_HP_RET_BAIL_DIE;
337		return LWS_HP_RET_BAIL_OK;
338	}
339
340	wsi = lws_get_network_wsi(wsi);
341
342	wsi->mux.requested_POLLOUT = 0;
343
344	wsi2 = &wsi->mux.child_list;
345	if (!*wsi2) {
346		lwsl_debug("%s: no children\n", __func__);
347		return LWS_HP_RET_DROP_POLLOUT;
348	}
349
350	if (!wsi->mqtt)
351		return LWS_HP_RET_BAIL_DIE;
352
353	lws_wsi_mux_dump_waiting_children(wsi);
354
355	do {
356		struct lws *w, **wa;
357
358		wa = &(*wsi2)->mux.sibling_list;
359		if (!(*wsi2)->mux.requested_POLLOUT)
360			goto next_child;
361
362		if (!lwsi_state_can_handle_POLLOUT(wsi))
363			goto next_child;
364
365		/*
366		 * If the nwsi is in the middle of a frame, we can only
367		 * continue to send that
368		 */
369
370		if (wsi->mqtt->inside_payload && !(*wsi2)->mqtt->inside_payload)
371			goto next_child;
372
373		/*
374		 * we're going to do writable callback for this child.
375		 * move him to be the last child
376		 */
377		w = lws_wsi_mux_move_child_to_tail(wsi2);
378		if (!w) {
379			wa = &wsi->mux.child_list;
380			goto next_child;
381		}
382
383		lwsl_debug("%s: child %s (wsistate 0x%x)\n", __func__,
384			   lws_wsi_tag(w), (unsigned int)w->wsistate);
385
386		if (lwsi_state(wsi) == LRS_ESTABLISHED &&
387		    !wsi->mqtt->inside_payload &&
388		    wsi->mqtt->send_puback) {
389			uint8_t buf[LWS_PRE + 4];
390			lwsl_notice("%s: issuing PUBACK for pkt id: %d\n",
391				    __func__, wsi->mqtt->ack_pkt_id);
392
393			/* Fixed header */
394			buf[LWS_PRE] = LMQCP_PUBACK << 4;
395			/* Remaining len = 2 */
396			buf[LWS_PRE + 1] = 2;
397			/* Packet ID */
398			lws_ser_wu16be(&buf[LWS_PRE + 2], wsi->mqtt->peer_ack_pkt_id);
399
400			if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
401				      LWS_WRITE_BINARY) != 4)
402				return LWS_HP_RET_BAIL_DIE;
403
404			wsi->mqtt->send_puback = 0;
405			w->mux.requested_POLLOUT = 1;
406
407			wa = &wsi->mux.child_list;
408			goto next_child;
409		}
410
411		if (lws_callback_as_writeable(w)) {
412			lwsl_notice("%s: Closing child %s\n", __func__, lws_wsi_tag(w));
413			lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
414					   "mqtt pollout handle");
415			wa = &wsi->mux.child_list;
416		}
417
418next_child:
419		wsi2 = wa;
420	} while (wsi2 && *wsi2 && !lws_send_pipe_choked(wsi));
421
422	// lws_wsi_mux_dump_waiting_children(wsi);
423
424	if (lws_wsi_mux_action_pending_writeable_reqs(wsi))
425		return LWS_HP_RET_BAIL_DIE;
426
427	return LWS_HP_RET_BAIL_OK;
428}
429
#if defined(LWS_WITH_CLIENT)
/*
 * Keepalive / validity hook for MQTT client connections.
 *
 * All work is done on the network wsi that wsi belongs to.
 *
 * isvalid != 0: report to the core that the connection validity has been
 *               confirmed.
 * isvalid == 0: flag that a PINGREQ should be sent and ask for a writeable
 *               callback; rops_handle_POLLOUT_mqtt() then emits it.
 *
 * Always returns 0.
 */
static int
rops_issue_keepalive_mqtt(struct lws *wsi, int isvalid)
{
	struct lws *nwsi = lws_get_network_wsi(wsi);

	if (isvalid) {
		_lws_validity_confirmed_role(nwsi);

		return 0;
	}

	/*
	 * Other handlers in this role treat ->mqtt as possibly absent, do
	 * the same here rather than dereference NULL: without mqtt state
	 * there is nowhere to record the PINGREQ request.
	 */
	if (!nwsi->mqtt) {
		lwsl_info("%s: %s: no mqtt state, not issuing PINGREQ\n",
			  __func__, lws_wsi_tag(nwsi));

		return 0;
	}

	nwsi->mqtt->send_pingreq = 1;
	lws_callback_on_writable(nwsi);

	return 0;
}
#endif
448
449static int
450rops_close_role_mqtt(struct lws_context_per_thread *pt, struct lws *wsi)
451{
452	struct lws *nwsi = lws_get_network_wsi(wsi);
453	lws_mqtt_subs_t	*s, *s1, *mysub;
454	lws_mqttc_t *c;
455
456	if (!wsi->mqtt)
457		return 0;
458
459	c = &wsi->mqtt->client;
460
461	lws_sul_cancel(&wsi->mqtt->sul_qos_puback_pubrec_wait);
462
463	lws_mqtt_str_free(&c->username);
464	lws_mqtt_str_free(&c->password);
465	lws_mqtt_str_free(&c->will.message);
466	lws_mqtt_str_free(&c->will.topic);
467	lws_mqtt_str_free(&c->id);
468
469	/* clean up any subscription allocations */
470
471	s = wsi->mqtt->subs_head;
472	wsi->mqtt->subs_head = NULL;
473	while (s) {
474		s1 = s->next;
475		/*
476		 * Account for children no longer using nwsi subscription
477		 */
478		mysub = lws_mqtt_find_sub(nwsi->mqtt, (const char *)&s[1]);
479//		assert(mysub); /* if child subscribed, nwsi must feel the same */
480		if (mysub) {
481			assert(mysub->ref_count);
482			mysub->ref_count--;
483		}
484		lws_free(s);
485		s = s1;
486	}
487
488	lws_mqtt_publish_param_t *pub =
489			(lws_mqtt_publish_param_t *)
490				wsi->mqtt->rx_cpkt_param;
491
492	if (pub)
493		lws_free_set_NULL(pub->topic);
494
495	lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
496
497	lws_free_set_NULL(wsi->mqtt);
498
499	return 0;
500}
501
502static int
503rops_callback_on_writable_mqtt(struct lws *wsi)
504{
505#if defined(LWS_WITH_CLIENT)
506	struct lws *network_wsi;
507#endif
508	int already;
509
510	lwsl_debug("%s: %s (wsistate 0x%x)\n", __func__, lws_wsi_tag(wsi),
511			(unsigned int)wsi->wsistate);
512
513	if (wsi->mux.requested_POLLOUT
514#if defined(LWS_WITH_CLIENT)
515			&& !wsi->client_h2_alpn
516#endif
517	) {
518		lwsl_debug("already pending writable\n");
519		return 1;
520	}
521#if 0
522	/* is this for DATA or for control messages? */
523	if (wsi->upgraded_to_http2 && !wsi->h2.h2n->pps &&
524	    !lws_h2_tx_cr_get(wsi)) {
525		/*
526		 * other side is not able to cope with us sending DATA
527		 * anything so no matter if we have POLLOUT on our side if it's
528		 * DATA we want to send.
529		 *
530		 * Delay waiting for our POLLOUT until peer indicates he has
531		 * space for more using tx window command in http2 layer
532		 */
533		lwsl_notice("%s: %p: skint (%d)\n", __func__, wsi,
534			    wsi->h2.tx_cr);
535		wsi->h2.skint = 1;
536		return 0;
537	}
538
539	wsi->h2.skint = 0;
540#endif
541#if defined(LWS_WITH_CLIENT)
542	network_wsi = lws_get_network_wsi(wsi);
543#endif
544	already = lws_wsi_mux_mark_parents_needing_writeable(wsi);
545
546	/* for network action, act only on the network wsi */
547
548	if (already
549#if defined(LWS_WITH_CLIENT)
550			&& !network_wsi->client_mux_substream
551#endif
552			)
553		return 1;
554
555	return 0;
556}
557
558static int
559rops_close_kill_connection_mqtt(struct lws *wsi, enum lws_close_status reason)
560{
561	lwsl_info(" %s, his parent %s: child list %p, siblings:\n",
562			lws_wsi_tag(wsi),
563			lws_wsi_tag(wsi->mux.parent_wsi), wsi->mux.child_list);
564	//lws_wsi_mux_dump_children(wsi);
565
566	if (wsi->mux_substream
567#if defined(LWS_WITH_CLIENT)
568			|| wsi->client_mux_substream
569#endif
570	) {
571		lwsl_info("closing %s: parent %s: first child %p\n",
572				lws_wsi_tag(wsi),
573				lws_wsi_tag(wsi->mux.parent_wsi),
574				wsi->mux.child_list);
575
576		if (wsi->mux.child_list && lwsl_visible(LLL_INFO)) {
577			lwsl_info(" parent %s: closing children: list:\n", lws_wsi_tag(wsi));
578			lws_wsi_mux_dump_children(wsi);
579		}
580
581		lws_wsi_mux_close_children(wsi, (int)reason);
582	}
583
584	if ((
585#if defined(LWS_WITH_CLIENT)
586			wsi->client_mux_substream ||
587#endif
588			wsi->mux_substream) &&
589	     wsi->mux.parent_wsi) {
590		lws_wsi_mux_sibling_disconnect(wsi);
591	}
592
593	return 0;
594}
595
596static const lws_rops_t rops_table_mqtt[] = {
597	/*  1 */ { .handle_POLLIN	  = rops_handle_POLLIN_mqtt },
598	/*  2 */ { .handle_POLLOUT	  = rops_handle_POLLOUT_mqtt },
599	/*  3 */ { .callback_on_writable  = rops_callback_on_writable_mqtt },
600	/*  4 */ { .close_role		  = rops_close_role_mqtt },
601	/*  5 */ { .close_kill_connection = rops_close_kill_connection_mqtt },
602#if defined(LWS_WITH_CLIENT)
603	/*  6 */ { .client_bind		  = rops_client_bind_mqtt },
604	/*  7 */ { .issue_keepalive	  = rops_issue_keepalive_mqtt },
605#endif
606};
607
608struct lws_role_ops role_ops_mqtt = {
609	/* role name */			"mqtt",
610	/* alpn id */			"x-amzn-mqtt-ca", /* "mqtt/3.1.1" */
611
612	/* rops_table */		rops_table_mqtt,
613	/* rops_idx */			{
614	  /* LWS_ROPS_check_upgrades */
615	  /* LWS_ROPS_pt_init_destroy */		0x00,
616	  /* LWS_ROPS_init_vhost */
617	  /* LWS_ROPS_destroy_vhost */			0x00,
618	  /* LWS_ROPS_service_flag_pending */
619	  /* LWS_ROPS_handle_POLLIN */			0x01,
620	  /* LWS_ROPS_handle_POLLOUT */
621	  /* LWS_ROPS_perform_user_POLLOUT */		0x20,
622	  /* LWS_ROPS_callback_on_writable */
623	  /* LWS_ROPS_tx_credit */			0x30,
624	  /* LWS_ROPS_write_role_protocol */
625	  /* LWS_ROPS_encapsulation_parent */		0x00,
626	  /* LWS_ROPS_alpn_negotiated */
627	  /* LWS_ROPS_close_via_role_protocol */	0x00,
628	  /* LWS_ROPS_close_role */
629	  /* LWS_ROPS_close_kill_connection */		0x45,
630	  /* LWS_ROPS_destroy_role */
631	  /* LWS_ROPS_adoption_bind */			0x00,
632
633	  /* LWS_ROPS_client_bind */
634#if defined(LWS_WITH_CLIENT)
635	  /* LWS_ROPS_issue_keepalive */		0x67,
636#else
637	  /* LWS_ROPS_issue_keepalive */		0x00,
638#endif
639					},
640
641	.adoption_cb =			{ LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED,
642					  LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED },
643	.rx_cb =			{ LWS_CALLBACK_MQTT_CLIENT_RX,
644					  LWS_CALLBACK_MQTT_CLIENT_RX },
645	.writeable_cb =			{ LWS_CALLBACK_MQTT_CLIENT_WRITEABLE,
646					  LWS_CALLBACK_MQTT_CLIENT_WRITEABLE },
647	.close_cb =			{ LWS_CALLBACK_MQTT_CLIENT_CLOSED,
648					  LWS_CALLBACK_MQTT_CLIENT_CLOSED },
649	.protocol_bind_cb =		{ LWS_CALLBACK_MQTT_IDLE,
650					  LWS_CALLBACK_MQTT_IDLE },
651	.protocol_unbind_cb =		{ LWS_CALLBACK_MQTT_DROP_PROTOCOL,
652					  LWS_CALLBACK_MQTT_DROP_PROTOCOL },
653	.file_handle =			0,
654};
655