1d4afb5ceSopenharmony_ci/* 2d4afb5ceSopenharmony_ci * libwebsockets - small server side websockets and web server implementation 3d4afb5ceSopenharmony_ci * 4d4afb5ceSopenharmony_ci * Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com> 5d4afb5ceSopenharmony_ci * 6d4afb5ceSopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy 7d4afb5ceSopenharmony_ci * of this software and associated documentation files (the "Software"), to 8d4afb5ceSopenharmony_ci * deal in the Software without restriction, including without limitation the 9d4afb5ceSopenharmony_ci * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10d4afb5ceSopenharmony_ci * sell copies of the Software, and to permit persons to whom the Software is 11d4afb5ceSopenharmony_ci * furnished to do so, subject to the following conditions: 12d4afb5ceSopenharmony_ci * 13d4afb5ceSopenharmony_ci * The above copyright notice and this permission notice shall be included in 14d4afb5ceSopenharmony_ci * all copies or substantial portions of the Software. 15d4afb5ceSopenharmony_ci * 16d4afb5ceSopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17d4afb5ceSopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18d4afb5ceSopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19d4afb5ceSopenharmony_ci * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20d4afb5ceSopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21d4afb5ceSopenharmony_ci * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22d4afb5ceSopenharmony_ci * IN THE SOFTWARE. 23d4afb5ceSopenharmony_ci */ 24d4afb5ceSopenharmony_ci 25d4afb5ceSopenharmony_ci#include "private-lib-core.h" 26d4afb5ceSopenharmony_ci 27d4afb5ceSopenharmony_cistatic int 28d4afb5ceSopenharmony_cirops_handle_POLLIN_mqtt(struct lws_context_per_thread *pt, struct lws *wsi, 29d4afb5ceSopenharmony_ci struct lws_pollfd *pollfd) 30d4afb5ceSopenharmony_ci{ 31d4afb5ceSopenharmony_ci unsigned int pending = 0; 32d4afb5ceSopenharmony_ci struct lws_tokens ebuf; 33d4afb5ceSopenharmony_ci int n = 0; 34d4afb5ceSopenharmony_ci char buffered = 0; 35d4afb5ceSopenharmony_ci 36d4afb5ceSopenharmony_ci lwsl_debug("%s: wsistate 0x%x, %s pollout %d\n", __func__, 37d4afb5ceSopenharmony_ci (unsigned int)wsi->wsistate, wsi->a.protocol->name, 38d4afb5ceSopenharmony_ci pollfd->revents); 39d4afb5ceSopenharmony_ci 40d4afb5ceSopenharmony_ci /* 41d4afb5ceSopenharmony_ci * After the CONNACK and nwsi establishment, the first logical 42d4afb5ceSopenharmony_ci * stream is migrated out of the nwsi to be child sid 1, and the 43d4afb5ceSopenharmony_ci * nwsi no longer has a wsi->mqtt of its own. 44d4afb5ceSopenharmony_ci * 45d4afb5ceSopenharmony_ci * RX events on the nwsi must be converted to events seen or not 46d4afb5ceSopenharmony_ci * seen by one or more child streams. 47d4afb5ceSopenharmony_ci * 48d4afb5ceSopenharmony_ci * SUBACK - reflected to child stream that asked for it 49d4afb5ceSopenharmony_ci * PUBACK - routed to child that did the related publish 50d4afb5ceSopenharmony_ci */ 51d4afb5ceSopenharmony_ci 52d4afb5ceSopenharmony_ci ebuf.token = NULL; 53d4afb5ceSopenharmony_ci ebuf.len = 0; 54d4afb5ceSopenharmony_ci 55d4afb5ceSopenharmony_ci if (lwsi_state(wsi) != LRS_ESTABLISHED) { 56d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 57d4afb5ceSopenharmony_ci 58d4afb5ceSopenharmony_ci if (lwsi_state(wsi) == LRS_WAITING_SSL && 59d4afb5ceSopenharmony_ci ((pollfd->revents & LWS_POLLOUT)) && 60d4afb5ceSopenharmony_ci lws_change_pollfd(wsi, LWS_POLLOUT, 0)) { 61d4afb5ceSopenharmony_ci lwsl_info("failed at set pollfd\n"); 62d4afb5ceSopenharmony_ci return LWS_HPI_RET_PLEASE_CLOSE_ME; 63d4afb5ceSopenharmony_ci } 64d4afb5ceSopenharmony_ci 65d4afb5ceSopenharmony_ci if ((pollfd->revents & LWS_POLLOUT) && 66d4afb5ceSopenharmony_ci lws_handle_POLLOUT_event(wsi, pollfd)) { 67d4afb5ceSopenharmony_ci lwsl_debug("POLLOUT event closed it\n"); 68d4afb5ceSopenharmony_ci return LWS_HPI_RET_PLEASE_CLOSE_ME; 69d4afb5ceSopenharmony_ci } 70d4afb5ceSopenharmony_ci 71d4afb5ceSopenharmony_ci n = lws_mqtt_client_socket_service(wsi, pollfd, NULL); 72d4afb5ceSopenharmony_ci if (n) 73d4afb5ceSopenharmony_ci return LWS_HPI_RET_WSI_ALREADY_DIED; 74d4afb5ceSopenharmony_ci#endif 75d4afb5ceSopenharmony_ci return LWS_HPI_RET_HANDLED; 76d4afb5ceSopenharmony_ci } 77d4afb5ceSopenharmony_ci 78d4afb5ceSopenharmony_ci /* 1: something requested a callback when it was OK to write */ 79d4afb5ceSopenharmony_ci 80d4afb5ceSopenharmony_ci if ((pollfd->revents & LWS_POLLOUT) && 81d4afb5ceSopenharmony_ci lwsi_state_can_handle_POLLOUT(wsi) && 82d4afb5ceSopenharmony_ci lws_handle_POLLOUT_event(wsi, pollfd)) { 83d4afb5ceSopenharmony_ci if (lwsi_state(wsi) == LRS_RETURNED_CLOSE) 84d4afb5ceSopenharmony_ci lwsi_set_state(wsi, LRS_FLUSHING_BEFORE_CLOSE); 85d4afb5ceSopenharmony_ci 86d4afb5ceSopenharmony_ci return LWS_HPI_RET_PLEASE_CLOSE_ME; 87d4afb5ceSopenharmony_ci } 88d4afb5ceSopenharmony_ci 89d4afb5ceSopenharmony_ci /* 3: buflist needs to be drained 90d4afb5ceSopenharmony_ci */ 91d4afb5ceSopenharmony_ciread: 92d4afb5ceSopenharmony_ci // lws_buflist_describe(&wsi->buflist, wsi, __func__); 93d4afb5ceSopenharmony_ci ebuf.len = (int)lws_buflist_next_segment_len(&wsi->buflist, &ebuf.token); 94d4afb5ceSopenharmony_ci if (ebuf.len) { 95d4afb5ceSopenharmony_ci lwsl_info("draining buflist (len %d)\n", ebuf.len); 96d4afb5ceSopenharmony_ci buffered = 1; 97d4afb5ceSopenharmony_ci goto drain; 98d4afb5ceSopenharmony_ci } 99d4afb5ceSopenharmony_ci 100d4afb5ceSopenharmony_ci if (!(pollfd->revents & pollfd->events & LWS_POLLIN)) 101d4afb5ceSopenharmony_ci return LWS_HPI_RET_HANDLED; 102d4afb5ceSopenharmony_ci 103d4afb5ceSopenharmony_ci /* if (lws_is_flowcontrolled(wsi)) { */ 104d4afb5ceSopenharmony_ci /* lwsl_info("%s: %p should be rxflow (bm 0x%x)..\n", */ 105d4afb5ceSopenharmony_ci /* __func__, wsi, wsi->rxflow_bitmap); */ 106d4afb5ceSopenharmony_ci /* return LWS_HPI_RET_HANDLED; */ 107d4afb5ceSopenharmony_ci /* } */ 108d4afb5ceSopenharmony_ci 109d4afb5ceSopenharmony_ci if (!(lwsi_role_client(wsi) && lwsi_state(wsi) != LRS_ESTABLISHED)) { 110d4afb5ceSopenharmony_ci /* 111d4afb5ceSopenharmony_ci * In case we are going to react to this rx by scheduling 112d4afb5ceSopenharmony_ci * writes, we need to restrict the amount of rx to the size 113d4afb5ceSopenharmony_ci * the protocol reported for rx buffer. 114d4afb5ceSopenharmony_ci * 115d4afb5ceSopenharmony_ci * Otherwise we get a situation we have to absorb possibly a 116d4afb5ceSopenharmony_ci * lot of reads before we get a chance to drain them by writing 117d4afb5ceSopenharmony_ci * them, eg, with echo type tests in autobahn. 118d4afb5ceSopenharmony_ci */ 119d4afb5ceSopenharmony_ci 120d4afb5ceSopenharmony_ci buffered = 0; 121d4afb5ceSopenharmony_ci ebuf.token = pt->serv_buf; 122d4afb5ceSopenharmony_ci ebuf.len = (int)wsi->a.context->pt_serv_buf_size; 123d4afb5ceSopenharmony_ci 124d4afb5ceSopenharmony_ci if ((unsigned int)ebuf.len > wsi->a.context->pt_serv_buf_size) 125d4afb5ceSopenharmony_ci ebuf.len = (int)wsi->a.context->pt_serv_buf_size; 126d4afb5ceSopenharmony_ci 127d4afb5ceSopenharmony_ci if ((int)pending > ebuf.len) 128d4afb5ceSopenharmony_ci pending = (unsigned int)ebuf.len; 129d4afb5ceSopenharmony_ci 130d4afb5ceSopenharmony_ci ebuf.len = lws_ssl_capable_read(wsi, ebuf.token, 131d4afb5ceSopenharmony_ci pending ? pending : 132d4afb5ceSopenharmony_ci (unsigned int)ebuf.len); 133d4afb5ceSopenharmony_ci switch (ebuf.len) { 134d4afb5ceSopenharmony_ci case 0: 135d4afb5ceSopenharmony_ci lwsl_info("%s: zero length read\n", 136d4afb5ceSopenharmony_ci __func__); 137d4afb5ceSopenharmony_ci return LWS_HPI_RET_PLEASE_CLOSE_ME; 138d4afb5ceSopenharmony_ci case LWS_SSL_CAPABLE_MORE_SERVICE: 139d4afb5ceSopenharmony_ci lwsl_info("SSL Capable more service\n"); 140d4afb5ceSopenharmony_ci return LWS_HPI_RET_HANDLED; 141d4afb5ceSopenharmony_ci case LWS_SSL_CAPABLE_ERROR: 142d4afb5ceSopenharmony_ci lwsl_info("%s: LWS_SSL_CAPABLE_ERROR\n", 143d4afb5ceSopenharmony_ci __func__); 144d4afb5ceSopenharmony_ci return LWS_HPI_RET_PLEASE_CLOSE_ME; 145d4afb5ceSopenharmony_ci } 146d4afb5ceSopenharmony_ci 147d4afb5ceSopenharmony_ci /* 148d4afb5ceSopenharmony_ci * coverity thinks ssl_capable_read() may read over 149d4afb5ceSopenharmony_ci * 2GB. Dissuade it... 150d4afb5ceSopenharmony_ci */ 151d4afb5ceSopenharmony_ci ebuf.len &= 0x7fffffff; 152d4afb5ceSopenharmony_ci } 153d4afb5ceSopenharmony_ci 154d4afb5ceSopenharmony_cidrain: 155d4afb5ceSopenharmony_ci /* service incoming data */ 156d4afb5ceSopenharmony_ci //lws_buflist_describe(&wsi->buflist, wsi, __func__); 157d4afb5ceSopenharmony_ci if (ebuf.len) { 158d4afb5ceSopenharmony_ci n = lws_read_mqtt(wsi, ebuf.token, (unsigned int)ebuf.len); 159d4afb5ceSopenharmony_ci if (n < 0) { 160d4afb5ceSopenharmony_ci lwsl_notice("%s: lws_read_mqtt returned %d\n", 161d4afb5ceSopenharmony_ci __func__, n); 162d4afb5ceSopenharmony_ci /* we closed wsi */ 163d4afb5ceSopenharmony_ci goto fail; 164d4afb5ceSopenharmony_ci } 165d4afb5ceSopenharmony_ci // lws_buflist_describe(&wsi->buflist, wsi, __func__); 166d4afb5ceSopenharmony_ci lwsl_debug("%s: consuming %d / %d\n", __func__, n, ebuf.len); 167d4afb5ceSopenharmony_ci if (lws_buflist_aware_finished_consuming(wsi, &ebuf, ebuf.len, 168d4afb5ceSopenharmony_ci buffered, __func__)) 169d4afb5ceSopenharmony_ci return LWS_HPI_RET_PLEASE_CLOSE_ME; 170d4afb5ceSopenharmony_ci } 171d4afb5ceSopenharmony_ci 172d4afb5ceSopenharmony_ci ebuf.token = NULL; 173d4afb5ceSopenharmony_ci ebuf.len = 0; 174d4afb5ceSopenharmony_ci 175d4afb5ceSopenharmony_ci pending = (unsigned int)lws_ssl_pending(wsi); 176d4afb5ceSopenharmony_ci if (pending) { 177d4afb5ceSopenharmony_ci pending = pending > wsi->a.context->pt_serv_buf_size ? 178d4afb5ceSopenharmony_ci wsi->a.context->pt_serv_buf_size : pending; 179d4afb5ceSopenharmony_ci goto read; 180d4afb5ceSopenharmony_ci } 181d4afb5ceSopenharmony_ci 182d4afb5ceSopenharmony_ci if (buffered && /* were draining, now nothing left */ 183d4afb5ceSopenharmony_ci !lws_buflist_next_segment_len(&wsi->buflist, NULL)) { 184d4afb5ceSopenharmony_ci lwsl_info("%s: %s flow buf: drained\n", __func__, lws_wsi_tag(wsi)); 185d4afb5ceSopenharmony_ci /* having drained the rxflow buffer, can rearm POLLIN */ 186d4afb5ceSopenharmony_ci#if !defined(LWS_WITH_SERVER) 187d4afb5ceSopenharmony_ci n = 188d4afb5ceSopenharmony_ci#endif 189d4afb5ceSopenharmony_ci __lws_rx_flow_control(wsi); 190d4afb5ceSopenharmony_ci /* n ignored, needed for NO_SERVER case */ 191d4afb5ceSopenharmony_ci } 192d4afb5ceSopenharmony_ci 193d4afb5ceSopenharmony_ci /* n = 0 */ 194d4afb5ceSopenharmony_ci return LWS_HPI_RET_HANDLED; 195d4afb5ceSopenharmony_ci 196d4afb5ceSopenharmony_cifail: 197d4afb5ceSopenharmony_ci lwsl_err("%s: Failed, bailing\n", __func__); 198d4afb5ceSopenharmony_ci lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "mqtt svc fail"); 199d4afb5ceSopenharmony_ci 200d4afb5ceSopenharmony_ci return LWS_HPI_RET_WSI_ALREADY_DIED; 201d4afb5ceSopenharmony_ci} 202d4afb5ceSopenharmony_ci 203d4afb5ceSopenharmony_ci#if 0 /* defined(LWS_WITH_SERVER) */ 204d4afb5ceSopenharmony_ci 205d4afb5ceSopenharmony_cistatic int 206d4afb5ceSopenharmony_cirops_adoption_bind_mqtt(struct lws *wsi, int type, const char *vh_prot_name) 207d4afb5ceSopenharmony_ci{ 208d4afb5ceSopenharmony_ci /* no http but socket... must be mqtt */ 209d4afb5ceSopenharmony_ci if ((type & LWS_ADOPT_HTTP) || !(type & LWS_ADOPT_SOCKET) || 210d4afb5ceSopenharmony_ci (type & _LWS_ADOPT_FINISH)) 211d4afb5ceSopenharmony_ci return 0; /* no match */ 212d4afb5ceSopenharmony_ci 213d4afb5ceSopenharmony_ci lws_role_transition(wsi, 0, (type & LWS_ADOPT_ALLOW_SSL) ? LRS_SSL_INIT : 214d4afb5ceSopenharmony_ci LRS_ESTABLISHED, &role_ops_mqtt); 215d4afb5ceSopenharmony_ci 216d4afb5ceSopenharmony_ci if (vh_prot_name) 217d4afb5ceSopenharmony_ci lws_bind_protocol(wsi, wsi->a.protocol, __func__); 218d4afb5ceSopenharmony_ci else 219d4afb5ceSopenharmony_ci /* this is the only time he will transition */ 220d4afb5ceSopenharmony_ci lws_bind_protocol(wsi, 221d4afb5ceSopenharmony_ci &wsi->a.vhost->protocols[wsi->a.vhost->mqtt_protocol_index], 222d4afb5ceSopenharmony_ci __func__); 223d4afb5ceSopenharmony_ci 224d4afb5ceSopenharmony_ci return 1; /* bound */ 225d4afb5ceSopenharmony_ci} 226d4afb5ceSopenharmony_ci#endif 227d4afb5ceSopenharmony_ci 228d4afb5ceSopenharmony_cistatic int 229d4afb5ceSopenharmony_cirops_client_bind_mqtt(struct lws *wsi, const struct lws_client_connect_info *i) 230d4afb5ceSopenharmony_ci{ 231d4afb5ceSopenharmony_ci lwsl_debug("%s: i = %p\n", __func__, i); 232d4afb5ceSopenharmony_ci if (!i) { 233d4afb5ceSopenharmony_ci 234d4afb5ceSopenharmony_ci /* finalize */ 235d4afb5ceSopenharmony_ci 236d4afb5ceSopenharmony_ci if (!wsi->user_space && wsi->stash->cis[CIS_METHOD]) 237d4afb5ceSopenharmony_ci if (lws_ensure_user_space(wsi)) 238d4afb5ceSopenharmony_ci return 1; 239d4afb5ceSopenharmony_ci 240d4afb5ceSopenharmony_ci if (!wsi->stash->cis[CIS_METHOD] && !wsi->stash->cis[CIS_ALPN]) 241d4afb5ceSopenharmony_ci wsi->stash->cis[CIS_ALPN] = "x-amzn-mqtt-ca"; 242d4afb5ceSopenharmony_ci 243d4afb5ceSopenharmony_ci /* if we went on the ah waiting list, it's ok, we can 244d4afb5ceSopenharmony_ci * wait. 245d4afb5ceSopenharmony_ci * 246d4afb5ceSopenharmony_ci * When we do get the ah, now or later, he will end up 247d4afb5ceSopenharmony_ci * at lws_http_client_connect_via_info2(). 248d4afb5ceSopenharmony_ci */ 249d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 250d4afb5ceSopenharmony_ci if (lws_header_table_attach(wsi, 0) < 0) 251d4afb5ceSopenharmony_ci /* 252d4afb5ceSopenharmony_ci * if we failed here, the connection is already closed 253d4afb5ceSopenharmony_ci * and freed. 254d4afb5ceSopenharmony_ci */ 255d4afb5ceSopenharmony_ci return -1; 256d4afb5ceSopenharmony_ci#else 257d4afb5ceSopenharmony_ci if (lws_header_table_attach(wsi, 0)) 258d4afb5ceSopenharmony_ci return 0; 259d4afb5ceSopenharmony_ci#endif 260d4afb5ceSopenharmony_ci return 0; 261d4afb5ceSopenharmony_ci } 262d4afb5ceSopenharmony_ci 263d4afb5ceSopenharmony_ci /* if a recognized mqtt method, bind to it */ 264d4afb5ceSopenharmony_ci if (strcmp(i->method, "MQTT")) 265d4afb5ceSopenharmony_ci return 0; /* no match */ 266d4afb5ceSopenharmony_ci 267d4afb5ceSopenharmony_ci if (lws_create_client_mqtt_object(i, wsi)) 268d4afb5ceSopenharmony_ci return 1; 269d4afb5ceSopenharmony_ci 270d4afb5ceSopenharmony_ci lws_role_transition(wsi, LWSIFR_CLIENT, LRS_UNCONNECTED, 271d4afb5ceSopenharmony_ci &role_ops_mqtt); 272d4afb5ceSopenharmony_ci return 1; /* matched */ 273d4afb5ceSopenharmony_ci} 274d4afb5ceSopenharmony_ci 275d4afb5ceSopenharmony_cistatic int 276d4afb5ceSopenharmony_cirops_handle_POLLOUT_mqtt(struct lws *wsi) 277d4afb5ceSopenharmony_ci{ 278d4afb5ceSopenharmony_ci struct lws **wsi2; 279d4afb5ceSopenharmony_ci 280d4afb5ceSopenharmony_ci lwsl_debug("%s\n", __func__); 281d4afb5ceSopenharmony_ci 282d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 283d4afb5ceSopenharmony_ci if (wsi->mqtt && wsi->mqtt->send_pingreq && !wsi->mqtt->inside_payload) { 284d4afb5ceSopenharmony_ci uint8_t buf[LWS_PRE + 2]; 285d4afb5ceSopenharmony_ci 286d4afb5ceSopenharmony_ci /* 287d4afb5ceSopenharmony_ci * We are swallowing this POLLOUT in order to send a PINGREQ 288d4afb5ceSopenharmony_ci * autonomously 289d4afb5ceSopenharmony_ci */ 290d4afb5ceSopenharmony_ci 291d4afb5ceSopenharmony_ci wsi->mqtt->send_pingreq = 0; 292d4afb5ceSopenharmony_ci 293d4afb5ceSopenharmony_ci lwsl_notice("%s: issuing PINGREQ\n", __func__); 294d4afb5ceSopenharmony_ci 295d4afb5ceSopenharmony_ci buf[LWS_PRE] = LMQCP_CTOS_PINGREQ << 4; 296d4afb5ceSopenharmony_ci buf[LWS_PRE + 1] = 0; 297d4afb5ceSopenharmony_ci 298d4afb5ceSopenharmony_ci if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 2, 299d4afb5ceSopenharmony_ci LWS_WRITE_BINARY) != 2) 300d4afb5ceSopenharmony_ci return LWS_HP_RET_BAIL_DIE; 301d4afb5ceSopenharmony_ci 302d4afb5ceSopenharmony_ci return LWS_HP_RET_BAIL_OK; 303d4afb5ceSopenharmony_ci } 304d4afb5ceSopenharmony_ci#endif 305d4afb5ceSopenharmony_ci if (wsi->mqtt && !wsi->mqtt->inside_payload && 306d4afb5ceSopenharmony_ci (wsi->mqtt->send_pubrec || wsi->mqtt->send_pubrel || 307d4afb5ceSopenharmony_ci wsi->mqtt->send_pubcomp)) { 308d4afb5ceSopenharmony_ci uint8_t buf[LWS_PRE + 4]; 309d4afb5ceSopenharmony_ci /* Remaining len = 2 */ 310d4afb5ceSopenharmony_ci buf[LWS_PRE + 1] = 2; 311d4afb5ceSopenharmony_ci if (wsi->mqtt->send_pubrec) { 312d4afb5ceSopenharmony_ci lwsl_notice("%s: issuing PUBREC for pkt id: %d\n", 313d4afb5ceSopenharmony_ci __func__, wsi->mqtt->peer_ack_pkt_id); 314d4afb5ceSopenharmony_ci buf[LWS_PRE] = LMQCP_PUBREC << 4 | 0x2; 315d4afb5ceSopenharmony_ci /* Packet ID */ 316d4afb5ceSopenharmony_ci lws_ser_wu16be(&buf[LWS_PRE + 2], 317d4afb5ceSopenharmony_ci wsi->mqtt->peer_ack_pkt_id); 318d4afb5ceSopenharmony_ci wsi->mqtt->send_pubrec = 0; 319d4afb5ceSopenharmony_ci } else if (wsi->mqtt->send_pubrel) { 320d4afb5ceSopenharmony_ci lwsl_notice("%s: issuing PUBREL for pkt id: %d\n", 321d4afb5ceSopenharmony_ci __func__, wsi->mqtt->ack_pkt_id); 322d4afb5ceSopenharmony_ci buf[LWS_PRE] = LMQCP_PUBREL << 4 | 0x2; 323d4afb5ceSopenharmony_ci lws_ser_wu16be(&buf[LWS_PRE + 2], 324d4afb5ceSopenharmony_ci wsi->mqtt->ack_pkt_id); 325d4afb5ceSopenharmony_ci wsi->mqtt->send_pubrel = 0; 326d4afb5ceSopenharmony_ci } else { 327d4afb5ceSopenharmony_ci lwsl_notice("%s: issuing PUBCOMP for pkt id: %d\n", 328d4afb5ceSopenharmony_ci __func__, wsi->mqtt->peer_ack_pkt_id); 329d4afb5ceSopenharmony_ci buf[LWS_PRE] = LMQCP_PUBCOMP << 4 | 0x2; 330d4afb5ceSopenharmony_ci lws_ser_wu16be(&buf[LWS_PRE + 2], 331d4afb5ceSopenharmony_ci wsi->mqtt->peer_ack_pkt_id); 332d4afb5ceSopenharmony_ci wsi->mqtt->send_pubcomp = 0; 333d4afb5ceSopenharmony_ci } 334d4afb5ceSopenharmony_ci if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4, 335d4afb5ceSopenharmony_ci LWS_WRITE_BINARY) != 4) 336d4afb5ceSopenharmony_ci return LWS_HP_RET_BAIL_DIE; 337d4afb5ceSopenharmony_ci return LWS_HP_RET_BAIL_OK; 338d4afb5ceSopenharmony_ci } 339d4afb5ceSopenharmony_ci 340d4afb5ceSopenharmony_ci wsi = lws_get_network_wsi(wsi); 341d4afb5ceSopenharmony_ci 342d4afb5ceSopenharmony_ci wsi->mux.requested_POLLOUT = 0; 343d4afb5ceSopenharmony_ci 344d4afb5ceSopenharmony_ci wsi2 = &wsi->mux.child_list; 345d4afb5ceSopenharmony_ci if (!*wsi2) { 346d4afb5ceSopenharmony_ci lwsl_debug("%s: no children\n", __func__); 347d4afb5ceSopenharmony_ci return LWS_HP_RET_DROP_POLLOUT; 348d4afb5ceSopenharmony_ci } 349d4afb5ceSopenharmony_ci 350d4afb5ceSopenharmony_ci if (!wsi->mqtt) 351d4afb5ceSopenharmony_ci return LWS_HP_RET_BAIL_DIE; 352d4afb5ceSopenharmony_ci 353d4afb5ceSopenharmony_ci lws_wsi_mux_dump_waiting_children(wsi); 354d4afb5ceSopenharmony_ci 355d4afb5ceSopenharmony_ci do { 356d4afb5ceSopenharmony_ci struct lws *w, **wa; 357d4afb5ceSopenharmony_ci 358d4afb5ceSopenharmony_ci wa = &(*wsi2)->mux.sibling_list; 359d4afb5ceSopenharmony_ci if (!(*wsi2)->mux.requested_POLLOUT) 360d4afb5ceSopenharmony_ci goto next_child; 361d4afb5ceSopenharmony_ci 362d4afb5ceSopenharmony_ci if (!lwsi_state_can_handle_POLLOUT(wsi)) 363d4afb5ceSopenharmony_ci goto next_child; 364d4afb5ceSopenharmony_ci 365d4afb5ceSopenharmony_ci /* 366d4afb5ceSopenharmony_ci * If the nwsi is in the middle of a frame, we can only 367d4afb5ceSopenharmony_ci * continue to send that 368d4afb5ceSopenharmony_ci */ 369d4afb5ceSopenharmony_ci 370d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_payload && !(*wsi2)->mqtt->inside_payload) 371d4afb5ceSopenharmony_ci goto next_child; 372d4afb5ceSopenharmony_ci 373d4afb5ceSopenharmony_ci /* 374d4afb5ceSopenharmony_ci * we're going to do writable callback for this child. 375d4afb5ceSopenharmony_ci * move him to be the last child 376d4afb5ceSopenharmony_ci */ 377d4afb5ceSopenharmony_ci w = lws_wsi_mux_move_child_to_tail(wsi2); 378d4afb5ceSopenharmony_ci if (!w) { 379d4afb5ceSopenharmony_ci wa = &wsi->mux.child_list; 380d4afb5ceSopenharmony_ci goto next_child; 381d4afb5ceSopenharmony_ci } 382d4afb5ceSopenharmony_ci 383d4afb5ceSopenharmony_ci lwsl_debug("%s: child %s (wsistate 0x%x)\n", __func__, 384d4afb5ceSopenharmony_ci lws_wsi_tag(w), (unsigned int)w->wsistate); 385d4afb5ceSopenharmony_ci 386d4afb5ceSopenharmony_ci if (lwsi_state(wsi) == LRS_ESTABLISHED && 387d4afb5ceSopenharmony_ci !wsi->mqtt->inside_payload && 388d4afb5ceSopenharmony_ci wsi->mqtt->send_puback) { 389d4afb5ceSopenharmony_ci uint8_t buf[LWS_PRE + 4]; 390d4afb5ceSopenharmony_ci lwsl_notice("%s: issuing PUBACK for pkt id: %d\n", 391d4afb5ceSopenharmony_ci __func__, wsi->mqtt->ack_pkt_id); 392d4afb5ceSopenharmony_ci 393d4afb5ceSopenharmony_ci /* Fixed header */ 394d4afb5ceSopenharmony_ci buf[LWS_PRE] = LMQCP_PUBACK << 4; 395d4afb5ceSopenharmony_ci /* Remaining len = 2 */ 396d4afb5ceSopenharmony_ci buf[LWS_PRE + 1] = 2; 397d4afb5ceSopenharmony_ci /* Packet ID */ 398d4afb5ceSopenharmony_ci lws_ser_wu16be(&buf[LWS_PRE + 2], wsi->mqtt->peer_ack_pkt_id); 399d4afb5ceSopenharmony_ci 400d4afb5ceSopenharmony_ci if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4, 401d4afb5ceSopenharmony_ci LWS_WRITE_BINARY) != 4) 402d4afb5ceSopenharmony_ci return LWS_HP_RET_BAIL_DIE; 403d4afb5ceSopenharmony_ci 404d4afb5ceSopenharmony_ci wsi->mqtt->send_puback = 0; 405d4afb5ceSopenharmony_ci w->mux.requested_POLLOUT = 1; 406d4afb5ceSopenharmony_ci 407d4afb5ceSopenharmony_ci wa = &wsi->mux.child_list; 408d4afb5ceSopenharmony_ci goto next_child; 409d4afb5ceSopenharmony_ci } 410d4afb5ceSopenharmony_ci 411d4afb5ceSopenharmony_ci if (lws_callback_as_writeable(w)) { 412d4afb5ceSopenharmony_ci lwsl_notice("%s: Closing child %s\n", __func__, lws_wsi_tag(w)); 413d4afb5ceSopenharmony_ci lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS, 414d4afb5ceSopenharmony_ci "mqtt pollout handle"); 415d4afb5ceSopenharmony_ci wa = &wsi->mux.child_list; 416d4afb5ceSopenharmony_ci } 417d4afb5ceSopenharmony_ci 418d4afb5ceSopenharmony_cinext_child: 419d4afb5ceSopenharmony_ci wsi2 = wa; 420d4afb5ceSopenharmony_ci } while (wsi2 && *wsi2 && !lws_send_pipe_choked(wsi)); 421d4afb5ceSopenharmony_ci 422d4afb5ceSopenharmony_ci // lws_wsi_mux_dump_waiting_children(wsi); 423d4afb5ceSopenharmony_ci 424d4afb5ceSopenharmony_ci if (lws_wsi_mux_action_pending_writeable_reqs(wsi)) 425d4afb5ceSopenharmony_ci return LWS_HP_RET_BAIL_DIE; 426d4afb5ceSopenharmony_ci 427d4afb5ceSopenharmony_ci return LWS_HP_RET_BAIL_OK; 428d4afb5ceSopenharmony_ci} 429d4afb5ceSopenharmony_ci 430d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 431d4afb5ceSopenharmony_cistatic int 432d4afb5ceSopenharmony_cirops_issue_keepalive_mqtt(struct lws *wsi, int isvalid) 433d4afb5ceSopenharmony_ci{ 434d4afb5ceSopenharmony_ci struct lws *nwsi = lws_get_network_wsi(wsi); 435d4afb5ceSopenharmony_ci 436d4afb5ceSopenharmony_ci if (isvalid) { 437d4afb5ceSopenharmony_ci _lws_validity_confirmed_role(nwsi); 438d4afb5ceSopenharmony_ci 439d4afb5ceSopenharmony_ci return 0; 440d4afb5ceSopenharmony_ci } 441d4afb5ceSopenharmony_ci 442d4afb5ceSopenharmony_ci nwsi->mqtt->send_pingreq = 1; 443d4afb5ceSopenharmony_ci lws_callback_on_writable(nwsi); 444d4afb5ceSopenharmony_ci 445d4afb5ceSopenharmony_ci return 0; 446d4afb5ceSopenharmony_ci} 447d4afb5ceSopenharmony_ci#endif 448d4afb5ceSopenharmony_ci 449d4afb5ceSopenharmony_cistatic int 450d4afb5ceSopenharmony_cirops_close_role_mqtt(struct lws_context_per_thread *pt, struct lws *wsi) 451d4afb5ceSopenharmony_ci{ 452d4afb5ceSopenharmony_ci struct lws *nwsi = lws_get_network_wsi(wsi); 453d4afb5ceSopenharmony_ci lws_mqtt_subs_t *s, *s1, *mysub; 454d4afb5ceSopenharmony_ci lws_mqttc_t *c; 455d4afb5ceSopenharmony_ci 456d4afb5ceSopenharmony_ci if (!wsi->mqtt) 457d4afb5ceSopenharmony_ci return 0; 458d4afb5ceSopenharmony_ci 459d4afb5ceSopenharmony_ci c = &wsi->mqtt->client; 460d4afb5ceSopenharmony_ci 461d4afb5ceSopenharmony_ci lws_sul_cancel(&wsi->mqtt->sul_qos_puback_pubrec_wait); 462d4afb5ceSopenharmony_ci 463d4afb5ceSopenharmony_ci lws_mqtt_str_free(&c->username); 464d4afb5ceSopenharmony_ci lws_mqtt_str_free(&c->password); 465d4afb5ceSopenharmony_ci lws_mqtt_str_free(&c->will.message); 466d4afb5ceSopenharmony_ci lws_mqtt_str_free(&c->will.topic); 467d4afb5ceSopenharmony_ci lws_mqtt_str_free(&c->id); 468d4afb5ceSopenharmony_ci 469d4afb5ceSopenharmony_ci /* clean up any subscription allocations */ 470d4afb5ceSopenharmony_ci 471d4afb5ceSopenharmony_ci s = wsi->mqtt->subs_head; 472d4afb5ceSopenharmony_ci wsi->mqtt->subs_head = NULL; 473d4afb5ceSopenharmony_ci while (s) { 474d4afb5ceSopenharmony_ci s1 = s->next; 475d4afb5ceSopenharmony_ci /* 476d4afb5ceSopenharmony_ci * Account for children no longer using nwsi subscription 477d4afb5ceSopenharmony_ci */ 478d4afb5ceSopenharmony_ci mysub = lws_mqtt_find_sub(nwsi->mqtt, (const char *)&s[1]); 479d4afb5ceSopenharmony_ci// assert(mysub); /* if child subscribed, nwsi must feel the same */ 480d4afb5ceSopenharmony_ci if (mysub) { 481d4afb5ceSopenharmony_ci assert(mysub->ref_count); 482d4afb5ceSopenharmony_ci mysub->ref_count--; 483d4afb5ceSopenharmony_ci } 484d4afb5ceSopenharmony_ci lws_free(s); 485d4afb5ceSopenharmony_ci s = s1; 486d4afb5ceSopenharmony_ci } 487d4afb5ceSopenharmony_ci 488d4afb5ceSopenharmony_ci lws_mqtt_publish_param_t *pub = 489d4afb5ceSopenharmony_ci (lws_mqtt_publish_param_t *) 490d4afb5ceSopenharmony_ci wsi->mqtt->rx_cpkt_param; 491d4afb5ceSopenharmony_ci 492d4afb5ceSopenharmony_ci if (pub) 493d4afb5ceSopenharmony_ci lws_free_set_NULL(pub->topic); 494d4afb5ceSopenharmony_ci 495d4afb5ceSopenharmony_ci lws_free_set_NULL(wsi->mqtt->rx_cpkt_param); 496d4afb5ceSopenharmony_ci 497d4afb5ceSopenharmony_ci lws_free_set_NULL(wsi->mqtt); 498d4afb5ceSopenharmony_ci 499d4afb5ceSopenharmony_ci return 0; 500d4afb5ceSopenharmony_ci} 501d4afb5ceSopenharmony_ci 502d4afb5ceSopenharmony_cistatic int 503d4afb5ceSopenharmony_cirops_callback_on_writable_mqtt(struct lws *wsi) 504d4afb5ceSopenharmony_ci{ 505d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 506d4afb5ceSopenharmony_ci struct lws *network_wsi; 507d4afb5ceSopenharmony_ci#endif 508d4afb5ceSopenharmony_ci int already; 509d4afb5ceSopenharmony_ci 510d4afb5ceSopenharmony_ci lwsl_debug("%s: %s (wsistate 0x%x)\n", __func__, lws_wsi_tag(wsi), 511d4afb5ceSopenharmony_ci (unsigned int)wsi->wsistate); 512d4afb5ceSopenharmony_ci 513d4afb5ceSopenharmony_ci if (wsi->mux.requested_POLLOUT 514d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 515d4afb5ceSopenharmony_ci && !wsi->client_h2_alpn 516d4afb5ceSopenharmony_ci#endif 517d4afb5ceSopenharmony_ci ) { 518d4afb5ceSopenharmony_ci lwsl_debug("already pending writable\n"); 519d4afb5ceSopenharmony_ci return 1; 520d4afb5ceSopenharmony_ci } 521d4afb5ceSopenharmony_ci#if 0 522d4afb5ceSopenharmony_ci /* is this for DATA or for control messages? */ 523d4afb5ceSopenharmony_ci if (wsi->upgraded_to_http2 && !wsi->h2.h2n->pps && 524d4afb5ceSopenharmony_ci !lws_h2_tx_cr_get(wsi)) { 525d4afb5ceSopenharmony_ci /* 526d4afb5ceSopenharmony_ci * other side is not able to cope with us sending DATA 527d4afb5ceSopenharmony_ci * anything so no matter if we have POLLOUT on our side if it's 528d4afb5ceSopenharmony_ci * DATA we want to send. 529d4afb5ceSopenharmony_ci * 530d4afb5ceSopenharmony_ci * Delay waiting for our POLLOUT until peer indicates he has 531d4afb5ceSopenharmony_ci * space for more using tx window command in http2 layer 532d4afb5ceSopenharmony_ci */ 533d4afb5ceSopenharmony_ci lwsl_notice("%s: %p: skint (%d)\n", __func__, wsi, 534d4afb5ceSopenharmony_ci wsi->h2.tx_cr); 535d4afb5ceSopenharmony_ci wsi->h2.skint = 1; 536d4afb5ceSopenharmony_ci return 0; 537d4afb5ceSopenharmony_ci } 538d4afb5ceSopenharmony_ci 539d4afb5ceSopenharmony_ci wsi->h2.skint = 0; 540d4afb5ceSopenharmony_ci#endif 541d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 542d4afb5ceSopenharmony_ci network_wsi = lws_get_network_wsi(wsi); 543d4afb5ceSopenharmony_ci#endif 544d4afb5ceSopenharmony_ci already = lws_wsi_mux_mark_parents_needing_writeable(wsi); 545d4afb5ceSopenharmony_ci 546d4afb5ceSopenharmony_ci /* for network action, act only on the network wsi */ 547d4afb5ceSopenharmony_ci 548d4afb5ceSopenharmony_ci if (already 549d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 550d4afb5ceSopenharmony_ci && !network_wsi->client_mux_substream 551d4afb5ceSopenharmony_ci#endif 552d4afb5ceSopenharmony_ci ) 553d4afb5ceSopenharmony_ci return 1; 554d4afb5ceSopenharmony_ci 555d4afb5ceSopenharmony_ci return 0; 556d4afb5ceSopenharmony_ci} 557d4afb5ceSopenharmony_ci 558d4afb5ceSopenharmony_cistatic int 559d4afb5ceSopenharmony_cirops_close_kill_connection_mqtt(struct lws *wsi, enum lws_close_status reason) 560d4afb5ceSopenharmony_ci{ 561d4afb5ceSopenharmony_ci lwsl_info(" %s, his parent %s: child list %p, siblings:\n", 562d4afb5ceSopenharmony_ci lws_wsi_tag(wsi), 563d4afb5ceSopenharmony_ci lws_wsi_tag(wsi->mux.parent_wsi), wsi->mux.child_list); 564d4afb5ceSopenharmony_ci //lws_wsi_mux_dump_children(wsi); 565d4afb5ceSopenharmony_ci 566d4afb5ceSopenharmony_ci if (wsi->mux_substream 567d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 568d4afb5ceSopenharmony_ci || wsi->client_mux_substream 569d4afb5ceSopenharmony_ci#endif 570d4afb5ceSopenharmony_ci ) { 571d4afb5ceSopenharmony_ci lwsl_info("closing %s: parent %s: first child %p\n", 572d4afb5ceSopenharmony_ci lws_wsi_tag(wsi), 573d4afb5ceSopenharmony_ci lws_wsi_tag(wsi->mux.parent_wsi), 574d4afb5ceSopenharmony_ci wsi->mux.child_list); 575d4afb5ceSopenharmony_ci 576d4afb5ceSopenharmony_ci if (wsi->mux.child_list && lwsl_visible(LLL_INFO)) { 577d4afb5ceSopenharmony_ci lwsl_info(" parent %s: closing children: list:\n", lws_wsi_tag(wsi)); 578d4afb5ceSopenharmony_ci lws_wsi_mux_dump_children(wsi); 579d4afb5ceSopenharmony_ci } 580d4afb5ceSopenharmony_ci 581d4afb5ceSopenharmony_ci lws_wsi_mux_close_children(wsi, (int)reason); 582d4afb5ceSopenharmony_ci } 583d4afb5ceSopenharmony_ci 584d4afb5ceSopenharmony_ci if (( 585d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 586d4afb5ceSopenharmony_ci wsi->client_mux_substream || 587d4afb5ceSopenharmony_ci#endif 588d4afb5ceSopenharmony_ci wsi->mux_substream) && 589d4afb5ceSopenharmony_ci wsi->mux.parent_wsi) { 590d4afb5ceSopenharmony_ci lws_wsi_mux_sibling_disconnect(wsi); 591d4afb5ceSopenharmony_ci } 592d4afb5ceSopenharmony_ci 593d4afb5ceSopenharmony_ci return 0; 594d4afb5ceSopenharmony_ci} 595d4afb5ceSopenharmony_ci 596d4afb5ceSopenharmony_cistatic const lws_rops_t rops_table_mqtt[] = { 597d4afb5ceSopenharmony_ci /* 1 */ { .handle_POLLIN = rops_handle_POLLIN_mqtt }, 598d4afb5ceSopenharmony_ci /* 2 */ { .handle_POLLOUT = rops_handle_POLLOUT_mqtt }, 599d4afb5ceSopenharmony_ci /* 3 */ { .callback_on_writable = rops_callback_on_writable_mqtt }, 600d4afb5ceSopenharmony_ci /* 4 */ { .close_role = rops_close_role_mqtt }, 601d4afb5ceSopenharmony_ci /* 5 */ { .close_kill_connection = rops_close_kill_connection_mqtt }, 602d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 603d4afb5ceSopenharmony_ci /* 6 */ { .client_bind = rops_client_bind_mqtt }, 604d4afb5ceSopenharmony_ci /* 7 */ { .issue_keepalive = rops_issue_keepalive_mqtt }, 605d4afb5ceSopenharmony_ci#endif 606d4afb5ceSopenharmony_ci}; 607d4afb5ceSopenharmony_ci 608d4afb5ceSopenharmony_cistruct lws_role_ops role_ops_mqtt = { 609d4afb5ceSopenharmony_ci /* role name */ "mqtt", 610d4afb5ceSopenharmony_ci /* alpn id */ "x-amzn-mqtt-ca", /* "mqtt/3.1.1" */ 611d4afb5ceSopenharmony_ci 612d4afb5ceSopenharmony_ci /* rops_table */ rops_table_mqtt, 613d4afb5ceSopenharmony_ci /* rops_idx */ { 614d4afb5ceSopenharmony_ci /* LWS_ROPS_check_upgrades */ 615d4afb5ceSopenharmony_ci /* LWS_ROPS_pt_init_destroy */ 0x00, 616d4afb5ceSopenharmony_ci /* LWS_ROPS_init_vhost */ 617d4afb5ceSopenharmony_ci /* LWS_ROPS_destroy_vhost */ 0x00, 618d4afb5ceSopenharmony_ci /* LWS_ROPS_service_flag_pending */ 619d4afb5ceSopenharmony_ci /* LWS_ROPS_handle_POLLIN */ 0x01, 620d4afb5ceSopenharmony_ci /* LWS_ROPS_handle_POLLOUT */ 621d4afb5ceSopenharmony_ci /* LWS_ROPS_perform_user_POLLOUT */ 0x20, 622d4afb5ceSopenharmony_ci /* LWS_ROPS_callback_on_writable */ 623d4afb5ceSopenharmony_ci /* LWS_ROPS_tx_credit */ 0x30, 624d4afb5ceSopenharmony_ci /* LWS_ROPS_write_role_protocol */ 625d4afb5ceSopenharmony_ci /* LWS_ROPS_encapsulation_parent */ 0x00, 626d4afb5ceSopenharmony_ci /* LWS_ROPS_alpn_negotiated */ 627d4afb5ceSopenharmony_ci /* LWS_ROPS_close_via_role_protocol */ 0x00, 628d4afb5ceSopenharmony_ci /* LWS_ROPS_close_role */ 629d4afb5ceSopenharmony_ci /* LWS_ROPS_close_kill_connection */ 0x45, 630d4afb5ceSopenharmony_ci /* LWS_ROPS_destroy_role */ 631d4afb5ceSopenharmony_ci /* LWS_ROPS_adoption_bind */ 0x00, 632d4afb5ceSopenharmony_ci 633d4afb5ceSopenharmony_ci /* LWS_ROPS_client_bind */ 634d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CLIENT) 635d4afb5ceSopenharmony_ci /* LWS_ROPS_issue_keepalive */ 0x67, 636d4afb5ceSopenharmony_ci#else 637d4afb5ceSopenharmony_ci /* LWS_ROPS_issue_keepalive */ 0x00, 638d4afb5ceSopenharmony_ci#endif 639d4afb5ceSopenharmony_ci }, 640d4afb5ceSopenharmony_ci 641d4afb5ceSopenharmony_ci .adoption_cb = { LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED, 642d4afb5ceSopenharmony_ci LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED }, 643d4afb5ceSopenharmony_ci .rx_cb = { LWS_CALLBACK_MQTT_CLIENT_RX, 644d4afb5ceSopenharmony_ci LWS_CALLBACK_MQTT_CLIENT_RX }, 645d4afb5ceSopenharmony_ci .writeable_cb = { LWS_CALLBACK_MQTT_CLIENT_WRITEABLE, 646d4afb5ceSopenharmony_ci LWS_CALLBACK_MQTT_CLIENT_WRITEABLE }, 647d4afb5ceSopenharmony_ci .close_cb = { LWS_CALLBACK_MQTT_CLIENT_CLOSED, 648d4afb5ceSopenharmony_ci LWS_CALLBACK_MQTT_CLIENT_CLOSED }, 649d4afb5ceSopenharmony_ci .protocol_bind_cb = { LWS_CALLBACK_MQTT_IDLE, 650d4afb5ceSopenharmony_ci LWS_CALLBACK_MQTT_IDLE }, 651d4afb5ceSopenharmony_ci .protocol_unbind_cb = { LWS_CALLBACK_MQTT_DROP_PROTOCOL, 652d4afb5ceSopenharmony_ci LWS_CALLBACK_MQTT_DROP_PROTOCOL }, 653d4afb5ceSopenharmony_ci .file_handle = 0, 654d4afb5ceSopenharmony_ci}; 655