113498266Sopenharmony_ci/*************************************************************************** 213498266Sopenharmony_ci * _ _ ____ _ 313498266Sopenharmony_ci * Project ___| | | | _ \| | 413498266Sopenharmony_ci * / __| | | | |_) | | 513498266Sopenharmony_ci * | (__| |_| | _ <| |___ 613498266Sopenharmony_ci * \___|\___/|_| \_\_____| 713498266Sopenharmony_ci * 813498266Sopenharmony_ci * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 913498266Sopenharmony_ci * 1013498266Sopenharmony_ci * This software is licensed as described in the file COPYING, which 1113498266Sopenharmony_ci * you should have received as part of this distribution. The terms 1213498266Sopenharmony_ci * are also available at https://curl.se/docs/copyright.html. 1313498266Sopenharmony_ci * 1413498266Sopenharmony_ci * You may opt to use, copy, modify, merge, publish, distribute and/or sell 1513498266Sopenharmony_ci * copies of the Software, and permit persons to whom the Software is 1613498266Sopenharmony_ci * furnished to do so, under the terms of the COPYING file. 1713498266Sopenharmony_ci * 1813498266Sopenharmony_ci * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 1913498266Sopenharmony_ci * KIND, either express or implied. 2013498266Sopenharmony_ci * 2113498266Sopenharmony_ci * SPDX-License-Identifier: curl 2213498266Sopenharmony_ci * 2313498266Sopenharmony_ci ***************************************************************************/ 2413498266Sopenharmony_ci#include "server_setup.h" 2513498266Sopenharmony_ci#include <stdlib.h> 2613498266Sopenharmony_ci#include <string.h> 2713498266Sopenharmony_ci#include "util.h" 2813498266Sopenharmony_ci 2913498266Sopenharmony_ci/* Function 3013498266Sopenharmony_ci * 3113498266Sopenharmony_ci * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT. 3213498266Sopenharmony_ci * 3313498266Sopenharmony_ci * Read commands from FILE (set with --config). The commands control how to 3413498266Sopenharmony_ci * act and is reset to defaults each client TCP connect. 3513498266Sopenharmony_ci * 3613498266Sopenharmony_ci * Config file keywords: 3713498266Sopenharmony_ci * 3813498266Sopenharmony_ci * TODO 3913498266Sopenharmony_ci */ 4013498266Sopenharmony_ci 4113498266Sopenharmony_ci/* based on sockfilt.c */ 4213498266Sopenharmony_ci 4313498266Sopenharmony_ci#include <signal.h> 4413498266Sopenharmony_ci#ifdef HAVE_NETINET_IN_H 4513498266Sopenharmony_ci#include <netinet/in.h> 4613498266Sopenharmony_ci#endif 4713498266Sopenharmony_ci#ifdef HAVE_NETINET_IN6_H 4813498266Sopenharmony_ci#include <netinet/in6.h> 4913498266Sopenharmony_ci#endif 5013498266Sopenharmony_ci#ifdef HAVE_ARPA_INET_H 5113498266Sopenharmony_ci#include <arpa/inet.h> 5213498266Sopenharmony_ci#endif 5313498266Sopenharmony_ci#ifdef HAVE_NETDB_H 5413498266Sopenharmony_ci#include <netdb.h> 5513498266Sopenharmony_ci#endif 5613498266Sopenharmony_ci 5713498266Sopenharmony_ci#define ENABLE_CURLX_PRINTF 5813498266Sopenharmony_ci/* make the curlx header define all printf() functions to use the curlx_* 5913498266Sopenharmony_ci versions instead */ 6013498266Sopenharmony_ci#include "curlx.h" /* from the private lib dir */ 6113498266Sopenharmony_ci#include "getpart.h" 6213498266Sopenharmony_ci#include "inet_pton.h" 6313498266Sopenharmony_ci#include "server_sockaddr.h" 6413498266Sopenharmony_ci#include "warnless.h" 6513498266Sopenharmony_ci 6613498266Sopenharmony_ci/* include memdebug.h last */ 6713498266Sopenharmony_ci#include "memdebug.h" 6813498266Sopenharmony_ci 6913498266Sopenharmony_ci#ifdef USE_WINSOCK 7013498266Sopenharmony_ci#undef EINTR 7113498266Sopenharmony_ci#define EINTR 4 /* errno.h value */ 7213498266Sopenharmony_ci#undef EAGAIN 7313498266Sopenharmony_ci#define EAGAIN 11 /* errno.h value */ 7413498266Sopenharmony_ci#undef ENOMEM 7513498266Sopenharmony_ci#define ENOMEM 12 /* errno.h value */ 7613498266Sopenharmony_ci#undef EINVAL 7713498266Sopenharmony_ci#define EINVAL 22 /* errno.h value */ 7813498266Sopenharmony_ci#endif 7913498266Sopenharmony_ci 8013498266Sopenharmony_ci#define DEFAULT_PORT 1883 /* MQTT default port */ 8113498266Sopenharmony_ci 8213498266Sopenharmony_ci#ifndef DEFAULT_LOGFILE 8313498266Sopenharmony_ci#define DEFAULT_LOGFILE "log/mqttd.log" 8413498266Sopenharmony_ci#endif 8513498266Sopenharmony_ci 8613498266Sopenharmony_ci#ifndef DEFAULT_CONFIG 8713498266Sopenharmony_ci#define DEFAULT_CONFIG "mqttd.config" 8813498266Sopenharmony_ci#endif 8913498266Sopenharmony_ci 9013498266Sopenharmony_ci#define MQTT_MSG_CONNECT 0x10 9113498266Sopenharmony_ci#define MQTT_MSG_CONNACK 0x20 9213498266Sopenharmony_ci#define MQTT_MSG_PUBLISH 0x30 9313498266Sopenharmony_ci#define MQTT_MSG_PUBACK 0x40 9413498266Sopenharmony_ci#define MQTT_MSG_SUBSCRIBE 0x82 9513498266Sopenharmony_ci#define MQTT_MSG_SUBACK 0x90 9613498266Sopenharmony_ci#define MQTT_MSG_DISCONNECT 0xe0 9713498266Sopenharmony_ci 9813498266Sopenharmony_ci#define MQTT_CONNACK_LEN 4 9913498266Sopenharmony_ci#define MQTT_SUBACK_LEN 5 10013498266Sopenharmony_ci#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ 10113498266Sopenharmony_ci 10213498266Sopenharmony_cistruct configurable { 10313498266Sopenharmony_ci unsigned char version; /* initial version byte in the request must match 10413498266Sopenharmony_ci this */ 10513498266Sopenharmony_ci bool publish_before_suback; 10613498266Sopenharmony_ci bool short_publish; 10713498266Sopenharmony_ci bool excessive_remaining; 10813498266Sopenharmony_ci unsigned char error_connack; 10913498266Sopenharmony_ci int testnum; 11013498266Sopenharmony_ci}; 11113498266Sopenharmony_ci 11213498266Sopenharmony_ci#define REQUEST_DUMP "server.input" 11313498266Sopenharmony_ci#define CONFIG_VERSION 5 11413498266Sopenharmony_ci 11513498266Sopenharmony_cistatic struct configurable config; 11613498266Sopenharmony_ci 11713498266Sopenharmony_ciconst char *serverlogfile = DEFAULT_LOGFILE; 11813498266Sopenharmony_cistatic const char *configfile = DEFAULT_CONFIG; 11913498266Sopenharmony_cistatic const char *logdir = "log"; 12013498266Sopenharmony_cistatic char loglockfile[256]; 12113498266Sopenharmony_ci 12213498266Sopenharmony_ci#ifdef ENABLE_IPV6 12313498266Sopenharmony_cistatic bool use_ipv6 = FALSE; 12413498266Sopenharmony_ci#endif 12513498266Sopenharmony_cistatic const char *ipv_inuse = "IPv4"; 12613498266Sopenharmony_cistatic unsigned short port = DEFAULT_PORT; 12713498266Sopenharmony_ci 12813498266Sopenharmony_cistatic void resetdefaults(void) 12913498266Sopenharmony_ci{ 13013498266Sopenharmony_ci logmsg("Reset to defaults"); 13113498266Sopenharmony_ci config.version = CONFIG_VERSION; 13213498266Sopenharmony_ci config.publish_before_suback = FALSE; 13313498266Sopenharmony_ci config.short_publish = FALSE; 13413498266Sopenharmony_ci config.excessive_remaining = FALSE; 13513498266Sopenharmony_ci config.error_connack = 0; 13613498266Sopenharmony_ci config.testnum = 0; 13713498266Sopenharmony_ci} 13813498266Sopenharmony_ci 13913498266Sopenharmony_cistatic unsigned char byteval(char *value) 14013498266Sopenharmony_ci{ 14113498266Sopenharmony_ci unsigned long num = strtoul(value, NULL, 10); 14213498266Sopenharmony_ci return num & 0xff; 14313498266Sopenharmony_ci} 14413498266Sopenharmony_ci 14513498266Sopenharmony_cistatic void getconfig(void) 14613498266Sopenharmony_ci{ 14713498266Sopenharmony_ci FILE *fp = fopen(configfile, FOPEN_READTEXT); 14813498266Sopenharmony_ci resetdefaults(); 14913498266Sopenharmony_ci if(fp) { 15013498266Sopenharmony_ci char buffer[512]; 15113498266Sopenharmony_ci logmsg("parse config file"); 15213498266Sopenharmony_ci while(fgets(buffer, sizeof(buffer), fp)) { 15313498266Sopenharmony_ci char key[32]; 15413498266Sopenharmony_ci char value[32]; 15513498266Sopenharmony_ci if(2 == sscanf(buffer, "%31s %31s", key, value)) { 15613498266Sopenharmony_ci if(!strcmp(key, "version")) { 15713498266Sopenharmony_ci config.version = byteval(value); 15813498266Sopenharmony_ci logmsg("version [%d] set", config.version); 15913498266Sopenharmony_ci } 16013498266Sopenharmony_ci else if(!strcmp(key, "PUBLISH-before-SUBACK")) { 16113498266Sopenharmony_ci logmsg("PUBLISH-before-SUBACK set"); 16213498266Sopenharmony_ci config.publish_before_suback = TRUE; 16313498266Sopenharmony_ci } 16413498266Sopenharmony_ci else if(!strcmp(key, "short-PUBLISH")) { 16513498266Sopenharmony_ci logmsg("short-PUBLISH set"); 16613498266Sopenharmony_ci config.short_publish = TRUE; 16713498266Sopenharmony_ci } 16813498266Sopenharmony_ci else if(!strcmp(key, "error-CONNACK")) { 16913498266Sopenharmony_ci config.error_connack = byteval(value); 17013498266Sopenharmony_ci logmsg("error-CONNACK = %d", config.error_connack); 17113498266Sopenharmony_ci } 17213498266Sopenharmony_ci else if(!strcmp(key, "Testnum")) { 17313498266Sopenharmony_ci config.testnum = atoi(value); 17413498266Sopenharmony_ci logmsg("testnum = %d", config.testnum); 17513498266Sopenharmony_ci } 17613498266Sopenharmony_ci else if(!strcmp(key, "excessive-remaining")) { 17713498266Sopenharmony_ci logmsg("excessive-remaining set"); 17813498266Sopenharmony_ci config.excessive_remaining = TRUE; 17913498266Sopenharmony_ci } 18013498266Sopenharmony_ci } 18113498266Sopenharmony_ci } 18213498266Sopenharmony_ci fclose(fp); 18313498266Sopenharmony_ci } 18413498266Sopenharmony_ci else { 18513498266Sopenharmony_ci logmsg("No config file '%s' to read", configfile); 18613498266Sopenharmony_ci } 18713498266Sopenharmony_ci} 18813498266Sopenharmony_ci 18913498266Sopenharmony_cistatic void loghex(unsigned char *buffer, ssize_t len) 19013498266Sopenharmony_ci{ 19113498266Sopenharmony_ci char data[12000]; 19213498266Sopenharmony_ci ssize_t i; 19313498266Sopenharmony_ci unsigned char *ptr = buffer; 19413498266Sopenharmony_ci char *optr = data; 19513498266Sopenharmony_ci ssize_t width = 0; 19613498266Sopenharmony_ci int left = sizeof(data); 19713498266Sopenharmony_ci 19813498266Sopenharmony_ci for(i = 0; i<len && (left >= 0); i++) { 19913498266Sopenharmony_ci msnprintf(optr, left, "%02x", ptr[i]); 20013498266Sopenharmony_ci width += 2; 20113498266Sopenharmony_ci optr += 2; 20213498266Sopenharmony_ci left -= 2; 20313498266Sopenharmony_ci } 20413498266Sopenharmony_ci if(width) 20513498266Sopenharmony_ci logmsg("'%s'", data); 20613498266Sopenharmony_ci} 20713498266Sopenharmony_ci 20813498266Sopenharmony_citypedef enum { 20913498266Sopenharmony_ci FROM_CLIENT, 21013498266Sopenharmony_ci FROM_SERVER 21113498266Sopenharmony_ci} mqttdir; 21213498266Sopenharmony_ci 21313498266Sopenharmony_cistatic void logprotocol(mqttdir dir, 21413498266Sopenharmony_ci const char *prefix, size_t remlen, 21513498266Sopenharmony_ci FILE *output, 21613498266Sopenharmony_ci unsigned char *buffer, ssize_t len) 21713498266Sopenharmony_ci{ 21813498266Sopenharmony_ci char data[12000] = ""; 21913498266Sopenharmony_ci ssize_t i; 22013498266Sopenharmony_ci unsigned char *ptr = buffer; 22113498266Sopenharmony_ci char *optr = data; 22213498266Sopenharmony_ci int left = sizeof(data); 22313498266Sopenharmony_ci 22413498266Sopenharmony_ci for(i = 0; i<len && (left >= 0); i++) { 22513498266Sopenharmony_ci msnprintf(optr, left, "%02x", ptr[i]); 22613498266Sopenharmony_ci optr += 2; 22713498266Sopenharmony_ci left -= 2; 22813498266Sopenharmony_ci } 22913498266Sopenharmony_ci fprintf(output, "%s %s %zx %s\n", 23013498266Sopenharmony_ci dir == FROM_CLIENT? "client": "server", 23113498266Sopenharmony_ci prefix, remlen, 23213498266Sopenharmony_ci data); 23313498266Sopenharmony_ci} 23413498266Sopenharmony_ci 23513498266Sopenharmony_ci 23613498266Sopenharmony_ci/* return 0 on success */ 23713498266Sopenharmony_cistatic int connack(FILE *dump, curl_socket_t fd) 23813498266Sopenharmony_ci{ 23913498266Sopenharmony_ci unsigned char packet[]={ 24013498266Sopenharmony_ci MQTT_MSG_CONNACK, 0x02, 24113498266Sopenharmony_ci 0x00, 0x00 24213498266Sopenharmony_ci }; 24313498266Sopenharmony_ci ssize_t rc; 24413498266Sopenharmony_ci 24513498266Sopenharmony_ci packet[3] = config.error_connack; 24613498266Sopenharmony_ci 24713498266Sopenharmony_ci rc = swrite(fd, (char *)packet, sizeof(packet)); 24813498266Sopenharmony_ci if(rc > 0) { 24913498266Sopenharmony_ci logmsg("WROTE %zd bytes [CONNACK]", rc); 25013498266Sopenharmony_ci loghex(packet, rc); 25113498266Sopenharmony_ci logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet)); 25213498266Sopenharmony_ci } 25313498266Sopenharmony_ci if(rc == sizeof(packet)) { 25413498266Sopenharmony_ci return 0; 25513498266Sopenharmony_ci } 25613498266Sopenharmony_ci return 1; 25713498266Sopenharmony_ci} 25813498266Sopenharmony_ci 25913498266Sopenharmony_ci/* return 0 on success */ 26013498266Sopenharmony_cistatic int suback(FILE *dump, curl_socket_t fd, unsigned short packetid) 26113498266Sopenharmony_ci{ 26213498266Sopenharmony_ci unsigned char packet[]={ 26313498266Sopenharmony_ci MQTT_MSG_SUBACK, 0x03, 26413498266Sopenharmony_ci 0, 0, /* filled in below */ 26513498266Sopenharmony_ci 0x00 26613498266Sopenharmony_ci }; 26713498266Sopenharmony_ci ssize_t rc; 26813498266Sopenharmony_ci packet[2] = (unsigned char)(packetid >> 8); 26913498266Sopenharmony_ci packet[3] = (unsigned char)(packetid & 0xff); 27013498266Sopenharmony_ci 27113498266Sopenharmony_ci rc = swrite(fd, (char *)packet, sizeof(packet)); 27213498266Sopenharmony_ci if(rc == sizeof(packet)) { 27313498266Sopenharmony_ci logmsg("WROTE %zd bytes [SUBACK]", rc); 27413498266Sopenharmony_ci loghex(packet, rc); 27513498266Sopenharmony_ci logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc); 27613498266Sopenharmony_ci return 0; 27713498266Sopenharmony_ci } 27813498266Sopenharmony_ci return 1; 27913498266Sopenharmony_ci} 28013498266Sopenharmony_ci 28113498266Sopenharmony_ci#ifdef QOS 28213498266Sopenharmony_ci/* return 0 on success */ 28313498266Sopenharmony_cistatic int puback(FILE *dump, curl_socket_t fd, unsigned short packetid) 28413498266Sopenharmony_ci{ 28513498266Sopenharmony_ci unsigned char packet[]={ 28613498266Sopenharmony_ci MQTT_MSG_PUBACK, 0x00, 28713498266Sopenharmony_ci 0, 0 /* filled in below */ 28813498266Sopenharmony_ci }; 28913498266Sopenharmony_ci ssize_t rc; 29013498266Sopenharmony_ci packet[2] = (unsigned char)(packetid >> 8); 29113498266Sopenharmony_ci packet[3] = (unsigned char)(packetid & 0xff); 29213498266Sopenharmony_ci 29313498266Sopenharmony_ci rc = swrite(fd, (char *)packet, sizeof(packet)); 29413498266Sopenharmony_ci if(rc == sizeof(packet)) { 29513498266Sopenharmony_ci logmsg("WROTE %zd bytes [PUBACK]", rc); 29613498266Sopenharmony_ci loghex(packet, rc); 29713498266Sopenharmony_ci logprotocol(FROM_SERVER, dump, packet, rc); 29813498266Sopenharmony_ci return 0; 29913498266Sopenharmony_ci } 30013498266Sopenharmony_ci logmsg("Failed sending [PUBACK]"); 30113498266Sopenharmony_ci return 1; 30213498266Sopenharmony_ci} 30313498266Sopenharmony_ci#endif 30413498266Sopenharmony_ci 30513498266Sopenharmony_ci/* return 0 on success */ 30613498266Sopenharmony_cistatic int disconnect(FILE *dump, curl_socket_t fd) 30713498266Sopenharmony_ci{ 30813498266Sopenharmony_ci unsigned char packet[]={ 30913498266Sopenharmony_ci MQTT_MSG_DISCONNECT, 0x00, 31013498266Sopenharmony_ci }; 31113498266Sopenharmony_ci ssize_t rc = swrite(fd, (char *)packet, sizeof(packet)); 31213498266Sopenharmony_ci if(rc == sizeof(packet)) { 31313498266Sopenharmony_ci logmsg("WROTE %zd bytes [DISCONNECT]", rc); 31413498266Sopenharmony_ci loghex(packet, rc); 31513498266Sopenharmony_ci logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc); 31613498266Sopenharmony_ci return 0; 31713498266Sopenharmony_ci } 31813498266Sopenharmony_ci logmsg("Failed sending [DISCONNECT]"); 31913498266Sopenharmony_ci return 1; 32013498266Sopenharmony_ci} 32113498266Sopenharmony_ci 32213498266Sopenharmony_ci 32313498266Sopenharmony_ci 32413498266Sopenharmony_ci/* 32513498266Sopenharmony_ci do 32613498266Sopenharmony_ci 32713498266Sopenharmony_ci encodedByte = X MOD 128 32813498266Sopenharmony_ci 32913498266Sopenharmony_ci X = X DIV 128 33013498266Sopenharmony_ci 33113498266Sopenharmony_ci // if there are more data to encode, set the top bit of this byte 33213498266Sopenharmony_ci 33313498266Sopenharmony_ci if ( X > 0 ) 33413498266Sopenharmony_ci 33513498266Sopenharmony_ci encodedByte = encodedByte OR 128 33613498266Sopenharmony_ci 33713498266Sopenharmony_ci endif 33813498266Sopenharmony_ci 33913498266Sopenharmony_ci 'output' encodedByte 34013498266Sopenharmony_ci 34113498266Sopenharmony_ci while ( X > 0 ) 34213498266Sopenharmony_ci 34313498266Sopenharmony_ci*/ 34413498266Sopenharmony_ci 34513498266Sopenharmony_ci/* return number of bytes used */ 34613498266Sopenharmony_cistatic int encode_length(size_t packetlen, 34713498266Sopenharmony_ci unsigned char *remlength) /* 4 bytes */ 34813498266Sopenharmony_ci{ 34913498266Sopenharmony_ci int bytes = 0; 35013498266Sopenharmony_ci unsigned char encode; 35113498266Sopenharmony_ci 35213498266Sopenharmony_ci do { 35313498266Sopenharmony_ci encode = packetlen % 0x80; 35413498266Sopenharmony_ci packetlen /= 0x80; 35513498266Sopenharmony_ci if(packetlen) 35613498266Sopenharmony_ci encode |= 0x80; 35713498266Sopenharmony_ci 35813498266Sopenharmony_ci remlength[bytes++] = encode; 35913498266Sopenharmony_ci 36013498266Sopenharmony_ci if(bytes > 3) { 36113498266Sopenharmony_ci logmsg("too large packet!"); 36213498266Sopenharmony_ci return 0; 36313498266Sopenharmony_ci } 36413498266Sopenharmony_ci } while(packetlen); 36513498266Sopenharmony_ci 36613498266Sopenharmony_ci return bytes; 36713498266Sopenharmony_ci} 36813498266Sopenharmony_ci 36913498266Sopenharmony_ci 37013498266Sopenharmony_cistatic size_t decode_length(unsigned char *buf, 37113498266Sopenharmony_ci size_t buflen, size_t *lenbytes) 37213498266Sopenharmony_ci{ 37313498266Sopenharmony_ci size_t len = 0; 37413498266Sopenharmony_ci size_t mult = 1; 37513498266Sopenharmony_ci size_t i; 37613498266Sopenharmony_ci unsigned char encoded = 0x80; 37713498266Sopenharmony_ci 37813498266Sopenharmony_ci for(i = 0; (i < buflen) && (encoded & 0x80); i++) { 37913498266Sopenharmony_ci encoded = buf[i]; 38013498266Sopenharmony_ci len += (encoded & 0x7f) * mult; 38113498266Sopenharmony_ci mult *= 0x80; 38213498266Sopenharmony_ci } 38313498266Sopenharmony_ci 38413498266Sopenharmony_ci if(lenbytes) 38513498266Sopenharmony_ci *lenbytes = i; 38613498266Sopenharmony_ci 38713498266Sopenharmony_ci return len; 38813498266Sopenharmony_ci} 38913498266Sopenharmony_ci 39013498266Sopenharmony_ci 39113498266Sopenharmony_ci/* return 0 on success */ 39213498266Sopenharmony_cistatic int publish(FILE *dump, 39313498266Sopenharmony_ci curl_socket_t fd, unsigned short packetid, 39413498266Sopenharmony_ci char *topic, char *payload, size_t payloadlen) 39513498266Sopenharmony_ci{ 39613498266Sopenharmony_ci size_t topiclen = strlen(topic); 39713498266Sopenharmony_ci unsigned char *packet; 39813498266Sopenharmony_ci size_t payloadindex; 39913498266Sopenharmony_ci ssize_t remaininglength = topiclen + 2 + payloadlen; 40013498266Sopenharmony_ci ssize_t packetlen; 40113498266Sopenharmony_ci ssize_t sendamount; 40213498266Sopenharmony_ci ssize_t rc; 40313498266Sopenharmony_ci unsigned char rembuffer[4]; 40413498266Sopenharmony_ci int encodedlen; 40513498266Sopenharmony_ci 40613498266Sopenharmony_ci if(config.excessive_remaining) { 40713498266Sopenharmony_ci /* manually set illegal remaining length */ 40813498266Sopenharmony_ci rembuffer[0] = 0xff; 40913498266Sopenharmony_ci rembuffer[1] = 0xff; 41013498266Sopenharmony_ci rembuffer[2] = 0xff; 41113498266Sopenharmony_ci rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */ 41213498266Sopenharmony_ci encodedlen = 4; 41313498266Sopenharmony_ci } 41413498266Sopenharmony_ci else 41513498266Sopenharmony_ci encodedlen = encode_length(remaininglength, rembuffer); 41613498266Sopenharmony_ci 41713498266Sopenharmony_ci /* one packet type byte (possibly two more for packetid) */ 41813498266Sopenharmony_ci packetlen = remaininglength + encodedlen + 1; 41913498266Sopenharmony_ci packet = malloc(packetlen); 42013498266Sopenharmony_ci if(!packet) 42113498266Sopenharmony_ci return 1; 42213498266Sopenharmony_ci 42313498266Sopenharmony_ci packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */ 42413498266Sopenharmony_ci memcpy(&packet[1], rembuffer, encodedlen); 42513498266Sopenharmony_ci 42613498266Sopenharmony_ci (void)packetid; 42713498266Sopenharmony_ci /* packet_id if QoS is set */ 42813498266Sopenharmony_ci 42913498266Sopenharmony_ci packet[1 + encodedlen] = (unsigned char)(topiclen >> 8); 43013498266Sopenharmony_ci packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff); 43113498266Sopenharmony_ci memcpy(&packet[3 + encodedlen], topic, topiclen); 43213498266Sopenharmony_ci 43313498266Sopenharmony_ci payloadindex = 3 + topiclen + encodedlen; 43413498266Sopenharmony_ci memcpy(&packet[payloadindex], payload, payloadlen); 43513498266Sopenharmony_ci 43613498266Sopenharmony_ci sendamount = packetlen; 43713498266Sopenharmony_ci if(config.short_publish) 43813498266Sopenharmony_ci sendamount -= 2; 43913498266Sopenharmony_ci 44013498266Sopenharmony_ci rc = swrite(fd, (char *)packet, sendamount); 44113498266Sopenharmony_ci if(rc > 0) { 44213498266Sopenharmony_ci logmsg("WROTE %zd bytes [PUBLISH]", rc); 44313498266Sopenharmony_ci loghex(packet, rc); 44413498266Sopenharmony_ci logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc); 44513498266Sopenharmony_ci } 44613498266Sopenharmony_ci if(rc == packetlen) 44713498266Sopenharmony_ci return 0; 44813498266Sopenharmony_ci return 1; 44913498266Sopenharmony_ci} 45013498266Sopenharmony_ci 45113498266Sopenharmony_ci#define MAX_TOPIC_LENGTH 65535 45213498266Sopenharmony_ci#define MAX_CLIENT_ID_LENGTH 32 45313498266Sopenharmony_ci 45413498266Sopenharmony_cistatic char topic[MAX_TOPIC_LENGTH + 1]; 45513498266Sopenharmony_ci 45613498266Sopenharmony_cistatic int fixedheader(curl_socket_t fd, 45713498266Sopenharmony_ci unsigned char *bytep, 45813498266Sopenharmony_ci size_t *remaining_lengthp, 45913498266Sopenharmony_ci size_t *remaining_length_bytesp) 46013498266Sopenharmony_ci{ 46113498266Sopenharmony_ci /* get the fixed header */ 46213498266Sopenharmony_ci unsigned char buffer[10]; 46313498266Sopenharmony_ci 46413498266Sopenharmony_ci /* get the first two bytes */ 46513498266Sopenharmony_ci ssize_t rc = sread(fd, (char *)buffer, 2); 46613498266Sopenharmony_ci int i; 46713498266Sopenharmony_ci if(rc < 2) { 46813498266Sopenharmony_ci logmsg("READ %zd bytes [SHORT!]", rc); 46913498266Sopenharmony_ci return 1; /* fail */ 47013498266Sopenharmony_ci } 47113498266Sopenharmony_ci logmsg("READ %zd bytes", rc); 47213498266Sopenharmony_ci loghex(buffer, rc); 47313498266Sopenharmony_ci *bytep = buffer[0]; 47413498266Sopenharmony_ci 47513498266Sopenharmony_ci /* if the length byte has the top bit set, get the next one too */ 47613498266Sopenharmony_ci i = 1; 47713498266Sopenharmony_ci while(buffer[i] & 0x80) { 47813498266Sopenharmony_ci i++; 47913498266Sopenharmony_ci rc = sread(fd, (char *)&buffer[i], 1); 48013498266Sopenharmony_ci if(rc != 1) { 48113498266Sopenharmony_ci logmsg("Remaining Length broken"); 48213498266Sopenharmony_ci return 1; 48313498266Sopenharmony_ci } 48413498266Sopenharmony_ci } 48513498266Sopenharmony_ci *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp); 48613498266Sopenharmony_ci logmsg("Remaining Length: %zu [%zu bytes]", *remaining_lengthp, 48713498266Sopenharmony_ci *remaining_length_bytesp); 48813498266Sopenharmony_ci return 0; 48913498266Sopenharmony_ci} 49013498266Sopenharmony_ci 49113498266Sopenharmony_cistatic curl_socket_t mqttit(curl_socket_t fd) 49213498266Sopenharmony_ci{ 49313498266Sopenharmony_ci size_t buff_size = 10*1024; 49413498266Sopenharmony_ci unsigned char *buffer = NULL; 49513498266Sopenharmony_ci ssize_t rc; 49613498266Sopenharmony_ci unsigned char byte; 49713498266Sopenharmony_ci unsigned short packet_id; 49813498266Sopenharmony_ci size_t payload_len; 49913498266Sopenharmony_ci size_t client_id_length; 50013498266Sopenharmony_ci size_t topic_len; 50113498266Sopenharmony_ci size_t remaining_length = 0; 50213498266Sopenharmony_ci size_t bytes = 0; /* remaining length field size in bytes */ 50313498266Sopenharmony_ci char client_id[MAX_CLIENT_ID_LENGTH]; 50413498266Sopenharmony_ci long testno; 50513498266Sopenharmony_ci FILE *stream = NULL; 50613498266Sopenharmony_ci FILE *dump; 50713498266Sopenharmony_ci char dumpfile[256]; 50813498266Sopenharmony_ci 50913498266Sopenharmony_ci static const char protocol[7] = { 51013498266Sopenharmony_ci 0x00, 0x04, /* protocol length */ 51113498266Sopenharmony_ci 'M','Q','T','T', /* protocol name */ 51213498266Sopenharmony_ci 0x04 /* protocol level */ 51313498266Sopenharmony_ci }; 51413498266Sopenharmony_ci msnprintf(dumpfile, sizeof(dumpfile), "%s/%s", logdir, REQUEST_DUMP); 51513498266Sopenharmony_ci dump = fopen(dumpfile, "ab"); 51613498266Sopenharmony_ci if(!dump) 51713498266Sopenharmony_ci goto end; 51813498266Sopenharmony_ci 51913498266Sopenharmony_ci getconfig(); 52013498266Sopenharmony_ci 52113498266Sopenharmony_ci testno = config.testnum; 52213498266Sopenharmony_ci 52313498266Sopenharmony_ci if(testno) 52413498266Sopenharmony_ci logmsg("Found test number %ld", testno); 52513498266Sopenharmony_ci 52613498266Sopenharmony_ci buffer = malloc(buff_size); 52713498266Sopenharmony_ci if(!buffer) { 52813498266Sopenharmony_ci logmsg("Out of memory, unable to allocate buffer"); 52913498266Sopenharmony_ci goto end; 53013498266Sopenharmony_ci } 53113498266Sopenharmony_ci 53213498266Sopenharmony_ci do { 53313498266Sopenharmony_ci unsigned char usr_flag = 0x80; 53413498266Sopenharmony_ci unsigned char passwd_flag = 0x40; 53513498266Sopenharmony_ci unsigned char conn_flags; 53613498266Sopenharmony_ci const size_t client_id_offset = 12; 53713498266Sopenharmony_ci size_t start_usr; 53813498266Sopenharmony_ci size_t start_passwd; 53913498266Sopenharmony_ci 54013498266Sopenharmony_ci /* get the fixed header */ 54113498266Sopenharmony_ci rc = fixedheader(fd, &byte, &remaining_length, &bytes); 54213498266Sopenharmony_ci if(rc) 54313498266Sopenharmony_ci break; 54413498266Sopenharmony_ci 54513498266Sopenharmony_ci if(remaining_length >= buff_size) { 54613498266Sopenharmony_ci buff_size = remaining_length; 54713498266Sopenharmony_ci buffer = realloc(buffer, buff_size); 54813498266Sopenharmony_ci if(!buffer) { 54913498266Sopenharmony_ci logmsg("Failed realloc of size %zu", buff_size); 55013498266Sopenharmony_ci goto end; 55113498266Sopenharmony_ci } 55213498266Sopenharmony_ci } 55313498266Sopenharmony_ci 55413498266Sopenharmony_ci if(remaining_length) { 55513498266Sopenharmony_ci /* reading variable header and payload into buffer */ 55613498266Sopenharmony_ci rc = sread(fd, (char *)buffer, remaining_length); 55713498266Sopenharmony_ci if(rc > 0) { 55813498266Sopenharmony_ci logmsg("READ %zd bytes", rc); 55913498266Sopenharmony_ci loghex(buffer, rc); 56013498266Sopenharmony_ci } 56113498266Sopenharmony_ci } 56213498266Sopenharmony_ci 56313498266Sopenharmony_ci if(byte == MQTT_MSG_CONNECT) { 56413498266Sopenharmony_ci logprotocol(FROM_CLIENT, "CONNECT", remaining_length, 56513498266Sopenharmony_ci dump, buffer, rc); 56613498266Sopenharmony_ci 56713498266Sopenharmony_ci if(memcmp(protocol, buffer, sizeof(protocol))) { 56813498266Sopenharmony_ci logmsg("Protocol preamble mismatch"); 56913498266Sopenharmony_ci goto end; 57013498266Sopenharmony_ci } 57113498266Sopenharmony_ci /* ignore the connect flag byte and two keepalive bytes */ 57213498266Sopenharmony_ci payload_len = (size_t)(buffer[10] << 8) | buffer[11]; 57313498266Sopenharmony_ci /* first part of the payload is the client ID */ 57413498266Sopenharmony_ci client_id_length = payload_len; 57513498266Sopenharmony_ci 57613498266Sopenharmony_ci /* checking if user and password flags were set */ 57713498266Sopenharmony_ci conn_flags = buffer[7]; 57813498266Sopenharmony_ci 57913498266Sopenharmony_ci start_usr = client_id_offset + payload_len; 58013498266Sopenharmony_ci if(usr_flag == (unsigned char)(conn_flags & usr_flag)) { 58113498266Sopenharmony_ci logmsg("User flag is present in CONN flag"); 58213498266Sopenharmony_ci payload_len += (size_t)(buffer[start_usr] << 8) | 58313498266Sopenharmony_ci buffer[start_usr + 1]; 58413498266Sopenharmony_ci payload_len += 2; /* MSB and LSB for user length */ 58513498266Sopenharmony_ci } 58613498266Sopenharmony_ci 58713498266Sopenharmony_ci start_passwd = client_id_offset + payload_len; 58813498266Sopenharmony_ci if(passwd_flag == (char)(conn_flags & passwd_flag)) { 58913498266Sopenharmony_ci logmsg("Password flag is present in CONN flags"); 59013498266Sopenharmony_ci payload_len += (size_t)(buffer[start_passwd] << 8) | 59113498266Sopenharmony_ci buffer[start_passwd + 1]; 59213498266Sopenharmony_ci payload_len += 2; /* MSB and LSB for password length */ 59313498266Sopenharmony_ci } 59413498266Sopenharmony_ci 59513498266Sopenharmony_ci /* check the length of the payload */ 59613498266Sopenharmony_ci if((ssize_t)payload_len != (rc - 12)) { 59713498266Sopenharmony_ci logmsg("Payload length mismatch, expected %zx got %zx", 59813498266Sopenharmony_ci rc - 12, payload_len); 59913498266Sopenharmony_ci goto end; 60013498266Sopenharmony_ci } 60113498266Sopenharmony_ci /* check the length of the client ID */ 60213498266Sopenharmony_ci else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) { 60313498266Sopenharmony_ci logmsg("Too large client id"); 60413498266Sopenharmony_ci goto end; 60513498266Sopenharmony_ci } 60613498266Sopenharmony_ci memcpy(client_id, &buffer[12], client_id_length); 60713498266Sopenharmony_ci client_id[client_id_length] = 0; 60813498266Sopenharmony_ci 60913498266Sopenharmony_ci logmsg("MQTT client connect accepted: %s", client_id); 61013498266Sopenharmony_ci 61113498266Sopenharmony_ci /* The first packet sent from the Server to the Client MUST be a 61213498266Sopenharmony_ci CONNACK Packet */ 61313498266Sopenharmony_ci 61413498266Sopenharmony_ci if(connack(dump, fd)) { 61513498266Sopenharmony_ci logmsg("failed sending CONNACK"); 61613498266Sopenharmony_ci goto end; 61713498266Sopenharmony_ci } 61813498266Sopenharmony_ci } 61913498266Sopenharmony_ci else if(byte == MQTT_MSG_SUBSCRIBE) { 62013498266Sopenharmony_ci int error; 62113498266Sopenharmony_ci char *data; 62213498266Sopenharmony_ci size_t datalen; 62313498266Sopenharmony_ci logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length, 62413498266Sopenharmony_ci dump, buffer, rc); 62513498266Sopenharmony_ci logmsg("Incoming SUBSCRIBE"); 62613498266Sopenharmony_ci 62713498266Sopenharmony_ci if(rc < 6) { 62813498266Sopenharmony_ci logmsg("Too small SUBSCRIBE"); 62913498266Sopenharmony_ci goto end; 63013498266Sopenharmony_ci } 63113498266Sopenharmony_ci 63213498266Sopenharmony_ci /* two bytes packet id */ 63313498266Sopenharmony_ci packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]); 63413498266Sopenharmony_ci 63513498266Sopenharmony_ci /* two bytes topic length */ 63613498266Sopenharmony_ci topic_len = (size_t)(buffer[2] << 8) | buffer[3]; 63713498266Sopenharmony_ci if(topic_len != (remaining_length - 5)) { 63813498266Sopenharmony_ci logmsg("Wrong topic length, got %zu expected %zu", 63913498266Sopenharmony_ci topic_len, remaining_length - 5); 64013498266Sopenharmony_ci goto end; 64113498266Sopenharmony_ci } 64213498266Sopenharmony_ci memcpy(topic, &buffer[4], topic_len); 64313498266Sopenharmony_ci topic[topic_len] = 0; 64413498266Sopenharmony_ci 64513498266Sopenharmony_ci /* there's a QoS byte (two bits) after the topic */ 64613498266Sopenharmony_ci 64713498266Sopenharmony_ci logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id); 64813498266Sopenharmony_ci stream = test2fopen(testno, logdir); 64913498266Sopenharmony_ci error = getpart(&data, &datalen, "reply", "data", stream); 65013498266Sopenharmony_ci if(!error) { 65113498266Sopenharmony_ci if(!config.publish_before_suback) { 65213498266Sopenharmony_ci if(suback(dump, fd, packet_id)) { 65313498266Sopenharmony_ci logmsg("failed sending SUBACK"); 65413498266Sopenharmony_ci goto end; 65513498266Sopenharmony_ci } 65613498266Sopenharmony_ci } 65713498266Sopenharmony_ci if(publish(dump, fd, packet_id, topic, data, datalen)) { 65813498266Sopenharmony_ci logmsg("PUBLISH failed"); 65913498266Sopenharmony_ci goto end; 66013498266Sopenharmony_ci } 66113498266Sopenharmony_ci if(config.publish_before_suback) { 66213498266Sopenharmony_ci if(suback(dump, fd, packet_id)) { 66313498266Sopenharmony_ci logmsg("failed sending SUBACK"); 66413498266Sopenharmony_ci goto end; 66513498266Sopenharmony_ci } 66613498266Sopenharmony_ci } 66713498266Sopenharmony_ci } 66813498266Sopenharmony_ci else { 66913498266Sopenharmony_ci char *def = (char *)"this is random payload yes yes it is"; 67013498266Sopenharmony_ci publish(dump, fd, packet_id, topic, def, strlen(def)); 67113498266Sopenharmony_ci } 67213498266Sopenharmony_ci disconnect(dump, fd); 67313498266Sopenharmony_ci } 67413498266Sopenharmony_ci else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) { 67513498266Sopenharmony_ci size_t topiclen; 67613498266Sopenharmony_ci 67713498266Sopenharmony_ci logmsg("Incoming PUBLISH"); 67813498266Sopenharmony_ci logprotocol(FROM_CLIENT, "PUBLISH", remaining_length, 67913498266Sopenharmony_ci dump, buffer, rc); 68013498266Sopenharmony_ci 68113498266Sopenharmony_ci topiclen = (size_t)(buffer[1 + bytes] << 8) | buffer[2 + bytes]; 68213498266Sopenharmony_ci logmsg("Got %zu bytes topic", topiclen); 68313498266Sopenharmony_ci /* TODO: verify topiclen */ 68413498266Sopenharmony_ci 68513498266Sopenharmony_ci#ifdef QOS 68613498266Sopenharmony_ci /* TODO: handle packetid if there is one. Send puback if QoS > 0 */ 68713498266Sopenharmony_ci puback(dump, fd, 0); 68813498266Sopenharmony_ci#endif 68913498266Sopenharmony_ci /* expect a disconnect here */ 69013498266Sopenharmony_ci /* get the request */ 69113498266Sopenharmony_ci rc = sread(fd, (char *)&buffer[0], 2); 69213498266Sopenharmony_ci 69313498266Sopenharmony_ci logmsg("READ %zd bytes [DISCONNECT]", rc); 69413498266Sopenharmony_ci loghex(buffer, rc); 69513498266Sopenharmony_ci logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc); 69613498266Sopenharmony_ci goto end; 69713498266Sopenharmony_ci } 69813498266Sopenharmony_ci else { 69913498266Sopenharmony_ci /* not supported (yet) */ 70013498266Sopenharmony_ci goto end; 70113498266Sopenharmony_ci } 70213498266Sopenharmony_ci } while(1); 70313498266Sopenharmony_ci 70413498266Sopenharmony_ciend: 70513498266Sopenharmony_ci if(buffer) 70613498266Sopenharmony_ci free(buffer); 70713498266Sopenharmony_ci if(dump) 70813498266Sopenharmony_ci fclose(dump); 70913498266Sopenharmony_ci if(stream) 71013498266Sopenharmony_ci fclose(stream); 71113498266Sopenharmony_ci return CURL_SOCKET_BAD; 71213498266Sopenharmony_ci} 71313498266Sopenharmony_ci 71413498266Sopenharmony_ci/* 71513498266Sopenharmony_ci sockfdp is a pointer to an established stream or CURL_SOCKET_BAD 71613498266Sopenharmony_ci 71713498266Sopenharmony_ci if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must 71813498266Sopenharmony_ci accept() 71913498266Sopenharmony_ci*/ 72013498266Sopenharmony_cistatic bool incoming(curl_socket_t listenfd) 72113498266Sopenharmony_ci{ 72213498266Sopenharmony_ci fd_set fds_read; 72313498266Sopenharmony_ci fd_set fds_write; 72413498266Sopenharmony_ci fd_set fds_err; 72513498266Sopenharmony_ci int clients = 0; /* connected clients */ 72613498266Sopenharmony_ci 72713498266Sopenharmony_ci if(got_exit_signal) { 72813498266Sopenharmony_ci logmsg("signalled to die, exiting..."); 72913498266Sopenharmony_ci return FALSE; 73013498266Sopenharmony_ci } 73113498266Sopenharmony_ci 73213498266Sopenharmony_ci#ifdef HAVE_GETPPID 73313498266Sopenharmony_ci /* As a last resort, quit if socks5 process becomes orphan. */ 73413498266Sopenharmony_ci if(getppid() <= 1) { 73513498266Sopenharmony_ci logmsg("process becomes orphan, exiting"); 73613498266Sopenharmony_ci return FALSE; 73713498266Sopenharmony_ci } 73813498266Sopenharmony_ci#endif 73913498266Sopenharmony_ci 74013498266Sopenharmony_ci do { 74113498266Sopenharmony_ci ssize_t rc; 74213498266Sopenharmony_ci int error = 0; 74313498266Sopenharmony_ci curl_socket_t sockfd = listenfd; 74413498266Sopenharmony_ci int maxfd = (int)sockfd; 74513498266Sopenharmony_ci 74613498266Sopenharmony_ci FD_ZERO(&fds_read); 74713498266Sopenharmony_ci FD_ZERO(&fds_write); 74813498266Sopenharmony_ci FD_ZERO(&fds_err); 74913498266Sopenharmony_ci 75013498266Sopenharmony_ci /* there's always a socket to wait for */ 75113498266Sopenharmony_ci FD_SET(sockfd, &fds_read); 75213498266Sopenharmony_ci 75313498266Sopenharmony_ci do { 75413498266Sopenharmony_ci /* select() blocking behavior call on blocking descriptors please */ 75513498266Sopenharmony_ci rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL); 75613498266Sopenharmony_ci if(got_exit_signal) { 75713498266Sopenharmony_ci logmsg("signalled to die, exiting..."); 75813498266Sopenharmony_ci return FALSE; 75913498266Sopenharmony_ci } 76013498266Sopenharmony_ci } while((rc == -1) && ((error = SOCKERRNO) == EINTR)); 76113498266Sopenharmony_ci 76213498266Sopenharmony_ci if(rc < 0) { 76313498266Sopenharmony_ci logmsg("select() failed with error: (%d) %s", 76413498266Sopenharmony_ci error, strerror(error)); 76513498266Sopenharmony_ci return FALSE; 76613498266Sopenharmony_ci } 76713498266Sopenharmony_ci 76813498266Sopenharmony_ci if(FD_ISSET(sockfd, &fds_read)) { 76913498266Sopenharmony_ci curl_socket_t newfd = accept(sockfd, NULL, NULL); 77013498266Sopenharmony_ci if(CURL_SOCKET_BAD == newfd) { 77113498266Sopenharmony_ci error = SOCKERRNO; 77213498266Sopenharmony_ci logmsg("accept(%" CURL_FORMAT_SOCKET_T ", NULL, NULL) " 77313498266Sopenharmony_ci "failed with error: (%d) %s", sockfd, error, sstrerror(error)); 77413498266Sopenharmony_ci } 77513498266Sopenharmony_ci else { 77613498266Sopenharmony_ci logmsg("====> Client connect, fd %" CURL_FORMAT_SOCKET_T ". " 77713498266Sopenharmony_ci "Read config from %s", newfd, configfile); 77813498266Sopenharmony_ci set_advisor_read_lock(loglockfile); 77913498266Sopenharmony_ci (void)mqttit(newfd); /* until done */ 78013498266Sopenharmony_ci clear_advisor_read_lock(loglockfile); 78113498266Sopenharmony_ci 78213498266Sopenharmony_ci logmsg("====> Client disconnect"); 78313498266Sopenharmony_ci sclose(newfd); 78413498266Sopenharmony_ci } 78513498266Sopenharmony_ci } 78613498266Sopenharmony_ci } while(clients); 78713498266Sopenharmony_ci 78813498266Sopenharmony_ci return TRUE; 78913498266Sopenharmony_ci} 79013498266Sopenharmony_ci 79113498266Sopenharmony_cistatic curl_socket_t sockdaemon(curl_socket_t sock, 79213498266Sopenharmony_ci unsigned short *listenport) 79313498266Sopenharmony_ci{ 79413498266Sopenharmony_ci /* passive daemon style */ 79513498266Sopenharmony_ci srvr_sockaddr_union_t listener; 79613498266Sopenharmony_ci int flag; 79713498266Sopenharmony_ci int rc; 79813498266Sopenharmony_ci int totdelay = 0; 79913498266Sopenharmony_ci int maxretr = 10; 80013498266Sopenharmony_ci int delay = 20; 80113498266Sopenharmony_ci int attempt = 0; 80213498266Sopenharmony_ci int error = 0; 80313498266Sopenharmony_ci 80413498266Sopenharmony_ci do { 80513498266Sopenharmony_ci attempt++; 80613498266Sopenharmony_ci flag = 1; 80713498266Sopenharmony_ci rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 80813498266Sopenharmony_ci (void *)&flag, sizeof(flag)); 80913498266Sopenharmony_ci if(rc) { 81013498266Sopenharmony_ci error = SOCKERRNO; 81113498266Sopenharmony_ci logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s", 81213498266Sopenharmony_ci error, sstrerror(error)); 81313498266Sopenharmony_ci if(maxretr) { 81413498266Sopenharmony_ci rc = wait_ms(delay); 81513498266Sopenharmony_ci if(rc) { 81613498266Sopenharmony_ci /* should not happen */ 81713498266Sopenharmony_ci logmsg("wait_ms() failed with error: %d", rc); 81813498266Sopenharmony_ci sclose(sock); 81913498266Sopenharmony_ci return CURL_SOCKET_BAD; 82013498266Sopenharmony_ci } 82113498266Sopenharmony_ci if(got_exit_signal) { 82213498266Sopenharmony_ci logmsg("signalled to die, exiting..."); 82313498266Sopenharmony_ci sclose(sock); 82413498266Sopenharmony_ci return CURL_SOCKET_BAD; 82513498266Sopenharmony_ci } 82613498266Sopenharmony_ci totdelay += delay; 82713498266Sopenharmony_ci delay *= 2; /* double the sleep for next attempt */ 82813498266Sopenharmony_ci } 82913498266Sopenharmony_ci } 83013498266Sopenharmony_ci } while(rc && maxretr--); 83113498266Sopenharmony_ci 83213498266Sopenharmony_ci if(rc) { 83313498266Sopenharmony_ci logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s", 83413498266Sopenharmony_ci attempt, totdelay, error, strerror(error)); 83513498266Sopenharmony_ci logmsg("Continuing anyway..."); 83613498266Sopenharmony_ci } 83713498266Sopenharmony_ci 83813498266Sopenharmony_ci /* When the specified listener port is zero, it is actually a 83913498266Sopenharmony_ci request to let the system choose a non-zero available port. */ 84013498266Sopenharmony_ci 84113498266Sopenharmony_ci#ifdef ENABLE_IPV6 84213498266Sopenharmony_ci if(!use_ipv6) { 84313498266Sopenharmony_ci#endif 84413498266Sopenharmony_ci memset(&listener.sa4, 0, sizeof(listener.sa4)); 84513498266Sopenharmony_ci listener.sa4.sin_family = AF_INET; 84613498266Sopenharmony_ci listener.sa4.sin_addr.s_addr = INADDR_ANY; 84713498266Sopenharmony_ci listener.sa4.sin_port = htons(*listenport); 84813498266Sopenharmony_ci rc = bind(sock, &listener.sa, sizeof(listener.sa4)); 84913498266Sopenharmony_ci#ifdef ENABLE_IPV6 85013498266Sopenharmony_ci } 85113498266Sopenharmony_ci else { 85213498266Sopenharmony_ci memset(&listener.sa6, 0, sizeof(listener.sa6)); 85313498266Sopenharmony_ci listener.sa6.sin6_family = AF_INET6; 85413498266Sopenharmony_ci listener.sa6.sin6_addr = in6addr_any; 85513498266Sopenharmony_ci listener.sa6.sin6_port = htons(*listenport); 85613498266Sopenharmony_ci rc = bind(sock, &listener.sa, sizeof(listener.sa6)); 85713498266Sopenharmony_ci } 85813498266Sopenharmony_ci#endif /* ENABLE_IPV6 */ 85913498266Sopenharmony_ci if(rc) { 86013498266Sopenharmony_ci error = SOCKERRNO; 86113498266Sopenharmony_ci logmsg("Error binding socket on port %hu: (%d) %s", 86213498266Sopenharmony_ci *listenport, error, sstrerror(error)); 86313498266Sopenharmony_ci sclose(sock); 86413498266Sopenharmony_ci return CURL_SOCKET_BAD; 86513498266Sopenharmony_ci } 86613498266Sopenharmony_ci 86713498266Sopenharmony_ci if(!*listenport) { 86813498266Sopenharmony_ci /* The system was supposed to choose a port number, figure out which 86913498266Sopenharmony_ci port we actually got and update the listener port value with it. */ 87013498266Sopenharmony_ci curl_socklen_t la_size; 87113498266Sopenharmony_ci srvr_sockaddr_union_t localaddr; 87213498266Sopenharmony_ci#ifdef ENABLE_IPV6 87313498266Sopenharmony_ci if(!use_ipv6) 87413498266Sopenharmony_ci#endif 87513498266Sopenharmony_ci la_size = sizeof(localaddr.sa4); 87613498266Sopenharmony_ci#ifdef ENABLE_IPV6 87713498266Sopenharmony_ci else 87813498266Sopenharmony_ci la_size = sizeof(localaddr.sa6); 87913498266Sopenharmony_ci#endif 88013498266Sopenharmony_ci memset(&localaddr.sa, 0, (size_t)la_size); 88113498266Sopenharmony_ci if(getsockname(sock, &localaddr.sa, &la_size) < 0) { 88213498266Sopenharmony_ci error = SOCKERRNO; 88313498266Sopenharmony_ci logmsg("getsockname() failed with error: (%d) %s", 88413498266Sopenharmony_ci error, sstrerror(error)); 88513498266Sopenharmony_ci sclose(sock); 88613498266Sopenharmony_ci return CURL_SOCKET_BAD; 88713498266Sopenharmony_ci } 88813498266Sopenharmony_ci switch(localaddr.sa.sa_family) { 88913498266Sopenharmony_ci case AF_INET: 89013498266Sopenharmony_ci *listenport = ntohs(localaddr.sa4.sin_port); 89113498266Sopenharmony_ci break; 89213498266Sopenharmony_ci#ifdef ENABLE_IPV6 89313498266Sopenharmony_ci case AF_INET6: 89413498266Sopenharmony_ci *listenport = ntohs(localaddr.sa6.sin6_port); 89513498266Sopenharmony_ci break; 89613498266Sopenharmony_ci#endif 89713498266Sopenharmony_ci default: 89813498266Sopenharmony_ci break; 89913498266Sopenharmony_ci } 90013498266Sopenharmony_ci if(!*listenport) { 90113498266Sopenharmony_ci /* Real failure, listener port shall not be zero beyond this point. */ 90213498266Sopenharmony_ci logmsg("Apparently getsockname() succeeded, with listener port zero."); 90313498266Sopenharmony_ci logmsg("A valid reason for this failure is a binary built without"); 90413498266Sopenharmony_ci logmsg("proper network library linkage. This might not be the only"); 90513498266Sopenharmony_ci logmsg("reason, but double check it before anything else."); 90613498266Sopenharmony_ci sclose(sock); 90713498266Sopenharmony_ci return CURL_SOCKET_BAD; 90813498266Sopenharmony_ci } 90913498266Sopenharmony_ci } 91013498266Sopenharmony_ci 91113498266Sopenharmony_ci /* start accepting connections */ 91213498266Sopenharmony_ci rc = listen(sock, 5); 91313498266Sopenharmony_ci if(0 != rc) { 91413498266Sopenharmony_ci error = SOCKERRNO; 91513498266Sopenharmony_ci logmsg("listen(%" CURL_FORMAT_SOCKET_T ", 5) failed with error: (%d) %s", 91613498266Sopenharmony_ci sock, error, sstrerror(error)); 91713498266Sopenharmony_ci sclose(sock); 91813498266Sopenharmony_ci return CURL_SOCKET_BAD; 91913498266Sopenharmony_ci } 92013498266Sopenharmony_ci 92113498266Sopenharmony_ci return sock; 92213498266Sopenharmony_ci} 92313498266Sopenharmony_ci 92413498266Sopenharmony_ci 92513498266Sopenharmony_ciint main(int argc, char *argv[]) 92613498266Sopenharmony_ci{ 92713498266Sopenharmony_ci curl_socket_t sock = CURL_SOCKET_BAD; 92813498266Sopenharmony_ci curl_socket_t msgsock = CURL_SOCKET_BAD; 92913498266Sopenharmony_ci int wrotepidfile = 0; 93013498266Sopenharmony_ci int wroteportfile = 0; 93113498266Sopenharmony_ci const char *pidname = ".mqttd.pid"; 93213498266Sopenharmony_ci const char *portname = ".mqttd.port"; 93313498266Sopenharmony_ci bool juggle_again; 93413498266Sopenharmony_ci int error; 93513498266Sopenharmony_ci int arg = 1; 93613498266Sopenharmony_ci 93713498266Sopenharmony_ci while(argc>arg) { 93813498266Sopenharmony_ci if(!strcmp("--version", argv[arg])) { 93913498266Sopenharmony_ci printf("mqttd IPv4%s\n", 94013498266Sopenharmony_ci#ifdef ENABLE_IPV6 94113498266Sopenharmony_ci "/IPv6" 94213498266Sopenharmony_ci#else 94313498266Sopenharmony_ci "" 94413498266Sopenharmony_ci#endif 94513498266Sopenharmony_ci ); 94613498266Sopenharmony_ci return 0; 94713498266Sopenharmony_ci } 94813498266Sopenharmony_ci else if(!strcmp("--pidfile", argv[arg])) { 94913498266Sopenharmony_ci arg++; 95013498266Sopenharmony_ci if(argc>arg) 95113498266Sopenharmony_ci pidname = argv[arg++]; 95213498266Sopenharmony_ci } 95313498266Sopenharmony_ci else if(!strcmp("--portfile", argv[arg])) { 95413498266Sopenharmony_ci arg++; 95513498266Sopenharmony_ci if(argc>arg) 95613498266Sopenharmony_ci portname = argv[arg++]; 95713498266Sopenharmony_ci } 95813498266Sopenharmony_ci else if(!strcmp("--config", argv[arg])) { 95913498266Sopenharmony_ci arg++; 96013498266Sopenharmony_ci if(argc>arg) 96113498266Sopenharmony_ci configfile = argv[arg++]; 96213498266Sopenharmony_ci } 96313498266Sopenharmony_ci else if(!strcmp("--logfile", argv[arg])) { 96413498266Sopenharmony_ci arg++; 96513498266Sopenharmony_ci if(argc>arg) 96613498266Sopenharmony_ci serverlogfile = argv[arg++]; 96713498266Sopenharmony_ci } 96813498266Sopenharmony_ci else if(!strcmp("--logdir", argv[arg])) { 96913498266Sopenharmony_ci arg++; 97013498266Sopenharmony_ci if(argc>arg) 97113498266Sopenharmony_ci logdir = argv[arg++]; 97213498266Sopenharmony_ci } 97313498266Sopenharmony_ci else if(!strcmp("--ipv6", argv[arg])) { 97413498266Sopenharmony_ci#ifdef ENABLE_IPV6 97513498266Sopenharmony_ci ipv_inuse = "IPv6"; 97613498266Sopenharmony_ci use_ipv6 = TRUE; 97713498266Sopenharmony_ci#endif 97813498266Sopenharmony_ci arg++; 97913498266Sopenharmony_ci } 98013498266Sopenharmony_ci else if(!strcmp("--ipv4", argv[arg])) { 98113498266Sopenharmony_ci /* for completeness, we support this option as well */ 98213498266Sopenharmony_ci#ifdef ENABLE_IPV6 98313498266Sopenharmony_ci ipv_inuse = "IPv4"; 98413498266Sopenharmony_ci use_ipv6 = FALSE; 98513498266Sopenharmony_ci#endif 98613498266Sopenharmony_ci arg++; 98713498266Sopenharmony_ci } 98813498266Sopenharmony_ci else if(!strcmp("--port", argv[arg])) { 98913498266Sopenharmony_ci arg++; 99013498266Sopenharmony_ci if(argc>arg) { 99113498266Sopenharmony_ci char *endptr; 99213498266Sopenharmony_ci unsigned long ulnum = strtoul(argv[arg], &endptr, 10); 99313498266Sopenharmony_ci if((endptr != argv[arg] + strlen(argv[arg])) || 99413498266Sopenharmony_ci ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) { 99513498266Sopenharmony_ci fprintf(stderr, "mqttd: invalid --port argument (%s)\n", 99613498266Sopenharmony_ci argv[arg]); 99713498266Sopenharmony_ci return 0; 99813498266Sopenharmony_ci } 99913498266Sopenharmony_ci port = curlx_ultous(ulnum); 100013498266Sopenharmony_ci arg++; 100113498266Sopenharmony_ci } 100213498266Sopenharmony_ci } 100313498266Sopenharmony_ci else { 100413498266Sopenharmony_ci puts("Usage: mqttd [option]\n" 100513498266Sopenharmony_ci " --config [file]\n" 100613498266Sopenharmony_ci " --version\n" 100713498266Sopenharmony_ci " --logfile [file]\n" 100813498266Sopenharmony_ci " --logdir [directory]\n" 100913498266Sopenharmony_ci " --pidfile [file]\n" 101013498266Sopenharmony_ci " --portfile [file]\n" 101113498266Sopenharmony_ci " --ipv4\n" 101213498266Sopenharmony_ci " --ipv6\n" 101313498266Sopenharmony_ci " --port [port]\n"); 101413498266Sopenharmony_ci return 0; 101513498266Sopenharmony_ci } 101613498266Sopenharmony_ci } 101713498266Sopenharmony_ci 101813498266Sopenharmony_ci msnprintf(loglockfile, sizeof(loglockfile), "%s/%s/mqtt-%s.lock", 101913498266Sopenharmony_ci logdir, SERVERLOGS_LOCKDIR, ipv_inuse); 102013498266Sopenharmony_ci 102113498266Sopenharmony_ci#ifdef _WIN32 102213498266Sopenharmony_ci win32_init(); 102313498266Sopenharmony_ci atexit(win32_cleanup); 102413498266Sopenharmony_ci 102513498266Sopenharmony_ci setmode(fileno(stdin), O_BINARY); 102613498266Sopenharmony_ci setmode(fileno(stdout), O_BINARY); 102713498266Sopenharmony_ci setmode(fileno(stderr), O_BINARY); 102813498266Sopenharmony_ci#endif 102913498266Sopenharmony_ci 103013498266Sopenharmony_ci install_signal_handlers(FALSE); 103113498266Sopenharmony_ci 103213498266Sopenharmony_ci#ifdef ENABLE_IPV6 103313498266Sopenharmony_ci if(!use_ipv6) 103413498266Sopenharmony_ci#endif 103513498266Sopenharmony_ci sock = socket(AF_INET, SOCK_STREAM, 0); 103613498266Sopenharmony_ci#ifdef ENABLE_IPV6 103713498266Sopenharmony_ci else 103813498266Sopenharmony_ci sock = socket(AF_INET6, SOCK_STREAM, 0); 103913498266Sopenharmony_ci#endif 104013498266Sopenharmony_ci 104113498266Sopenharmony_ci if(CURL_SOCKET_BAD == sock) { 104213498266Sopenharmony_ci error = SOCKERRNO; 104313498266Sopenharmony_ci logmsg("Error creating socket: (%d) %s", error, sstrerror(error)); 104413498266Sopenharmony_ci goto mqttd_cleanup; 104513498266Sopenharmony_ci } 104613498266Sopenharmony_ci 104713498266Sopenharmony_ci { 104813498266Sopenharmony_ci /* passive daemon style */ 104913498266Sopenharmony_ci sock = sockdaemon(sock, &port); 105013498266Sopenharmony_ci if(CURL_SOCKET_BAD == sock) { 105113498266Sopenharmony_ci goto mqttd_cleanup; 105213498266Sopenharmony_ci } 105313498266Sopenharmony_ci msgsock = CURL_SOCKET_BAD; /* no stream socket yet */ 105413498266Sopenharmony_ci } 105513498266Sopenharmony_ci 105613498266Sopenharmony_ci logmsg("Running %s version", ipv_inuse); 105713498266Sopenharmony_ci logmsg("Listening on port %hu", port); 105813498266Sopenharmony_ci 105913498266Sopenharmony_ci wrotepidfile = write_pidfile(pidname); 106013498266Sopenharmony_ci if(!wrotepidfile) { 106113498266Sopenharmony_ci goto mqttd_cleanup; 106213498266Sopenharmony_ci } 106313498266Sopenharmony_ci 106413498266Sopenharmony_ci wroteportfile = write_portfile(portname, port); 106513498266Sopenharmony_ci if(!wroteportfile) { 106613498266Sopenharmony_ci goto mqttd_cleanup; 106713498266Sopenharmony_ci } 106813498266Sopenharmony_ci 106913498266Sopenharmony_ci do { 107013498266Sopenharmony_ci juggle_again = incoming(sock); 107113498266Sopenharmony_ci } while(juggle_again); 107213498266Sopenharmony_ci 107313498266Sopenharmony_cimqttd_cleanup: 107413498266Sopenharmony_ci 107513498266Sopenharmony_ci if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD)) 107613498266Sopenharmony_ci sclose(msgsock); 107713498266Sopenharmony_ci 107813498266Sopenharmony_ci if(sock != CURL_SOCKET_BAD) 107913498266Sopenharmony_ci sclose(sock); 108013498266Sopenharmony_ci 108113498266Sopenharmony_ci if(wrotepidfile) 108213498266Sopenharmony_ci unlink(pidname); 108313498266Sopenharmony_ci if(wroteportfile) 108413498266Sopenharmony_ci unlink(portname); 108513498266Sopenharmony_ci 108613498266Sopenharmony_ci restore_signal_handlers(FALSE); 108713498266Sopenharmony_ci 108813498266Sopenharmony_ci if(got_exit_signal) { 108913498266Sopenharmony_ci logmsg("============> mqttd exits with signal (%d)", exit_signal); 109013498266Sopenharmony_ci /* 109113498266Sopenharmony_ci * To properly set the return status of the process we 109213498266Sopenharmony_ci * must raise the same signal SIGINT or SIGTERM that we 109313498266Sopenharmony_ci * caught and let the old handler take care of it. 109413498266Sopenharmony_ci */ 109513498266Sopenharmony_ci raise(exit_signal); 109613498266Sopenharmony_ci } 109713498266Sopenharmony_ci 109813498266Sopenharmony_ci logmsg("============> mqttd quits"); 109913498266Sopenharmony_ci return 0; 110013498266Sopenharmony_ci} 1101