1d4afb5ceSopenharmony_ci/* 2d4afb5ceSopenharmony_ci * libwebsockets - small server side websockets and web server implementation 3d4afb5ceSopenharmony_ci * 4d4afb5ceSopenharmony_ci * Copyright (C) 2019 - 2022 Andy Green <andy@warmcat.com> 5d4afb5ceSopenharmony_ci * 6d4afb5ceSopenharmony_ci * Permission is hereby granted, free of charge, to any person obtaining a copy 7d4afb5ceSopenharmony_ci * of this software and associated documentation files (the "Software"), to 8d4afb5ceSopenharmony_ci * deal in the Software without restriction, including without limitation the 9d4afb5ceSopenharmony_ci * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10d4afb5ceSopenharmony_ci * sell copies of the Software, and to permit persons to whom the Software is 11d4afb5ceSopenharmony_ci * furnished to do so, subject to the following conditions: 12d4afb5ceSopenharmony_ci * 13d4afb5ceSopenharmony_ci * The above copyright notice and this permission notice shall be included in 14d4afb5ceSopenharmony_ci * all copies or substantial portions of the Software. 15d4afb5ceSopenharmony_ci * 16d4afb5ceSopenharmony_ci * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17d4afb5ceSopenharmony_ci * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18d4afb5ceSopenharmony_ci * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19d4afb5ceSopenharmony_ci * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20d4afb5ceSopenharmony_ci * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21d4afb5ceSopenharmony_ci * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22d4afb5ceSopenharmony_ci * IN THE SOFTWARE. 23d4afb5ceSopenharmony_ci */ 24d4afb5ceSopenharmony_ci 25d4afb5ceSopenharmony_ci#include <private-lib-core.h> 26d4afb5ceSopenharmony_ci 27d4afb5ceSopenharmony_cistatic void 28d4afb5ceSopenharmony_cisecstream_mqtt_cleanup(lws_ss_handle_t *h) 29d4afb5ceSopenharmony_ci{ 30d4afb5ceSopenharmony_ci uint32_t i; 31d4afb5ceSopenharmony_ci 32d4afb5ceSopenharmony_ci if (h->u.mqtt.heap_baggage) { 33d4afb5ceSopenharmony_ci lws_free(h->u.mqtt.heap_baggage); 34d4afb5ceSopenharmony_ci h->u.mqtt.heap_baggage = NULL; 35d4afb5ceSopenharmony_ci } 36d4afb5ceSopenharmony_ci 37d4afb5ceSopenharmony_ci if (h->u.mqtt.sub_info.topic) { 38d4afb5ceSopenharmony_ci for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) { 39d4afb5ceSopenharmony_ci if (h->u.mqtt.sub_info.topic[i].name) { 40d4afb5ceSopenharmony_ci lws_free((void*)h->u.mqtt.sub_info.topic[i].name); 41d4afb5ceSopenharmony_ci h->u.mqtt.sub_info.topic[i].name = NULL; 42d4afb5ceSopenharmony_ci } 43d4afb5ceSopenharmony_ci } 44d4afb5ceSopenharmony_ci lws_free(h->u.mqtt.sub_info.topic); 45d4afb5ceSopenharmony_ci h->u.mqtt.sub_info.topic = NULL; 46d4afb5ceSopenharmony_ci } 47d4afb5ceSopenharmony_ci lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); 48d4afb5ceSopenharmony_ci} 49d4afb5ceSopenharmony_ci 50d4afb5ceSopenharmony_cistatic int 51d4afb5ceSopenharmony_cisecstream_mqtt_subscribe(struct lws *wsi) 52d4afb5ceSopenharmony_ci{ 53d4afb5ceSopenharmony_ci size_t used_in, used_out, topic_limit; 54d4afb5ceSopenharmony_ci lws_strexp_t exp; 55d4afb5ceSopenharmony_ci char* expbuf; 56d4afb5ceSopenharmony_ci lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 57d4afb5ceSopenharmony_ci 58d4afb5ceSopenharmony_ci if (!h || !h->policy) 59d4afb5ceSopenharmony_ci return -1; 60d4afb5ceSopenharmony_ci 61d4afb5ceSopenharmony_ci if (h->policy->u.mqtt.aws_iot) 62d4afb5ceSopenharmony_ci topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN; 63d4afb5ceSopenharmony_ci else 64d4afb5ceSopenharmony_ci topic_limit = LWS_MQTT_MAX_TOPICLEN; 65d4afb5ceSopenharmony_ci 66d4afb5ceSopenharmony_ci if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe) 67d4afb5ceSopenharmony_ci return 0; 68d4afb5ceSopenharmony_ci 69d4afb5ceSopenharmony_ci lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL, 70d4afb5ceSopenharmony_ci topic_limit); 71d4afb5ceSopenharmony_ci /* 72d4afb5ceSopenharmony_ci * Expand with no output first to calculate the size of 73d4afb5ceSopenharmony_ci * expanded string then, allocate new buffer and expand 74d4afb5ceSopenharmony_ci * again with the buffer 75d4afb5ceSopenharmony_ci */ 76d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, 77d4afb5ceSopenharmony_ci strlen(h->policy->u.mqtt.subscribe), &used_in, 78d4afb5ceSopenharmony_ci &used_out) != LSTRX_DONE) { 79d4afb5ceSopenharmony_ci lwsl_err( 80d4afb5ceSopenharmony_ci "%s, failed to expand MQTT subscribe" 81d4afb5ceSopenharmony_ci " topic with no output\n", 82d4afb5ceSopenharmony_ci __func__); 83d4afb5ceSopenharmony_ci return 1; 84d4afb5ceSopenharmony_ci } 85d4afb5ceSopenharmony_ci 86d4afb5ceSopenharmony_ci expbuf = lws_malloc(used_out + 1, __func__); 87d4afb5ceSopenharmony_ci if (!expbuf) { 88d4afb5ceSopenharmony_ci lwsl_err( 89d4afb5ceSopenharmony_ci "%s, failed to allocate MQTT subscribe" 90d4afb5ceSopenharmony_ci "topic", 91d4afb5ceSopenharmony_ci __func__); 92d4afb5ceSopenharmony_ci return 1; 93d4afb5ceSopenharmony_ci } 94d4afb5ceSopenharmony_ci 95d4afb5ceSopenharmony_ci lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf, 96d4afb5ceSopenharmony_ci used_out + 1); 97d4afb5ceSopenharmony_ci 98d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, 99d4afb5ceSopenharmony_ci strlen(h->policy->u.mqtt.subscribe), &used_in, 100d4afb5ceSopenharmony_ci &used_out) != LSTRX_DONE) { 101d4afb5ceSopenharmony_ci lwsl_err("%s, failed to expand MQTT subscribe topic\n", 102d4afb5ceSopenharmony_ci __func__); 103d4afb5ceSopenharmony_ci lws_free(expbuf); 104d4afb5ceSopenharmony_ci return 1; 105d4afb5ceSopenharmony_ci } 106d4afb5ceSopenharmony_ci lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); 107d4afb5ceSopenharmony_ci h->u.mqtt.sub_top.name = expbuf; 108d4afb5ceSopenharmony_ci 109d4afb5ceSopenharmony_ci /* 110d4afb5ceSopenharmony_ci * The policy says to subscribe to something, and we 111d4afb5ceSopenharmony_ci * haven't done it yet. Do it using the pre-prepared 112d4afb5ceSopenharmony_ci * string-substituted version of the policy string. 113d4afb5ceSopenharmony_ci */ 114d4afb5ceSopenharmony_ci 115d4afb5ceSopenharmony_ci lwsl_notice("%s: subscribing %s\n", __func__, 116d4afb5ceSopenharmony_ci h->u.mqtt.sub_top.name); 117d4afb5ceSopenharmony_ci 118d4afb5ceSopenharmony_ci h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos; 119d4afb5ceSopenharmony_ci memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info)); 120d4afb5ceSopenharmony_ci h->u.mqtt.sub_info.num_topics = 1; 121d4afb5ceSopenharmony_ci h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top; 122d4afb5ceSopenharmony_ci h->u.mqtt.sub_info.topic = 123d4afb5ceSopenharmony_ci lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__); 124d4afb5ceSopenharmony_ci h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf); 125d4afb5ceSopenharmony_ci h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos; 126d4afb5ceSopenharmony_ci 127d4afb5ceSopenharmony_ci if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) { 128d4afb5ceSopenharmony_ci lwsl_notice("%s: unable to subscribe", __func__); 129d4afb5ceSopenharmony_ci lws_free(expbuf); 130d4afb5ceSopenharmony_ci h->u.mqtt.sub_top.name = NULL; 131d4afb5ceSopenharmony_ci return -1; 132d4afb5ceSopenharmony_ci } 133d4afb5ceSopenharmony_ci lws_free(expbuf); 134d4afb5ceSopenharmony_ci h->u.mqtt.sub_top.name = NULL; 135d4afb5ceSopenharmony_ci 136d4afb5ceSopenharmony_ci /* Expect a SUBACK */ 137d4afb5ceSopenharmony_ci if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { 138d4afb5ceSopenharmony_ci lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); 139d4afb5ceSopenharmony_ci return -1; 140d4afb5ceSopenharmony_ci } 141d4afb5ceSopenharmony_ci return 0; 142d4afb5ceSopenharmony_ci} 143d4afb5ceSopenharmony_ci 144d4afb5ceSopenharmony_cistatic int 145d4afb5ceSopenharmony_cisecstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len, 146d4afb5ceSopenharmony_ci uint32_t payload_len, const char* topic, 147d4afb5ceSopenharmony_ci lws_mqtt_qos_levels_t qos, uint8_t retain, uint8_t dup, 148d4afb5ceSopenharmony_ci int f) 149d4afb5ceSopenharmony_ci{ 150d4afb5ceSopenharmony_ci lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 151d4afb5ceSopenharmony_ci size_t used_in, used_out, topic_limit; 152d4afb5ceSopenharmony_ci lws_strexp_t exp; 153d4afb5ceSopenharmony_ci char *expbuf; 154d4afb5ceSopenharmony_ci lws_mqtt_publish_param_t mqpp; 155d4afb5ceSopenharmony_ci 156d4afb5ceSopenharmony_ci if (h->policy->u.mqtt.aws_iot) 157d4afb5ceSopenharmony_ci topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN; 158d4afb5ceSopenharmony_ci else 159d4afb5ceSopenharmony_ci topic_limit = LWS_MQTT_MAX_TOPICLEN; 160d4afb5ceSopenharmony_ci 161d4afb5ceSopenharmony_ci memset(&mqpp, 0, sizeof(mqpp)); 162d4afb5ceSopenharmony_ci 163d4afb5ceSopenharmony_ci lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL, 164d4afb5ceSopenharmony_ci topic_limit); 165d4afb5ceSopenharmony_ci 166d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in, 167d4afb5ceSopenharmony_ci &used_out) != LSTRX_DONE) { 168d4afb5ceSopenharmony_ci lwsl_err("%s, failed to expand MQTT publish" 169d4afb5ceSopenharmony_ci " topic with no output\n", __func__); 170d4afb5ceSopenharmony_ci return 1; 171d4afb5ceSopenharmony_ci } 172d4afb5ceSopenharmony_ci expbuf = lws_malloc(used_out + 1, __func__); 173d4afb5ceSopenharmony_ci if (!expbuf) { 174d4afb5ceSopenharmony_ci lwsl_err("%s, failed to allocate MQTT publish topic", 175d4afb5ceSopenharmony_ci __func__); 176d4afb5ceSopenharmony_ci return 1; 177d4afb5ceSopenharmony_ci } 178d4afb5ceSopenharmony_ci 179d4afb5ceSopenharmony_ci lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, 180d4afb5ceSopenharmony_ci used_out + 1); 181d4afb5ceSopenharmony_ci 182d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in, 183d4afb5ceSopenharmony_ci &used_out) != LSTRX_DONE) { 184d4afb5ceSopenharmony_ci lws_free(expbuf); 185d4afb5ceSopenharmony_ci return 1; 186d4afb5ceSopenharmony_ci } 187d4afb5ceSopenharmony_ci lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); 188d4afb5ceSopenharmony_ci mqpp.topic = (char *)expbuf; 189d4afb5ceSopenharmony_ci 190d4afb5ceSopenharmony_ci mqpp.topic_len = (uint16_t)strlen(mqpp.topic); 191d4afb5ceSopenharmony_ci mqpp.packet_id = (uint16_t)(h->txord - 1); 192d4afb5ceSopenharmony_ci mqpp.qos = qos; 193d4afb5ceSopenharmony_ci mqpp.retain = !!retain; 194d4afb5ceSopenharmony_ci mqpp.payload = buf; 195d4afb5ceSopenharmony_ci mqpp.dup = !!dup; 196d4afb5ceSopenharmony_ci if (payload_len) 197d4afb5ceSopenharmony_ci mqpp.payload_len = payload_len; 198d4afb5ceSopenharmony_ci else 199d4afb5ceSopenharmony_ci mqpp.payload_len = (uint32_t)buf_len; 200d4afb5ceSopenharmony_ci 201d4afb5ceSopenharmony_ci lwsl_notice("%s: payload len %d\n", __func__, 202d4afb5ceSopenharmony_ci (int)mqpp.payload_len); 203d4afb5ceSopenharmony_ci 204d4afb5ceSopenharmony_ci if (lws_mqtt_client_send_publish(wsi, &mqpp, 205d4afb5ceSopenharmony_ci (const char *)buf, 206d4afb5ceSopenharmony_ci (uint32_t)buf_len, 207d4afb5ceSopenharmony_ci f & LWSSS_FLAG_EOM)) { 208d4afb5ceSopenharmony_ci lwsl_notice("%s: failed to publish\n", __func__); 209d4afb5ceSopenharmony_ci lws_free(expbuf); 210d4afb5ceSopenharmony_ci return -1; 211d4afb5ceSopenharmony_ci } 212d4afb5ceSopenharmony_ci lws_free(expbuf); 213d4afb5ceSopenharmony_ci 214d4afb5ceSopenharmony_ci if ((mqpp.qos == QOS1 || mqpp.qos == QOS2) && buf_len > 0) { 215d4afb5ceSopenharmony_ci if (lws_buflist_append_segment(&h->u.mqtt.buflist_unacked, 216d4afb5ceSopenharmony_ci buf, buf_len) < 0) { 217d4afb5ceSopenharmony_ci lwsl_notice("%s: failed to store unacked\n", __func__); 218d4afb5ceSopenharmony_ci return -1; 219d4afb5ceSopenharmony_ci } 220d4afb5ceSopenharmony_ci } 221d4afb5ceSopenharmony_ci 222d4afb5ceSopenharmony_ci return 0; 223d4afb5ceSopenharmony_ci} 224d4afb5ceSopenharmony_ci 225d4afb5ceSopenharmony_cistatic int 226d4afb5ceSopenharmony_cisecstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) { 227d4afb5ceSopenharmony_ci lws_strexp_t exp; 228d4afb5ceSopenharmony_ci size_t used_in, used_out = 0; 229d4afb5ceSopenharmony_ci lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 230d4afb5ceSopenharmony_ci 231d4afb5ceSopenharmony_ci if (h->policy->u.mqtt.birth_message) { 232d4afb5ceSopenharmony_ci lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, 233d4afb5ceSopenharmony_ci (char *)buf, buflen); 234d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message, 235d4afb5ceSopenharmony_ci strlen(h->policy->u.mqtt.birth_message), 236d4afb5ceSopenharmony_ci &used_in, &used_out) != LSTRX_DONE) { 237d4afb5ceSopenharmony_ci return 1; 238d4afb5ceSopenharmony_ci } 239d4afb5ceSopenharmony_ci } 240d4afb5ceSopenharmony_ci wsi->mqtt->inside_birth = 1; 241d4afb5ceSopenharmony_ci return secstream_mqtt_publish(wsi, buf, 242d4afb5ceSopenharmony_ci used_out, 0, h->policy->u.mqtt.birth_topic, 243d4afb5ceSopenharmony_ci h->policy->u.mqtt.birth_qos, 244d4afb5ceSopenharmony_ci h->policy->u.mqtt.birth_retain, 0, 245d4afb5ceSopenharmony_ci LWSSS_FLAG_EOM); 246d4afb5ceSopenharmony_ci} 247d4afb5ceSopenharmony_ci 248d4afb5ceSopenharmony_cistatic int 249d4afb5ceSopenharmony_cisecstream_mqtt_resend(struct lws *wsi, uint8_t *buf) { 250d4afb5ceSopenharmony_ci uint8_t *buffered; 251d4afb5ceSopenharmony_ci size_t len; 252d4afb5ceSopenharmony_ci int f = 0, r; 253d4afb5ceSopenharmony_ci lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 254d4afb5ceSopenharmony_ci 255d4afb5ceSopenharmony_ci len = lws_buflist_next_segment_len(&h->u.mqtt.buflist_unacked, 256d4afb5ceSopenharmony_ci &buffered); 257d4afb5ceSopenharmony_ci 258d4afb5ceSopenharmony_ci if (h->u.mqtt.unacked_size <= len) 259d4afb5ceSopenharmony_ci f |= LWSSS_FLAG_EOM; 260d4afb5ceSopenharmony_ci 261d4afb5ceSopenharmony_ci if (!len) { 262d4afb5ceSopenharmony_ci /* when the message does not have payload */ 263d4afb5ceSopenharmony_ci buffered = buf; 264d4afb5ceSopenharmony_ci } else { 265d4afb5ceSopenharmony_ci h->u.mqtt.unacked_size -= (uint32_t)len; 266d4afb5ceSopenharmony_ci } 267d4afb5ceSopenharmony_ci 268d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_birth) { 269d4afb5ceSopenharmony_ci r = secstream_mqtt_publish(wsi, buffered, len, 0, 270d4afb5ceSopenharmony_ci h->policy->u.mqtt.birth_topic, 271d4afb5ceSopenharmony_ci h->policy->u.mqtt.birth_qos, 272d4afb5ceSopenharmony_ci h->policy->u.mqtt.birth_retain, 273d4afb5ceSopenharmony_ci 1, f); 274d4afb5ceSopenharmony_ci } else { 275d4afb5ceSopenharmony_ci r = secstream_mqtt_publish(wsi, buffered, len, 276d4afb5ceSopenharmony_ci (uint32_t)h->writeable_len, 277d4afb5ceSopenharmony_ci h->policy->u.mqtt.topic, 278d4afb5ceSopenharmony_ci h->policy->u.mqtt.qos, 279d4afb5ceSopenharmony_ci h->policy->u.mqtt.retain, 1, f); 280d4afb5ceSopenharmony_ci } 281d4afb5ceSopenharmony_ci if (len) 282d4afb5ceSopenharmony_ci lws_buflist_use_segment(&h->u.mqtt.buflist_unacked, len); 283d4afb5ceSopenharmony_ci 284d4afb5ceSopenharmony_ci if (r) { 285d4afb5ceSopenharmony_ci lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); 286d4afb5ceSopenharmony_ci h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0; 287d4afb5ceSopenharmony_ci 288d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_birth) { 289d4afb5ceSopenharmony_ci lwsl_err("%s: %s: failed to send Birth\n", __func__, 290d4afb5ceSopenharmony_ci lws_ss_tag(h)); 291d4afb5ceSopenharmony_ci return -1; 292d4afb5ceSopenharmony_ci } else { 293d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); 294d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 295d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 296d4afb5ceSopenharmony_ci } 297d4afb5ceSopenharmony_ci } 298d4afb5ceSopenharmony_ci return 0; 299d4afb5ceSopenharmony_ci} 300d4afb5ceSopenharmony_ci 301d4afb5ceSopenharmony_cistatic char * 302d4afb5ceSopenharmony_ciexpand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len) 303d4afb5ceSopenharmony_ci{ 304d4afb5ceSopenharmony_ci lws_strexp_t exp; 305d4afb5ceSopenharmony_ci char *expbuf = NULL; 306d4afb5ceSopenharmony_ci size_t used_in = 0, used_out = 0, post_len = 0; 307d4afb5ceSopenharmony_ci 308d4afb5ceSopenharmony_ci memset(&exp, 0, sizeof(exp)); 309d4afb5ceSopenharmony_ci 310d4afb5ceSopenharmony_ci if (post) 311d4afb5ceSopenharmony_ci post_len = strlen(post); 312d4afb5ceSopenharmony_ci 313d4afb5ceSopenharmony_ci if (post_len > max_len) 314d4afb5ceSopenharmony_ci return NULL; 315d4afb5ceSopenharmony_ci 316d4afb5ceSopenharmony_ci lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, NULL, 317d4afb5ceSopenharmony_ci max_len - post_len); 318d4afb5ceSopenharmony_ci 319d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, str, strlen(str), &used_in, 320d4afb5ceSopenharmony_ci &used_out) != LSTRX_DONE) { 321d4afb5ceSopenharmony_ci lwsl_err("%s, failed to expand %s", __func__, str); 322d4afb5ceSopenharmony_ci 323d4afb5ceSopenharmony_ci return NULL; 324d4afb5ceSopenharmony_ci } 325d4afb5ceSopenharmony_ci 326d4afb5ceSopenharmony_ci expbuf = lws_malloc(used_out + 1 + post_len, __func__); 327d4afb5ceSopenharmony_ci if (!expbuf) { 328d4afb5ceSopenharmony_ci lwsl_err("%s, failed to allocate str_exp for %s", __func__, str); 329d4afb5ceSopenharmony_ci 330d4afb5ceSopenharmony_ci return NULL; 331d4afb5ceSopenharmony_ci } 332d4afb5ceSopenharmony_ci 333d4afb5ceSopenharmony_ci lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf, 334d4afb5ceSopenharmony_ci used_out + 1 + post_len); 335d4afb5ceSopenharmony_ci 336d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, str, strlen(str), &used_in, 337d4afb5ceSopenharmony_ci &used_out) != LSTRX_DONE) { 338d4afb5ceSopenharmony_ci lwsl_err("%s, failed to expand str_exp %s\n", __func__, str); 339d4afb5ceSopenharmony_ci lws_free(expbuf); 340d4afb5ceSopenharmony_ci 341d4afb5ceSopenharmony_ci return NULL; 342d4afb5ceSopenharmony_ci } 343d4afb5ceSopenharmony_ci if (post) 344d4afb5ceSopenharmony_ci strcat(expbuf, post); 345d4afb5ceSopenharmony_ci 346d4afb5ceSopenharmony_ci return expbuf; 347d4afb5ceSopenharmony_ci} 348d4afb5ceSopenharmony_ci 349d4afb5ceSopenharmony_cistatic lws_mqtt_match_topic_return_t 350d4afb5ceSopenharmony_cisecstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic) 351d4afb5ceSopenharmony_ci{ 352d4afb5ceSopenharmony_ci lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 353d4afb5ceSopenharmony_ci const char *match[] = { LWS_MQTT_SHADOW_UNNAMED_TOPIC_MATCH, 354d4afb5ceSopenharmony_ci LWS_MQTT_SHADOW_NAMED_TOPIC_MATCH }; 355d4afb5ceSopenharmony_ci char *expbuf = NULL; 356d4afb5ceSopenharmony_ci unsigned int i = 0; 357d4afb5ceSopenharmony_ci lws_mqtt_match_topic_return_t ret = LMMTR_TOPIC_NOMATCH; 358d4afb5ceSopenharmony_ci 359d4afb5ceSopenharmony_ci if (!topic) 360d4afb5ceSopenharmony_ci return LMMTR_TOPIC_MATCH_ERROR; 361d4afb5ceSopenharmony_ci 362d4afb5ceSopenharmony_ci expbuf = expand_metadata(h, topic, NULL, LWS_MQTT_MAX_AWSIOT_TOPICLEN); 363d4afb5ceSopenharmony_ci if (!expbuf) { 364d4afb5ceSopenharmony_ci lwsl_wsi_warn(wsi, "Failed to expand Shadow topic"); 365d4afb5ceSopenharmony_ci 366d4afb5ceSopenharmony_ci return LMMTR_TOPIC_MATCH_ERROR; 367d4afb5ceSopenharmony_ci } 368d4afb5ceSopenharmony_ci for (i = 0; i < (sizeof(match) / sizeof(match[0])); i++) { 369d4afb5ceSopenharmony_ci if (lws_mqtt_is_topic_matched( 370d4afb5ceSopenharmony_ci match[i], expbuf) == LMMTR_TOPIC_MATCH) { 371d4afb5ceSopenharmony_ci ret = LMMTR_TOPIC_MATCH; 372d4afb5ceSopenharmony_ci break; 373d4afb5ceSopenharmony_ci } 374d4afb5ceSopenharmony_ci } 375d4afb5ceSopenharmony_ci lws_free(expbuf); 376d4afb5ceSopenharmony_ci 377d4afb5ceSopenharmony_ci return ret; 378d4afb5ceSopenharmony_ci} 379d4afb5ceSopenharmony_ci 380d4afb5ceSopenharmony_cistatic void 381d4afb5ceSopenharmony_cisecstream_mqtt_shadow_cleanup(struct lws *wsi) 382d4afb5ceSopenharmony_ci{ 383d4afb5ceSopenharmony_ci lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 384d4afb5ceSopenharmony_ci uint32_t i = 0; 385d4afb5ceSopenharmony_ci 386d4afb5ceSopenharmony_ci for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) 387d4afb5ceSopenharmony_ci lws_free((void *)h->u.mqtt.shadow_sub.topic[i].name); 388d4afb5ceSopenharmony_ci 389d4afb5ceSopenharmony_ci h->u.mqtt.shadow_sub.num_topics = 0; 390d4afb5ceSopenharmony_ci 391d4afb5ceSopenharmony_ci if (h->u.mqtt.shadow_sub.topic) { 392d4afb5ceSopenharmony_ci lws_free(h->u.mqtt.shadow_sub.topic); 393d4afb5ceSopenharmony_ci h->u.mqtt.shadow_sub.topic = NULL; 394d4afb5ceSopenharmony_ci } 395d4afb5ceSopenharmony_ci} 396d4afb5ceSopenharmony_ci 397d4afb5ceSopenharmony_cistatic lws_ss_state_return_t 398d4afb5ceSopenharmony_cisecstream_mqtt_shadow_unsubscribe(struct lws *wsi) 399d4afb5ceSopenharmony_ci{ 400d4afb5ceSopenharmony_ci lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 401d4afb5ceSopenharmony_ci 402d4afb5ceSopenharmony_ci if (h->u.mqtt.shadow_sub.num_topics == 0) { 403d4afb5ceSopenharmony_ci wsi->mqtt->send_shadow_unsubscribe = 0; 404d4afb5ceSopenharmony_ci wsi->mqtt->inside_shadow = 0; 405d4afb5ceSopenharmony_ci wsi->mqtt->done_shadow_subscribe = 0; 406d4afb5ceSopenharmony_ci 407d4afb5ceSopenharmony_ci return LWSSSSRET_OK; 408d4afb5ceSopenharmony_ci } 409d4afb5ceSopenharmony_ci 410d4afb5ceSopenharmony_ci if (lws_mqtt_client_send_unsubcribe(wsi, &h->u.mqtt.shadow_sub)) { 411d4afb5ceSopenharmony_ci lwsl_wsi_err(wsi, "Failed to send MQTT unsubsribe"); 412d4afb5ceSopenharmony_ci 413d4afb5ceSopenharmony_ci return LWSSSSRET_DISCONNECT_ME; 414d4afb5ceSopenharmony_ci } 415d4afb5ceSopenharmony_ci /* Expect a UNSUBACK */ 416d4afb5ceSopenharmony_ci if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { 417d4afb5ceSopenharmony_ci lwsl_wsi_err(wsi, "Unable to set LWS_POLLIN"); 418d4afb5ceSopenharmony_ci 419d4afb5ceSopenharmony_ci return LWSSSSRET_DISCONNECT_ME; 420d4afb5ceSopenharmony_ci } 421d4afb5ceSopenharmony_ci wsi->mqtt->send_shadow_unsubscribe = 0; 422d4afb5ceSopenharmony_ci 423d4afb5ceSopenharmony_ci return LWSSSSRET_OK; 424d4afb5ceSopenharmony_ci} 425d4afb5ceSopenharmony_ci 426d4afb5ceSopenharmony_cistatic int 427d4afb5ceSopenharmony_cisecstream_mqtt_shadow_subscribe(struct lws *wsi) 428d4afb5ceSopenharmony_ci{ 429d4afb5ceSopenharmony_ci lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 430d4afb5ceSopenharmony_ci char* expbuf = NULL; 431d4afb5ceSopenharmony_ci const char *suffixes[] = { LWS_MQTT_SHADOW_RESP_ACCEPTED_STR, 432d4afb5ceSopenharmony_ci LWS_MQTT_SHADOW_RESP_REJECTED_STR }; 433d4afb5ceSopenharmony_ci unsigned int i, suffixes_len = sizeof(suffixes) / sizeof(suffixes[0]); 434d4afb5ceSopenharmony_ci 435d4afb5ceSopenharmony_ci if (!h->policy->u.mqtt.topic || wsi->mqtt->inside_shadow) 436d4afb5ceSopenharmony_ci return 0; 437d4afb5ceSopenharmony_ci 438d4afb5ceSopenharmony_ci if (h->u.mqtt.shadow_sub.num_topics > 0) 439d4afb5ceSopenharmony_ci secstream_mqtt_shadow_cleanup(wsi); 440d4afb5ceSopenharmony_ci 441d4afb5ceSopenharmony_ci memset(&h->u.mqtt.shadow_sub, 0, sizeof(lws_mqtt_subscribe_param_t)); 442d4afb5ceSopenharmony_ci h->u.mqtt.shadow_sub.topic = lws_malloc( 443d4afb5ceSopenharmony_ci sizeof(lws_mqtt_topic_elem_t) * suffixes_len, __func__); 444d4afb5ceSopenharmony_ci if (!h->u.mqtt.shadow_sub.topic) { 445d4afb5ceSopenharmony_ci lwsl_ss_err(h, "Failed to allocate Shadow topics"); 446d4afb5ceSopenharmony_ci return -1; 447d4afb5ceSopenharmony_ci } 448d4afb5ceSopenharmony_ci h->u.mqtt.shadow_sub.num_topics = suffixes_len; 449d4afb5ceSopenharmony_ci for (i = 0; i < suffixes_len; i++) { 450d4afb5ceSopenharmony_ci expbuf = expand_metadata(h, h->policy->u.mqtt.topic, suffixes[i], 451d4afb5ceSopenharmony_ci LWS_MQTT_MAX_AWSIOT_TOPICLEN); 452d4afb5ceSopenharmony_ci if (!expbuf) { 453d4afb5ceSopenharmony_ci lwsl_ss_err(h, "Failed to allocate Shadow topic"); 454d4afb5ceSopenharmony_ci secstream_mqtt_shadow_cleanup(wsi); 455d4afb5ceSopenharmony_ci 456d4afb5ceSopenharmony_ci return -1; 457d4afb5ceSopenharmony_ci } 458d4afb5ceSopenharmony_ci h->u.mqtt.shadow_sub.topic[i].name = expbuf; 459d4afb5ceSopenharmony_ci h->u.mqtt.shadow_sub.topic[i].qos = h->policy->u.mqtt.qos; 460d4afb5ceSopenharmony_ci } 461d4afb5ceSopenharmony_ci h->u.mqtt.shadow_sub.packet_id = (uint16_t)(h->txord - 1); 462d4afb5ceSopenharmony_ci 463d4afb5ceSopenharmony_ci if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.shadow_sub)) { 464d4afb5ceSopenharmony_ci lwsl_wsi_notice(wsi, "Unable to subscribe Shadow topics"); 465d4afb5ceSopenharmony_ci 466d4afb5ceSopenharmony_ci return 0; 467d4afb5ceSopenharmony_ci } 468d4afb5ceSopenharmony_ci 469d4afb5ceSopenharmony_ci /* Expect a SUBACK */ 470d4afb5ceSopenharmony_ci if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { 471d4afb5ceSopenharmony_ci lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); 472d4afb5ceSopenharmony_ci return -1; 473d4afb5ceSopenharmony_ci } 474d4afb5ceSopenharmony_ci wsi->mqtt->inside_shadow = 1; 475d4afb5ceSopenharmony_ci 476d4afb5ceSopenharmony_ci return 0; 477d4afb5ceSopenharmony_ci} 478d4afb5ceSopenharmony_ci 479d4afb5ceSopenharmony_cistatic int 480d4afb5ceSopenharmony_cisecstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, 481d4afb5ceSopenharmony_ci void *in, size_t len) 482d4afb5ceSopenharmony_ci{ 483d4afb5ceSopenharmony_ci lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); 484d4afb5ceSopenharmony_ci size_t used_in = 0, used_out = 0, topic_len = 0; 485d4afb5ceSopenharmony_ci lws_mqtt_publish_param_t *pmqpp = NULL; 486d4afb5ceSopenharmony_ci lws_ss_state_return_t r = LWSSSSRET_OK; 487d4afb5ceSopenharmony_ci uint8_t buf[LWS_PRE + 1400]; 488d4afb5ceSopenharmony_ci size_t buflen = sizeof(buf) - LWS_PRE; 489d4afb5ceSopenharmony_ci lws_ss_metadata_t *omd = NULL; 490d4afb5ceSopenharmony_ci char *sub_topic = NULL; 491d4afb5ceSopenharmony_ci lws_strexp_t exp; 492d4afb5ceSopenharmony_ci int f = 0; 493d4afb5ceSopenharmony_ci 494d4afb5ceSopenharmony_ci memset(buf, 0, sizeof(buf)); 495d4afb5ceSopenharmony_ci memset(&exp, 0, sizeof(exp)); 496d4afb5ceSopenharmony_ci 497d4afb5ceSopenharmony_ci switch (reason) { 498d4afb5ceSopenharmony_ci 499d4afb5ceSopenharmony_ci /* because we are protocols[0] ... */ 500d4afb5ceSopenharmony_ci case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: 501d4afb5ceSopenharmony_ci lwsl_info("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__, 502d4afb5ceSopenharmony_ci in ? (char *)in : "(null)"); 503d4afb5ceSopenharmony_ci if (!h) 504d4afb5ceSopenharmony_ci break; 505d4afb5ceSopenharmony_ci 506d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CONMON) 507d4afb5ceSopenharmony_ci lws_conmon_ss_json(h); 508d4afb5ceSopenharmony_ci#endif 509d4afb5ceSopenharmony_ci 510d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE); 511d4afb5ceSopenharmony_ci h->wsi = NULL; 512d4afb5ceSopenharmony_ci 513d4afb5ceSopenharmony_ci secstream_mqtt_cleanup(h); 514d4afb5ceSopenharmony_ci 515d4afb5ceSopenharmony_ci if (r == LWSSSSRET_DESTROY_ME) 516d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 517d4afb5ceSopenharmony_ci 518d4afb5ceSopenharmony_ci r = lws_ss_backoff(h); 519d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 520d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 521d4afb5ceSopenharmony_ci 522d4afb5ceSopenharmony_ci break; 523d4afb5ceSopenharmony_ci 524d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_CLIENT_CLOSED: 525d4afb5ceSopenharmony_ci if (!h) 526d4afb5ceSopenharmony_ci break; 527d4afb5ceSopenharmony_ci lws_sul_cancel(&h->sul_timeout); 528d4afb5ceSopenharmony_ci#if defined(LWS_WITH_CONMON) 529d4afb5ceSopenharmony_ci lws_conmon_ss_json(h); 530d4afb5ceSopenharmony_ci#endif 531d4afb5ceSopenharmony_ci if (h->ss_dangling_connected) 532d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_DISCONNECTED); 533d4afb5ceSopenharmony_ci else 534d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE); 535d4afb5ceSopenharmony_ci if (h->wsi) 536d4afb5ceSopenharmony_ci lws_set_opaque_user_data(h->wsi, NULL); 537d4afb5ceSopenharmony_ci h->wsi = NULL; 538d4afb5ceSopenharmony_ci 539d4afb5ceSopenharmony_ci secstream_mqtt_cleanup(h); 540d4afb5ceSopenharmony_ci 541d4afb5ceSopenharmony_ci if (r) 542d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 543d4afb5ceSopenharmony_ci 544d4afb5ceSopenharmony_ci if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) && 545d4afb5ceSopenharmony_ci !h->txn_ok && !wsi->a.context->being_destroyed) { 546d4afb5ceSopenharmony_ci r = lws_ss_backoff(h); 547d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 548d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 549d4afb5ceSopenharmony_ci } 550d4afb5ceSopenharmony_ci break; 551d4afb5ceSopenharmony_ci 552d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED: 553d4afb5ceSopenharmony_ci /* 554d4afb5ceSopenharmony_ci * Make sure the handle wsi points to the stream wsi not the 555d4afb5ceSopenharmony_ci * original nwsi, in the case it was migrated 556d4afb5ceSopenharmony_ci */ 557d4afb5ceSopenharmony_ci h->wsi = wsi; 558d4afb5ceSopenharmony_ci h->retry = 0; 559d4afb5ceSopenharmony_ci h->seqstate = SSSEQ_CONNECTED; 560d4afb5ceSopenharmony_ci 561d4afb5ceSopenharmony_ci if (h->policy->u.mqtt.birth_topic && 562d4afb5ceSopenharmony_ci !wsi->mqtt->done_birth) { 563d4afb5ceSopenharmony_ci struct lws *nwsi = lws_get_network_wsi(wsi); 564d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct lws *, w, nwsi->mux.child_list) { 565d4afb5ceSopenharmony_ci if (w != wsi && 566d4afb5ceSopenharmony_ci (w->mqtt->done_birth || w->mqtt->inside_birth)) { 567d4afb5ceSopenharmony_ci /* 568d4afb5ceSopenharmony_ci * If any Birth was sent out or 569d4afb5ceSopenharmony_ci * is pending on other stream, 570d4afb5ceSopenharmony_ci * skip sending Birth. 571d4afb5ceSopenharmony_ci */ 572d4afb5ceSopenharmony_ci wsi->mqtt->done_birth = 1; 573d4afb5ceSopenharmony_ci break; 574d4afb5ceSopenharmony_ci } 575d4afb5ceSopenharmony_ci } lws_end_foreach_ll(w, mux.sibling_list); 576d4afb5ceSopenharmony_ci } 577d4afb5ceSopenharmony_ci 578d4afb5ceSopenharmony_ci if (!h->policy->u.mqtt.subscribe || 579d4afb5ceSopenharmony_ci !h->policy->u.mqtt.subscribe[0]) { 580d4afb5ceSopenharmony_ci /* 581d4afb5ceSopenharmony_ci * If subscribe is empty in the policy, then, 582d4afb5ceSopenharmony_ci * skip sending SUBSCRIBE and signal the user 583d4afb5ceSopenharmony_ci * application. 584d4afb5ceSopenharmony_ci */ 585d4afb5ceSopenharmony_ci wsi->mqtt->done_subscribe = 1; 586d4afb5ceSopenharmony_ci } else if (!h->policy->u.mqtt.clean_start && 587d4afb5ceSopenharmony_ci wsi->mqtt->session_resumed) { 588d4afb5ceSopenharmony_ci wsi->mqtt->inside_resume_session = 1; 589d4afb5ceSopenharmony_ci /* 590d4afb5ceSopenharmony_ci * If the previous session is resumed and Server has 591d4afb5ceSopenharmony_ci * stored session, then, do not subscribe. 592d4afb5ceSopenharmony_ci */ 593d4afb5ceSopenharmony_ci if (!secstream_mqtt_subscribe(wsi)) 594d4afb5ceSopenharmony_ci wsi->mqtt->done_subscribe = 1; 595d4afb5ceSopenharmony_ci wsi->mqtt->inside_resume_session = 0; 596d4afb5ceSopenharmony_ci } else if (h->policy->u.mqtt.subscribe && 597d4afb5ceSopenharmony_ci !wsi->mqtt->done_subscribe) { 598d4afb5ceSopenharmony_ci /* 599d4afb5ceSopenharmony_ci * If a subscribe is pending on the stream, then make 600d4afb5ceSopenharmony_ci * sure the SUBSCRIBE is done before signaling the 601d4afb5ceSopenharmony_ci * user application. 602d4afb5ceSopenharmony_ci */ 603d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 604d4afb5ceSopenharmony_ci break; 605d4afb5ceSopenharmony_ci } 606d4afb5ceSopenharmony_ci 607d4afb5ceSopenharmony_ci if (h->policy->u.mqtt.birth_topic && 608d4afb5ceSopenharmony_ci !wsi->mqtt->done_birth) { 609d4afb5ceSopenharmony_ci /* 610d4afb5ceSopenharmony_ci * If a Birth is pending on the stream, then make 611d4afb5ceSopenharmony_ci * sure the Birth is done before signaling the 612d4afb5ceSopenharmony_ci * user application. 613d4afb5ceSopenharmony_ci */ 614d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 615d4afb5ceSopenharmony_ci break; 616d4afb5ceSopenharmony_ci } 617d4afb5ceSopenharmony_ci lws_sul_cancel(&h->sul); 618d4afb5ceSopenharmony_ci#if defined(LWS_WITH_SYS_METRICS) 619d4afb5ceSopenharmony_ci /* 620d4afb5ceSopenharmony_ci * If any hanging caliper measurement, dump it, and free any tags 621d4afb5ceSopenharmony_ci */ 622d4afb5ceSopenharmony_ci lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL); 623d4afb5ceSopenharmony_ci#endif 624d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); 625d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 626d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 627d4afb5ceSopenharmony_ci if (h->policy->u.mqtt.topic) 628d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 629d4afb5ceSopenharmony_ci break; 630d4afb5ceSopenharmony_ci 631d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_CLIENT_RX: 632d4afb5ceSopenharmony_ci // lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: read %d\n", (int)len); 633d4afb5ceSopenharmony_ci if (!h || !h->info.rx) 634d4afb5ceSopenharmony_ci return 0; 635d4afb5ceSopenharmony_ci 636d4afb5ceSopenharmony_ci pmqpp = (lws_mqtt_publish_param_t *)in; 637d4afb5ceSopenharmony_ci 638d4afb5ceSopenharmony_ci f = 0; 639d4afb5ceSopenharmony_ci if (!pmqpp->payload_pos) 640d4afb5ceSopenharmony_ci f |= LWSSS_FLAG_SOM; 641d4afb5ceSopenharmony_ci if (pmqpp->payload_pos + len == pmqpp->payload_len) 642d4afb5ceSopenharmony_ci f |= LWSSS_FLAG_EOM; 643d4afb5ceSopenharmony_ci 644d4afb5ceSopenharmony_ci h->subseq = 1; 645d4afb5ceSopenharmony_ci 646d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_shadow) { 647d4afb5ceSopenharmony_ci /* 648d4afb5ceSopenharmony_ci * When Shadow is used, the stream receives multiple 649d4afb5ceSopenharmony_ci * topics including Shadow response, set received 650d4afb5ceSopenharmony_ci * topic on the metadata 651d4afb5ceSopenharmony_ci */ 652d4afb5ceSopenharmony_ci lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, 653d4afb5ceSopenharmony_ci NULL, (size_t)-1); 654d4afb5ceSopenharmony_ci 655d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, 656d4afb5ceSopenharmony_ci strlen(h->policy->u.mqtt.subscribe), 657d4afb5ceSopenharmony_ci &used_in, &used_out) != LSTRX_DONE) { 658d4afb5ceSopenharmony_ci lwsl_err("%s, failed to expand subscribe topic", 659d4afb5ceSopenharmony_ci __func__); 660d4afb5ceSopenharmony_ci return -1; 661d4afb5ceSopenharmony_ci } 662d4afb5ceSopenharmony_ci omd = lws_ss_get_handle_metadata(h, exp.name); 663d4afb5ceSopenharmony_ci 664d4afb5ceSopenharmony_ci if (!omd) { 665d4afb5ceSopenharmony_ci lwsl_err("%s, failed to find metadata for subscribe", 666d4afb5ceSopenharmony_ci __func__); 667d4afb5ceSopenharmony_ci return -1; 668d4afb5ceSopenharmony_ci } 669d4afb5ceSopenharmony_ci sub_topic = omd->value__may_own_heap; 670d4afb5ceSopenharmony_ci topic_len = omd->length; 671d4afb5ceSopenharmony_ci 672d4afb5ceSopenharmony_ci _lws_ss_set_metadata(omd, exp.name, 673d4afb5ceSopenharmony_ci (const void *)pmqpp->topic, 674d4afb5ceSopenharmony_ci pmqpp->topic_len); 675d4afb5ceSopenharmony_ci } 676d4afb5ceSopenharmony_ci 677d4afb5ceSopenharmony_ci r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload, 678d4afb5ceSopenharmony_ci len, f); 679d4afb5ceSopenharmony_ci 680d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_shadow) 681d4afb5ceSopenharmony_ci _lws_ss_set_metadata(omd, exp.name, &sub_topic, 682d4afb5ceSopenharmony_ci topic_len); 683d4afb5ceSopenharmony_ci 684d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 685d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 686d4afb5ceSopenharmony_ci 687d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_shadow) { 688d4afb5ceSopenharmony_ci size_t acc_n = strlen(LWS_MQTT_SHADOW_RESP_ACCEPTED_STR); 689d4afb5ceSopenharmony_ci size_t rej_n = strlen(LWS_MQTT_SHADOW_RESP_REJECTED_STR); 690d4afb5ceSopenharmony_ci uint32_t i; 691d4afb5ceSopenharmony_ci 692d4afb5ceSopenharmony_ci for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) { 693d4afb5ceSopenharmony_ci /* 694d4afb5ceSopenharmony_ci * received response ('/accepted' or 'rejected') 695d4afb5ceSopenharmony_ci * and clean up Shadow operation 696d4afb5ceSopenharmony_ci */ 697d4afb5ceSopenharmony_ci if (strncmp(h->u.mqtt.shadow_sub.topic[i].name, 698d4afb5ceSopenharmony_ci pmqpp->topic, pmqpp->topic_len) || 699d4afb5ceSopenharmony_ci (strlen(pmqpp->topic) < acc_n || 700d4afb5ceSopenharmony_ci strlen(pmqpp->topic) < rej_n)) 701d4afb5ceSopenharmony_ci continue; 702d4afb5ceSopenharmony_ci 703d4afb5ceSopenharmony_ci if (!strcmp(pmqpp->topic + 704d4afb5ceSopenharmony_ci (strlen(pmqpp->topic) - acc_n), 705d4afb5ceSopenharmony_ci LWS_MQTT_SHADOW_RESP_ACCEPTED_STR) || 706d4afb5ceSopenharmony_ci !strcmp(pmqpp->topic + 707d4afb5ceSopenharmony_ci (strlen(pmqpp->topic) - rej_n), 708d4afb5ceSopenharmony_ci LWS_MQTT_SHADOW_RESP_REJECTED_STR)) { 709d4afb5ceSopenharmony_ci lws_sul_cancel(&wsi->mqtt->sul_shadow_wait); 710d4afb5ceSopenharmony_ci wsi->mqtt->send_shadow_unsubscribe = 1; 711d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 712d4afb5ceSopenharmony_ci 713d4afb5ceSopenharmony_ci return 0; 714d4afb5ceSopenharmony_ci } 715d4afb5ceSopenharmony_ci } 716d4afb5ceSopenharmony_ci } 717d4afb5ceSopenharmony_ci return 0; /* don't passthru */ 718d4afb5ceSopenharmony_ci 719d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_SUBSCRIBED: 720d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_shadow) { 721d4afb5ceSopenharmony_ci wsi->mqtt->done_shadow_subscribe = 1; 722d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 723d4afb5ceSopenharmony_ci 724d4afb5ceSopenharmony_ci return 0; 725d4afb5ceSopenharmony_ci } 726d4afb5ceSopenharmony_ci /* 727d4afb5ceSopenharmony_ci * Stream demanded a subscribe without a Birth while connecting, once 728d4afb5ceSopenharmony_ci * done notify CONNECTED event to the application. 729d4afb5ceSopenharmony_ci */ 730d4afb5ceSopenharmony_ci if (!wsi->mqtt->done_subscribe && !h->policy->u.mqtt.birth_topic) { 731d4afb5ceSopenharmony_ci lws_sul_cancel(&h->sul); 732d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); 733d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 734d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 735d4afb5ceSopenharmony_ci } 736d4afb5ceSopenharmony_ci wsi->mqtt->done_subscribe = 1; 737d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 738d4afb5ceSopenharmony_ci break; 739d4afb5ceSopenharmony_ci 740d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_ACK: 741d4afb5ceSopenharmony_ci lws_sul_cancel(&h->sul_timeout); 742d4afb5ceSopenharmony_ci if (h->u.mqtt.send_unacked) { 743d4afb5ceSopenharmony_ci lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); 744d4afb5ceSopenharmony_ci h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0; 745d4afb5ceSopenharmony_ci } 746d4afb5ceSopenharmony_ci 747d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_birth) { 748d4afb5ceSopenharmony_ci /* 749d4afb5ceSopenharmony_ci * Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify 750d4afb5ceSopenharmony_ci * CONNECTED event to the application. 751d4afb5ceSopenharmony_ci */ 752d4afb5ceSopenharmony_ci wsi->mqtt->inside_birth = 0; 753d4afb5ceSopenharmony_ci wsi->mqtt->done_birth = 1; 754d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); 755d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 756d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 757d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 758d4afb5ceSopenharmony_ci break; 759d4afb5ceSopenharmony_ci } 760d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE); 761d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 762d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 763d4afb5ceSopenharmony_ci break; 764d4afb5ceSopenharmony_ci 765d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_RESEND: 766d4afb5ceSopenharmony_ci lws_sul_cancel(&h->sul_timeout); 767d4afb5ceSopenharmony_ci if (h->u.mqtt.retry_count++ < LWS_MQTT_MAX_PUBLISH_RETRY) { 768d4afb5ceSopenharmony_ci h->u.mqtt.unacked_size = 769d4afb5ceSopenharmony_ci (uint32_t)lws_buflist_total_len(&h->u.mqtt.buflist_unacked); 770d4afb5ceSopenharmony_ci if (h->u.mqtt.unacked_size) { 771d4afb5ceSopenharmony_ci lwsl_notice("%s: %s: resend unacked message (%d/%d) \n", 772d4afb5ceSopenharmony_ci __func__, lws_ss_tag(h), 773d4afb5ceSopenharmony_ci h->u.mqtt.retry_count, 774d4afb5ceSopenharmony_ci LWS_MQTT_MAX_PUBLISH_RETRY); 775d4afb5ceSopenharmony_ci h->u.mqtt.send_unacked = 1; 776d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 777d4afb5ceSopenharmony_ci break; 778d4afb5ceSopenharmony_ci } 779d4afb5ceSopenharmony_ci } 780d4afb5ceSopenharmony_ci 781d4afb5ceSopenharmony_ci lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); 782d4afb5ceSopenharmony_ci h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0; 783d4afb5ceSopenharmony_ci 784d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_birth) { 785d4afb5ceSopenharmony_ci lwsl_err("%s: %s: failed to send Birth\n", __func__, 786d4afb5ceSopenharmony_ci lws_ss_tag(h)); 787d4afb5ceSopenharmony_ci return -1; 788d4afb5ceSopenharmony_ci } 789d4afb5ceSopenharmony_ci 790d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); 791d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 792d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 793d4afb5ceSopenharmony_ci break; 794d4afb5ceSopenharmony_ci 795d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: 796d4afb5ceSopenharmony_ci { 797d4afb5ceSopenharmony_ci if (!h || !h->info.tx) 798d4afb5ceSopenharmony_ci return 0; 799d4afb5ceSopenharmony_ci lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h)); 800d4afb5ceSopenharmony_ci 801d4afb5ceSopenharmony_ci if (h->seqstate != SSSEQ_CONNECTED) { 802d4afb5ceSopenharmony_ci lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate); 803d4afb5ceSopenharmony_ci break; 804d4afb5ceSopenharmony_ci } 805d4afb5ceSopenharmony_ci 806d4afb5ceSopenharmony_ci if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe) 807d4afb5ceSopenharmony_ci return secstream_mqtt_subscribe(wsi); 808d4afb5ceSopenharmony_ci 809d4afb5ceSopenharmony_ci if (h->u.mqtt.send_unacked) 810d4afb5ceSopenharmony_ci return secstream_mqtt_resend(wsi, buf + LWS_PRE); 811d4afb5ceSopenharmony_ci 812d4afb5ceSopenharmony_ci if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) 813d4afb5ceSopenharmony_ci return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen); 814d4afb5ceSopenharmony_ci 815d4afb5ceSopenharmony_ci if (h->policy->u.mqtt.aws_iot) { 816d4afb5ceSopenharmony_ci if (secstream_mqtt_is_shadow_matched(wsi, 817d4afb5ceSopenharmony_ci h->policy->u.mqtt.topic) == LMMTR_TOPIC_MATCH) { 818d4afb5ceSopenharmony_ci if (!wsi->mqtt->done_shadow_subscribe) 819d4afb5ceSopenharmony_ci return secstream_mqtt_shadow_subscribe(wsi); 820d4afb5ceSopenharmony_ci if (wsi->mqtt->send_shadow_unsubscribe) 821d4afb5ceSopenharmony_ci return secstream_mqtt_shadow_unsubscribe(wsi); 822d4afb5ceSopenharmony_ci } 823d4afb5ceSopenharmony_ci } 824d4afb5ceSopenharmony_ci 825d4afb5ceSopenharmony_ci r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, 826d4afb5ceSopenharmony_ci &buflen, &f); 827d4afb5ceSopenharmony_ci 828d4afb5ceSopenharmony_ci if (r == LWSSSSRET_TX_DONT_SEND) { 829d4afb5ceSopenharmony_ci if (wsi->mqtt->done_shadow_subscribe) { 830d4afb5ceSopenharmony_ci return secstream_mqtt_shadow_unsubscribe(wsi); 831d4afb5ceSopenharmony_ci } 832d4afb5ceSopenharmony_ci return 0; 833d4afb5ceSopenharmony_ci } 834d4afb5ceSopenharmony_ci 835d4afb5ceSopenharmony_ci if (r == LWSSSSRET_DISCONNECT_ME) { 836d4afb5ceSopenharmony_ci lws_mqtt_subscribe_param_t lmsp; 837d4afb5ceSopenharmony_ci if (h->u.mqtt.sub_info.num_topics) { 838d4afb5ceSopenharmony_ci lmsp.num_topics = h->u.mqtt.sub_info.num_topics; 839d4afb5ceSopenharmony_ci lmsp.topic = h->u.mqtt.sub_info.topic; 840d4afb5ceSopenharmony_ci lmsp.packet_id = (uint16_t)(h->txord - 1); 841d4afb5ceSopenharmony_ci if (lws_mqtt_client_send_unsubcribe(wsi, 842d4afb5ceSopenharmony_ci &lmsp)) { 843d4afb5ceSopenharmony_ci lwsl_err("%s, failed to send" 844d4afb5ceSopenharmony_ci " MQTT unsubsribe", __func__); 845d4afb5ceSopenharmony_ci return -1; 846d4afb5ceSopenharmony_ci } 847d4afb5ceSopenharmony_ci return 0; 848d4afb5ceSopenharmony_ci } 849d4afb5ceSopenharmony_ci } 850d4afb5ceSopenharmony_ci 851d4afb5ceSopenharmony_ci if (r < 0) 852d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 853d4afb5ceSopenharmony_ci 854d4afb5ceSopenharmony_ci if (secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen, 855d4afb5ceSopenharmony_ci (uint32_t)h->writeable_len, 856d4afb5ceSopenharmony_ci h->policy->u.mqtt.topic, 857d4afb5ceSopenharmony_ci h->policy->u.mqtt.qos, 858d4afb5ceSopenharmony_ci h->policy->u.mqtt.retain, 0, f) != 0) { 859d4afb5ceSopenharmony_ci r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); 860d4afb5ceSopenharmony_ci if (r != LWSSSSRET_OK) 861d4afb5ceSopenharmony_ci return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); 862d4afb5ceSopenharmony_ci } 863d4afb5ceSopenharmony_ci return 0; 864d4afb5ceSopenharmony_ci } 865d4afb5ceSopenharmony_ci 866d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_UNSUBSCRIBED: 867d4afb5ceSopenharmony_ci { 868d4afb5ceSopenharmony_ci struct lws *nwsi = lws_get_network_wsi(wsi); 869d4afb5ceSopenharmony_ci 870d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_shadow) { 871d4afb5ceSopenharmony_ci secstream_mqtt_shadow_cleanup(wsi); 872d4afb5ceSopenharmony_ci wsi->mqtt->inside_shadow = 0; 873d4afb5ceSopenharmony_ci wsi->mqtt->done_shadow_subscribe = 0; 874d4afb5ceSopenharmony_ci break; 875d4afb5ceSopenharmony_ci } 876d4afb5ceSopenharmony_ci if (nwsi && (nwsi->mux.child_count == 1)) 877d4afb5ceSopenharmony_ci lws_mqtt_client_send_disconnect(nwsi); 878d4afb5ceSopenharmony_ci return -1; 879d4afb5ceSopenharmony_ci } 880d4afb5ceSopenharmony_ci 881d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT: 882d4afb5ceSopenharmony_ci if (!wsi->mqtt) 883d4afb5ceSopenharmony_ci return -1; 884d4afb5ceSopenharmony_ci 885d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_shadow) { 886d4afb5ceSopenharmony_ci secstream_mqtt_shadow_cleanup(wsi); 887d4afb5ceSopenharmony_ci wsi->mqtt->inside_shadow = 0; 888d4afb5ceSopenharmony_ci wsi->mqtt->done_shadow_subscribe = 0; 889d4afb5ceSopenharmony_ci lwsl_warn("%s: %s: Unsubscribe (Shadow) timeout.\n", 890d4afb5ceSopenharmony_ci __func__, lws_ss_tag(h)); 891d4afb5ceSopenharmony_ci break; 892d4afb5ceSopenharmony_ci } 893d4afb5ceSopenharmony_ci 894d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_unsubscribe) { 895d4afb5ceSopenharmony_ci lwsl_warn("%s: %s: Unsubscribe timeout.\n", __func__, 896d4afb5ceSopenharmony_ci lws_ss_tag(h)); 897d4afb5ceSopenharmony_ci return -1; 898d4afb5ceSopenharmony_ci } 899d4afb5ceSopenharmony_ci break; 900d4afb5ceSopenharmony_ci 901d4afb5ceSopenharmony_ci case LWS_CALLBACK_MQTT_SHADOW_TIMEOUT: 902d4afb5ceSopenharmony_ci if (!wsi->mqtt) 903d4afb5ceSopenharmony_ci return -1; 904d4afb5ceSopenharmony_ci 905d4afb5ceSopenharmony_ci if (wsi->mqtt->inside_shadow) { 906d4afb5ceSopenharmony_ci lwsl_warn("%s: %s: Shadow timeout.\n", __func__, 907d4afb5ceSopenharmony_ci lws_ss_tag(h)); 908d4afb5ceSopenharmony_ci wsi->mqtt->send_shadow_unsubscribe = 1; 909d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 910d4afb5ceSopenharmony_ci } 911d4afb5ceSopenharmony_ci break; 912d4afb5ceSopenharmony_ci 913d4afb5ceSopenharmony_ci default: 914d4afb5ceSopenharmony_ci break; 915d4afb5ceSopenharmony_ci } 916d4afb5ceSopenharmony_ci 917d4afb5ceSopenharmony_ci return lws_callback_http_dummy(wsi, reason, user, in, len); 918d4afb5ceSopenharmony_ci} 919d4afb5ceSopenharmony_ci 920d4afb5ceSopenharmony_ciconst struct lws_protocols protocol_secstream_mqtt = { 921d4afb5ceSopenharmony_ci "lws-secstream-mqtt", 922d4afb5ceSopenharmony_ci secstream_mqtt, 923d4afb5ceSopenharmony_ci 0, 0, 0, NULL, 0 924d4afb5ceSopenharmony_ci}; 925d4afb5ceSopenharmony_ci/* 926d4afb5ceSopenharmony_ci * Munge connect info according to protocol-specific considerations... this 927d4afb5ceSopenharmony_ci * usually means interpreting aux in a protocol-specific way and using the 928d4afb5ceSopenharmony_ci * pieces at connection setup time, eg, http url pieces. 929d4afb5ceSopenharmony_ci * 930d4afb5ceSopenharmony_ci * len bytes of buf can be used for things with scope until after the actual 931d4afb5ceSopenharmony_ci * connect. 932d4afb5ceSopenharmony_ci * 933d4afb5ceSopenharmony_ci * For ws, protocol aux is <url path>;<ws subprotocol name> 934d4afb5ceSopenharmony_ci */ 935d4afb5ceSopenharmony_ci 936d4afb5ceSopenharmony_cienum { 937d4afb5ceSopenharmony_ci SSCMM_STRSUB_WILL_TOPIC, 938d4afb5ceSopenharmony_ci SSCMM_STRSUB_WILL_MESSAGE, 939d4afb5ceSopenharmony_ci SSCMM_STRSUB_SUBSCRIBE, 940d4afb5ceSopenharmony_ci SSCMM_STRSUB_TOPIC, 941d4afb5ceSopenharmony_ci SSCMM_STRSUB_BIRTH_TOPIC, 942d4afb5ceSopenharmony_ci SSCMM_STRSUB_BIRTH_MESSAGE 943d4afb5ceSopenharmony_ci}; 944d4afb5ceSopenharmony_ci 945d4afb5ceSopenharmony_cistatic int 946d4afb5ceSopenharmony_cisecstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, 947d4afb5ceSopenharmony_ci struct lws_client_connect_info *i, 948d4afb5ceSopenharmony_ci union lws_ss_contemp *ct) 949d4afb5ceSopenharmony_ci{ 950d4afb5ceSopenharmony_ci const char *sources[6] = { 951d4afb5ceSopenharmony_ci /* we're going to string-substitute these before use */ 952d4afb5ceSopenharmony_ci h->policy->u.mqtt.will_topic, 953d4afb5ceSopenharmony_ci h->policy->u.mqtt.will_message, 954d4afb5ceSopenharmony_ci h->policy->u.mqtt.subscribe, 955d4afb5ceSopenharmony_ci h->policy->u.mqtt.topic, 956d4afb5ceSopenharmony_ci h->policy->u.mqtt.birth_topic, 957d4afb5ceSopenharmony_ci h->policy->u.mqtt.birth_message 958d4afb5ceSopenharmony_ci }; 959d4afb5ceSopenharmony_ci size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0; 960d4afb5ceSopenharmony_ci lws_strexp_t exp; 961d4afb5ceSopenharmony_ci char *ps[6]; 962d4afb5ceSopenharmony_ci uint8_t *p = NULL; 963d4afb5ceSopenharmony_ci int n = -1; 964d4afb5ceSopenharmony_ci size_t blen; 965d4afb5ceSopenharmony_ci lws_system_blob_t *b = NULL; 966d4afb5ceSopenharmony_ci 967d4afb5ceSopenharmony_ci memset(&ct->ccp, 0, sizeof(ct->ccp)); 968d4afb5ceSopenharmony_ci b = lws_system_get_blob(i->context, 969d4afb5ceSopenharmony_ci LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID, 0); 970d4afb5ceSopenharmony_ci 971d4afb5ceSopenharmony_ci /* If LWS_SYSBLOB_TYPE_MQTT_CLIENT_ID is set */ 972d4afb5ceSopenharmony_ci if (b && (blen = lws_system_blob_get_size(b))) { 973d4afb5ceSopenharmony_ci if (blen > LWS_MQTT_MAX_CIDLEN) { 974d4afb5ceSopenharmony_ci lwsl_err("%s - Client ID too long.\n", 975d4afb5ceSopenharmony_ci __func__); 976d4afb5ceSopenharmony_ci return -1; 977d4afb5ceSopenharmony_ci } 978d4afb5ceSopenharmony_ci p = (uint8_t *)lws_zalloc(blen+1, __func__); 979d4afb5ceSopenharmony_ci if (!p) 980d4afb5ceSopenharmony_ci return -1; 981d4afb5ceSopenharmony_ci n = lws_system_blob_get(b, p, &blen, 0); 982d4afb5ceSopenharmony_ci if (n) { 983d4afb5ceSopenharmony_ci ct->ccp.client_id = NULL; 984d4afb5ceSopenharmony_ci } else { 985d4afb5ceSopenharmony_ci ct->ccp.client_id = (const char *)p; 986d4afb5ceSopenharmony_ci lwsl_notice("%s - Client ID = %s\n", 987d4afb5ceSopenharmony_ci __func__, ct->ccp.client_id); 988d4afb5ceSopenharmony_ci } 989d4afb5ceSopenharmony_ci } else { 990d4afb5ceSopenharmony_ci /* Default (Random) client ID */ 991d4afb5ceSopenharmony_ci ct->ccp.client_id = NULL; 992d4afb5ceSopenharmony_ci } 993d4afb5ceSopenharmony_ci 994d4afb5ceSopenharmony_ci b = lws_system_get_blob(i->context, 995d4afb5ceSopenharmony_ci LWS_SYSBLOB_TYPE_MQTT_USERNAME, 0); 996d4afb5ceSopenharmony_ci 997d4afb5ceSopenharmony_ci /* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */ 998d4afb5ceSopenharmony_ci if (b && (blen = lws_system_blob_get_size(b))) { 999d4afb5ceSopenharmony_ci p = (uint8_t *)lws_zalloc(blen+1, __func__); 1000d4afb5ceSopenharmony_ci if (!p) 1001d4afb5ceSopenharmony_ci return -1; 1002d4afb5ceSopenharmony_ci n = lws_system_blob_get(b, p, &blen, 0); 1003d4afb5ceSopenharmony_ci if (n) { 1004d4afb5ceSopenharmony_ci ct->ccp.username = NULL; 1005d4afb5ceSopenharmony_ci } else { 1006d4afb5ceSopenharmony_ci ct->ccp.username = (const char *)p; 1007d4afb5ceSopenharmony_ci lwsl_notice("%s - Username ID = %s\n", 1008d4afb5ceSopenharmony_ci __func__, ct->ccp.username); 1009d4afb5ceSopenharmony_ci } 1010d4afb5ceSopenharmony_ci } 1011d4afb5ceSopenharmony_ci 1012d4afb5ceSopenharmony_ci b = lws_system_get_blob(i->context, 1013d4afb5ceSopenharmony_ci LWS_SYSBLOB_TYPE_MQTT_PASSWORD, 0); 1014d4afb5ceSopenharmony_ci 1015d4afb5ceSopenharmony_ci /* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */ 1016d4afb5ceSopenharmony_ci if (b && (blen = lws_system_blob_get_size(b))) { 1017d4afb5ceSopenharmony_ci p = (uint8_t *)lws_zalloc(blen+1, __func__); 1018d4afb5ceSopenharmony_ci if (!p) 1019d4afb5ceSopenharmony_ci return -1; 1020d4afb5ceSopenharmony_ci n = lws_system_blob_get(b, p, &blen, 0); 1021d4afb5ceSopenharmony_ci if (n) { 1022d4afb5ceSopenharmony_ci ct->ccp.password = NULL; 1023d4afb5ceSopenharmony_ci } else { 1024d4afb5ceSopenharmony_ci ct->ccp.password = (const char *)p; 1025d4afb5ceSopenharmony_ci lwsl_notice("%s - Password ID = %s\n", 1026d4afb5ceSopenharmony_ci __func__, ct->ccp.password); 1027d4afb5ceSopenharmony_ci } 1028d4afb5ceSopenharmony_ci } 1029d4afb5ceSopenharmony_ci 1030d4afb5ceSopenharmony_ci ct->ccp.keep_alive = h->policy->u.mqtt.keep_alive; 1031d4afb5ceSopenharmony_ci ct->ccp.clean_start = (h->policy->u.mqtt.clean_start & 1u); 1032d4afb5ceSopenharmony_ci ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos; 1033d4afb5ceSopenharmony_ci ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain; 1034d4afb5ceSopenharmony_ci ct->ccp.birth_param.qos = h->policy->u.mqtt.birth_qos; 1035d4afb5ceSopenharmony_ci ct->ccp.birth_param.retain = h->policy->u.mqtt.birth_retain; 1036d4afb5ceSopenharmony_ci ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot; 1037d4afb5ceSopenharmony_ci h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos; 1038d4afb5ceSopenharmony_ci 1039d4afb5ceSopenharmony_ci /* 1040d4afb5ceSopenharmony_ci * We're going to string-substitute several of these parameters, which 1041d4afb5ceSopenharmony_ci * have unknown, possibly large size. And, as their usage is deferred 1042d4afb5ceSopenharmony_ci * inside the asynchronous lifetime of the MQTT connection, they need 1043d4afb5ceSopenharmony_ci * to live on the heap. 1044d4afb5ceSopenharmony_ci * 1045d4afb5ceSopenharmony_ci * Notice these allocations at h->u.mqtt.heap_baggage belong to the 1046d4afb5ceSopenharmony_ci * underlying MQTT stream lifetime, not the logical SS lifetime, and 1047d4afb5ceSopenharmony_ci * are destroyed if present at connection error or close of the 1048d4afb5ceSopenharmony_ci * underlying connection. 1049d4afb5ceSopenharmony_ci * 1050d4afb5ceSopenharmony_ci * 1051d4afb5ceSopenharmony_ci * First, compute the length of each without producing strsubst output, 1052d4afb5ceSopenharmony_ci * and keep a running total. 1053d4afb5ceSopenharmony_ci */ 1054d4afb5ceSopenharmony_ci 1055d4afb5ceSopenharmony_ci for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) { 1056d4afb5ceSopenharmony_ci if (!sources[n]) 1057d4afb5ceSopenharmony_ci continue; 1058d4afb5ceSopenharmony_ci 1059d4afb5ceSopenharmony_ci lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, 1060d4afb5ceSopenharmony_ci NULL, (size_t)-1); 1061d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]), 1062d4afb5ceSopenharmony_ci &used_in, &olen[n]) != LSTRX_DONE) { 1063d4afb5ceSopenharmony_ci lwsl_err("%s: failed to subsitute %s\n", __func__, 1064d4afb5ceSopenharmony_ci sources[n]); 1065d4afb5ceSopenharmony_ci return 1; 1066d4afb5ceSopenharmony_ci } 1067d4afb5ceSopenharmony_ci tot += olen[n] + 1; 1068d4afb5ceSopenharmony_ci } 1069d4afb5ceSopenharmony_ci 1070d4afb5ceSopenharmony_ci /* 1071d4afb5ceSopenharmony_ci * Then, allocate enough space on the heap for the total of the 1072d4afb5ceSopenharmony_ci * substituted results 1073d4afb5ceSopenharmony_ci */ 1074d4afb5ceSopenharmony_ci 1075d4afb5ceSopenharmony_ci h->u.mqtt.heap_baggage = lws_malloc(tot, __func__); 1076d4afb5ceSopenharmony_ci if (!h->u.mqtt.heap_baggage) 1077d4afb5ceSopenharmony_ci return 1; 1078d4afb5ceSopenharmony_ci 1079d4afb5ceSopenharmony_ci /* 1080d4afb5ceSopenharmony_ci * Finally, issue the subsitutions one after the other into the single 1081d4afb5ceSopenharmony_ci * allocated result buffer and prepare pointers into them 1082d4afb5ceSopenharmony_ci */ 1083d4afb5ceSopenharmony_ci 1084d4afb5ceSopenharmony_ci p = h->u.mqtt.heap_baggage; 1085d4afb5ceSopenharmony_ci for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) { 1086d4afb5ceSopenharmony_ci lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, 1087d4afb5ceSopenharmony_ci (char *)p, (size_t)-1); 1088d4afb5ceSopenharmony_ci if (!sources[n]) { 1089d4afb5ceSopenharmony_ci ps[n] = NULL; 1090d4afb5ceSopenharmony_ci continue; 1091d4afb5ceSopenharmony_ci } 1092d4afb5ceSopenharmony_ci ps[n] = (char *)p; 1093d4afb5ceSopenharmony_ci if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]), 1094d4afb5ceSopenharmony_ci &used_in, &olen[n]) != LSTRX_DONE) 1095d4afb5ceSopenharmony_ci return 1; 1096d4afb5ceSopenharmony_ci 1097d4afb5ceSopenharmony_ci p += olen[n] + 1; 1098d4afb5ceSopenharmony_ci } 1099d4afb5ceSopenharmony_ci 1100d4afb5ceSopenharmony_ci /* 1101d4afb5ceSopenharmony_ci * Point the guys who want the substituted content at the substituted 1102d4afb5ceSopenharmony_ci * strings 1103d4afb5ceSopenharmony_ci */ 1104d4afb5ceSopenharmony_ci 1105d4afb5ceSopenharmony_ci ct->ccp.will_param.topic = ps[SSCMM_STRSUB_WILL_TOPIC]; 1106d4afb5ceSopenharmony_ci ct->ccp.will_param.message = ps[SSCMM_STRSUB_WILL_MESSAGE]; 1107d4afb5ceSopenharmony_ci h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE]; 1108d4afb5ceSopenharmony_ci h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE]; 1109d4afb5ceSopenharmony_ci h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC]; 1110d4afb5ceSopenharmony_ci ct->ccp.birth_param.topic = ps[SSCMM_STRSUB_BIRTH_TOPIC]; 1111d4afb5ceSopenharmony_ci ct->ccp.birth_param.message = ps[SSCMM_STRSUB_BIRTH_MESSAGE]; 1112d4afb5ceSopenharmony_ci 1113d4afb5ceSopenharmony_ci i->method = "MQTT"; 1114d4afb5ceSopenharmony_ci i->mqtt_cp = &ct->ccp; 1115d4afb5ceSopenharmony_ci 1116d4afb5ceSopenharmony_ci i->alpn = "x-amzn-mqtt-ca"; 1117d4afb5ceSopenharmony_ci 1118d4afb5ceSopenharmony_ci /* share connections where possible */ 1119d4afb5ceSopenharmony_ci i->ssl_connection |= LCCSCF_PIPELINE; 1120d4afb5ceSopenharmony_ci 1121d4afb5ceSopenharmony_ci return 0; 1122d4afb5ceSopenharmony_ci} 1123d4afb5ceSopenharmony_ci 1124d4afb5ceSopenharmony_ciconst struct ss_pcols ss_pcol_mqtt = { 1125d4afb5ceSopenharmony_ci "MQTT", 1126d4afb5ceSopenharmony_ci "x-amzn-mqtt-ca", //"mqtt/3.1.1", 1127d4afb5ceSopenharmony_ci &protocol_secstream_mqtt, 1128d4afb5ceSopenharmony_ci secstream_connect_munge_mqtt, 1129d4afb5ceSopenharmony_ci NULL, NULL 1130d4afb5ceSopenharmony_ci}; 1131