1d4afb5ceSopenharmony_ci/*
2d4afb5ceSopenharmony_ci * lws-minimal-mqtt-client
3d4afb5ceSopenharmony_ci *
4d4afb5ceSopenharmony_ci * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5d4afb5ceSopenharmony_ci *                         Sakthi Kannan <saktr@amazon.com>
6d4afb5ceSopenharmony_ci *
7d4afb5ceSopenharmony_ci * This file is made available under the Creative Commons CC0 1.0
8d4afb5ceSopenharmony_ci * Universal Public Domain Dedication.
9d4afb5ceSopenharmony_ci */
10d4afb5ceSopenharmony_ci
11d4afb5ceSopenharmony_ci#include <libwebsockets.h>
12d4afb5ceSopenharmony_ci#include <string.h>
13d4afb5ceSopenharmony_ci#include <signal.h>
14d4afb5ceSopenharmony_ci#if defined(WIN32)
15d4afb5ceSopenharmony_ci#define HAVE_STRUCT_TIMESPEC
16d4afb5ceSopenharmony_ci#if defined(pid_t)
17d4afb5ceSopenharmony_ci#undef pid_t
18d4afb5ceSopenharmony_ci#endif
19d4afb5ceSopenharmony_ci#endif
20d4afb5ceSopenharmony_ci#include <pthread.h>
21d4afb5ceSopenharmony_ci#include <assert.h>
22d4afb5ceSopenharmony_ci
23d4afb5ceSopenharmony_ci#define COUNT 8
24d4afb5ceSopenharmony_ci
25d4afb5ceSopenharmony_cistruct test_item {
26d4afb5ceSopenharmony_ci	struct lws_context		*context;
27d4afb5ceSopenharmony_ci	struct lws			*wsi;
28d4afb5ceSopenharmony_ci	lws_sorted_usec_list_t		sul;
29d4afb5ceSopenharmony_ci} items[COUNT];
30d4afb5ceSopenharmony_ci
31d4afb5ceSopenharmony_cienum {
32d4afb5ceSopenharmony_ci	STATE_SUBSCRIBE,	/* subscribe to the topic */
33d4afb5ceSopenharmony_ci	STATE_WAIT_SUBACK,
34d4afb5ceSopenharmony_ci	STATE_PUBLISH_QOS0,	/* Send the message in QoS0 */
35d4afb5ceSopenharmony_ci	STATE_WAIT_ACK0,	/* Wait for the synthetic "ack" */
36d4afb5ceSopenharmony_ci	STATE_PUBLISH_QOS1,	/* Send the message in QoS1 */
37d4afb5ceSopenharmony_ci	STATE_WAIT_ACK1,	/* Wait for the real ack (or timeout + retry) */
38d4afb5ceSopenharmony_ci	STATE_UNSUBSCRIBE,
39d4afb5ceSopenharmony_ci	STATE_WAIT_UNSUBACK,
40d4afb5ceSopenharmony_ci
41d4afb5ceSopenharmony_ci	STATE_TEST_FINISH
42d4afb5ceSopenharmony_ci};
43d4afb5ceSopenharmony_ci
44d4afb5ceSopenharmony_cistatic int interrupted, do_ssl, pipeline, stagger_us = 5000, okay,
45d4afb5ceSopenharmony_ci	   done, count = COUNT;
46d4afb5ceSopenharmony_ci
47d4afb5ceSopenharmony_cistatic const lws_retry_bo_t retry = {
48d4afb5ceSopenharmony_ci	.secs_since_valid_ping		= 20, /* if idle, PINGREQ after secs */
49d4afb5ceSopenharmony_ci	.secs_since_valid_hangup	= 25, /* hangup if still idle secs */
50d4afb5ceSopenharmony_ci};
51d4afb5ceSopenharmony_ci
52d4afb5ceSopenharmony_cistatic const lws_mqtt_client_connect_param_t client_connect_param = {
53d4afb5ceSopenharmony_ci	.client_id			= NULL,
54d4afb5ceSopenharmony_ci	.keep_alive			= 60,
55d4afb5ceSopenharmony_ci	.clean_start			= 1,
56d4afb5ceSopenharmony_ci	.client_id_nofree		= 1,
57d4afb5ceSopenharmony_ci	.username_nofree		= 1,
58d4afb5ceSopenharmony_ci	.password_nofree		= 1,
59d4afb5ceSopenharmony_ci	.will_param = {
60d4afb5ceSopenharmony_ci		.topic			= "good/bye",
61d4afb5ceSopenharmony_ci		.message		= "sign-off",
62d4afb5ceSopenharmony_ci		.qos			= 0,
63d4afb5ceSopenharmony_ci		.retain			= 0,
64d4afb5ceSopenharmony_ci	},
65d4afb5ceSopenharmony_ci	.username			= "lwsUser",
66d4afb5ceSopenharmony_ci	.password			= "mySecretPassword",
67d4afb5ceSopenharmony_ci};
68d4afb5ceSopenharmony_ci
69d4afb5ceSopenharmony_cistatic lws_mqtt_topic_elem_t topics[] = {
70d4afb5ceSopenharmony_ci	[0] = { .name = "test/topic0", .qos = QOS0 },
71d4afb5ceSopenharmony_ci	[1] = { .name = "test/topic1", .qos = QOS1 },
72d4afb5ceSopenharmony_ci};
73d4afb5ceSopenharmony_ci
74d4afb5ceSopenharmony_cistatic lws_mqtt_subscribe_param_t sub_param = {
75d4afb5ceSopenharmony_ci	.topic				= &topics[0],
76d4afb5ceSopenharmony_ci	.num_topics			= LWS_ARRAY_SIZE(topics),
77d4afb5ceSopenharmony_ci};
78d4afb5ceSopenharmony_ci
79d4afb5ceSopenharmony_cistatic const char * const test_string =
80d4afb5ceSopenharmony_ci	"No one would have believed in the last years of the nineteenth "
81d4afb5ceSopenharmony_ci	"century that this world was being watched keenly and closely by "
82d4afb5ceSopenharmony_ci	"intelligences greater than man's and yet as mortal as his own; that as "
83d4afb5ceSopenharmony_ci	"men busied themselves about their various concerns they were "
84d4afb5ceSopenharmony_ci	"scrutinised and studied, perhaps almost as narrowly as a man with a "
85d4afb5ceSopenharmony_ci	"microscope might scrutinise the transient creatures that swarm and "
86d4afb5ceSopenharmony_ci	"multiply in a drop of water.  With infinite complacency men went to "
87d4afb5ceSopenharmony_ci	"and fro over this globe about their little affairs, serene in their "
88d4afb5ceSopenharmony_ci	"assurance of their empire over matter. It is possible that the "
89d4afb5ceSopenharmony_ci	"infusoria under the microscope do the same.  No one gave a thought to "
90d4afb5ceSopenharmony_ci	"the older worlds of space as sources of human danger, or thought of "
91d4afb5ceSopenharmony_ci	"them only to dismiss the idea of life upon them as impossible or "
92d4afb5ceSopenharmony_ci	"improbable.  It is curious to recall some of the mental habits of "
93d4afb5ceSopenharmony_ci	"those departed days.  At most terrestrial men fancied there might be "
94d4afb5ceSopenharmony_ci	"other men upon Mars, perhaps inferior to themselves and ready to "
95d4afb5ceSopenharmony_ci	"welcome a missionary enterprise. Yet across the gulf of space, minds "
96d4afb5ceSopenharmony_ci	"that are to our minds as ours are to those of the beasts that perish, "
97d4afb5ceSopenharmony_ci	"intellects vast and cool and unsympathetic, regarded this earth with "
98d4afb5ceSopenharmony_ci	"envious eyes, and slowly and surely drew their plans against us.  And "
99d4afb5ceSopenharmony_ci	"early in the twentieth century came the great disillusionment. ";
100d4afb5ceSopenharmony_ci
101d4afb5ceSopenharmony_ci/* this reflects the length of the string above */
102d4afb5ceSopenharmony_ci#define TEST_STRING_LEN 1337
103d4afb5ceSopenharmony_ci
104d4afb5ceSopenharmony_cistruct pss {
105d4afb5ceSopenharmony_ci	lws_mqtt_publish_param_t	pub_param;
106d4afb5ceSopenharmony_ci	int				state;
107d4afb5ceSopenharmony_ci	size_t				pos;
108d4afb5ceSopenharmony_ci	int				retries;
109d4afb5ceSopenharmony_ci};
110d4afb5ceSopenharmony_ci
111d4afb5ceSopenharmony_cistatic void
112d4afb5ceSopenharmony_cisigint_handler(int sig)
113d4afb5ceSopenharmony_ci{
114d4afb5ceSopenharmony_ci	interrupted = 1;
115d4afb5ceSopenharmony_ci}
116d4afb5ceSopenharmony_ci
117d4afb5ceSopenharmony_cistatic int
118d4afb5ceSopenharmony_ciconnect_client(struct lws_context *context, struct test_item *item)
119d4afb5ceSopenharmony_ci{
120d4afb5ceSopenharmony_ci	struct lws_client_connect_info i;
121d4afb5ceSopenharmony_ci
122d4afb5ceSopenharmony_ci	memset(&i, 0, sizeof i);
123d4afb5ceSopenharmony_ci
124d4afb5ceSopenharmony_ci	i.mqtt_cp = &client_connect_param;
125d4afb5ceSopenharmony_ci	i.opaque_user_data = item;
126d4afb5ceSopenharmony_ci	i.protocol = "test-mqtt";
127d4afb5ceSopenharmony_ci	i.address = "localhost";
128d4afb5ceSopenharmony_ci	i.host = "localhost";
129d4afb5ceSopenharmony_ci	i.pwsi = &item->wsi;
130d4afb5ceSopenharmony_ci	i.context = context;
131d4afb5ceSopenharmony_ci	i.method = "MQTT";
132d4afb5ceSopenharmony_ci	i.alpn = "mqtt";
133d4afb5ceSopenharmony_ci	i.port = 1883;
134d4afb5ceSopenharmony_ci
135d4afb5ceSopenharmony_ci	if (do_ssl) {
136d4afb5ceSopenharmony_ci		i.ssl_connection = LCCSCF_USE_SSL;
137d4afb5ceSopenharmony_ci		i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
138d4afb5ceSopenharmony_ci		i.port = 8883;
139d4afb5ceSopenharmony_ci	}
140d4afb5ceSopenharmony_ci
141d4afb5ceSopenharmony_ci	if (pipeline)
142d4afb5ceSopenharmony_ci		i.ssl_connection |= LCCSCF_PIPELINE;
143d4afb5ceSopenharmony_ci
144d4afb5ceSopenharmony_ci	if (!lws_client_connect_via_info(&i)) {
145d4afb5ceSopenharmony_ci		lwsl_err("%s: Client Connect Failed\n", __func__);
146d4afb5ceSopenharmony_ci
147d4afb5ceSopenharmony_ci		return 1;
148d4afb5ceSopenharmony_ci	}
149d4afb5ceSopenharmony_ci
150d4afb5ceSopenharmony_ci	return 0;
151d4afb5ceSopenharmony_ci}
152d4afb5ceSopenharmony_ci
153d4afb5ceSopenharmony_cistatic void
154d4afb5ceSopenharmony_cistart_conn(struct lws_sorted_usec_list *sul)
155d4afb5ceSopenharmony_ci{
156d4afb5ceSopenharmony_ci	struct test_item *item = lws_container_of(sul, struct test_item, sul);
157d4afb5ceSopenharmony_ci
158d4afb5ceSopenharmony_ci	lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
159d4afb5ceSopenharmony_ci
160d4afb5ceSopenharmony_ci	if (connect_client(item->context, item))
161d4afb5ceSopenharmony_ci		interrupted = 1;
162d4afb5ceSopenharmony_ci}
163d4afb5ceSopenharmony_ci
164d4afb5ceSopenharmony_ci
165d4afb5ceSopenharmony_cistatic int
166d4afb5ceSopenharmony_cisystem_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
167d4afb5ceSopenharmony_ci		 int current, int target)
168d4afb5ceSopenharmony_ci{
169d4afb5ceSopenharmony_ci	struct lws_context *context = mgr->parent;
170d4afb5ceSopenharmony_ci	int n;
171d4afb5ceSopenharmony_ci
172d4afb5ceSopenharmony_ci	if (current != LWS_SYSTATE_OPERATIONAL ||
173d4afb5ceSopenharmony_ci	    target != LWS_SYSTATE_OPERATIONAL)
174d4afb5ceSopenharmony_ci		return 0;
175d4afb5ceSopenharmony_ci
176d4afb5ceSopenharmony_ci	/*
177d4afb5ceSopenharmony_ci	* We delay trying to do the client connection until the protocols have
178d4afb5ceSopenharmony_ci	* been initialized for each vhost... this happens after we have network
179d4afb5ceSopenharmony_ci	* and time so we can judge tls cert validity.
180d4afb5ceSopenharmony_ci	*
181d4afb5ceSopenharmony_ci	* Stagger the connection attempts so we get some joining before the
182d4afb5ceSopenharmony_ci	* first has connected and some afterwards
183d4afb5ceSopenharmony_ci	*/
184d4afb5ceSopenharmony_ci
185d4afb5ceSopenharmony_ci	for (n = 0; n < count; n++) {
186d4afb5ceSopenharmony_ci		items[n].context = context;
187d4afb5ceSopenharmony_ci		lws_sul_schedule(context, 0, &items[n].sul, start_conn,
188d4afb5ceSopenharmony_ci				 n * stagger_us);
189d4afb5ceSopenharmony_ci	}
190d4afb5ceSopenharmony_ci
191d4afb5ceSopenharmony_ci	return 0;
192d4afb5ceSopenharmony_ci}
193d4afb5ceSopenharmony_ci
194d4afb5ceSopenharmony_ci
195d4afb5ceSopenharmony_cistatic int
196d4afb5ceSopenharmony_cicallback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
197d4afb5ceSopenharmony_ci	      void *user, void *in, size_t len)
198d4afb5ceSopenharmony_ci{
199d4afb5ceSopenharmony_ci	struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
200d4afb5ceSopenharmony_ci	struct pss *pss = (struct pss *)user;
201d4afb5ceSopenharmony_ci	lws_mqtt_publish_param_t *pub;
202d4afb5ceSopenharmony_ci	size_t chunk;
203d4afb5ceSopenharmony_ci
204d4afb5ceSopenharmony_ci	switch (reason) {
205d4afb5ceSopenharmony_ci	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
206d4afb5ceSopenharmony_ci		lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
207d4afb5ceSopenharmony_ci			 in ? (char *)in : "(null)");
208d4afb5ceSopenharmony_ci
209d4afb5ceSopenharmony_ci		if (++done == count)
210d4afb5ceSopenharmony_ci			goto finish_test;
211d4afb5ceSopenharmony_ci		break;
212d4afb5ceSopenharmony_ci
213d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
214d4afb5ceSopenharmony_ci		lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi);
215d4afb5ceSopenharmony_ci
216d4afb5ceSopenharmony_ci		if (++done == count)
217d4afb5ceSopenharmony_ci			goto finish_test;
218d4afb5ceSopenharmony_ci		break;
219d4afb5ceSopenharmony_ci
220d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
221d4afb5ceSopenharmony_ci		lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
222d4afb5ceSopenharmony_ci		lws_callback_on_writable(wsi);
223d4afb5ceSopenharmony_ci
224d4afb5ceSopenharmony_ci		return 0;
225d4afb5ceSopenharmony_ci
226d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_SUBSCRIBED:
227d4afb5ceSopenharmony_ci		lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
228d4afb5ceSopenharmony_ci
229d4afb5ceSopenharmony_ci		/* then we can get on with the actual test part */
230d4afb5ceSopenharmony_ci
231d4afb5ceSopenharmony_ci		pss->state++;
232d4afb5ceSopenharmony_ci		lws_callback_on_writable(wsi);
233d4afb5ceSopenharmony_ci		break;
234d4afb5ceSopenharmony_ci
235d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
236d4afb5ceSopenharmony_ci		lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
237d4afb5ceSopenharmony_ci			  __func__, (int)(item - &item[0]), wsi);
238d4afb5ceSopenharmony_ci		okay++;
239d4afb5ceSopenharmony_ci
240d4afb5ceSopenharmony_ci		if (++pss->state == STATE_TEST_FINISH) {
241d4afb5ceSopenharmony_ci			lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
242d4afb5ceSopenharmony_ci				    __func__, (int)(item - &items[0]), okay, count);
243d4afb5ceSopenharmony_ci			/* We are done, request to close */
244d4afb5ceSopenharmony_ci			return -1;
245d4afb5ceSopenharmony_ci		}
246d4afb5ceSopenharmony_ci		break;
247d4afb5ceSopenharmony_ci
248d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
249d4afb5ceSopenharmony_ci
250d4afb5ceSopenharmony_ci		/*
251d4afb5ceSopenharmony_ci		 * Extra WRITEABLE may appear here other than ones we asked
252d4afb5ceSopenharmony_ci		 * for, so we must consult our own state to decide if we want
253d4afb5ceSopenharmony_ci		 * to make use of the opportunity
254d4afb5ceSopenharmony_ci		 */
255d4afb5ceSopenharmony_ci
256d4afb5ceSopenharmony_ci		switch (pss->state) {
257d4afb5ceSopenharmony_ci		case STATE_SUBSCRIBE:
258d4afb5ceSopenharmony_ci			lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi);
259d4afb5ceSopenharmony_ci
260d4afb5ceSopenharmony_ci			if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
261d4afb5ceSopenharmony_ci				lwsl_notice("%s: subscribe failed\n", __func__);
262d4afb5ceSopenharmony_ci
263d4afb5ceSopenharmony_ci				return -1;
264d4afb5ceSopenharmony_ci			}
265d4afb5ceSopenharmony_ci			pss->state++;
266d4afb5ceSopenharmony_ci			break;
267d4afb5ceSopenharmony_ci
268d4afb5ceSopenharmony_ci		case STATE_PUBLISH_QOS0:
269d4afb5ceSopenharmony_ci		case STATE_PUBLISH_QOS1:
270d4afb5ceSopenharmony_ci
271d4afb5ceSopenharmony_ci			lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi);
272d4afb5ceSopenharmony_ci
273d4afb5ceSopenharmony_ci			pss->pub_param.topic	= pss->state == STATE_PUBLISH_QOS0 ?
274d4afb5ceSopenharmony_ci						"test/topic0" : "test/topic1";
275d4afb5ceSopenharmony_ci			pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic);
276d4afb5ceSopenharmony_ci			pss->pub_param.qos =
277d4afb5ceSopenharmony_ci				pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
278d4afb5ceSopenharmony_ci			pss->pub_param.payload_len = TEST_STRING_LEN;
279d4afb5ceSopenharmony_ci
280d4afb5ceSopenharmony_ci			/* We send the message out 300 bytes or less at at time */
281d4afb5ceSopenharmony_ci
282d4afb5ceSopenharmony_ci			chunk = 300;
283d4afb5ceSopenharmony_ci
284d4afb5ceSopenharmony_ci			if (chunk > TEST_STRING_LEN - pss->pos)
285d4afb5ceSopenharmony_ci				chunk = TEST_STRING_LEN - pss->pos;
286d4afb5ceSopenharmony_ci
287d4afb5ceSopenharmony_ci			lwsl_notice("%s: sending %d at +%d\n", __func__,
288d4afb5ceSopenharmony_ci					(int)chunk, (int)pss->pos);
289d4afb5ceSopenharmony_ci
290d4afb5ceSopenharmony_ci			if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
291d4afb5ceSopenharmony_ci					test_string + pss->pos, (uint32_t)chunk,
292d4afb5ceSopenharmony_ci					(pss->pos + chunk == TEST_STRING_LEN))) {
293d4afb5ceSopenharmony_ci				lwsl_notice("%s: publish failed\n", __func__);
294d4afb5ceSopenharmony_ci				return -1;
295d4afb5ceSopenharmony_ci			}
296d4afb5ceSopenharmony_ci
297d4afb5ceSopenharmony_ci			pss->pos += chunk;
298d4afb5ceSopenharmony_ci
299d4afb5ceSopenharmony_ci			if (pss->pos == TEST_STRING_LEN) {
300d4afb5ceSopenharmony_ci				lwsl_debug("%s: sent message\n", __func__);
301d4afb5ceSopenharmony_ci				pss->pos = 0;
302d4afb5ceSopenharmony_ci				pss->state++;
303d4afb5ceSopenharmony_ci			}
304d4afb5ceSopenharmony_ci			break;
305d4afb5ceSopenharmony_ci
306d4afb5ceSopenharmony_ci		case STATE_UNSUBSCRIBE:
307d4afb5ceSopenharmony_ci			lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
308d4afb5ceSopenharmony_ci				  __func__, (int)(item - &item[0]), wsi);
309d4afb5ceSopenharmony_ci			pss->state++;
310d4afb5ceSopenharmony_ci			if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
311d4afb5ceSopenharmony_ci				lwsl_notice("%s: subscribe failed\n", __func__);
312d4afb5ceSopenharmony_ci				return -1;
313d4afb5ceSopenharmony_ci			}
314d4afb5ceSopenharmony_ci			break;
315d4afb5ceSopenharmony_ci		default:
316d4afb5ceSopenharmony_ci			break;
317d4afb5ceSopenharmony_ci		}
318d4afb5ceSopenharmony_ci
319d4afb5ceSopenharmony_ci		return 0;
320d4afb5ceSopenharmony_ci
321d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_ACK:
322d4afb5ceSopenharmony_ci		lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state);
323d4afb5ceSopenharmony_ci		/*
324d4afb5ceSopenharmony_ci		 * We can forget about the message we just sent, it's done.
325d4afb5ceSopenharmony_ci		 *
326d4afb5ceSopenharmony_ci		 * For our test, that's the indication we can close the wsi.
327d4afb5ceSopenharmony_ci		 */
328d4afb5ceSopenharmony_ci
329d4afb5ceSopenharmony_ci		pss->state++;
330d4afb5ceSopenharmony_ci		if (pss->state != STATE_TEST_FINISH) {
331d4afb5ceSopenharmony_ci			lws_callback_on_writable(wsi);
332d4afb5ceSopenharmony_ci			break;
333d4afb5ceSopenharmony_ci		}
334d4afb5ceSopenharmony_ci
335d4afb5ceSopenharmony_ci		break;
336d4afb5ceSopenharmony_ci
337d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_RESEND:
338d4afb5ceSopenharmony_ci		lwsl_user("%s: MQTT_RESEND\n", __func__);
339d4afb5ceSopenharmony_ci		/*
340d4afb5ceSopenharmony_ci		 * We must resend the packet ID mentioned in len
341d4afb5ceSopenharmony_ci		 */
342d4afb5ceSopenharmony_ci		if (++pss->retries == 3) {
343d4afb5ceSopenharmony_ci			lwsl_notice("%s: too many retries\n", __func__);
344d4afb5ceSopenharmony_ci			return 1; /* kill the connection */
345d4afb5ceSopenharmony_ci		}
346d4afb5ceSopenharmony_ci		pss->state--;
347d4afb5ceSopenharmony_ci		pss->pos = 0;
348d4afb5ceSopenharmony_ci		break;
349d4afb5ceSopenharmony_ci
350d4afb5ceSopenharmony_ci	case LWS_CALLBACK_MQTT_CLIENT_RX:
351d4afb5ceSopenharmony_ci		pub = (lws_mqtt_publish_param_t *)in;
352d4afb5ceSopenharmony_ci		assert(pub);
353d4afb5ceSopenharmony_ci		lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__,
354d4afb5ceSopenharmony_ci			  (int)(item - &items[0]), pub->topic, (int)pub->payload_pos,
355d4afb5ceSopenharmony_ci			  (int)pub->payload_len, (int)len);
356d4afb5ceSopenharmony_ci
357d4afb5ceSopenharmony_ci		//lwsl_hexdump_info(pub->payload, len);
358d4afb5ceSopenharmony_ci
359d4afb5ceSopenharmony_ci		return 0;
360d4afb5ceSopenharmony_ci
361d4afb5ceSopenharmony_ci	default:
362d4afb5ceSopenharmony_ci		break;
363d4afb5ceSopenharmony_ci	}
364d4afb5ceSopenharmony_ci
365d4afb5ceSopenharmony_ci	return 0;
366d4afb5ceSopenharmony_ci
367d4afb5ceSopenharmony_cifinish_test:
368d4afb5ceSopenharmony_ci	interrupted = 1;
369d4afb5ceSopenharmony_ci	lws_cancel_service(lws_get_context(wsi));
370d4afb5ceSopenharmony_ci
371d4afb5ceSopenharmony_ci	return 0;
372d4afb5ceSopenharmony_ci}
373d4afb5ceSopenharmony_ci
374d4afb5ceSopenharmony_cistatic const struct lws_protocols protocols[] = {
375d4afb5ceSopenharmony_ci	{
376d4afb5ceSopenharmony_ci		.name			= "test-mqtt",
377d4afb5ceSopenharmony_ci		.callback		= callback_mqtt,
378d4afb5ceSopenharmony_ci		.per_session_data_size	= sizeof(struct pss)
379d4afb5ceSopenharmony_ci	},
380d4afb5ceSopenharmony_ci	LWS_PROTOCOL_LIST_TERM
381d4afb5ceSopenharmony_ci};
382d4afb5ceSopenharmony_ci
383d4afb5ceSopenharmony_ciint main(int argc, const char **argv)
384d4afb5ceSopenharmony_ci{
385d4afb5ceSopenharmony_ci	lws_state_notify_link_t notifier = { { NULL, NULL, NULL },
386d4afb5ceSopenharmony_ci					     system_notify_cb, "app" };
387d4afb5ceSopenharmony_ci	lws_state_notify_link_t *na[] = { &notifier, NULL };
388d4afb5ceSopenharmony_ci	struct lws_context_creation_info info;
389d4afb5ceSopenharmony_ci	struct lws_context *context;
390d4afb5ceSopenharmony_ci	const char *p;
391d4afb5ceSopenharmony_ci	int n = 0;
392d4afb5ceSopenharmony_ci
393d4afb5ceSopenharmony_ci	signal(SIGINT, sigint_handler);
394d4afb5ceSopenharmony_ci	memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
395d4afb5ceSopenharmony_ci	lws_cmdline_option_handle_builtin(argc, argv, &info);
396d4afb5ceSopenharmony_ci
397d4afb5ceSopenharmony_ci	do_ssl = !!lws_cmdline_option(argc, argv, "-s");
398d4afb5ceSopenharmony_ci	if (do_ssl)
399d4afb5ceSopenharmony_ci		info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
400d4afb5ceSopenharmony_ci
401d4afb5ceSopenharmony_ci	if (lws_cmdline_option(argc, argv, "-p"))
402d4afb5ceSopenharmony_ci		pipeline = 1;
403d4afb5ceSopenharmony_ci
404d4afb5ceSopenharmony_ci	if ((p = lws_cmdline_option(argc, argv, "-i")))
405d4afb5ceSopenharmony_ci		stagger_us = atoi(p);
406d4afb5ceSopenharmony_ci
407d4afb5ceSopenharmony_ci	if ((p = lws_cmdline_option(argc, argv, "-c")))
408d4afb5ceSopenharmony_ci		count = atoi(p);
409d4afb5ceSopenharmony_ci
410d4afb5ceSopenharmony_ci	if (count > COUNT) {
411d4afb5ceSopenharmony_ci		count = COUNT;
412d4afb5ceSopenharmony_ci		lwsl_err("%s: clipped count at max %d\n", __func__, count);
413d4afb5ceSopenharmony_ci	}
414d4afb5ceSopenharmony_ci
415d4afb5ceSopenharmony_ci	lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
416d4afb5ceSopenharmony_ci			do_ssl ? "tls enabled": "unencrypted");
417d4afb5ceSopenharmony_ci
418d4afb5ceSopenharmony_ci	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
419d4afb5ceSopenharmony_ci	info.protocols = protocols;
420d4afb5ceSopenharmony_ci	info.register_notifier_list = na;
421d4afb5ceSopenharmony_ci	info.fd_limit_per_thread = 1 + COUNT + 1;
422d4afb5ceSopenharmony_ci	info.retry_and_idle_policy = &retry;
423d4afb5ceSopenharmony_ci
424d4afb5ceSopenharmony_ci#if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
425d4afb5ceSopenharmony_ci	/*
426d4afb5ceSopenharmony_ci	 * OpenSSL uses the system trust store.  mbedTLS has to be told which
427d4afb5ceSopenharmony_ci	 * CA to trust explicitly.
428d4afb5ceSopenharmony_ci	 */
429d4afb5ceSopenharmony_ci	info.client_ssl_ca_filepath = "./mosq-ca.crt";
430d4afb5ceSopenharmony_ci#endif
431d4afb5ceSopenharmony_ci
432d4afb5ceSopenharmony_ci	context = lws_create_context(&info);
433d4afb5ceSopenharmony_ci	if (!context) {
434d4afb5ceSopenharmony_ci		lwsl_err("lws init failed\n");
435d4afb5ceSopenharmony_ci		return 1;
436d4afb5ceSopenharmony_ci	}
437d4afb5ceSopenharmony_ci
438d4afb5ceSopenharmony_ci	/* Event loop */
439d4afb5ceSopenharmony_ci	while (n >= 0 && !interrupted)
440d4afb5ceSopenharmony_ci		n = lws_service(context, 0);
441d4afb5ceSopenharmony_ci
442d4afb5ceSopenharmony_ci	lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
443d4afb5ceSopenharmony_ci			okay != count ? "failed" : "OK");
444d4afb5ceSopenharmony_ci	lws_context_destroy(context);
445d4afb5ceSopenharmony_ci
446d4afb5ceSopenharmony_ci	return okay != count;
447d4afb5ceSopenharmony_ci}
448