1/* 2 * lws-minimal-mqtt-client 3 * 4 * Written in 2010-2020 by Andy Green <andy@warmcat.com> 5 * Sakthi Kannan <saktr@amazon.com> 6 * 7 * This file is made available under the Creative Commons CC0 1.0 8 * Universal Public Domain Dedication. 9 */ 10 11#include <libwebsockets.h> 12#include <string.h> 13#include <signal.h> 14#if defined(WIN32) 15#define HAVE_STRUCT_TIMESPEC 16#if defined(pid_t) 17#undef pid_t 18#endif 19#endif 20#include <pthread.h> 21#include <assert.h> 22 23enum { 24 STATE_SUBSCRIBE, /* subscribe to the topic */ 25 STATE_PUBLISH_QOS0, /* Send the message in QoS0 */ 26 STATE_WAIT_ACK0, /* Wait for the synthetic "ack" */ 27 STATE_PUBLISH_QOS1, /* Send the message in QoS1 */ 28 STATE_WAIT_ACK1, /* Wait for the real ack (or timeout + retry) */ 29 30 STATE_TEST_FINISH 31}; 32 33static int interrupted, bad = 1, do_ssl; 34 35static const lws_retry_bo_t retry = { 36 .secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */ 37 .secs_since_valid_hangup = 25, /* hangup if still idle secs */ 38}; 39 40static const lws_mqtt_client_connect_param_t client_connect_param = { 41 .client_id = "lwsMqttClient", 42 .keep_alive = 60, 43 .clean_start = 1, 44 .client_id_nofree = 1, 45 .username_nofree = 1, 46 .password_nofree = 1, 47 .will_param = { 48 .topic = "good/bye", 49 .message = "sign-off", 50 .qos = 0, 51 .retain = 0, 52 }, 53 .username = "lwsUser", 54 .password = "mySecretPassword", 55}; 56 57static lws_mqtt_publish_param_t pub_param; 58 59static lws_mqtt_topic_elem_t topics[] = { 60 [0] = { .name = "test/topic0", .qos = QOS0 }, 61 [1] = { .name = "test/topic1", .qos = QOS1 }, 62}; 63 64static lws_mqtt_subscribe_param_t sub_param = { 65 .topic = &topics[0], 66 .num_topics = LWS_ARRAY_SIZE(topics), 67}; 68 69static const char * const test_string = 70 "No one would have believed in the last years of the nineteenth " 71 "century that this world was being watched keenly and closely by " 72 "intelligences greater than man's and yet as mortal as his own; that as " 73 "men busied themselves about their various concerns they were " 74 "scrutinised and studied, perhaps almost as narrowly as a man with a " 75 "microscope might scrutinise the transient creatures that swarm and " 76 "multiply in a drop of water. With infinite complacency men went to " 77 "and fro over this globe about their little affairs, serene in their " 78 "assurance of their empire over matter. It is possible that the " 79 "infusoria under the microscope do the same. No one gave a thought to " 80 "the older worlds of space as sources of human danger, or thought of " 81 "them only to dismiss the idea of life upon them as impossible or " 82 "improbable. It is curious to recall some of the mental habits of " 83 "those departed days. At most terrestrial men fancied there might be " 84 "other men upon Mars, perhaps inferior to themselves and ready to " 85 "welcome a missionary enterprise. Yet across the gulf of space, minds " 86 "that are to our minds as ours are to those of the beasts that perish, " 87 "intellects vast and cool and unsympathetic, regarded this earth with " 88 "envious eyes, and slowly and surely drew their plans against us. And " 89 "early in the twentieth century came the great disillusionment. "; 90 91/* this reflects the length of the string above */ 92#define TEST_STRING_LEN 1337 93 94struct pss { 95 int state; 96 size_t pos; 97 int retries; 98}; 99 100static void 101sigint_handler(int sig) 102{ 103 interrupted = 1; 104} 105 106static int 107connect_client(struct lws_context *context) 108{ 109 struct lws_client_connect_info i; 110 111 memset(&i, 0, sizeof i); 112 113 i.mqtt_cp = &client_connect_param; 114 i.address = "localhost"; 115 i.host = "localhost"; 116 i.protocol = "mqtt"; 117 i.context = context; 118 i.method = "MQTT"; 119 i.alpn = "mqtt"; 120 i.port = 1883; 121 122 if (do_ssl) { 123 i.ssl_connection = LCCSCF_USE_SSL; 124 i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED; 125 i.port = 8883; 126 } 127 128 if (!lws_client_connect_via_info(&i)) { 129 lwsl_err("%s: Client Connect Failed\n", __func__); 130 131 return 1; 132 } 133 134 return 0; 135} 136 137static int 138system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link, 139 int current, int target) 140{ 141 struct lws_context *context = mgr->parent; 142 143 if (current != LWS_SYSTATE_OPERATIONAL || 144 target != LWS_SYSTATE_OPERATIONAL) 145 return 0; 146 147 /* 148 * We delay trying to do the client connection until 149 * the protocols have been initialized for each 150 * vhost... this happens after we have network and 151 * time so we can judge tls cert validity. 152 */ 153 154 if (connect_client(context)) 155 interrupted = 1; 156 157 return 0; 158 } 159 160 161static int 162callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason, 163 void *user, void *in, size_t len) 164{ 165 struct pss *pss = (struct pss *)user; 166 lws_mqtt_publish_param_t *pub; 167 size_t chunk; 168 169 switch (reason) { 170 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: 171 lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__, 172 in ? (char *)in : "(null)"); 173 interrupted = 1; 174 break; 175 176 case LWS_CALLBACK_MQTT_CLIENT_CLOSED: 177 lwsl_user("%s: CLIENT_CLOSED\n", __func__); 178 interrupted = 1; 179 break; 180 181 case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED: 182 lwsl_user("%s: MQTT_CLIENT_ESTABLISHED\n", __func__); 183 lws_callback_on_writable(wsi); 184 185 return 0; 186 187 case LWS_CALLBACK_MQTT_SUBSCRIBED: 188 lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__); 189 lws_callback_on_writable(wsi); 190 break; 191 192 case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: 193 /* 194 * Extra WRITEABLE may appear here other than ones we asked 195 * for, so we must consult our own state to decide if we want 196 * to make use of the opportunity 197 */ 198 199 switch (pss->state) { 200 case STATE_SUBSCRIBE: 201 lwsl_user("%s: WRITEABLE: Subscribing\n", __func__); 202 203 if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) { 204 lwsl_notice("%s: subscribe failed\n", __func__); 205 206 return -1; 207 } 208 pss->state++; 209 break; 210 211 case STATE_PUBLISH_QOS0: 212 case STATE_PUBLISH_QOS1: 213 214 lwsl_user("%s: WRITEABLE: Publish\n", __func__); 215 216 pub_param.topic = "test/topic"; 217 pub_param.topic_len = (uint16_t)strlen(pub_param.topic); 218 pub_param.qos = pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1; 219 pub_param.payload_len = TEST_STRING_LEN; 220 221 /* We send the message out 300 bytes or less at at time */ 222 223 chunk = 300; 224 225 if (chunk > TEST_STRING_LEN - pss->pos) 226 chunk = TEST_STRING_LEN - pss->pos; 227 228 if (lws_mqtt_client_send_publish(wsi, &pub_param, 229 test_string + pss->pos, (uint32_t)chunk, 230 (pss->pos + chunk == TEST_STRING_LEN))) 231 return -1; 232 233 pss->pos += chunk; 234 235 if (pss->pos == TEST_STRING_LEN) { 236 pss->pos = 0; 237 pss->state++; 238 } 239 break; 240 241 default: 242 break; 243 } 244 245 return 0; 246 247 case LWS_CALLBACK_MQTT_ACK: 248 lwsl_user("%s: MQTT_ACK\n", __func__); 249 /* 250 * We can forget about the message we just sent, it's done. 251 * 252 * For our test, that's the indication we can close the wsi. 253 */ 254 255 pss->state++; 256 if (pss->state != STATE_TEST_FINISH) { 257 lws_callback_on_writable(wsi); 258 break; 259 } 260 261 /* Oh we are done then */ 262 263 bad = 0; 264 interrupted = 1; 265 lws_cancel_service(lws_get_context(wsi)); 266 break; 267 268 case LWS_CALLBACK_MQTT_RESEND: 269 lwsl_user("%s: MQTT_RESEND\n", __func__); 270 /* 271 * We must resend the packet ID mentioned in len 272 */ 273 if (++pss->retries == 3) { 274 interrupted = 1; 275 break; 276 } 277 pss->state--; 278 pss->pos = 0; 279 break; 280 281 case LWS_CALLBACK_MQTT_CLIENT_RX: 282 lwsl_user("%s: MQTT_CLIENT_RX\n", __func__); 283 284 pub = (lws_mqtt_publish_param_t *)in; 285 assert(pub); 286 287 lwsl_hexdump_notice(pub->topic, pub->topic_len); 288 lwsl_hexdump_notice(pub->payload, pub->payload_len); 289 290 return 0; 291 292 default: 293 break; 294 } 295 296 return 0; 297} 298 299static const struct lws_protocols protocols[] = { 300 { 301 .name = "mqtt", 302 .callback = callback_mqtt, 303 .per_session_data_size = sizeof(struct pss) 304 }, 305 LWS_PROTOCOL_LIST_TERM 306}; 307 308int main(int argc, const char **argv) 309{ 310 lws_state_notify_link_t notifier = { { NULL, NULL, NULL }, 311 system_notify_cb, "app" }; 312 lws_state_notify_link_t *na[] = { ¬ifier, NULL }; 313 struct lws_context_creation_info info; 314 struct lws_context *context; 315 int n = 0; 316 317 signal(SIGINT, sigint_handler); 318 memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ 319 lws_cmdline_option_handle_builtin(argc, argv, &info); 320 321 do_ssl = !!lws_cmdline_option(argc, argv, "-s"); 322 if (do_ssl) 323 info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; 324 325 lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n", 326 do_ssl ? "tls enabled": "unencrypted"); 327 328 info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ 329 info.protocols = protocols; 330 info.register_notifier_list = na; 331 info.fd_limit_per_thread = 1 + 1 + 1; 332 info.retry_and_idle_policy = &retry; 333 334#if defined(LWS_WITH_MBEDTLS) || defined(USE_WOLFSSL) 335 /* 336 * OpenSSL uses the system trust store. mbedTLS has to be told which 337 * CA to trust explicitly. 338 */ 339 info.client_ssl_ca_filepath = "./mosq-ca.crt"; 340#endif 341 342 context = lws_create_context(&info); 343 if (!context) { 344 lwsl_err("lws init failed\n"); 345 return 1; 346 } 347 348 /* Event loop */ 349 while (n >= 0 && !interrupted) 350 n = lws_service(context, 0); 351 352 lwsl_user("Completed: %s\n", bad ? "failed" : "OK"); 353 lws_context_destroy(context); 354 355 return bad; 356} 357