113498266Sopenharmony_ci/*************************************************************************** 213498266Sopenharmony_ci * _ _ ____ _ 313498266Sopenharmony_ci * Project ___| | | | _ \| | 413498266Sopenharmony_ci * / __| | | | |_) | | 513498266Sopenharmony_ci * | (__| |_| | _ <| |___ 613498266Sopenharmony_ci * \___|\___/|_| \_\_____| 713498266Sopenharmony_ci * 813498266Sopenharmony_ci * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 913498266Sopenharmony_ci * Copyright (C) Björn Stenberg, <bjorn@haxx.se> 1013498266Sopenharmony_ci * 1113498266Sopenharmony_ci * This software is licensed as described in the file COPYING, which 1213498266Sopenharmony_ci * you should have received as part of this distribution. The terms 1313498266Sopenharmony_ci * are also available at https://curl.se/docs/copyright.html. 1413498266Sopenharmony_ci * 1513498266Sopenharmony_ci * You may opt to use, copy, modify, merge, publish, distribute and/or sell 1613498266Sopenharmony_ci * copies of the Software, and permit persons to whom the Software is 1713498266Sopenharmony_ci * furnished to do so, under the terms of the COPYING file. 1813498266Sopenharmony_ci * 1913498266Sopenharmony_ci * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 2013498266Sopenharmony_ci * KIND, either express or implied. 2113498266Sopenharmony_ci * 2213498266Sopenharmony_ci * SPDX-License-Identifier: curl 2313498266Sopenharmony_ci * 2413498266Sopenharmony_ci ***************************************************************************/ 2513498266Sopenharmony_ci 2613498266Sopenharmony_ci#include "curl_setup.h" 2713498266Sopenharmony_ci 2813498266Sopenharmony_ci#ifndef CURL_DISABLE_MQTT 2913498266Sopenharmony_ci 3013498266Sopenharmony_ci#include "urldata.h" 3113498266Sopenharmony_ci#include <curl/curl.h> 3213498266Sopenharmony_ci#include "transfer.h" 3313498266Sopenharmony_ci#include "sendf.h" 3413498266Sopenharmony_ci#include "progress.h" 3513498266Sopenharmony_ci#include "mqtt.h" 3613498266Sopenharmony_ci#include "select.h" 3713498266Sopenharmony_ci#include "strdup.h" 3813498266Sopenharmony_ci#include "url.h" 3913498266Sopenharmony_ci#include "escape.h" 4013498266Sopenharmony_ci#include "warnless.h" 4113498266Sopenharmony_ci#include "curl_printf.h" 4213498266Sopenharmony_ci#include "curl_memory.h" 4313498266Sopenharmony_ci#include "multiif.h" 4413498266Sopenharmony_ci#include "rand.h" 4513498266Sopenharmony_ci 4613498266Sopenharmony_ci/* The last #include file should be: */ 4713498266Sopenharmony_ci#include "memdebug.h" 4813498266Sopenharmony_ci 4913498266Sopenharmony_ci#define MQTT_MSG_CONNECT 0x10 5013498266Sopenharmony_ci#define MQTT_MSG_CONNACK 0x20 5113498266Sopenharmony_ci#define MQTT_MSG_PUBLISH 0x30 5213498266Sopenharmony_ci#define MQTT_MSG_SUBSCRIBE 0x82 5313498266Sopenharmony_ci#define MQTT_MSG_SUBACK 0x90 5413498266Sopenharmony_ci#define MQTT_MSG_DISCONNECT 0xe0 5513498266Sopenharmony_ci 5613498266Sopenharmony_ci#define MQTT_CONNACK_LEN 2 5713498266Sopenharmony_ci#define MQTT_SUBACK_LEN 3 5813498266Sopenharmony_ci#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ 5913498266Sopenharmony_ci 6013498266Sopenharmony_ci/* 6113498266Sopenharmony_ci * Forward declarations. 6213498266Sopenharmony_ci */ 6313498266Sopenharmony_ci 6413498266Sopenharmony_cistatic CURLcode mqtt_do(struct Curl_easy *data, bool *done); 6513498266Sopenharmony_cistatic CURLcode mqtt_done(struct Curl_easy *data, 6613498266Sopenharmony_ci CURLcode status, bool premature); 6713498266Sopenharmony_cistatic CURLcode mqtt_doing(struct Curl_easy *data, bool *done); 6813498266Sopenharmony_cistatic int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn, 6913498266Sopenharmony_ci curl_socket_t *sock); 7013498266Sopenharmony_cistatic CURLcode mqtt_setup_conn(struct Curl_easy *data, 7113498266Sopenharmony_ci struct connectdata *conn); 7213498266Sopenharmony_ci 7313498266Sopenharmony_ci/* 7413498266Sopenharmony_ci * MQTT protocol handler. 7513498266Sopenharmony_ci */ 7613498266Sopenharmony_ci 7713498266Sopenharmony_ciconst struct Curl_handler Curl_handler_mqtt = { 7813498266Sopenharmony_ci "MQTT", /* scheme */ 7913498266Sopenharmony_ci mqtt_setup_conn, /* setup_connection */ 8013498266Sopenharmony_ci mqtt_do, /* do_it */ 8113498266Sopenharmony_ci mqtt_done, /* done */ 8213498266Sopenharmony_ci ZERO_NULL, /* do_more */ 8313498266Sopenharmony_ci ZERO_NULL, /* connect_it */ 8413498266Sopenharmony_ci ZERO_NULL, /* connecting */ 8513498266Sopenharmony_ci mqtt_doing, /* doing */ 8613498266Sopenharmony_ci ZERO_NULL, /* proto_getsock */ 8713498266Sopenharmony_ci mqtt_getsock, /* doing_getsock */ 8813498266Sopenharmony_ci ZERO_NULL, /* domore_getsock */ 8913498266Sopenharmony_ci ZERO_NULL, /* perform_getsock */ 9013498266Sopenharmony_ci ZERO_NULL, /* disconnect */ 9113498266Sopenharmony_ci ZERO_NULL, /* write_resp */ 9213498266Sopenharmony_ci ZERO_NULL, /* connection_check */ 9313498266Sopenharmony_ci ZERO_NULL, /* attach connection */ 9413498266Sopenharmony_ci PORT_MQTT, /* defport */ 9513498266Sopenharmony_ci CURLPROTO_MQTT, /* protocol */ 9613498266Sopenharmony_ci CURLPROTO_MQTT, /* family */ 9713498266Sopenharmony_ci PROTOPT_NONE /* flags */ 9813498266Sopenharmony_ci}; 9913498266Sopenharmony_ci 10013498266Sopenharmony_cistatic CURLcode mqtt_setup_conn(struct Curl_easy *data, 10113498266Sopenharmony_ci struct connectdata *conn) 10213498266Sopenharmony_ci{ 10313498266Sopenharmony_ci /* allocate the HTTP-specific struct for the Curl_easy, only to survive 10413498266Sopenharmony_ci during this request */ 10513498266Sopenharmony_ci struct MQTT *mq; 10613498266Sopenharmony_ci (void)conn; 10713498266Sopenharmony_ci DEBUGASSERT(data->req.p.mqtt == NULL); 10813498266Sopenharmony_ci 10913498266Sopenharmony_ci mq = calloc(1, sizeof(struct MQTT)); 11013498266Sopenharmony_ci if(!mq) 11113498266Sopenharmony_ci return CURLE_OUT_OF_MEMORY; 11213498266Sopenharmony_ci Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV); 11313498266Sopenharmony_ci data->req.p.mqtt = mq; 11413498266Sopenharmony_ci return CURLE_OK; 11513498266Sopenharmony_ci} 11613498266Sopenharmony_ci 11713498266Sopenharmony_cistatic CURLcode mqtt_send(struct Curl_easy *data, 11813498266Sopenharmony_ci char *buf, size_t len) 11913498266Sopenharmony_ci{ 12013498266Sopenharmony_ci CURLcode result = CURLE_OK; 12113498266Sopenharmony_ci struct MQTT *mq = data->req.p.mqtt; 12213498266Sopenharmony_ci ssize_t n; 12313498266Sopenharmony_ci result = Curl_nwrite(data, FIRSTSOCKET, buf, len, &n); 12413498266Sopenharmony_ci if(result) 12513498266Sopenharmony_ci return result; 12613498266Sopenharmony_ci Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n); 12713498266Sopenharmony_ci if(len != (size_t)n) { 12813498266Sopenharmony_ci size_t nsend = len - n; 12913498266Sopenharmony_ci char *sendleftovers = Curl_memdup(&buf[n], nsend); 13013498266Sopenharmony_ci if(!sendleftovers) 13113498266Sopenharmony_ci return CURLE_OUT_OF_MEMORY; 13213498266Sopenharmony_ci mq->sendleftovers = sendleftovers; 13313498266Sopenharmony_ci mq->nsend = nsend; 13413498266Sopenharmony_ci } 13513498266Sopenharmony_ci else { 13613498266Sopenharmony_ci mq->sendleftovers = NULL; 13713498266Sopenharmony_ci mq->nsend = 0; 13813498266Sopenharmony_ci } 13913498266Sopenharmony_ci return result; 14013498266Sopenharmony_ci} 14113498266Sopenharmony_ci 14213498266Sopenharmony_ci/* Generic function called by the multi interface to figure out what socket(s) 14313498266Sopenharmony_ci to wait for and for what actions during the DOING and PROTOCONNECT 14413498266Sopenharmony_ci states */ 14513498266Sopenharmony_cistatic int mqtt_getsock(struct Curl_easy *data, 14613498266Sopenharmony_ci struct connectdata *conn, 14713498266Sopenharmony_ci curl_socket_t *sock) 14813498266Sopenharmony_ci{ 14913498266Sopenharmony_ci (void)data; 15013498266Sopenharmony_ci sock[0] = conn->sock[FIRSTSOCKET]; 15113498266Sopenharmony_ci return GETSOCK_READSOCK(FIRSTSOCKET); 15213498266Sopenharmony_ci} 15313498266Sopenharmony_ci 15413498266Sopenharmony_cistatic int mqtt_encode_len(char *buf, size_t len) 15513498266Sopenharmony_ci{ 15613498266Sopenharmony_ci unsigned char encoded; 15713498266Sopenharmony_ci int i; 15813498266Sopenharmony_ci 15913498266Sopenharmony_ci for(i = 0; (len > 0) && (i<4); i++) { 16013498266Sopenharmony_ci encoded = len % 0x80; 16113498266Sopenharmony_ci len /= 0x80; 16213498266Sopenharmony_ci if(len) 16313498266Sopenharmony_ci encoded |= 0x80; 16413498266Sopenharmony_ci buf[i] = encoded; 16513498266Sopenharmony_ci } 16613498266Sopenharmony_ci 16713498266Sopenharmony_ci return i; 16813498266Sopenharmony_ci} 16913498266Sopenharmony_ci 17013498266Sopenharmony_ci/* add the passwd to the CONNECT packet */ 17113498266Sopenharmony_cistatic int add_passwd(const char *passwd, const size_t plen, 17213498266Sopenharmony_ci char *pkt, const size_t start, int remain_pos) 17313498266Sopenharmony_ci{ 17413498266Sopenharmony_ci /* magic number that need to be set properly */ 17513498266Sopenharmony_ci const size_t conn_flags_pos = remain_pos + 8; 17613498266Sopenharmony_ci if(plen > 0xffff) 17713498266Sopenharmony_ci return 1; 17813498266Sopenharmony_ci 17913498266Sopenharmony_ci /* set password flag */ 18013498266Sopenharmony_ci pkt[conn_flags_pos] |= 0x40; 18113498266Sopenharmony_ci 18213498266Sopenharmony_ci /* length of password provided */ 18313498266Sopenharmony_ci pkt[start] = (char)((plen >> 8) & 0xFF); 18413498266Sopenharmony_ci pkt[start + 1] = (char)(plen & 0xFF); 18513498266Sopenharmony_ci memcpy(&pkt[start + 2], passwd, plen); 18613498266Sopenharmony_ci return 0; 18713498266Sopenharmony_ci} 18813498266Sopenharmony_ci 18913498266Sopenharmony_ci/* add user to the CONNECT packet */ 19013498266Sopenharmony_cistatic int add_user(const char *username, const size_t ulen, 19113498266Sopenharmony_ci unsigned char *pkt, const size_t start, int remain_pos) 19213498266Sopenharmony_ci{ 19313498266Sopenharmony_ci /* magic number that need to be set properly */ 19413498266Sopenharmony_ci const size_t conn_flags_pos = remain_pos + 8; 19513498266Sopenharmony_ci if(ulen > 0xffff) 19613498266Sopenharmony_ci return 1; 19713498266Sopenharmony_ci 19813498266Sopenharmony_ci /* set username flag */ 19913498266Sopenharmony_ci pkt[conn_flags_pos] |= 0x80; 20013498266Sopenharmony_ci /* length of username provided */ 20113498266Sopenharmony_ci pkt[start] = (unsigned char)((ulen >> 8) & 0xFF); 20213498266Sopenharmony_ci pkt[start + 1] = (unsigned char)(ulen & 0xFF); 20313498266Sopenharmony_ci memcpy(&pkt[start + 2], username, ulen); 20413498266Sopenharmony_ci return 0; 20513498266Sopenharmony_ci} 20613498266Sopenharmony_ci 20713498266Sopenharmony_ci/* add client ID to the CONNECT packet */ 20813498266Sopenharmony_cistatic int add_client_id(const char *client_id, const size_t client_id_len, 20913498266Sopenharmony_ci char *pkt, const size_t start) 21013498266Sopenharmony_ci{ 21113498266Sopenharmony_ci if(client_id_len != MQTT_CLIENTID_LEN) 21213498266Sopenharmony_ci return 1; 21313498266Sopenharmony_ci pkt[start] = 0x00; 21413498266Sopenharmony_ci pkt[start + 1] = MQTT_CLIENTID_LEN; 21513498266Sopenharmony_ci memcpy(&pkt[start + 2], client_id, MQTT_CLIENTID_LEN); 21613498266Sopenharmony_ci return 0; 21713498266Sopenharmony_ci} 21813498266Sopenharmony_ci 21913498266Sopenharmony_ci/* Set initial values of CONNECT packet */ 22013498266Sopenharmony_cistatic int init_connpack(char *packet, char *remain, int remain_pos) 22113498266Sopenharmony_ci{ 22213498266Sopenharmony_ci /* Fixed header starts */ 22313498266Sopenharmony_ci /* packet type */ 22413498266Sopenharmony_ci packet[0] = MQTT_MSG_CONNECT; 22513498266Sopenharmony_ci /* remaining length field */ 22613498266Sopenharmony_ci memcpy(&packet[1], remain, remain_pos); 22713498266Sopenharmony_ci /* Fixed header ends */ 22813498266Sopenharmony_ci 22913498266Sopenharmony_ci /* Variable header starts */ 23013498266Sopenharmony_ci /* protocol length */ 23113498266Sopenharmony_ci packet[remain_pos + 1] = 0x00; 23213498266Sopenharmony_ci packet[remain_pos + 2] = 0x04; 23313498266Sopenharmony_ci /* protocol name */ 23413498266Sopenharmony_ci packet[remain_pos + 3] = 'M'; 23513498266Sopenharmony_ci packet[remain_pos + 4] = 'Q'; 23613498266Sopenharmony_ci packet[remain_pos + 5] = 'T'; 23713498266Sopenharmony_ci packet[remain_pos + 6] = 'T'; 23813498266Sopenharmony_ci /* protocol level */ 23913498266Sopenharmony_ci packet[remain_pos + 7] = 0x04; 24013498266Sopenharmony_ci /* CONNECT flag: CleanSession */ 24113498266Sopenharmony_ci packet[remain_pos + 8] = 0x02; 24213498266Sopenharmony_ci /* keep-alive 0 = disabled */ 24313498266Sopenharmony_ci packet[remain_pos + 9] = 0x00; 24413498266Sopenharmony_ci packet[remain_pos + 10] = 0x3c; 24513498266Sopenharmony_ci /* end of variable header */ 24613498266Sopenharmony_ci return remain_pos + 10; 24713498266Sopenharmony_ci} 24813498266Sopenharmony_ci 24913498266Sopenharmony_cistatic CURLcode mqtt_connect(struct Curl_easy *data) 25013498266Sopenharmony_ci{ 25113498266Sopenharmony_ci CURLcode result = CURLE_OK; 25213498266Sopenharmony_ci int pos = 0; 25313498266Sopenharmony_ci int rc = 0; 25413498266Sopenharmony_ci /* remain length */ 25513498266Sopenharmony_ci int remain_pos = 0; 25613498266Sopenharmony_ci char remain[4] = {0}; 25713498266Sopenharmony_ci size_t packetlen = 0; 25813498266Sopenharmony_ci size_t payloadlen = 0; 25913498266Sopenharmony_ci size_t start_user = 0; 26013498266Sopenharmony_ci size_t start_pwd = 0; 26113498266Sopenharmony_ci char client_id[MQTT_CLIENTID_LEN + 1] = "curl"; 26213498266Sopenharmony_ci const size_t clen = strlen("curl"); 26313498266Sopenharmony_ci char *packet = NULL; 26413498266Sopenharmony_ci 26513498266Sopenharmony_ci /* extracting username from request */ 26613498266Sopenharmony_ci const char *username = data->state.aptr.user ? 26713498266Sopenharmony_ci data->state.aptr.user : ""; 26813498266Sopenharmony_ci const size_t ulen = strlen(username); 26913498266Sopenharmony_ci /* extracting password from request */ 27013498266Sopenharmony_ci const char *passwd = data->state.aptr.passwd ? 27113498266Sopenharmony_ci data->state.aptr.passwd : ""; 27213498266Sopenharmony_ci const size_t plen = strlen(passwd); 27313498266Sopenharmony_ci 27413498266Sopenharmony_ci payloadlen = ulen + plen + MQTT_CLIENTID_LEN + 2; 27513498266Sopenharmony_ci /* The plus 2 are for the MSB and LSB describing the length of the string to 27613498266Sopenharmony_ci * be added on the payload. Refer to spec 1.5.2 and 1.5.4 */ 27713498266Sopenharmony_ci if(ulen) 27813498266Sopenharmony_ci payloadlen += 2; 27913498266Sopenharmony_ci if(plen) 28013498266Sopenharmony_ci payloadlen += 2; 28113498266Sopenharmony_ci 28213498266Sopenharmony_ci /* getting how much occupy the remain length */ 28313498266Sopenharmony_ci remain_pos = mqtt_encode_len(remain, payloadlen + 10); 28413498266Sopenharmony_ci 28513498266Sopenharmony_ci /* 10 length of variable header and 1 the first byte of the fixed header */ 28613498266Sopenharmony_ci packetlen = payloadlen + 10 + remain_pos + 1; 28713498266Sopenharmony_ci 28813498266Sopenharmony_ci /* allocating packet */ 28913498266Sopenharmony_ci if(packetlen > 268435455) 29013498266Sopenharmony_ci return CURLE_WEIRD_SERVER_REPLY; 29113498266Sopenharmony_ci packet = malloc(packetlen); 29213498266Sopenharmony_ci if(!packet) 29313498266Sopenharmony_ci return CURLE_OUT_OF_MEMORY; 29413498266Sopenharmony_ci memset(packet, 0, packetlen); 29513498266Sopenharmony_ci 29613498266Sopenharmony_ci /* set initial values for the CONNECT packet */ 29713498266Sopenharmony_ci pos = init_connpack(packet, remain, remain_pos); 29813498266Sopenharmony_ci 29913498266Sopenharmony_ci result = Curl_rand_alnum(data, (unsigned char *)&client_id[clen], 30013498266Sopenharmony_ci MQTT_CLIENTID_LEN - clen + 1); 30113498266Sopenharmony_ci /* add client id */ 30213498266Sopenharmony_ci rc = add_client_id(client_id, strlen(client_id), packet, pos + 1); 30313498266Sopenharmony_ci if(rc) { 30413498266Sopenharmony_ci failf(data, "Client ID length mismatched: [%zu]", strlen(client_id)); 30513498266Sopenharmony_ci result = CURLE_WEIRD_SERVER_REPLY; 30613498266Sopenharmony_ci goto end; 30713498266Sopenharmony_ci } 30813498266Sopenharmony_ci infof(data, "Using client id '%s'", client_id); 30913498266Sopenharmony_ci 31013498266Sopenharmony_ci /* position where starts the user payload */ 31113498266Sopenharmony_ci start_user = pos + 3 + MQTT_CLIENTID_LEN; 31213498266Sopenharmony_ci /* position where starts the password payload */ 31313498266Sopenharmony_ci start_pwd = start_user + ulen; 31413498266Sopenharmony_ci /* if user name was provided, add it to the packet */ 31513498266Sopenharmony_ci if(ulen) { 31613498266Sopenharmony_ci start_pwd += 2; 31713498266Sopenharmony_ci 31813498266Sopenharmony_ci rc = add_user(username, ulen, 31913498266Sopenharmony_ci (unsigned char *)packet, start_user, remain_pos); 32013498266Sopenharmony_ci if(rc) { 32113498266Sopenharmony_ci failf(data, "Username is too large: [%zu]", ulen); 32213498266Sopenharmony_ci result = CURLE_WEIRD_SERVER_REPLY; 32313498266Sopenharmony_ci goto end; 32413498266Sopenharmony_ci } 32513498266Sopenharmony_ci } 32613498266Sopenharmony_ci 32713498266Sopenharmony_ci /* if passwd was provided, add it to the packet */ 32813498266Sopenharmony_ci if(plen) { 32913498266Sopenharmony_ci rc = add_passwd(passwd, plen, packet, start_pwd, remain_pos); 33013498266Sopenharmony_ci if(rc) { 33113498266Sopenharmony_ci failf(data, "Password is too large: [%zu]", plen); 33213498266Sopenharmony_ci result = CURLE_WEIRD_SERVER_REPLY; 33313498266Sopenharmony_ci goto end; 33413498266Sopenharmony_ci } 33513498266Sopenharmony_ci } 33613498266Sopenharmony_ci 33713498266Sopenharmony_ci if(!result) 33813498266Sopenharmony_ci result = mqtt_send(data, packet, packetlen); 33913498266Sopenharmony_ci 34013498266Sopenharmony_ciend: 34113498266Sopenharmony_ci if(packet) 34213498266Sopenharmony_ci free(packet); 34313498266Sopenharmony_ci Curl_safefree(data->state.aptr.user); 34413498266Sopenharmony_ci Curl_safefree(data->state.aptr.passwd); 34513498266Sopenharmony_ci return result; 34613498266Sopenharmony_ci} 34713498266Sopenharmony_ci 34813498266Sopenharmony_cistatic CURLcode mqtt_disconnect(struct Curl_easy *data) 34913498266Sopenharmony_ci{ 35013498266Sopenharmony_ci CURLcode result = CURLE_OK; 35113498266Sopenharmony_ci struct MQTT *mq = data->req.p.mqtt; 35213498266Sopenharmony_ci result = mqtt_send(data, (char *)"\xe0\x00", 2); 35313498266Sopenharmony_ci Curl_safefree(mq->sendleftovers); 35413498266Sopenharmony_ci Curl_dyn_free(&mq->recvbuf); 35513498266Sopenharmony_ci return result; 35613498266Sopenharmony_ci} 35713498266Sopenharmony_ci 35813498266Sopenharmony_cistatic CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes) 35913498266Sopenharmony_ci{ 36013498266Sopenharmony_ci struct MQTT *mq = data->req.p.mqtt; 36113498266Sopenharmony_ci size_t rlen = Curl_dyn_len(&mq->recvbuf); 36213498266Sopenharmony_ci CURLcode result; 36313498266Sopenharmony_ci 36413498266Sopenharmony_ci if(rlen < nbytes) { 36513498266Sopenharmony_ci unsigned char readbuf[1024]; 36613498266Sopenharmony_ci ssize_t nread; 36713498266Sopenharmony_ci 36813498266Sopenharmony_ci DEBUGASSERT(nbytes - rlen < sizeof(readbuf)); 36913498266Sopenharmony_ci result = Curl_read(data, data->conn->sock[FIRSTSOCKET], 37013498266Sopenharmony_ci (char *)readbuf, nbytes - rlen, &nread); 37113498266Sopenharmony_ci if(result) 37213498266Sopenharmony_ci return result; 37313498266Sopenharmony_ci DEBUGASSERT(nread >= 0); 37413498266Sopenharmony_ci if(Curl_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread)) 37513498266Sopenharmony_ci return CURLE_OUT_OF_MEMORY; 37613498266Sopenharmony_ci rlen = Curl_dyn_len(&mq->recvbuf); 37713498266Sopenharmony_ci } 37813498266Sopenharmony_ci return (rlen >= nbytes)? CURLE_OK : CURLE_AGAIN; 37913498266Sopenharmony_ci} 38013498266Sopenharmony_ci 38113498266Sopenharmony_cistatic void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes) 38213498266Sopenharmony_ci{ 38313498266Sopenharmony_ci struct MQTT *mq = data->req.p.mqtt; 38413498266Sopenharmony_ci size_t rlen = Curl_dyn_len(&mq->recvbuf); 38513498266Sopenharmony_ci if(rlen <= nbytes) 38613498266Sopenharmony_ci Curl_dyn_reset(&mq->recvbuf); 38713498266Sopenharmony_ci else 38813498266Sopenharmony_ci Curl_dyn_tail(&mq->recvbuf, rlen - nbytes); 38913498266Sopenharmony_ci} 39013498266Sopenharmony_ci 39113498266Sopenharmony_cistatic CURLcode mqtt_verify_connack(struct Curl_easy *data) 39213498266Sopenharmony_ci{ 39313498266Sopenharmony_ci struct MQTT *mq = data->req.p.mqtt; 39413498266Sopenharmony_ci CURLcode result; 39513498266Sopenharmony_ci char *ptr; 39613498266Sopenharmony_ci 39713498266Sopenharmony_ci result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN); 39813498266Sopenharmony_ci if(result) 39913498266Sopenharmony_ci goto fail; 40013498266Sopenharmony_ci 40113498266Sopenharmony_ci /* verify CONNACK */ 40213498266Sopenharmony_ci DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN); 40313498266Sopenharmony_ci ptr = Curl_dyn_ptr(&mq->recvbuf); 40413498266Sopenharmony_ci Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN); 40513498266Sopenharmony_ci 40613498266Sopenharmony_ci if(ptr[0] != 0x00 || ptr[1] != 0x00) { 40713498266Sopenharmony_ci failf(data, "Expected %02x%02x but got %02x%02x", 40813498266Sopenharmony_ci 0x00, 0x00, ptr[0], ptr[1]); 40913498266Sopenharmony_ci Curl_dyn_reset(&mq->recvbuf); 41013498266Sopenharmony_ci result = CURLE_WEIRD_SERVER_REPLY; 41113498266Sopenharmony_ci goto fail; 41213498266Sopenharmony_ci } 41313498266Sopenharmony_ci mqtt_recv_consume(data, MQTT_CONNACK_LEN); 41413498266Sopenharmony_cifail: 41513498266Sopenharmony_ci return result; 41613498266Sopenharmony_ci} 41713498266Sopenharmony_ci 41813498266Sopenharmony_cistatic CURLcode mqtt_get_topic(struct Curl_easy *data, 41913498266Sopenharmony_ci char **topic, size_t *topiclen) 42013498266Sopenharmony_ci{ 42113498266Sopenharmony_ci char *path = data->state.up.path; 42213498266Sopenharmony_ci CURLcode result = CURLE_URL_MALFORMAT; 42313498266Sopenharmony_ci if(strlen(path) > 1) { 42413498266Sopenharmony_ci result = Curl_urldecode(path + 1, 0, topic, topiclen, REJECT_NADA); 42513498266Sopenharmony_ci if(!result && (*topiclen > 0xffff)) { 42613498266Sopenharmony_ci failf(data, "Too long MQTT topic"); 42713498266Sopenharmony_ci result = CURLE_URL_MALFORMAT; 42813498266Sopenharmony_ci } 42913498266Sopenharmony_ci } 43013498266Sopenharmony_ci else 43113498266Sopenharmony_ci failf(data, "No MQTT topic found. Forgot to URL encode it?"); 43213498266Sopenharmony_ci 43313498266Sopenharmony_ci return result; 43413498266Sopenharmony_ci} 43513498266Sopenharmony_ci 43613498266Sopenharmony_cistatic CURLcode mqtt_subscribe(struct Curl_easy *data) 43713498266Sopenharmony_ci{ 43813498266Sopenharmony_ci CURLcode result = CURLE_OK; 43913498266Sopenharmony_ci char *topic = NULL; 44013498266Sopenharmony_ci size_t topiclen; 44113498266Sopenharmony_ci unsigned char *packet = NULL; 44213498266Sopenharmony_ci size_t packetlen; 44313498266Sopenharmony_ci char encodedsize[4]; 44413498266Sopenharmony_ci size_t n; 44513498266Sopenharmony_ci struct connectdata *conn = data->conn; 44613498266Sopenharmony_ci 44713498266Sopenharmony_ci result = mqtt_get_topic(data, &topic, &topiclen); 44813498266Sopenharmony_ci if(result) 44913498266Sopenharmony_ci goto fail; 45013498266Sopenharmony_ci 45113498266Sopenharmony_ci conn->proto.mqtt.packetid++; 45213498266Sopenharmony_ci 45313498266Sopenharmony_ci packetlen = topiclen + 5; /* packetid + topic (has a two byte length field) 45413498266Sopenharmony_ci + 2 bytes topic length + QoS byte */ 45513498266Sopenharmony_ci n = mqtt_encode_len((char *)encodedsize, packetlen); 45613498266Sopenharmony_ci packetlen += n + 1; /* add one for the control packet type byte */ 45713498266Sopenharmony_ci 45813498266Sopenharmony_ci packet = malloc(packetlen); 45913498266Sopenharmony_ci if(!packet) { 46013498266Sopenharmony_ci result = CURLE_OUT_OF_MEMORY; 46113498266Sopenharmony_ci goto fail; 46213498266Sopenharmony_ci } 46313498266Sopenharmony_ci 46413498266Sopenharmony_ci packet[0] = MQTT_MSG_SUBSCRIBE; 46513498266Sopenharmony_ci memcpy(&packet[1], encodedsize, n); 46613498266Sopenharmony_ci packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff; 46713498266Sopenharmony_ci packet[2 + n] = conn->proto.mqtt.packetid & 0xff; 46813498266Sopenharmony_ci packet[3 + n] = (topiclen >> 8) & 0xff; 46913498266Sopenharmony_ci packet[4 + n ] = topiclen & 0xff; 47013498266Sopenharmony_ci memcpy(&packet[5 + n], topic, topiclen); 47113498266Sopenharmony_ci packet[5 + n + topiclen] = 0; /* QoS zero */ 47213498266Sopenharmony_ci 47313498266Sopenharmony_ci result = mqtt_send(data, (char *)packet, packetlen); 47413498266Sopenharmony_ci 47513498266Sopenharmony_cifail: 47613498266Sopenharmony_ci free(topic); 47713498266Sopenharmony_ci free(packet); 47813498266Sopenharmony_ci return result; 47913498266Sopenharmony_ci} 48013498266Sopenharmony_ci 48113498266Sopenharmony_ci/* 48213498266Sopenharmony_ci * Called when the first byte was already read. 48313498266Sopenharmony_ci */ 48413498266Sopenharmony_cistatic CURLcode mqtt_verify_suback(struct Curl_easy *data) 48513498266Sopenharmony_ci{ 48613498266Sopenharmony_ci struct MQTT *mq = data->req.p.mqtt; 48713498266Sopenharmony_ci struct connectdata *conn = data->conn; 48813498266Sopenharmony_ci struct mqtt_conn *mqtt = &conn->proto.mqtt; 48913498266Sopenharmony_ci CURLcode result; 49013498266Sopenharmony_ci char *ptr; 49113498266Sopenharmony_ci 49213498266Sopenharmony_ci result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN); 49313498266Sopenharmony_ci if(result) 49413498266Sopenharmony_ci goto fail; 49513498266Sopenharmony_ci 49613498266Sopenharmony_ci /* verify SUBACK */ 49713498266Sopenharmony_ci DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN); 49813498266Sopenharmony_ci ptr = Curl_dyn_ptr(&mq->recvbuf); 49913498266Sopenharmony_ci Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN); 50013498266Sopenharmony_ci 50113498266Sopenharmony_ci if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) || 50213498266Sopenharmony_ci ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) || 50313498266Sopenharmony_ci ptr[2] != 0x00) { 50413498266Sopenharmony_ci Curl_dyn_reset(&mq->recvbuf); 50513498266Sopenharmony_ci result = CURLE_WEIRD_SERVER_REPLY; 50613498266Sopenharmony_ci goto fail; 50713498266Sopenharmony_ci } 50813498266Sopenharmony_ci mqtt_recv_consume(data, MQTT_SUBACK_LEN); 50913498266Sopenharmony_cifail: 51013498266Sopenharmony_ci return result; 51113498266Sopenharmony_ci} 51213498266Sopenharmony_ci 51313498266Sopenharmony_cistatic CURLcode mqtt_publish(struct Curl_easy *data) 51413498266Sopenharmony_ci{ 51513498266Sopenharmony_ci CURLcode result; 51613498266Sopenharmony_ci char *payload = data->set.postfields; 51713498266Sopenharmony_ci size_t payloadlen; 51813498266Sopenharmony_ci char *topic = NULL; 51913498266Sopenharmony_ci size_t topiclen; 52013498266Sopenharmony_ci unsigned char *pkt = NULL; 52113498266Sopenharmony_ci size_t i = 0; 52213498266Sopenharmony_ci size_t remaininglength; 52313498266Sopenharmony_ci size_t encodelen; 52413498266Sopenharmony_ci char encodedbytes[4]; 52513498266Sopenharmony_ci curl_off_t postfieldsize = data->set.postfieldsize; 52613498266Sopenharmony_ci 52713498266Sopenharmony_ci if(!payload) { 52813498266Sopenharmony_ci DEBUGF(infof(data, "mqtt_publish without payload, return bad arg")); 52913498266Sopenharmony_ci return CURLE_BAD_FUNCTION_ARGUMENT; 53013498266Sopenharmony_ci } 53113498266Sopenharmony_ci if(postfieldsize < 0) 53213498266Sopenharmony_ci payloadlen = strlen(payload); 53313498266Sopenharmony_ci else 53413498266Sopenharmony_ci payloadlen = (size_t)postfieldsize; 53513498266Sopenharmony_ci 53613498266Sopenharmony_ci result = mqtt_get_topic(data, &topic, &topiclen); 53713498266Sopenharmony_ci if(result) 53813498266Sopenharmony_ci goto fail; 53913498266Sopenharmony_ci 54013498266Sopenharmony_ci remaininglength = payloadlen + 2 + topiclen; 54113498266Sopenharmony_ci encodelen = mqtt_encode_len(encodedbytes, remaininglength); 54213498266Sopenharmony_ci 54313498266Sopenharmony_ci /* add the control byte and the encoded remaining length */ 54413498266Sopenharmony_ci pkt = malloc(remaininglength + 1 + encodelen); 54513498266Sopenharmony_ci if(!pkt) { 54613498266Sopenharmony_ci result = CURLE_OUT_OF_MEMORY; 54713498266Sopenharmony_ci goto fail; 54813498266Sopenharmony_ci } 54913498266Sopenharmony_ci 55013498266Sopenharmony_ci /* assemble packet */ 55113498266Sopenharmony_ci pkt[i++] = MQTT_MSG_PUBLISH; 55213498266Sopenharmony_ci memcpy(&pkt[i], encodedbytes, encodelen); 55313498266Sopenharmony_ci i += encodelen; 55413498266Sopenharmony_ci pkt[i++] = (topiclen >> 8) & 0xff; 55513498266Sopenharmony_ci pkt[i++] = (topiclen & 0xff); 55613498266Sopenharmony_ci memcpy(&pkt[i], topic, topiclen); 55713498266Sopenharmony_ci i += topiclen; 55813498266Sopenharmony_ci memcpy(&pkt[i], payload, payloadlen); 55913498266Sopenharmony_ci i += payloadlen; 56013498266Sopenharmony_ci result = mqtt_send(data, (char *)pkt, i); 56113498266Sopenharmony_ci 56213498266Sopenharmony_cifail: 56313498266Sopenharmony_ci free(pkt); 56413498266Sopenharmony_ci free(topic); 56513498266Sopenharmony_ci return result; 56613498266Sopenharmony_ci} 56713498266Sopenharmony_ci 56813498266Sopenharmony_cistatic size_t mqtt_decode_len(unsigned char *buf, 56913498266Sopenharmony_ci size_t buflen, size_t *lenbytes) 57013498266Sopenharmony_ci{ 57113498266Sopenharmony_ci size_t len = 0; 57213498266Sopenharmony_ci size_t mult = 1; 57313498266Sopenharmony_ci size_t i; 57413498266Sopenharmony_ci unsigned char encoded = 128; 57513498266Sopenharmony_ci 57613498266Sopenharmony_ci for(i = 0; (i < buflen) && (encoded & 128); i++) { 57713498266Sopenharmony_ci encoded = buf[i]; 57813498266Sopenharmony_ci len += (encoded & 127) * mult; 57913498266Sopenharmony_ci mult *= 128; 58013498266Sopenharmony_ci } 58113498266Sopenharmony_ci 58213498266Sopenharmony_ci if(lenbytes) 58313498266Sopenharmony_ci *lenbytes = i; 58413498266Sopenharmony_ci 58513498266Sopenharmony_ci return len; 58613498266Sopenharmony_ci} 58713498266Sopenharmony_ci 58813498266Sopenharmony_ci#ifdef CURLDEBUG 58913498266Sopenharmony_cistatic const char *statenames[]={ 59013498266Sopenharmony_ci "MQTT_FIRST", 59113498266Sopenharmony_ci "MQTT_REMAINING_LENGTH", 59213498266Sopenharmony_ci "MQTT_CONNACK", 59313498266Sopenharmony_ci "MQTT_SUBACK", 59413498266Sopenharmony_ci "MQTT_SUBACK_COMING", 59513498266Sopenharmony_ci "MQTT_PUBWAIT", 59613498266Sopenharmony_ci "MQTT_PUB_REMAIN", 59713498266Sopenharmony_ci 59813498266Sopenharmony_ci "NOT A STATE" 59913498266Sopenharmony_ci}; 60013498266Sopenharmony_ci#endif 60113498266Sopenharmony_ci 60213498266Sopenharmony_ci/* The only way to change state */ 60313498266Sopenharmony_cistatic void mqstate(struct Curl_easy *data, 60413498266Sopenharmony_ci enum mqttstate state, 60513498266Sopenharmony_ci enum mqttstate nextstate) /* used if state == FIRST */ 60613498266Sopenharmony_ci{ 60713498266Sopenharmony_ci struct connectdata *conn = data->conn; 60813498266Sopenharmony_ci struct mqtt_conn *mqtt = &conn->proto.mqtt; 60913498266Sopenharmony_ci#ifdef CURLDEBUG 61013498266Sopenharmony_ci infof(data, "%s (from %s) (next is %s)", 61113498266Sopenharmony_ci statenames[state], 61213498266Sopenharmony_ci statenames[mqtt->state], 61313498266Sopenharmony_ci (state == MQTT_FIRST)? statenames[nextstate] : ""); 61413498266Sopenharmony_ci#endif 61513498266Sopenharmony_ci mqtt->state = state; 61613498266Sopenharmony_ci if(state == MQTT_FIRST) 61713498266Sopenharmony_ci mqtt->nextstate = nextstate; 61813498266Sopenharmony_ci} 61913498266Sopenharmony_ci 62013498266Sopenharmony_ci 62113498266Sopenharmony_cistatic CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done) 62213498266Sopenharmony_ci{ 62313498266Sopenharmony_ci CURLcode result = CURLE_OK; 62413498266Sopenharmony_ci struct connectdata *conn = data->conn; 62513498266Sopenharmony_ci curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; 62613498266Sopenharmony_ci ssize_t nread; 62713498266Sopenharmony_ci size_t remlen; 62813498266Sopenharmony_ci struct mqtt_conn *mqtt = &conn->proto.mqtt; 62913498266Sopenharmony_ci struct MQTT *mq = data->req.p.mqtt; 63013498266Sopenharmony_ci unsigned char packet; 63113498266Sopenharmony_ci 63213498266Sopenharmony_ci switch(mqtt->state) { 63313498266Sopenharmony_ciMQTT_SUBACK_COMING: 63413498266Sopenharmony_ci case MQTT_SUBACK_COMING: 63513498266Sopenharmony_ci result = mqtt_verify_suback(data); 63613498266Sopenharmony_ci if(result) 63713498266Sopenharmony_ci break; 63813498266Sopenharmony_ci 63913498266Sopenharmony_ci mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); 64013498266Sopenharmony_ci break; 64113498266Sopenharmony_ci 64213498266Sopenharmony_ci case MQTT_SUBACK: 64313498266Sopenharmony_ci case MQTT_PUBWAIT: 64413498266Sopenharmony_ci /* we are expecting PUBLISH or SUBACK */ 64513498266Sopenharmony_ci packet = mq->firstbyte & 0xf0; 64613498266Sopenharmony_ci if(packet == MQTT_MSG_PUBLISH) 64713498266Sopenharmony_ci mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE); 64813498266Sopenharmony_ci else if(packet == MQTT_MSG_SUBACK) { 64913498266Sopenharmony_ci mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE); 65013498266Sopenharmony_ci goto MQTT_SUBACK_COMING; 65113498266Sopenharmony_ci } 65213498266Sopenharmony_ci else if(packet == MQTT_MSG_DISCONNECT) { 65313498266Sopenharmony_ci infof(data, "Got DISCONNECT"); 65413498266Sopenharmony_ci *done = TRUE; 65513498266Sopenharmony_ci goto end; 65613498266Sopenharmony_ci } 65713498266Sopenharmony_ci else { 65813498266Sopenharmony_ci result = CURLE_WEIRD_SERVER_REPLY; 65913498266Sopenharmony_ci goto end; 66013498266Sopenharmony_ci } 66113498266Sopenharmony_ci 66213498266Sopenharmony_ci /* -- switched state -- */ 66313498266Sopenharmony_ci remlen = mq->remaining_length; 66413498266Sopenharmony_ci infof(data, "Remaining length: %zu bytes", remlen); 66513498266Sopenharmony_ci if(data->set.max_filesize && 66613498266Sopenharmony_ci (curl_off_t)remlen > data->set.max_filesize) { 66713498266Sopenharmony_ci failf(data, "Maximum file size exceeded"); 66813498266Sopenharmony_ci result = CURLE_FILESIZE_EXCEEDED; 66913498266Sopenharmony_ci goto end; 67013498266Sopenharmony_ci } 67113498266Sopenharmony_ci Curl_pgrsSetDownloadSize(data, remlen); 67213498266Sopenharmony_ci data->req.bytecount = 0; 67313498266Sopenharmony_ci data->req.size = remlen; 67413498266Sopenharmony_ci mq->npacket = remlen; /* get this many bytes */ 67513498266Sopenharmony_ci FALLTHROUGH(); 67613498266Sopenharmony_ci case MQTT_PUB_REMAIN: { 67713498266Sopenharmony_ci /* read rest of packet, but no more. Cap to buffer size */ 67813498266Sopenharmony_ci char buffer[4*1024]; 67913498266Sopenharmony_ci size_t rest = mq->npacket; 68013498266Sopenharmony_ci if(rest > sizeof(buffer)) 68113498266Sopenharmony_ci rest = sizeof(buffer); 68213498266Sopenharmony_ci result = Curl_read(data, sockfd, buffer, rest, &nread); 68313498266Sopenharmony_ci if(result) { 68413498266Sopenharmony_ci if(CURLE_AGAIN == result) { 68513498266Sopenharmony_ci infof(data, "EEEE AAAAGAIN"); 68613498266Sopenharmony_ci } 68713498266Sopenharmony_ci goto end; 68813498266Sopenharmony_ci } 68913498266Sopenharmony_ci if(!nread) { 69013498266Sopenharmony_ci infof(data, "server disconnected"); 69113498266Sopenharmony_ci result = CURLE_PARTIAL_FILE; 69213498266Sopenharmony_ci goto end; 69313498266Sopenharmony_ci } 69413498266Sopenharmony_ci 69513498266Sopenharmony_ci /* if QoS is set, message contains packet id */ 69613498266Sopenharmony_ci result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread); 69713498266Sopenharmony_ci if(result) 69813498266Sopenharmony_ci goto end; 69913498266Sopenharmony_ci 70013498266Sopenharmony_ci mq->npacket -= nread; 70113498266Sopenharmony_ci if(!mq->npacket) 70213498266Sopenharmony_ci /* no more PUBLISH payload, back to subscribe wait state */ 70313498266Sopenharmony_ci mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); 70413498266Sopenharmony_ci break; 70513498266Sopenharmony_ci } 70613498266Sopenharmony_ci default: 70713498266Sopenharmony_ci DEBUGASSERT(NULL); /* illegal state */ 70813498266Sopenharmony_ci result = CURLE_WEIRD_SERVER_REPLY; 70913498266Sopenharmony_ci goto end; 71013498266Sopenharmony_ci } 71113498266Sopenharmony_ciend: 71213498266Sopenharmony_ci return result; 71313498266Sopenharmony_ci} 71413498266Sopenharmony_ci 71513498266Sopenharmony_cistatic CURLcode mqtt_do(struct Curl_easy *data, bool *done) 71613498266Sopenharmony_ci{ 71713498266Sopenharmony_ci CURLcode result = CURLE_OK; 71813498266Sopenharmony_ci *done = FALSE; /* unconditionally */ 71913498266Sopenharmony_ci 72013498266Sopenharmony_ci result = mqtt_connect(data); 72113498266Sopenharmony_ci if(result) { 72213498266Sopenharmony_ci failf(data, "Error %d sending MQTT CONNECT request", result); 72313498266Sopenharmony_ci return result; 72413498266Sopenharmony_ci } 72513498266Sopenharmony_ci mqstate(data, MQTT_FIRST, MQTT_CONNACK); 72613498266Sopenharmony_ci return CURLE_OK; 72713498266Sopenharmony_ci} 72813498266Sopenharmony_ci 72913498266Sopenharmony_cistatic CURLcode mqtt_done(struct Curl_easy *data, 73013498266Sopenharmony_ci CURLcode status, bool premature) 73113498266Sopenharmony_ci{ 73213498266Sopenharmony_ci struct MQTT *mq = data->req.p.mqtt; 73313498266Sopenharmony_ci (void)status; 73413498266Sopenharmony_ci (void)premature; 73513498266Sopenharmony_ci Curl_safefree(mq->sendleftovers); 73613498266Sopenharmony_ci Curl_dyn_free(&mq->recvbuf); 73713498266Sopenharmony_ci return CURLE_OK; 73813498266Sopenharmony_ci} 73913498266Sopenharmony_ci 74013498266Sopenharmony_cistatic CURLcode mqtt_doing(struct Curl_easy *data, bool *done) 74113498266Sopenharmony_ci{ 74213498266Sopenharmony_ci CURLcode result = CURLE_OK; 74313498266Sopenharmony_ci struct connectdata *conn = data->conn; 74413498266Sopenharmony_ci struct mqtt_conn *mqtt = &conn->proto.mqtt; 74513498266Sopenharmony_ci struct MQTT *mq = data->req.p.mqtt; 74613498266Sopenharmony_ci ssize_t nread; 74713498266Sopenharmony_ci curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; 74813498266Sopenharmony_ci unsigned char byte; 74913498266Sopenharmony_ci 75013498266Sopenharmony_ci *done = FALSE; 75113498266Sopenharmony_ci 75213498266Sopenharmony_ci if(mq->nsend) { 75313498266Sopenharmony_ci /* send the remainder of an outgoing packet */ 75413498266Sopenharmony_ci char *ptr = mq->sendleftovers; 75513498266Sopenharmony_ci result = mqtt_send(data, mq->sendleftovers, mq->nsend); 75613498266Sopenharmony_ci free(ptr); 75713498266Sopenharmony_ci if(result) 75813498266Sopenharmony_ci return result; 75913498266Sopenharmony_ci } 76013498266Sopenharmony_ci 76113498266Sopenharmony_ci infof(data, "mqtt_doing: state [%d]", (int) mqtt->state); 76213498266Sopenharmony_ci switch(mqtt->state) { 76313498266Sopenharmony_ci case MQTT_FIRST: 76413498266Sopenharmony_ci /* Read the initial byte only */ 76513498266Sopenharmony_ci result = Curl_read(data, sockfd, (char *)&mq->firstbyte, 1, &nread); 76613498266Sopenharmony_ci if(result) 76713498266Sopenharmony_ci break; 76813498266Sopenharmony_ci else if(!nread) { 76913498266Sopenharmony_ci failf(data, "Connection disconnected"); 77013498266Sopenharmony_ci *done = TRUE; 77113498266Sopenharmony_ci result = CURLE_RECV_ERROR; 77213498266Sopenharmony_ci break; 77313498266Sopenharmony_ci } 77413498266Sopenharmony_ci Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1); 77513498266Sopenharmony_ci /* remember the first byte */ 77613498266Sopenharmony_ci mq->npacket = 0; 77713498266Sopenharmony_ci mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE); 77813498266Sopenharmony_ci FALLTHROUGH(); 77913498266Sopenharmony_ci case MQTT_REMAINING_LENGTH: 78013498266Sopenharmony_ci do { 78113498266Sopenharmony_ci result = Curl_read(data, sockfd, (char *)&byte, 1, &nread); 78213498266Sopenharmony_ci if(!nread) 78313498266Sopenharmony_ci break; 78413498266Sopenharmony_ci Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1); 78513498266Sopenharmony_ci mq->pkt_hd[mq->npacket++] = byte; 78613498266Sopenharmony_ci } while((byte & 0x80) && (mq->npacket < 4)); 78713498266Sopenharmony_ci if(nread && (byte & 0x80)) 78813498266Sopenharmony_ci /* MQTT supports up to 127 * 128^0 + 127 * 128^1 + 127 * 128^2 + 78913498266Sopenharmony_ci 127 * 128^3 bytes. server tried to send more */ 79013498266Sopenharmony_ci result = CURLE_WEIRD_SERVER_REPLY; 79113498266Sopenharmony_ci if(result) 79213498266Sopenharmony_ci break; 79313498266Sopenharmony_ci mq->remaining_length = mqtt_decode_len(mq->pkt_hd, mq->npacket, NULL); 79413498266Sopenharmony_ci mq->npacket = 0; 79513498266Sopenharmony_ci if(mq->remaining_length) { 79613498266Sopenharmony_ci mqstate(data, mqtt->nextstate, MQTT_NOSTATE); 79713498266Sopenharmony_ci break; 79813498266Sopenharmony_ci } 79913498266Sopenharmony_ci mqstate(data, MQTT_FIRST, MQTT_FIRST); 80013498266Sopenharmony_ci 80113498266Sopenharmony_ci if(mq->firstbyte == MQTT_MSG_DISCONNECT) { 80213498266Sopenharmony_ci infof(data, "Got DISCONNECT"); 80313498266Sopenharmony_ci *done = TRUE; 80413498266Sopenharmony_ci } 80513498266Sopenharmony_ci break; 80613498266Sopenharmony_ci case MQTT_CONNACK: 80713498266Sopenharmony_ci result = mqtt_verify_connack(data); 80813498266Sopenharmony_ci if(result) 80913498266Sopenharmony_ci break; 81013498266Sopenharmony_ci 81113498266Sopenharmony_ci if(data->state.httpreq == HTTPREQ_POST) { 81213498266Sopenharmony_ci result = mqtt_publish(data); 81313498266Sopenharmony_ci if(!result) { 81413498266Sopenharmony_ci result = mqtt_disconnect(data); 81513498266Sopenharmony_ci *done = TRUE; 81613498266Sopenharmony_ci } 81713498266Sopenharmony_ci mqtt->nextstate = MQTT_FIRST; 81813498266Sopenharmony_ci } 81913498266Sopenharmony_ci else { 82013498266Sopenharmony_ci result = mqtt_subscribe(data); 82113498266Sopenharmony_ci if(!result) { 82213498266Sopenharmony_ci mqstate(data, MQTT_FIRST, MQTT_SUBACK); 82313498266Sopenharmony_ci } 82413498266Sopenharmony_ci } 82513498266Sopenharmony_ci break; 82613498266Sopenharmony_ci 82713498266Sopenharmony_ci case MQTT_SUBACK: 82813498266Sopenharmony_ci case MQTT_PUBWAIT: 82913498266Sopenharmony_ci case MQTT_PUB_REMAIN: 83013498266Sopenharmony_ci result = mqtt_read_publish(data, done); 83113498266Sopenharmony_ci break; 83213498266Sopenharmony_ci 83313498266Sopenharmony_ci default: 83413498266Sopenharmony_ci failf(data, "State not handled yet"); 83513498266Sopenharmony_ci *done = TRUE; 83613498266Sopenharmony_ci break; 83713498266Sopenharmony_ci } 83813498266Sopenharmony_ci 83913498266Sopenharmony_ci if(result == CURLE_AGAIN) 84013498266Sopenharmony_ci result = CURLE_OK; 84113498266Sopenharmony_ci return result; 84213498266Sopenharmony_ci} 84313498266Sopenharmony_ci 84413498266Sopenharmony_ci#endif /* CURL_DISABLE_MQTT */ 845