1d4afb5ceSopenharmony_ci/* 2d4afb5ceSopenharmony_ci * libwebsockets - small server side websockets and web server implementation 3d4afb5ceSopenharmony_ci * 4d4afb5ceSopenharmony_ci * Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com> 5d4afb5ceSopenharmony_ci * 6d4afb5ceSopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy 7d4afb5ceSopenharmony_ci * of this software and associated documentation files (the "Software"), to 8d4afb5ceSopenharmony_ci * deal in the Software without restriction, including without limitation the 9d4afb5ceSopenharmony_ci * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10d4afb5ceSopenharmony_ci * sell copies of the Software, and to permit persons to whom the Software is 11d4afb5ceSopenharmony_ci * furnished to do so, subject to the following conditions: 12d4afb5ceSopenharmony_ci * 13d4afb5ceSopenharmony_ci * The above copyright notice and this permission notice shall be included in 14d4afb5ceSopenharmony_ci * all copies or substantial portions of the Software. 15d4afb5ceSopenharmony_ci * 16d4afb5ceSopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17d4afb5ceSopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18d4afb5ceSopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19d4afb5ceSopenharmony_ci * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20d4afb5ceSopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21d4afb5ceSopenharmony_ci * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22d4afb5ceSopenharmony_ci * IN THE SOFTWARE. 23d4afb5ceSopenharmony_ci * 24d4afb5ceSopenharmony_ci * MQTT v5 25d4afb5ceSopenharmony_ci * 26d4afb5ceSopenharmony_ci * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html 27d4afb5ceSopenharmony_ci * 28d4afb5ceSopenharmony_ci * Control Packet structure 29d4afb5ceSopenharmony_ci * 30d4afb5ceSopenharmony_ci * - Always: 2+ byte: Fixed Hdr 31d4afb5ceSopenharmony_ci * - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props 32d4afb5ceSopenharmony_ci * - Required in some: variable: Payload 33d4afb5ceSopenharmony_ci * 34d4afb5ceSopenharmony_ci * For CONNECT, the props if present MUST be in the order [MQTT-3.1.3-1] 35d4afb5ceSopenharmony_ci * 36d4afb5ceSopenharmony_ci * - Client Identifier 37d4afb5ceSopenharmony_ci * - Will Properties 38d4afb5ceSopenharmony_ci * - Will Topic 39d4afb5ceSopenharmony_ci * - Will Payload 40d4afb5ceSopenharmony_ci * - User Name 41d4afb5ceSopenharmony_ci * - Password 42d4afb5ceSopenharmony_ci */ 43d4afb5ceSopenharmony_ci 44d4afb5ceSopenharmony_ci#include "private-lib-core.h" 45d4afb5ceSopenharmony_ci#include <string.h> 46d4afb5ceSopenharmony_ci#include <sys/types.h> 47d4afb5ceSopenharmony_ci#include <assert.h> 48d4afb5ceSopenharmony_ci 49d4afb5ceSopenharmony_citypedef enum { 50d4afb5ceSopenharmony_ci LMQPRS_AWAITING_CONNECT, 51d4afb5ceSopenharmony_ci 52d4afb5ceSopenharmony_ci} lws_mqtt_protocol_server_connstate_t; 53d4afb5ceSopenharmony_ci 54d4afb5ceSopenharmony_ciconst char * const reason_names_g1[] = { 55d4afb5ceSopenharmony_ci "Success / Normal disconnection / QoS0", 56d4afb5ceSopenharmony_ci "QoS1", 57d4afb5ceSopenharmony_ci "QoS2", 58d4afb5ceSopenharmony_ci "Disconnect Will", 59d4afb5ceSopenharmony_ci "No matching subscriber", 60d4afb5ceSopenharmony_ci "No subscription existed", 61d4afb5ceSopenharmony_ci "Continue authentication", 62d4afb5ceSopenharmony_ci "Re-authenticate" 63d4afb5ceSopenharmony_ci}; 64d4afb5ceSopenharmony_ci 65d4afb5ceSopenharmony_ciconst char * const reason_names_g2[] = { 66d4afb5ceSopenharmony_ci "Unspecified error", 67d4afb5ceSopenharmony_ci "Malformed packet", 68d4afb5ceSopenharmony_ci "Protocol error", 69d4afb5ceSopenharmony_ci "Implementation specific error", 70d4afb5ceSopenharmony_ci "Unsupported protocol", 71d4afb5ceSopenharmony_ci "Client ID invalid", 72d4afb5ceSopenharmony_ci "Bad credentials", 73d4afb5ceSopenharmony_ci "Not Authorized", 74d4afb5ceSopenharmony_ci "Server Unavailable", 75d4afb5ceSopenharmony_ci "Server Busy", 76d4afb5ceSopenharmony_ci "Banned", 77d4afb5ceSopenharmony_ci "Server Shutting Down", 78d4afb5ceSopenharmony_ci "Bad Authentication Method", 79d4afb5ceSopenharmony_ci "Keepalive Timeout", 80d4afb5ceSopenharmony_ci "Session taken over", 81d4afb5ceSopenharmony_ci "Topic Filter Invalid", 82d4afb5ceSopenharmony_ci "Packet ID in use", 83d4afb5ceSopenharmony_ci "Packet ID not found", 84d4afb5ceSopenharmony_ci "Max RX Exceeded", 85d4afb5ceSopenharmony_ci "Topic Alias Invalid", 86d4afb5ceSopenharmony_ci "Packet too large", 87d4afb5ceSopenharmony_ci "Ratelimit", 88d4afb5ceSopenharmony_ci "Quota Exceeded", 89d4afb5ceSopenharmony_ci "Administrative Action", 90d4afb5ceSopenharmony_ci "Payload format invalid", 91d4afb5ceSopenharmony_ci "Retain not supported", 92d4afb5ceSopenharmony_ci "QoS not supported", 93d4afb5ceSopenharmony_ci "Use another server", 94d4afb5ceSopenharmony_ci "Server Moved", 95d4afb5ceSopenharmony_ci "Shared subscriptions not supported", 96d4afb5ceSopenharmony_ci "Connection rate exceeded", 97d4afb5ceSopenharmony_ci "Maximum Connect Time", 98d4afb5ceSopenharmony_ci "Subscription IDs not supported", 99d4afb5ceSopenharmony_ci "Wildcard subscriptions not supported" 100d4afb5ceSopenharmony_ci}; 101d4afb5ceSopenharmony_ci 102d4afb5ceSopenharmony_ci#define LMQCP_WILL_PROPERTIES 0 103d4afb5ceSopenharmony_ci 104d4afb5ceSopenharmony_ci/* For each property, a bitmap describing which commands it is valid for */ 105d4afb5ceSopenharmony_ci 106d4afb5ceSopenharmony_cistatic const uint16_t property_valid[] = { 107d4afb5ceSopenharmony_ci [LMQPROP_PAYLOAD_FORMAT_INDICATOR] = (1 << LMQCP_PUBLISH) | 108d4afb5ceSopenharmony_ci (1 << LMQCP_WILL_PROPERTIES), 109d4afb5ceSopenharmony_ci [LMQPROP_MESSAGE_EXPIRY_INTERVAL] = (1 << LMQCP_PUBLISH) | 110d4afb5ceSopenharmony_ci (1 << LMQCP_WILL_PROPERTIES), 111d4afb5ceSopenharmony_ci [LMQPROP_CONTENT_TYPE] = (1 << LMQCP_PUBLISH) | 112d4afb5ceSopenharmony_ci (1 << LMQCP_WILL_PROPERTIES), 113d4afb5ceSopenharmony_ci [LMQPROP_RESPONSE_TOPIC] = (1 << LMQCP_PUBLISH) | 114d4afb5ceSopenharmony_ci (1 << LMQCP_WILL_PROPERTIES), 115d4afb5ceSopenharmony_ci [LMQPROP_CORRELATION_DATA] = (1 << LMQCP_PUBLISH) | 116d4afb5ceSopenharmony_ci (1 << LMQCP_WILL_PROPERTIES), 117d4afb5ceSopenharmony_ci [LMQPROP_SUBSCRIPTION_IDENTIFIER] = (1 << LMQCP_PUBLISH) | 118d4afb5ceSopenharmony_ci (1 << LMQCP_CTOS_SUBSCRIBE), 119d4afb5ceSopenharmony_ci [LMQPROP_SESSION_EXPIRY_INTERVAL] = (1 << LMQCP_CTOS_CONNECT) | 120d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_CONNACK) | 121d4afb5ceSopenharmony_ci (1 << LMQCP_DISCONNECT), 122d4afb5ceSopenharmony_ci [LMQPROP_ASSIGNED_CLIENT_IDENTIFIER] = (1 << LMQCP_STOC_CONNACK), 123d4afb5ceSopenharmony_ci [LMQPROP_SERVER_KEEP_ALIVE] = (1 << LMQCP_STOC_CONNACK), 124d4afb5ceSopenharmony_ci [LMQPROP_AUTHENTICATION_METHOD] = (1 << LMQCP_CTOS_CONNECT) | 125d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_CONNACK) | 126d4afb5ceSopenharmony_ci (1 << LMQCP_AUTH), 127d4afb5ceSopenharmony_ci [LMQPROP_AUTHENTICATION_DATA] = (1 << LMQCP_CTOS_CONNECT) | 128d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_CONNACK) | 129d4afb5ceSopenharmony_ci (1 << LMQCP_AUTH), 130d4afb5ceSopenharmony_ci [LMQPROP_REQUEST_PROBLEM_INFORMATION] = (1 << LMQCP_CTOS_CONNECT), 131d4afb5ceSopenharmony_ci [LMQPROP_WILL_DELAY_INTERVAL] = (1 << LMQCP_WILL_PROPERTIES), 132d4afb5ceSopenharmony_ci [LMQPROP_REQUEST_RESPONSE_INFORMATION] = (1 << LMQCP_CTOS_CONNECT), 133d4afb5ceSopenharmony_ci [LMQPROP_RESPONSE_INFORMATION] = (1 << LMQCP_STOC_CONNACK), 134d4afb5ceSopenharmony_ci [LMQPROP_SERVER_REFERENCE] = (1 << LMQCP_STOC_CONNACK) | 135d4afb5ceSopenharmony_ci (1 << LMQCP_DISCONNECT), 136d4afb5ceSopenharmony_ci [LMQPROP_REASON_STRING] = (1 << LMQCP_STOC_CONNACK) | 137d4afb5ceSopenharmony_ci (1 << LMQCP_PUBACK) | 138d4afb5ceSopenharmony_ci (1 << LMQCP_PUBREC) | 139d4afb5ceSopenharmony_ci (1 << LMQCP_PUBREL) | 140d4afb5ceSopenharmony_ci (1 << LMQCP_PUBCOMP) | 141d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_SUBACK) | 142d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_UNSUBACK) | 143d4afb5ceSopenharmony_ci (1 << LMQCP_DISCONNECT) | 144d4afb5ceSopenharmony_ci (1 << LMQCP_AUTH), 145d4afb5ceSopenharmony_ci [LMQPROP_RECEIVE_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) | 146d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_CONNACK), 147d4afb5ceSopenharmony_ci [LMQPROP_TOPIC_ALIAS_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) | 148d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_CONNACK), 149d4afb5ceSopenharmony_ci [LMQPROP_TOPIC_ALIAS] = (1 << LMQCP_PUBLISH), 150d4afb5ceSopenharmony_ci [LMQPROP_MAXIMUM_QOS] = (1 << LMQCP_STOC_CONNACK), 151d4afb5ceSopenharmony_ci [LMQPROP_RETAIN_AVAILABLE] = (1 << LMQCP_STOC_CONNACK), 152d4afb5ceSopenharmony_ci [LMQPROP_USER_PROPERTY] = (1 << LMQCP_CTOS_CONNECT) | 153d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_CONNACK) | 154d4afb5ceSopenharmony_ci (1 << LMQCP_PUBLISH) | 155d4afb5ceSopenharmony_ci (1 << LMQCP_WILL_PROPERTIES) | 156d4afb5ceSopenharmony_ci (1 << LMQCP_PUBACK) | 157d4afb5ceSopenharmony_ci (1 << LMQCP_PUBREC) | 158d4afb5ceSopenharmony_ci (1 << LMQCP_PUBREL) | 159d4afb5ceSopenharmony_ci (1 << LMQCP_PUBCOMP) | 160d4afb5ceSopenharmony_ci (1 << LMQCP_CTOS_SUBSCRIBE) | 161d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_SUBACK) | 162d4afb5ceSopenharmony_ci (1 << LMQCP_CTOS_UNSUBSCRIBE) | 163d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_UNSUBACK) | 164d4afb5ceSopenharmony_ci (1 << LMQCP_DISCONNECT) | 165d4afb5ceSopenharmony_ci (1 << LMQCP_AUTH), 166d4afb5ceSopenharmony_ci [LMQPROP_MAXIMUM_PACKET_SIZE] = (1 << LMQCP_CTOS_CONNECT) | 167d4afb5ceSopenharmony_ci (1 << LMQCP_STOC_CONNACK), 168d4afb5ceSopenharmony_ci [LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK), 169d4afb5ceSopenharmony_ci [LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL] = (1 << LMQCP_STOC_CONNACK), 170d4afb5ceSopenharmony_ci [LMQPROP_SHARED_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK) 171d4afb5ceSopenharmony_ci}; 172d4afb5ceSopenharmony_ci 173d4afb5ceSopenharmony_ci 174d4afb5ceSopenharmony_ci/* 175d4afb5ceSopenharmony_ci * For each command index, maps flags, id, qos and payload legality 176d4afb5ceSopenharmony_ci * notice in most cases PUBLISH requires further processing 177d4afb5ceSopenharmony_ci */ 178d4afb5ceSopenharmony_cistatic const uint8_t map_flags[] = { 179d4afb5ceSopenharmony_ci [LMQCP_RESERVED] = 0x00, 180d4afb5ceSopenharmony_ci [LMQCP_CTOS_CONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 181d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PAYLOAD | 182d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 183d4afb5ceSopenharmony_ci [LMQCP_STOC_CONNACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 184d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 185d4afb5ceSopenharmony_ci [LMQCP_PUBLISH] = LMQCP_LUT_FLAG_PAYLOAD | /* option */ 186d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00, 187d4afb5ceSopenharmony_ci [LMQCP_PUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 188d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00, 189d4afb5ceSopenharmony_ci [LMQCP_PUBREC] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 190d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00, 191d4afb5ceSopenharmony_ci [LMQCP_PUBREL] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 192d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02, 193d4afb5ceSopenharmony_ci [LMQCP_PUBCOMP] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 194d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00, 195d4afb5ceSopenharmony_ci [LMQCP_CTOS_SUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 196d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PAYLOAD | 197d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02, 198d4afb5ceSopenharmony_ci [LMQCP_STOC_SUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 199d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PAYLOAD | 200d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00, 201d4afb5ceSopenharmony_ci [LMQCP_CTOS_UNSUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 202d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PAYLOAD | 203d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02, 204d4afb5ceSopenharmony_ci [LMQCP_STOC_UNSUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 205d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PAYLOAD | 206d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 207d4afb5ceSopenharmony_ci [LMQCP_CTOS_PINGREQ] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 208d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 209d4afb5ceSopenharmony_ci [LMQCP_STOC_PINGRESP] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 210d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 211d4afb5ceSopenharmony_ci [LMQCP_DISCONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 212d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 213d4afb5ceSopenharmony_ci [LMQCP_AUTH] = LMQCP_LUT_FLAG_RESERVED_FLAGS | 214d4afb5ceSopenharmony_ci LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, 215d4afb5ceSopenharmony_ci}; 216d4afb5ceSopenharmony_ci 217d4afb5ceSopenharmony_cistatic int 218d4afb5ceSopenharmony_cilws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed) 219d4afb5ceSopenharmony_ci{ 220d4afb5ceSopenharmony_ci par->consumed += (unsigned int)consumed; 221d4afb5ceSopenharmony_ci 222d4afb5ceSopenharmony_ci if (par->consumed > par->props_len) 223d4afb5ceSopenharmony_ci return -1; 224d4afb5ceSopenharmony_ci 225d4afb5ceSopenharmony_ci /* more properties coming */ 226d4afb5ceSopenharmony_ci 227d4afb5ceSopenharmony_ci if (par->consumed < par->props_len) { 228d4afb5ceSopenharmony_ci par->state = LMQCPP_PROP_ID_VBI; 229d4afb5ceSopenharmony_ci return 0; 230d4afb5ceSopenharmony_ci } 231d4afb5ceSopenharmony_ci 232d4afb5ceSopenharmony_ci /* properties finished: are we headed for payload or idle? */ 233d4afb5ceSopenharmony_ci 234d4afb5ceSopenharmony_ci if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) && 235d4afb5ceSopenharmony_ci /* A PUBLISH packet MUST NOT contain a Packet Identifier if 236d4afb5ceSopenharmony_ci * its QoS value is set to 0 [MQTT-2.2.1-2]. */ 237d4afb5ceSopenharmony_ci (ctl_pkt_type(par) != LMQCP_PUBLISH || 238d4afb5ceSopenharmony_ci (par->packet_type_flags & 6))) { 239d4afb5ceSopenharmony_ci par->state = LMQCPP_PAYLOAD; 240d4afb5ceSopenharmony_ci return 0; 241d4afb5ceSopenharmony_ci } 242d4afb5ceSopenharmony_ci 243d4afb5ceSopenharmony_ci par->state = LMQCPP_IDLE; 244d4afb5ceSopenharmony_ci 245d4afb5ceSopenharmony_ci return 0; 246d4afb5ceSopenharmony_ci} 247d4afb5ceSopenharmony_ci 248d4afb5ceSopenharmony_cistatic int 249d4afb5ceSopenharmony_cilws_mqtt_set_client_established(struct lws *wsi) 250d4afb5ceSopenharmony_ci{ 251d4afb5ceSopenharmony_ci lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED, 252d4afb5ceSopenharmony_ci &role_ops_mqtt); 253d4afb5ceSopenharmony_ci 254d4afb5ceSopenharmony_ci if (user_callback_handle_rxflow(wsi->a.protocol->callback, 255d4afb5ceSopenharmony_ci wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED, 256d4afb5ceSopenharmony_ci wsi->user_space, NULL, 0) < 0) { 257d4afb5ceSopenharmony_ci lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__); 258d4afb5ceSopenharmony_ci 259d4afb5ceSopenharmony_ci return -1; 260d4afb5ceSopenharmony_ci } 261d4afb5ceSopenharmony_ci /* 262d4afb5ceSopenharmony_ci * If we made a new connection and got the ACK, our connection is 263d4afb5ceSopenharmony_ci * definitely working in both directions at the moment 264d4afb5ceSopenharmony_ci */ 265d4afb5ceSopenharmony_ci lws_validity_confirmed(wsi); 266d4afb5ceSopenharmony_ci 267d4afb5ceSopenharmony_ci /* clear connection timeout */ 268d4afb5ceSopenharmony_ci lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0); 269d4afb5ceSopenharmony_ci 270d4afb5ceSopenharmony_ci return 0; 271d4afb5ceSopenharmony_ci} 272d4afb5ceSopenharmony_ci 273d4afb5ceSopenharmony_cistatic lws_mqtt_validate_topic_return_t 274d4afb5ceSopenharmony_cilws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot) 275d4afb5ceSopenharmony_ci{ 276d4afb5ceSopenharmony_ci size_t spos = 0; 277d4afb5ceSopenharmony_ci const char *sub = topic; 278d4afb5ceSopenharmony_ci int8_t slashes = 0; 279d4afb5ceSopenharmony_ci lws_mqtt_validate_topic_return_t ret = LMVTR_VALID; 280d4afb5ceSopenharmony_ci 281d4afb5ceSopenharmony_ci if (awsiot) { 282d4afb5ceSopenharmony_ci if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN) 283d4afb5ceSopenharmony_ci return LMVTR_FAILED_OVERSIZE; 284d4afb5ceSopenharmony_ci if (topic[0] == '$') { 285d4afb5ceSopenharmony_ci ret = LMVTR_VALID_SHADOW; 286d4afb5ceSopenharmony_ci slashes = -3; 287d4afb5ceSopenharmony_ci } 288d4afb5ceSopenharmony_ci } else { 289d4afb5ceSopenharmony_ci if (topiclen > LWS_MQTT_MAX_TOPICLEN) 290d4afb5ceSopenharmony_ci return LMVTR_FAILED_OVERSIZE; 291d4afb5ceSopenharmony_ci if (topic[0] == '$') 292d4afb5ceSopenharmony_ci return LMVTR_FAILED_WILDCARD_FORMAT; 293d4afb5ceSopenharmony_ci } 294d4afb5ceSopenharmony_ci 295d4afb5ceSopenharmony_ci while (*sub != 0) { 296d4afb5ceSopenharmony_ci if (sub[0] == '+') { 297d4afb5ceSopenharmony_ci /* topic == "+foo" || "a/+foo" ? */ 298d4afb5ceSopenharmony_ci if (spos > 0 && sub[-1] != '/') 299d4afb5ceSopenharmony_ci return LMVTR_FAILED_WILDCARD_FORMAT; 300d4afb5ceSopenharmony_ci 301d4afb5ceSopenharmony_ci /* topic == "foo+" or "foo+/a" ? */ 302d4afb5ceSopenharmony_ci if (sub[1] != 0 && sub[1] != '/') 303d4afb5ceSopenharmony_ci return LMVTR_FAILED_WILDCARD_FORMAT; 304d4afb5ceSopenharmony_ci 305d4afb5ceSopenharmony_ci ret = LMVTR_VALID_WILDCARD; 306d4afb5ceSopenharmony_ci } else if (sub[0] == '#') { 307d4afb5ceSopenharmony_ci /* topic == "foo#" ? */ 308d4afb5ceSopenharmony_ci if (spos > 0 && sub[-1] != '/') 309d4afb5ceSopenharmony_ci return LMVTR_FAILED_WILDCARD_FORMAT; 310d4afb5ceSopenharmony_ci 311d4afb5ceSopenharmony_ci /* topic == "#foo" ? */ 312d4afb5ceSopenharmony_ci if (sub[1] != 0) 313d4afb5ceSopenharmony_ci return LMVTR_FAILED_WILDCARD_FORMAT; 314d4afb5ceSopenharmony_ci 315d4afb5ceSopenharmony_ci ret = LMVTR_VALID_WILDCARD; 316d4afb5ceSopenharmony_ci } else if (sub[0] == '/') { 317d4afb5ceSopenharmony_ci slashes++; 318d4afb5ceSopenharmony_ci } 319d4afb5ceSopenharmony_ci spos++; 320d4afb5ceSopenharmony_ci sub++; 321d4afb5ceSopenharmony_ci } 322d4afb5ceSopenharmony_ci 323d4afb5ceSopenharmony_ci if (awsiot && (slashes < 0 || slashes > 7)) 324d4afb5ceSopenharmony_ci return LMVTR_FAILED_SHADOW_FORMAT; 325d4afb5ceSopenharmony_ci 326d4afb5ceSopenharmony_ci return ret; 327d4afb5ceSopenharmony_ci} 328d4afb5ceSopenharmony_ci 329d4afb5ceSopenharmony_cistatic lws_mqtt_subs_t * 330d4afb5ceSopenharmony_cilws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic) 331d4afb5ceSopenharmony_ci{ 332d4afb5ceSopenharmony_ci lws_mqtt_subs_t *mysub; 333d4afb5ceSopenharmony_ci size_t topiclen = strlen(topic); 334d4afb5ceSopenharmony_ci lws_mqtt_validate_topic_return_t flag; 335d4afb5ceSopenharmony_ci 336d4afb5ceSopenharmony_ci flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot); 337d4afb5ceSopenharmony_ci switch (flag) { 338d4afb5ceSopenharmony_ci case LMVTR_FAILED_OVERSIZE: 339d4afb5ceSopenharmony_ci lwsl_err("%s: Topic is too long\n", 340d4afb5ceSopenharmony_ci __func__); 341d4afb5ceSopenharmony_ci return NULL; 342d4afb5ceSopenharmony_ci case LMVTR_FAILED_SHADOW_FORMAT: 343d4afb5ceSopenharmony_ci case LMVTR_FAILED_WILDCARD_FORMAT: 344d4afb5ceSopenharmony_ci lwsl_err("%s: Invalid topic format \"%s\"\n", 345d4afb5ceSopenharmony_ci __func__, topic); 346d4afb5ceSopenharmony_ci return NULL; 347d4afb5ceSopenharmony_ci 348d4afb5ceSopenharmony_ci case LMVTR_VALID: 349d4afb5ceSopenharmony_ci case LMVTR_VALID_WILDCARD: 350d4afb5ceSopenharmony_ci case LMVTR_VALID_SHADOW: 351d4afb5ceSopenharmony_ci mysub = lws_malloc(sizeof(*mysub) + topiclen + 1, "sub"); 352d4afb5ceSopenharmony_ci if (!mysub) { 353d4afb5ceSopenharmony_ci lwsl_err("%s: Error allocating mysub\n", 354d4afb5ceSopenharmony_ci __func__); 355d4afb5ceSopenharmony_ci return NULL; 356d4afb5ceSopenharmony_ci } 357d4afb5ceSopenharmony_ci mysub->wildcard = (flag == LMVTR_VALID_WILDCARD); 358d4afb5ceSopenharmony_ci mysub->shadow = (flag == LMVTR_VALID_SHADOW); 359d4afb5ceSopenharmony_ci break; 360d4afb5ceSopenharmony_ci 361d4afb5ceSopenharmony_ci default: 362d4afb5ceSopenharmony_ci lwsl_err("%s: Unknown flag - %d\n", 363d4afb5ceSopenharmony_ci __func__, flag); 364d4afb5ceSopenharmony_ci return NULL; 365d4afb5ceSopenharmony_ci } 366d4afb5ceSopenharmony_ci 367d4afb5ceSopenharmony_ci mysub->next = mqtt->subs_head; 368d4afb5ceSopenharmony_ci mqtt->subs_head = mysub; 369d4afb5ceSopenharmony_ci memcpy(mysub->topic, topic, strlen(topic) + 1); 370d4afb5ceSopenharmony_ci mysub->ref_count = 1; 371d4afb5ceSopenharmony_ci 372d4afb5ceSopenharmony_ci lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n", 373d4afb5ceSopenharmony_ci __func__, mysub, mqtt); 374d4afb5ceSopenharmony_ci 375d4afb5ceSopenharmony_ci return mysub; 376d4afb5ceSopenharmony_ci} 377d4afb5ceSopenharmony_ci 378d4afb5ceSopenharmony_cistatic int 379d4afb5ceSopenharmony_cilws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt) 380d4afb5ceSopenharmony_ci{ 381d4afb5ceSopenharmony_ci lws_mqtt_subs_t *s = mqtt->subs_head; 382d4afb5ceSopenharmony_ci lws_mqtt_subs_t *temp = NULL; 383d4afb5ceSopenharmony_ci 384d4afb5ceSopenharmony_ci 385d4afb5ceSopenharmony_ci lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n", 386d4afb5ceSopenharmony_ci __func__, mqtt); 387d4afb5ceSopenharmony_ci 388d4afb5ceSopenharmony_ci while (s && s->next) { 389d4afb5ceSopenharmony_ci if (s->next->ref_count == 0) 390d4afb5ceSopenharmony_ci break; 391d4afb5ceSopenharmony_ci s = s->next; 392d4afb5ceSopenharmony_ci } 393d4afb5ceSopenharmony_ci 394d4afb5ceSopenharmony_ci if (s && s->next) { 395d4afb5ceSopenharmony_ci temp = s->next; 396d4afb5ceSopenharmony_ci lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n", 397d4afb5ceSopenharmony_ci __func__, temp, mqtt); 398d4afb5ceSopenharmony_ci s->next = temp->next; 399d4afb5ceSopenharmony_ci lws_free(temp); 400d4afb5ceSopenharmony_ci return 0; 401d4afb5ceSopenharmony_ci } 402d4afb5ceSopenharmony_ci return 1; 403d4afb5ceSopenharmony_ci} 404d4afb5ceSopenharmony_ci 405d4afb5ceSopenharmony_ci/* 406d4afb5ceSopenharmony_ci * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or 407d4afb5ceSopenharmony_ci * PUBREC came before the timeout period 408d4afb5ceSopenharmony_ci */ 409d4afb5ceSopenharmony_ci 410d4afb5ceSopenharmony_cistatic void 411d4afb5ceSopenharmony_cilws_mqtt_publish_resend(struct lws_sorted_usec_list *sul) 412d4afb5ceSopenharmony_ci{ 413d4afb5ceSopenharmony_ci struct _lws_mqtt_related *mqtt = lws_container_of(sul, 414d4afb5ceSopenharmony_ci struct _lws_mqtt_related, sul_qos_puback_pubrec_wait); 415d4afb5ceSopenharmony_ci 416d4afb5ceSopenharmony_ci lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); 417d4afb5ceSopenharmony_ci 418d4afb5ceSopenharmony_ci if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND, 419d4afb5ceSopenharmony_ci mqtt->wsi->user_space, NULL, 0)) 420d4afb5ceSopenharmony_ci lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); 421d4afb5ceSopenharmony_ci} 422d4afb5ceSopenharmony_ci 423d4afb5ceSopenharmony_cistatic void 424d4afb5ceSopenharmony_cilws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul) 425d4afb5ceSopenharmony_ci{ 426d4afb5ceSopenharmony_ci struct _lws_mqtt_related *mqtt = lws_container_of(sul, 427d4afb5ceSopenharmony_ci struct _lws_mqtt_related, sul_unsuback_wait); 428d4afb5ceSopenharmony_ci 429d4afb5ceSopenharmony_ci lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); 430d4afb5ceSopenharmony_ci 431d4afb5ceSopenharmony_ci if (mqtt->wsi->a.protocol->callback(mqtt->wsi, 432d4afb5ceSopenharmony_ci LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT, 433d4afb5ceSopenharmony_ci mqtt->wsi->user_space, NULL, 0)) 434d4afb5ceSopenharmony_ci lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); 435d4afb5ceSopenharmony_ci} 436d4afb5ceSopenharmony_ci 437d4afb5ceSopenharmony_cistatic void 438d4afb5ceSopenharmony_cilws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul) 439d4afb5ceSopenharmony_ci{ 440d4afb5ceSopenharmony_ci struct _lws_mqtt_related *mqtt = lws_container_of(sul, 441d4afb5ceSopenharmony_ci struct _lws_mqtt_related, sul_shadow_wait); 442d4afb5ceSopenharmony_ci 443d4afb5ceSopenharmony_ci lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); 444d4afb5ceSopenharmony_ci 445d4afb5ceSopenharmony_ci if (mqtt->wsi->a.protocol->callback(mqtt->wsi, 446d4afb5ceSopenharmony_ci LWS_CALLBACK_MQTT_SHADOW_TIMEOUT, 447d4afb5ceSopenharmony_ci mqtt->wsi->user_space, NULL, 0)) 448d4afb5ceSopenharmony_ci lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); 449d4afb5ceSopenharmony_ci} 450d4afb5ceSopenharmony_ci 451d4afb5ceSopenharmony_civoid 452d4afb5ceSopenharmony_cilws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s) 453d4afb5ceSopenharmony_ci{ 454d4afb5ceSopenharmony_ci lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s); 455d4afb5ceSopenharmony_ci c->estate = s; 456d4afb5ceSopenharmony_ci} 457d4afb5ceSopenharmony_ci 458d4afb5ceSopenharmony_cilws_mqtt_match_topic_return_t 459d4afb5ceSopenharmony_cilws_mqtt_is_topic_matched(const char* sub, const char* pub) 460d4afb5ceSopenharmony_ci{ 461d4afb5ceSopenharmony_ci const char *ppos = pub, *spos = sub; 462d4afb5ceSopenharmony_ci 463d4afb5ceSopenharmony_ci if (!ppos || !spos) { 464d4afb5ceSopenharmony_ci return LMMTR_TOPIC_MATCH_ERROR; 465d4afb5ceSopenharmony_ci } 466d4afb5ceSopenharmony_ci 467d4afb5ceSopenharmony_ci while (*spos) { 468d4afb5ceSopenharmony_ci if (*ppos == '#' || *ppos == '+') { 469d4afb5ceSopenharmony_ci lwsl_err("%s: PUBLISH to wildcard " 470d4afb5ceSopenharmony_ci "topic \"%s\" not supported\n", 471d4afb5ceSopenharmony_ci __func__, pub); 472d4afb5ceSopenharmony_ci return LMMTR_TOPIC_MATCH_ERROR; 473d4afb5ceSopenharmony_ci } 474d4afb5ceSopenharmony_ci /* foo/+/bar == foo/xyz/bar ? */ 475d4afb5ceSopenharmony_ci if (*spos == '+') { 476d4afb5ceSopenharmony_ci /* Skip ahead */ 477d4afb5ceSopenharmony_ci while (*ppos != '\0' && *ppos != '/') { 478d4afb5ceSopenharmony_ci ppos++; 479d4afb5ceSopenharmony_ci } 480d4afb5ceSopenharmony_ci } else if (*spos == '#') { 481d4afb5ceSopenharmony_ci return LMMTR_TOPIC_MATCH; 482d4afb5ceSopenharmony_ci } else { 483d4afb5ceSopenharmony_ci if (*ppos == '\0') { 484d4afb5ceSopenharmony_ci /* foo/bar == foo/bar/# ? */ 485d4afb5ceSopenharmony_ci if (!strncmp(spos, "/#", 2)) 486d4afb5ceSopenharmony_ci return LMMTR_TOPIC_MATCH; 487d4afb5ceSopenharmony_ci return LMMTR_TOPIC_NOMATCH; 488d4afb5ceSopenharmony_ci /* Non-matching character */ 489d4afb5ceSopenharmony_ci } else if (*ppos != *spos) { 490d4afb5ceSopenharmony_ci return LMMTR_TOPIC_NOMATCH; 491d4afb5ceSopenharmony_ci } 492d4afb5ceSopenharmony_ci ppos++; 493d4afb5ceSopenharmony_ci } 494d4afb5ceSopenharmony_ci spos++; 495d4afb5ceSopenharmony_ci } 496d4afb5ceSopenharmony_ci 497d4afb5ceSopenharmony_ci if (*spos == '\0' && *ppos == '\0') 498d4afb5ceSopenharmony_ci return LMMTR_TOPIC_MATCH; 499d4afb5ceSopenharmony_ci 500d4afb5ceSopenharmony_ci return LMMTR_TOPIC_NOMATCH; 501d4afb5ceSopenharmony_ci} 502d4afb5ceSopenharmony_ci 503d4afb5ceSopenharmony_cilws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt, 504d4afb5ceSopenharmony_ci const char* ptopic) { 505d4afb5ceSopenharmony_ci lws_mqtt_subs_t *s = mqtt->subs_head; 506d4afb5ceSopenharmony_ci 507d4afb5ceSopenharmony_ci while (s) { 508d4afb5ceSopenharmony_ci /* SUB topic == PUB topic ? */ 509d4afb5ceSopenharmony_ci /* foo/bar/xyz == foo/bar/xyz ? */ 510d4afb5ceSopenharmony_ci if (!s->wildcard) { 511d4afb5ceSopenharmony_ci if (!strcmp((const char*)s->topic, ptopic)) 512d4afb5ceSopenharmony_ci return s; 513d4afb5ceSopenharmony_ci } else { 514d4afb5ceSopenharmony_ci if (lws_mqtt_is_topic_matched( 515d4afb5ceSopenharmony_ci s->topic, ptopic) == LMMTR_TOPIC_MATCH) 516d4afb5ceSopenharmony_ci return s; 517d4afb5ceSopenharmony_ci } 518d4afb5ceSopenharmony_ci 519d4afb5ceSopenharmony_ci s = s->next; 520d4afb5ceSopenharmony_ci } 521d4afb5ceSopenharmony_ci 522d4afb5ceSopenharmony_ci return NULL; 523d4afb5ceSopenharmony_ci} 524d4afb5ceSopenharmony_ci 525d4afb5ceSopenharmony_ciint 526d4afb5ceSopenharmony_ci_lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par, 527d4afb5ceSopenharmony_ci const uint8_t *buf, size_t len) 528d4afb5ceSopenharmony_ci{ 529d4afb5ceSopenharmony_ci struct lws *w; 530d4afb5ceSopenharmony_ci int n; 531d4afb5ceSopenharmony_ci 532d4afb5ceSopenharmony_ci if (par->flag_pending_send_reason_close) 533d4afb5ceSopenharmony_ci return 0; 534d4afb5ceSopenharmony_ci 535d4afb5ceSopenharmony_ci /* 536d4afb5ceSopenharmony_ci * Stateful, fragmentation-immune parser 537d4afb5ceSopenharmony_ci * 538d4afb5ceSopenharmony_ci * Notice that len can always be 1 if under attack, even over tls if 539d4afb5ceSopenharmony_ci * the server is compromised or malicious. 540d4afb5ceSopenharmony_ci */ 541d4afb5ceSopenharmony_ci 542d4afb5ceSopenharmony_ci while (len) { 543d4afb5ceSopenharmony_ci lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len); 544d4afb5ceSopenharmony_ci switch (par->state) { 545d4afb5ceSopenharmony_ci case LMQCPP_IDLE: 546d4afb5ceSopenharmony_ci par->packet_type_flags = *buf++; 547d4afb5ceSopenharmony_ci len--; 548d4afb5ceSopenharmony_ci 549d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 550d4afb5ceSopenharmony_ci /* 551d4afb5ceSopenharmony_ci * The case where we sent the connect, but we received 552d4afb5ceSopenharmony_ci * something else before any CONNACK 553d4afb5ceSopenharmony_ci */ 554d4afb5ceSopenharmony_ci if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK && 555d4afb5ceSopenharmony_ci par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) { 556d4afb5ceSopenharmony_ci lwsl_notice("%s: server sent non-CONNACK\n", 557d4afb5ceSopenharmony_ci __func__); 558d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 559d4afb5ceSopenharmony_ci } 560d4afb5ceSopenharmony_ci#endif /* LWS_WITH_CLIENT */ 561d4afb5ceSopenharmony_ci 562d4afb5ceSopenharmony_ci n = map_flags[par->packet_type_flags >> 4]; 563d4afb5ceSopenharmony_ci /* 564d4afb5ceSopenharmony_ci * Where a flag bit is marked as “Reserved”, it is 565d4afb5ceSopenharmony_ci * reserved for future use and MUST be set to the value 566d4afb5ceSopenharmony_ci * listed [MQTT-2.1.3-1]. 567d4afb5ceSopenharmony_ci */ 568d4afb5ceSopenharmony_ci if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) && 569d4afb5ceSopenharmony_ci ((par->packet_type_flags & 0x0f) != (n & 0x0f))) { 570d4afb5ceSopenharmony_ci lwsl_notice("%s: %s: bad flags, 0x%02x mask 0x%02x (len %d)\n", 571d4afb5ceSopenharmony_ci __func__, lws_wsi_tag(wsi), 572d4afb5ceSopenharmony_ci par->packet_type_flags, n, (int)len + 1); 573d4afb5ceSopenharmony_ci lwsl_hexdump_err(buf - 1, len + 1); 574d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 575d4afb5ceSopenharmony_ci } 576d4afb5ceSopenharmony_ci 577d4afb5ceSopenharmony_ci lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n", 578d4afb5ceSopenharmony_ci __func__, par->packet_type_flags >> 4, 579d4afb5ceSopenharmony_ci par->packet_type_flags & 0xf); 580d4afb5ceSopenharmony_ci 581d4afb5ceSopenharmony_ci /* allows us to know if a property that can only be 582d4afb5ceSopenharmony_ci * given once, appears twice */ 583d4afb5ceSopenharmony_ci memset(par->props_seen, 0, sizeof(par->props_seen)); 584d4afb5ceSopenharmony_ci par->state = par->packet_type_flags & 0xf0; 585d4afb5ceSopenharmony_ci break; 586d4afb5ceSopenharmony_ci 587d4afb5ceSopenharmony_ci case LMQCPP_CONNECT_PACKET: 588d4afb5ceSopenharmony_ci lwsl_debug("%s: received CONNECT pkt\n", __func__); 589d4afb5ceSopenharmony_ci par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI; 590d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 591d4afb5ceSopenharmony_ci break; 592d4afb5ceSopenharmony_ci 593d4afb5ceSopenharmony_ci case LMQCPP_CONNECT_REMAINING_LEN_VBI: 594d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 595d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 596d4afb5ceSopenharmony_ci break; 597d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 598d4afb5ceSopenharmony_ci par->cpkt_remlen = par->vbit.value; 599d4afb5ceSopenharmony_ci n = map_flags[ctl_pkt_type(par)]; 600d4afb5ceSopenharmony_ci lws_mqtt_str_init(&par->s_temp, par->temp, 601d4afb5ceSopenharmony_ci sizeof(par->temp), 0); 602d4afb5ceSopenharmony_ci par->state = LMQCPP_CONNECT_VH_PNAME; 603d4afb5ceSopenharmony_ci break; 604d4afb5ceSopenharmony_ci default: 605d4afb5ceSopenharmony_ci lwsl_notice("%s: bad vbi\n", __func__); 606d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 607d4afb5ceSopenharmony_ci } 608d4afb5ceSopenharmony_ci break; 609d4afb5ceSopenharmony_ci 610d4afb5ceSopenharmony_ci case LMQCPP_CONNECT_VH_PNAME: 611d4afb5ceSopenharmony_ci switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) { 612d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 613d4afb5ceSopenharmony_ci break; 614d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 615d4afb5ceSopenharmony_ci if (par->s_temp.len != 4 || 616d4afb5ceSopenharmony_ci memcmp(par->s_temp.buf, "MQTT", 617d4afb5ceSopenharmony_ci par->s_temp.len)) { 618d4afb5ceSopenharmony_ci lwsl_notice("%s: protocol name: %.*s\n", 619d4afb5ceSopenharmony_ci __func__, par->s_temp.len, 620d4afb5ceSopenharmony_ci par->s_temp.buf); 621d4afb5ceSopenharmony_ci goto send_unsupp_connack_and_close; 622d4afb5ceSopenharmony_ci } 623d4afb5ceSopenharmony_ci par->state = LMQCPP_CONNECT_VH_PVERSION; 624d4afb5ceSopenharmony_ci break; 625d4afb5ceSopenharmony_ci default: 626d4afb5ceSopenharmony_ci lwsl_notice("%s: bad protocol name\n", __func__); 627d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 628d4afb5ceSopenharmony_ci } 629d4afb5ceSopenharmony_ci break; 630d4afb5ceSopenharmony_ci 631d4afb5ceSopenharmony_ci case LMQCPP_CONNECT_VH_PVERSION: 632d4afb5ceSopenharmony_ci par->conn_protocol_version = *buf++; 633d4afb5ceSopenharmony_ci len--; 634d4afb5ceSopenharmony_ci if (par->conn_protocol_version != 5) { 635d4afb5ceSopenharmony_ci lwsl_info("%s: unsupported MQTT version %d\n", 636d4afb5ceSopenharmony_ci __func__, par->conn_protocol_version); 637d4afb5ceSopenharmony_ci goto send_unsupp_connack_and_close; 638d4afb5ceSopenharmony_ci } 639d4afb5ceSopenharmony_ci par->state = LMQCPP_CONNECT_VH_FLAGS; 640d4afb5ceSopenharmony_ci break; 641d4afb5ceSopenharmony_ci 642d4afb5ceSopenharmony_ci case LMQCPP_CONNECT_VH_FLAGS: 643d4afb5ceSopenharmony_ci par->cpkt_flags = *buf++; 644d4afb5ceSopenharmony_ci len--; 645d4afb5ceSopenharmony_ci if (par->cpkt_flags & 1) { 646d4afb5ceSopenharmony_ci /* 647d4afb5ceSopenharmony_ci * The Server MUST validate that the reserved 648d4afb5ceSopenharmony_ci * flag in the CONNECT packet is set to 0 649d4afb5ceSopenharmony_ci * [MQTT-3.1.2-3]. 650d4afb5ceSopenharmony_ci */ 651d4afb5ceSopenharmony_ci par->reason = LMQCP_REASON_MALFORMED_PACKET; 652d4afb5ceSopenharmony_ci goto send_reason_and_close; 653d4afb5ceSopenharmony_ci } 654d4afb5ceSopenharmony_ci /* 655d4afb5ceSopenharmony_ci * conn_flags specifies the Will Properties that should 656d4afb5ceSopenharmony_ci * appear in the payload section 657d4afb5ceSopenharmony_ci */ 658d4afb5ceSopenharmony_ci lws_mqtt_2byte_init(&par->vbit); 659d4afb5ceSopenharmony_ci par->state = LMQCPP_CONNECT_VH_KEEPALIVE; 660d4afb5ceSopenharmony_ci break; 661d4afb5ceSopenharmony_ci 662d4afb5ceSopenharmony_ci case LMQCPP_CONNECT_VH_KEEPALIVE: 663d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 664d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 665d4afb5ceSopenharmony_ci break; 666d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 667d4afb5ceSopenharmony_ci par->keepalive = (uint16_t)par->vbit.value; 668d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 669d4afb5ceSopenharmony_ci par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN; 670d4afb5ceSopenharmony_ci break; 671d4afb5ceSopenharmony_ci default: 672d4afb5ceSopenharmony_ci lwsl_notice("%s: ka bad vbi\n", __func__); 673d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 674d4afb5ceSopenharmony_ci } 675d4afb5ceSopenharmony_ci break; 676d4afb5ceSopenharmony_ci 677d4afb5ceSopenharmony_ci case LMQCPP_PINGRESP_ZERO: 678d4afb5ceSopenharmony_ci len--; 679d4afb5ceSopenharmony_ci /* second byte of PINGRESP must be zero */ 680d4afb5ceSopenharmony_ci if (*buf++) 681d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 682d4afb5ceSopenharmony_ci goto cmd_completion; 683d4afb5ceSopenharmony_ci 684d4afb5ceSopenharmony_ci case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN: 685d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 686d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 687d4afb5ceSopenharmony_ci break; 688d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 689d4afb5ceSopenharmony_ci /* reset consumption counter */ 690d4afb5ceSopenharmony_ci par->consumed = 0; 691d4afb5ceSopenharmony_ci par->props_len = par->vbit.value; 692d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 693d4afb5ceSopenharmony_ci par->state = LMQCPP_PROP_ID_VBI; 694d4afb5ceSopenharmony_ci break; 695d4afb5ceSopenharmony_ci default: 696d4afb5ceSopenharmony_ci lwsl_notice("%s: connpr bad vbi\n", __func__); 697d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 698d4afb5ceSopenharmony_ci } 699d4afb5ceSopenharmony_ci break; 700d4afb5ceSopenharmony_ci 701d4afb5ceSopenharmony_ci /* PUBREC */ 702d4afb5ceSopenharmony_ci case LMQCPP_PUBREC_PACKET: 703d4afb5ceSopenharmony_ci lwsl_debug("%s: received PUBREC pkt\n", __func__); 704d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 705d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 706d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 707d4afb5ceSopenharmony_ci break; 708d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 709d4afb5ceSopenharmony_ci par->cpkt_remlen = par->vbit.value; 710d4afb5ceSopenharmony_ci lwsl_debug("%s: PUBREC pkt len = %d\n", 711d4afb5ceSopenharmony_ci __func__, (int)par->cpkt_remlen); 712d4afb5ceSopenharmony_ci if (par->cpkt_remlen < 2) 713d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 714d4afb5ceSopenharmony_ci par->state = LMQCPP_PUBREC_VH_PKT_ID; 715d4afb5ceSopenharmony_ci break; 716d4afb5ceSopenharmony_ci default: 717d4afb5ceSopenharmony_ci lwsl_notice("%s: pubrec bad vbi\n", __func__); 718d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 719d4afb5ceSopenharmony_ci } 720d4afb5ceSopenharmony_ci break; 721d4afb5ceSopenharmony_ci 722d4afb5ceSopenharmony_ci case LMQCPP_PUBREC_VH_PKT_ID: 723d4afb5ceSopenharmony_ci if (len < 2) { 724d4afb5ceSopenharmony_ci lwsl_notice("%s: len breakage 3\n", __func__); 725d4afb5ceSopenharmony_ci return -1; 726d4afb5ceSopenharmony_ci } 727d4afb5ceSopenharmony_ci 728d4afb5ceSopenharmony_ci par->cpkt_id = lws_ser_ru16be(buf); 729d4afb5ceSopenharmony_ci wsi->mqtt->ack_pkt_id = par->cpkt_id; 730d4afb5ceSopenharmony_ci buf += 2; 731d4afb5ceSopenharmony_ci len -= 2; 732d4afb5ceSopenharmony_ci par->cpkt_remlen -= 2; 733d4afb5ceSopenharmony_ci par->n = 0; 734d4afb5ceSopenharmony_ci 735d4afb5ceSopenharmony_ci goto cmd_completion; 736d4afb5ceSopenharmony_ci 737d4afb5ceSopenharmony_ci /* PUBREL */ 738d4afb5ceSopenharmony_ci case LMQCPP_PUBREL_PACKET: 739d4afb5ceSopenharmony_ci lwsl_debug("%s: received PUBREL pkt\n", __func__); 740d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 741d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 742d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 743d4afb5ceSopenharmony_ci break; 744d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 745d4afb5ceSopenharmony_ci par->cpkt_remlen = par->vbit.value; 746d4afb5ceSopenharmony_ci lwsl_debug("%s: PUBREL pkt len = %d\n", 747d4afb5ceSopenharmony_ci __func__, (int)par->cpkt_remlen); 748d4afb5ceSopenharmony_ci if (par->cpkt_remlen < 2) 749d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 750d4afb5ceSopenharmony_ci par->state = LMQCPP_PUBREL_VH_PKT_ID; 751d4afb5ceSopenharmony_ci break; 752d4afb5ceSopenharmony_ci default: 753d4afb5ceSopenharmony_ci lwsl_err("%s: pubrel bad vbi\n", __func__); 754d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 755d4afb5ceSopenharmony_ci } 756d4afb5ceSopenharmony_ci break; 757d4afb5ceSopenharmony_ci 758d4afb5ceSopenharmony_ci case LMQCPP_PUBREL_VH_PKT_ID: 759d4afb5ceSopenharmony_ci if (len < 2) { 760d4afb5ceSopenharmony_ci lwsl_notice("%s: len breakage 3\n", __func__); 761d4afb5ceSopenharmony_ci return -1; 762d4afb5ceSopenharmony_ci } 763d4afb5ceSopenharmony_ci 764d4afb5ceSopenharmony_ci par->cpkt_id = lws_ser_ru16be(buf); 765d4afb5ceSopenharmony_ci wsi->mqtt->ack_pkt_id = par->cpkt_id; 766d4afb5ceSopenharmony_ci buf += 2; 767d4afb5ceSopenharmony_ci len -= 2; 768d4afb5ceSopenharmony_ci par->cpkt_remlen -= 2; 769d4afb5ceSopenharmony_ci par->n = 0; 770d4afb5ceSopenharmony_ci 771d4afb5ceSopenharmony_ci goto cmd_completion; 772d4afb5ceSopenharmony_ci 773d4afb5ceSopenharmony_ci /* PUBCOMP */ 774d4afb5ceSopenharmony_ci case LMQCPP_PUBCOMP_PACKET: 775d4afb5ceSopenharmony_ci lwsl_debug("%s: received PUBCOMP pkt\n", __func__); 776d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 777d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 778d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 779d4afb5ceSopenharmony_ci break; 780d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 781d4afb5ceSopenharmony_ci par->cpkt_remlen = par->vbit.value; 782d4afb5ceSopenharmony_ci lwsl_debug("%s: PUBCOMP pkt len = %d\n", 783d4afb5ceSopenharmony_ci __func__, (int)par->cpkt_remlen); 784d4afb5ceSopenharmony_ci if (par->cpkt_remlen < 2) 785d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 786d4afb5ceSopenharmony_ci par->state = LMQCPP_PUBCOMP_VH_PKT_ID; 787d4afb5ceSopenharmony_ci break; 788d4afb5ceSopenharmony_ci default: 789d4afb5ceSopenharmony_ci lwsl_err("%s: pubcmp bad vbi\n", __func__); 790d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 791d4afb5ceSopenharmony_ci } 792d4afb5ceSopenharmony_ci break; 793d4afb5ceSopenharmony_ci 794d4afb5ceSopenharmony_ci case LMQCPP_PUBCOMP_VH_PKT_ID: 795d4afb5ceSopenharmony_ci if (len < 2) { 796d4afb5ceSopenharmony_ci lwsl_notice("%s: len breakage 3\n", __func__); 797d4afb5ceSopenharmony_ci return -1; 798d4afb5ceSopenharmony_ci } 799d4afb5ceSopenharmony_ci 800d4afb5ceSopenharmony_ci par->cpkt_id = lws_ser_ru16be(buf); 801d4afb5ceSopenharmony_ci wsi->mqtt->ack_pkt_id = par->cpkt_id; 802d4afb5ceSopenharmony_ci buf += 2; 803d4afb5ceSopenharmony_ci len -= 2; 804d4afb5ceSopenharmony_ci par->cpkt_remlen -= 2; 805d4afb5ceSopenharmony_ci par->n = 0; 806d4afb5ceSopenharmony_ci 807d4afb5ceSopenharmony_ci goto cmd_completion; 808d4afb5ceSopenharmony_ci 809d4afb5ceSopenharmony_ci case LMQCPP_PUBLISH_PACKET: 810d4afb5ceSopenharmony_ci if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) { 811d4afb5ceSopenharmony_ci lwsl_notice("%s: Topic rx before subscribing\n", 812d4afb5ceSopenharmony_ci __func__); 813d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 814d4afb5ceSopenharmony_ci } 815d4afb5ceSopenharmony_ci lwsl_info("%s: received PUBLISH pkt\n", __func__); 816d4afb5ceSopenharmony_ci par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI; 817d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 818d4afb5ceSopenharmony_ci break; 819d4afb5ceSopenharmony_ci case LMQCPP_PUBLISH_REMAINING_LEN_VBI: 820d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 821d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 822d4afb5ceSopenharmony_ci break; 823d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 824d4afb5ceSopenharmony_ci par->cpkt_remlen = par->vbit.value; 825d4afb5ceSopenharmony_ci lwsl_debug("%s: PUBLISH pkt len = %d\n", 826d4afb5ceSopenharmony_ci __func__, (int)par->cpkt_remlen); 827d4afb5ceSopenharmony_ci /* Move on to PUBLISH's variable header */ 828d4afb5ceSopenharmony_ci par->state = LMQCPP_PUBLISH_VH_TOPIC; 829d4afb5ceSopenharmony_ci break; 830d4afb5ceSopenharmony_ci default: 831d4afb5ceSopenharmony_ci lwsl_notice("%s: pubrem bad vbi\n", __func__); 832d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 833d4afb5ceSopenharmony_ci } 834d4afb5ceSopenharmony_ci break; 835d4afb5ceSopenharmony_ci 836d4afb5ceSopenharmony_ci case LMQCPP_PUBLISH_VH_TOPIC: 837d4afb5ceSopenharmony_ci { 838d4afb5ceSopenharmony_ci lws_mqtt_publish_param_t *pub = NULL; 839d4afb5ceSopenharmony_ci 840d4afb5ceSopenharmony_ci if (len < 2) { 841d4afb5ceSopenharmony_ci lwsl_notice("%s: topic too short\n", __func__); 842d4afb5ceSopenharmony_ci return -1; 843d4afb5ceSopenharmony_ci } 844d4afb5ceSopenharmony_ci 845d4afb5ceSopenharmony_ci /* Topic len */ 846d4afb5ceSopenharmony_ci par->n = lws_ser_ru16be(buf); 847d4afb5ceSopenharmony_ci buf += 2; 848d4afb5ceSopenharmony_ci len -= 2; 849d4afb5ceSopenharmony_ci 850d4afb5ceSopenharmony_ci if (len < par->n) {/* the way this is written... */ 851d4afb5ceSopenharmony_ci lwsl_notice("%s: len breakage\n", __func__); 852d4afb5ceSopenharmony_ci return -1; 853d4afb5ceSopenharmony_ci } 854d4afb5ceSopenharmony_ci 855d4afb5ceSopenharmony_ci /* Invalid topic len */ 856d4afb5ceSopenharmony_ci if (par->n == 0) { 857d4afb5ceSopenharmony_ci lwsl_notice("%s: zero topic len\n", __func__); 858d4afb5ceSopenharmony_ci par->reason = LMQCP_REASON_MALFORMED_PACKET; 859d4afb5ceSopenharmony_ci goto send_reason_and_close; 860d4afb5ceSopenharmony_ci } 861d4afb5ceSopenharmony_ci lwsl_debug("%s: PUBLISH topic len %d\n", 862d4afb5ceSopenharmony_ci __func__, (int)par->n); 863d4afb5ceSopenharmony_ci assert(!wsi->mqtt->rx_cpkt_param); 864d4afb5ceSopenharmony_ci wsi->mqtt->rx_cpkt_param = lws_zalloc( 865d4afb5ceSopenharmony_ci sizeof(lws_mqtt_publish_param_t), "rx pub param"); 866d4afb5ceSopenharmony_ci if (!wsi->mqtt->rx_cpkt_param) 867d4afb5ceSopenharmony_ci goto oom; 868d4afb5ceSopenharmony_ci pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param; 869d4afb5ceSopenharmony_ci 870d4afb5ceSopenharmony_ci pub->topic_len = (uint16_t)par->n; 871d4afb5ceSopenharmony_ci 872d4afb5ceSopenharmony_ci /* Topic Name */ 873d4afb5ceSopenharmony_ci pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1, 874d4afb5ceSopenharmony_ci "rx publish topic"); 875d4afb5ceSopenharmony_ci if (!pub->topic) 876d4afb5ceSopenharmony_ci goto oom; 877d4afb5ceSopenharmony_ci lws_strncpy(pub->topic, (const char *)buf, 878d4afb5ceSopenharmony_ci (size_t)pub->topic_len + 1); 879d4afb5ceSopenharmony_ci buf += pub->topic_len; 880d4afb5ceSopenharmony_ci len -= pub->topic_len; 881d4afb5ceSopenharmony_ci 882d4afb5ceSopenharmony_ci /* Extract QoS Level from Fixed Header Flags */ 883d4afb5ceSopenharmony_ci pub->qos = (lws_mqtt_qos_levels_t) 884d4afb5ceSopenharmony_ci ((par->packet_type_flags >> 1) & 0x3); 885d4afb5ceSopenharmony_ci 886d4afb5ceSopenharmony_ci pub->payload_pos = 0; 887d4afb5ceSopenharmony_ci 888d4afb5ceSopenharmony_ci pub->payload_len = par->cpkt_remlen - 889d4afb5ceSopenharmony_ci (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0)); 890d4afb5ceSopenharmony_ci 891d4afb5ceSopenharmony_ci switch (pub->qos) { 892d4afb5ceSopenharmony_ci case QOS0: 893d4afb5ceSopenharmony_ci par->state = LMQCPP_PAYLOAD; 894d4afb5ceSopenharmony_ci if (pub->payload_len == 0) 895d4afb5ceSopenharmony_ci goto cmd_completion; 896d4afb5ceSopenharmony_ci 897d4afb5ceSopenharmony_ci break; 898d4afb5ceSopenharmony_ci case QOS1: 899d4afb5ceSopenharmony_ci case QOS2: 900d4afb5ceSopenharmony_ci par->state = LMQCPP_PUBLISH_VH_PKT_ID; 901d4afb5ceSopenharmony_ci break; 902d4afb5ceSopenharmony_ci default: 903d4afb5ceSopenharmony_ci par->reason = LMQCP_REASON_MALFORMED_PACKET; 904d4afb5ceSopenharmony_ci lws_free_set_NULL(pub->topic); 905d4afb5ceSopenharmony_ci lws_free_set_NULL(wsi->mqtt->rx_cpkt_param); 906d4afb5ceSopenharmony_ci goto send_reason_and_close; 907d4afb5ceSopenharmony_ci } 908d4afb5ceSopenharmony_ci break; 909d4afb5ceSopenharmony_ci } 910d4afb5ceSopenharmony_ci case LMQCPP_PUBLISH_VH_PKT_ID: 911d4afb5ceSopenharmony_ci { 912d4afb5ceSopenharmony_ci lws_mqtt_publish_param_t *pub = 913d4afb5ceSopenharmony_ci (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param; 914d4afb5ceSopenharmony_ci 915d4afb5ceSopenharmony_ci if (len < 2) { 916d4afb5ceSopenharmony_ci lwsl_notice("%s: len breakage 2\n", __func__); 917d4afb5ceSopenharmony_ci return -1; 918d4afb5ceSopenharmony_ci } 919d4afb5ceSopenharmony_ci 920d4afb5ceSopenharmony_ci par->cpkt_id = lws_ser_ru16be(buf); 921d4afb5ceSopenharmony_ci buf += 2; 922d4afb5ceSopenharmony_ci len -= 2; 923d4afb5ceSopenharmony_ci wsi->mqtt->peer_ack_pkt_id = par->cpkt_id; 924d4afb5ceSopenharmony_ci lwsl_debug("%s: Packet ID %d\n", 925d4afb5ceSopenharmony_ci __func__, (int)par->cpkt_id); 926d4afb5ceSopenharmony_ci par->state = LMQCPP_PAYLOAD; 927d4afb5ceSopenharmony_ci pub->payload_pos = 0; 928d4afb5ceSopenharmony_ci pub->payload_len = par->cpkt_remlen - 929d4afb5ceSopenharmony_ci (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0)); 930d4afb5ceSopenharmony_ci if (pub->payload_len == 0) 931d4afb5ceSopenharmony_ci goto cmd_completion; 932d4afb5ceSopenharmony_ci 933d4afb5ceSopenharmony_ci break; 934d4afb5ceSopenharmony_ci } 935d4afb5ceSopenharmony_ci case LMQCPP_PAYLOAD: 936d4afb5ceSopenharmony_ci { 937d4afb5ceSopenharmony_ci lws_mqtt_publish_param_t *pub = 938d4afb5ceSopenharmony_ci (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param; 939d4afb5ceSopenharmony_ci if (pub == NULL) { 940d4afb5ceSopenharmony_ci lwsl_err("%s: Uninitialized pub_param\n", 941d4afb5ceSopenharmony_ci __func__); 942d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 943d4afb5ceSopenharmony_ci } 944d4afb5ceSopenharmony_ci 945d4afb5ceSopenharmony_ci pub->payload = buf; 946d4afb5ceSopenharmony_ci goto cmd_completion; 947d4afb5ceSopenharmony_ci } 948d4afb5ceSopenharmony_ci 949d4afb5ceSopenharmony_ci case LMQCPP_CONNACK_PACKET: 950d4afb5ceSopenharmony_ci if (!lwsi_role_client(wsi)) { 951d4afb5ceSopenharmony_ci lwsl_err("%s: CONNACK is only Server to Client", 952d4afb5ceSopenharmony_ci __func__); 953d4afb5ceSopenharmony_ci goto send_unsupp_connack_and_close; 954d4afb5ceSopenharmony_ci } 955d4afb5ceSopenharmony_ci 956d4afb5ceSopenharmony_ci lwsl_debug("%s: received CONNACK pkt\n", __func__); 957d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 958d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 959d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 960d4afb5ceSopenharmony_ci break; 961d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 962d4afb5ceSopenharmony_ci par->cpkt_remlen = par->vbit.value; 963d4afb5ceSopenharmony_ci lwsl_debug("%s: CONNACK pkt len = %d\n", 964d4afb5ceSopenharmony_ci __func__, (int)par->cpkt_remlen); 965d4afb5ceSopenharmony_ci if (par->cpkt_remlen != 2) 966d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 967d4afb5ceSopenharmony_ci 968d4afb5ceSopenharmony_ci par->state = LMQCPP_CONNACK_VH_FLAGS; 969d4afb5ceSopenharmony_ci break; 970d4afb5ceSopenharmony_ci default: 971d4afb5ceSopenharmony_ci lwsl_notice("%s: connack bad vbi\n", __func__); 972d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 973d4afb5ceSopenharmony_ci } 974d4afb5ceSopenharmony_ci break; 975d4afb5ceSopenharmony_ci 976d4afb5ceSopenharmony_ci case LMQCPP_CONNACK_VH_FLAGS: 977d4afb5ceSopenharmony_ci { 978d4afb5ceSopenharmony_ci lws_mqttc_t *c = &wsi->mqtt->client; 979d4afb5ceSopenharmony_ci par->cpkt_flags = *buf++; 980d4afb5ceSopenharmony_ci len--; 981d4afb5ceSopenharmony_ci 982d4afb5ceSopenharmony_ci if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) { 983d4afb5ceSopenharmony_ci /* 984d4afb5ceSopenharmony_ci * Byte 1 is the "Connect Acknowledge 985d4afb5ceSopenharmony_ci * Flags". Bits 7-1 are reserved and 986d4afb5ceSopenharmony_ci * MUST be set to 0. 987d4afb5ceSopenharmony_ci */ 988d4afb5ceSopenharmony_ci par->reason = LMQCP_REASON_MALFORMED_PACKET; 989d4afb5ceSopenharmony_ci goto send_reason_and_close; 990d4afb5ceSopenharmony_ci } 991d4afb5ceSopenharmony_ci /* 992d4afb5ceSopenharmony_ci * If the Server accepts a connection with 993d4afb5ceSopenharmony_ci * CleanSession set to 1, the Server MUST set 994d4afb5ceSopenharmony_ci * Session Present to 0 in the CONNACK packet 995d4afb5ceSopenharmony_ci * in addition to setting a zero return code 996d4afb5ceSopenharmony_ci * in the CONNACK packet [MQTT-3.2.2-1]. If 997d4afb5ceSopenharmony_ci * the Server accepts a connection with 998d4afb5ceSopenharmony_ci * CleanSession set to 0, the value set in 999d4afb5ceSopenharmony_ci * Session Present depends on whether the 1000d4afb5ceSopenharmony_ci * Server already has stored Session state for 1001d4afb5ceSopenharmony_ci * the supplied client ID. If the Server has 1002d4afb5ceSopenharmony_ci * stored Session state, it MUST set 1003d4afb5ceSopenharmony_ci * SessionPresent to 1 in the CONNACK packet 1004d4afb5ceSopenharmony_ci * [MQTT-3.2.2-2]. If the Server does not have 1005d4afb5ceSopenharmony_ci * stored Session state, it MUST set Session 1006d4afb5ceSopenharmony_ci * Present to 0 in the CONNACK packet. This is 1007d4afb5ceSopenharmony_ci * in addition to setting a zero return code 1008d4afb5ceSopenharmony_ci * in the CONNACK packet [MQTT-3.2.2-3]. 1009d4afb5ceSopenharmony_ci */ 1010d4afb5ceSopenharmony_ci if ((c->conn_flags & LMQCFT_CLEAN_START) && 1011d4afb5ceSopenharmony_ci (par->cpkt_flags & LMQCFT_SESSION_PRESENT)) 1012d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1013d4afb5ceSopenharmony_ci 1014d4afb5ceSopenharmony_ci wsi->mqtt->session_resumed = ((unsigned int)par->cpkt_flags & 1015d4afb5ceSopenharmony_ci LMQCFT_SESSION_PRESENT); 1016d4afb5ceSopenharmony_ci 1017d4afb5ceSopenharmony_ci /* Move on to Connect Return Code */ 1018d4afb5ceSopenharmony_ci par->state = LMQCPP_CONNACK_VH_RETURN_CODE; 1019d4afb5ceSopenharmony_ci break; 1020d4afb5ceSopenharmony_ci } 1021d4afb5ceSopenharmony_ci case LMQCPP_CONNACK_VH_RETURN_CODE: 1022d4afb5ceSopenharmony_ci par->conn_rc = *buf++; 1023d4afb5ceSopenharmony_ci len--; 1024d4afb5ceSopenharmony_ci /* 1025d4afb5ceSopenharmony_ci * If a server sends a CONNACK packet containing a 1026d4afb5ceSopenharmony_ci * non-zero return code it MUST then close the Network 1027d4afb5ceSopenharmony_ci * Connection [MQTT-3.2.2-5] 1028d4afb5ceSopenharmony_ci */ 1029d4afb5ceSopenharmony_ci switch (par->conn_rc) { 1030d4afb5ceSopenharmony_ci case 0: 1031d4afb5ceSopenharmony_ci goto cmd_completion; 1032d4afb5ceSopenharmony_ci case 1: 1033d4afb5ceSopenharmony_ci case 2: 1034d4afb5ceSopenharmony_ci case 3: 1035d4afb5ceSopenharmony_ci case 4: 1036d4afb5ceSopenharmony_ci case 5: 1037d4afb5ceSopenharmony_ci par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL + 1038d4afb5ceSopenharmony_ci par->conn_rc - 1; 1039d4afb5ceSopenharmony_ci goto send_reason_and_close; 1040d4afb5ceSopenharmony_ci default: 1041d4afb5ceSopenharmony_ci lwsl_notice("%s: bad connack retcode\n", __func__); 1042d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1043d4afb5ceSopenharmony_ci } 1044d4afb5ceSopenharmony_ci break; 1045d4afb5ceSopenharmony_ci 1046d4afb5ceSopenharmony_ci /* SUBACK */ 1047d4afb5ceSopenharmony_ci case LMQCPP_SUBACK_PACKET: 1048d4afb5ceSopenharmony_ci if (!lwsi_role_client(wsi)) { 1049d4afb5ceSopenharmony_ci lwsl_err("%s: SUBACK is only Server to Client", 1050d4afb5ceSopenharmony_ci __func__); 1051d4afb5ceSopenharmony_ci goto send_unsupp_connack_and_close; 1052d4afb5ceSopenharmony_ci } 1053d4afb5ceSopenharmony_ci 1054d4afb5ceSopenharmony_ci lwsl_debug("%s: received SUBACK pkt\n", __func__); 1055d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 1056d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1057d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 1058d4afb5ceSopenharmony_ci break; 1059d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 1060d4afb5ceSopenharmony_ci par->cpkt_remlen = par->vbit.value; 1061d4afb5ceSopenharmony_ci lwsl_debug("%s: SUBACK pkt len = %d\n", 1062d4afb5ceSopenharmony_ci __func__, (int)par->cpkt_remlen); 1063d4afb5ceSopenharmony_ci if (par->cpkt_remlen <= 2) 1064d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1065d4afb5ceSopenharmony_ci par->state = LMQCPP_SUBACK_VH_PKT_ID; 1066d4afb5ceSopenharmony_ci break; 1067d4afb5ceSopenharmony_ci default: 1068d4afb5ceSopenharmony_ci lwsl_notice("%s: suback bad vbi\n", __func__); 1069d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1070d4afb5ceSopenharmony_ci } 1071d4afb5ceSopenharmony_ci break; 1072d4afb5ceSopenharmony_ci 1073d4afb5ceSopenharmony_ci case LMQCPP_SUBACK_VH_PKT_ID: 1074d4afb5ceSopenharmony_ci 1075d4afb5ceSopenharmony_ci if (len < 2) { 1076d4afb5ceSopenharmony_ci lwsl_notice("%s: len breakage 4\n", __func__); 1077d4afb5ceSopenharmony_ci return -1; 1078d4afb5ceSopenharmony_ci } 1079d4afb5ceSopenharmony_ci 1080d4afb5ceSopenharmony_ci par->cpkt_id = lws_ser_ru16be(buf); 1081d4afb5ceSopenharmony_ci wsi->mqtt->ack_pkt_id = par->cpkt_id; 1082d4afb5ceSopenharmony_ci buf += 2; 1083d4afb5ceSopenharmony_ci len -= 2; 1084d4afb5ceSopenharmony_ci par->cpkt_remlen -= 2; 1085d4afb5ceSopenharmony_ci par->n = 0; 1086d4afb5ceSopenharmony_ci par->state = LMQCPP_SUBACK_PAYLOAD; 1087d4afb5ceSopenharmony_ci *par->temp = 0; 1088d4afb5ceSopenharmony_ci break; 1089d4afb5ceSopenharmony_ci 1090d4afb5ceSopenharmony_ci case LMQCPP_SUBACK_PAYLOAD: 1091d4afb5ceSopenharmony_ci { 1092d4afb5ceSopenharmony_ci lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++; 1093d4afb5ceSopenharmony_ci 1094d4afb5ceSopenharmony_ci len--; 1095d4afb5ceSopenharmony_ci switch (qos) { 1096d4afb5ceSopenharmony_ci case QOS0: 1097d4afb5ceSopenharmony_ci case QOS1: 1098d4afb5ceSopenharmony_ci case QOS2: 1099d4afb5ceSopenharmony_ci break; 1100d4afb5ceSopenharmony_ci case FAILURE_QOS_LEVEL: 1101d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1102d4afb5ceSopenharmony_ci 1103d4afb5ceSopenharmony_ci default: 1104d4afb5ceSopenharmony_ci par->reason = LMQCP_REASON_MALFORMED_PACKET; 1105d4afb5ceSopenharmony_ci goto send_reason_and_close; 1106d4afb5ceSopenharmony_ci } 1107d4afb5ceSopenharmony_ci 1108d4afb5ceSopenharmony_ci if (++(par->n) == par->cpkt_remlen) { 1109d4afb5ceSopenharmony_ci par->n = 0; 1110d4afb5ceSopenharmony_ci goto cmd_completion; 1111d4afb5ceSopenharmony_ci } 1112d4afb5ceSopenharmony_ci 1113d4afb5ceSopenharmony_ci break; 1114d4afb5ceSopenharmony_ci } 1115d4afb5ceSopenharmony_ci 1116d4afb5ceSopenharmony_ci /* UNSUBACK */ 1117d4afb5ceSopenharmony_ci case LMQCPP_UNSUBACK_PACKET: 1118d4afb5ceSopenharmony_ci if (!lwsi_role_client(wsi)) { 1119d4afb5ceSopenharmony_ci lwsl_err("%s: UNSUBACK is only Server to Client", 1120d4afb5ceSopenharmony_ci __func__); 1121d4afb5ceSopenharmony_ci goto send_unsupp_connack_and_close; 1122d4afb5ceSopenharmony_ci } 1123d4afb5ceSopenharmony_ci 1124d4afb5ceSopenharmony_ci lwsl_debug("%s: received UNSUBACK pkt\n", __func__); 1125d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 1126d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1127d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 1128d4afb5ceSopenharmony_ci break; 1129d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 1130d4afb5ceSopenharmony_ci par->cpkt_remlen = par->vbit.value; 1131d4afb5ceSopenharmony_ci lwsl_debug("%s: UNSUBACK pkt len = %d\n", 1132d4afb5ceSopenharmony_ci __func__, (int)par->cpkt_remlen); 1133d4afb5ceSopenharmony_ci if (par->cpkt_remlen < 2) 1134d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1135d4afb5ceSopenharmony_ci par->state = LMQCPP_UNSUBACK_VH_PKT_ID; 1136d4afb5ceSopenharmony_ci break; 1137d4afb5ceSopenharmony_ci default: 1138d4afb5ceSopenharmony_ci lwsl_notice("%s: unsuback bad vbi\n", __func__); 1139d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1140d4afb5ceSopenharmony_ci } 1141d4afb5ceSopenharmony_ci break; 1142d4afb5ceSopenharmony_ci 1143d4afb5ceSopenharmony_ci case LMQCPP_UNSUBACK_VH_PKT_ID: 1144d4afb5ceSopenharmony_ci 1145d4afb5ceSopenharmony_ci if (len < 2) { 1146d4afb5ceSopenharmony_ci lwsl_notice("%s: len breakage 3\n", __func__); 1147d4afb5ceSopenharmony_ci return -1; 1148d4afb5ceSopenharmony_ci } 1149d4afb5ceSopenharmony_ci 1150d4afb5ceSopenharmony_ci par->cpkt_id = lws_ser_ru16be(buf); 1151d4afb5ceSopenharmony_ci wsi->mqtt->ack_pkt_id = par->cpkt_id; 1152d4afb5ceSopenharmony_ci buf += 2; 1153d4afb5ceSopenharmony_ci len -= 2; 1154d4afb5ceSopenharmony_ci par->cpkt_remlen -= 2; 1155d4afb5ceSopenharmony_ci par->n = 0; 1156d4afb5ceSopenharmony_ci 1157d4afb5ceSopenharmony_ci goto cmd_completion; 1158d4afb5ceSopenharmony_ci 1159d4afb5ceSopenharmony_ci case LMQCPP_PUBACK_PACKET: 1160d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 1161d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1162d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 1163d4afb5ceSopenharmony_ci break; 1164d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 1165d4afb5ceSopenharmony_ci par->cpkt_remlen = par->vbit.value; 1166d4afb5ceSopenharmony_ci lwsl_info("%s: PUBACK pkt len = %d\n", __func__, 1167d4afb5ceSopenharmony_ci (int)par->cpkt_remlen); 1168d4afb5ceSopenharmony_ci /* 1169d4afb5ceSopenharmony_ci * must be 4 or more, with special case that 2 1170d4afb5ceSopenharmony_ci * means success with no reason code or props 1171d4afb5ceSopenharmony_ci */ 1172d4afb5ceSopenharmony_ci if (par->cpkt_remlen <= 1 || 1173d4afb5ceSopenharmony_ci par->cpkt_remlen == 3) 1174d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1175d4afb5ceSopenharmony_ci 1176d4afb5ceSopenharmony_ci par->state = LMQCPP_PUBACK_VH_PKT_ID; 1177d4afb5ceSopenharmony_ci par->fixed_seen[2] = par->fixed_seen[3] = 0; 1178d4afb5ceSopenharmony_ci par->fixed = 0; 1179d4afb5ceSopenharmony_ci par->n = 0; 1180d4afb5ceSopenharmony_ci break; 1181d4afb5ceSopenharmony_ci default: 1182d4afb5ceSopenharmony_ci lwsl_notice("%s: puback bad vbi\n", __func__); 1183d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1184d4afb5ceSopenharmony_ci } 1185d4afb5ceSopenharmony_ci break; 1186d4afb5ceSopenharmony_ci 1187d4afb5ceSopenharmony_ci case LMQCPP_PUBACK_VH_PKT_ID: 1188d4afb5ceSopenharmony_ci /* 1189d4afb5ceSopenharmony_ci * There are 3 fixed bytes and then a VBI for the 1190d4afb5ceSopenharmony_ci * property section length 1191d4afb5ceSopenharmony_ci */ 1192d4afb5ceSopenharmony_ci par->fixed_seen[par->fixed++] = *buf++; 1193d4afb5ceSopenharmony_ci if (len < par->cpkt_remlen - par->n) { 1194d4afb5ceSopenharmony_ci lwsl_notice("%s: len breakage 4\n", __func__); 1195d4afb5ceSopenharmony_ci return -1; 1196d4afb5ceSopenharmony_ci } 1197d4afb5ceSopenharmony_ci len--; 1198d4afb5ceSopenharmony_ci par->n++; 1199d4afb5ceSopenharmony_ci if (par->fixed == 2) 1200d4afb5ceSopenharmony_ci par->cpkt_id = lws_ser_ru16be(par->fixed_seen); 1201d4afb5ceSopenharmony_ci 1202d4afb5ceSopenharmony_ci if (par->fixed == 3) { 1203d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 1204d4afb5ceSopenharmony_ci par->props_consumed = 0; 1205d4afb5ceSopenharmony_ci par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI; 1206d4afb5ceSopenharmony_ci } 1207d4afb5ceSopenharmony_ci /* length of 2 is truncated packet and we completed it */ 1208d4afb5ceSopenharmony_ci if (par->cpkt_remlen == par->fixed) 1209d4afb5ceSopenharmony_ci goto cmd_completion; 1210d4afb5ceSopenharmony_ci break; 1211d4afb5ceSopenharmony_ci 1212d4afb5ceSopenharmony_ci case LMQCPP_PUBACK_PROPERTIES_LEN_VBI: 1213d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1214d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 1215d4afb5ceSopenharmony_ci break; 1216d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 1217d4afb5ceSopenharmony_ci par->props_len = par->vbit.value; 1218d4afb5ceSopenharmony_ci lwsl_info("%s: PUBACK props len = %d\n", 1219d4afb5ceSopenharmony_ci __func__, (int)par->cpkt_remlen); 1220d4afb5ceSopenharmony_ci /* 1221d4afb5ceSopenharmony_ci * If there are no properties, this is a 1222d4afb5ceSopenharmony_ci * command completion event in itself 1223d4afb5ceSopenharmony_ci */ 1224d4afb5ceSopenharmony_ci if (!par->props_len) 1225d4afb5ceSopenharmony_ci goto cmd_completion; 1226d4afb5ceSopenharmony_ci 1227d4afb5ceSopenharmony_ci /* 1228d4afb5ceSopenharmony_ci * Otherwise consume the properties before 1229d4afb5ceSopenharmony_ci * completing the command 1230d4afb5ceSopenharmony_ci */ 1231d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 1232d4afb5ceSopenharmony_ci par->state = LMQCPP_PUBACK_VH_PKT_ID; 1233d4afb5ceSopenharmony_ci break; 1234d4afb5ceSopenharmony_ci default: 1235d4afb5ceSopenharmony_ci lwsl_notice("%s: puback pr bad vbi\n", __func__); 1236d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1237d4afb5ceSopenharmony_ci } 1238d4afb5ceSopenharmony_ci break; 1239d4afb5ceSopenharmony_ci 1240d4afb5ceSopenharmony_ci case LMQCPP_EAT_PROPERTIES_AND_COMPLETE: 1241d4afb5ceSopenharmony_ci /* 1242d4afb5ceSopenharmony_ci * TODO: stash the props 1243d4afb5ceSopenharmony_ci */ 1244d4afb5ceSopenharmony_ci par->props_consumed++; 1245d4afb5ceSopenharmony_ci len--; 1246d4afb5ceSopenharmony_ci buf++; 1247d4afb5ceSopenharmony_ci if (par->props_len != par->props_consumed) 1248d4afb5ceSopenharmony_ci break; 1249d4afb5ceSopenharmony_ci 1250d4afb5ceSopenharmony_cicmd_completion: 1251d4afb5ceSopenharmony_ci /* 1252d4afb5ceSopenharmony_ci * We come here when we understood we just processed 1253d4afb5ceSopenharmony_ci * the last byte of a command packet, regardless of the 1254d4afb5ceSopenharmony_ci * packet type 1255d4afb5ceSopenharmony_ci */ 1256d4afb5ceSopenharmony_ci par->state = LMQCPP_IDLE; 1257d4afb5ceSopenharmony_ci 1258d4afb5ceSopenharmony_ci switch (par->packet_type_flags >> 4) { 1259d4afb5ceSopenharmony_ci case LMQCP_STOC_CONNACK: 1260d4afb5ceSopenharmony_ci lwsl_info("%s: cmd_completion: CONNACK\n", 1261d4afb5ceSopenharmony_ci __func__); 1262d4afb5ceSopenharmony_ci 1263d4afb5ceSopenharmony_ci /* 1264d4afb5ceSopenharmony_ci * Getting the CONNACK means we are the first, 1265d4afb5ceSopenharmony_ci * the nwsi, and we succeeded to create a new 1266d4afb5ceSopenharmony_ci * network connection ourselves. 1267d4afb5ceSopenharmony_ci * 1268d4afb5ceSopenharmony_ci * Since others may join us sharing the nwsi, 1269d4afb5ceSopenharmony_ci * and we may close while they still want to use 1270d4afb5ceSopenharmony_ci * it, our wsi lifecycle alone can no longer 1271d4afb5ceSopenharmony_ci * define the lifecycle of the nwsi... it means 1272d4afb5ceSopenharmony_ci * we need to do a "magic trick" and instead of 1273d4afb5ceSopenharmony_ci * being both the nwsi and act like a child 1274d4afb5ceSopenharmony_ci * stream, create a new wsi to take over the 1275d4afb5ceSopenharmony_ci * nwsi duties and turn our wsi into a child of 1276d4afb5ceSopenharmony_ci * the nwsi with its own lifecycle. 1277d4afb5ceSopenharmony_ci * 1278d4afb5ceSopenharmony_ci * The nwsi gets a mostly empty wsi->nwsi used 1279d4afb5ceSopenharmony_ci * to track already-subscribed topics globally 1280d4afb5ceSopenharmony_ci * for the connection. 1281d4afb5ceSopenharmony_ci */ 1282d4afb5ceSopenharmony_ci 1283d4afb5ceSopenharmony_ci /* we were under SENT_CLIENT_HANDSHAKE timeout */ 1284d4afb5ceSopenharmony_ci lws_set_timeout(wsi, 0, 0); 1285d4afb5ceSopenharmony_ci 1286d4afb5ceSopenharmony_ci w = lws_create_new_server_wsi(wsi->a.vhost, 1287d4afb5ceSopenharmony_ci wsi->tsi, "mqtt_sid1"); 1288d4afb5ceSopenharmony_ci if (!w) { 1289d4afb5ceSopenharmony_ci lwsl_notice("%s: sid 1 migrate failed\n", 1290d4afb5ceSopenharmony_ci __func__); 1291d4afb5ceSopenharmony_ci return -1; 1292d4afb5ceSopenharmony_ci } 1293d4afb5ceSopenharmony_ci 1294d4afb5ceSopenharmony_ci wsi->mux.highest_sid = 1; 1295d4afb5ceSopenharmony_ci lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++); 1296d4afb5ceSopenharmony_ci 1297d4afb5ceSopenharmony_ci wsi->mux_substream = 1; 1298d4afb5ceSopenharmony_ci w->mux_substream = 1; 1299d4afb5ceSopenharmony_ci w->client_mux_substream = 1; 1300d4afb5ceSopenharmony_ci wsi->client_mux_migrated = 1; 1301d4afb5ceSopenharmony_ci wsi->told_user_closed = 1; /* don't tell nwsi closed */ 1302d4afb5ceSopenharmony_ci 1303d4afb5ceSopenharmony_ci lwsi_set_state(w, LRS_ESTABLISHED); 1304d4afb5ceSopenharmony_ci lwsi_set_state(wsi, LRS_ESTABLISHED); 1305d4afb5ceSopenharmony_ci lwsi_set_role(w, lwsi_role(wsi)); 1306d4afb5ceSopenharmony_ci 1307d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 1308d4afb5ceSopenharmony_ci w->flags = wsi->flags; 1309d4afb5ceSopenharmony_ci#endif 1310d4afb5ceSopenharmony_ci 1311d4afb5ceSopenharmony_ci w->mqtt = wsi->mqtt; 1312d4afb5ceSopenharmony_ci wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt"); 1313d4afb5ceSopenharmony_ci if (!wsi->mqtt) 1314d4afb5ceSopenharmony_ci return -1; 1315d4afb5ceSopenharmony_ci w->mqtt->wsi = w; 1316d4afb5ceSopenharmony_ci w->a.protocol = wsi->a.protocol; 1317d4afb5ceSopenharmony_ci if (w->user_space && 1318d4afb5ceSopenharmony_ci !w->user_space_externally_allocated) 1319d4afb5ceSopenharmony_ci lws_free_set_NULL(w->user_space); 1320d4afb5ceSopenharmony_ci w->user_space = wsi->user_space; 1321d4afb5ceSopenharmony_ci wsi->user_space = NULL; 1322d4afb5ceSopenharmony_ci w->user_space_externally_allocated = 1323d4afb5ceSopenharmony_ci wsi->user_space_externally_allocated; 1324d4afb5ceSopenharmony_ci if (lws_ensure_user_space(w)) 1325d4afb5ceSopenharmony_ci goto bail1; 1326d4afb5ceSopenharmony_ci w->a.opaque_user_data = wsi->a.opaque_user_data; 1327d4afb5ceSopenharmony_ci wsi->a.opaque_user_data = NULL; 1328d4afb5ceSopenharmony_ci w->stash = wsi->stash; 1329d4afb5ceSopenharmony_ci wsi->stash = NULL; 1330d4afb5ceSopenharmony_ci 1331d4afb5ceSopenharmony_ci lws_mux_mark_immortal(w); 1332d4afb5ceSopenharmony_ci 1333d4afb5ceSopenharmony_ci lwsl_notice("%s: migrated nwsi %s to sid 1 %s\n", 1334d4afb5ceSopenharmony_ci __func__, lws_wsi_tag(wsi), 1335d4afb5ceSopenharmony_ci lws_wsi_tag(w)); 1336d4afb5ceSopenharmony_ci 1337d4afb5ceSopenharmony_ci /* 1338d4afb5ceSopenharmony_ci * It was the last thing we were waiting for 1339d4afb5ceSopenharmony_ci * before we can be fully ESTABLISHED 1340d4afb5ceSopenharmony_ci */ 1341d4afb5ceSopenharmony_ci if (lws_mqtt_set_client_established(w)) { 1342d4afb5ceSopenharmony_ci lwsl_notice("%s: set EST fail\n", __func__); 1343d4afb5ceSopenharmony_ci return -1; 1344d4afb5ceSopenharmony_ci } 1345d4afb5ceSopenharmony_ci 1346d4afb5ceSopenharmony_ci /* get the ball rolling */ 1347d4afb5ceSopenharmony_ci lws_validity_confirmed(wsi); 1348d4afb5ceSopenharmony_ci 1349d4afb5ceSopenharmony_ci /* well, add the queued guys as children */ 1350d4afb5ceSopenharmony_ci lws_wsi_mux_apply_queue(wsi); 1351d4afb5ceSopenharmony_ci break; 1352d4afb5ceSopenharmony_ci 1353d4afb5ceSopenharmony_cibail1: 1354d4afb5ceSopenharmony_ci /* undo the insert */ 1355d4afb5ceSopenharmony_ci wsi->mux.child_list = w->mux.sibling_list; 1356d4afb5ceSopenharmony_ci wsi->mux.child_count--; 1357d4afb5ceSopenharmony_ci 1358d4afb5ceSopenharmony_ci if (w->user_space) 1359d4afb5ceSopenharmony_ci lws_free_set_NULL(w->user_space); 1360d4afb5ceSopenharmony_ci w->a.vhost->protocols[0].callback(w, 1361d4afb5ceSopenharmony_ci LWS_CALLBACK_WSI_DESTROY, 1362d4afb5ceSopenharmony_ci NULL, NULL, 0); 1363d4afb5ceSopenharmony_ci __lws_vhost_unbind_wsi(w); /* cx + vh lock */ 1364d4afb5ceSopenharmony_ci lws_free(w); 1365d4afb5ceSopenharmony_ci 1366d4afb5ceSopenharmony_ci return 0; 1367d4afb5ceSopenharmony_ci 1368d4afb5ceSopenharmony_ci case LMQCP_PUBREC: 1369d4afb5ceSopenharmony_ci lwsl_err("%s: cmd_completion: PUBREC\n", 1370d4afb5ceSopenharmony_ci __func__); 1371d4afb5ceSopenharmony_ci /* 1372d4afb5ceSopenharmony_ci * Figure out which child asked for this 1373d4afb5ceSopenharmony_ci */ 1374d4afb5ceSopenharmony_ci n = 0; 1375d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct lws *, w, 1376d4afb5ceSopenharmony_ci wsi->mux.child_list) { 1377d4afb5ceSopenharmony_ci if (w->mqtt->unacked_publish && 1378d4afb5ceSopenharmony_ci w->mqtt->ack_pkt_id == par->cpkt_id) { 1379d4afb5ceSopenharmony_ci char requested_close = 0; 1380d4afb5ceSopenharmony_ci 1381d4afb5ceSopenharmony_ci w->mqtt->unacked_publish = 0; 1382d4afb5ceSopenharmony_ci w->mqtt->unacked_pubrel = 1; 1383d4afb5ceSopenharmony_ci 1384d4afb5ceSopenharmony_ci if (user_callback_handle_rxflow( 1385d4afb5ceSopenharmony_ci w->a.protocol->callback, 1386d4afb5ceSopenharmony_ci w, LWS_CALLBACK_MQTT_ACK, 1387d4afb5ceSopenharmony_ci w->user_space, NULL, 0) < 0) { 1388d4afb5ceSopenharmony_ci lwsl_info("%s: MQTT_ACK requests close\n", 1389d4afb5ceSopenharmony_ci __func__); 1390d4afb5ceSopenharmony_ci requested_close = 1; 1391d4afb5ceSopenharmony_ci } 1392d4afb5ceSopenharmony_ci n = 1; 1393d4afb5ceSopenharmony_ci 1394d4afb5ceSopenharmony_ci /* 1395d4afb5ceSopenharmony_ci * We got an assertive PUBREC, 1396d4afb5ceSopenharmony_ci * no need for timeout wait 1397d4afb5ceSopenharmony_ci * any more 1398d4afb5ceSopenharmony_ci */ 1399d4afb5ceSopenharmony_ci lws_sul_cancel(&w->mqtt-> 1400d4afb5ceSopenharmony_ci sul_qos_puback_pubrec_wait); 1401d4afb5ceSopenharmony_ci 1402d4afb5ceSopenharmony_ci if (requested_close) { 1403d4afb5ceSopenharmony_ci __lws_close_free_wsi(w, 1404d4afb5ceSopenharmony_ci 0, "ack cb"); 1405d4afb5ceSopenharmony_ci break; 1406d4afb5ceSopenharmony_ci } 1407d4afb5ceSopenharmony_ci 1408d4afb5ceSopenharmony_ci break; 1409d4afb5ceSopenharmony_ci } 1410d4afb5ceSopenharmony_ci } lws_end_foreach_ll(w, mux.sibling_list); 1411d4afb5ceSopenharmony_ci 1412d4afb5ceSopenharmony_ci if (!n) { 1413d4afb5ceSopenharmony_ci lwsl_err("%s: unsolicited PUBREC\n", 1414d4afb5ceSopenharmony_ci __func__); 1415d4afb5ceSopenharmony_ci return -1; 1416d4afb5ceSopenharmony_ci } 1417d4afb5ceSopenharmony_ci wsi->mqtt->send_pubrel = 1; 1418d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 1419d4afb5ceSopenharmony_ci break; 1420d4afb5ceSopenharmony_ci 1421d4afb5ceSopenharmony_ci case LMQCP_PUBCOMP: 1422d4afb5ceSopenharmony_ci lwsl_err("%s: cmd_completion: PUBCOMP\n", 1423d4afb5ceSopenharmony_ci __func__); 1424d4afb5ceSopenharmony_ci n = 0; 1425d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct lws *, w, 1426d4afb5ceSopenharmony_ci wsi->mux.child_list) { 1427d4afb5ceSopenharmony_ci if (w->mqtt->unacked_pubrel > 0 && 1428d4afb5ceSopenharmony_ci w->mqtt->ack_pkt_id == par->cpkt_id) { 1429d4afb5ceSopenharmony_ci w->mqtt->unacked_pubrel = 0; 1430d4afb5ceSopenharmony_ci n = 1; 1431d4afb5ceSopenharmony_ci } 1432d4afb5ceSopenharmony_ci } lws_end_foreach_ll(w, mux.sibling_list); 1433d4afb5ceSopenharmony_ci 1434d4afb5ceSopenharmony_ci if (!n) { 1435d4afb5ceSopenharmony_ci lwsl_err("%s: unsolicited PUBCOMP\n", 1436d4afb5ceSopenharmony_ci __func__); 1437d4afb5ceSopenharmony_ci return -1; 1438d4afb5ceSopenharmony_ci } 1439d4afb5ceSopenharmony_ci 1440d4afb5ceSopenharmony_ci /* 1441d4afb5ceSopenharmony_ci * If we published something and PUBCOMP arrived, 1442d4afb5ceSopenharmony_ci * our connection is definitely working in both 1443d4afb5ceSopenharmony_ci * directions at the moment. 1444d4afb5ceSopenharmony_ci */ 1445d4afb5ceSopenharmony_ci lws_validity_confirmed(wsi); 1446d4afb5ceSopenharmony_ci break; 1447d4afb5ceSopenharmony_ci 1448d4afb5ceSopenharmony_ci case LMQCP_PUBREL: 1449d4afb5ceSopenharmony_ci lwsl_err("%s: cmd_completion: PUBREL\n", 1450d4afb5ceSopenharmony_ci __func__); 1451d4afb5ceSopenharmony_ci wsi->mqtt->send_pubcomp = 1; 1452d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 1453d4afb5ceSopenharmony_ci break; 1454d4afb5ceSopenharmony_ci 1455d4afb5ceSopenharmony_ci case LMQCP_PUBACK: 1456d4afb5ceSopenharmony_ci lwsl_info("%s: cmd_completion: PUBACK\n", 1457d4afb5ceSopenharmony_ci __func__); 1458d4afb5ceSopenharmony_ci 1459d4afb5ceSopenharmony_ci /* 1460d4afb5ceSopenharmony_ci * Figure out which child asked for this 1461d4afb5ceSopenharmony_ci */ 1462d4afb5ceSopenharmony_ci 1463d4afb5ceSopenharmony_ci n = 0; 1464d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct lws *, w, 1465d4afb5ceSopenharmony_ci wsi->mux.child_list) { 1466d4afb5ceSopenharmony_ci if (w->mqtt->unacked_publish && 1467d4afb5ceSopenharmony_ci w->mqtt->ack_pkt_id == par->cpkt_id) { 1468d4afb5ceSopenharmony_ci char requested_close = 0; 1469d4afb5ceSopenharmony_ci 1470d4afb5ceSopenharmony_ci w->mqtt->unacked_publish = 0; 1471d4afb5ceSopenharmony_ci if (user_callback_handle_rxflow( 1472d4afb5ceSopenharmony_ci w->a.protocol->callback, 1473d4afb5ceSopenharmony_ci w, LWS_CALLBACK_MQTT_ACK, 1474d4afb5ceSopenharmony_ci w->user_space, NULL, 0) < 0) { 1475d4afb5ceSopenharmony_ci lwsl_info("%s: MQTT_ACK requests close\n", 1476d4afb5ceSopenharmony_ci __func__); 1477d4afb5ceSopenharmony_ci requested_close = 1; 1478d4afb5ceSopenharmony_ci } 1479d4afb5ceSopenharmony_ci n = 1; 1480d4afb5ceSopenharmony_ci 1481d4afb5ceSopenharmony_ci /* 1482d4afb5ceSopenharmony_ci * We got an assertive PUBACK, 1483d4afb5ceSopenharmony_ci * no need for ACK timeout wait 1484d4afb5ceSopenharmony_ci * any more 1485d4afb5ceSopenharmony_ci */ 1486d4afb5ceSopenharmony_ci lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait); 1487d4afb5ceSopenharmony_ci 1488d4afb5ceSopenharmony_ci if (requested_close) { 1489d4afb5ceSopenharmony_ci __lws_close_free_wsi(w, 1490d4afb5ceSopenharmony_ci 0, "ack cb"); 1491d4afb5ceSopenharmony_ci break; 1492d4afb5ceSopenharmony_ci } 1493d4afb5ceSopenharmony_ci 1494d4afb5ceSopenharmony_ci break; 1495d4afb5ceSopenharmony_ci } 1496d4afb5ceSopenharmony_ci } lws_end_foreach_ll(w, mux.sibling_list); 1497d4afb5ceSopenharmony_ci 1498d4afb5ceSopenharmony_ci if (!n) { 1499d4afb5ceSopenharmony_ci lwsl_err("%s: unsolicited PUBACK\n", 1500d4afb5ceSopenharmony_ci __func__); 1501d4afb5ceSopenharmony_ci return -1; 1502d4afb5ceSopenharmony_ci } 1503d4afb5ceSopenharmony_ci 1504d4afb5ceSopenharmony_ci /* 1505d4afb5ceSopenharmony_ci * If we published something and it was acked, 1506d4afb5ceSopenharmony_ci * our connection is definitely working in both 1507d4afb5ceSopenharmony_ci * directions at the moment. 1508d4afb5ceSopenharmony_ci */ 1509d4afb5ceSopenharmony_ci lws_validity_confirmed(wsi); 1510d4afb5ceSopenharmony_ci break; 1511d4afb5ceSopenharmony_ci 1512d4afb5ceSopenharmony_ci case LMQCP_STOC_PINGRESP: 1513d4afb5ceSopenharmony_ci lwsl_info("%s: cmd_completion: PINGRESP\n", 1514d4afb5ceSopenharmony_ci __func__); 1515d4afb5ceSopenharmony_ci /* 1516d4afb5ceSopenharmony_ci * If we asked for a PINGRESP and it came, 1517d4afb5ceSopenharmony_ci * our connection is definitely working in both 1518d4afb5ceSopenharmony_ci * directions at the moment. 1519d4afb5ceSopenharmony_ci */ 1520d4afb5ceSopenharmony_ci lws_validity_confirmed(wsi); 1521d4afb5ceSopenharmony_ci break; 1522d4afb5ceSopenharmony_ci 1523d4afb5ceSopenharmony_ci case LMQCP_STOC_SUBACK: 1524d4afb5ceSopenharmony_ci lwsl_info("%s: cmd_completion: SUBACK\n", 1525d4afb5ceSopenharmony_ci __func__); 1526d4afb5ceSopenharmony_ci 1527d4afb5ceSopenharmony_ci /* 1528d4afb5ceSopenharmony_ci * Figure out which child asked for this 1529d4afb5ceSopenharmony_ci */ 1530d4afb5ceSopenharmony_ci 1531d4afb5ceSopenharmony_ci n = 0; 1532d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct lws *, w, 1533d4afb5ceSopenharmony_ci wsi->mux.child_list) { 1534d4afb5ceSopenharmony_ci if (w->mqtt->inside_subscribe && 1535d4afb5ceSopenharmony_ci w->mqtt->ack_pkt_id == par->cpkt_id) { 1536d4afb5ceSopenharmony_ci w->mqtt->inside_subscribe = 0; 1537d4afb5ceSopenharmony_ci if (user_callback_handle_rxflow( 1538d4afb5ceSopenharmony_ci w->a.protocol->callback, 1539d4afb5ceSopenharmony_ci w, LWS_CALLBACK_MQTT_SUBSCRIBED, 1540d4afb5ceSopenharmony_ci w->user_space, NULL, 0) < 0) { 1541d4afb5ceSopenharmony_ci lwsl_err("%s: MQTT_SUBSCRIBE failed\n", 1542d4afb5ceSopenharmony_ci __func__); 1543d4afb5ceSopenharmony_ci return -1; 1544d4afb5ceSopenharmony_ci } 1545d4afb5ceSopenharmony_ci n = 1; 1546d4afb5ceSopenharmony_ci break; 1547d4afb5ceSopenharmony_ci } 1548d4afb5ceSopenharmony_ci } lws_end_foreach_ll(w, mux.sibling_list); 1549d4afb5ceSopenharmony_ci 1550d4afb5ceSopenharmony_ci if (!n) { 1551d4afb5ceSopenharmony_ci lwsl_err("%s: unsolicited SUBACK\n", 1552d4afb5ceSopenharmony_ci __func__); 1553d4afb5ceSopenharmony_ci return -1; 1554d4afb5ceSopenharmony_ci } 1555d4afb5ceSopenharmony_ci 1556d4afb5ceSopenharmony_ci /* 1557d4afb5ceSopenharmony_ci * If we subscribed to something and SUBACK came, 1558d4afb5ceSopenharmony_ci * our connection is definitely working in both 1559d4afb5ceSopenharmony_ci * directions at the moment. 1560d4afb5ceSopenharmony_ci */ 1561d4afb5ceSopenharmony_ci lws_validity_confirmed(wsi); 1562d4afb5ceSopenharmony_ci 1563d4afb5ceSopenharmony_ci break; 1564d4afb5ceSopenharmony_ci 1565d4afb5ceSopenharmony_ci case LMQCP_STOC_UNSUBACK: 1566d4afb5ceSopenharmony_ci { 1567d4afb5ceSopenharmony_ci char requested_close = 0; 1568d4afb5ceSopenharmony_ci lwsl_info("%s: cmd_completion: UNSUBACK\n", 1569d4afb5ceSopenharmony_ci __func__); 1570d4afb5ceSopenharmony_ci /* 1571d4afb5ceSopenharmony_ci * Figure out which child asked for this 1572d4afb5ceSopenharmony_ci */ 1573d4afb5ceSopenharmony_ci n = 0; 1574d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct lws *, w, 1575d4afb5ceSopenharmony_ci wsi->mux.child_list) { 1576d4afb5ceSopenharmony_ci if (w->mqtt->inside_unsubscribe && 1577d4afb5ceSopenharmony_ci w->mqtt->ack_pkt_id == par->cpkt_id) { 1578d4afb5ceSopenharmony_ci struct lws *nwsi = lws_get_network_wsi(w); 1579d4afb5ceSopenharmony_ci 1580d4afb5ceSopenharmony_ci /* 1581d4afb5ceSopenharmony_ci * No more subscribers left, 1582d4afb5ceSopenharmony_ci * remove the topic from nwsi 1583d4afb5ceSopenharmony_ci */ 1584d4afb5ceSopenharmony_ci lws_mqtt_client_remove_subs(nwsi->mqtt); 1585d4afb5ceSopenharmony_ci 1586d4afb5ceSopenharmony_ci w->mqtt->inside_unsubscribe = 0; 1587d4afb5ceSopenharmony_ci if (user_callback_handle_rxflow( 1588d4afb5ceSopenharmony_ci w->a.protocol->callback, 1589d4afb5ceSopenharmony_ci w, LWS_CALLBACK_MQTT_UNSUBSCRIBED, 1590d4afb5ceSopenharmony_ci w->user_space, NULL, 0) < 0) { 1591d4afb5ceSopenharmony_ci lwsl_info("%s: MQTT_UNSUBACK requests close\n", 1592d4afb5ceSopenharmony_ci __func__); 1593d4afb5ceSopenharmony_ci requested_close = 1; 1594d4afb5ceSopenharmony_ci } 1595d4afb5ceSopenharmony_ci n = 1; 1596d4afb5ceSopenharmony_ci 1597d4afb5ceSopenharmony_ci lws_sul_cancel(&w->mqtt->sul_unsuback_wait); 1598d4afb5ceSopenharmony_ci if (requested_close) { 1599d4afb5ceSopenharmony_ci __lws_close_free_wsi(w, 1600d4afb5ceSopenharmony_ci 0, "unsub ack cb"); 1601d4afb5ceSopenharmony_ci break; 1602d4afb5ceSopenharmony_ci } 1603d4afb5ceSopenharmony_ci break; 1604d4afb5ceSopenharmony_ci } 1605d4afb5ceSopenharmony_ci } lws_end_foreach_ll(w, mux.sibling_list); 1606d4afb5ceSopenharmony_ci 1607d4afb5ceSopenharmony_ci if (!n) { 1608d4afb5ceSopenharmony_ci lwsl_err("%s: unsolicited UNSUBACK\n", 1609d4afb5ceSopenharmony_ci __func__); 1610d4afb5ceSopenharmony_ci return -1; 1611d4afb5ceSopenharmony_ci } 1612d4afb5ceSopenharmony_ci 1613d4afb5ceSopenharmony_ci 1614d4afb5ceSopenharmony_ci /* 1615d4afb5ceSopenharmony_ci * If we unsubscribed to something and 1616d4afb5ceSopenharmony_ci * UNSUBACK came, our connection is 1617d4afb5ceSopenharmony_ci * definitely working in both 1618d4afb5ceSopenharmony_ci * directions at the moment. 1619d4afb5ceSopenharmony_ci */ 1620d4afb5ceSopenharmony_ci lws_validity_confirmed(wsi); 1621d4afb5ceSopenharmony_ci 1622d4afb5ceSopenharmony_ci break; 1623d4afb5ceSopenharmony_ci } 1624d4afb5ceSopenharmony_ci case LMQCP_PUBLISH: 1625d4afb5ceSopenharmony_ci { 1626d4afb5ceSopenharmony_ci lws_mqtt_publish_param_t *pub = 1627d4afb5ceSopenharmony_ci (lws_mqtt_publish_param_t *) 1628d4afb5ceSopenharmony_ci wsi->mqtt->rx_cpkt_param; 1629d4afb5ceSopenharmony_ci size_t chunk; 1630d4afb5ceSopenharmony_ci 1631d4afb5ceSopenharmony_ci if (pub == NULL) { 1632d4afb5ceSopenharmony_ci lwsl_notice("%s: no pub\n", __func__); 1633d4afb5ceSopenharmony_ci return -1; 1634d4afb5ceSopenharmony_ci } 1635d4afb5ceSopenharmony_ci 1636d4afb5ceSopenharmony_ci /* 1637d4afb5ceSopenharmony_ci * RX PUBLISH is delivered to any children that 1638d4afb5ceSopenharmony_ci * registered for the related topic 1639d4afb5ceSopenharmony_ci */ 1640d4afb5ceSopenharmony_ci 1641d4afb5ceSopenharmony_ci n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)]; 1642d4afb5ceSopenharmony_ci 1643d4afb5ceSopenharmony_ci chunk = pub->payload_len - pub->payload_pos; 1644d4afb5ceSopenharmony_ci if (chunk > len) 1645d4afb5ceSopenharmony_ci chunk = len; 1646d4afb5ceSopenharmony_ci 1647d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct lws *, w, 1648d4afb5ceSopenharmony_ci wsi->mux.child_list) { 1649d4afb5ceSopenharmony_ci if (lws_mqtt_find_sub(w->mqtt, 1650d4afb5ceSopenharmony_ci pub->topic)) 1651d4afb5ceSopenharmony_ci if (w->a.protocol->callback( 1652d4afb5ceSopenharmony_ci w, (enum lws_callback_reasons)n, 1653d4afb5ceSopenharmony_ci w->user_space, 1654d4afb5ceSopenharmony_ci (void *)pub, 1655d4afb5ceSopenharmony_ci chunk)) { 1656d4afb5ceSopenharmony_ci par->payload_consumed = 0; 1657d4afb5ceSopenharmony_ci lws_free_set_NULL(pub->topic); 1658d4afb5ceSopenharmony_ci lws_free_set_NULL(wsi->mqtt->rx_cpkt_param); 1659d4afb5ceSopenharmony_ci return 1; 1660d4afb5ceSopenharmony_ci } 1661d4afb5ceSopenharmony_ci } lws_end_foreach_ll(w, mux.sibling_list); 1662d4afb5ceSopenharmony_ci 1663d4afb5ceSopenharmony_ci 1664d4afb5ceSopenharmony_ci pub->payload_pos += (uint32_t)chunk; 1665d4afb5ceSopenharmony_ci len -= chunk; 1666d4afb5ceSopenharmony_ci buf += chunk; 1667d4afb5ceSopenharmony_ci 1668d4afb5ceSopenharmony_ci lwsl_debug("%s: post pos %d, plen %d, len %d\n", 1669d4afb5ceSopenharmony_ci __func__, (int)pub->payload_pos, 1670d4afb5ceSopenharmony_ci (int)pub->payload_len, (int)len); 1671d4afb5ceSopenharmony_ci 1672d4afb5ceSopenharmony_ci if (pub->payload_pos != pub->payload_len) { 1673d4afb5ceSopenharmony_ci /* 1674d4afb5ceSopenharmony_ci * More chunks of the payload pending, 1675d4afb5ceSopenharmony_ci * blocking this connection from doing 1676d4afb5ceSopenharmony_ci * anything else 1677d4afb5ceSopenharmony_ci */ 1678d4afb5ceSopenharmony_ci par->state = LMQCPP_PAYLOAD; 1679d4afb5ceSopenharmony_ci break; 1680d4afb5ceSopenharmony_ci } 1681d4afb5ceSopenharmony_ci 1682d4afb5ceSopenharmony_ci if (pub->qos == 1) { 1683d4afb5ceSopenharmony_ci /* For QOS = 1, send out PUBACK */ 1684d4afb5ceSopenharmony_ci wsi->mqtt->send_puback = 1; 1685d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 1686d4afb5ceSopenharmony_ci } else if (pub->qos == 2) { 1687d4afb5ceSopenharmony_ci /* For QOS = 2, send out PUBREC */ 1688d4afb5ceSopenharmony_ci wsi->mqtt->send_pubrec = 1; 1689d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 1690d4afb5ceSopenharmony_ci } 1691d4afb5ceSopenharmony_ci 1692d4afb5ceSopenharmony_ci par->payload_consumed = 0; 1693d4afb5ceSopenharmony_ci lws_free_set_NULL(pub->topic); 1694d4afb5ceSopenharmony_ci lws_free_set_NULL(wsi->mqtt->rx_cpkt_param); 1695d4afb5ceSopenharmony_ci 1696d4afb5ceSopenharmony_ci break; 1697d4afb5ceSopenharmony_ci } 1698d4afb5ceSopenharmony_ci default: 1699d4afb5ceSopenharmony_ci break; 1700d4afb5ceSopenharmony_ci } 1701d4afb5ceSopenharmony_ci 1702d4afb5ceSopenharmony_ci break; 1703d4afb5ceSopenharmony_ci 1704d4afb5ceSopenharmony_ci 1705d4afb5ceSopenharmony_ci case LMQCPP_PROP_ID_VBI: 1706d4afb5ceSopenharmony_ci switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { 1707d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 1708d4afb5ceSopenharmony_ci break; 1709d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 1710d4afb5ceSopenharmony_ci par->consumed = (uint32_t)((unsigned int)par->consumed + (unsigned int)(unsigned char)par->vbit.consumed); 1711d4afb5ceSopenharmony_ci if (par->vbit.value > 1712d4afb5ceSopenharmony_ci LWS_ARRAY_SIZE(property_valid)) { 1713d4afb5ceSopenharmony_ci lwsl_notice("%s: undef prop id 0x%x\n", 1714d4afb5ceSopenharmony_ci __func__, (int)par->vbit.value); 1715d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1716d4afb5ceSopenharmony_ci } 1717d4afb5ceSopenharmony_ci if (!(property_valid[par->vbit.value] & 1718d4afb5ceSopenharmony_ci (1 << ctl_pkt_type(par)))) { 1719d4afb5ceSopenharmony_ci lwsl_notice("%s: prop id 0x%x invalid for" 1720d4afb5ceSopenharmony_ci " control pkt %d\n", __func__, 1721d4afb5ceSopenharmony_ci (int)par->vbit.value, 1722d4afb5ceSopenharmony_ci ctl_pkt_type(par)); 1723d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1724d4afb5ceSopenharmony_ci } 1725d4afb5ceSopenharmony_ci par->prop_id = par->vbit.value; 1726d4afb5ceSopenharmony_ci par->flag_prop_multi = !!( 1727d4afb5ceSopenharmony_ci par->props_seen[par->prop_id >> 3] & 1728d4afb5ceSopenharmony_ci (1 << (par->prop_id & 7))); 1729d4afb5ceSopenharmony_ci par->props_seen[par->prop_id >> 3] = 1730d4afb5ceSopenharmony_ci (uint8_t)((par->props_seen[par->prop_id >> 3]) | (1 << (par->prop_id & 7))); 1731d4afb5ceSopenharmony_ci /* 1732d4afb5ceSopenharmony_ci * even if it's not a vbi property arg, 1733d4afb5ceSopenharmony_ci * .consumed of this will be zero the first time 1734d4afb5ceSopenharmony_ci */ 1735d4afb5ceSopenharmony_ci lws_mqtt_vbi_init(&par->vbit); 1736d4afb5ceSopenharmony_ci /* 1737d4afb5ceSopenharmony_ci * if it's a string, next state must set the 1738d4afb5ceSopenharmony_ci * destination and size limit itself. But 1739d4afb5ceSopenharmony_ci * resetting it generically here lets it use 1740d4afb5ceSopenharmony_ci * lws_mqtt_str_first() to understand it's the 1741d4afb5ceSopenharmony_ci * first time around. 1742d4afb5ceSopenharmony_ci */ 1743d4afb5ceSopenharmony_ci lws_mqtt_str_init(&par->s_temp, NULL, 0, 0); 1744d4afb5ceSopenharmony_ci 1745d4afb5ceSopenharmony_ci /* property arg state enums are so encoded */ 1746d4afb5ceSopenharmony_ci par->state = 0x100 | par->vbit.value; 1747d4afb5ceSopenharmony_ci break; 1748d4afb5ceSopenharmony_ci default: 1749d4afb5ceSopenharmony_ci lwsl_notice("%s: prop id bad vbi\n", __func__); 1750d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1751d4afb5ceSopenharmony_ci } 1752d4afb5ceSopenharmony_ci break; 1753d4afb5ceSopenharmony_ci 1754d4afb5ceSopenharmony_ci /* 1755d4afb5ceSopenharmony_ci * All possible property payloads... restricting which ones 1756d4afb5ceSopenharmony_ci * can appear in which control packets is already done above 1757d4afb5ceSopenharmony_ci * in LMQCPP_PROP_ID_VBI 1758d4afb5ceSopenharmony_ci */ 1759d4afb5ceSopenharmony_ci 1760d4afb5ceSopenharmony_ci case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE: 1761d4afb5ceSopenharmony_ci case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE: 1762d4afb5ceSopenharmony_ci case LMQCPP_PROP_MAXIMUM_QOS_1BYTE: 1763d4afb5ceSopenharmony_ci case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE: 1764d4afb5ceSopenharmony_ci case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE: 1765d4afb5ceSopenharmony_ci case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE: 1766d4afb5ceSopenharmony_ci case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE: 1767d4afb5ceSopenharmony_ci case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */ 1768d4afb5ceSopenharmony_ci if (par->flag_prop_multi) 1769d4afb5ceSopenharmony_ci goto singular_prop_seen_twice; 1770d4afb5ceSopenharmony_ci par->payload_format = *buf++; 1771d4afb5ceSopenharmony_ci len--; 1772d4afb5ceSopenharmony_ci if (lws_mqtt_pconsume(par, 1)) 1773d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1774d4afb5ceSopenharmony_ci break; 1775d4afb5ceSopenharmony_ci 1776d4afb5ceSopenharmony_ci case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE: 1777d4afb5ceSopenharmony_ci case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE: 1778d4afb5ceSopenharmony_ci case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE: 1779d4afb5ceSopenharmony_ci case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE: 1780d4afb5ceSopenharmony_ci if (par->flag_prop_multi) 1781d4afb5ceSopenharmony_ci goto singular_prop_seen_twice; 1782d4afb5ceSopenharmony_ci 1783d4afb5ceSopenharmony_ci if (lws_mqtt_mb_first(&par->vbit)) 1784d4afb5ceSopenharmony_ci lws_mqtt_4byte_init(&par->vbit); 1785d4afb5ceSopenharmony_ci 1786d4afb5ceSopenharmony_ci switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) { 1787d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 1788d4afb5ceSopenharmony_ci break; 1789d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 1790d4afb5ceSopenharmony_ci if (lws_mqtt_pconsume(par, par->vbit.consumed)) 1791d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1792d4afb5ceSopenharmony_ci break; 1793d4afb5ceSopenharmony_ci default: 1794d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1795d4afb5ceSopenharmony_ci } 1796d4afb5ceSopenharmony_ci break; 1797d4afb5ceSopenharmony_ci 1798d4afb5ceSopenharmony_ci case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE: 1799d4afb5ceSopenharmony_ci case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE: 1800d4afb5ceSopenharmony_ci case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE: 1801d4afb5ceSopenharmony_ci case LMQCPP_PROP_TOPIC_ALIAS_2BYTE: 1802d4afb5ceSopenharmony_ci if (par->flag_prop_multi) 1803d4afb5ceSopenharmony_ci goto singular_prop_seen_twice; 1804d4afb5ceSopenharmony_ci 1805d4afb5ceSopenharmony_ci if (lws_mqtt_mb_first(&par->vbit)) 1806d4afb5ceSopenharmony_ci lws_mqtt_2byte_init(&par->vbit); 1807d4afb5ceSopenharmony_ci 1808d4afb5ceSopenharmony_ci switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) { 1809d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 1810d4afb5ceSopenharmony_ci break; 1811d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 1812d4afb5ceSopenharmony_ci if (lws_mqtt_pconsume(par, par->vbit.consumed)) 1813d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1814d4afb5ceSopenharmony_ci break; 1815d4afb5ceSopenharmony_ci default: 1816d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1817d4afb5ceSopenharmony_ci } 1818d4afb5ceSopenharmony_ci break; 1819d4afb5ceSopenharmony_ci 1820d4afb5ceSopenharmony_ci case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S: 1821d4afb5ceSopenharmony_ci case LMQCPP_PROP_AUTH_METHOD_UTF8S: 1822d4afb5ceSopenharmony_ci case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S: 1823d4afb5ceSopenharmony_ci case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S: 1824d4afb5ceSopenharmony_ci case LMQCPP_PROP_RESPONSE_INFO_UTF8S: 1825d4afb5ceSopenharmony_ci case LMQCPP_PROP_SERVER_REFERENCE_UTF8S: 1826d4afb5ceSopenharmony_ci case LMQCPP_PROP_REASON_STRING_UTF8S: 1827d4afb5ceSopenharmony_ci case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S: 1828d4afb5ceSopenharmony_ci case LMQCPP_PROP_CONTENT_TYPE_UTF8S: 1829d4afb5ceSopenharmony_ci if (par->flag_prop_multi) 1830d4afb5ceSopenharmony_ci goto singular_prop_seen_twice; 1831d4afb5ceSopenharmony_ci 1832d4afb5ceSopenharmony_ci if (lws_mqtt_str_first(&par->s_temp)) 1833d4afb5ceSopenharmony_ci lws_mqtt_str_init(&par->s_temp, par->temp, 1834d4afb5ceSopenharmony_ci sizeof(par->temp), 0); 1835d4afb5ceSopenharmony_ci 1836d4afb5ceSopenharmony_ci switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) { 1837d4afb5ceSopenharmony_ci case LMSPR_NEED_MORE: 1838d4afb5ceSopenharmony_ci break; 1839d4afb5ceSopenharmony_ci case LMSPR_COMPLETED: 1840d4afb5ceSopenharmony_ci if (lws_mqtt_pconsume(par, par->s_temp.len)) 1841d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1842d4afb5ceSopenharmony_ci break; 1843d4afb5ceSopenharmony_ci 1844d4afb5ceSopenharmony_ci default: 1845d4afb5ceSopenharmony_ci lwsl_info("%s: bad protocol name\n", __func__); 1846d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1847d4afb5ceSopenharmony_ci } 1848d4afb5ceSopenharmony_ci break; 1849d4afb5ceSopenharmony_ci 1850d4afb5ceSopenharmony_ci case LMQCPP_PROP_SUBSCRIPTION_ID_VBI: 1851d4afb5ceSopenharmony_ci 1852d4afb5ceSopenharmony_ci case LMQCPP_PROP_CORRELATION_BINDATA: 1853d4afb5ceSopenharmony_ci case LMQCPP_PROP_AUTH_DATA_BINDATA: 1854d4afb5ceSopenharmony_ci 1855d4afb5ceSopenharmony_ci /* TODO */ 1856d4afb5ceSopenharmony_ci lwsl_err("%s: Unimplemented packet state 0x%x\n", 1857d4afb5ceSopenharmony_ci __func__, par->state); 1858d4afb5ceSopenharmony_ci return -1; 1859d4afb5ceSopenharmony_ci } 1860d4afb5ceSopenharmony_ci } 1861d4afb5ceSopenharmony_ci 1862d4afb5ceSopenharmony_ci return 0; 1863d4afb5ceSopenharmony_ci 1864d4afb5ceSopenharmony_cioom: 1865d4afb5ceSopenharmony_ci lwsl_err("%s: OOM!\n", __func__); 1866d4afb5ceSopenharmony_ci goto send_protocol_error_and_close; 1867d4afb5ceSopenharmony_ci 1868d4afb5ceSopenharmony_cisingular_prop_seen_twice: 1869d4afb5ceSopenharmony_ci lwsl_info("%s: property appears twice\n", __func__); 1870d4afb5ceSopenharmony_ci 1871d4afb5ceSopenharmony_cisend_protocol_error_and_close: 1872d4afb5ceSopenharmony_ci lwsl_notice("%s: peac\n", __func__); 1873d4afb5ceSopenharmony_ci par->reason = LMQCP_REASON_PROTOCOL_ERROR; 1874d4afb5ceSopenharmony_ci 1875d4afb5ceSopenharmony_cisend_reason_and_close: 1876d4afb5ceSopenharmony_ci lwsl_notice("%s: srac\n", __func__); 1877d4afb5ceSopenharmony_ci par->flag_pending_send_reason_close = 1; 1878d4afb5ceSopenharmony_ci goto ask; 1879d4afb5ceSopenharmony_ci 1880d4afb5ceSopenharmony_cisend_unsupp_connack_and_close: 1881d4afb5ceSopenharmony_ci lwsl_notice("%s: unsupac\n", __func__); 1882d4afb5ceSopenharmony_ci par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL; 1883d4afb5ceSopenharmony_ci par->flag_pending_send_connack_close = 1; 1884d4afb5ceSopenharmony_ci 1885d4afb5ceSopenharmony_ciask: 1886d4afb5ceSopenharmony_ci /* Should we ask for clients? */ 1887d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 1888d4afb5ceSopenharmony_ci 1889d4afb5ceSopenharmony_ci return -1; 1890d4afb5ceSopenharmony_ci} 1891d4afb5ceSopenharmony_ci 1892d4afb5ceSopenharmony_ciint 1893d4afb5ceSopenharmony_cilws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type, 1894d4afb5ceSopenharmony_ci uint8_t dup, lws_mqtt_qos_levels_t qos, 1895d4afb5ceSopenharmony_ci uint8_t retain) 1896d4afb5ceSopenharmony_ci{ 1897d4afb5ceSopenharmony_ci lws_mqtt_fixed_hdr_t hdr; 1898d4afb5ceSopenharmony_ci 1899d4afb5ceSopenharmony_ci hdr.bits = 0; 1900d4afb5ceSopenharmony_ci hdr.flags.ctrl_pkt_type = ctrl_pkt_type & 0xf; 1901d4afb5ceSopenharmony_ci 1902d4afb5ceSopenharmony_ci switch(ctrl_pkt_type) { 1903d4afb5ceSopenharmony_ci case LMQCP_PUBLISH: 1904d4afb5ceSopenharmony_ci hdr.flags.dup = !!dup; 1905d4afb5ceSopenharmony_ci /* 1906d4afb5ceSopenharmony_ci * A PUBLISH Packet MUST NOT have both QoS bits set to 1907d4afb5ceSopenharmony_ci * 1. If a Server or Client receives a PUBLISH Packet 1908d4afb5ceSopenharmony_ci * which has both QoS bits set to 1 it MUST close the 1909d4afb5ceSopenharmony_ci * Network Connection [MQTT-3.3.1-4]. 1910d4afb5ceSopenharmony_ci */ 1911d4afb5ceSopenharmony_ci if (qos >= RESERVED_QOS_LEVEL) { 1912d4afb5ceSopenharmony_ci lwsl_err("%s: Unsupport QoS level 0x%x\n", 1913d4afb5ceSopenharmony_ci __func__, qos); 1914d4afb5ceSopenharmony_ci return -1; 1915d4afb5ceSopenharmony_ci } 1916d4afb5ceSopenharmony_ci hdr.flags.qos = qos & 3; 1917d4afb5ceSopenharmony_ci hdr.flags.retain = !!retain; 1918d4afb5ceSopenharmony_ci break; 1919d4afb5ceSopenharmony_ci 1920d4afb5ceSopenharmony_ci case LMQCP_CTOS_CONNECT: 1921d4afb5ceSopenharmony_ci case LMQCP_STOC_CONNACK: 1922d4afb5ceSopenharmony_ci case LMQCP_PUBACK: 1923d4afb5ceSopenharmony_ci case LMQCP_PUBREC: 1924d4afb5ceSopenharmony_ci case LMQCP_PUBCOMP: 1925d4afb5ceSopenharmony_ci case LMQCP_STOC_SUBACK: 1926d4afb5ceSopenharmony_ci case LMQCP_STOC_UNSUBACK: 1927d4afb5ceSopenharmony_ci case LMQCP_CTOS_PINGREQ: 1928d4afb5ceSopenharmony_ci case LMQCP_STOC_PINGRESP: 1929d4afb5ceSopenharmony_ci case LMQCP_DISCONNECT: 1930d4afb5ceSopenharmony_ci case LMQCP_AUTH: 1931d4afb5ceSopenharmony_ci hdr.bits &= 0xf0; 1932d4afb5ceSopenharmony_ci break; 1933d4afb5ceSopenharmony_ci 1934d4afb5ceSopenharmony_ci /* 1935d4afb5ceSopenharmony_ci * Bits 3,2,1 and 0 of the fixed header of the PUBREL, 1936d4afb5ceSopenharmony_ci * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and 1937d4afb5ceSopenharmony_ci * MUST be set to 0,0,1 and 0 respectively. The Server MUST 1938d4afb5ceSopenharmony_ci * treat any other value as malformed and close the Network 1939d4afb5ceSopenharmony_ci * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1]. 1940d4afb5ceSopenharmony_ci */ 1941d4afb5ceSopenharmony_ci case LMQCP_PUBREL: 1942d4afb5ceSopenharmony_ci case LMQCP_CTOS_SUBSCRIBE: 1943d4afb5ceSopenharmony_ci case LMQCP_CTOS_UNSUBSCRIBE: 1944d4afb5ceSopenharmony_ci hdr.bits |= 0x02; 1945d4afb5ceSopenharmony_ci break; 1946d4afb5ceSopenharmony_ci 1947d4afb5ceSopenharmony_ci default: 1948d4afb5ceSopenharmony_ci return -1; 1949d4afb5ceSopenharmony_ci } 1950d4afb5ceSopenharmony_ci 1951d4afb5ceSopenharmony_ci *p = hdr.bits; 1952d4afb5ceSopenharmony_ci 1953d4afb5ceSopenharmony_ci return 0; 1954d4afb5ceSopenharmony_ci} 1955d4afb5ceSopenharmony_ci 1956d4afb5ceSopenharmony_ciint 1957d4afb5ceSopenharmony_cilws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub, 1958d4afb5ceSopenharmony_ci const void *buf, uint32_t len, int is_complete) 1959d4afb5ceSopenharmony_ci{ 1960d4afb5ceSopenharmony_ci struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi]; 1961d4afb5ceSopenharmony_ci uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p; 1962d4afb5ceSopenharmony_ci struct lws *nwsi = lws_get_network_wsi(wsi); 1963d4afb5ceSopenharmony_ci lws_mqtt_str_t mqtt_vh_payload; 1964d4afb5ceSopenharmony_ci uint32_t vh_len, rem_len; 1965d4afb5ceSopenharmony_ci 1966d4afb5ceSopenharmony_ci assert(pub->topic); 1967d4afb5ceSopenharmony_ci 1968d4afb5ceSopenharmony_ci lwsl_debug("%s: len = %d, is_complete = %d\n", 1969d4afb5ceSopenharmony_ci __func__, (int)len, (int)is_complete); 1970d4afb5ceSopenharmony_ci 1971d4afb5ceSopenharmony_ci if (lwsi_state(wsi) != LRS_ESTABLISHED) { 1972d4afb5ceSopenharmony_ci lwsl_err("%s: %s: unknown state 0x%x\n", __func__, 1973d4afb5ceSopenharmony_ci lws_wsi_tag(wsi), lwsi_state(wsi)); 1974d4afb5ceSopenharmony_ci assert(0); 1975d4afb5ceSopenharmony_ci return 1; 1976d4afb5ceSopenharmony_ci } 1977d4afb5ceSopenharmony_ci 1978d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_payload) { 1979d4afb5ceSopenharmony_ci /* 1980d4afb5ceSopenharmony_ci * Headers are filled, we are sending 1981d4afb5ceSopenharmony_ci * the payload - a buffer with LWS_PRE 1982d4afb5ceSopenharmony_ci * in front it. 1983d4afb5ceSopenharmony_ci */ 1984d4afb5ceSopenharmony_ci start = (uint8_t *)buf; 1985d4afb5ceSopenharmony_ci p = start + len; 1986d4afb5ceSopenharmony_ci if (is_complete) 1987d4afb5ceSopenharmony_ci wsi->mqtt->inside_payload = 0; 1988d4afb5ceSopenharmony_ci goto do_write; 1989d4afb5ceSopenharmony_ci } 1990d4afb5ceSopenharmony_ci 1991d4afb5ceSopenharmony_ci start = b + LWS_PRE; 1992d4afb5ceSopenharmony_ci p = start; 1993d4afb5ceSopenharmony_ci /* 1994d4afb5ceSopenharmony_ci * Fill headers and the first chunk of the 1995d4afb5ceSopenharmony_ci * payload (if any) 1996d4afb5ceSopenharmony_ci */ 1997d4afb5ceSopenharmony_ci if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH, 1998d4afb5ceSopenharmony_ci pub->dup, pub->qos, pub->retain)) { 1999d4afb5ceSopenharmony_ci lwsl_err("%s: Failed to fill fixed header\n", __func__); 2000d4afb5ceSopenharmony_ci return 1; 2001d4afb5ceSopenharmony_ci } 2002d4afb5ceSopenharmony_ci 2003d4afb5ceSopenharmony_ci /* 2004d4afb5ceSopenharmony_ci * Topic len field + Topic len + Packet ID 2005d4afb5ceSopenharmony_ci * (for QOS>0) + Payload len 2006d4afb5ceSopenharmony_ci */ 2007d4afb5ceSopenharmony_ci vh_len = (unsigned int)(2 + pub->topic_len + ((pub->qos) ? 2 : 0)); 2008d4afb5ceSopenharmony_ci rem_len = vh_len + pub->payload_len; 2009d4afb5ceSopenharmony_ci lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len); 2010d4afb5ceSopenharmony_ci 2011d4afb5ceSopenharmony_ci /* Will the chunk of payload fit? */ 2012d4afb5ceSopenharmony_ci if ((vh_len + len) >= 2013d4afb5ceSopenharmony_ci (wsi->a.context->pt_serv_buf_size - LWS_PRE)) { 2014d4afb5ceSopenharmony_ci lwsl_err("%s: Payload is too big\n", __func__); 2015d4afb5ceSopenharmony_ci return 1; 2016d4afb5ceSopenharmony_ci } 2017d4afb5ceSopenharmony_ci 2018d4afb5ceSopenharmony_ci p += lws_mqtt_vbi_encode(rem_len, p); 2019d4afb5ceSopenharmony_ci 2020d4afb5ceSopenharmony_ci /* Topic's Len */ 2021d4afb5ceSopenharmony_ci lws_ser_wu16be(p, pub->topic_len); 2022d4afb5ceSopenharmony_ci p += 2; 2023d4afb5ceSopenharmony_ci 2024d4afb5ceSopenharmony_ci /* 2025d4afb5ceSopenharmony_ci * Init lws_mqtt_str for "MQTT Variable 2026d4afb5ceSopenharmony_ci * Headers + payload" (only the supplied 2027d4afb5ceSopenharmony_ci * chuncked payload) 2028d4afb5ceSopenharmony_ci */ 2029d4afb5ceSopenharmony_ci lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, 2030d4afb5ceSopenharmony_ci (uint16_t)(unsigned int)(pub->topic_len + ((pub->qos) ? 2u : 0u) + len), 2031d4afb5ceSopenharmony_ci 0); 2032d4afb5ceSopenharmony_ci 2033d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2034d4afb5ceSopenharmony_ci lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1); 2035d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) { 2036d4afb5ceSopenharmony_ci lwsl_err("%s: a\n", __func__); 2037d4afb5ceSopenharmony_ci return 1; 2038d4afb5ceSopenharmony_ci } 2039d4afb5ceSopenharmony_ci 2040d4afb5ceSopenharmony_ci /* Packet ID */ 2041d4afb5ceSopenharmony_ci if (pub->qos != QOS0) { 2042d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2043d4afb5ceSopenharmony_ci if (!pub->dup) 2044d4afb5ceSopenharmony_ci nwsi->mqtt->pkt_id++; 2045d4afb5ceSopenharmony_ci wsi->mqtt->ack_pkt_id = pub->packet_id = nwsi->mqtt->pkt_id; 2046d4afb5ceSopenharmony_ci lwsl_debug("%s: pkt_id = %d\n", __func__, 2047d4afb5ceSopenharmony_ci (int)wsi->mqtt->ack_pkt_id); 2048d4afb5ceSopenharmony_ci lws_ser_wu16be(p, pub->packet_id); 2049d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) { 2050d4afb5ceSopenharmony_ci lwsl_err("%s: b\n", __func__); 2051d4afb5ceSopenharmony_ci return 1; 2052d4afb5ceSopenharmony_ci } 2053d4afb5ceSopenharmony_ci } 2054d4afb5ceSopenharmony_ci 2055d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2056d4afb5ceSopenharmony_ci memcpy(p, buf, len); 2057d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len)) 2058d4afb5ceSopenharmony_ci return 1; 2059d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2060d4afb5ceSopenharmony_ci 2061d4afb5ceSopenharmony_ci if (!is_complete) 2062d4afb5ceSopenharmony_ci nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1; 2063d4afb5ceSopenharmony_ci 2064d4afb5ceSopenharmony_cido_write: 2065d4afb5ceSopenharmony_ci 2066d4afb5ceSopenharmony_ci // lwsl_hexdump_err(start, lws_ptr_diff(p, start)); 2067d4afb5ceSopenharmony_ci 2068d4afb5ceSopenharmony_ci if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) != 2069d4afb5ceSopenharmony_ci lws_ptr_diff(p, start)) { 2070d4afb5ceSopenharmony_ci lwsl_err("%s: write failed\n", __func__); 2071d4afb5ceSopenharmony_ci return 1; 2072d4afb5ceSopenharmony_ci } 2073d4afb5ceSopenharmony_ci 2074d4afb5ceSopenharmony_ci if (!is_complete) { 2075d4afb5ceSopenharmony_ci /* still some more chunks to come... */ 2076d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 2077d4afb5ceSopenharmony_ci 2078d4afb5ceSopenharmony_ci return 0; 2079d4afb5ceSopenharmony_ci } 2080d4afb5ceSopenharmony_ci 2081d4afb5ceSopenharmony_ci wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0; 2082d4afb5ceSopenharmony_ci 2083d4afb5ceSopenharmony_ci if (pub->qos != QOS0) 2084d4afb5ceSopenharmony_ci wsi->mqtt->unacked_publish = 1; 2085d4afb5ceSopenharmony_ci 2086d4afb5ceSopenharmony_ci /* this was the last part of the publish message */ 2087d4afb5ceSopenharmony_ci 2088d4afb5ceSopenharmony_ci if (pub->qos == QOS0) { 2089d4afb5ceSopenharmony_ci /* 2090d4afb5ceSopenharmony_ci * There won't be any real PUBACK, act like we got one 2091d4afb5ceSopenharmony_ci * so the user callback logic is the same for QoS0 or 2092d4afb5ceSopenharmony_ci * QoS1 2093d4afb5ceSopenharmony_ci */ 2094d4afb5ceSopenharmony_ci if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK, 2095d4afb5ceSopenharmony_ci wsi->user_space, NULL, 0)) { 2096d4afb5ceSopenharmony_ci lwsl_err("%s: ACK callback exited\n", __func__); 2097d4afb5ceSopenharmony_ci return 1; 2098d4afb5ceSopenharmony_ci } 2099d4afb5ceSopenharmony_ci } else if (pub->qos == QOS1 || pub->qos == QOS2) { 2100d4afb5ceSopenharmony_ci /* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s, 2101d4afb5ceSopenharmony_ci * we must RETRY the publish 2102d4afb5ceSopenharmony_ci */ 2103d4afb5ceSopenharmony_ci wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend; 2104d4afb5ceSopenharmony_ci __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], 2105d4afb5ceSopenharmony_ci &wsi->mqtt->sul_qos_puback_pubrec_wait, 2106d4afb5ceSopenharmony_ci 3 * LWS_USEC_PER_SEC); 2107d4afb5ceSopenharmony_ci } 2108d4afb5ceSopenharmony_ci 2109d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_shadow) { 2110d4afb5ceSopenharmony_ci wsi->mqtt->sul_shadow_wait.cb = lws_mqtt_shadow_timeout; 2111d4afb5ceSopenharmony_ci __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], 2112d4afb5ceSopenharmony_ci &wsi->mqtt->sul_shadow_wait, 2113d4afb5ceSopenharmony_ci 60 * LWS_USEC_PER_SEC); 2114d4afb5ceSopenharmony_ci } 2115d4afb5ceSopenharmony_ci 2116d4afb5ceSopenharmony_ci return 0; 2117d4afb5ceSopenharmony_ci} 2118d4afb5ceSopenharmony_ci 2119d4afb5ceSopenharmony_ciint 2120d4afb5ceSopenharmony_cilws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub) 2121d4afb5ceSopenharmony_ci{ 2122d4afb5ceSopenharmony_ci struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi]; 2123d4afb5ceSopenharmony_ci uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start; 2124d4afb5ceSopenharmony_ci struct lws *nwsi = lws_get_network_wsi(wsi); 2125d4afb5ceSopenharmony_ci lws_mqtt_str_t mqtt_vh_payload; 2126d4afb5ceSopenharmony_ci uint8_t exists[8], extant; 2127d4afb5ceSopenharmony_ci lws_mqtt_subs_t *mysub; 2128d4afb5ceSopenharmony_ci uint32_t rem_len; 2129d4afb5ceSopenharmony_ci#if defined(_DEBUG) 2130d4afb5ceSopenharmony_ci uint32_t tops; 2131d4afb5ceSopenharmony_ci#endif 2132d4afb5ceSopenharmony_ci uint32_t n; 2133d4afb5ceSopenharmony_ci 2134d4afb5ceSopenharmony_ci assert(sub->num_topics); 2135d4afb5ceSopenharmony_ci assert(sub->num_topics < sizeof(exists)); 2136d4afb5ceSopenharmony_ci 2137d4afb5ceSopenharmony_ci switch (lwsi_state(wsi)) { 2138d4afb5ceSopenharmony_ci case LRS_ESTABLISHED: /* Protocol connection established */ 2139d4afb5ceSopenharmony_ci if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE, 2140d4afb5ceSopenharmony_ci 0, 0, 0)) { 2141d4afb5ceSopenharmony_ci lwsl_err("%s: Failed to fill fixed header\n", __func__); 2142d4afb5ceSopenharmony_ci return 1; 2143d4afb5ceSopenharmony_ci } 2144d4afb5ceSopenharmony_ci 2145d4afb5ceSopenharmony_ci /* 2146d4afb5ceSopenharmony_ci * The stream wants to subscribe to one or more topic, but 2147d4afb5ceSopenharmony_ci * the shared nwsi may already be subscribed to some or all of 2148d4afb5ceSopenharmony_ci * them from interactions with other streams. For those cases, 2149d4afb5ceSopenharmony_ci * we filter them from the list the child wants until we just 2150d4afb5ceSopenharmony_ci * have ones that are new to the nwsi. If nothing left, we just 2151d4afb5ceSopenharmony_ci * synthesize the callback to the child as if SUBACK had come 2152d4afb5ceSopenharmony_ci * and we're done, otherwise just ask the server for topics that 2153d4afb5ceSopenharmony_ci * are new to the wsi. 2154d4afb5ceSopenharmony_ci */ 2155d4afb5ceSopenharmony_ci 2156d4afb5ceSopenharmony_ci extant = 0; 2157d4afb5ceSopenharmony_ci memset(&exists, 0, sizeof(exists)); 2158d4afb5ceSopenharmony_ci for (n = 0; n < sub->num_topics; n++) { 2159d4afb5ceSopenharmony_ci lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n", 2160d4afb5ceSopenharmony_ci __func__, (int)n, sub->topic[n].name); 2161d4afb5ceSopenharmony_ci 2162d4afb5ceSopenharmony_ci mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name); 2163d4afb5ceSopenharmony_ci if (mysub && mysub->ref_count) { 2164d4afb5ceSopenharmony_ci mysub->ref_count++; /* another stream using it */ 2165d4afb5ceSopenharmony_ci exists[n] = 1; 2166d4afb5ceSopenharmony_ci extant++; 2167d4afb5ceSopenharmony_ci } 2168d4afb5ceSopenharmony_ci 2169d4afb5ceSopenharmony_ci /* 2170d4afb5ceSopenharmony_ci * Attach the topic we're subscribing to, to wsi->mqtt 2171d4afb5ceSopenharmony_ci */ 2172d4afb5ceSopenharmony_ci if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) { 2173d4afb5ceSopenharmony_ci lwsl_err("%s: create sub fail\n", __func__); 2174d4afb5ceSopenharmony_ci return 1; 2175d4afb5ceSopenharmony_ci } 2176d4afb5ceSopenharmony_ci } 2177d4afb5ceSopenharmony_ci 2178d4afb5ceSopenharmony_ci if (extant == sub->num_topics) { 2179d4afb5ceSopenharmony_ci /* 2180d4afb5ceSopenharmony_ci * It turns out there's nothing to do here, the nwsi has 2181d4afb5ceSopenharmony_ci * already subscribed to all the topics this stream 2182d4afb5ceSopenharmony_ci * wanted. Just tell it it can have them. 2183d4afb5ceSopenharmony_ci */ 2184d4afb5ceSopenharmony_ci lwsl_notice("%s: all topics already subscribed\n", __func__); 2185d4afb5ceSopenharmony_ci if (user_callback_handle_rxflow( 2186d4afb5ceSopenharmony_ci wsi->a.protocol->callback, 2187d4afb5ceSopenharmony_ci wsi, LWS_CALLBACK_MQTT_SUBSCRIBED, 2188d4afb5ceSopenharmony_ci wsi->user_space, NULL, 0) < 0) { 2189d4afb5ceSopenharmony_ci lwsl_err("%s: MQTT_SUBSCRIBE failed\n", 2190d4afb5ceSopenharmony_ci __func__); 2191d4afb5ceSopenharmony_ci return -1; 2192d4afb5ceSopenharmony_ci } 2193d4afb5ceSopenharmony_ci 2194d4afb5ceSopenharmony_ci return 0; 2195d4afb5ceSopenharmony_ci } 2196d4afb5ceSopenharmony_ci 2197d4afb5ceSopenharmony_ci#if defined(_DEBUG) 2198d4afb5ceSopenharmony_ci /* 2199d4afb5ceSopenharmony_ci * zero or more of the topics already existed, but not all, 2200d4afb5ceSopenharmony_ci * so we must go to the server with a filtered list of the 2201d4afb5ceSopenharmony_ci * new ones only 2202d4afb5ceSopenharmony_ci */ 2203d4afb5ceSopenharmony_ci 2204d4afb5ceSopenharmony_ci tops = sub->num_topics - extant; 2205d4afb5ceSopenharmony_ci#endif 2206d4afb5ceSopenharmony_ci 2207d4afb5ceSopenharmony_ci /* 2208d4afb5ceSopenharmony_ci * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics 2209d4afb5ceSopenharmony_ci */ 2210d4afb5ceSopenharmony_ci rem_len = 2; 2211d4afb5ceSopenharmony_ci for (n = 0; n < sub->num_topics; n++) 2212d4afb5ceSopenharmony_ci if (!exists[n]) 2213d4afb5ceSopenharmony_ci rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1); 2214d4afb5ceSopenharmony_ci 2215d4afb5ceSopenharmony_ci wsi->mqtt->sub_size = (uint16_t)rem_len; 2216d4afb5ceSopenharmony_ci 2217d4afb5ceSopenharmony_ci#if defined(_DEBUG) 2218d4afb5ceSopenharmony_ci lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n", 2219d4afb5ceSopenharmony_ci __func__, (int)tops, (int)rem_len); 2220d4afb5ceSopenharmony_ci#endif 2221d4afb5ceSopenharmony_ci 2222d4afb5ceSopenharmony_ci p += lws_mqtt_vbi_encode(rem_len, p); 2223d4afb5ceSopenharmony_ci 2224d4afb5ceSopenharmony_ci if ((rem_len + lws_ptr_diff_size_t(p, start)) >= 2225d4afb5ceSopenharmony_ci wsi->a.context->pt_serv_buf_size) { 2226d4afb5ceSopenharmony_ci lwsl_err("%s: Payload is too big\n", __func__); 2227d4afb5ceSopenharmony_ci return 1; 2228d4afb5ceSopenharmony_ci } 2229d4afb5ceSopenharmony_ci 2230d4afb5ceSopenharmony_ci /* Init lws_mqtt_str */ 2231d4afb5ceSopenharmony_ci lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0); 2232d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2233d4afb5ceSopenharmony_ci 2234d4afb5ceSopenharmony_ci /* Packet ID */ 2235d4afb5ceSopenharmony_ci wsi->mqtt->ack_pkt_id = sub->packet_id = ++nwsi->mqtt->pkt_id; 2236d4afb5ceSopenharmony_ci lwsl_debug("%s: pkt_id = %d\n", __func__, 2237d4afb5ceSopenharmony_ci (int)sub->packet_id); 2238d4afb5ceSopenharmony_ci lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id); 2239d4afb5ceSopenharmony_ci 2240d4afb5ceSopenharmony_ci nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot; 2241d4afb5ceSopenharmony_ci 2242d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) 2243d4afb5ceSopenharmony_ci return 1; 2244d4afb5ceSopenharmony_ci 2245d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2246d4afb5ceSopenharmony_ci 2247d4afb5ceSopenharmony_ci for (n = 0; n < sub->num_topics; n++) { 2248d4afb5ceSopenharmony_ci lwsl_info("%s: topics[%d] = %s\n", __func__, 2249d4afb5ceSopenharmony_ci (int)n, sub->topic[n].name); 2250d4afb5ceSopenharmony_ci 2251d4afb5ceSopenharmony_ci /* if the nwsi already has it, don't ask server for it */ 2252d4afb5ceSopenharmony_ci if (exists[n]) { 2253d4afb5ceSopenharmony_ci lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n", 2254d4afb5ceSopenharmony_ci __func__, (int)n, sub->topic[n].name); 2255d4afb5ceSopenharmony_ci continue; 2256d4afb5ceSopenharmony_ci } 2257d4afb5ceSopenharmony_ci 2258d4afb5ceSopenharmony_ci /* 2259d4afb5ceSopenharmony_ci * Attach the topic we're subscribing to, to nwsi->mqtt 2260d4afb5ceSopenharmony_ci * so we know the nwsi itself has a subscription to it 2261d4afb5ceSopenharmony_ci */ 2262d4afb5ceSopenharmony_ci 2263d4afb5ceSopenharmony_ci if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name)) 2264d4afb5ceSopenharmony_ci return 1; 2265d4afb5ceSopenharmony_ci 2266d4afb5ceSopenharmony_ci /* Topic's Len */ 2267d4afb5ceSopenharmony_ci lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name)); 2268d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) 2269d4afb5ceSopenharmony_ci return 1; 2270d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2271d4afb5ceSopenharmony_ci 2272d4afb5ceSopenharmony_ci /* Topic Name */ 2273d4afb5ceSopenharmony_ci lws_strncpy((char *)p, sub->topic[n].name, 2274d4afb5ceSopenharmony_ci strlen(sub->topic[n].name) + 1); 2275d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, 2276d4afb5ceSopenharmony_ci (int)strlen(sub->topic[n].name))) 2277d4afb5ceSopenharmony_ci return 1; 2278d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2279d4afb5ceSopenharmony_ci 2280d4afb5ceSopenharmony_ci /* QoS */ 2281d4afb5ceSopenharmony_ci *p = (uint8_t)sub->topic[n].qos; 2282d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, 1)) 2283d4afb5ceSopenharmony_ci return 1; 2284d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2285d4afb5ceSopenharmony_ci } 2286d4afb5ceSopenharmony_ci break; 2287d4afb5ceSopenharmony_ci 2288d4afb5ceSopenharmony_ci default: 2289d4afb5ceSopenharmony_ci return 1; 2290d4afb5ceSopenharmony_ci } 2291d4afb5ceSopenharmony_ci 2292d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_resume_session) 2293d4afb5ceSopenharmony_ci return 0; 2294d4afb5ceSopenharmony_ci 2295d4afb5ceSopenharmony_ci if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) != 2296d4afb5ceSopenharmony_ci lws_ptr_diff(p, start)) 2297d4afb5ceSopenharmony_ci return 1; 2298d4afb5ceSopenharmony_ci 2299d4afb5ceSopenharmony_ci wsi->mqtt->inside_subscribe = 1; 2300d4afb5ceSopenharmony_ci 2301d4afb5ceSopenharmony_ci return 0; 2302d4afb5ceSopenharmony_ci} 2303d4afb5ceSopenharmony_ci 2304d4afb5ceSopenharmony_ciint 2305d4afb5ceSopenharmony_cilws_mqtt_client_send_unsubcribe(struct lws *wsi, 2306d4afb5ceSopenharmony_ci const lws_mqtt_subscribe_param_t *unsub) 2307d4afb5ceSopenharmony_ci{ 2308d4afb5ceSopenharmony_ci struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi]; 2309d4afb5ceSopenharmony_ci uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start; 2310d4afb5ceSopenharmony_ci struct lws *nwsi = lws_get_network_wsi(wsi); 2311d4afb5ceSopenharmony_ci lws_mqtt_str_t mqtt_vh_payload; 2312d4afb5ceSopenharmony_ci uint8_t send_unsub[8], orphaned; 2313d4afb5ceSopenharmony_ci uint32_t rem_len, n; 2314d4afb5ceSopenharmony_ci lws_mqtt_subs_t *mysub; 2315d4afb5ceSopenharmony_ci#if defined(_DEBUG) 2316d4afb5ceSopenharmony_ci uint32_t tops; 2317d4afb5ceSopenharmony_ci#endif 2318d4afb5ceSopenharmony_ci 2319d4afb5ceSopenharmony_ci lwsl_info("%s: Enter\n", __func__); 2320d4afb5ceSopenharmony_ci 2321d4afb5ceSopenharmony_ci switch (lwsi_state(wsi)) { 2322d4afb5ceSopenharmony_ci case LRS_ESTABLISHED: /* Protocol connection established */ 2323d4afb5ceSopenharmony_ci orphaned = 0; 2324d4afb5ceSopenharmony_ci memset(&send_unsub, 0, sizeof(send_unsub)); 2325d4afb5ceSopenharmony_ci for (n = 0; n < unsub->num_topics; n++) { 2326d4afb5ceSopenharmony_ci mysub = lws_mqtt_find_sub(nwsi->mqtt, 2327d4afb5ceSopenharmony_ci unsub->topic[n].name); 2328d4afb5ceSopenharmony_ci assert(mysub); 2329d4afb5ceSopenharmony_ci 2330d4afb5ceSopenharmony_ci if (mysub && --mysub->ref_count == 0) { 2331d4afb5ceSopenharmony_ci lwsl_notice("%s: Need to send UNSUB\n", __func__); 2332d4afb5ceSopenharmony_ci send_unsub[n] = 1; 2333d4afb5ceSopenharmony_ci orphaned++; 2334d4afb5ceSopenharmony_ci } 2335d4afb5ceSopenharmony_ci } 2336d4afb5ceSopenharmony_ci 2337d4afb5ceSopenharmony_ci if (!orphaned) { 2338d4afb5ceSopenharmony_ci /* 2339d4afb5ceSopenharmony_ci * The nwsi still has other subscribers bound to the 2340d4afb5ceSopenharmony_ci * topics. 2341d4afb5ceSopenharmony_ci * 2342d4afb5ceSopenharmony_ci * So, don't send UNSUB to server, and just fake the 2343d4afb5ceSopenharmony_ci * UNSUB ACK event for the guy going away. 2344d4afb5ceSopenharmony_ci */ 2345d4afb5ceSopenharmony_ci lwsl_notice("%s: unsubscribed!\n", __func__); 2346d4afb5ceSopenharmony_ci if (user_callback_handle_rxflow( 2347d4afb5ceSopenharmony_ci wsi->a.protocol->callback, 2348d4afb5ceSopenharmony_ci wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED, 2349d4afb5ceSopenharmony_ci wsi->user_space, NULL, 0) < 0) { 2350d4afb5ceSopenharmony_ci /* 2351d4afb5ceSopenharmony_ci * We can't directly close here, because the 2352d4afb5ceSopenharmony_ci * caller still has the wsi. Inform the 2353d4afb5ceSopenharmony_ci * caller that we want to close 2354d4afb5ceSopenharmony_ci */ 2355d4afb5ceSopenharmony_ci 2356d4afb5ceSopenharmony_ci return 1; 2357d4afb5ceSopenharmony_ci } 2358d4afb5ceSopenharmony_ci 2359d4afb5ceSopenharmony_ci return 0; 2360d4afb5ceSopenharmony_ci } 2361d4afb5ceSopenharmony_ci#if defined(_DEBUG) 2362d4afb5ceSopenharmony_ci /* 2363d4afb5ceSopenharmony_ci * one or more of the topics needs to be unsubscribed 2364d4afb5ceSopenharmony_ci * from, so we must go to the server with a filtered 2365d4afb5ceSopenharmony_ci * list of the new ones only 2366d4afb5ceSopenharmony_ci */ 2367d4afb5ceSopenharmony_ci 2368d4afb5ceSopenharmony_ci tops = orphaned; 2369d4afb5ceSopenharmony_ci#endif 2370d4afb5ceSopenharmony_ci 2371d4afb5ceSopenharmony_ci if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE, 2372d4afb5ceSopenharmony_ci 0, 0, 0)) { 2373d4afb5ceSopenharmony_ci lwsl_err("%s: Failed to fill fixed header\n", __func__); 2374d4afb5ceSopenharmony_ci return 1; 2375d4afb5ceSopenharmony_ci } 2376d4afb5ceSopenharmony_ci 2377d4afb5ceSopenharmony_ci /* 2378d4afb5ceSopenharmony_ci * Pid + (Topic len field + Topic len) x Num of Topics 2379d4afb5ceSopenharmony_ci */ 2380d4afb5ceSopenharmony_ci rem_len = 2; 2381d4afb5ceSopenharmony_ci for (n = 0; n < unsub->num_topics; n++) 2382d4afb5ceSopenharmony_ci if (send_unsub[n]) 2383d4afb5ceSopenharmony_ci rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name)); 2384d4afb5ceSopenharmony_ci 2385d4afb5ceSopenharmony_ci wsi->mqtt->sub_size = (uint16_t)rem_len; 2386d4afb5ceSopenharmony_ci 2387d4afb5ceSopenharmony_ci#if defined(_DEBUG) 2388d4afb5ceSopenharmony_ci lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n", 2389d4afb5ceSopenharmony_ci __func__, (int)tops, (int)rem_len); 2390d4afb5ceSopenharmony_ci#endif 2391d4afb5ceSopenharmony_ci 2392d4afb5ceSopenharmony_ci p += lws_mqtt_vbi_encode(rem_len, p); 2393d4afb5ceSopenharmony_ci 2394d4afb5ceSopenharmony_ci if ((rem_len + lws_ptr_diff_size_t(p, start)) >= 2395d4afb5ceSopenharmony_ci wsi->a.context->pt_serv_buf_size) { 2396d4afb5ceSopenharmony_ci lwsl_err("%s: Payload is too big\n", __func__); 2397d4afb5ceSopenharmony_ci return 1; 2398d4afb5ceSopenharmony_ci } 2399d4afb5ceSopenharmony_ci 2400d4afb5ceSopenharmony_ci /* Init lws_mqtt_str */ 2401d4afb5ceSopenharmony_ci lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, (uint16_t)rem_len, 0); 2402d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2403d4afb5ceSopenharmony_ci 2404d4afb5ceSopenharmony_ci /* Packet ID */ 2405d4afb5ceSopenharmony_ci wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id; 2406d4afb5ceSopenharmony_ci lwsl_debug("%s: pkt_id = %d\n", __func__, 2407d4afb5ceSopenharmony_ci (int)wsi->mqtt->ack_pkt_id); 2408d4afb5ceSopenharmony_ci lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id); 2409d4afb5ceSopenharmony_ci 2410d4afb5ceSopenharmony_ci nwsi->mqtt->client.aws_iot = wsi->mqtt->client.aws_iot; 2411d4afb5ceSopenharmony_ci 2412d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) 2413d4afb5ceSopenharmony_ci return 1; 2414d4afb5ceSopenharmony_ci 2415d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2416d4afb5ceSopenharmony_ci 2417d4afb5ceSopenharmony_ci for (n = 0; n < unsub->num_topics; n++) { 2418d4afb5ceSopenharmony_ci lwsl_info("%s: topics[%d] = %s\n", __func__, 2419d4afb5ceSopenharmony_ci (int)n, unsub->topic[n].name); 2420d4afb5ceSopenharmony_ci 2421d4afb5ceSopenharmony_ci /* 2422d4afb5ceSopenharmony_ci * Subscriber still bound to it, don't UBSUB 2423d4afb5ceSopenharmony_ci * from the server 2424d4afb5ceSopenharmony_ci */ 2425d4afb5ceSopenharmony_ci if (!send_unsub[n]) 2426d4afb5ceSopenharmony_ci continue; 2427d4afb5ceSopenharmony_ci 2428d4afb5ceSopenharmony_ci /* Topic's Len */ 2429d4afb5ceSopenharmony_ci lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name)); 2430d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) 2431d4afb5ceSopenharmony_ci return 1; 2432d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2433d4afb5ceSopenharmony_ci 2434d4afb5ceSopenharmony_ci /* Topic Name */ 2435d4afb5ceSopenharmony_ci lws_strncpy((char *)p, unsub->topic[n].name, 2436d4afb5ceSopenharmony_ci strlen(unsub->topic[n].name) + 1); 2437d4afb5ceSopenharmony_ci if (lws_mqtt_str_advance(&mqtt_vh_payload, 2438d4afb5ceSopenharmony_ci (int)strlen(unsub->topic[n].name))) 2439d4afb5ceSopenharmony_ci return 1; 2440d4afb5ceSopenharmony_ci p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); 2441d4afb5ceSopenharmony_ci } 2442d4afb5ceSopenharmony_ci break; 2443d4afb5ceSopenharmony_ci 2444d4afb5ceSopenharmony_ci default: 2445d4afb5ceSopenharmony_ci return 1; 2446d4afb5ceSopenharmony_ci } 2447d4afb5ceSopenharmony_ci 2448d4afb5ceSopenharmony_ci if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) != 2449d4afb5ceSopenharmony_ci lws_ptr_diff(p, start)) 2450d4afb5ceSopenharmony_ci return 1; 2451d4afb5ceSopenharmony_ci 2452d4afb5ceSopenharmony_ci wsi->mqtt->inside_unsubscribe = 1; 2453d4afb5ceSopenharmony_ci 2454d4afb5ceSopenharmony_ci wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout; 2455d4afb5ceSopenharmony_ci __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], 2456d4afb5ceSopenharmony_ci &wsi->mqtt->sul_unsuback_wait, 2457d4afb5ceSopenharmony_ci 3 * LWS_USEC_PER_SEC); 2458d4afb5ceSopenharmony_ci 2459d4afb5ceSopenharmony_ci return 0; 2460d4afb5ceSopenharmony_ci} 2461d4afb5ceSopenharmony_ci 2462d4afb5ceSopenharmony_ci/* 2463d4afb5ceSopenharmony_ci * This is called when child streams bind to an already-existing and compatible 2464d4afb5ceSopenharmony_ci * MQTT stream 2465d4afb5ceSopenharmony_ci */ 2466d4afb5ceSopenharmony_ci 2467d4afb5ceSopenharmony_cistruct lws * 2468d4afb5ceSopenharmony_cilws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi) 2469d4afb5ceSopenharmony_ci{ 2470d4afb5ceSopenharmony_ci /* no more children allowed by parent? */ 2471d4afb5ceSopenharmony_ci 2472d4afb5ceSopenharmony_ci if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) { 2473d4afb5ceSopenharmony_ci lwsl_err("%s: reached concurrent stream limit\n", __func__); 2474d4afb5ceSopenharmony_ci return NULL; 2475d4afb5ceSopenharmony_ci } 2476d4afb5ceSopenharmony_ci 2477d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 2478d4afb5ceSopenharmony_ci wsi->client_mux_substream = 1; 2479d4afb5ceSopenharmony_ci#endif 2480d4afb5ceSopenharmony_ci 2481d4afb5ceSopenharmony_ci lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid); 2482d4afb5ceSopenharmony_ci 2483d4afb5ceSopenharmony_ci if (lws_ensure_user_space(wsi)) 2484d4afb5ceSopenharmony_ci goto bail1; 2485d4afb5ceSopenharmony_ci 2486d4afb5ceSopenharmony_ci lws_mqtt_set_client_established(wsi); 2487d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 2488d4afb5ceSopenharmony_ci 2489d4afb5ceSopenharmony_ci return wsi; 2490d4afb5ceSopenharmony_ci 2491d4afb5ceSopenharmony_cibail1: 2492d4afb5ceSopenharmony_ci /* undo the insert */ 2493d4afb5ceSopenharmony_ci parent_wsi->mux.child_list = wsi->mux.sibling_list; 2494d4afb5ceSopenharmony_ci parent_wsi->mux.child_count--; 2495d4afb5ceSopenharmony_ci 2496d4afb5ceSopenharmony_ci if (wsi->user_space) 2497d4afb5ceSopenharmony_ci lws_free_set_NULL(wsi->user_space); 2498d4afb5ceSopenharmony_ci 2499d4afb5ceSopenharmony_ci wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0); 2500d4afb5ceSopenharmony_ci lws_free(wsi); 2501d4afb5ceSopenharmony_ci 2502d4afb5ceSopenharmony_ci return NULL; 2503d4afb5ceSopenharmony_ci} 2504d4afb5ceSopenharmony_ci 2505