1 /*
2  * libwebsockets - small server side websockets and web server implementation
3  *
4  * Copyright (C) 2019 - 2022 Andy Green <andy@warmcat.com>
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to
8  * deal in the Software without restriction, including without limitation the
9  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10  * sell copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22  * IN THE SOFTWARE.
23  */
24 
25 #include <private-lib-core.h>
26 
27 static void
secstream_mqtt_cleanup(lws_ss_handle_t *h)28 secstream_mqtt_cleanup(lws_ss_handle_t *h)
29 {
30 	uint32_t i;
31 
32 	if (h->u.mqtt.heap_baggage) {
33 		lws_free(h->u.mqtt.heap_baggage);
34 		h->u.mqtt.heap_baggage = NULL;
35 	}
36 
37 	if (h->u.mqtt.sub_info.topic) {
38 		for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) {
39 			if (h->u.mqtt.sub_info.topic[i].name) {
40 				lws_free((void*)h->u.mqtt.sub_info.topic[i].name);
41 				h->u.mqtt.sub_info.topic[i].name = NULL;
42 			}
43 		}
44 		lws_free(h->u.mqtt.sub_info.topic);
45 		h->u.mqtt.sub_info.topic = NULL;
46 	}
47 	lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
48 }
49 
50 static int
secstream_mqtt_subscribe(struct lws *wsi)51 secstream_mqtt_subscribe(struct lws *wsi)
52 {
53 	size_t used_in, used_out, topic_limit;
54 	lws_strexp_t exp;
55 	char* expbuf;
56 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
57 
58 	if (!h || !h->policy)
59 		return -1;
60 
61 	if (h->policy->u.mqtt.aws_iot)
62 		topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
63 	else
64 		topic_limit = LWS_MQTT_MAX_TOPICLEN;
65 
66 	if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe)
67 		return 0;
68 
69 	lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL,
70 			topic_limit);
71 	/*
72 	 * Expand with no output first to calculate the size of
73 	 * expanded string then, allocate new buffer and expand
74 	 * again with the buffer
75 	 */
76 	if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
77 			      strlen(h->policy->u.mqtt.subscribe), &used_in,
78 			      &used_out) != LSTRX_DONE) {
79 		lwsl_err(
80 			"%s, failed to expand MQTT subscribe"
81 			" topic with no output\n",
82 			__func__);
83 		return 1;
84 	}
85 
86 	expbuf = lws_malloc(used_out + 1, __func__);
87 	if (!expbuf) {
88 		lwsl_err(
89 			 "%s, failed to allocate MQTT subscribe"
90 			 "topic",
91 			 __func__);
92 		return 1;
93 	}
94 
95 	lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
96 			used_out + 1);
97 
98 	if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
99 			      strlen(h->policy->u.mqtt.subscribe), &used_in,
100 			      &used_out) != LSTRX_DONE) {
101 		lwsl_err("%s, failed to expand MQTT subscribe topic\n",
102 			 __func__);
103 		lws_free(expbuf);
104 		return 1;
105 	}
106 	lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
107 	h->u.mqtt.sub_top.name = expbuf;
108 
109 	/*
110 	 * The policy says to subscribe to something, and we
111 	 * haven't done it yet.  Do it using the pre-prepared
112 	 * string-substituted version of the policy string.
113 	 */
114 
115 	lwsl_notice("%s: subscribing %s\n", __func__,
116 		    h->u.mqtt.sub_top.name);
117 
118 	h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
119 	memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
120 	h->u.mqtt.sub_info.num_topics = 1;
121 	h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
122 	h->u.mqtt.sub_info.topic =
123 			    lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__);
124 	h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
125 	h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
126 
127 	if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
128 		lwsl_notice("%s: unable to subscribe", __func__);
129 		lws_free(expbuf);
130 		h->u.mqtt.sub_top.name = NULL;
131 		return -1;
132 	}
133 	lws_free(expbuf);
134 	h->u.mqtt.sub_top.name = NULL;
135 
136 	/* Expect a SUBACK */
137 	if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
138 		lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
139 		return -1;
140 	}
141 	return 0;
142 }
143 
144 static int
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len, uint32_t payload_len, const char* topic, lws_mqtt_qos_levels_t qos, uint8_t retain, uint8_t dup, int f)145 secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len,
146 			uint32_t payload_len, const char* topic,
147 			lws_mqtt_qos_levels_t qos,  uint8_t retain, uint8_t dup,
148 			int f)
149 {
150 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
151 	size_t used_in, used_out, topic_limit;
152 	lws_strexp_t exp;
153 	char *expbuf;
154 	lws_mqtt_publish_param_t mqpp;
155 
156 	if (h->policy->u.mqtt.aws_iot)
157 		topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
158 	else
159 		topic_limit = LWS_MQTT_MAX_TOPICLEN;
160 
161 	memset(&mqpp, 0, sizeof(mqpp));
162 
163 	lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
164 			topic_limit);
165 
166 	if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
167 			      &used_out) != LSTRX_DONE) {
168 		lwsl_err("%s, failed to expand MQTT publish"
169 			 " topic with no output\n", __func__);
170 		return 1;
171 	}
172 	expbuf = lws_malloc(used_out + 1, __func__);
173 	if (!expbuf) {
174 		lwsl_err("%s, failed to allocate MQTT publish topic",
175 			  __func__);
176 		return 1;
177 	}
178 
179 	lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
180 			used_out + 1);
181 
182 	if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
183 			      &used_out) != LSTRX_DONE) {
184 		lws_free(expbuf);
185 		return 1;
186 	}
187 	lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
188 	mqpp.topic = (char *)expbuf;
189 
190 	mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
191 	mqpp.packet_id = (uint16_t)(h->txord - 1);
192 	mqpp.qos = qos;
193 	mqpp.retain = !!retain;
194 	mqpp.payload = buf;
195 	mqpp.dup = !!dup;
196 	if (payload_len)
197 		mqpp.payload_len = payload_len;
198 	else
199 		mqpp.payload_len = (uint32_t)buf_len;
200 
201 	lwsl_notice("%s: payload len %d\n", __func__,
202 		    (int)mqpp.payload_len);
203 
204 	if (lws_mqtt_client_send_publish(wsi, &mqpp,
205 					 (const char *)buf,
206 					 (uint32_t)buf_len,
207 					 f & LWSSS_FLAG_EOM)) {
208 		lwsl_notice("%s: failed to publish\n", __func__);
209 		lws_free(expbuf);
210 		return -1;
211 	}
212 	lws_free(expbuf);
213 
214 	if ((mqpp.qos == QOS1 || mqpp.qos == QOS2) && buf_len > 0) {
215 		if (lws_buflist_append_segment(&h->u.mqtt.buflist_unacked,
216 					       buf, buf_len) < 0) {
217 			lwsl_notice("%s: failed to store unacked\n", __func__);
218 			return -1;
219 		}
220 	}
221 
222 	return 0;
223 }
224 
225 static int
secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen)226 secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) {
227 	lws_strexp_t exp;
228 	size_t used_in, used_out = 0;
229 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
230 
231 	if (h->policy->u.mqtt.birth_message) {
232 		lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
233 				(char *)buf, buflen);
234 		if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
235 		    strlen(h->policy->u.mqtt.birth_message),
236 			&used_in, &used_out) != LSTRX_DONE) {
237 			return 1;
238 		}
239 	}
240 	wsi->mqtt->inside_birth = 1;
241 	return secstream_mqtt_publish(wsi, buf,
242 				      used_out, 0, h->policy->u.mqtt.birth_topic,
243 				      h->policy->u.mqtt.birth_qos,
244 				      h->policy->u.mqtt.birth_retain, 0,
245 				      LWSSS_FLAG_EOM);
246 }
247 
248 static int
secstream_mqtt_resend(struct lws *wsi, uint8_t *buf)249 secstream_mqtt_resend(struct lws *wsi, uint8_t *buf) {
250 	uint8_t *buffered;
251 	size_t len;
252 	int f = 0, r;
253 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
254 
255 	len = lws_buflist_next_segment_len(&h->u.mqtt.buflist_unacked,
256 					   &buffered);
257 
258 	if (h->u.mqtt.unacked_size <= len)
259 		f |= LWSSS_FLAG_EOM;
260 
261 	if (!len) {
262 		/* when the message does not have payload */
263 		buffered = buf;
264 	} else {
265 		h->u.mqtt.unacked_size -= (uint32_t)len;
266 	}
267 
268 	if (wsi->mqtt->inside_birth) {
269 		r = secstream_mqtt_publish(wsi, buffered, len, 0,
270 					   h->policy->u.mqtt.birth_topic,
271 					   h->policy->u.mqtt.birth_qos,
272 					   h->policy->u.mqtt.birth_retain,
273 					   1, f);
274 	} else {
275 		r = secstream_mqtt_publish(wsi, buffered, len,
276 					   (uint32_t)h->writeable_len,
277 					   h->policy->u.mqtt.topic,
278 					   h->policy->u.mqtt.qos,
279 					   h->policy->u.mqtt.retain, 1, f);
280 	}
281 	if (len)
282 		lws_buflist_use_segment(&h->u.mqtt.buflist_unacked, len);
283 
284 	if (r) {
285 		lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
286 		h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
287 
288 		if (wsi->mqtt->inside_birth) {
289 			lwsl_err("%s: %s: failed to send Birth\n", __func__,
290 				 lws_ss_tag(h));
291 			return -1;
292 		} else {
293 			r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
294 			if (r != LWSSSSRET_OK)
295 				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
296 		}
297 	}
298 	return 0;
299 }
300 
301 static char *
expand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len)302 expand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len)
303 {
304 	lws_strexp_t exp;
305 	char *expbuf = NULL;
306 	size_t used_in = 0, used_out = 0, post_len = 0;
307 
308 	memset(&exp, 0, sizeof(exp));
309 
310 	if (post)
311 		post_len = strlen(post);
312 
313 	if (post_len > max_len)
314 		return NULL;
315 
316 	lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, NULL,
317 			max_len - post_len);
318 
319 	if (lws_strexp_expand(&exp, str, strlen(str), &used_in,
320 			      &used_out) != LSTRX_DONE) {
321 		lwsl_err("%s, failed to expand %s", __func__, str);
322 
323 		return NULL;
324 	}
325 
326 	expbuf = lws_malloc(used_out + 1 + post_len, __func__);
327 	if (!expbuf) {
328 		lwsl_err("%s, failed to allocate str_exp for %s", __func__, str);
329 
330 		return NULL;
331 	}
332 
333 	lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
334 			used_out + 1 + post_len);
335 
336 	if (lws_strexp_expand(&exp, str, strlen(str), &used_in,
337 			      &used_out) != LSTRX_DONE) {
338 		lwsl_err("%s, failed to expand str_exp %s\n", __func__, str);
339 		lws_free(expbuf);
340 
341 		return NULL;
342 	}
343 	if (post)
344 		strcat(expbuf, post);
345 
346 	return expbuf;
347 }
348 
349 static lws_mqtt_match_topic_return_t
secstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic)350 secstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic)
351 {
352 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
353 	const char *match[] = { LWS_MQTT_SHADOW_UNNAMED_TOPIC_MATCH,
354 				LWS_MQTT_SHADOW_NAMED_TOPIC_MATCH };
355 	char *expbuf = NULL;
356 	unsigned int i = 0;
357 	lws_mqtt_match_topic_return_t ret = LMMTR_TOPIC_NOMATCH;
358 
359 	if (!topic)
360 		return LMMTR_TOPIC_MATCH_ERROR;
361 
362 	expbuf = expand_metadata(h, topic, NULL, LWS_MQTT_MAX_AWSIOT_TOPICLEN);
363 	if (!expbuf) {
364 		lwsl_wsi_warn(wsi, "Failed to expand Shadow topic");
365 
366 		return LMMTR_TOPIC_MATCH_ERROR;
367 	}
368 	for (i = 0; i < (sizeof(match) / sizeof(match[0])); i++) {
369 		if (lws_mqtt_is_topic_matched(
370 				match[i], expbuf) == LMMTR_TOPIC_MATCH) {
371 			ret = LMMTR_TOPIC_MATCH;
372 			break;
373 		}
374 	}
375 	lws_free(expbuf);
376 
377 	return ret;
378 }
379 
380 static void
secstream_mqtt_shadow_cleanup(struct lws *wsi)381 secstream_mqtt_shadow_cleanup(struct lws *wsi)
382 {
383 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
384 	uint32_t i = 0;
385 
386 	for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++)
387 		lws_free((void *)h->u.mqtt.shadow_sub.topic[i].name);
388 
389 	h->u.mqtt.shadow_sub.num_topics = 0;
390 
391 	if (h->u.mqtt.shadow_sub.topic) {
392 		lws_free(h->u.mqtt.shadow_sub.topic);
393 		h->u.mqtt.shadow_sub.topic = NULL;
394 	}
395 }
396 
397 static lws_ss_state_return_t
secstream_mqtt_shadow_unsubscribe(struct lws *wsi)398 secstream_mqtt_shadow_unsubscribe(struct lws *wsi)
399 {
400 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
401 
402 	if (h->u.mqtt.shadow_sub.num_topics == 0) {
403 		wsi->mqtt->send_shadow_unsubscribe = 0;
404 		wsi->mqtt->inside_shadow = 0;
405 		wsi->mqtt->done_shadow_subscribe = 0;
406 
407 		return LWSSSSRET_OK;
408 	}
409 
410 	if (lws_mqtt_client_send_unsubcribe(wsi, &h->u.mqtt.shadow_sub)) {
411 		lwsl_wsi_err(wsi, "Failed to send MQTT unsubsribe");
412 
413 		return LWSSSSRET_DISCONNECT_ME;
414 	}
415 	/* Expect a UNSUBACK */
416 	if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
417 		lwsl_wsi_err(wsi, "Unable to set LWS_POLLIN");
418 
419 		return LWSSSSRET_DISCONNECT_ME;
420 	}
421 	wsi->mqtt->send_shadow_unsubscribe = 0;
422 
423 	return LWSSSSRET_OK;
424 }
425 
426 static int
secstream_mqtt_shadow_subscribe(struct lws *wsi)427 secstream_mqtt_shadow_subscribe(struct lws *wsi)
428 {
429 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
430 	char* expbuf = NULL;
431 	const char *suffixes[] = { LWS_MQTT_SHADOW_RESP_ACCEPTED_STR,
432 				   LWS_MQTT_SHADOW_RESP_REJECTED_STR };
433 	unsigned int i, suffixes_len = sizeof(suffixes) / sizeof(suffixes[0]);
434 
435 	if (!h->policy->u.mqtt.topic || wsi->mqtt->inside_shadow)
436 		return 0;
437 
438 	if (h->u.mqtt.shadow_sub.num_topics > 0)
439 		secstream_mqtt_shadow_cleanup(wsi);
440 
441 	memset(&h->u.mqtt.shadow_sub, 0, sizeof(lws_mqtt_subscribe_param_t));
442 	h->u.mqtt.shadow_sub.topic = lws_malloc(
443 			sizeof(lws_mqtt_topic_elem_t) * suffixes_len, __func__);
444 	if (!h->u.mqtt.shadow_sub.topic) {
445 		lwsl_ss_err(h, "Failed to allocate Shadow topics");
446 		return -1;
447 	}
448 	h->u.mqtt.shadow_sub.num_topics = suffixes_len;
449 	for (i = 0; i < suffixes_len; i++) {
450 		expbuf = expand_metadata(h, h->policy->u.mqtt.topic, suffixes[i],
451 					 LWS_MQTT_MAX_AWSIOT_TOPICLEN);
452 		if (!expbuf) {
453 			lwsl_ss_err(h, "Failed to allocate Shadow topic");
454 			secstream_mqtt_shadow_cleanup(wsi);
455 
456 			return -1;
457 		}
458 		h->u.mqtt.shadow_sub.topic[i].name = expbuf;
459 		h->u.mqtt.shadow_sub.topic[i].qos = h->policy->u.mqtt.qos;
460 	}
461 	h->u.mqtt.shadow_sub.packet_id = (uint16_t)(h->txord - 1);
462 
463 	if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.shadow_sub)) {
464 		lwsl_wsi_notice(wsi, "Unable to subscribe Shadow topics");
465 
466 		return 0;
467 	}
468 
469 	/* Expect a SUBACK */
470 	if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
471 		lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
472 		return -1;
473 	}
474 	wsi->mqtt->inside_shadow = 1;
475 
476 	return 0;
477 }
478 
479 static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)480 secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
481 	     void *in, size_t len)
482 {
483 	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
484 	size_t used_in = 0, used_out = 0, topic_len = 0;
485 	lws_mqtt_publish_param_t *pmqpp = NULL;
486 	lws_ss_state_return_t r = LWSSSSRET_OK;
487 	uint8_t buf[LWS_PRE + 1400];
488 	size_t buflen = sizeof(buf) - LWS_PRE;
489 	lws_ss_metadata_t *omd = NULL;
490 	char *sub_topic = NULL;
491 	lws_strexp_t exp;
492 	int f = 0;
493 
494 	memset(buf, 0, sizeof(buf));
495 	memset(&exp, 0, sizeof(exp));
496 
497 	switch (reason) {
498 
499 	/* because we are protocols[0] ... */
500 	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
501 		lwsl_info("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
502 			 in ? (char *)in : "(null)");
503 		if (!h)
504 			break;
505 
506 #if defined(LWS_WITH_CONMON)
507 		lws_conmon_ss_json(h);
508 #endif
509 
510 		r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
511 		h->wsi = NULL;
512 
513 		secstream_mqtt_cleanup(h);
514 
515 		if (r == LWSSSSRET_DESTROY_ME)
516 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
517 
518 		r = lws_ss_backoff(h);
519 		if (r != LWSSSSRET_OK)
520 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
521 
522 		break;
523 
524 	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
525 		if (!h)
526 			break;
527 		lws_sul_cancel(&h->sul_timeout);
528 #if defined(LWS_WITH_CONMON)
529 		lws_conmon_ss_json(h);
530 #endif
531 		if (h->ss_dangling_connected)
532 			r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
533 		else
534 			r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
535 		if (h->wsi)
536 			lws_set_opaque_user_data(h->wsi, NULL);
537 		h->wsi = NULL;
538 
539 		secstream_mqtt_cleanup(h);
540 
541 		if (r)
542 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
543 
544 		if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) &&
545 		    !h->txn_ok && !wsi->a.context->being_destroyed) {
546 			r = lws_ss_backoff(h);
547 			if (r != LWSSSSRET_OK)
548 				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
549 		}
550 		break;
551 
552 	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
553 		/*
554 		 * Make sure the handle wsi points to the stream wsi not the
555 		 * original nwsi, in the case it was migrated
556 		 */
557 		h->wsi = wsi;
558 		h->retry = 0;
559 		h->seqstate = SSSEQ_CONNECTED;
560 
561 		if (h->policy->u.mqtt.birth_topic &&
562 		    !wsi->mqtt->done_birth) {
563 			struct lws *nwsi = lws_get_network_wsi(wsi);
564 			lws_start_foreach_ll(struct lws *, w, nwsi->mux.child_list) {
565 				if (w != wsi &&
566 					(w->mqtt->done_birth || w->mqtt->inside_birth)) {
567 					/*
568 					 * If any Birth was sent out or
569 					 * is pending on other stream,
570 					 * skip sending Birth.
571 					 */
572 					wsi->mqtt->done_birth = 1;
573 					break;
574 				}
575 			} lws_end_foreach_ll(w, mux.sibling_list);
576 		}
577 
578 		if (!h->policy->u.mqtt.subscribe ||
579 		    !h->policy->u.mqtt.subscribe[0]) {
580 			/*
581 			 * If subscribe is empty in the policy, then,
582 			 * skip sending SUBSCRIBE and signal the user
583 			 * application.
584 			 */
585 			wsi->mqtt->done_subscribe = 1;
586 		} else if (!h->policy->u.mqtt.clean_start &&
587 			   wsi->mqtt->session_resumed) {
588 			wsi->mqtt->inside_resume_session = 1;
589 			/*
590 			 * If the previous session is resumed and Server has
591 			 * stored session, then, do not subscribe.
592 			 */
593 			if (!secstream_mqtt_subscribe(wsi))
594 				wsi->mqtt->done_subscribe = 1;
595 			wsi->mqtt->inside_resume_session = 0;
596 		} else if (h->policy->u.mqtt.subscribe &&
597 			   !wsi->mqtt->done_subscribe) {
598 			/*
599 			 * If a subscribe is pending on the stream, then make
600 			 * sure the SUBSCRIBE is done before signaling the
601 			 * user application.
602 			 */
603 			lws_callback_on_writable(wsi);
604 			break;
605 		}
606 
607 		if (h->policy->u.mqtt.birth_topic &&
608 		    !wsi->mqtt->done_birth) {
609 			/*
610 			 * If a Birth is pending on the stream, then make
611 			 * sure the Birth is done before signaling the
612 			 * user application.
613 			 */
614 			lws_callback_on_writable(wsi);
615 			break;
616 		}
617 		lws_sul_cancel(&h->sul);
618 #if defined(LWS_WITH_SYS_METRICS)
619 		/*
620 		 * If any hanging caliper measurement, dump it, and free any tags
621 		 */
622 		lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
623 #endif
624 		r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
625 		if (r != LWSSSSRET_OK)
626 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
627 		if (h->policy->u.mqtt.topic)
628 			lws_callback_on_writable(wsi);
629 		break;
630 
631 	case LWS_CALLBACK_MQTT_CLIENT_RX:
632 		// lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: read %d\n", (int)len);
633 		if (!h || !h->info.rx)
634 			return 0;
635 
636 		pmqpp = (lws_mqtt_publish_param_t *)in;
637 
638 		f = 0;
639 		if (!pmqpp->payload_pos)
640 			f |= LWSSS_FLAG_SOM;
641 		if (pmqpp->payload_pos + len == pmqpp->payload_len)
642 			f |= LWSSS_FLAG_EOM;
643 
644 		h->subseq = 1;
645 
646 		if (wsi->mqtt->inside_shadow) {
647 			/*
648 			 * When Shadow is used, the stream receives multiple
649 			 * topics including Shadow response, set received
650 			 * topic on the metadata
651 			 */
652 			lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata,
653 					NULL, (size_t)-1);
654 
655 			if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
656 					strlen(h->policy->u.mqtt.subscribe),
657 					&used_in, &used_out) != LSTRX_DONE) {
658 				lwsl_err("%s, failed to expand subscribe topic",
659 					 __func__);
660 				return -1;
661 			}
662 			omd = lws_ss_get_handle_metadata(h, exp.name);
663 
664 			if (!omd) {
665 				lwsl_err("%s, failed to find metadata for subscribe",
666 					 __func__);
667 				return -1;
668 			}
669 			sub_topic = omd->value__may_own_heap;
670 			topic_len = omd->length;
671 
672 			_lws_ss_set_metadata(omd, exp.name,
673 					     (const void *)pmqpp->topic,
674 					     pmqpp->topic_len);
675 		}
676 
677 		r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload,
678 			   len, f);
679 
680 		if (wsi->mqtt->inside_shadow)
681 			_lws_ss_set_metadata(omd, exp.name, &sub_topic,
682 					     topic_len);
683 
684 		if (r != LWSSSSRET_OK)
685 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
686 
687 		if (wsi->mqtt->inside_shadow) {
688 			size_t acc_n = strlen(LWS_MQTT_SHADOW_RESP_ACCEPTED_STR);
689 			size_t rej_n = strlen(LWS_MQTT_SHADOW_RESP_REJECTED_STR);
690 			uint32_t i;
691 
692 			for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) {
693 				/*
694 				 * received response ('/accepted' or 'rejected')
695 				 * and clean up Shadow operation
696 				 */
697 				if (strncmp(h->u.mqtt.shadow_sub.topic[i].name,
698 					    pmqpp->topic, pmqpp->topic_len) ||
699 				    (strlen(pmqpp->topic) < acc_n ||
700 				     strlen(pmqpp->topic) < rej_n))
701 					continue;
702 
703 				if (!strcmp(pmqpp->topic +
704 						(strlen(pmqpp->topic) - acc_n),
705 					 	LWS_MQTT_SHADOW_RESP_ACCEPTED_STR) ||
706 				    !strcmp(pmqpp->topic +
707 						(strlen(pmqpp->topic) - rej_n),
708 						 LWS_MQTT_SHADOW_RESP_REJECTED_STR)) {
709 					lws_sul_cancel(&wsi->mqtt->sul_shadow_wait);
710 					wsi->mqtt->send_shadow_unsubscribe = 1;
711 					lws_callback_on_writable(wsi);
712 
713 					return 0;
714 				}
715 			}
716 		}
717 		return 0; /* don't passthru */
718 
719 	case LWS_CALLBACK_MQTT_SUBSCRIBED:
720 		if (wsi->mqtt->inside_shadow) {
721 			wsi->mqtt->done_shadow_subscribe = 1;
722 			lws_callback_on_writable(wsi);
723 
724 			return 0;
725 		}
726 		/*
727 		 * Stream demanded a subscribe without a Birth while connecting, once
728 		 * done notify CONNECTED event to the application.
729 		 */
730 		if (!wsi->mqtt->done_subscribe && !h->policy->u.mqtt.birth_topic) {
731 			lws_sul_cancel(&h->sul);
732 			r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
733 			if (r != LWSSSSRET_OK)
734 				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
735 		}
736 		wsi->mqtt->done_subscribe = 1;
737 		lws_callback_on_writable(wsi);
738 		break;
739 
740 	case LWS_CALLBACK_MQTT_ACK:
741 		lws_sul_cancel(&h->sul_timeout);
742 		if (h->u.mqtt.send_unacked) {
743 			lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
744 			h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
745 		}
746 
747 		if (wsi->mqtt->inside_birth) {
748 			/*
749 			 * Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify
750 			 * CONNECTED event to the application.
751 			 */
752 			wsi->mqtt->inside_birth = 0;
753 			wsi->mqtt->done_birth = 1;
754 			r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
755 			if (r != LWSSSSRET_OK)
756 				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
757 			lws_callback_on_writable(wsi);
758 			break;
759 		}
760 		r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
761 		if (r != LWSSSSRET_OK)
762 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
763 		break;
764 
765 	case LWS_CALLBACK_MQTT_RESEND:
766 		lws_sul_cancel(&h->sul_timeout);
767 		if (h->u.mqtt.retry_count++ < LWS_MQTT_MAX_PUBLISH_RETRY) {
768 			h->u.mqtt.unacked_size =
769 				(uint32_t)lws_buflist_total_len(&h->u.mqtt.buflist_unacked);
770 			if (h->u.mqtt.unacked_size) {
771 				lwsl_notice("%s: %s: resend unacked message (%d/%d) \n",
772 					    __func__, lws_ss_tag(h),
773 					    h->u.mqtt.retry_count,
774 					    LWS_MQTT_MAX_PUBLISH_RETRY);
775 				h->u.mqtt.send_unacked = 1;
776 				lws_callback_on_writable(wsi);
777 				break;
778 			}
779 		}
780 
781 		lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
782 		h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
783 
784 		if (wsi->mqtt->inside_birth) {
785 			lwsl_err("%s: %s: failed to send Birth\n", __func__,
786 				 lws_ss_tag(h));
787 			return -1;
788 		}
789 
790 		r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
791 		if (r != LWSSSSRET_OK)
792 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
793 		break;
794 
795 	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
796 	{
797 		if (!h || !h->info.tx)
798 			return 0;
799 		lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h));
800 
801 		if (h->seqstate != SSSEQ_CONNECTED) {
802 			lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
803 			break;
804 		}
805 
806 		if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
807 			return secstream_mqtt_subscribe(wsi);
808 
809 		if (h->u.mqtt.send_unacked)
810 			return secstream_mqtt_resend(wsi, buf + LWS_PRE);
811 
812 		if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic)
813 			return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen);
814 
815 		if (h->policy->u.mqtt.aws_iot) {
816 			if (secstream_mqtt_is_shadow_matched(wsi,
817 			    h->policy->u.mqtt.topic) == LMMTR_TOPIC_MATCH) {
818 				if (!wsi->mqtt->done_shadow_subscribe)
819 					return secstream_mqtt_shadow_subscribe(wsi);
820 				if (wsi->mqtt->send_shadow_unsubscribe)
821 					return secstream_mqtt_shadow_unsubscribe(wsi);
822 			}
823 		}
824 
825 		r = h->info.tx(ss_to_userobj(h),  h->txord++,  buf + LWS_PRE,
826 			       &buflen, &f);
827 
828 		if (r == LWSSSSRET_TX_DONT_SEND) {
829 			if (wsi->mqtt->done_shadow_subscribe) {
830 				return secstream_mqtt_shadow_unsubscribe(wsi);
831 			}
832 			return 0;
833 		}
834 
835 		if (r == LWSSSSRET_DISCONNECT_ME) {
836 			lws_mqtt_subscribe_param_t lmsp;
837 			if (h->u.mqtt.sub_info.num_topics) {
838 				lmsp.num_topics = h->u.mqtt.sub_info.num_topics;
839 				lmsp.topic = h->u.mqtt.sub_info.topic;
840 				lmsp.packet_id = (uint16_t)(h->txord - 1);
841 				if (lws_mqtt_client_send_unsubcribe(wsi,
842 								    &lmsp)) {
843 					lwsl_err("%s, failed to send"
844 					         " MQTT unsubsribe", __func__);
845 					return -1;
846 				}
847 				return 0;
848 			}
849 		}
850 
851 		if (r < 0)
852 			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
853 
854 		if (secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
855 					   (uint32_t)h->writeable_len,
856 					   h->policy->u.mqtt.topic,
857 					   h->policy->u.mqtt.qos,
858 					   h->policy->u.mqtt.retain, 0, f) != 0) {
859 			r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
860 			if (r != LWSSSSRET_OK)
861 				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
862 		}
863 		return 0;
864 	}
865 
866 	case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
867 	{
868 		struct lws *nwsi = lws_get_network_wsi(wsi);
869 
870 		if (wsi->mqtt->inside_shadow) {
871 			secstream_mqtt_shadow_cleanup(wsi);
872 			wsi->mqtt->inside_shadow = 0;
873 			wsi->mqtt->done_shadow_subscribe = 0;
874 			break;
875 		}
876 		if (nwsi && (nwsi->mux.child_count == 1))
877 			lws_mqtt_client_send_disconnect(nwsi);
878 		return -1;
879 	}
880 
881 	case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT:
882 		if (!wsi->mqtt)
883 			return -1;
884 
885 		if (wsi->mqtt->inside_shadow) {
886 			secstream_mqtt_shadow_cleanup(wsi);
887 			wsi->mqtt->inside_shadow = 0;
888 			wsi->mqtt->done_shadow_subscribe = 0;
889 			lwsl_warn("%s: %s: Unsubscribe (Shadow) timeout.\n",
890 				  __func__, lws_ss_tag(h));
891 			break;
892 		}
893 
894 		if (wsi->mqtt->inside_unsubscribe) {
895 			lwsl_warn("%s: %s: Unsubscribe timeout.\n", __func__,
896 				  lws_ss_tag(h));
897 			return -1;
898 		}
899 		break;
900 
901 	case LWS_CALLBACK_MQTT_SHADOW_TIMEOUT:
902 		if (!wsi->mqtt)
903 			return -1;
904 
905 		if (wsi->mqtt->inside_shadow) {
906 			lwsl_warn("%s: %s: Shadow timeout.\n", __func__,
907 				  lws_ss_tag(h));
908 			wsi->mqtt->send_shadow_unsubscribe = 1;
909 			lws_callback_on_writable(wsi);
910 		}
911 		break;
912 
913 	default:
914 		break;
915 	}
916 
917 	return lws_callback_http_dummy(wsi, reason, user, in, len);
918 }
919 
/*
 * Protocol table entry for Secure Streams over MQTT: named
 * "lws-secstream-mqtt" and listing secstream_mqtt as its function; all
 * remaining members are left at 0 / NULL.
 */
const struct lws_protocols protocol_secstream_mqtt = {
	"lws-secstream-mqtt",
	secstream_mqtt,
	0, 0, 0, NULL, 0
};
925 /*
926  * Munge connect info according to protocol-specific considerations... this
927  * usually means interpreting aux in a protocol-specific way and using the
928  * pieces at connection setup time, eg, http url pieces.
929  *
930  * len bytes of buf can be used for things with scope until after the actual
931  * connect.
932  *
933  * For ws, protocol aux is <url path>;<ws subprotocol name>
934  */
935 
/*
 * One name per policy string that gets string-substituted at connect time.
 * The order here is the same as the sources[] array in
 * secstream_connect_munge_mqtt() — presumably these are used as indices
 * into it and its companion arrays; confirm before reordering.
 */
enum {
	SSCMM_STRSUB_WILL_TOPIC,
	SSCMM_STRSUB_WILL_MESSAGE,
	SSCMM_STRSUB_SUBSCRIBE,
	SSCMM_STRSUB_TOPIC,
	SSCMM_STRSUB_BIRTH_TOPIC,
	SSCMM_STRSUB_BIRTH_MESSAGE
};
944 
945 static int
secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, struct lws_client_connect_info *i, union lws_ss_contemp *ct)946 secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
947 			     struct lws_client_connect_info *i,
948 			     union lws_ss_contemp *ct)
949 {
950 	const char *sources[6] = {
951 		/* we're going to string-substitute these before use */
952 		h->policy->u.mqtt.will_topic,
953 		h->policy->u.mqtt.will_message,
954 		h->policy->u.mqtt.subscribe,
955 		h->policy->u.mqtt.topic,
956 		h->policy->u.mqtt.birth_topic,
957 		h->policy->u.mqtt.birth_message
958 	};
959 	size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0;
960 	lws_strexp_t exp;
961 	char *ps[6];
962 	uint8_t *p = NULL;
963 	int n = -1;
964 	size_t blen;
965 	lws_system_blob_t *b = NULL;
966 
967 	memset(&ct->ccp, 0, sizeof(ct->ccp));
968 	b = lws_system_get_blob(i->context,
969 				LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID, 0);
970 
971 	/* If LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID is set */
972 	if (b && (blen = lws_system_blob_get_size(b))) {
973 		if (blen > LWS_MQTT_MAX_CIDLEN) {
974 			lwsl_err("%s - Client ID too long.\n",
975 				 __func__);
976 			return -1;
977 		}
978 		p = (uint8_t *)lws_zalloc(blen+1, __func__);
979 		if (!p)
980 			return -1;
981 		n = lws_system_blob_get(b, p, &blen, 0);
982 		if (n) {
983 			ct->ccp.client_id = NULL;
984 		} else {
985 			ct->ccp.client_id = (const char *)p;
986 			lwsl_notice("%s - Client ID = %s\n",
987 				    __func__, ct->ccp.client_id);
988 		}
989 	} else {
990 		/* Default (Random) client ID */
991 		ct->ccp.client_id = NULL;
992 	}
993 
994 	b = lws_system_get_blob(i->context,
995 				LWS_SYSBLOB_TYPE_MQTT_USERNAME, 0);
996 
997 	/* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */
998 	if (b && (blen = lws_system_blob_get_size(b))) {
999 		p = (uint8_t *)lws_zalloc(blen+1, __func__);
1000 		if (!p)
1001 			return -1;
1002 		n = lws_system_blob_get(b, p, &blen, 0);
1003 		if (n) {
1004 			ct->ccp.username = NULL;
1005 		} else {
1006 			ct->ccp.username = (const char *)p;
1007 			lwsl_notice("%s - Username ID = %s\n",
1008 				    __func__, ct->ccp.username);
1009 		}
1010 	}
1011 
1012 	b = lws_system_get_blob(i->context,
1013 				LWS_SYSBLOB_TYPE_MQTT_PASSWORD, 0);
1014 
1015 	/* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */
1016 	if (b && (blen = lws_system_blob_get_size(b))) {
1017 		p = (uint8_t *)lws_zalloc(blen+1, __func__);
1018 		if (!p)
1019 			return -1;
1020 		n = lws_system_blob_get(b, p, &blen, 0);
1021 		if (n) {
1022 			ct->ccp.password = NULL;
1023 		} else {
1024 			ct->ccp.password = (const char *)p;
1025 			lwsl_notice("%s - Password ID = %s\n",
1026 				    __func__, ct->ccp.password);
1027 		}
1028 	}
1029 
1030 	ct->ccp.keep_alive		= h->policy->u.mqtt.keep_alive;
1031 	ct->ccp.clean_start		= (h->policy->u.mqtt.clean_start & 1u);
1032 	ct->ccp.will_param.qos		= h->policy->u.mqtt.will_qos;
1033 	ct->ccp.will_param.retain	= h->policy->u.mqtt.will_retain;
1034 	ct->ccp.birth_param.qos		= h->policy->u.mqtt.birth_qos;
1035 	ct->ccp.birth_param.retain	= h->policy->u.mqtt.birth_retain;
1036 	ct->ccp.aws_iot			= h->policy->u.mqtt.aws_iot;
1037 	h->u.mqtt.topic_qos.qos		= h->policy->u.mqtt.qos;
1038 
1039 	/*
1040 	 * We're going to string-substitute several of these parameters, which
1041 	 * have unknown, possibly large size.  And, as their usage is deferred
1042 	 * inside the asynchronous lifetime of the MQTT connection, they need
1043 	 * to live on the heap.
1044 	 *
1045 	 * Notice these allocations at h->u.mqtt.heap_baggage belong to the
1046 	 * underlying MQTT stream lifetime, not the logical SS lifetime, and
1047 	 * are destroyed if present at connection error or close of the
1048 	 * underlying connection.
1049 	 *
1050 	 *
1051 	 * First, compute the length of each without producing strsubst output,
1052 	 * and keep a running total.
1053 	 */
1054 
1055 	for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
1056 		if (!sources[n])
1057 			continue;
1058 
1059 		lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
1060 				NULL, (size_t)-1);
1061 		if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
1062 				      &used_in, &olen[n]) != LSTRX_DONE) {
1063 			lwsl_err("%s: failed to subsitute %s\n", __func__,
1064 					sources[n]);
1065 			return 1;
1066 		}
1067 		tot += olen[n] + 1;
1068 	}
1069 
1070 	/*
1071 	 * Then, allocate enough space on the heap for the total of the
1072 	 * substituted results
1073 	 */
1074 
1075 	h->u.mqtt.heap_baggage = lws_malloc(tot, __func__);
1076 	if (!h->u.mqtt.heap_baggage)
1077 		return 1;
1078 
1079 	/*
1080 	 * Finally, issue the subsitutions one after the other into the single
1081 	 * allocated result buffer and prepare pointers into them
1082 	 */
1083 
1084 	p = h->u.mqtt.heap_baggage;
1085 	for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
1086 		lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
1087 				(char *)p, (size_t)-1);
1088 		if (!sources[n]) {
1089 			ps[n] = NULL;
1090 			continue;
1091 		}
1092 		ps[n] = (char *)p;
1093 		if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
1094 				      &used_in, &olen[n]) != LSTRX_DONE)
1095 			return 1;
1096 
1097 		p += olen[n] + 1;
1098 	}
1099 
1100 	/*
1101 	 * Point the guys who want the substituted content at the substituted
1102 	 * strings
1103 	 */
1104 
1105 	ct->ccp.will_param.topic	= ps[SSCMM_STRSUB_WILL_TOPIC];
1106 	ct->ccp.will_param.message	= ps[SSCMM_STRSUB_WILL_MESSAGE];
1107 	h->u.mqtt.subscribe_to		= ps[SSCMM_STRSUB_SUBSCRIBE];
1108 	h->u.mqtt.subscribe_to_len	= olen[SSCMM_STRSUB_SUBSCRIBE];
1109 	h->u.mqtt.topic_qos.name	= ps[SSCMM_STRSUB_TOPIC];
1110 	ct->ccp.birth_param.topic	= ps[SSCMM_STRSUB_BIRTH_TOPIC];
1111 	ct->ccp.birth_param.message	= ps[SSCMM_STRSUB_BIRTH_MESSAGE];
1112 
1113 	i->method = "MQTT";
1114 	i->mqtt_cp = &ct->ccp;
1115 
1116 	i->alpn = "x-amzn-mqtt-ca";
1117 
1118 	/* share connections where possible */
1119 	i->ssl_connection |= LCCSCF_PIPELINE;
1120 
1121 	return 0;
1122 }
1123 
1124 const struct ss_pcols ss_pcol_mqtt = {
1125 	"MQTT",
1126 	"x-amzn-mqtt-ca", //"mqtt/3.1.1",
1127 	&protocol_secstream_mqtt,
1128 	secstream_connect_munge_mqtt,
1129 	NULL, NULL
1130 };
1131