1/*
2 * lws-minimal-mqtt-client
3 *
4 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5 *                         Sakthi Kannan <saktr@amazon.com>
6 *
7 * This file is made available under the Creative Commons CC0 1.0
8 * Universal Public Domain Dedication.
9 */
10
11#include <libwebsockets.h>
12#include <string.h>
13#include <signal.h>
14#if defined(WIN32)
15#define HAVE_STRUCT_TIMESPEC
16#if defined(pid_t)
17#undef pid_t
18#endif
19#endif
20#include <pthread.h>
21#include <assert.h>
22
23enum {
24	STATE_SUBSCRIBE,	/* subscribe to the topic */
25	STATE_PUBLISH_QOS0,	/* Send the message in QoS0 */
26	STATE_WAIT_ACK0,	/* Wait for the synthetic "ack" */
27	STATE_PUBLISH_QOS1,	/* Send the message in QoS1 */
28	STATE_WAIT_ACK1,	/* Wait for the real ack (or timeout + retry) */
29
30	STATE_TEST_FINISH
31};
32
33static int interrupted, bad = 1, do_ssl;
34
35static const lws_retry_bo_t retry = {
36	.secs_since_valid_ping		= 20, /* if idle, PINGREQ after secs */
37	.secs_since_valid_hangup	= 25, /* hangup if still idle secs */
38};
39
40static const lws_mqtt_client_connect_param_t client_connect_param = {
41	.client_id			= "lwsMqttClient",
42	.keep_alive			= 60,
43	.clean_start			= 1,
44	.client_id_nofree		= 1,
45	.username_nofree		= 1,
46	.password_nofree		= 1,
47	.will_param = {
48		.topic			= "good/bye",
49		.message		= "sign-off",
50		.qos			= 0,
51		.retain			= 0,
52	},
53	.username			= "lwsUser",
54	.password			= "mySecretPassword",
55};
56
57static lws_mqtt_publish_param_t pub_param;
58
59static lws_mqtt_topic_elem_t topics[] = {
60	[0] = { .name = "test/topic0", .qos = QOS0 },
61	[1] = { .name = "test/topic1", .qos = QOS1 },
62};
63
64static lws_mqtt_subscribe_param_t sub_param = {
65	.topic				= &topics[0],
66	.num_topics			= LWS_ARRAY_SIZE(topics),
67};
68
69static const char * const test_string =
70	"No one would have believed in the last years of the nineteenth "
71	"century that this world was being watched keenly and closely by "
72	"intelligences greater than man's and yet as mortal as his own; that as "
73	"men busied themselves about their various concerns they were "
74	"scrutinised and studied, perhaps almost as narrowly as a man with a "
75	"microscope might scrutinise the transient creatures that swarm and "
76	"multiply in a drop of water.  With infinite complacency men went to "
77	"and fro over this globe about their little affairs, serene in their "
78	"assurance of their empire over matter. It is possible that the "
79	"infusoria under the microscope do the same.  No one gave a thought to "
80	"the older worlds of space as sources of human danger, or thought of "
81	"them only to dismiss the idea of life upon them as impossible or "
82	"improbable.  It is curious to recall some of the mental habits of "
83	"those departed days.  At most terrestrial men fancied there might be "
84	"other men upon Mars, perhaps inferior to themselves and ready to "
85	"welcome a missionary enterprise. Yet across the gulf of space, minds "
86	"that are to our minds as ours are to those of the beasts that perish, "
87	"intellects vast and cool and unsympathetic, regarded this earth with "
88	"envious eyes, and slowly and surely drew their plans against us.  And "
89	"early in the twentieth century came the great disillusionment. ";
90
91/* this reflects the length of the string above */
92#define TEST_STRING_LEN 1337
93
94struct pss {
95	int		state;
96	size_t		pos;
97	int		retries;
98};
99
100static void
101sigint_handler(int sig)
102{
103	interrupted = 1;
104}
105
106static int
107connect_client(struct lws_context *context)
108{
109	struct lws_client_connect_info i;
110
111	memset(&i, 0, sizeof i);
112
113	i.mqtt_cp = &client_connect_param;
114	i.address = "localhost";
115	i.host = "localhost";
116	i.protocol = "mqtt";
117	i.context = context;
118	i.method = "MQTT";
119	i.alpn = "mqtt";
120	i.port = 1883;
121
122	if (do_ssl) {
123		i.ssl_connection = LCCSCF_USE_SSL;
124		i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
125		i.port = 8883;
126	}
127
128	if (!lws_client_connect_via_info(&i)) {
129		lwsl_err("%s: Client Connect Failed\n", __func__);
130
131		return 1;
132	}
133
134	return 0;
135}
136
137static int
138system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
139		 int current, int target)
140{
141	struct lws_context *context = mgr->parent;
142
143	if (current != LWS_SYSTATE_OPERATIONAL ||
144	    target != LWS_SYSTATE_OPERATIONAL)
145		return 0;
146
147	/*
148	* We delay trying to do the client connection until
149	* the protocols have been initialized for each
150	* vhost... this happens after we have network and
151	* time so we can judge tls cert validity.
152	*/
153
154	if (connect_client(context))
155		interrupted = 1;
156
157	return 0;
158 }
159
160
161static int
162callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
163	      void *user, void *in, size_t len)
164{
165	struct pss *pss = (struct pss *)user;
166	lws_mqtt_publish_param_t *pub;
167	size_t chunk;
168
169	switch (reason) {
170	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
171		lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
172			 in ? (char *)in : "(null)");
173		interrupted = 1;
174		break;
175
176	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
177		lwsl_user("%s: CLIENT_CLOSED\n", __func__);
178		interrupted = 1;
179		break;
180
181	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
182		lwsl_user("%s: MQTT_CLIENT_ESTABLISHED\n", __func__);
183		lws_callback_on_writable(wsi);
184
185		return 0;
186
187	case LWS_CALLBACK_MQTT_SUBSCRIBED:
188		lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
189		lws_callback_on_writable(wsi);
190		break;
191
192	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
193		/*
194		 * Extra WRITEABLE may appear here other than ones we asked
195		 * for, so we must consult our own state to decide if we want
196		 * to make use of the opportunity
197		 */
198
199		switch (pss->state) {
200		case STATE_SUBSCRIBE:
201			lwsl_user("%s: WRITEABLE: Subscribing\n", __func__);
202
203			if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
204				lwsl_notice("%s: subscribe failed\n", __func__);
205
206				return -1;
207			}
208			pss->state++;
209			break;
210
211		case STATE_PUBLISH_QOS0:
212		case STATE_PUBLISH_QOS1:
213
214			lwsl_user("%s: WRITEABLE: Publish\n", __func__);
215
216			pub_param.topic	= "test/topic";
217			pub_param.topic_len = (uint16_t)strlen(pub_param.topic);
218			pub_param.qos = pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
219			pub_param.payload_len = TEST_STRING_LEN;
220
221			/* We send the message out 300 bytes or less at at time */
222
223			chunk = 300;
224
225			if (chunk > TEST_STRING_LEN - pss->pos)
226				chunk = TEST_STRING_LEN - pss->pos;
227
228			if (lws_mqtt_client_send_publish(wsi, &pub_param,
229					test_string + pss->pos, (uint32_t)chunk,
230					(pss->pos + chunk == TEST_STRING_LEN)))
231				return -1;
232
233			pss->pos += chunk;
234
235			if (pss->pos == TEST_STRING_LEN) {
236				pss->pos = 0;
237				pss->state++;
238			}
239			break;
240
241		default:
242			break;
243		}
244
245		return 0;
246
247	case LWS_CALLBACK_MQTT_ACK:
248		lwsl_user("%s: MQTT_ACK\n", __func__);
249		/*
250		 * We can forget about the message we just sent, it's done.
251		 *
252		 * For our test, that's the indication we can close the wsi.
253		 */
254
255		pss->state++;
256		if (pss->state != STATE_TEST_FINISH) {
257			lws_callback_on_writable(wsi);
258			break;
259		}
260
261		/* Oh we are done then */
262
263		bad = 0;
264		interrupted = 1;
265		lws_cancel_service(lws_get_context(wsi));
266		break;
267
268	case LWS_CALLBACK_MQTT_RESEND:
269		lwsl_user("%s: MQTT_RESEND\n", __func__);
270		/*
271		 * We must resend the packet ID mentioned in len
272		 */
273		if (++pss->retries == 3) {
274			interrupted = 1;
275			break;
276		}
277		pss->state--;
278		pss->pos = 0;
279		break;
280
281	case LWS_CALLBACK_MQTT_CLIENT_RX:
282		lwsl_user("%s: MQTT_CLIENT_RX\n", __func__);
283
284		pub = (lws_mqtt_publish_param_t *)in;
285		assert(pub);
286
287		lwsl_hexdump_notice(pub->topic, pub->topic_len);
288		lwsl_hexdump_notice(pub->payload, pub->payload_len);
289
290		return 0;
291
292	default:
293		break;
294	}
295
296	return 0;
297}
298
299static const struct lws_protocols protocols[] = {
300	{
301		.name			= "mqtt",
302		.callback		= callback_mqtt,
303		.per_session_data_size	= sizeof(struct pss)
304	},
305	LWS_PROTOCOL_LIST_TERM
306};
307
308int main(int argc, const char **argv)
309{
310	lws_state_notify_link_t notifier = { { NULL, NULL, NULL },
311					     system_notify_cb, "app" };
312	lws_state_notify_link_t *na[] = { &notifier, NULL };
313	struct lws_context_creation_info info;
314	struct lws_context *context;
315	int n = 0;
316
317	signal(SIGINT, sigint_handler);
318	memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
319	lws_cmdline_option_handle_builtin(argc, argv, &info);
320
321	do_ssl = !!lws_cmdline_option(argc, argv, "-s");
322	if (do_ssl)
323		info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
324
325	lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
326			do_ssl ? "tls enabled": "unencrypted");
327
328	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
329	info.protocols = protocols;
330	info.register_notifier_list = na;
331	info.fd_limit_per_thread = 1 + 1 + 1;
332	info.retry_and_idle_policy = &retry;
333
334#if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL)
335	/*
336	 * OpenSSL uses the system trust store.  mbedTLS has to be told which
337	 * CA to trust explicitly.
338	 */
339	info.client_ssl_ca_filepath = "./mosq-ca.crt";
340#endif
341
342	context = lws_create_context(&info);
343	if (!context) {
344		lwsl_err("lws init failed\n");
345		return 1;
346	}
347
348	/* Event loop */
349	while (n >= 0 && !interrupted)
350		n = lws_service(context, 0);
351
352	lwsl_user("Completed: %s\n", bad ? "failed" : "OK");
353	lws_context_destroy(context);
354
355	return bad;
356}
357