1/* 2 * lws-minimal-mqtt-client 3 * 4 * Written in 2010-2020 by Andy Green <andy@warmcat.com> 5 * Sakthi Kannan <saktr@amazon.com> 6 * 7 * This file is made available under the Creative Commons CC0 1.0 8 * Universal Public Domain Dedication. 9 */ 10 11#include <libwebsockets.h> 12#include <string.h> 13#include <signal.h> 14#if defined(WIN32) 15#define HAVE_STRUCT_TIMESPEC 16#if defined(pid_t) 17#undef pid_t 18#endif 19#endif 20#include <pthread.h> 21#include <assert.h> 22 23#define COUNT 8 24 25struct test_item { 26 struct lws_context *context; 27 struct lws *wsi; 28 lws_sorted_usec_list_t sul; 29} items[COUNT]; 30 31enum { 32 STATE_SUBSCRIBE, /* subscribe to the topic */ 33 STATE_WAIT_SUBACK, 34 STATE_PUBLISH_QOS0, /* Send the message in QoS0 */ 35 STATE_WAIT_ACK0, /* Wait for the synthetic "ack" */ 36 STATE_PUBLISH_QOS1, /* Send the message in QoS1 */ 37 STATE_WAIT_ACK1, /* Wait for the real ack (or timeout + retry) */ 38 STATE_UNSUBSCRIBE, 39 STATE_WAIT_UNSUBACK, 40 41 STATE_TEST_FINISH 42}; 43 44static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay, 45 done, count = COUNT; 46 47static const lws_retry_bo_t retry = { 48 .secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */ 49 .secs_since_valid_hangup = 25, /* hangup if still idle secs */ 50}; 51 52static const lws_mqtt_client_connect_param_t client_connect_param = { 53 .client_id = NULL, 54 .keep_alive = 60, 55 .clean_start = 1, 56 .client_id_nofree = 1, 57 .username_nofree = 1, 58 .password_nofree = 1, 59 .will_param = { 60 .topic = "good/bye", 61 .message = "sign-off", 62 .qos = 0, 63 .retain = 0, 64 }, 65 .username = "lwsUser", 66 .password = "mySecretPassword", 67}; 68 69static lws_mqtt_topic_elem_t topics[] = { 70 [0] = { .name = "test/topic0", .qos = QOS0 }, 71 [1] = { .name = "test/topic1", .qos = QOS1 }, 72}; 73 74static lws_mqtt_subscribe_param_t sub_param = { 75 .topic = &topics[0], 76 .num_topics = LWS_ARRAY_SIZE(topics), 77}; 78 79static const char * const test_string = 80 "No one would have believed in the last years of the nineteenth " 81 "century that this world was being watched keenly and closely by " 82 "intelligences greater than man's and yet as mortal as his own; that as " 83 "men busied themselves about their various concerns they were " 84 "scrutinised and studied, perhaps almost as narrowly as a man with a " 85 "microscope might scrutinise the transient creatures that swarm and " 86 "multiply in a drop of water. With infinite complacency men went to " 87 "and fro over this globe about their little affairs, serene in their " 88 "assurance of their empire over matter. It is possible that the " 89 "infusoria under the microscope do the same. No one gave a thought to " 90 "the older worlds of space as sources of human danger, or thought of " 91 "them only to dismiss the idea of life upon them as impossible or " 92 "improbable. It is curious to recall some of the mental habits of " 93 "those departed days. At most terrestrial men fancied there might be " 94 "other men upon Mars, perhaps inferior to themselves and ready to " 95 "welcome a missionary enterprise. Yet across the gulf of space, minds " 96 "that are to our minds as ours are to those of the beasts that perish, " 97 "intellects vast and cool and unsympathetic, regarded this earth with " 98 "envious eyes, and slowly and surely drew their plans against us. And " 99 "early in the twentieth century came the great disillusionment. "; 100 101/* this reflects the length of the string above */ 102#define TEST_STRING_LEN 1337 103 104struct pss { 105 lws_mqtt_publish_param_t pub_param; 106 int state; 107 size_t pos; 108 int retries; 109}; 110 111static void 112sigint_handler(int sig) 113{ 114 interrupted = 1; 115} 116 117static int 118connect_client(struct lws_context *context, struct test_item *item) 119{ 120 struct lws_client_connect_info i; 121 122 memset(&i, 0, sizeof i); 123 124 i.mqtt_cp = &client_connect_param; 125 i.opaque_user_data = item; 126 i.protocol = "test-mqtt"; 127 i.address = "localhost"; 128 i.host = "localhost"; 129 i.pwsi = &item->wsi; 130 i.context = context; 131 i.method = "MQTT"; 132 i.alpn = "mqtt"; 133 i.port = 1883; 134 135 if (do_ssl) { 136 i.ssl_connection = LCCSCF_USE_SSL; 137 i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED; 138 i.port = 8883; 139 } 140 141 if (pipeline) 142 i.ssl_connection |= LCCSCF_PIPELINE; 143 144 if (!lws_client_connect_via_info(&i)) { 145 lwsl_err("%s: Client Connect Failed\n", __func__); 146 147 return 1; 148 } 149 150 return 0; 151} 152 153static void 154start_conn(struct lws_sorted_usec_list *sul) 155{ 156 struct test_item *item = lws_container_of(sul, struct test_item, sul); 157 158 lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0])); 159 160 if (connect_client(item->context, item)) 161 interrupted = 1; 162} 163 164 165static int 166system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link, 167 int current, int target) 168{ 169 struct lws_context *context = mgr->parent; 170 int n; 171 172 if (current != LWS_SYSTATE_OPERATIONAL || 173 target != LWS_SYSTATE_OPERATIONAL) 174 return 0; 175 176 /* 177 * We delay trying to do the client connection until the protocols have 178 * been initialized for each vhost... this happens after we have network 179 * and time so we can judge tls cert validity. 180 * 181 * Stagger the connection attempts so we get some joining before the 182 * first has connected and some afterwards 183 */ 184 185 for (n = 0; n < count; n++) { 186 items[n].context = context; 187 lws_sul_schedule(context, 0, &items[n].sul, start_conn, 188 n * stagger_us); 189 } 190 191 return 0; 192} 193 194 195static int 196callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason, 197 void *user, void *in, size_t len) 198{ 199 struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi); 200 struct pss *pss = (struct pss *)user; 201 lws_mqtt_publish_param_t *pub; 202 size_t chunk; 203 204 switch (reason) { 205 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: 206 lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__, 207 in ? (char *)in : "(null)"); 208 209 if (++done == count) 210 goto finish_test; 211 break; 212 213 case LWS_CALLBACK_MQTT_CLIENT_CLOSED: 214 lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi); 215 216 if (++done == count) 217 goto finish_test; 218 break; 219 220 case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED: 221 lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi); 222 lws_callback_on_writable(wsi); 223 224 return 0; 225 226 case LWS_CALLBACK_MQTT_SUBSCRIBED: 227 lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__); 228 229 /* then we can get on with the actual test part */ 230 231 pss->state++; 232 lws_callback_on_writable(wsi); 233 break; 234 235 case LWS_CALLBACK_MQTT_UNSUBSCRIBED: 236 lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n", 237 __func__, (int)(item - &item[0]), wsi); 238 okay++; 239 240 if (++pss->state == STATE_TEST_FINISH) { 241 lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n", 242 __func__, (int)(item - &items[0]), okay, count); 243 /* We are done, request to close */ 244 return -1; 245 } 246 break; 247 248 case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: 249 250 /* 251 * Extra WRITEABLE may appear here other than ones we asked 252 * for, so we must consult our own state to decide if we want 253 * to make use of the opportunity 254 */ 255 256 switch (pss->state) { 257 case STATE_SUBSCRIBE: 258 lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi); 259 260 if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) { 261 lwsl_notice("%s: subscribe failed\n", __func__); 262 263 return -1; 264 } 265 pss->state++; 266 break; 267 268 case STATE_PUBLISH_QOS0: 269 case STATE_PUBLISH_QOS1: 270 271 lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi); 272 273 pss->pub_param.topic = pss->state == STATE_PUBLISH_QOS0 ? 274 "test/topic0" : "test/topic1"; 275 pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic); 276 pss->pub_param.qos = 277 pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1; 278 pss->pub_param.payload_len = TEST_STRING_LEN; 279 280 /* We send the message out 300 bytes or less at at time */ 281 282 chunk = 300; 283 284 if (chunk > TEST_STRING_LEN - pss->pos) 285 chunk = TEST_STRING_LEN - pss->pos; 286 287 lwsl_notice("%s: sending %d at +%d\n", __func__, 288 (int)chunk, (int)pss->pos); 289 290 if (lws_mqtt_client_send_publish(wsi, &pss->pub_param, 291 test_string + pss->pos, (uint32_t)chunk, 292 (pss->pos + chunk == TEST_STRING_LEN))) { 293 lwsl_notice("%s: publish failed\n", __func__); 294 return -1; 295 } 296 297 pss->pos += chunk; 298 299 if (pss->pos == TEST_STRING_LEN) { 300 lwsl_debug("%s: sent message\n", __func__); 301 pss->pos = 0; 302 pss->state++; 303 } 304 break; 305 306 case STATE_UNSUBSCRIBE: 307 lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n", 308 __func__, (int)(item - &item[0]), wsi); 309 pss->state++; 310 if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) { 311 lwsl_notice("%s: subscribe failed\n", __func__); 312 return -1; 313 } 314 break; 315 default: 316 break; 317 } 318 319 return 0; 320 321 case LWS_CALLBACK_MQTT_ACK: 322 lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state); 323 /* 324 * We can forget about the message we just sent, it's done. 325 * 326 * For our test, that's the indication we can close the wsi. 327 */ 328 329 pss->state++; 330 if (pss->state != STATE_TEST_FINISH) { 331 lws_callback_on_writable(wsi); 332 break; 333 } 334 335 break; 336 337 case LWS_CALLBACK_MQTT_RESEND: 338 lwsl_user("%s: MQTT_RESEND\n", __func__); 339 /* 340 * We must resend the packet ID mentioned in len 341 */ 342 if (++pss->retries == 3) { 343 lwsl_notice("%s: too many retries\n", __func__); 344 return 1; /* kill the connection */ 345 } 346 pss->state--; 347 pss->pos = 0; 348 break; 349 350 case LWS_CALLBACK_MQTT_CLIENT_RX: 351 pub = (lws_mqtt_publish_param_t *)in; 352 assert(pub); 353 lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__, 354 (int)(item - &items[0]), pub->topic, (int)pub->payload_pos, 355 (int)pub->payload_len, (int)len); 356 357 //lwsl_hexdump_info(pub->payload, len); 358 359 return 0; 360 361 default: 362 break; 363 } 364 365 return 0; 366 367finish_test: 368 interrupted = 1; 369 lws_cancel_service(lws_get_context(wsi)); 370 371 return 0; 372} 373 374static const struct lws_protocols protocols[] = { 375 { 376 .name = "test-mqtt", 377 .callback = callback_mqtt, 378 .per_session_data_size = sizeof(struct pss) 379 }, 380 LWS_PROTOCOL_LIST_TERM 381}; 382 383int main(int argc, const char **argv) 384{ 385 lws_state_notify_link_t notifier = { { NULL, NULL, NULL }, 386 system_notify_cb, "app" }; 387 lws_state_notify_link_t *na[] = { ¬ifier, NULL }; 388 struct lws_context_creation_info info; 389 struct lws_context *context; 390 const char *p; 391 int n = 0; 392 393 signal(SIGINT, sigint_handler); 394 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ 395 lws_cmdline_option_handle_builtin(argc, argv, &info); 396 397 do_ssl = !!lws_cmdline_option(argc, argv, "-s"); 398 if (do_ssl) 399 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; 400 401 if (lws_cmdline_option(argc, argv, "-p")) 402 pipeline = 1; 403 404 if ((p = lws_cmdline_option(argc, argv, "-i"))) 405 stagger_us = atoi(p); 406 407 if ((p = lws_cmdline_option(argc, argv, "-c"))) 408 count = atoi(p); 409 410 if (count > COUNT) { 411 count = COUNT; 412 lwsl_err("%s: clipped count at max %d\n", __func__, count); 413 } 414 415 lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n", 416 do_ssl ? "tls enabled": "unencrypted"); 417 418 info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ 419 info.protocols = protocols; 420 info.register_notifier_list = na; 421 info.fd_limit_per_thread = 1 + COUNT + 1; 422 info.retry_and_idle_policy = &retry; 423 424#if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL) 425 /* 426 * OpenSSL uses the system trust store. mbedTLS has to be told which 427 * CA to trust explicitly. 428 */ 429 info.client_ssl_ca_filepath = "./mosq-ca.crt"; 430#endif 431 432 context = lws_create_context(&info); 433 if (!context) { 434 lwsl_err("lws init failed\n"); 435 return 1; 436 } 437 438 /* Event loop */ 439 while (n >= 0 && !interrupted) 440 n = lws_service(context, 0); 441 442 lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count, 443 okay != count ? "failed" : "OK"); 444 lws_context_destroy(context); 445 446 return okay != count; 447} 448