1/*
2 * lws-minimal-mqtt-client
3 *
4 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5 *                         Sakthi Kannan <saktr@amazon.com>
6 *
7 * This file is made available under the Creative Commons CC0 1.0
8 * Universal Public Domain Dedication.
9 */
10
11#include <libwebsockets.h>
12#include <string.h>
13#include <signal.h>
14#if defined(WIN32)
15#define HAVE_STRUCT_TIMESPEC
16#if defined(pid_t)
17#undef pid_t
18#endif
19#endif
20#include <pthread.h>
21#include <assert.h>
22
23#define COUNT 8
24
25struct test_item {
26	struct lws_context		*context;
27	struct lws			*wsi;
28	lws_sorted_usec_list_t		sul;
29} items[COUNT];
30
31enum {
32	STATE_SUBSCRIBE,	/* subscribe to the topic */
33	STATE_WAIT_SUBACK,
34	STATE_PUBLISH_QOS0,	/* Send the message in QoS0 */
35	STATE_WAIT_ACK0,	/* Wait for the synthetic "ack" */
36	STATE_PUBLISH_QOS1,	/* Send the message in QoS1 */
37	STATE_WAIT_ACK1,	/* Wait for the real ack (or timeout + retry) */
38	STATE_UNSUBSCRIBE,
39	STATE_WAIT_UNSUBACK,
40
41	STATE_TEST_FINISH
42};
43
44static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay,
45	   done, count = COUNT;
46
47static const lws_retry_bo_t retry = {
48	.secs_since_valid_ping		= 20, /* if idle, PINGREQ after secs */
49	.secs_since_valid_hangup	= 25, /* hangup if still idle secs */
50};
51
52static const lws_mqtt_client_connect_param_t client_connect_param = {
53	.client_id			= NULL,
54	.keep_alive			= 60,
55	.clean_start			= 1,
56	.client_id_nofree		= 1,
57	.username_nofree		= 1,
58	.password_nofree		= 1,
59	.will_param = {
60		.topic			= "good/bye",
61		.message		= "sign-off",
62		.qos			= 0,
63		.retain			= 0,
64	},
65	.username			= "lwsUser",
66	.password			= "mySecretPassword",
67};
68
69static lws_mqtt_topic_elem_t topics[] = {
70	[0] = { .name = "test/topic0", .qos = QOS0 },
71	[1] = { .name = "test/topic1", .qos = QOS1 },
72};
73
74static lws_mqtt_subscribe_param_t sub_param = {
75	.topic				= &topics[0],
76	.num_topics			= LWS_ARRAY_SIZE(topics),
77};
78
79static const char * const test_string =
80	"No one would have believed in the last years of the nineteenth "
81	"century that this world was being watched keenly and closely by "
82	"intelligences greater than man's and yet as mortal as his own; that as "
83	"men busied themselves about their various concerns they were "
84	"scrutinised and studied, perhaps almost as narrowly as a man with a "
85	"microscope might scrutinise the transient creatures that swarm and "
86	"multiply in a drop of water.  With infinite complacency men went to "
87	"and fro over this globe about their little affairs, serene in their "
88	"assurance of their empire over matter. It is possible that the "
89	"infusoria under the microscope do the same.  No one gave a thought to "
90	"the older worlds of space as sources of human danger, or thought of "
91	"them only to dismiss the idea of life upon them as impossible or "
92	"improbable.  It is curious to recall some of the mental habits of "
93	"those departed days.  At most terrestrial men fancied there might be "
94	"other men upon Mars, perhaps inferior to themselves and ready to "
95	"welcome a missionary enterprise. Yet across the gulf of space, minds "
96	"that are to our minds as ours are to those of the beasts that perish, "
97	"intellects vast and cool and unsympathetic, regarded this earth with "
98	"envious eyes, and slowly and surely drew their plans against us.  And "
99	"early in the twentieth century came the great disillusionment. ";
100
101/* this reflects the length of the string above */
102#define TEST_STRING_LEN 1337
103
104struct pss {
105	lws_mqtt_publish_param_t	pub_param;
106	int				state;
107	size_t				pos;
108	int				retries;
109};
110
111static void
112sigint_handler(int sig)
113{
114	interrupted = 1;
115}
116
117static int
118connect_client(struct lws_context *context, struct test_item *item)
119{
120	struct lws_client_connect_info i;
121
122	memset(&i, 0, sizeof i);
123
124	i.mqtt_cp = &client_connect_param;
125	i.opaque_user_data = item;
126	i.protocol = "test-mqtt";
127	i.address = "localhost";
128	i.host = "localhost";
129	i.pwsi = &item->wsi;
130	i.context = context;
131	i.method = "MQTT";
132	i.alpn = "mqtt";
133	i.port = 1883;
134
135	if (do_ssl) {
136		i.ssl_connection = LCCSCF_USE_SSL;
137		i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
138		i.port = 8883;
139	}
140
141	if (pipeline)
142		i.ssl_connection |= LCCSCF_PIPELINE;
143
144	if (!lws_client_connect_via_info(&i)) {
145		lwsl_err("%s: Client Connect Failed\n", __func__);
146
147		return 1;
148	}
149
150	return 0;
151}
152
153static void
154start_conn(struct lws_sorted_usec_list *sul)
155{
156	struct test_item *item = lws_container_of(sul, struct test_item, sul);
157
158	lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
159
160	if (connect_client(item->context, item))
161		interrupted = 1;
162}
163
164
165static int
166system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
167		 int current, int target)
168{
169	struct lws_context *context = mgr->parent;
170	int n;
171
172	if (current != LWS_SYSTATE_OPERATIONAL ||
173	    target != LWS_SYSTATE_OPERATIONAL)
174		return 0;
175
176	/*
177	* We delay trying to do the client connection until the protocols have
178	* been initialized for each vhost... this happens after we have network
179	* and time so we can judge tls cert validity.
180	*
181	* Stagger the connection attempts so we get some joining before the
182	* first has connected and some afterwards
183	*/
184
185	for (n = 0; n < count; n++) {
186		items[n].context = context;
187		lws_sul_schedule(context, 0, &items[n].sul, start_conn,
188				 n * stagger_us);
189	}
190
191	return 0;
192}
193
194
195static int
196callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
197	      void *user, void *in, size_t len)
198{
199	struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
200	struct pss *pss = (struct pss *)user;
201	lws_mqtt_publish_param_t *pub;
202	size_t chunk;
203
204	switch (reason) {
205	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
206		lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
207			 in ? (char *)in : "(null)");
208
209		if (++done == count)
210			goto finish_test;
211		break;
212
213	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
214		lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi);
215
216		if (++done == count)
217			goto finish_test;
218		break;
219
220	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
221		lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
222		lws_callback_on_writable(wsi);
223
224		return 0;
225
226	case LWS_CALLBACK_MQTT_SUBSCRIBED:
227		lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
228
229		/* then we can get on with the actual test part */
230
231		pss->state++;
232		lws_callback_on_writable(wsi);
233		break;
234
235	case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
236		lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
237			  __func__, (int)(item - &item[0]), wsi);
238		okay++;
239
240		if (++pss->state == STATE_TEST_FINISH) {
241			lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
242				    __func__, (int)(item - &items[0]), okay, count);
243			/* We are done, request to close */
244			return -1;
245		}
246		break;
247
248	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
249
250		/*
251		 * Extra WRITEABLE may appear here other than ones we asked
252		 * for, so we must consult our own state to decide if we want
253		 * to make use of the opportunity
254		 */
255
256		switch (pss->state) {
257		case STATE_SUBSCRIBE:
258			lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi);
259
260			if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
261				lwsl_notice("%s: subscribe failed\n", __func__);
262
263				return -1;
264			}
265			pss->state++;
266			break;
267
268		case STATE_PUBLISH_QOS0:
269		case STATE_PUBLISH_QOS1:
270
271			lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi);
272
273			pss->pub_param.topic	= pss->state == STATE_PUBLISH_QOS0 ?
274						"test/topic0" : "test/topic1";
275			pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic);
276			pss->pub_param.qos =
277				pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
278			pss->pub_param.payload_len = TEST_STRING_LEN;
279
280			/* We send the message out 300 bytes or less at at time */
281
282			chunk = 300;
283
284			if (chunk > TEST_STRING_LEN - pss->pos)
285				chunk = TEST_STRING_LEN - pss->pos;
286
287			lwsl_notice("%s: sending %d at +%d\n", __func__,
288					(int)chunk, (int)pss->pos);
289
290			if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
291					test_string + pss->pos, (uint32_t)chunk,
292					(pss->pos + chunk == TEST_STRING_LEN))) {
293				lwsl_notice("%s: publish failed\n", __func__);
294				return -1;
295			}
296
297			pss->pos += chunk;
298
299			if (pss->pos == TEST_STRING_LEN) {
300				lwsl_debug("%s: sent message\n", __func__);
301				pss->pos = 0;
302				pss->state++;
303			}
304			break;
305
306		case STATE_UNSUBSCRIBE:
307			lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
308				  __func__, (int)(item - &item[0]), wsi);
309			pss->state++;
310			if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
311				lwsl_notice("%s: subscribe failed\n", __func__);
312				return -1;
313			}
314			break;
315		default:
316			break;
317		}
318
319		return 0;
320
321	case LWS_CALLBACK_MQTT_ACK:
322		lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state);
323		/*
324		 * We can forget about the message we just sent, it's done.
325		 *
326		 * For our test, that's the indication we can close the wsi.
327		 */
328
329		pss->state++;
330		if (pss->state != STATE_TEST_FINISH) {
331			lws_callback_on_writable(wsi);
332			break;
333		}
334
335		break;
336
337	case LWS_CALLBACK_MQTT_RESEND:
338		lwsl_user("%s: MQTT_RESEND\n", __func__);
339		/*
340		 * We must resend the packet ID mentioned in len
341		 */
342		if (++pss->retries == 3) {
343			lwsl_notice("%s: too many retries\n", __func__);
344			return 1; /* kill the connection */
345		}
346		pss->state--;
347		pss->pos = 0;
348		break;
349
350	case LWS_CALLBACK_MQTT_CLIENT_RX:
351		pub = (lws_mqtt_publish_param_t *)in;
352		assert(pub);
353		lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__,
354			  (int)(item - &items[0]), pub->topic, (int)pub->payload_pos,
355			  (int)pub->payload_len, (int)len);
356
357		//lwsl_hexdump_info(pub->payload, len);
358
359		return 0;
360
361	default:
362		break;
363	}
364
365	return 0;
366
367finish_test:
368	interrupted = 1;
369	lws_cancel_service(lws_get_context(wsi));
370
371	return 0;
372}
373
374static const struct lws_protocols protocols[] = {
375	{
376		.name			= "test-mqtt",
377		.callback		= callback_mqtt,
378		.per_session_data_size	= sizeof(struct pss)
379	},
380	LWS_PROTOCOL_LIST_TERM
381};
382
383int main(int argc, const char **argv)
384{
385	lws_state_notify_link_t notifier = { { NULL, NULL, NULL },
386					     system_notify_cb, "app" };
387	lws_state_notify_link_t *na[] = { &notifier, NULL };
388	struct lws_context_creation_info info;
389	struct lws_context *context;
390	const char *p;
391	int n = 0;
392
393	signal(SIGINT, sigint_handler);
394	memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
395	lws_cmdline_option_handle_builtin(argc, argv, &info);
396
397	do_ssl = !!lws_cmdline_option(argc, argv, "-s");
398	if (do_ssl)
399		info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
400
401	if (lws_cmdline_option(argc, argv, "-p"))
402		pipeline = 1;
403
404	if ((p = lws_cmdline_option(argc, argv, "-i")))
405		stagger_us = atoi(p);
406
407	if ((p = lws_cmdline_option(argc, argv, "-c")))
408		count = atoi(p);
409
410	if (count > COUNT) {
411		count = COUNT;
412		lwsl_err("%s: clipped count at max %d\n", __func__, count);
413	}
414
415	lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
416			do_ssl ? "tls enabled": "unencrypted");
417
418	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
419	info.protocols = protocols;
420	info.register_notifier_list = na;
421	info.fd_limit_per_thread = 1 + COUNT + 1;
422	info.retry_and_idle_policy = &retry;
423
424#if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
425	/*
426	 * OpenSSL uses the system trust store.  mbedTLS has to be told which
427	 * CA to trust explicitly.
428	 */
429	info.client_ssl_ca_filepath = "./mosq-ca.crt";
430#endif
431
432	context = lws_create_context(&info);
433	if (!context) {
434		lwsl_err("lws init failed\n");
435		return 1;
436	}
437
438	/* Event loop */
439	while (n >= 0 && !interrupted)
440		n = lws_service(context, 0);
441
442	lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
443			okay != count ? "failed" : "OK");
444	lws_context_destroy(context);
445
446	return okay != count;
447}
448