1d4afb5ceSopenharmony_ci/*
2d4afb5ceSopenharmony_ci * libwebsockets - small server side websockets and web server implementation
3d4afb5ceSopenharmony_ci *
4d4afb5ceSopenharmony_ci * Copyright (C) 2019 - 2022 Andy Green <andy@warmcat.com>
5d4afb5ceSopenharmony_ci *
6d4afb5ceSopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy
7d4afb5ceSopenharmony_ci * of this software and associated documentation files (the "Software"), to
8d4afb5ceSopenharmony_ci * deal in the Software without restriction, including without limitation the
9d4afb5ceSopenharmony_ci * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10d4afb5ceSopenharmony_ci * sell copies of the Software, and to permit persons to whom the Software is
11d4afb5ceSopenharmony_ci * furnished to do so, subject to the following conditions:
12d4afb5ceSopenharmony_ci *
13d4afb5ceSopenharmony_ci * The above copyright notice and this permission notice shall be included in
14d4afb5ceSopenharmony_ci * all copies or substantial portions of the Software.
15d4afb5ceSopenharmony_ci *
16d4afb5ceSopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17d4afb5ceSopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18d4afb5ceSopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19d4afb5ceSopenharmony_ci * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20d4afb5ceSopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21d4afb5ceSopenharmony_ci * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22d4afb5ceSopenharmony_ci * IN THE SOFTWARE.
23d4afb5ceSopenharmony_ci */
24d4afb5ceSopenharmony_ci
25d4afb5ceSopenharmony_ci#include <private-lib-core.h>
26d4afb5ceSopenharmony_ci
27d4afb5ceSopenharmony_cistatic void
28d4afb5ceSopenharmony_cisecstream_mqtt_cleanup(lws_ss_handle_t *h)
29d4afb5ceSopenharmony_ci{
30d4afb5ceSopenharmony_ci	uint32_t i;
31d4afb5ceSopenharmony_ci
32d4afb5ceSopenharmony_ci	if (h->u.mqtt.heap_baggage) {
33d4afb5ceSopenharmony_ci		lws_free(h->u.mqtt.heap_baggage);
34d4afb5ceSopenharmony_ci		h->u.mqtt.heap_baggage = NULL;
35d4afb5ceSopenharmony_ci	}
36d4afb5ceSopenharmony_ci
37d4afb5ceSopenharmony_ci	if (h->u.mqtt.sub_info.topic) {
38d4afb5ceSopenharmony_ci		for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) {
39d4afb5ceSopenharmony_ci			if (h->u.mqtt.sub_info.topic[i].name) {
40d4afb5ceSopenharmony_ci				lws_free((void*)h->u.mqtt.sub_info.topic[i].name);
41d4afb5ceSopenharmony_ci				h->u.mqtt.sub_info.topic[i].name = NULL;
42d4afb5ceSopenharmony_ci			}
43d4afb5ceSopenharmony_ci		}
44d4afb5ceSopenharmony_ci		lws_free(h->u.mqtt.sub_info.topic);
45d4afb5ceSopenharmony_ci		h->u.mqtt.sub_info.topic = NULL;
46d4afb5ceSopenharmony_ci	}
47d4afb5ceSopenharmony_ci	lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
48d4afb5ceSopenharmony_ci}
49d4afb5ceSopenharmony_ci
50d4afb5ceSopenharmony_cistatic int
51d4afb5ceSopenharmony_cisecstream_mqtt_subscribe(struct lws *wsi)
52d4afb5ceSopenharmony_ci{
53d4afb5ceSopenharmony_ci	size_t used_in, used_out, topic_limit;
54d4afb5ceSopenharmony_ci	lws_strexp_t exp;
55d4afb5ceSopenharmony_ci	char* expbuf;
56d4afb5ceSopenharmony_ci	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
57d4afb5ceSopenharmony_ci
58d4afb5ceSopenharmony_ci	if (!h || !h->policy)
59d4afb5ceSopenharmony_ci		return -1;
60d4afb5ceSopenharmony_ci
61d4afb5ceSopenharmony_ci	if (h->policy->u.mqtt.aws_iot)
62d4afb5ceSopenharmony_ci		topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
63d4afb5ceSopenharmony_ci	else
64d4afb5ceSopenharmony_ci		topic_limit = LWS_MQTT_MAX_TOPICLEN;
65d4afb5ceSopenharmony_ci
66d4afb5ceSopenharmony_ci	if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe)
67d4afb5ceSopenharmony_ci		return 0;
68d4afb5ceSopenharmony_ci
69d4afb5ceSopenharmony_ci	lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL,
70d4afb5ceSopenharmony_ci			topic_limit);
71d4afb5ceSopenharmony_ci	/*
72d4afb5ceSopenharmony_ci	 * Expand with no output first to calculate the size of
73d4afb5ceSopenharmony_ci	 * expanded string then, allocate new buffer and expand
74d4afb5ceSopenharmony_ci	 * again with the buffer
75d4afb5ceSopenharmony_ci	 */
76d4afb5ceSopenharmony_ci	if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
77d4afb5ceSopenharmony_ci			      strlen(h->policy->u.mqtt.subscribe), &used_in,
78d4afb5ceSopenharmony_ci			      &used_out) != LSTRX_DONE) {
79d4afb5ceSopenharmony_ci		lwsl_err(
80d4afb5ceSopenharmony_ci			"%s, failed to expand MQTT subscribe"
81d4afb5ceSopenharmony_ci			" topic with no output\n",
82d4afb5ceSopenharmony_ci			__func__);
83d4afb5ceSopenharmony_ci		return 1;
84d4afb5ceSopenharmony_ci	}
85d4afb5ceSopenharmony_ci
86d4afb5ceSopenharmony_ci	expbuf = lws_malloc(used_out + 1, __func__);
87d4afb5ceSopenharmony_ci	if (!expbuf) {
88d4afb5ceSopenharmony_ci		lwsl_err(
89d4afb5ceSopenharmony_ci			 "%s, failed to allocate MQTT subscribe"
90d4afb5ceSopenharmony_ci			 "topic",
91d4afb5ceSopenharmony_ci			 __func__);
92d4afb5ceSopenharmony_ci		return 1;
93d4afb5ceSopenharmony_ci	}
94d4afb5ceSopenharmony_ci
95d4afb5ceSopenharmony_ci	lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
96d4afb5ceSopenharmony_ci			used_out + 1);
97d4afb5ceSopenharmony_ci
98d4afb5ceSopenharmony_ci	if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
99d4afb5ceSopenharmony_ci			      strlen(h->policy->u.mqtt.subscribe), &used_in,
100d4afb5ceSopenharmony_ci			      &used_out) != LSTRX_DONE) {
101d4afb5ceSopenharmony_ci		lwsl_err("%s, failed to expand MQTT subscribe topic\n",
102d4afb5ceSopenharmony_ci			 __func__);
103d4afb5ceSopenharmony_ci		lws_free(expbuf);
104d4afb5ceSopenharmony_ci		return 1;
105d4afb5ceSopenharmony_ci	}
106d4afb5ceSopenharmony_ci	lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
107d4afb5ceSopenharmony_ci	h->u.mqtt.sub_top.name = expbuf;
108d4afb5ceSopenharmony_ci
109d4afb5ceSopenharmony_ci	/*
110d4afb5ceSopenharmony_ci	 * The policy says to subscribe to something, and we
111d4afb5ceSopenharmony_ci	 * haven't done it yet.  Do it using the pre-prepared
112d4afb5ceSopenharmony_ci	 * string-substituted version of the policy string.
113d4afb5ceSopenharmony_ci	 */
114d4afb5ceSopenharmony_ci
115d4afb5ceSopenharmony_ci	lwsl_notice("%s: subscribing %s\n", __func__,
116d4afb5ceSopenharmony_ci		    h->u.mqtt.sub_top.name);
117d4afb5ceSopenharmony_ci
118d4afb5ceSopenharmony_ci	h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
119d4afb5ceSopenharmony_ci	memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
120d4afb5ceSopenharmony_ci	h->u.mqtt.sub_info.num_topics = 1;
121d4afb5ceSopenharmony_ci	h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
122d4afb5ceSopenharmony_ci	h->u.mqtt.sub_info.topic =
123d4afb5ceSopenharmony_ci			    lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__);
124d4afb5ceSopenharmony_ci	h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
125d4afb5ceSopenharmony_ci	h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
126d4afb5ceSopenharmony_ci
127d4afb5ceSopenharmony_ci	if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
128d4afb5ceSopenharmony_ci		lwsl_notice("%s: unable to subscribe", __func__);
129d4afb5ceSopenharmony_ci		lws_free(expbuf);
130d4afb5ceSopenharmony_ci		h->u.mqtt.sub_top.name = NULL;
131d4afb5ceSopenharmony_ci		return -1;
132d4afb5ceSopenharmony_ci	}
133d4afb5ceSopenharmony_ci	lws_free(expbuf);
134d4afb5ceSopenharmony_ci	h->u.mqtt.sub_top.name = NULL;
135d4afb5ceSopenharmony_ci
136d4afb5ceSopenharmony_ci	/* Expect a SUBACK */
137d4afb5ceSopenharmony_ci	if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
138d4afb5ceSopenharmony_ci		lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
139d4afb5ceSopenharmony_ci		return -1;
140d4afb5ceSopenharmony_ci	}
141d4afb5ceSopenharmony_ci	return 0;
142d4afb5ceSopenharmony_ci}
143d4afb5ceSopenharmony_ci
144d4afb5ceSopenharmony_cistatic int
145d4afb5ceSopenharmony_cisecstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len,
146d4afb5ceSopenharmony_ci			uint32_t payload_len, const char* topic,
147d4afb5ceSopenharmony_ci			lws_mqtt_qos_levels_t qos,  uint8_t retain, uint8_t dup,
148d4afb5ceSopenharmony_ci			int f)
149d4afb5ceSopenharmony_ci{
150d4afb5ceSopenharmony_ci	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
151d4afb5ceSopenharmony_ci	size_t used_in, used_out, topic_limit;
152d4afb5ceSopenharmony_ci	lws_strexp_t exp;
153d4afb5ceSopenharmony_ci	char *expbuf;
154d4afb5ceSopenharmony_ci	lws_mqtt_publish_param_t mqpp;
155d4afb5ceSopenharmony_ci
156d4afb5ceSopenharmony_ci	if (h->policy->u.mqtt.aws_iot)
157d4afb5ceSopenharmony_ci		topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
158d4afb5ceSopenharmony_ci	else
159d4afb5ceSopenharmony_ci		topic_limit = LWS_MQTT_MAX_TOPICLEN;
160d4afb5ceSopenharmony_ci
161d4afb5ceSopenharmony_ci	memset(&mqpp, 0, sizeof(mqpp));
162d4afb5ceSopenharmony_ci
163d4afb5ceSopenharmony_ci	lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
164d4afb5ceSopenharmony_ci			topic_limit);
165d4afb5ceSopenharmony_ci
166d4afb5ceSopenharmony_ci	if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
167d4afb5ceSopenharmony_ci			      &used_out) != LSTRX_DONE) {
168d4afb5ceSopenharmony_ci		lwsl_err("%s, failed to expand MQTT publish"
169d4afb5ceSopenharmony_ci			 " topic with no output\n", __func__);
170d4afb5ceSopenharmony_ci		return 1;
171d4afb5ceSopenharmony_ci	}
172d4afb5ceSopenharmony_ci	expbuf = lws_malloc(used_out + 1, __func__);
173d4afb5ceSopenharmony_ci	if (!expbuf) {
174d4afb5ceSopenharmony_ci		lwsl_err("%s, failed to allocate MQTT publish topic",
175d4afb5ceSopenharmony_ci			  __func__);
176d4afb5ceSopenharmony_ci		return 1;
177d4afb5ceSopenharmony_ci	}
178d4afb5ceSopenharmony_ci
179d4afb5ceSopenharmony_ci	lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
180d4afb5ceSopenharmony_ci			used_out + 1);
181d4afb5ceSopenharmony_ci
182d4afb5ceSopenharmony_ci	if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
183d4afb5ceSopenharmony_ci			      &used_out) != LSTRX_DONE) {
184d4afb5ceSopenharmony_ci		lws_free(expbuf);
185d4afb5ceSopenharmony_ci		return 1;
186d4afb5ceSopenharmony_ci	}
187d4afb5ceSopenharmony_ci	lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
188d4afb5ceSopenharmony_ci	mqpp.topic = (char *)expbuf;
189d4afb5ceSopenharmony_ci
190d4afb5ceSopenharmony_ci	mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
191d4afb5ceSopenharmony_ci	mqpp.packet_id = (uint16_t)(h->txord - 1);
192d4afb5ceSopenharmony_ci	mqpp.qos = qos;
193d4afb5ceSopenharmony_ci	mqpp.retain = !!retain;
194d4afb5ceSopenharmony_ci	mqpp.payload = buf;
195d4afb5ceSopenharmony_ci	mqpp.dup = !!dup;
196d4afb5ceSopenharmony_ci	if (payload_len)
197d4afb5ceSopenharmony_ci		mqpp.payload_len = payload_len;
198d4afb5ceSopenharmony_ci	else
199d4afb5ceSopenharmony_ci		mqpp.payload_len = (uint32_t)buf_len;
200d4afb5ceSopenharmony_ci
201d4afb5ceSopenharmony_ci	lwsl_notice("%s: payload len %d\n", __func__,
202d4afb5ceSopenharmony_ci		    (int)mqpp.payload_len);
203d4afb5ceSopenharmony_ci
204d4afb5ceSopenharmony_ci	if (lws_mqtt_client_send_publish(wsi, &mqpp,
205d4afb5ceSopenharmony_ci					 (const char *)buf,
206d4afb5ceSopenharmony_ci					 (uint32_t)buf_len,
207d4afb5ceSopenharmony_ci					 f & LWSSS_FLAG_EOM)) {
208d4afb5ceSopenharmony_ci		lwsl_notice("%s: failed to publish\n", __func__);
209d4afb5ceSopenharmony_ci		lws_free(expbuf);
210d4afb5ceSopenharmony_ci		return -1;
211d4afb5ceSopenharmony_ci	}
212d4afb5ceSopenharmony_ci	lws_free(expbuf);
213d4afb5ceSopenharmony_ci
214d4afb5ceSopenharmony_ci	if ((mqpp.qos == QOS1 || mqpp.qos == QOS2) && buf_len > 0) {
215d4afb5ceSopenharmony_ci		if (lws_buflist_append_segment(&h->u.mqtt.buflist_unacked,
216d4afb5ceSopenharmony_ci					       buf, buf_len) < 0) {
217d4afb5ceSopenharmony_ci			lwsl_notice("%s: failed to store unacked\n", __func__);
218d4afb5ceSopenharmony_ci			return -1;
219d4afb5ceSopenharmony_ci		}
220d4afb5ceSopenharmony_ci	}
221d4afb5ceSopenharmony_ci
222d4afb5ceSopenharmony_ci	return 0;
223d4afb5ceSopenharmony_ci}
224d4afb5ceSopenharmony_ci
225d4afb5ceSopenharmony_cistatic int
226d4afb5ceSopenharmony_cisecstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) {
227d4afb5ceSopenharmony_ci	lws_strexp_t exp;
228d4afb5ceSopenharmony_ci	size_t used_in, used_out = 0;
229d4afb5ceSopenharmony_ci	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
230d4afb5ceSopenharmony_ci
231d4afb5ceSopenharmony_ci	if (h->policy->u.mqtt.birth_message) {
232d4afb5ceSopenharmony_ci		lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
233d4afb5ceSopenharmony_ci				(char *)buf, buflen);
234d4afb5ceSopenharmony_ci		if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
235d4afb5ceSopenharmony_ci		    strlen(h->policy->u.mqtt.birth_message),
236d4afb5ceSopenharmony_ci			&used_in, &used_out) != LSTRX_DONE) {
237d4afb5ceSopenharmony_ci			return 1;
238d4afb5ceSopenharmony_ci		}
239d4afb5ceSopenharmony_ci	}
240d4afb5ceSopenharmony_ci	wsi->mqtt->inside_birth = 1;
241d4afb5ceSopenharmony_ci	return secstream_mqtt_publish(wsi, buf,
242d4afb5ceSopenharmony_ci				      used_out, 0, h->policy->u.mqtt.birth_topic,
243d4afb5ceSopenharmony_ci				      h->policy->u.mqtt.birth_qos,
244d4afb5ceSopenharmony_ci				      h->policy->u.mqtt.birth_retain, 0,
245d4afb5ceSopenharmony_ci				      LWSSS_FLAG_EOM);
246d4afb5ceSopenharmony_ci}
247d4afb5ceSopenharmony_ci
248d4afb5ceSopenharmony_cistatic int
249d4afb5ceSopenharmony_cisecstream_mqtt_resend(struct lws *wsi, uint8_t *buf) {
250d4afb5ceSopenharmony_ci	uint8_t *buffered;
251d4afb5ceSopenharmony_ci	size_t len;
252d4afb5ceSopenharmony_ci	int f = 0, r;
253d4afb5ceSopenharmony_ci	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
254d4afb5ceSopenharmony_ci
255d4afb5ceSopenharmony_ci	len = lws_buflist_next_segment_len(&h->u.mqtt.buflist_unacked,
256d4afb5ceSopenharmony_ci					   &buffered);
257d4afb5ceSopenharmony_ci
258d4afb5ceSopenharmony_ci	if (h->u.mqtt.unacked_size <= len)
259d4afb5ceSopenharmony_ci		f |= LWSSS_FLAG_EOM;
260d4afb5ceSopenharmony_ci
261d4afb5ceSopenharmony_ci	if (!len) {
262d4afb5ceSopenharmony_ci		/* when the message does not have payload */
263d4afb5ceSopenharmony_ci		buffered = buf;
264d4afb5ceSopenharmony_ci	} else {
265d4afb5ceSopenharmony_ci		h->u.mqtt.unacked_size -= (uint32_t)len;
266d4afb5ceSopenharmony_ci	}
267d4afb5ceSopenharmony_ci
268d4afb5ceSopenharmony_ci	if (wsi->mqtt->inside_birth) {
269d4afb5ceSopenharmony_ci		r = secstream_mqtt_publish(wsi, buffered, len, 0,
270d4afb5ceSopenharmony_ci					   h->policy->u.mqtt.birth_topic,
271d4afb5ceSopenharmony_ci					   h->policy->u.mqtt.birth_qos,
272d4afb5ceSopenharmony_ci					   h->policy->u.mqtt.birth_retain,
273d4afb5ceSopenharmony_ci					   1, f);
274d4afb5ceSopenharmony_ci	} else {
275d4afb5ceSopenharmony_ci		r = secstream_mqtt_publish(wsi, buffered, len,
276d4afb5ceSopenharmony_ci					   (uint32_t)h->writeable_len,
277d4afb5ceSopenharmony_ci					   h->policy->u.mqtt.topic,
278d4afb5ceSopenharmony_ci					   h->policy->u.mqtt.qos,
279d4afb5ceSopenharmony_ci					   h->policy->u.mqtt.retain, 1, f);
280d4afb5ceSopenharmony_ci	}
281d4afb5ceSopenharmony_ci	if (len)
282d4afb5ceSopenharmony_ci		lws_buflist_use_segment(&h->u.mqtt.buflist_unacked, len);
283d4afb5ceSopenharmony_ci
284d4afb5ceSopenharmony_ci	if (r) {
285d4afb5ceSopenharmony_ci		lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
286d4afb5ceSopenharmony_ci		h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
287d4afb5ceSopenharmony_ci
288d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_birth) {
289d4afb5ceSopenharmony_ci			lwsl_err("%s: %s: failed to send Birth\n", __func__,
290d4afb5ceSopenharmony_ci				 lws_ss_tag(h));
291d4afb5ceSopenharmony_ci			return -1;
292d4afb5ceSopenharmony_ci		} else {
293d4afb5ceSopenharmony_ci			r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
294d4afb5ceSopenharmony_ci			if (r != LWSSSSRET_OK)
295d4afb5ceSopenharmony_ci				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
296d4afb5ceSopenharmony_ci		}
297d4afb5ceSopenharmony_ci	}
298d4afb5ceSopenharmony_ci	return 0;
299d4afb5ceSopenharmony_ci}
300d4afb5ceSopenharmony_ci
301d4afb5ceSopenharmony_cistatic char *
302d4afb5ceSopenharmony_ciexpand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len)
303d4afb5ceSopenharmony_ci{
304d4afb5ceSopenharmony_ci	lws_strexp_t exp;
305d4afb5ceSopenharmony_ci	char *expbuf = NULL;
306d4afb5ceSopenharmony_ci	size_t used_in = 0, used_out = 0, post_len = 0;
307d4afb5ceSopenharmony_ci
308d4afb5ceSopenharmony_ci	memset(&exp, 0, sizeof(exp));
309d4afb5ceSopenharmony_ci
310d4afb5ceSopenharmony_ci	if (post)
311d4afb5ceSopenharmony_ci		post_len = strlen(post);
312d4afb5ceSopenharmony_ci
313d4afb5ceSopenharmony_ci	if (post_len > max_len)
314d4afb5ceSopenharmony_ci		return NULL;
315d4afb5ceSopenharmony_ci
316d4afb5ceSopenharmony_ci	lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, NULL,
317d4afb5ceSopenharmony_ci			max_len - post_len);
318d4afb5ceSopenharmony_ci
319d4afb5ceSopenharmony_ci	if (lws_strexp_expand(&exp, str, strlen(str), &used_in,
320d4afb5ceSopenharmony_ci			      &used_out) != LSTRX_DONE) {
321d4afb5ceSopenharmony_ci		lwsl_err("%s, failed to expand %s", __func__, str);
322d4afb5ceSopenharmony_ci
323d4afb5ceSopenharmony_ci		return NULL;
324d4afb5ceSopenharmony_ci	}
325d4afb5ceSopenharmony_ci
326d4afb5ceSopenharmony_ci	expbuf = lws_malloc(used_out + 1 + post_len, __func__);
327d4afb5ceSopenharmony_ci	if (!expbuf) {
328d4afb5ceSopenharmony_ci		lwsl_err("%s, failed to allocate str_exp for %s", __func__, str);
329d4afb5ceSopenharmony_ci
330d4afb5ceSopenharmony_ci		return NULL;
331d4afb5ceSopenharmony_ci	}
332d4afb5ceSopenharmony_ci
333d4afb5ceSopenharmony_ci	lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
334d4afb5ceSopenharmony_ci			used_out + 1 + post_len);
335d4afb5ceSopenharmony_ci
336d4afb5ceSopenharmony_ci	if (lws_strexp_expand(&exp, str, strlen(str), &used_in,
337d4afb5ceSopenharmony_ci			      &used_out) != LSTRX_DONE) {
338d4afb5ceSopenharmony_ci		lwsl_err("%s, failed to expand str_exp %s\n", __func__, str);
339d4afb5ceSopenharmony_ci		lws_free(expbuf);
340d4afb5ceSopenharmony_ci
341d4afb5ceSopenharmony_ci		return NULL;
342d4afb5ceSopenharmony_ci	}
343d4afb5ceSopenharmony_ci	if (post)
344d4afb5ceSopenharmony_ci		strcat(expbuf, post);
345d4afb5ceSopenharmony_ci
346d4afb5ceSopenharmony_ci	return expbuf;
347d4afb5ceSopenharmony_ci}
348d4afb5ceSopenharmony_ci
349d4afb5ceSopenharmony_cistatic lws_mqtt_match_topic_return_t
350d4afb5ceSopenharmony_cisecstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic)
351d4afb5ceSopenharmony_ci{
352d4afb5ceSopenharmony_ci	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
353d4afb5ceSopenharmony_ci	const char *match[] = { LWS_MQTT_SHADOW_UNNAMED_TOPIC_MATCH,
354d4afb5ceSopenharmony_ci				LWS_MQTT_SHADOW_NAMED_TOPIC_MATCH };
355d4afb5ceSopenharmony_ci	char *expbuf = NULL;
356d4afb5ceSopenharmony_ci	unsigned int i = 0;
357d4afb5ceSopenharmony_ci	lws_mqtt_match_topic_return_t ret = LMMTR_TOPIC_NOMATCH;
358d4afb5ceSopenharmony_ci
359d4afb5ceSopenharmony_ci	if (!topic)
360d4afb5ceSopenharmony_ci		return LMMTR_TOPIC_MATCH_ERROR;
361d4afb5ceSopenharmony_ci
362d4afb5ceSopenharmony_ci	expbuf = expand_metadata(h, topic, NULL, LWS_MQTT_MAX_AWSIOT_TOPICLEN);
363d4afb5ceSopenharmony_ci	if (!expbuf) {
364d4afb5ceSopenharmony_ci		lwsl_wsi_warn(wsi, "Failed to expand Shadow topic");
365d4afb5ceSopenharmony_ci
366d4afb5ceSopenharmony_ci		return LMMTR_TOPIC_MATCH_ERROR;
367d4afb5ceSopenharmony_ci	}
368d4afb5ceSopenharmony_ci	for (i = 0; i < (sizeof(match) / sizeof(match[0])); i++) {
369d4afb5ceSopenharmony_ci		if (lws_mqtt_is_topic_matched(
370d4afb5ceSopenharmony_ci				match[i], expbuf) == LMMTR_TOPIC_MATCH) {
371d4afb5ceSopenharmony_ci			ret = LMMTR_TOPIC_MATCH;
372d4afb5ceSopenharmony_ci			break;
373d4afb5ceSopenharmony_ci		}
374d4afb5ceSopenharmony_ci	}
375d4afb5ceSopenharmony_ci	lws_free(expbuf);
376d4afb5ceSopenharmony_ci
377d4afb5ceSopenharmony_ci	return ret;
378d4afb5ceSopenharmony_ci}
379d4afb5ceSopenharmony_ci
380d4afb5ceSopenharmony_cistatic void
381d4afb5ceSopenharmony_cisecstream_mqtt_shadow_cleanup(struct lws *wsi)
382d4afb5ceSopenharmony_ci{
383d4afb5ceSopenharmony_ci	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
384d4afb5ceSopenharmony_ci	uint32_t i = 0;
385d4afb5ceSopenharmony_ci
386d4afb5ceSopenharmony_ci	for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++)
387d4afb5ceSopenharmony_ci		lws_free((void *)h->u.mqtt.shadow_sub.topic[i].name);
388d4afb5ceSopenharmony_ci
389d4afb5ceSopenharmony_ci	h->u.mqtt.shadow_sub.num_topics = 0;
390d4afb5ceSopenharmony_ci
391d4afb5ceSopenharmony_ci	if (h->u.mqtt.shadow_sub.topic) {
392d4afb5ceSopenharmony_ci		lws_free(h->u.mqtt.shadow_sub.topic);
393d4afb5ceSopenharmony_ci		h->u.mqtt.shadow_sub.topic = NULL;
394d4afb5ceSopenharmony_ci	}
395d4afb5ceSopenharmony_ci}
396d4afb5ceSopenharmony_ci
397d4afb5ceSopenharmony_cistatic lws_ss_state_return_t
398d4afb5ceSopenharmony_cisecstream_mqtt_shadow_unsubscribe(struct lws *wsi)
399d4afb5ceSopenharmony_ci{
400d4afb5ceSopenharmony_ci	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
401d4afb5ceSopenharmony_ci
402d4afb5ceSopenharmony_ci	if (h->u.mqtt.shadow_sub.num_topics == 0) {
403d4afb5ceSopenharmony_ci		wsi->mqtt->send_shadow_unsubscribe = 0;
404d4afb5ceSopenharmony_ci		wsi->mqtt->inside_shadow = 0;
405d4afb5ceSopenharmony_ci		wsi->mqtt->done_shadow_subscribe = 0;
406d4afb5ceSopenharmony_ci
407d4afb5ceSopenharmony_ci		return LWSSSSRET_OK;
408d4afb5ceSopenharmony_ci	}
409d4afb5ceSopenharmony_ci
410d4afb5ceSopenharmony_ci	if (lws_mqtt_client_send_unsubcribe(wsi, &h->u.mqtt.shadow_sub)) {
411d4afb5ceSopenharmony_ci		lwsl_wsi_err(wsi, "Failed to send MQTT unsubsribe");
412d4afb5ceSopenharmony_ci
413d4afb5ceSopenharmony_ci		return LWSSSSRET_DISCONNECT_ME;
414d4afb5ceSopenharmony_ci	}
415d4afb5ceSopenharmony_ci	/* Expect a UNSUBACK */
416d4afb5ceSopenharmony_ci	if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
417d4afb5ceSopenharmony_ci		lwsl_wsi_err(wsi, "Unable to set LWS_POLLIN");
418d4afb5ceSopenharmony_ci
419d4afb5ceSopenharmony_ci		return LWSSSSRET_DISCONNECT_ME;
420d4afb5ceSopenharmony_ci	}
421d4afb5ceSopenharmony_ci	wsi->mqtt->send_shadow_unsubscribe = 0;
422d4afb5ceSopenharmony_ci
423d4afb5ceSopenharmony_ci	return LWSSSSRET_OK;
424d4afb5ceSopenharmony_ci}
425d4afb5ceSopenharmony_ci
426d4afb5ceSopenharmony_cistatic int
427d4afb5ceSopenharmony_cisecstream_mqtt_shadow_subscribe(struct lws *wsi)
428d4afb5ceSopenharmony_ci{
429d4afb5ceSopenharmony_ci	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
430d4afb5ceSopenharmony_ci	char* expbuf = NULL;
431d4afb5ceSopenharmony_ci	const char *suffixes[] = { LWS_MQTT_SHADOW_RESP_ACCEPTED_STR,
432d4afb5ceSopenharmony_ci				   LWS_MQTT_SHADOW_RESP_REJECTED_STR };
433d4afb5ceSopenharmony_ci	unsigned int i, suffixes_len = sizeof(suffixes) / sizeof(suffixes[0]);
434d4afb5ceSopenharmony_ci
435d4afb5ceSopenharmony_ci	if (!h->policy->u.mqtt.topic || wsi->mqtt->inside_shadow)
436d4afb5ceSopenharmony_ci		return 0;
437d4afb5ceSopenharmony_ci
438d4afb5ceSopenharmony_ci	if (h->u.mqtt.shadow_sub.num_topics > 0)
439d4afb5ceSopenharmony_ci		secstream_mqtt_shadow_cleanup(wsi);
440d4afb5ceSopenharmony_ci
441d4afb5ceSopenharmony_ci	memset(&h->u.mqtt.shadow_sub, 0, sizeof(lws_mqtt_subscribe_param_t));
442d4afb5ceSopenharmony_ci	h->u.mqtt.shadow_sub.topic = lws_malloc(
443d4afb5ceSopenharmony_ci			sizeof(lws_mqtt_topic_elem_t) * suffixes_len, __func__);
444d4afb5ceSopenharmony_ci	if (!h->u.mqtt.shadow_sub.topic) {
445d4afb5ceSopenharmony_ci		lwsl_ss_err(h, "Failed to allocate Shadow topics");
446d4afb5ceSopenharmony_ci		return -1;
447d4afb5ceSopenharmony_ci	}
448d4afb5ceSopenharmony_ci	h->u.mqtt.shadow_sub.num_topics = suffixes_len;
449d4afb5ceSopenharmony_ci	for (i = 0; i < suffixes_len; i++) {
450d4afb5ceSopenharmony_ci		expbuf = expand_metadata(h, h->policy->u.mqtt.topic, suffixes[i],
451d4afb5ceSopenharmony_ci					 LWS_MQTT_MAX_AWSIOT_TOPICLEN);
452d4afb5ceSopenharmony_ci		if (!expbuf) {
453d4afb5ceSopenharmony_ci			lwsl_ss_err(h, "Failed to allocate Shadow topic");
454d4afb5ceSopenharmony_ci			secstream_mqtt_shadow_cleanup(wsi);
455d4afb5ceSopenharmony_ci
456d4afb5ceSopenharmony_ci			return -1;
457d4afb5ceSopenharmony_ci		}
458d4afb5ceSopenharmony_ci		h->u.mqtt.shadow_sub.topic[i].name = expbuf;
459d4afb5ceSopenharmony_ci		h->u.mqtt.shadow_sub.topic[i].qos = h->policy->u.mqtt.qos;
460d4afb5ceSopenharmony_ci	}
461d4afb5ceSopenharmony_ci	h->u.mqtt.shadow_sub.packet_id = (uint16_t)(h->txord - 1);
462d4afb5ceSopenharmony_ci
463d4afb5ceSopenharmony_ci	if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.shadow_sub)) {
464d4afb5ceSopenharmony_ci		lwsl_wsi_notice(wsi, "Unable to subscribe Shadow topics");
465d4afb5ceSopenharmony_ci
466d4afb5ceSopenharmony_ci		return 0;
467d4afb5ceSopenharmony_ci	}
468d4afb5ceSopenharmony_ci
469d4afb5ceSopenharmony_ci	/* Expect a SUBACK */
470d4afb5ceSopenharmony_ci	if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
471d4afb5ceSopenharmony_ci		lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
472d4afb5ceSopenharmony_ci		return -1;
473d4afb5ceSopenharmony_ci	}
474d4afb5ceSopenharmony_ci	wsi->mqtt->inside_shadow = 1;
475d4afb5ceSopenharmony_ci
476d4afb5ceSopenharmony_ci	return 0;
477d4afb5ceSopenharmony_ci}
478d4afb5ceSopenharmony_ci
479d4afb5ceSopenharmony_cistatic int
480d4afb5ceSopenharmony_cisecstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
481d4afb5ceSopenharmony_ci	     void *in, size_t len)
482d4afb5ceSopenharmony_ci{
483d4afb5ceSopenharmony_ci	lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
484d4afb5ceSopenharmony_ci	size_t used_in = 0, used_out = 0, topic_len = 0;
485d4afb5ceSopenharmony_ci	lws_mqtt_publish_param_t *pmqpp = NULL;
486d4afb5ceSopenharmony_ci	lws_ss_state_return_t r = LWSSSSRET_OK;
487d4afb5ceSopenharmony_ci	uint8_t buf[LWS_PRE + 1400];
488d4afb5ceSopenharmony_ci	size_t buflen = sizeof(buf) - LWS_PRE;
489d4afb5ceSopenharmony_ci	lws_ss_metadata_t *omd = NULL;
490d4afb5ceSopenharmony_ci	char *sub_topic = NULL;
491d4afb5ceSopenharmony_ci	lws_strexp_t exp;
492d4afb5ceSopenharmony_ci	int f = 0;
493d4afb5ceSopenharmony_ci
494d4afb5ceSopenharmony_ci	memset(buf, 0, sizeof(buf));
495d4afb5ceSopenharmony_ci	memset(&exp, 0, sizeof(exp));
496d4afb5ceSopenharmony_ci
497d4afb5ceSopenharmony_ci	switch (reason) {
498d4afb5ceSopenharmony_ci
499d4afb5ceSopenharmony_ci	/* because we are protocols[0] ... */
500d4afb5ceSopenharmony_ci	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
501d4afb5ceSopenharmony_ci		lwsl_info("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
502d4afb5ceSopenharmony_ci			 in ? (char *)in : "(null)");
503d4afb5ceSopenharmony_ci		if (!h)
504d4afb5ceSopenharmony_ci			break;
505d4afb5ceSopenharmony_ci
506d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CONMON)
507d4afb5ceSopenharmony_ci		lws_conmon_ss_json(h);
508d4afb5ceSopenharmony_ci#endif
509d4afb5ceSopenharmony_ci
510d4afb5ceSopenharmony_ci		r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
511d4afb5ceSopenharmony_ci		h->wsi = NULL;
512d4afb5ceSopenharmony_ci
513d4afb5ceSopenharmony_ci		secstream_mqtt_cleanup(h);
514d4afb5ceSopenharmony_ci
515d4afb5ceSopenharmony_ci		if (r == LWSSSSRET_DESTROY_ME)
516d4afb5ceSopenharmony_ci			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
517d4afb5ceSopenharmony_ci
518d4afb5ceSopenharmony_ci		r = lws_ss_backoff(h);
519d4afb5ceSopenharmony_ci		if (r != LWSSSSRET_OK)
520d4afb5ceSopenharmony_ci			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
521d4afb5ceSopenharmony_ci
522d4afb5ceSopenharmony_ci		break;
523d4afb5ceSopenharmony_ci
524d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
525d4afb5ceSopenharmony_ci		if (!h)
526d4afb5ceSopenharmony_ci			break;
527d4afb5ceSopenharmony_ci		lws_sul_cancel(&h->sul_timeout);
528d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CONMON)
529d4afb5ceSopenharmony_ci		lws_conmon_ss_json(h);
530d4afb5ceSopenharmony_ci#endif
531d4afb5ceSopenharmony_ci		if (h->ss_dangling_connected)
532d4afb5ceSopenharmony_ci			r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
533d4afb5ceSopenharmony_ci		else
534d4afb5ceSopenharmony_ci			r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
535d4afb5ceSopenharmony_ci		if (h->wsi)
536d4afb5ceSopenharmony_ci			lws_set_opaque_user_data(h->wsi, NULL);
537d4afb5ceSopenharmony_ci		h->wsi = NULL;
538d4afb5ceSopenharmony_ci
539d4afb5ceSopenharmony_ci		secstream_mqtt_cleanup(h);
540d4afb5ceSopenharmony_ci
541d4afb5ceSopenharmony_ci		if (r)
542d4afb5ceSopenharmony_ci			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
543d4afb5ceSopenharmony_ci
544d4afb5ceSopenharmony_ci		if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) &&
545d4afb5ceSopenharmony_ci		    !h->txn_ok && !wsi->a.context->being_destroyed) {
546d4afb5ceSopenharmony_ci			r = lws_ss_backoff(h);
547d4afb5ceSopenharmony_ci			if (r != LWSSSSRET_OK)
548d4afb5ceSopenharmony_ci				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
549d4afb5ceSopenharmony_ci		}
550d4afb5ceSopenharmony_ci		break;
551d4afb5ceSopenharmony_ci
552d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
553d4afb5ceSopenharmony_ci		/*
554d4afb5ceSopenharmony_ci		 * Make sure the handle wsi points to the stream wsi not the
555d4afb5ceSopenharmony_ci		 * original nwsi, in the case it was migrated
556d4afb5ceSopenharmony_ci		 */
557d4afb5ceSopenharmony_ci		h->wsi = wsi;
558d4afb5ceSopenharmony_ci		h->retry = 0;
559d4afb5ceSopenharmony_ci		h->seqstate = SSSEQ_CONNECTED;
560d4afb5ceSopenharmony_ci
561d4afb5ceSopenharmony_ci		if (h->policy->u.mqtt.birth_topic &&
562d4afb5ceSopenharmony_ci		    !wsi->mqtt->done_birth) {
563d4afb5ceSopenharmony_ci			struct lws *nwsi = lws_get_network_wsi(wsi);
564d4afb5ceSopenharmony_ci			lws_start_foreach_ll(struct lws *, w, nwsi->mux.child_list) {
565d4afb5ceSopenharmony_ci				if (w != wsi &&
566d4afb5ceSopenharmony_ci					(w->mqtt->done_birth || w->mqtt->inside_birth)) {
567d4afb5ceSopenharmony_ci					/*
568d4afb5ceSopenharmony_ci					 * If any Birth was sent out or
569d4afb5ceSopenharmony_ci					 * is pending on other stream,
570d4afb5ceSopenharmony_ci					 * skip sending Birth.
571d4afb5ceSopenharmony_ci					 */
572d4afb5ceSopenharmony_ci					wsi->mqtt->done_birth = 1;
573d4afb5ceSopenharmony_ci					break;
574d4afb5ceSopenharmony_ci				}
575d4afb5ceSopenharmony_ci			} lws_end_foreach_ll(w, mux.sibling_list);
576d4afb5ceSopenharmony_ci		}
577d4afb5ceSopenharmony_ci
578d4afb5ceSopenharmony_ci		if (!h->policy->u.mqtt.subscribe ||
579d4afb5ceSopenharmony_ci		    !h->policy->u.mqtt.subscribe[0]) {
580d4afb5ceSopenharmony_ci			/*
581d4afb5ceSopenharmony_ci			 * If subscribe is empty in the policy, then,
582d4afb5ceSopenharmony_ci			 * skip sending SUBSCRIBE and signal the user
583d4afb5ceSopenharmony_ci			 * application.
584d4afb5ceSopenharmony_ci			 */
585d4afb5ceSopenharmony_ci			wsi->mqtt->done_subscribe = 1;
586d4afb5ceSopenharmony_ci		} else if (!h->policy->u.mqtt.clean_start &&
587d4afb5ceSopenharmony_ci			   wsi->mqtt->session_resumed) {
588d4afb5ceSopenharmony_ci			wsi->mqtt->inside_resume_session = 1;
589d4afb5ceSopenharmony_ci			/*
590d4afb5ceSopenharmony_ci			 * If the previous session is resumed and Server has
591d4afb5ceSopenharmony_ci			 * stored session, then, do not subscribe.
592d4afb5ceSopenharmony_ci			 */
593d4afb5ceSopenharmony_ci			if (!secstream_mqtt_subscribe(wsi))
594d4afb5ceSopenharmony_ci				wsi->mqtt->done_subscribe = 1;
595d4afb5ceSopenharmony_ci			wsi->mqtt->inside_resume_session = 0;
596d4afb5ceSopenharmony_ci		} else if (h->policy->u.mqtt.subscribe &&
597d4afb5ceSopenharmony_ci			   !wsi->mqtt->done_subscribe) {
598d4afb5ceSopenharmony_ci			/*
599d4afb5ceSopenharmony_ci			 * If a subscribe is pending on the stream, then make
600d4afb5ceSopenharmony_ci			 * sure the SUBSCRIBE is done before signaling the
601d4afb5ceSopenharmony_ci			 * user application.
602d4afb5ceSopenharmony_ci			 */
603d4afb5ceSopenharmony_ci			lws_callback_on_writable(wsi);
604d4afb5ceSopenharmony_ci			break;
605d4afb5ceSopenharmony_ci		}
606d4afb5ceSopenharmony_ci
607d4afb5ceSopenharmony_ci		if (h->policy->u.mqtt.birth_topic &&
608d4afb5ceSopenharmony_ci		    !wsi->mqtt->done_birth) {
609d4afb5ceSopenharmony_ci			/*
610d4afb5ceSopenharmony_ci			 * If a Birth is pending on the stream, then make
611d4afb5ceSopenharmony_ci			 * sure the Birth is done before signaling the
612d4afb5ceSopenharmony_ci			 * user application.
613d4afb5ceSopenharmony_ci			 */
614d4afb5ceSopenharmony_ci			lws_callback_on_writable(wsi);
615d4afb5ceSopenharmony_ci			break;
616d4afb5ceSopenharmony_ci		}
617d4afb5ceSopenharmony_ci		lws_sul_cancel(&h->sul);
618d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SYS_METRICS)
619d4afb5ceSopenharmony_ci		/*
620d4afb5ceSopenharmony_ci		 * If any hanging caliper measurement, dump it, and free any tags
621d4afb5ceSopenharmony_ci		 */
622d4afb5ceSopenharmony_ci		lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
623d4afb5ceSopenharmony_ci#endif
624d4afb5ceSopenharmony_ci		r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
625d4afb5ceSopenharmony_ci		if (r != LWSSSSRET_OK)
626d4afb5ceSopenharmony_ci			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
627d4afb5ceSopenharmony_ci		if (h->policy->u.mqtt.topic)
628d4afb5ceSopenharmony_ci			lws_callback_on_writable(wsi);
629d4afb5ceSopenharmony_ci		break;
630d4afb5ceSopenharmony_ci
631d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_CLIENT_RX:
632d4afb5ceSopenharmony_ci		// lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: read %d\n", (int)len);
633d4afb5ceSopenharmony_ci		if (!h || !h->info.rx)
634d4afb5ceSopenharmony_ci			return 0;
635d4afb5ceSopenharmony_ci
636d4afb5ceSopenharmony_ci		pmqpp = (lws_mqtt_publish_param_t *)in;
637d4afb5ceSopenharmony_ci
638d4afb5ceSopenharmony_ci		f = 0;
639d4afb5ceSopenharmony_ci		if (!pmqpp->payload_pos)
640d4afb5ceSopenharmony_ci			f |= LWSSS_FLAG_SOM;
641d4afb5ceSopenharmony_ci		if (pmqpp->payload_pos + len == pmqpp->payload_len)
642d4afb5ceSopenharmony_ci			f |= LWSSS_FLAG_EOM;
643d4afb5ceSopenharmony_ci
644d4afb5ceSopenharmony_ci		h->subseq = 1;
645d4afb5ceSopenharmony_ci
646d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_shadow) {
647d4afb5ceSopenharmony_ci			/*
648d4afb5ceSopenharmony_ci			 * When Shadow is used, the stream receives multiple
649d4afb5ceSopenharmony_ci			 * topics including Shadow response, set received
650d4afb5ceSopenharmony_ci			 * topic on the metadata
651d4afb5ceSopenharmony_ci			 */
652d4afb5ceSopenharmony_ci			lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata,
653d4afb5ceSopenharmony_ci					NULL, (size_t)-1);
654d4afb5ceSopenharmony_ci
655d4afb5ceSopenharmony_ci			if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
656d4afb5ceSopenharmony_ci					strlen(h->policy->u.mqtt.subscribe),
657d4afb5ceSopenharmony_ci					&used_in, &used_out) != LSTRX_DONE) {
658d4afb5ceSopenharmony_ci				lwsl_err("%s, failed to expand subscribe topic",
659d4afb5ceSopenharmony_ci					 __func__);
660d4afb5ceSopenharmony_ci				return -1;
661d4afb5ceSopenharmony_ci			}
662d4afb5ceSopenharmony_ci			omd = lws_ss_get_handle_metadata(h, exp.name);
663d4afb5ceSopenharmony_ci
664d4afb5ceSopenharmony_ci			if (!omd) {
665d4afb5ceSopenharmony_ci				lwsl_err("%s, failed to find metadata for subscribe",
666d4afb5ceSopenharmony_ci					 __func__);
667d4afb5ceSopenharmony_ci				return -1;
668d4afb5ceSopenharmony_ci			}
669d4afb5ceSopenharmony_ci			sub_topic = omd->value__may_own_heap;
670d4afb5ceSopenharmony_ci			topic_len = omd->length;
671d4afb5ceSopenharmony_ci
672d4afb5ceSopenharmony_ci			_lws_ss_set_metadata(omd, exp.name,
673d4afb5ceSopenharmony_ci					     (const void *)pmqpp->topic,
674d4afb5ceSopenharmony_ci					     pmqpp->topic_len);
675d4afb5ceSopenharmony_ci		}
676d4afb5ceSopenharmony_ci
677d4afb5ceSopenharmony_ci		r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload,
678d4afb5ceSopenharmony_ci			   len, f);
679d4afb5ceSopenharmony_ci
680d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_shadow)
681d4afb5ceSopenharmony_ci			_lws_ss_set_metadata(omd, exp.name, &sub_topic,
682d4afb5ceSopenharmony_ci					     topic_len);
683d4afb5ceSopenharmony_ci
684d4afb5ceSopenharmony_ci		if (r != LWSSSSRET_OK)
685d4afb5ceSopenharmony_ci			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
686d4afb5ceSopenharmony_ci
687d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_shadow) {
688d4afb5ceSopenharmony_ci			size_t acc_n = strlen(LWS_MQTT_SHADOW_RESP_ACCEPTED_STR);
689d4afb5ceSopenharmony_ci			size_t rej_n = strlen(LWS_MQTT_SHADOW_RESP_REJECTED_STR);
690d4afb5ceSopenharmony_ci			uint32_t i;
691d4afb5ceSopenharmony_ci
692d4afb5ceSopenharmony_ci			for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) {
693d4afb5ceSopenharmony_ci				/*
694d4afb5ceSopenharmony_ci				 * received response ('/accepted' or 'rejected')
695d4afb5ceSopenharmony_ci				 * and clean up Shadow operation
696d4afb5ceSopenharmony_ci				 */
697d4afb5ceSopenharmony_ci				if (strncmp(h->u.mqtt.shadow_sub.topic[i].name,
698d4afb5ceSopenharmony_ci					    pmqpp->topic, pmqpp->topic_len) ||
699d4afb5ceSopenharmony_ci				    (strlen(pmqpp->topic) < acc_n ||
700d4afb5ceSopenharmony_ci				     strlen(pmqpp->topic) < rej_n))
701d4afb5ceSopenharmony_ci					continue;
702d4afb5ceSopenharmony_ci
703d4afb5ceSopenharmony_ci				if (!strcmp(pmqpp->topic +
704d4afb5ceSopenharmony_ci						(strlen(pmqpp->topic) - acc_n),
705d4afb5ceSopenharmony_ci					 	LWS_MQTT_SHADOW_RESP_ACCEPTED_STR) ||
706d4afb5ceSopenharmony_ci				    !strcmp(pmqpp->topic +
707d4afb5ceSopenharmony_ci						(strlen(pmqpp->topic) - rej_n),
708d4afb5ceSopenharmony_ci						 LWS_MQTT_SHADOW_RESP_REJECTED_STR)) {
709d4afb5ceSopenharmony_ci					lws_sul_cancel(&wsi->mqtt->sul_shadow_wait);
710d4afb5ceSopenharmony_ci					wsi->mqtt->send_shadow_unsubscribe = 1;
711d4afb5ceSopenharmony_ci					lws_callback_on_writable(wsi);
712d4afb5ceSopenharmony_ci
713d4afb5ceSopenharmony_ci					return 0;
714d4afb5ceSopenharmony_ci				}
715d4afb5ceSopenharmony_ci			}
716d4afb5ceSopenharmony_ci		}
717d4afb5ceSopenharmony_ci		return 0; /* don't passthru */
718d4afb5ceSopenharmony_ci
719d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_SUBSCRIBED:
720d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_shadow) {
721d4afb5ceSopenharmony_ci			wsi->mqtt->done_shadow_subscribe = 1;
722d4afb5ceSopenharmony_ci			lws_callback_on_writable(wsi);
723d4afb5ceSopenharmony_ci
724d4afb5ceSopenharmony_ci			return 0;
725d4afb5ceSopenharmony_ci		}
726d4afb5ceSopenharmony_ci		/*
727d4afb5ceSopenharmony_ci		 * Stream demanded a subscribe without a Birth while connecting, once
728d4afb5ceSopenharmony_ci		 * done notify CONNECTED event to the application.
729d4afb5ceSopenharmony_ci		 */
730d4afb5ceSopenharmony_ci		if (!wsi->mqtt->done_subscribe && !h->policy->u.mqtt.birth_topic) {
731d4afb5ceSopenharmony_ci			lws_sul_cancel(&h->sul);
732d4afb5ceSopenharmony_ci			r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
733d4afb5ceSopenharmony_ci			if (r != LWSSSSRET_OK)
734d4afb5ceSopenharmony_ci				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
735d4afb5ceSopenharmony_ci		}
736d4afb5ceSopenharmony_ci		wsi->mqtt->done_subscribe = 1;
737d4afb5ceSopenharmony_ci		lws_callback_on_writable(wsi);
738d4afb5ceSopenharmony_ci		break;
739d4afb5ceSopenharmony_ci
740d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_ACK:
741d4afb5ceSopenharmony_ci		lws_sul_cancel(&h->sul_timeout);
742d4afb5ceSopenharmony_ci		if (h->u.mqtt.send_unacked) {
743d4afb5ceSopenharmony_ci			lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
744d4afb5ceSopenharmony_ci			h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
745d4afb5ceSopenharmony_ci		}
746d4afb5ceSopenharmony_ci
747d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_birth) {
748d4afb5ceSopenharmony_ci			/*
749d4afb5ceSopenharmony_ci			 * Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify
750d4afb5ceSopenharmony_ci			 * CONNECTED event to the application.
751d4afb5ceSopenharmony_ci			 */
752d4afb5ceSopenharmony_ci			wsi->mqtt->inside_birth = 0;
753d4afb5ceSopenharmony_ci			wsi->mqtt->done_birth = 1;
754d4afb5ceSopenharmony_ci			r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
755d4afb5ceSopenharmony_ci			if (r != LWSSSSRET_OK)
756d4afb5ceSopenharmony_ci				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
757d4afb5ceSopenharmony_ci			lws_callback_on_writable(wsi);
758d4afb5ceSopenharmony_ci			break;
759d4afb5ceSopenharmony_ci		}
760d4afb5ceSopenharmony_ci		r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
761d4afb5ceSopenharmony_ci		if (r != LWSSSSRET_OK)
762d4afb5ceSopenharmony_ci			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
763d4afb5ceSopenharmony_ci		break;
764d4afb5ceSopenharmony_ci
765d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_RESEND:
766d4afb5ceSopenharmony_ci		lws_sul_cancel(&h->sul_timeout);
767d4afb5ceSopenharmony_ci		if (h->u.mqtt.retry_count++ < LWS_MQTT_MAX_PUBLISH_RETRY) {
768d4afb5ceSopenharmony_ci			h->u.mqtt.unacked_size =
769d4afb5ceSopenharmony_ci				(uint32_t)lws_buflist_total_len(&h->u.mqtt.buflist_unacked);
770d4afb5ceSopenharmony_ci			if (h->u.mqtt.unacked_size) {
771d4afb5ceSopenharmony_ci				lwsl_notice("%s: %s: resend unacked message (%d/%d) \n",
772d4afb5ceSopenharmony_ci					    __func__, lws_ss_tag(h),
773d4afb5ceSopenharmony_ci					    h->u.mqtt.retry_count,
774d4afb5ceSopenharmony_ci					    LWS_MQTT_MAX_PUBLISH_RETRY);
775d4afb5ceSopenharmony_ci				h->u.mqtt.send_unacked = 1;
776d4afb5ceSopenharmony_ci				lws_callback_on_writable(wsi);
777d4afb5ceSopenharmony_ci				break;
778d4afb5ceSopenharmony_ci			}
779d4afb5ceSopenharmony_ci		}
780d4afb5ceSopenharmony_ci
781d4afb5ceSopenharmony_ci		lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
782d4afb5ceSopenharmony_ci		h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
783d4afb5ceSopenharmony_ci
784d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_birth) {
785d4afb5ceSopenharmony_ci			lwsl_err("%s: %s: failed to send Birth\n", __func__,
786d4afb5ceSopenharmony_ci				 lws_ss_tag(h));
787d4afb5ceSopenharmony_ci			return -1;
788d4afb5ceSopenharmony_ci		}
789d4afb5ceSopenharmony_ci
790d4afb5ceSopenharmony_ci		r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
791d4afb5ceSopenharmony_ci		if (r != LWSSSSRET_OK)
792d4afb5ceSopenharmony_ci			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
793d4afb5ceSopenharmony_ci		break;
794d4afb5ceSopenharmony_ci
795d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
796d4afb5ceSopenharmony_ci	{
797d4afb5ceSopenharmony_ci		if (!h || !h->info.tx)
798d4afb5ceSopenharmony_ci			return 0;
799d4afb5ceSopenharmony_ci		lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h));
800d4afb5ceSopenharmony_ci
801d4afb5ceSopenharmony_ci		if (h->seqstate != SSSEQ_CONNECTED) {
802d4afb5ceSopenharmony_ci			lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
803d4afb5ceSopenharmony_ci			break;
804d4afb5ceSopenharmony_ci		}
805d4afb5ceSopenharmony_ci
806d4afb5ceSopenharmony_ci		if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
807d4afb5ceSopenharmony_ci			return secstream_mqtt_subscribe(wsi);
808d4afb5ceSopenharmony_ci
809d4afb5ceSopenharmony_ci		if (h->u.mqtt.send_unacked)
810d4afb5ceSopenharmony_ci			return secstream_mqtt_resend(wsi, buf + LWS_PRE);
811d4afb5ceSopenharmony_ci
812d4afb5ceSopenharmony_ci		if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic)
813d4afb5ceSopenharmony_ci			return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen);
814d4afb5ceSopenharmony_ci
815d4afb5ceSopenharmony_ci		if (h->policy->u.mqtt.aws_iot) {
816d4afb5ceSopenharmony_ci			if (secstream_mqtt_is_shadow_matched(wsi,
817d4afb5ceSopenharmony_ci			    h->policy->u.mqtt.topic) == LMMTR_TOPIC_MATCH) {
818d4afb5ceSopenharmony_ci				if (!wsi->mqtt->done_shadow_subscribe)
819d4afb5ceSopenharmony_ci					return secstream_mqtt_shadow_subscribe(wsi);
820d4afb5ceSopenharmony_ci				if (wsi->mqtt->send_shadow_unsubscribe)
821d4afb5ceSopenharmony_ci					return secstream_mqtt_shadow_unsubscribe(wsi);
822d4afb5ceSopenharmony_ci			}
823d4afb5ceSopenharmony_ci		}
824d4afb5ceSopenharmony_ci
825d4afb5ceSopenharmony_ci		r = h->info.tx(ss_to_userobj(h),  h->txord++,  buf + LWS_PRE,
826d4afb5ceSopenharmony_ci			       &buflen, &f);
827d4afb5ceSopenharmony_ci
828d4afb5ceSopenharmony_ci		if (r == LWSSSSRET_TX_DONT_SEND) {
829d4afb5ceSopenharmony_ci			if (wsi->mqtt->done_shadow_subscribe) {
830d4afb5ceSopenharmony_ci				return secstream_mqtt_shadow_unsubscribe(wsi);
831d4afb5ceSopenharmony_ci			}
832d4afb5ceSopenharmony_ci			return 0;
833d4afb5ceSopenharmony_ci		}
834d4afb5ceSopenharmony_ci
835d4afb5ceSopenharmony_ci		if (r == LWSSSSRET_DISCONNECT_ME) {
836d4afb5ceSopenharmony_ci			lws_mqtt_subscribe_param_t lmsp;
837d4afb5ceSopenharmony_ci			if (h->u.mqtt.sub_info.num_topics) {
838d4afb5ceSopenharmony_ci				lmsp.num_topics = h->u.mqtt.sub_info.num_topics;
839d4afb5ceSopenharmony_ci				lmsp.topic = h->u.mqtt.sub_info.topic;
840d4afb5ceSopenharmony_ci				lmsp.packet_id = (uint16_t)(h->txord - 1);
841d4afb5ceSopenharmony_ci				if (lws_mqtt_client_send_unsubcribe(wsi,
842d4afb5ceSopenharmony_ci								    &lmsp)) {
843d4afb5ceSopenharmony_ci					lwsl_err("%s, failed to send"
844d4afb5ceSopenharmony_ci					         " MQTT unsubsribe", __func__);
845d4afb5ceSopenharmony_ci					return -1;
846d4afb5ceSopenharmony_ci				}
847d4afb5ceSopenharmony_ci				return 0;
848d4afb5ceSopenharmony_ci			}
849d4afb5ceSopenharmony_ci		}
850d4afb5ceSopenharmony_ci
851d4afb5ceSopenharmony_ci		if (r < 0)
852d4afb5ceSopenharmony_ci			return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
853d4afb5ceSopenharmony_ci
854d4afb5ceSopenharmony_ci		if (secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
855d4afb5ceSopenharmony_ci					   (uint32_t)h->writeable_len,
856d4afb5ceSopenharmony_ci					   h->policy->u.mqtt.topic,
857d4afb5ceSopenharmony_ci					   h->policy->u.mqtt.qos,
858d4afb5ceSopenharmony_ci					   h->policy->u.mqtt.retain, 0, f) != 0) {
859d4afb5ceSopenharmony_ci			r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
860d4afb5ceSopenharmony_ci			if (r != LWSSSSRET_OK)
861d4afb5ceSopenharmony_ci				return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
862d4afb5ceSopenharmony_ci		}
863d4afb5ceSopenharmony_ci		return 0;
864d4afb5ceSopenharmony_ci	}
865d4afb5ceSopenharmony_ci
866d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
867d4afb5ceSopenharmony_ci	{
868d4afb5ceSopenharmony_ci		struct lws *nwsi = lws_get_network_wsi(wsi);
869d4afb5ceSopenharmony_ci
870d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_shadow) {
871d4afb5ceSopenharmony_ci			secstream_mqtt_shadow_cleanup(wsi);
872d4afb5ceSopenharmony_ci			wsi->mqtt->inside_shadow = 0;
873d4afb5ceSopenharmony_ci			wsi->mqtt->done_shadow_subscribe = 0;
874d4afb5ceSopenharmony_ci			break;
875d4afb5ceSopenharmony_ci		}
876d4afb5ceSopenharmony_ci		if (nwsi && (nwsi->mux.child_count == 1))
877d4afb5ceSopenharmony_ci			lws_mqtt_client_send_disconnect(nwsi);
878d4afb5ceSopenharmony_ci		return -1;
879d4afb5ceSopenharmony_ci	}
880d4afb5ceSopenharmony_ci
881d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT:
882d4afb5ceSopenharmony_ci		if (!wsi->mqtt)
883d4afb5ceSopenharmony_ci			return -1;
884d4afb5ceSopenharmony_ci
885d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_shadow) {
886d4afb5ceSopenharmony_ci			secstream_mqtt_shadow_cleanup(wsi);
887d4afb5ceSopenharmony_ci			wsi->mqtt->inside_shadow = 0;
888d4afb5ceSopenharmony_ci			wsi->mqtt->done_shadow_subscribe = 0;
889d4afb5ceSopenharmony_ci			lwsl_warn("%s: %s: Unsubscribe (Shadow) timeout.\n",
890d4afb5ceSopenharmony_ci				  __func__, lws_ss_tag(h));
891d4afb5ceSopenharmony_ci			break;
892d4afb5ceSopenharmony_ci		}
893d4afb5ceSopenharmony_ci
894d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_unsubscribe) {
895d4afb5ceSopenharmony_ci			lwsl_warn("%s: %s: Unsubscribe timeout.\n", __func__,
896d4afb5ceSopenharmony_ci				  lws_ss_tag(h));
897d4afb5ceSopenharmony_ci			return -1;
898d4afb5ceSopenharmony_ci		}
899d4afb5ceSopenharmony_ci		break;
900d4afb5ceSopenharmony_ci
901d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_SHADOW_TIMEOUT:
902d4afb5ceSopenharmony_ci		if (!wsi->mqtt)
903d4afb5ceSopenharmony_ci			return -1;
904d4afb5ceSopenharmony_ci
905d4afb5ceSopenharmony_ci		if (wsi->mqtt->inside_shadow) {
906d4afb5ceSopenharmony_ci			lwsl_warn("%s: %s: Shadow timeout.\n", __func__,
907d4afb5ceSopenharmony_ci				  lws_ss_tag(h));
908d4afb5ceSopenharmony_ci			wsi->mqtt->send_shadow_unsubscribe = 1;
909d4afb5ceSopenharmony_ci			lws_callback_on_writable(wsi);
910d4afb5ceSopenharmony_ci		}
911d4afb5ceSopenharmony_ci		break;
912d4afb5ceSopenharmony_ci
913d4afb5ceSopenharmony_ci	default:
914d4afb5ceSopenharmony_ci		break;
915d4afb5ceSopenharmony_ci	}
916d4afb5ceSopenharmony_ci
917d4afb5ceSopenharmony_ci	return lws_callback_http_dummy(wsi, reason, user, in, len);
918d4afb5ceSopenharmony_ci}
919d4afb5ceSopenharmony_ci
920d4afb5ceSopenharmony_ciconst struct lws_protocols protocol_secstream_mqtt = {
921d4afb5ceSopenharmony_ci	"lws-secstream-mqtt",
922d4afb5ceSopenharmony_ci	secstream_mqtt,
923d4afb5ceSopenharmony_ci	0, 0, 0, NULL, 0
924d4afb5ceSopenharmony_ci};
925d4afb5ceSopenharmony_ci/*
926d4afb5ceSopenharmony_ci * Munge connect info according to protocol-specific considerations... this
927d4afb5ceSopenharmony_ci * usually means interpreting aux in a protocol-specific way and using the
928d4afb5ceSopenharmony_ci * pieces at connection setup time, eg, http url pieces.
929d4afb5ceSopenharmony_ci *
930d4afb5ceSopenharmony_ci * len bytes of buf can be used for things with scope until after the actual
931d4afb5ceSopenharmony_ci * connect.
932d4afb5ceSopenharmony_ci *
933d4afb5ceSopenharmony_ci * For ws, protocol aux is <url path>;<ws subprotocol name>
934d4afb5ceSopenharmony_ci */
935d4afb5ceSopenharmony_ci
936d4afb5ceSopenharmony_cienum {
937d4afb5ceSopenharmony_ci	SSCMM_STRSUB_WILL_TOPIC,
938d4afb5ceSopenharmony_ci	SSCMM_STRSUB_WILL_MESSAGE,
939d4afb5ceSopenharmony_ci	SSCMM_STRSUB_SUBSCRIBE,
940d4afb5ceSopenharmony_ci	SSCMM_STRSUB_TOPIC,
941d4afb5ceSopenharmony_ci	SSCMM_STRSUB_BIRTH_TOPIC,
942d4afb5ceSopenharmony_ci	SSCMM_STRSUB_BIRTH_MESSAGE
943d4afb5ceSopenharmony_ci};
944d4afb5ceSopenharmony_ci
945d4afb5ceSopenharmony_cistatic int
946d4afb5ceSopenharmony_cisecstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
947d4afb5ceSopenharmony_ci			     struct lws_client_connect_info *i,
948d4afb5ceSopenharmony_ci			     union lws_ss_contemp *ct)
949d4afb5ceSopenharmony_ci{
950d4afb5ceSopenharmony_ci	const char *sources[6] = {
951d4afb5ceSopenharmony_ci		/* we're going to string-substitute these before use */
952d4afb5ceSopenharmony_ci		h->policy->u.mqtt.will_topic,
953d4afb5ceSopenharmony_ci		h->policy->u.mqtt.will_message,
954d4afb5ceSopenharmony_ci		h->policy->u.mqtt.subscribe,
955d4afb5ceSopenharmony_ci		h->policy->u.mqtt.topic,
956d4afb5ceSopenharmony_ci		h->policy->u.mqtt.birth_topic,
957d4afb5ceSopenharmony_ci		h->policy->u.mqtt.birth_message
958d4afb5ceSopenharmony_ci	};
959d4afb5ceSopenharmony_ci	size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0;
960d4afb5ceSopenharmony_ci	lws_strexp_t exp;
961d4afb5ceSopenharmony_ci	char *ps[6];
962d4afb5ceSopenharmony_ci	uint8_t *p = NULL;
963d4afb5ceSopenharmony_ci	int n = -1;
964d4afb5ceSopenharmony_ci	size_t blen;
965d4afb5ceSopenharmony_ci	lws_system_blob_t *b = NULL;
966d4afb5ceSopenharmony_ci
967d4afb5ceSopenharmony_ci	memset(&ct->ccp, 0, sizeof(ct->ccp));
968d4afb5ceSopenharmony_ci	b = lws_system_get_blob(i->context,
969d4afb5ceSopenharmony_ci				LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID, 0);
970d4afb5ceSopenharmony_ci
971d4afb5ceSopenharmony_ci	/* If LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID is set */
972d4afb5ceSopenharmony_ci	if (b && (blen = lws_system_blob_get_size(b))) {
973d4afb5ceSopenharmony_ci		if (blen > LWS_MQTT_MAX_CIDLEN) {
974d4afb5ceSopenharmony_ci			lwsl_err("%s - Client ID too long.\n",
975d4afb5ceSopenharmony_ci				 __func__);
976d4afb5ceSopenharmony_ci			return -1;
977d4afb5ceSopenharmony_ci		}
978d4afb5ceSopenharmony_ci		p = (uint8_t *)lws_zalloc(blen+1, __func__);
979d4afb5ceSopenharmony_ci		if (!p)
980d4afb5ceSopenharmony_ci			return -1;
981d4afb5ceSopenharmony_ci		n = lws_system_blob_get(b, p, &blen, 0);
982d4afb5ceSopenharmony_ci		if (n) {
983d4afb5ceSopenharmony_ci			ct->ccp.client_id = NULL;
984d4afb5ceSopenharmony_ci		} else {
985d4afb5ceSopenharmony_ci			ct->ccp.client_id = (const char *)p;
986d4afb5ceSopenharmony_ci			lwsl_notice("%s - Client ID = %s\n",
987d4afb5ceSopenharmony_ci				    __func__, ct->ccp.client_id);
988d4afb5ceSopenharmony_ci		}
989d4afb5ceSopenharmony_ci	} else {
990d4afb5ceSopenharmony_ci		/* Default (Random) client ID */
991d4afb5ceSopenharmony_ci		ct->ccp.client_id = NULL;
992d4afb5ceSopenharmony_ci	}
993d4afb5ceSopenharmony_ci
994d4afb5ceSopenharmony_ci	b = lws_system_get_blob(i->context,
995d4afb5ceSopenharmony_ci				LWS_SYSBLOB_TYPE_MQTT_USERNAME, 0);
996d4afb5ceSopenharmony_ci
997d4afb5ceSopenharmony_ci	/* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */
998d4afb5ceSopenharmony_ci	if (b && (blen = lws_system_blob_get_size(b))) {
999d4afb5ceSopenharmony_ci		p = (uint8_t *)lws_zalloc(blen+1, __func__);
1000d4afb5ceSopenharmony_ci		if (!p)
1001d4afb5ceSopenharmony_ci			return -1;
1002d4afb5ceSopenharmony_ci		n = lws_system_blob_get(b, p, &blen, 0);
1003d4afb5ceSopenharmony_ci		if (n) {
1004d4afb5ceSopenharmony_ci			ct->ccp.username = NULL;
1005d4afb5ceSopenharmony_ci		} else {
1006d4afb5ceSopenharmony_ci			ct->ccp.username = (const char *)p;
1007d4afb5ceSopenharmony_ci			lwsl_notice("%s - Username ID = %s\n",
1008d4afb5ceSopenharmony_ci				    __func__, ct->ccp.username);
1009d4afb5ceSopenharmony_ci		}
1010d4afb5ceSopenharmony_ci	}
1011d4afb5ceSopenharmony_ci
1012d4afb5ceSopenharmony_ci	b = lws_system_get_blob(i->context,
1013d4afb5ceSopenharmony_ci				LWS_SYSBLOB_TYPE_MQTT_PASSWORD, 0);
1014d4afb5ceSopenharmony_ci
1015d4afb5ceSopenharmony_ci	/* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */
1016d4afb5ceSopenharmony_ci	if (b && (blen = lws_system_blob_get_size(b))) {
1017d4afb5ceSopenharmony_ci		p = (uint8_t *)lws_zalloc(blen+1, __func__);
1018d4afb5ceSopenharmony_ci		if (!p)
1019d4afb5ceSopenharmony_ci			return -1;
1020d4afb5ceSopenharmony_ci		n = lws_system_blob_get(b, p, &blen, 0);
1021d4afb5ceSopenharmony_ci		if (n) {
1022d4afb5ceSopenharmony_ci			ct->ccp.password = NULL;
1023d4afb5ceSopenharmony_ci		} else {
1024d4afb5ceSopenharmony_ci			ct->ccp.password = (const char *)p;
1025d4afb5ceSopenharmony_ci			lwsl_notice("%s - Password ID = %s\n",
1026d4afb5ceSopenharmony_ci				    __func__, ct->ccp.password);
1027d4afb5ceSopenharmony_ci		}
1028d4afb5ceSopenharmony_ci	}
1029d4afb5ceSopenharmony_ci
1030d4afb5ceSopenharmony_ci	ct->ccp.keep_alive		= h->policy->u.mqtt.keep_alive;
1031d4afb5ceSopenharmony_ci	ct->ccp.clean_start		= (h->policy->u.mqtt.clean_start & 1u);
1032d4afb5ceSopenharmony_ci	ct->ccp.will_param.qos		= h->policy->u.mqtt.will_qos;
1033d4afb5ceSopenharmony_ci	ct->ccp.will_param.retain	= h->policy->u.mqtt.will_retain;
1034d4afb5ceSopenharmony_ci	ct->ccp.birth_param.qos		= h->policy->u.mqtt.birth_qos;
1035d4afb5ceSopenharmony_ci	ct->ccp.birth_param.retain	= h->policy->u.mqtt.birth_retain;
1036d4afb5ceSopenharmony_ci	ct->ccp.aws_iot			= h->policy->u.mqtt.aws_iot;
1037d4afb5ceSopenharmony_ci	h->u.mqtt.topic_qos.qos		= h->policy->u.mqtt.qos;
1038d4afb5ceSopenharmony_ci
1039d4afb5ceSopenharmony_ci	/*
1040d4afb5ceSopenharmony_ci	 * We're going to string-substitute several of these parameters, which
1041d4afb5ceSopenharmony_ci	 * have unknown, possibly large size.  And, as their usage is deferred
1042d4afb5ceSopenharmony_ci	 * inside the asynchronous lifetime of the MQTT connection, they need
1043d4afb5ceSopenharmony_ci	 * to live on the heap.
1044d4afb5ceSopenharmony_ci	 *
1045d4afb5ceSopenharmony_ci	 * Notice these allocations at h->u.mqtt.heap_baggage belong to the
1046d4afb5ceSopenharmony_ci	 * underlying MQTT stream lifetime, not the logical SS lifetime, and
1047d4afb5ceSopenharmony_ci	 * are destroyed if present at connection error or close of the
1048d4afb5ceSopenharmony_ci	 * underlying connection.
1049d4afb5ceSopenharmony_ci	 *
1050d4afb5ceSopenharmony_ci	 *
1051d4afb5ceSopenharmony_ci	 * First, compute the length of each without producing strsubst output,
1052d4afb5ceSopenharmony_ci	 * and keep a running total.
1053d4afb5ceSopenharmony_ci	 */
1054d4afb5ceSopenharmony_ci
1055d4afb5ceSopenharmony_ci	for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
1056d4afb5ceSopenharmony_ci		if (!sources[n])
1057d4afb5ceSopenharmony_ci			continue;
1058d4afb5ceSopenharmony_ci
1059d4afb5ceSopenharmony_ci		lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
1060d4afb5ceSopenharmony_ci				NULL, (size_t)-1);
1061d4afb5ceSopenharmony_ci		if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
1062d4afb5ceSopenharmony_ci				      &used_in, &olen[n]) != LSTRX_DONE) {
1063d4afb5ceSopenharmony_ci			lwsl_err("%s: failed to subsitute %s\n", __func__,
1064d4afb5ceSopenharmony_ci					sources[n]);
1065d4afb5ceSopenharmony_ci			return 1;
1066d4afb5ceSopenharmony_ci		}
1067d4afb5ceSopenharmony_ci		tot += olen[n] + 1;
1068d4afb5ceSopenharmony_ci	}
1069d4afb5ceSopenharmony_ci
1070d4afb5ceSopenharmony_ci	/*
1071d4afb5ceSopenharmony_ci	 * Then, allocate enough space on the heap for the total of the
1072d4afb5ceSopenharmony_ci	 * substituted results
1073d4afb5ceSopenharmony_ci	 */
1074d4afb5ceSopenharmony_ci
1075d4afb5ceSopenharmony_ci	h->u.mqtt.heap_baggage = lws_malloc(tot, __func__);
1076d4afb5ceSopenharmony_ci	if (!h->u.mqtt.heap_baggage)
1077d4afb5ceSopenharmony_ci		return 1;
1078d4afb5ceSopenharmony_ci
1079d4afb5ceSopenharmony_ci	/*
1080d4afb5ceSopenharmony_ci	 * Finally, issue the subsitutions one after the other into the single
1081d4afb5ceSopenharmony_ci	 * allocated result buffer and prepare pointers into them
1082d4afb5ceSopenharmony_ci	 */
1083d4afb5ceSopenharmony_ci
1084d4afb5ceSopenharmony_ci	p = h->u.mqtt.heap_baggage;
1085d4afb5ceSopenharmony_ci	for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
1086d4afb5ceSopenharmony_ci		lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
1087d4afb5ceSopenharmony_ci				(char *)p, (size_t)-1);
1088d4afb5ceSopenharmony_ci		if (!sources[n]) {
1089d4afb5ceSopenharmony_ci			ps[n] = NULL;
1090d4afb5ceSopenharmony_ci			continue;
1091d4afb5ceSopenharmony_ci		}
1092d4afb5ceSopenharmony_ci		ps[n] = (char *)p;
1093d4afb5ceSopenharmony_ci		if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
1094d4afb5ceSopenharmony_ci				      &used_in, &olen[n]) != LSTRX_DONE)
1095d4afb5ceSopenharmony_ci			return 1;
1096d4afb5ceSopenharmony_ci
1097d4afb5ceSopenharmony_ci		p += olen[n] + 1;
1098d4afb5ceSopenharmony_ci	}
1099d4afb5ceSopenharmony_ci
1100d4afb5ceSopenharmony_ci	/*
1101d4afb5ceSopenharmony_ci	 * Point the guys who want the substituted content at the substituted
1102d4afb5ceSopenharmony_ci	 * strings
1103d4afb5ceSopenharmony_ci	 */
1104d4afb5ceSopenharmony_ci
1105d4afb5ceSopenharmony_ci	ct->ccp.will_param.topic	= ps[SSCMM_STRSUB_WILL_TOPIC];
1106d4afb5ceSopenharmony_ci	ct->ccp.will_param.message	= ps[SSCMM_STRSUB_WILL_MESSAGE];
1107d4afb5ceSopenharmony_ci	h->u.mqtt.subscribe_to		= ps[SSCMM_STRSUB_SUBSCRIBE];
1108d4afb5ceSopenharmony_ci	h->u.mqtt.subscribe_to_len	= olen[SSCMM_STRSUB_SUBSCRIBE];
1109d4afb5ceSopenharmony_ci	h->u.mqtt.topic_qos.name	= ps[SSCMM_STRSUB_TOPIC];
1110d4afb5ceSopenharmony_ci	ct->ccp.birth_param.topic	= ps[SSCMM_STRSUB_BIRTH_TOPIC];
1111d4afb5ceSopenharmony_ci	ct->ccp.birth_param.message	= ps[SSCMM_STRSUB_BIRTH_MESSAGE];
1112d4afb5ceSopenharmony_ci
1113d4afb5ceSopenharmony_ci	i->method = "MQTT";
1114d4afb5ceSopenharmony_ci	i->mqtt_cp = &ct->ccp;
1115d4afb5ceSopenharmony_ci
1116d4afb5ceSopenharmony_ci	i->alpn = "x-amzn-mqtt-ca";
1117d4afb5ceSopenharmony_ci
1118d4afb5ceSopenharmony_ci	/* share connections where possible */
1119d4afb5ceSopenharmony_ci	i->ssl_connection |= LCCSCF_PIPELINE;
1120d4afb5ceSopenharmony_ci
1121d4afb5ceSopenharmony_ci	return 0;
1122d4afb5ceSopenharmony_ci}
1123d4afb5ceSopenharmony_ci
1124d4afb5ceSopenharmony_ciconst struct ss_pcols ss_pcol_mqtt = {
1125d4afb5ceSopenharmony_ci	"MQTT",
1126d4afb5ceSopenharmony_ci	"x-amzn-mqtt-ca", //"mqtt/3.1.1",
1127d4afb5ceSopenharmony_ci	&protocol_secstream_mqtt,
1128d4afb5ceSopenharmony_ci	secstream_connect_munge_mqtt,
1129d4afb5ceSopenharmony_ci	NULL, NULL
1130d4afb5ceSopenharmony_ci};
1131