1/*************************************************************************** 2 * _ _ ____ _ 3 * Project ___| | | | _ \| | 4 * / __| | | | |_) | | 5 * | (__| |_| | _ <| |___ 6 * \___|\___/|_| \_\_____| 7 * 8 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al. 9 * 10 * This software is licensed as described in the file COPYING, which 11 * you should have received as part of this distribution. The terms 12 * are also available at https://curl.se/docs/copyright.html. 13 * 14 * You may opt to use, copy, modify, merge, publish, distribute and/or sell 15 * copies of the Software, and permit persons to whom the Software is 16 * furnished to do so, under the terms of the COPYING file. 17 * 18 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY 19 * KIND, either express or implied. 20 * 21 * SPDX-License-Identifier: curl 22 * 23 ***************************************************************************/ 24#include "server_setup.h" 25#include <stdlib.h> 26#include <string.h> 27#include "util.h" 28 29/* Function 30 * 31 * Accepts a TCP connection on a custom port (IPv4 or IPv6). Speaks MQTT. 32 * 33 * Read commands from FILE (set with --config). The commands control how to 34 * act and is reset to defaults each client TCP connect. 35 * 36 * Config file keywords: 37 * 38 * TODO 39 */ 40 41/* based on sockfilt.c */ 42 43#include <signal.h> 44#ifdef HAVE_NETINET_IN_H 45#include <netinet/in.h> 46#endif 47#ifdef HAVE_NETINET_IN6_H 48#include <netinet/in6.h> 49#endif 50#ifdef HAVE_ARPA_INET_H 51#include <arpa/inet.h> 52#endif 53#ifdef HAVE_NETDB_H 54#include <netdb.h> 55#endif 56 57#define ENABLE_CURLX_PRINTF 58/* make the curlx header define all printf() functions to use the curlx_* 59 versions instead */ 60#include "curlx.h" /* from the private lib dir */ 61#include "getpart.h" 62#include "inet_pton.h" 63#include "server_sockaddr.h" 64#include "warnless.h" 65 66/* include memdebug.h last */ 67#include "memdebug.h" 68 69#ifdef USE_WINSOCK 70#undef EINTR 71#define EINTR 4 /* errno.h value */ 72#undef EAGAIN 73#define EAGAIN 11 /* errno.h value */ 74#undef ENOMEM 75#define ENOMEM 12 /* errno.h value */ 76#undef EINVAL 77#define EINVAL 22 /* errno.h value */ 78#endif 79 80#define DEFAULT_PORT 1883 /* MQTT default port */ 81 82#ifndef DEFAULT_LOGFILE 83#define DEFAULT_LOGFILE "log/mqttd.log" 84#endif 85 86#ifndef DEFAULT_CONFIG 87#define DEFAULT_CONFIG "mqttd.config" 88#endif 89 90#define MQTT_MSG_CONNECT 0x10 91#define MQTT_MSG_CONNACK 0x20 92#define MQTT_MSG_PUBLISH 0x30 93#define MQTT_MSG_PUBACK 0x40 94#define MQTT_MSG_SUBSCRIBE 0x82 95#define MQTT_MSG_SUBACK 0x90 96#define MQTT_MSG_DISCONNECT 0xe0 97 98#define MQTT_CONNACK_LEN 4 99#define MQTT_SUBACK_LEN 5 100#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ 101 102struct configurable { 103 unsigned char version; /* initial version byte in the request must match 104 this */ 105 bool publish_before_suback; 106 bool short_publish; 107 bool excessive_remaining; 108 unsigned char error_connack; 109 int testnum; 110}; 111 112#define REQUEST_DUMP "server.input" 113#define CONFIG_VERSION 5 114 115static struct configurable config; 116 117const char *serverlogfile = DEFAULT_LOGFILE; 118static const char *configfile = DEFAULT_CONFIG; 119static const char *logdir = "log"; 120static char loglockfile[256]; 121 122#ifdef ENABLE_IPV6 123static bool use_ipv6 = FALSE; 124#endif 125static const char *ipv_inuse = "IPv4"; 126static unsigned short port = DEFAULT_PORT; 127 128static void resetdefaults(void) 129{ 130 logmsg("Reset to defaults"); 131 config.version = CONFIG_VERSION; 132 config.publish_before_suback = FALSE; 133 config.short_publish = FALSE; 134 config.excessive_remaining = FALSE; 135 config.error_connack = 0; 136 config.testnum = 0; 137} 138 139static unsigned char byteval(char *value) 140{ 141 unsigned long num = strtoul(value, NULL, 10); 142 return num & 0xff; 143} 144 145static void getconfig(void) 146{ 147 FILE *fp = fopen(configfile, FOPEN_READTEXT); 148 resetdefaults(); 149 if(fp) { 150 char buffer[512]; 151 logmsg("parse config file"); 152 while(fgets(buffer, sizeof(buffer), fp)) { 153 char key[32]; 154 char value[32]; 155 if(2 == sscanf(buffer, "%31s %31s", key, value)) { 156 if(!strcmp(key, "version")) { 157 config.version = byteval(value); 158 logmsg("version [%d] set", config.version); 159 } 160 else if(!strcmp(key, "PUBLISH-before-SUBACK")) { 161 logmsg("PUBLISH-before-SUBACK set"); 162 config.publish_before_suback = TRUE; 163 } 164 else if(!strcmp(key, "short-PUBLISH")) { 165 logmsg("short-PUBLISH set"); 166 config.short_publish = TRUE; 167 } 168 else if(!strcmp(key, "error-CONNACK")) { 169 config.error_connack = byteval(value); 170 logmsg("error-CONNACK = %d", config.error_connack); 171 } 172 else if(!strcmp(key, "Testnum")) { 173 config.testnum = atoi(value); 174 logmsg("testnum = %d", config.testnum); 175 } 176 else if(!strcmp(key, "excessive-remaining")) { 177 logmsg("excessive-remaining set"); 178 config.excessive_remaining = TRUE; 179 } 180 } 181 } 182 fclose(fp); 183 } 184 else { 185 logmsg("No config file '%s' to read", configfile); 186 } 187} 188 189static void loghex(unsigned char *buffer, ssize_t len) 190{ 191 char data[12000]; 192 ssize_t i; 193 unsigned char *ptr = buffer; 194 char *optr = data; 195 ssize_t width = 0; 196 int left = sizeof(data); 197 198 for(i = 0; i<len && (left >= 0); i++) { 199 msnprintf(optr, left, "%02x", ptr[i]); 200 width += 2; 201 optr += 2; 202 left -= 2; 203 } 204 if(width) 205 logmsg("'%s'", data); 206} 207 208typedef enum { 209 FROM_CLIENT, 210 FROM_SERVER 211} mqttdir; 212 213static void logprotocol(mqttdir dir, 214 const char *prefix, size_t remlen, 215 FILE *output, 216 unsigned char *buffer, ssize_t len) 217{ 218 char data[12000] = ""; 219 ssize_t i; 220 unsigned char *ptr = buffer; 221 char *optr = data; 222 int left = sizeof(data); 223 224 for(i = 0; i<len && (left >= 0); i++) { 225 msnprintf(optr, left, "%02x", ptr[i]); 226 optr += 2; 227 left -= 2; 228 } 229 fprintf(output, "%s %s %zx %s\n", 230 dir == FROM_CLIENT? "client": "server", 231 prefix, remlen, 232 data); 233} 234 235 236/* return 0 on success */ 237static int connack(FILE *dump, curl_socket_t fd) 238{ 239 unsigned char packet[]={ 240 MQTT_MSG_CONNACK, 0x02, 241 0x00, 0x00 242 }; 243 ssize_t rc; 244 245 packet[3] = config.error_connack; 246 247 rc = swrite(fd, (char *)packet, sizeof(packet)); 248 if(rc > 0) { 249 logmsg("WROTE %zd bytes [CONNACK]", rc); 250 loghex(packet, rc); 251 logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet)); 252 } 253 if(rc == sizeof(packet)) { 254 return 0; 255 } 256 return 1; 257} 258 259/* return 0 on success */ 260static int suback(FILE *dump, curl_socket_t fd, unsigned short packetid) 261{ 262 unsigned char packet[]={ 263 MQTT_MSG_SUBACK, 0x03, 264 0, 0, /* filled in below */ 265 0x00 266 }; 267 ssize_t rc; 268 packet[2] = (unsigned char)(packetid >> 8); 269 packet[3] = (unsigned char)(packetid & 0xff); 270 271 rc = swrite(fd, (char *)packet, sizeof(packet)); 272 if(rc == sizeof(packet)) { 273 logmsg("WROTE %zd bytes [SUBACK]", rc); 274 loghex(packet, rc); 275 logprotocol(FROM_SERVER, "SUBACK", 3, dump, packet, rc); 276 return 0; 277 } 278 return 1; 279} 280 281#ifdef QOS 282/* return 0 on success */ 283static int puback(FILE *dump, curl_socket_t fd, unsigned short packetid) 284{ 285 unsigned char packet[]={ 286 MQTT_MSG_PUBACK, 0x00, 287 0, 0 /* filled in below */ 288 }; 289 ssize_t rc; 290 packet[2] = (unsigned char)(packetid >> 8); 291 packet[3] = (unsigned char)(packetid & 0xff); 292 293 rc = swrite(fd, (char *)packet, sizeof(packet)); 294 if(rc == sizeof(packet)) { 295 logmsg("WROTE %zd bytes [PUBACK]", rc); 296 loghex(packet, rc); 297 logprotocol(FROM_SERVER, dump, packet, rc); 298 return 0; 299 } 300 logmsg("Failed sending [PUBACK]"); 301 return 1; 302} 303#endif 304 305/* return 0 on success */ 306static int disconnect(FILE *dump, curl_socket_t fd) 307{ 308 unsigned char packet[]={ 309 MQTT_MSG_DISCONNECT, 0x00, 310 }; 311 ssize_t rc = swrite(fd, (char *)packet, sizeof(packet)); 312 if(rc == sizeof(packet)) { 313 logmsg("WROTE %zd bytes [DISCONNECT]", rc); 314 loghex(packet, rc); 315 logprotocol(FROM_SERVER, "DISCONNECT", 0, dump, packet, rc); 316 return 0; 317 } 318 logmsg("Failed sending [DISCONNECT]"); 319 return 1; 320} 321 322 323 324/* 325 do 326 327 encodedByte = X MOD 128 328 329 X = X DIV 128 330 331 // if there are more data to encode, set the top bit of this byte 332 333 if ( X > 0 ) 334 335 encodedByte = encodedByte OR 128 336 337 endif 338 339 'output' encodedByte 340 341 while ( X > 0 ) 342 343*/ 344 345/* return number of bytes used */ 346static int encode_length(size_t packetlen, 347 unsigned char *remlength) /* 4 bytes */ 348{ 349 int bytes = 0; 350 unsigned char encode; 351 352 do { 353 encode = packetlen % 0x80; 354 packetlen /= 0x80; 355 if(packetlen) 356 encode |= 0x80; 357 358 remlength[bytes++] = encode; 359 360 if(bytes > 3) { 361 logmsg("too large packet!"); 362 return 0; 363 } 364 } while(packetlen); 365 366 return bytes; 367} 368 369 370static size_t decode_length(unsigned char *buf, 371 size_t buflen, size_t *lenbytes) 372{ 373 size_t len = 0; 374 size_t mult = 1; 375 size_t i; 376 unsigned char encoded = 0x80; 377 378 for(i = 0; (i < buflen) && (encoded & 0x80); i++) { 379 encoded = buf[i]; 380 len += (encoded & 0x7f) * mult; 381 mult *= 0x80; 382 } 383 384 if(lenbytes) 385 *lenbytes = i; 386 387 return len; 388} 389 390 391/* return 0 on success */ 392static int publish(FILE *dump, 393 curl_socket_t fd, unsigned short packetid, 394 char *topic, char *payload, size_t payloadlen) 395{ 396 size_t topiclen = strlen(topic); 397 unsigned char *packet; 398 size_t payloadindex; 399 ssize_t remaininglength = topiclen + 2 + payloadlen; 400 ssize_t packetlen; 401 ssize_t sendamount; 402 ssize_t rc; 403 unsigned char rembuffer[4]; 404 int encodedlen; 405 406 if(config.excessive_remaining) { 407 /* manually set illegal remaining length */ 408 rembuffer[0] = 0xff; 409 rembuffer[1] = 0xff; 410 rembuffer[2] = 0xff; 411 rembuffer[3] = 0x80; /* maximum allowed here by spec is 0x7f */ 412 encodedlen = 4; 413 } 414 else 415 encodedlen = encode_length(remaininglength, rembuffer); 416 417 /* one packet type byte (possibly two more for packetid) */ 418 packetlen = remaininglength + encodedlen + 1; 419 packet = malloc(packetlen); 420 if(!packet) 421 return 1; 422 423 packet[0] = MQTT_MSG_PUBLISH; /* TODO: set QoS? */ 424 memcpy(&packet[1], rembuffer, encodedlen); 425 426 (void)packetid; 427 /* packet_id if QoS is set */ 428 429 packet[1 + encodedlen] = (unsigned char)(topiclen >> 8); 430 packet[2 + encodedlen] = (unsigned char)(topiclen & 0xff); 431 memcpy(&packet[3 + encodedlen], topic, topiclen); 432 433 payloadindex = 3 + topiclen + encodedlen; 434 memcpy(&packet[payloadindex], payload, payloadlen); 435 436 sendamount = packetlen; 437 if(config.short_publish) 438 sendamount -= 2; 439 440 rc = swrite(fd, (char *)packet, sendamount); 441 if(rc > 0) { 442 logmsg("WROTE %zd bytes [PUBLISH]", rc); 443 loghex(packet, rc); 444 logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc); 445 } 446 if(rc == packetlen) 447 return 0; 448 return 1; 449} 450 451#define MAX_TOPIC_LENGTH 65535 452#define MAX_CLIENT_ID_LENGTH 32 453 454static char topic[MAX_TOPIC_LENGTH + 1]; 455 456static int fixedheader(curl_socket_t fd, 457 unsigned char *bytep, 458 size_t *remaining_lengthp, 459 size_t *remaining_length_bytesp) 460{ 461 /* get the fixed header */ 462 unsigned char buffer[10]; 463 464 /* get the first two bytes */ 465 ssize_t rc = sread(fd, (char *)buffer, 2); 466 int i; 467 if(rc < 2) { 468 logmsg("READ %zd bytes [SHORT!]", rc); 469 return 1; /* fail */ 470 } 471 logmsg("READ %zd bytes", rc); 472 loghex(buffer, rc); 473 *bytep = buffer[0]; 474 475 /* if the length byte has the top bit set, get the next one too */ 476 i = 1; 477 while(buffer[i] & 0x80) { 478 i++; 479 rc = sread(fd, (char *)&buffer[i], 1); 480 if(rc != 1) { 481 logmsg("Remaining Length broken"); 482 return 1; 483 } 484 } 485 *remaining_lengthp = decode_length(&buffer[1], i, remaining_length_bytesp); 486 logmsg("Remaining Length: %zu [%zu bytes]", *remaining_lengthp, 487 *remaining_length_bytesp); 488 return 0; 489} 490 491static curl_socket_t mqttit(curl_socket_t fd) 492{ 493 size_t buff_size = 10*1024; 494 unsigned char *buffer = NULL; 495 ssize_t rc; 496 unsigned char byte; 497 unsigned short packet_id; 498 size_t payload_len; 499 size_t client_id_length; 500 size_t topic_len; 501 size_t remaining_length = 0; 502 size_t bytes = 0; /* remaining length field size in bytes */ 503 char client_id[MAX_CLIENT_ID_LENGTH]; 504 long testno; 505 FILE *stream = NULL; 506 FILE *dump; 507 char dumpfile[256]; 508 509 static const char protocol[7] = { 510 0x00, 0x04, /* protocol length */ 511 'M','Q','T','T', /* protocol name */ 512 0x04 /* protocol level */ 513 }; 514 msnprintf(dumpfile, sizeof(dumpfile), "%s/%s", logdir, REQUEST_DUMP); 515 dump = fopen(dumpfile, "ab"); 516 if(!dump) 517 goto end; 518 519 getconfig(); 520 521 testno = config.testnum; 522 523 if(testno) 524 logmsg("Found test number %ld", testno); 525 526 buffer = malloc(buff_size); 527 if(!buffer) { 528 logmsg("Out of memory, unable to allocate buffer"); 529 goto end; 530 } 531 532 do { 533 unsigned char usr_flag = 0x80; 534 unsigned char passwd_flag = 0x40; 535 unsigned char conn_flags; 536 const size_t client_id_offset = 12; 537 size_t start_usr; 538 size_t start_passwd; 539 540 /* get the fixed header */ 541 rc = fixedheader(fd, &byte, &remaining_length, &bytes); 542 if(rc) 543 break; 544 545 if(remaining_length >= buff_size) { 546 buff_size = remaining_length; 547 buffer = realloc(buffer, buff_size); 548 if(!buffer) { 549 logmsg("Failed realloc of size %zu", buff_size); 550 goto end; 551 } 552 } 553 554 if(remaining_length) { 555 /* reading variable header and payload into buffer */ 556 rc = sread(fd, (char *)buffer, remaining_length); 557 if(rc > 0) { 558 logmsg("READ %zd bytes", rc); 559 loghex(buffer, rc); 560 } 561 } 562 563 if(byte == MQTT_MSG_CONNECT) { 564 logprotocol(FROM_CLIENT, "CONNECT", remaining_length, 565 dump, buffer, rc); 566 567 if(memcmp(protocol, buffer, sizeof(protocol))) { 568 logmsg("Protocol preamble mismatch"); 569 goto end; 570 } 571 /* ignore the connect flag byte and two keepalive bytes */ 572 payload_len = (size_t)(buffer[10] << 8) | buffer[11]; 573 /* first part of the payload is the client ID */ 574 client_id_length = payload_len; 575 576 /* checking if user and password flags were set */ 577 conn_flags = buffer[7]; 578 579 start_usr = client_id_offset + payload_len; 580 if(usr_flag == (unsigned char)(conn_flags & usr_flag)) { 581 logmsg("User flag is present in CONN flag"); 582 payload_len += (size_t)(buffer[start_usr] << 8) | 583 buffer[start_usr + 1]; 584 payload_len += 2; /* MSB and LSB for user length */ 585 } 586 587 start_passwd = client_id_offset + payload_len; 588 if(passwd_flag == (char)(conn_flags & passwd_flag)) { 589 logmsg("Password flag is present in CONN flags"); 590 payload_len += (size_t)(buffer[start_passwd] << 8) | 591 buffer[start_passwd + 1]; 592 payload_len += 2; /* MSB and LSB for password length */ 593 } 594 595 /* check the length of the payload */ 596 if((ssize_t)payload_len != (rc - 12)) { 597 logmsg("Payload length mismatch, expected %zx got %zx", 598 rc - 12, payload_len); 599 goto end; 600 } 601 /* check the length of the client ID */ 602 else if((client_id_length + 1) > MAX_CLIENT_ID_LENGTH) { 603 logmsg("Too large client id"); 604 goto end; 605 } 606 memcpy(client_id, &buffer[12], client_id_length); 607 client_id[client_id_length] = 0; 608 609 logmsg("MQTT client connect accepted: %s", client_id); 610 611 /* The first packet sent from the Server to the Client MUST be a 612 CONNACK Packet */ 613 614 if(connack(dump, fd)) { 615 logmsg("failed sending CONNACK"); 616 goto end; 617 } 618 } 619 else if(byte == MQTT_MSG_SUBSCRIBE) { 620 int error; 621 char *data; 622 size_t datalen; 623 logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length, 624 dump, buffer, rc); 625 logmsg("Incoming SUBSCRIBE"); 626 627 if(rc < 6) { 628 logmsg("Too small SUBSCRIBE"); 629 goto end; 630 } 631 632 /* two bytes packet id */ 633 packet_id = (unsigned short)((buffer[0] << 8) | buffer[1]); 634 635 /* two bytes topic length */ 636 topic_len = (size_t)(buffer[2] << 8) | buffer[3]; 637 if(topic_len != (remaining_length - 5)) { 638 logmsg("Wrong topic length, got %zu expected %zu", 639 topic_len, remaining_length - 5); 640 goto end; 641 } 642 memcpy(topic, &buffer[4], topic_len); 643 topic[topic_len] = 0; 644 645 /* there's a QoS byte (two bits) after the topic */ 646 647 logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id); 648 stream = test2fopen(testno, logdir); 649 error = getpart(&data, &datalen, "reply", "data", stream); 650 if(!error) { 651 if(!config.publish_before_suback) { 652 if(suback(dump, fd, packet_id)) { 653 logmsg("failed sending SUBACK"); 654 goto end; 655 } 656 } 657 if(publish(dump, fd, packet_id, topic, data, datalen)) { 658 logmsg("PUBLISH failed"); 659 goto end; 660 } 661 if(config.publish_before_suback) { 662 if(suback(dump, fd, packet_id)) { 663 logmsg("failed sending SUBACK"); 664 goto end; 665 } 666 } 667 } 668 else { 669 char *def = (char *)"this is random payload yes yes it is"; 670 publish(dump, fd, packet_id, topic, def, strlen(def)); 671 } 672 disconnect(dump, fd); 673 } 674 else if((byte & 0xf0) == (MQTT_MSG_PUBLISH & 0xf0)) { 675 size_t topiclen; 676 677 logmsg("Incoming PUBLISH"); 678 logprotocol(FROM_CLIENT, "PUBLISH", remaining_length, 679 dump, buffer, rc); 680 681 topiclen = (size_t)(buffer[1 + bytes] << 8) | buffer[2 + bytes]; 682 logmsg("Got %zu bytes topic", topiclen); 683 /* TODO: verify topiclen */ 684 685#ifdef QOS 686 /* TODO: handle packetid if there is one. Send puback if QoS > 0 */ 687 puback(dump, fd, 0); 688#endif 689 /* expect a disconnect here */ 690 /* get the request */ 691 rc = sread(fd, (char *)&buffer[0], 2); 692 693 logmsg("READ %zd bytes [DISCONNECT]", rc); 694 loghex(buffer, rc); 695 logprotocol(FROM_CLIENT, "DISCONNECT", 0, dump, buffer, rc); 696 goto end; 697 } 698 else { 699 /* not supported (yet) */ 700 goto end; 701 } 702 } while(1); 703 704end: 705 if(buffer) 706 free(buffer); 707 if(dump) 708 fclose(dump); 709 if(stream) 710 fclose(stream); 711 return CURL_SOCKET_BAD; 712} 713 714/* 715 sockfdp is a pointer to an established stream or CURL_SOCKET_BAD 716 717 if sockfd is CURL_SOCKET_BAD, listendfd is a listening socket we must 718 accept() 719*/ 720static bool incoming(curl_socket_t listenfd) 721{ 722 fd_set fds_read; 723 fd_set fds_write; 724 fd_set fds_err; 725 int clients = 0; /* connected clients */ 726 727 if(got_exit_signal) { 728 logmsg("signalled to die, exiting..."); 729 return FALSE; 730 } 731 732#ifdef HAVE_GETPPID 733 /* As a last resort, quit if socks5 process becomes orphan. */ 734 if(getppid() <= 1) { 735 logmsg("process becomes orphan, exiting"); 736 return FALSE; 737 } 738#endif 739 740 do { 741 ssize_t rc; 742 int error = 0; 743 curl_socket_t sockfd = listenfd; 744 int maxfd = (int)sockfd; 745 746 FD_ZERO(&fds_read); 747 FD_ZERO(&fds_write); 748 FD_ZERO(&fds_err); 749 750 /* there's always a socket to wait for */ 751 FD_SET(sockfd, &fds_read); 752 753 do { 754 /* select() blocking behavior call on blocking descriptors please */ 755 rc = select(maxfd + 1, &fds_read, &fds_write, &fds_err, NULL); 756 if(got_exit_signal) { 757 logmsg("signalled to die, exiting..."); 758 return FALSE; 759 } 760 } while((rc == -1) && ((error = SOCKERRNO) == EINTR)); 761 762 if(rc < 0) { 763 logmsg("select() failed with error: (%d) %s", 764 error, strerror(error)); 765 return FALSE; 766 } 767 768 if(FD_ISSET(sockfd, &fds_read)) { 769 curl_socket_t newfd = accept(sockfd, NULL, NULL); 770 if(CURL_SOCKET_BAD == newfd) { 771 error = SOCKERRNO; 772 logmsg("accept(%" CURL_FORMAT_SOCKET_T ", NULL, NULL) " 773 "failed with error: (%d) %s", sockfd, error, sstrerror(error)); 774 } 775 else { 776 logmsg("====> Client connect, fd %" CURL_FORMAT_SOCKET_T ". " 777 "Read config from %s", newfd, configfile); 778 set_advisor_read_lock(loglockfile); 779 (void)mqttit(newfd); /* until done */ 780 clear_advisor_read_lock(loglockfile); 781 782 logmsg("====> Client disconnect"); 783 sclose(newfd); 784 } 785 } 786 } while(clients); 787 788 return TRUE; 789} 790 791static curl_socket_t sockdaemon(curl_socket_t sock, 792 unsigned short *listenport) 793{ 794 /* passive daemon style */ 795 srvr_sockaddr_union_t listener; 796 int flag; 797 int rc; 798 int totdelay = 0; 799 int maxretr = 10; 800 int delay = 20; 801 int attempt = 0; 802 int error = 0; 803 804 do { 805 attempt++; 806 flag = 1; 807 rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 808 (void *)&flag, sizeof(flag)); 809 if(rc) { 810 error = SOCKERRNO; 811 logmsg("setsockopt(SO_REUSEADDR) failed with error: (%d) %s", 812 error, sstrerror(error)); 813 if(maxretr) { 814 rc = wait_ms(delay); 815 if(rc) { 816 /* should not happen */ 817 logmsg("wait_ms() failed with error: %d", rc); 818 sclose(sock); 819 return CURL_SOCKET_BAD; 820 } 821 if(got_exit_signal) { 822 logmsg("signalled to die, exiting..."); 823 sclose(sock); 824 return CURL_SOCKET_BAD; 825 } 826 totdelay += delay; 827 delay *= 2; /* double the sleep for next attempt */ 828 } 829 } 830 } while(rc && maxretr--); 831 832 if(rc) { 833 logmsg("setsockopt(SO_REUSEADDR) failed %d times in %d ms. Error: (%d) %s", 834 attempt, totdelay, error, strerror(error)); 835 logmsg("Continuing anyway..."); 836 } 837 838 /* When the specified listener port is zero, it is actually a 839 request to let the system choose a non-zero available port. */ 840 841#ifdef ENABLE_IPV6 842 if(!use_ipv6) { 843#endif 844 memset(&listener.sa4, 0, sizeof(listener.sa4)); 845 listener.sa4.sin_family = AF_INET; 846 listener.sa4.sin_addr.s_addr = INADDR_ANY; 847 listener.sa4.sin_port = htons(*listenport); 848 rc = bind(sock, &listener.sa, sizeof(listener.sa4)); 849#ifdef ENABLE_IPV6 850 } 851 else { 852 memset(&listener.sa6, 0, sizeof(listener.sa6)); 853 listener.sa6.sin6_family = AF_INET6; 854 listener.sa6.sin6_addr = in6addr_any; 855 listener.sa6.sin6_port = htons(*listenport); 856 rc = bind(sock, &listener.sa, sizeof(listener.sa6)); 857 } 858#endif /* ENABLE_IPV6 */ 859 if(rc) { 860 error = SOCKERRNO; 861 logmsg("Error binding socket on port %hu: (%d) %s", 862 *listenport, error, sstrerror(error)); 863 sclose(sock); 864 return CURL_SOCKET_BAD; 865 } 866 867 if(!*listenport) { 868 /* The system was supposed to choose a port number, figure out which 869 port we actually got and update the listener port value with it. */ 870 curl_socklen_t la_size; 871 srvr_sockaddr_union_t localaddr; 872#ifdef ENABLE_IPV6 873 if(!use_ipv6) 874#endif 875 la_size = sizeof(localaddr.sa4); 876#ifdef ENABLE_IPV6 877 else 878 la_size = sizeof(localaddr.sa6); 879#endif 880 memset(&localaddr.sa, 0, (size_t)la_size); 881 if(getsockname(sock, &localaddr.sa, &la_size) < 0) { 882 error = SOCKERRNO; 883 logmsg("getsockname() failed with error: (%d) %s", 884 error, sstrerror(error)); 885 sclose(sock); 886 return CURL_SOCKET_BAD; 887 } 888 switch(localaddr.sa.sa_family) { 889 case AF_INET: 890 *listenport = ntohs(localaddr.sa4.sin_port); 891 break; 892#ifdef ENABLE_IPV6 893 case AF_INET6: 894 *listenport = ntohs(localaddr.sa6.sin6_port); 895 break; 896#endif 897 default: 898 break; 899 } 900 if(!*listenport) { 901 /* Real failure, listener port shall not be zero beyond this point. */ 902 logmsg("Apparently getsockname() succeeded, with listener port zero."); 903 logmsg("A valid reason for this failure is a binary built without"); 904 logmsg("proper network library linkage. This might not be the only"); 905 logmsg("reason, but double check it before anything else."); 906 sclose(sock); 907 return CURL_SOCKET_BAD; 908 } 909 } 910 911 /* start accepting connections */ 912 rc = listen(sock, 5); 913 if(0 != rc) { 914 error = SOCKERRNO; 915 logmsg("listen(%" CURL_FORMAT_SOCKET_T ", 5) failed with error: (%d) %s", 916 sock, error, sstrerror(error)); 917 sclose(sock); 918 return CURL_SOCKET_BAD; 919 } 920 921 return sock; 922} 923 924 925int main(int argc, char *argv[]) 926{ 927 curl_socket_t sock = CURL_SOCKET_BAD; 928 curl_socket_t msgsock = CURL_SOCKET_BAD; 929 int wrotepidfile = 0; 930 int wroteportfile = 0; 931 const char *pidname = ".mqttd.pid"; 932 const char *portname = ".mqttd.port"; 933 bool juggle_again; 934 int error; 935 int arg = 1; 936 937 while(argc>arg) { 938 if(!strcmp("--version", argv[arg])) { 939 printf("mqttd IPv4%s\n", 940#ifdef ENABLE_IPV6 941 "/IPv6" 942#else 943 "" 944#endif 945 ); 946 return 0; 947 } 948 else if(!strcmp("--pidfile", argv[arg])) { 949 arg++; 950 if(argc>arg) 951 pidname = argv[arg++]; 952 } 953 else if(!strcmp("--portfile", argv[arg])) { 954 arg++; 955 if(argc>arg) 956 portname = argv[arg++]; 957 } 958 else if(!strcmp("--config", argv[arg])) { 959 arg++; 960 if(argc>arg) 961 configfile = argv[arg++]; 962 } 963 else if(!strcmp("--logfile", argv[arg])) { 964 arg++; 965 if(argc>arg) 966 serverlogfile = argv[arg++]; 967 } 968 else if(!strcmp("--logdir", argv[arg])) { 969 arg++; 970 if(argc>arg) 971 logdir = argv[arg++]; 972 } 973 else if(!strcmp("--ipv6", argv[arg])) { 974#ifdef ENABLE_IPV6 975 ipv_inuse = "IPv6"; 976 use_ipv6 = TRUE; 977#endif 978 arg++; 979 } 980 else if(!strcmp("--ipv4", argv[arg])) { 981 /* for completeness, we support this option as well */ 982#ifdef ENABLE_IPV6 983 ipv_inuse = "IPv4"; 984 use_ipv6 = FALSE; 985#endif 986 arg++; 987 } 988 else if(!strcmp("--port", argv[arg])) { 989 arg++; 990 if(argc>arg) { 991 char *endptr; 992 unsigned long ulnum = strtoul(argv[arg], &endptr, 10); 993 if((endptr != argv[arg] + strlen(argv[arg])) || 994 ((ulnum != 0UL) && ((ulnum < 1025UL) || (ulnum > 65535UL)))) { 995 fprintf(stderr, "mqttd: invalid --port argument (%s)\n", 996 argv[arg]); 997 return 0; 998 } 999 port = curlx_ultous(ulnum); 1000 arg++; 1001 } 1002 } 1003 else { 1004 puts("Usage: mqttd [option]\n" 1005 " --config [file]\n" 1006 " --version\n" 1007 " --logfile [file]\n" 1008 " --logdir [directory]\n" 1009 " --pidfile [file]\n" 1010 " --portfile [file]\n" 1011 " --ipv4\n" 1012 " --ipv6\n" 1013 " --port [port]\n"); 1014 return 0; 1015 } 1016 } 1017 1018 msnprintf(loglockfile, sizeof(loglockfile), "%s/%s/mqtt-%s.lock", 1019 logdir, SERVERLOGS_LOCKDIR, ipv_inuse); 1020 1021#ifdef _WIN32 1022 win32_init(); 1023 atexit(win32_cleanup); 1024 1025 setmode(fileno(stdin), O_BINARY); 1026 setmode(fileno(stdout), O_BINARY); 1027 setmode(fileno(stderr), O_BINARY); 1028#endif 1029 1030 install_signal_handlers(FALSE); 1031 1032#ifdef ENABLE_IPV6 1033 if(!use_ipv6) 1034#endif 1035 sock = socket(AF_INET, SOCK_STREAM, 0); 1036#ifdef ENABLE_IPV6 1037 else 1038 sock = socket(AF_INET6, SOCK_STREAM, 0); 1039#endif 1040 1041 if(CURL_SOCKET_BAD == sock) { 1042 error = SOCKERRNO; 1043 logmsg("Error creating socket: (%d) %s", error, sstrerror(error)); 1044 goto mqttd_cleanup; 1045 } 1046 1047 { 1048 /* passive daemon style */ 1049 sock = sockdaemon(sock, &port); 1050 if(CURL_SOCKET_BAD == sock) { 1051 goto mqttd_cleanup; 1052 } 1053 msgsock = CURL_SOCKET_BAD; /* no stream socket yet */ 1054 } 1055 1056 logmsg("Running %s version", ipv_inuse); 1057 logmsg("Listening on port %hu", port); 1058 1059 wrotepidfile = write_pidfile(pidname); 1060 if(!wrotepidfile) { 1061 goto mqttd_cleanup; 1062 } 1063 1064 wroteportfile = write_portfile(portname, port); 1065 if(!wroteportfile) { 1066 goto mqttd_cleanup; 1067 } 1068 1069 do { 1070 juggle_again = incoming(sock); 1071 } while(juggle_again); 1072 1073mqttd_cleanup: 1074 1075 if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD)) 1076 sclose(msgsock); 1077 1078 if(sock != CURL_SOCKET_BAD) 1079 sclose(sock); 1080 1081 if(wrotepidfile) 1082 unlink(pidname); 1083 if(wroteportfile) 1084 unlink(portname); 1085 1086 restore_signal_handlers(FALSE); 1087 1088 if(got_exit_signal) { 1089 logmsg("============> mqttd exits with signal (%d)", exit_signal); 1090 /* 1091 * To properly set the return status of the process we 1092 * must raise the same signal SIGINT or SIGTERM that we 1093 * caught and let the old handler take care of it. 1094 */ 1095 raise(exit_signal); 1096 } 1097 1098 logmsg("============> mqttd quits"); 1099 return 0; 1100} 1101