1/*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 9 * Copyright (C) Björn Stenberg, <bjorn@haxx.se> 10 * 11 * This software is licensed as described in the file COPYING, which 12 * you should have received as part of this distribution. The terms 13 * are also available at https://curl.se/docs/copyright.html. 14 * 15 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 16 * copies of the Software, and permit persons to whom the Software is 17 * furnished to do so, under the terms of the COPYING file. 18 * 19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 20 * KIND, either express or implied. 21 * 22 * SPDX-License-Identifier: curl 23 * 24 ***************************************************************************/ 25 26#include "curl_setup.h" 27 28#ifndef CURL_DISABLE_MQTT 29 30#include "urldata.h" 31#include <curl/curl.h> 32#include "transfer.h" 33#include "sendf.h" 34#include "progress.h" 35#include "mqtt.h" 36#include "select.h" 37#include "strdup.h" 38#include "url.h" 39#include "escape.h" 40#include "warnless.h" 41#include "curl_printf.h" 42#include "curl_memory.h" 43#include "multiif.h" 44#include "rand.h" 45 46/* The last #include file should be: */ 47#include "memdebug.h" 48 49#define MQTT_MSG_CONNECT 0x10 50#define MQTT_MSG_CONNACK 0x20 51#define MQTT_MSG_PUBLISH 0x30 52#define MQTT_MSG_SUBSCRIBE 0x82 53#define MQTT_MSG_SUBACK 0x90 54#define MQTT_MSG_DISCONNECT 0xe0 55 56#define MQTT_CONNACK_LEN 2 57#define MQTT_SUBACK_LEN 3 58#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ 59 60/* 61 * Forward declarations. 62 */ 63 64static CURLcode mqtt_do(struct Curl_easy *data, bool *done); 65static CURLcode mqtt_done(struct Curl_easy *data, 66 CURLcode status, bool premature); 67static CURLcode mqtt_doing(struct Curl_easy *data, bool *done); 68static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn, 69 curl_socket_t *sock); 70static CURLcode mqtt_setup_conn(struct Curl_easy *data, 71 struct connectdata *conn); 72 73/* 74 * MQTT protocol handler. 75 */ 76 77const struct Curl_handler Curl_handler_mqtt = { 78 "MQTT", /* scheme */ 79 mqtt_setup_conn, /* setup_connection */ 80 mqtt_do, /* do_it */ 81 mqtt_done, /* done */ 82 ZERO_NULL, /* do_more */ 83 ZERO_NULL, /* connect_it */ 84 ZERO_NULL, /* connecting */ 85 mqtt_doing, /* doing */ 86 ZERO_NULL, /* proto_getsock */ 87 mqtt_getsock, /* doing_getsock */ 88 ZERO_NULL, /* domore_getsock */ 89 ZERO_NULL, /* perform_getsock */ 90 ZERO_NULL, /* disconnect */ 91 ZERO_NULL, /* write_resp */ 92 ZERO_NULL, /* connection_check */ 93 ZERO_NULL, /* attach connection */ 94 PORT_MQTT, /* defport */ 95 CURLPROTO_MQTT, /* protocol */ 96 CURLPROTO_MQTT, /* family */ 97 PROTOPT_NONE /* flags */ 98}; 99 100static CURLcode mqtt_setup_conn(struct Curl_easy *data, 101 struct connectdata *conn) 102{ 103 /* allocate the HTTP-specific struct for the Curl_easy, only to survive 104 during this request */ 105 struct MQTT *mq; 106 (void)conn; 107 DEBUGASSERT(data->req.p.mqtt == NULL); 108 109 mq = calloc(1, sizeof(struct MQTT)); 110 if(!mq) 111 return CURLE_OUT_OF_MEMORY; 112 Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV); 113 data->req.p.mqtt = mq; 114 return CURLE_OK; 115} 116 117static CURLcode mqtt_send(struct Curl_easy *data, 118 char *buf, size_t len) 119{ 120 CURLcode result = CURLE_OK; 121 struct MQTT *mq = data->req.p.mqtt; 122 ssize_t n; 123 result = Curl_nwrite(data, FIRSTSOCKET, buf, len, &n); 124 if(result) 125 return result; 126 Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n); 127 if(len != (size_t)n) { 128 size_t nsend = len - n; 129 char *sendleftovers = Curl_memdup(&buf[n], nsend); 130 if(!sendleftovers) 131 return CURLE_OUT_OF_MEMORY; 132 mq->sendleftovers = sendleftovers; 133 mq->nsend = nsend; 134 } 135 else { 136 mq->sendleftovers = NULL; 137 mq->nsend = 0; 138 } 139 return result; 140} 141 142/* Generic function called by the multi interface to figure out what socket(s) 143 to wait for and for what actions during the DOING and PROTOCONNECT 144 states */ 145static int mqtt_getsock(struct Curl_easy *data, 146 struct connectdata *conn, 147 curl_socket_t *sock) 148{ 149 (void)data; 150 sock[0] = conn->sock[FIRSTSOCKET]; 151 return GETSOCK_READSOCK(FIRSTSOCKET); 152} 153 154static int mqtt_encode_len(char *buf, size_t len) 155{ 156 unsigned char encoded; 157 int i; 158 159 for(i = 0; (len > 0) && (i<4); i++) { 160 encoded = len % 0x80; 161 len /= 0x80; 162 if(len) 163 encoded |= 0x80; 164 buf[i] = encoded; 165 } 166 167 return i; 168} 169 170/* add the passwd to the CONNECT packet */ 171static int add_passwd(const char *passwd, const size_t plen, 172 char *pkt, const size_t start, int remain_pos) 173{ 174 /* magic number that need to be set properly */ 175 const size_t conn_flags_pos = remain_pos + 8; 176 if(plen > 0xffff) 177 return 1; 178 179 /* set password flag */ 180 pkt[conn_flags_pos] |= 0x40; 181 182 /* length of password provided */ 183 pkt[start] = (char)((plen >> 8) & 0xFF); 184 pkt[start + 1] = (char)(plen & 0xFF); 185 memcpy(&pkt[start + 2], passwd, plen); 186 return 0; 187} 188 189/* add user to the CONNECT packet */ 190static int add_user(const char *username, const size_t ulen, 191 unsigned char *pkt, const size_t start, int remain_pos) 192{ 193 /* magic number that need to be set properly */ 194 const size_t conn_flags_pos = remain_pos + 8; 195 if(ulen > 0xffff) 196 return 1; 197 198 /* set username flag */ 199 pkt[conn_flags_pos] |= 0x80; 200 /* length of username provided */ 201 pkt[start] = (unsigned char)((ulen >> 8) & 0xFF); 202 pkt[start + 1] = (unsigned char)(ulen & 0xFF); 203 memcpy(&pkt[start + 2], username, ulen); 204 return 0; 205} 206 207/* add client ID to the CONNECT packet */ 208static int add_client_id(const char *client_id, const size_t client_id_len, 209 char *pkt, const size_t start) 210{ 211 if(client_id_len != MQTT_CLIENTID_LEN) 212 return 1; 213 pkt[start] = 0x00; 214 pkt[start + 1] = MQTT_CLIENTID_LEN; 215 memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN); 216 return 0; 217} 218 219/* Set initial values of CONNECT packet */ 220static int init_connpack(char *packet, char *remain, int remain_pos) 221{ 222 /* Fixed header starts */ 223 /* packet type */ 224 packet[0] = MQTT_MSG_CONNECT; 225 /* remaining length field */ 226 memcpy(&packet[1], remain, remain_pos); 227 /* Fixed header ends */ 228 229 /* Variable header starts */ 230 /* protocol length */ 231 packet[remain_pos + 1] = 0x00; 232 packet[remain_pos + 2] = 0x04; 233 /* protocol name */ 234 packet[remain_pos + 3] = 'M'; 235 packet[remain_pos + 4] = 'Q'; 236 packet[remain_pos + 5] = 'T'; 237 packet[remain_pos + 6] = 'T'; 238 /* protocol level */ 239 packet[remain_pos + 7] = 0x04; 240 /* CONNECT flag: CleanSession */ 241 packet[remain_pos + 8] = 0x02; 242 /* keep-alive 0 = disabled */ 243 packet[remain_pos + 9] = 0x00; 244 packet[remain_pos + 10] = 0x3c; 245 /* end of variable header */ 246 return remain_pos + 10; 247} 248 249static CURLcode mqtt_connect(struct Curl_easy *data) 250{ 251 CURLcode result = CURLE_OK; 252 int pos = 0; 253 int rc = 0; 254 /* remain length */ 255 int remain_pos = 0; 256 char remain[4] = {0}; 257 size_t packetlen = 0; 258 size_t payloadlen = 0; 259 size_t start_user = 0; 260 size_t start_pwd = 0; 261 char client_id[MQTT_CLIENTID_LEN + 1] = "curl"; 262 const size_t clen = strlen("curl"); 263 char *packet = NULL; 264 265 /* extracting username from request */ 266 const char *username = data->state.aptr.user ? 267 data->state.aptr.user : ""; 268 const size_t ulen = strlen(username); 269 /* extracting password from request */ 270 const char *passwd = data->state.aptr.passwd ? 271 data->state.aptr.passwd : ""; 272 const size_t plen = strlen(passwd); 273 274 payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2; 275 /* The plus 2 are for the MSB and LSB describing the length of the string to 276 * be added on the payload. Refer to spec 1.5.2 and 1.5.4 */ 277 if(ulen) 278 payloadlen += 2; 279 if(plen) 280 payloadlen += 2; 281 282 /* getting how much occupy the remain length */ 283 remain_pos = mqtt_encode_len(remain, payloadlen + 10); 284 285 /* 10 length of variable header and 1 the first byte of the fixed header */ 286 packetlen = payloadlen + 10 + remain_pos + 1; 287 288 /* allocating packet */ 289 if(packetlen > 268435455) 290 return CURLE_WEIRD_SERVER_REPLY; 291 packet = malloc(packetlen); 292 if(!packet) 293 return CURLE_OUT_OF_MEMORY; 294 memset(packet, 0, packetlen); 295 296 /* set initial values for the CONNECT packet */ 297 pos = init_connpack(packet, remain, remain_pos); 298 299 result = Curl_rand_alnum(data, (unsigned char *)&client_id[clen], 300 MQTT_CLIENTID_LEN - clen + 1); 301 /* add client id */ 302 rc = add_client_id(client_id, strlen(client_id), packet, pos + 1); 303 if(rc) { 304 failf(data, "Client ID length mismatched: [%zu]", strlen(client_id)); 305 result = CURLE_WEIRD_SERVER_REPLY; 306 goto end; 307 } 308 infof(data, "Using client id '%s'", client_id); 309 310 /* position where starts the user payload */ 311 start_user = pos + 3 + MQTT_CLIENTID_LEN; 312 /* position where starts the password payload */ 313 start_pwd = start_user + ulen; 314 /* if user name was provided, add it to the packet */ 315 if(ulen) { 316 start_pwd += 2; 317 318 rc = add_user(username, ulen, 319 (unsigned char *)packet, start_user, remain_pos); 320 if(rc) { 321 failf(data, "Username is too large: [%zu]", ulen); 322 result = CURLE_WEIRD_SERVER_REPLY; 323 goto end; 324 } 325 } 326 327 /* if passwd was provided, add it to the packet */ 328 if(plen) { 329 rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos); 330 if(rc) { 331 failf(data, "Password is too large: [%zu]", plen); 332 result = CURLE_WEIRD_SERVER_REPLY; 333 goto end; 334 } 335 } 336 337 if(!result) 338 result = mqtt_send(data, packet, packetlen); 339 340end: 341 if(packet) 342 free(packet); 343 Curl_safefree(data->state.aptr.user); 344 Curl_safefree(data->state.aptr.passwd); 345 return result; 346} 347 348static CURLcode mqtt_disconnect(struct Curl_easy *data) 349{ 350 CURLcode result = CURLE_OK; 351 struct MQTT *mq = data->req.p.mqtt; 352 result = mqtt_send(data, (char *)"\xe0\x00", 2); 353 Curl_safefree(mq->sendleftovers); 354 Curl_dyn_free(&mq->recvbuf); 355 return result; 356} 357 358static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes) 359{ 360 struct MQTT *mq = data->req.p.mqtt; 361 size_t rlen = Curl_dyn_len(&mq->recvbuf); 362 CURLcode result; 363 364 if(rlen < nbytes) { 365 unsigned char readbuf[1024]; 366 ssize_t nread; 367 368 DEBUGASSERT(nbytes - rlen < sizeof(readbuf)); 369 result = Curl_read(data, data->conn->sock[FIRSTSOCKET], 370 (char *)readbuf, nbytes - rlen, &nread); 371 if(result) 372 return result; 373 DEBUGASSERT(nread >= 0); 374 if(Curl_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread)) 375 return CURLE_OUT_OF_MEMORY; 376 rlen = Curl_dyn_len(&mq->recvbuf); 377 } 378 return (rlen >= nbytes)? CURLE_OK : CURLE_AGAIN; 379} 380 381static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes) 382{ 383 struct MQTT *mq = data->req.p.mqtt; 384 size_t rlen = Curl_dyn_len(&mq->recvbuf); 385 if(rlen <= nbytes) 386 Curl_dyn_reset(&mq->recvbuf); 387 else 388 Curl_dyn_tail(&mq->recvbuf, rlen - nbytes); 389} 390 391static CURLcode mqtt_verify_connack(struct Curl_easy *data) 392{ 393 struct MQTT *mq = data->req.p.mqtt; 394 CURLcode result; 395 char *ptr; 396 397 result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN); 398 if(result) 399 goto fail; 400 401 /* verify CONNACK */ 402 DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN); 403 ptr = Curl_dyn_ptr(&mq->recvbuf); 404 Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN); 405 406 if(ptr[0] != 0x00 || ptr[1] != 0x00) { 407 failf(data, "Expected %02x%02x but got %02x%02x", 408 0x00, 0x00, ptr[0], ptr[1]); 409 Curl_dyn_reset(&mq->recvbuf); 410 result = CURLE_WEIRD_SERVER_REPLY; 411 goto fail; 412 } 413 mqtt_recv_consume(data, MQTT_CONNACK_LEN); 414fail: 415 return result; 416} 417 418static CURLcode mqtt_get_topic(struct Curl_easy *data, 419 char **topic, size_t *topiclen) 420{ 421 char *path = data->state.up.path; 422 CURLcode result = CURLE_URL_MALFORMAT; 423 if(strlen(path) > 1) { 424 result = Curl_urldecode(path + 1, 0, topic, topiclen, REJECT_NADA); 425 if(!result && (*topiclen > 0xffff)) { 426 failf(data, "Too long MQTT topic"); 427 result = CURLE_URL_MALFORMAT; 428 } 429 } 430 else 431 failf(data, "No MQTT topic found. Forgot to URL encode it?"); 432 433 return result; 434} 435 436static CURLcode mqtt_subscribe(struct Curl_easy *data) 437{ 438 CURLcode result = CURLE_OK; 439 char *topic = NULL; 440 size_t topiclen; 441 unsigned char *packet = NULL; 442 size_t packetlen; 443 char encodedsize[4]; 444 size_t n; 445 struct connectdata *conn = data->conn; 446 447 result = mqtt_get_topic(data, &topic, &topiclen); 448 if(result) 449 goto fail; 450 451 conn->proto.mqtt.packetid++; 452 453 packetlen = topiclen + 5; /* packetid + topic (has a two byte length field) 454 + 2 bytes topic length + QoS byte */ 455 n = mqtt_encode_len((char *)encodedsize, packetlen); 456 packetlen += n + 1; /* add one for the control packet type byte */ 457 458 packet = malloc(packetlen); 459 if(!packet) { 460 result = CURLE_OUT_OF_MEMORY; 461 goto fail; 462 } 463 464 packet[0] = MQTT_MSG_SUBSCRIBE; 465 memcpy(&packet[1], encodedsize, n); 466 packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff; 467 packet[2 + n] = conn->proto.mqtt.packetid & 0xff; 468 packet[3 + n] = (topiclen >> 8) & 0xff; 469 packet[4 + n ] = topiclen & 0xff; 470 memcpy(&packet[5 + n], topic, topiclen); 471 packet[5 + n + topiclen] = 0; /* QoS zero */ 472 473 result = mqtt_send(data, (char *)packet, packetlen); 474 475fail: 476 free(topic); 477 free(packet); 478 return result; 479} 480 481/* 482 * Called when the first byte was already read. 483 */ 484static CURLcode mqtt_verify_suback(struct Curl_easy *data) 485{ 486 struct MQTT *mq = data->req.p.mqtt; 487 struct connectdata *conn = data->conn; 488 struct mqtt_conn *mqtt = &conn->proto.mqtt; 489 CURLcode result; 490 char *ptr; 491 492 result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN); 493 if(result) 494 goto fail; 495 496 /* verify SUBACK */ 497 DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN); 498 ptr = Curl_dyn_ptr(&mq->recvbuf); 499 Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN); 500 501 if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) || 502 ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) || 503 ptr[2] != 0x00) { 504 Curl_dyn_reset(&mq->recvbuf); 505 result = CURLE_WEIRD_SERVER_REPLY; 506 goto fail; 507 } 508 mqtt_recv_consume(data, MQTT_SUBACK_LEN); 509fail: 510 return result; 511} 512 513static CURLcode mqtt_publish(struct Curl_easy *data) 514{ 515 CURLcode result; 516 char *payload = data->set.postfields; 517 size_t payloadlen; 518 char *topic = NULL; 519 size_t topiclen; 520 unsigned char *pkt = NULL; 521 size_t i = 0; 522 size_t remaininglength; 523 size_t encodelen; 524 char encodedbytes[4]; 525 curl_off_t postfieldsize = data->set.postfieldsize; 526 527 if(!payload) { 528 DEBUGF(infof(data, "mqtt_publish without payload, return bad arg")); 529 return CURLE_BAD_FUNCTION_ARGUMENT; 530 } 531 if(postfieldsize < 0) 532 payloadlen = strlen(payload); 533 else 534 payloadlen = (size_t)postfieldsize; 535 536 result = mqtt_get_topic(data, &topic, &topiclen); 537 if(result) 538 goto fail; 539 540 remaininglength = payloadlen + 2 + topiclen; 541 encodelen = mqtt_encode_len(encodedbytes, remaininglength); 542 543 /* add the control byte and the encoded remaining length */ 544 pkt = malloc(remaininglength + 1 + encodelen); 545 if(!pkt) { 546 result = CURLE_OUT_OF_MEMORY; 547 goto fail; 548 } 549 550 /* assemble packet */ 551 pkt[i++] = MQTT_MSG_PUBLISH; 552 memcpy(&pkt[i], encodedbytes, encodelen); 553 i += encodelen; 554 pkt[i++] = (topiclen >> 8) & 0xff; 555 pkt[i++] = (topiclen & 0xff); 556 memcpy(&pkt[i], topic, topiclen); 557 i += topiclen; 558 memcpy(&pkt[i], payload, payloadlen); 559 i += payloadlen; 560 result = mqtt_send(data, (char *)pkt, i); 561 562fail: 563 free(pkt); 564 free(topic); 565 return result; 566} 567 568static size_t mqtt_decode_len(unsigned char *buf, 569 size_t buflen, size_t *lenbytes) 570{ 571 size_t len = 0; 572 size_t mult = 1; 573 size_t i; 574 unsigned char encoded = 128; 575 576 for(i = 0; (i < buflen) && (encoded & 128); i++) { 577 encoded = buf[i]; 578 len += (encoded & 127) * mult; 579 mult *= 128; 580 } 581 582 if(lenbytes) 583 *lenbytes = i; 584 585 return len; 586} 587 588#ifdef CURLDEBUG 589static const char *statenames[]={ 590 "MQTT_FIRST", 591 "MQTT_REMAINING_LENGTH", 592 "MQTT_CONNACK", 593 "MQTT_SUBACK", 594 "MQTT_SUBACK_COMING", 595 "MQTT_PUBWAIT", 596 "MQTT_PUB_REMAIN", 597 598 "NOT A STATE" 599}; 600#endif 601 602/* The only way to change state */ 603static void mqstate(struct Curl_easy *data, 604 enum mqttstate state, 605 enum mqttstate nextstate) /* used if state == FIRST */ 606{ 607 struct connectdata *conn = data->conn; 608 struct mqtt_conn *mqtt = &conn->proto.mqtt; 609#ifdef CURLDEBUG 610 infof(data, "%s (from %s) (next is %s)", 611 statenames[state], 612 statenames[mqtt->state], 613 (state == MQTT_FIRST)? statenames[nextstate] : ""); 614#endif 615 mqtt->state = state; 616 if(state == MQTT_FIRST) 617 mqtt->nextstate = nextstate; 618} 619 620 621static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done) 622{ 623 CURLcode result = CURLE_OK; 624 struct connectdata *conn = data->conn; 625 curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; 626 ssize_t nread; 627 size_t remlen; 628 struct mqtt_conn *mqtt = &conn->proto.mqtt; 629 struct MQTT *mq = data->req.p.mqtt; 630 unsigned char packet; 631 632 switch(mqtt->state) { 633MQTT_SUBACK_COMING: 634 case MQTT_SUBACK_COMING: 635 result = mqtt_verify_suback(data); 636 if(result) 637 break; 638 639 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); 640 break; 641 642 case MQTT_SUBACK: 643 case MQTT_PUBWAIT: 644 /* we are expecting PUBLISH or SUBACK */ 645 packet = mq->firstbyte & 0xf0; 646 if(packet == MQTT_MSG_PUBLISH) 647 mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE); 648 else if(packet == MQTT_MSG_SUBACK) { 649 mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE); 650 goto MQTT_SUBACK_COMING; 651 } 652 else if(packet == MQTT_MSG_DISCONNECT) { 653 infof(data, "Got DISCONNECT"); 654 *done = TRUE; 655 goto end; 656 } 657 else { 658 result = CURLE_WEIRD_SERVER_REPLY; 659 goto end; 660 } 661 662 /* -- switched state -- */ 663 remlen = mq->remaining_length; 664 infof(data, "Remaining length: %zu bytes", remlen); 665 if(data->set.max_filesize && 666 (curl_off_t)remlen > data->set.max_filesize) { 667 failf(data, "Maximum file size exceeded"); 668 result = CURLE_FILESIZE_EXCEEDED; 669 goto end; 670 } 671 Curl_pgrsSetDownloadSize(data, remlen); 672 data->req.bytecount = 0; 673 data->req.size = remlen; 674 mq->npacket = remlen; /* get this many bytes */ 675 FALLTHROUGH(); 676 case MQTT_PUB_REMAIN: { 677 /* read rest of packet, but no more. Cap to buffer size */ 678 char buffer[4*1024]; 679 size_t rest = mq->npacket; 680 if(rest > sizeof(buffer)) 681 rest = sizeof(buffer); 682 result = Curl_read(data, sockfd, buffer, rest, &nread); 683 if(result) { 684 if(CURLE_AGAIN == result) { 685 infof(data, "EEEE AAAAGAIN"); 686 } 687 goto end; 688 } 689 if(!nread) { 690 infof(data, "server disconnected"); 691 result = CURLE_PARTIAL_FILE; 692 goto end; 693 } 694 695 /* if QoS is set, message contains packet id */ 696 result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread); 697 if(result) 698 goto end; 699 700 mq->npacket -= nread; 701 if(!mq->npacket) 702 /* no more PUBLISH payload, back to subscribe wait state */ 703 mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); 704 break; 705 } 706 default: 707 DEBUGASSERT(NULL); /* illegal state */ 708 result = CURLE_WEIRD_SERVER_REPLY; 709 goto end; 710 } 711end: 712 return result; 713} 714 715static CURLcode mqtt_do(struct Curl_easy *data, bool *done) 716{ 717 CURLcode result = CURLE_OK; 718 *done = FALSE; /* unconditionally */ 719 720 result = mqtt_connect(data); 721 if(result) { 722 failf(data, "Error %d sending MQTT CONNECT request", result); 723 return result; 724 } 725 mqstate(data, MQTT_FIRST, MQTT_CONNACK); 726 return CURLE_OK; 727} 728 729static CURLcode mqtt_done(struct Curl_easy *data, 730 CURLcode status, bool premature) 731{ 732 struct MQTT *mq = data->req.p.mqtt; 733 (void)status; 734 (void)premature; 735 Curl_safefree(mq->sendleftovers); 736 Curl_dyn_free(&mq->recvbuf); 737 return CURLE_OK; 738} 739 740static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) 741{ 742 CURLcode result = CURLE_OK; 743 struct connectdata *conn = data->conn; 744 struct mqtt_conn *mqtt = &conn->proto.mqtt; 745 struct MQTT *mq = data->req.p.mqtt; 746 ssize_t nread; 747 curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; 748 unsigned char byte; 749 750 *done = FALSE; 751 752 if(mq->nsend) { 753 /* send the remainder of an outgoing packet */ 754 char *ptr = mq->sendleftovers; 755 result = mqtt_send(data, mq->sendleftovers, mq->nsend); 756 free(ptr); 757 if(result) 758 return result; 759 } 760 761 infof(data, "mqtt_doing: state [%d]", (int) mqtt->state); 762 switch(mqtt->state) { 763 case MQTT_FIRST: 764 /* Read the initial byte only */ 765 result = Curl_read(data, sockfd, (char *)&mq->firstbyte, 1, &nread); 766 if(result) 767 break; 768 else if(!nread) { 769 failf(data, "Connection disconnected"); 770 *done = TRUE; 771 result = CURLE_RECV_ERROR; 772 break; 773 } 774 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1); 775 /* remember the first byte */ 776 mq->npacket = 0; 777 mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE); 778 FALLTHROUGH(); 779 case MQTT_REMAINING_LENGTH: 780 do { 781 result = Curl_read(data, sockfd, (char *)&byte, 1, &nread); 782 if(!nread) 783 break; 784 Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1); 785 mq->pkt_hd[mq->npacket++] = byte; 786 } while((byte & 0x80) && (mq->npacket < 4)); 787 if(nread && (byte & 0x80)) 788 /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 + 789 127 * 128^3 bytes. server tried to send more */ 790 result = CURLE_WEIRD_SERVER_REPLY; 791 if(result) 792 break; 793 mq->remaining_length = mqtt_decode_len(mq->pkt_hd, mq->npacket, NULL); 794 mq->npacket = 0; 795 if(mq->remaining_length) { 796 mqstate(data, mqtt->nextstate, MQTT_NOSTATE); 797 break; 798 } 799 mqstate(data, MQTT_FIRST, MQTT_FIRST); 800 801 if(mq->firstbyte == MQTT_MSG_DISCONNECT) { 802 infof(data, "Got DISCONNECT"); 803 *done = TRUE; 804 } 805 break; 806 case MQTT_CONNACK: 807 result = mqtt_verify_connack(data); 808 if(result) 809 break; 810 811 if(data->state.httpreq == HTTPREQ_POST) { 812 result = mqtt_publish(data); 813 if(!result) { 814 result = mqtt_disconnect(data); 815 *done = TRUE; 816 } 817 mqtt->nextstate = MQTT_FIRST; 818 } 819 else { 820 result = mqtt_subscribe(data); 821 if(!result) { 822 mqstate(data, MQTT_FIRST, MQTT_SUBACK); 823 } 824 } 825 break; 826 827 case MQTT_SUBACK: 828 case MQTT_PUBWAIT: 829 case MQTT_PUB_REMAIN: 830 result = mqtt_read_publish(data, done); 831 break; 832 833 default: 834 failf(data, "State not handled yet"); 835 *done = TRUE; 836 break; 837 } 838 839 if(result == CURLE_AGAIN) 840 result = CURLE_OK; 841 return result; 842} 843 844#endif /* CURL_DISABLE_MQTT */ 845