1d4afb5ceSopenharmony_ci/* 2d4afb5ceSopenharmony_ci * libwebsockets-test-server - libwebsockets test implementation 3d4afb5ceSopenharmony_ci * 4d4afb5ceSopenharmony_ci * Written in 2010-2019 by Andy Green <andy@warmcat.com> 5d4afb5ceSopenharmony_ci * 6d4afb5ceSopenharmony_ci * This file is made available under the Creative Commons CC0 1.0 7d4afb5ceSopenharmony_ci * Universal Public Domain Dedication. 8d4afb5ceSopenharmony_ci * 9d4afb5ceSopenharmony_ci * The person who associated a work with this deed has dedicated 10d4afb5ceSopenharmony_ci * the work to the public domain by waiving all of his or her rights 11d4afb5ceSopenharmony_ci * to the work worldwide under copyright law, including all related 12d4afb5ceSopenharmony_ci * and neighboring rights, to the extent allowed by law. You can copy, 13d4afb5ceSopenharmony_ci * modify, distribute and perform the work, even for commercial purposes, 14d4afb5ceSopenharmony_ci * all without asking permission. 15d4afb5ceSopenharmony_ci * 16d4afb5ceSopenharmony_ci * The test apps are intended to be adapted for use in your code, which 17d4afb5ceSopenharmony_ci * may be proprietary. So unlike the library itself, they are licensed 18d4afb5ceSopenharmony_ci * Public Domain. 19d4afb5ceSopenharmony_ci * 20d4afb5ceSopenharmony_ci * Notice that the lws_pthread... locking apis are all zero-footprint 21d4afb5ceSopenharmony_ci * NOPs in the case LWS_MAX_SMP == 1, which is the default. When lws 22d4afb5ceSopenharmony_ci * is built for multiple service threads though, they resolve to their 23d4afb5ceSopenharmony_ci * pthreads equivalents. 24d4afb5ceSopenharmony_ci */ 25d4afb5ceSopenharmony_ci 26d4afb5ceSopenharmony_ci#if !defined (LWS_PLUGIN_STATIC) 27d4afb5ceSopenharmony_ci#if !defined(LWS_DLL) 28d4afb5ceSopenharmony_ci#define LWS_DLL 29d4afb5ceSopenharmony_ci#endif 30d4afb5ceSopenharmony_ci#if !defined(LWS_INTERNAL) 31d4afb5ceSopenharmony_ci#define LWS_INTERNAL 32d4afb5ceSopenharmony_ci#endif 33d4afb5ceSopenharmony_ci#include <libwebsockets.h> 34d4afb5ceSopenharmony_ci#endif 35d4afb5ceSopenharmony_ci 36d4afb5ceSopenharmony_ci#include <string.h> 37d4afb5ceSopenharmony_ci#include <stdlib.h> 38d4afb5ceSopenharmony_ci 39d4afb5ceSopenharmony_ci#define QUEUELEN 32 40d4afb5ceSopenharmony_ci/* queue free space below this, rx flow is disabled */ 41d4afb5ceSopenharmony_ci#define RXFLOW_MIN (4) 42d4afb5ceSopenharmony_ci/* queue free space above this, rx flow is enabled */ 43d4afb5ceSopenharmony_ci#define RXFLOW_MAX ((2 * QUEUELEN) / 3) 44d4afb5ceSopenharmony_ci 45d4afb5ceSopenharmony_ci#define MAX_MIRROR_INSTANCES 3 46d4afb5ceSopenharmony_ci 47d4afb5ceSopenharmony_cistruct mirror_instance; 48d4afb5ceSopenharmony_ci 49d4afb5ceSopenharmony_cistruct per_session_data__lws_mirror { 50d4afb5ceSopenharmony_ci struct lws *wsi; 51d4afb5ceSopenharmony_ci struct mirror_instance *mi; 52d4afb5ceSopenharmony_ci struct per_session_data__lws_mirror *same_mi_pss_list; 53d4afb5ceSopenharmony_ci uint32_t tail; 54d4afb5ceSopenharmony_ci}; 55d4afb5ceSopenharmony_ci 56d4afb5ceSopenharmony_ci/* this is the element in the ring */ 57d4afb5ceSopenharmony_cistruct a_message { 58d4afb5ceSopenharmony_ci void *payload; 59d4afb5ceSopenharmony_ci size_t len; 60d4afb5ceSopenharmony_ci}; 61d4afb5ceSopenharmony_ci 62d4afb5ceSopenharmony_cistruct mirror_instance { 63d4afb5ceSopenharmony_ci struct mirror_instance *next; 64d4afb5ceSopenharmony_ci lws_pthread_mutex(lock) /* protects all mirror instance data */ 65d4afb5ceSopenharmony_ci struct per_session_data__lws_mirror *same_mi_pss_list; 66d4afb5ceSopenharmony_ci /**< must hold the the per_vhost_data__lws_mirror.lock as well 67d4afb5ceSopenharmony_ci * to change mi list membership */ 68d4afb5ceSopenharmony_ci struct lws_ring *ring; 69d4afb5ceSopenharmony_ci int messages_allocated; 70d4afb5ceSopenharmony_ci char name[30]; 71d4afb5ceSopenharmony_ci char rx_enabled; 72d4afb5ceSopenharmony_ci}; 73d4afb5ceSopenharmony_ci 74d4afb5ceSopenharmony_cistruct per_vhost_data__lws_mirror { 75d4afb5ceSopenharmony_ci lws_pthread_mutex(lock) /* protects mi_list membership changes */ 76d4afb5ceSopenharmony_ci struct mirror_instance *mi_list; 77d4afb5ceSopenharmony_ci}; 78d4afb5ceSopenharmony_ci 79d4afb5ceSopenharmony_ci 80d4afb5ceSopenharmony_ci/* enable or disable rx from all connections to this mirror instance */ 81d4afb5ceSopenharmony_cistatic void 82d4afb5ceSopenharmony_ci__mirror_rxflow_instance(struct mirror_instance *mi, int enable) 83d4afb5ceSopenharmony_ci{ 84d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct per_session_data__lws_mirror *, 85d4afb5ceSopenharmony_ci pss, mi->same_mi_pss_list) { 86d4afb5ceSopenharmony_ci lws_rx_flow_control(pss->wsi, enable); 87d4afb5ceSopenharmony_ci } lws_end_foreach_ll(pss, same_mi_pss_list); 88d4afb5ceSopenharmony_ci 89d4afb5ceSopenharmony_ci mi->rx_enabled = (char)enable; 90d4afb5ceSopenharmony_ci} 91d4afb5ceSopenharmony_ci 92d4afb5ceSopenharmony_ci/* 93d4afb5ceSopenharmony_ci * Find out which connection to this mirror instance has the longest number 94d4afb5ceSopenharmony_ci * of still unread elements in the ringbuffer and update the lws_ring "oldest 95d4afb5ceSopenharmony_ci * tail" with it. Elements behind the "oldest tail" are freed and recycled for 96d4afb5ceSopenharmony_ci * new head content. Elements after the "oldest tail" are still waiting to be 97d4afb5ceSopenharmony_ci * read by somebody. 98d4afb5ceSopenharmony_ci * 99d4afb5ceSopenharmony_ci * If the oldest tail moved on from before, check if it created enough space 100d4afb5ceSopenharmony_ci * in the queue to re-enable RX flow control for the mirror instance. 101d4afb5ceSopenharmony_ci * 102d4afb5ceSopenharmony_ci * Mark connections that are at the oldest tail as being on a 3s timeout to 103d4afb5ceSopenharmony_ci * transmit something, otherwise the connection will be closed. Without this, 104d4afb5ceSopenharmony_ci * a choked or nonresponsive connection can block the FIFO from freeing up any 105d4afb5ceSopenharmony_ci * new space for new data. 106d4afb5ceSopenharmony_ci * 107d4afb5ceSopenharmony_ci * You can skip calling this if on your connection, before processing, the tail 108d4afb5ceSopenharmony_ci * was not equal to the current worst, ie, if the tail you will work on is != 109d4afb5ceSopenharmony_ci * lws_ring_get_oldest_tail(ring) then no need to call this when the tail 110d4afb5ceSopenharmony_ci * has changed; it wasn't the oldest so it won't change the oldest. 111d4afb5ceSopenharmony_ci * 112d4afb5ceSopenharmony_ci * Returns 0 if oldest unchanged or 1 if oldest changed from this call. 113d4afb5ceSopenharmony_ci */ 114d4afb5ceSopenharmony_cistatic int 115d4afb5ceSopenharmony_ci__mirror_update_worst_tail(struct mirror_instance *mi) 116d4afb5ceSopenharmony_ci{ 117d4afb5ceSopenharmony_ci uint32_t wai, worst = 0, worst_tail = 0, oldest; 118d4afb5ceSopenharmony_ci struct per_session_data__lws_mirror *worst_pss = NULL; 119d4afb5ceSopenharmony_ci 120d4afb5ceSopenharmony_ci oldest = lws_ring_get_oldest_tail(mi->ring); 121d4afb5ceSopenharmony_ci 122d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct per_session_data__lws_mirror *, 123d4afb5ceSopenharmony_ci pss, mi->same_mi_pss_list) { 124d4afb5ceSopenharmony_ci wai = (uint32_t)lws_ring_get_count_waiting_elements(mi->ring, 125d4afb5ceSopenharmony_ci &pss->tail); 126d4afb5ceSopenharmony_ci if (wai >= worst) { 127d4afb5ceSopenharmony_ci worst = wai; 128d4afb5ceSopenharmony_ci worst_tail = pss->tail; 129d4afb5ceSopenharmony_ci worst_pss = pss; 130d4afb5ceSopenharmony_ci } 131d4afb5ceSopenharmony_ci } lws_end_foreach_ll(pss, same_mi_pss_list); 132d4afb5ceSopenharmony_ci 133d4afb5ceSopenharmony_ci if (!worst_pss) 134d4afb5ceSopenharmony_ci return 0; 135d4afb5ceSopenharmony_ci 136d4afb5ceSopenharmony_ci lws_ring_update_oldest_tail(mi->ring, worst_tail); 137d4afb5ceSopenharmony_ci if (oldest == lws_ring_get_oldest_tail(mi->ring)) 138d4afb5ceSopenharmony_ci return 0; 139d4afb5ceSopenharmony_ci /* 140d4afb5ceSopenharmony_ci * The oldest tail did move on. Check if we should re-enable rx flow 141d4afb5ceSopenharmony_ci * for the mirror instance since we made some space now. 142d4afb5ceSopenharmony_ci */ 143d4afb5ceSopenharmony_ci if (!mi->rx_enabled && /* rx is disabled */ 144d4afb5ceSopenharmony_ci lws_ring_get_count_free_elements(mi->ring) >= RXFLOW_MAX) 145d4afb5ceSopenharmony_ci /* there is enough space, let's re-enable rx for our instance */ 146d4afb5ceSopenharmony_ci __mirror_rxflow_instance(mi, 1); 147d4afb5ceSopenharmony_ci 148d4afb5ceSopenharmony_ci /* if nothing in queue, no timeout needed */ 149d4afb5ceSopenharmony_ci if (!worst) 150d4afb5ceSopenharmony_ci return 1; 151d4afb5ceSopenharmony_ci 152d4afb5ceSopenharmony_ci /* 153d4afb5ceSopenharmony_ci * The guy(s) with the oldest tail block the ringbuffer from recycling 154d4afb5ceSopenharmony_ci * the FIFO entries he has not read yet. Don't allow those guys to 155d4afb5ceSopenharmony_ci * block the FIFO operation for very long. 156d4afb5ceSopenharmony_ci */ 157d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct per_session_data__lws_mirror *, 158d4afb5ceSopenharmony_ci pss, mi->same_mi_pss_list) { 159d4afb5ceSopenharmony_ci if (pss->tail == worst_tail) 160d4afb5ceSopenharmony_ci /* 161d4afb5ceSopenharmony_ci * Our policy is if you are the slowest connection, 162d4afb5ceSopenharmony_ci * you had better transmit something to help with that 163d4afb5ceSopenharmony_ci * within 3s, or we will hang up on you to stop you 164d4afb5ceSopenharmony_ci * blocking the FIFO for everyone else. 165d4afb5ceSopenharmony_ci */ 166d4afb5ceSopenharmony_ci lws_set_timeout(pss->wsi, 167d4afb5ceSopenharmony_ci PENDING_TIMEOUT_USER_REASON_BASE, 3); 168d4afb5ceSopenharmony_ci } lws_end_foreach_ll(pss, same_mi_pss_list); 169d4afb5ceSopenharmony_ci 170d4afb5ceSopenharmony_ci return 1; 171d4afb5ceSopenharmony_ci} 172d4afb5ceSopenharmony_ci 173d4afb5ceSopenharmony_cistatic void 174d4afb5ceSopenharmony_ci__mirror_callback_all_in_mi_on_writable(struct mirror_instance *mi) 175d4afb5ceSopenharmony_ci{ 176d4afb5ceSopenharmony_ci /* ask for WRITABLE callback for every wsi on this mi */ 177d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct per_session_data__lws_mirror *, 178d4afb5ceSopenharmony_ci pss, mi->same_mi_pss_list) { 179d4afb5ceSopenharmony_ci lws_callback_on_writable(pss->wsi); 180d4afb5ceSopenharmony_ci } lws_end_foreach_ll(pss, same_mi_pss_list); 181d4afb5ceSopenharmony_ci} 182d4afb5ceSopenharmony_ci 183d4afb5ceSopenharmony_cistatic void 184d4afb5ceSopenharmony_ci__mirror_destroy_message(void *_msg) 185d4afb5ceSopenharmony_ci{ 186d4afb5ceSopenharmony_ci struct a_message *msg = _msg; 187d4afb5ceSopenharmony_ci 188d4afb5ceSopenharmony_ci free(msg->payload); 189d4afb5ceSopenharmony_ci msg->payload = NULL; 190d4afb5ceSopenharmony_ci msg->len = 0; 191d4afb5ceSopenharmony_ci} 192d4afb5ceSopenharmony_ci 193d4afb5ceSopenharmony_cistatic int 194d4afb5ceSopenharmony_cicallback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, 195d4afb5ceSopenharmony_ci void *user, void *in, size_t len) 196d4afb5ceSopenharmony_ci{ 197d4afb5ceSopenharmony_ci struct per_session_data__lws_mirror *pss = 198d4afb5ceSopenharmony_ci (struct per_session_data__lws_mirror *)user; 199d4afb5ceSopenharmony_ci struct per_vhost_data__lws_mirror *v = 200d4afb5ceSopenharmony_ci (struct per_vhost_data__lws_mirror *) 201d4afb5ceSopenharmony_ci lws_protocol_vh_priv_get(lws_get_vhost(wsi), 202d4afb5ceSopenharmony_ci lws_get_protocol(wsi)); 203d4afb5ceSopenharmony_ci char name[300], update_worst, sent_something, *pn = name; 204d4afb5ceSopenharmony_ci struct mirror_instance *mi = NULL; 205d4afb5ceSopenharmony_ci const struct a_message *msg; 206d4afb5ceSopenharmony_ci struct a_message amsg; 207d4afb5ceSopenharmony_ci uint32_t oldest_tail; 208d4afb5ceSopenharmony_ci int n, count_mi = 0; 209d4afb5ceSopenharmony_ci 210d4afb5ceSopenharmony_ci switch (reason) { 211d4afb5ceSopenharmony_ci case LWS_CALLBACK_ESTABLISHED: 212d4afb5ceSopenharmony_ci lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__); 213d4afb5ceSopenharmony_ci if (!v) { 214d4afb5ceSopenharmony_ci lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 215d4afb5ceSopenharmony_ci lws_get_protocol(wsi), 216d4afb5ceSopenharmony_ci sizeof(struct per_vhost_data__lws_mirror)); 217d4afb5ceSopenharmony_ci v = (struct per_vhost_data__lws_mirror *) 218d4afb5ceSopenharmony_ci lws_protocol_vh_priv_get(lws_get_vhost(wsi), 219d4afb5ceSopenharmony_ci lws_get_protocol(wsi)); 220d4afb5ceSopenharmony_ci lws_pthread_mutex_init(&v->lock); 221d4afb5ceSopenharmony_ci } 222d4afb5ceSopenharmony_ci 223d4afb5ceSopenharmony_ci /* 224d4afb5ceSopenharmony_ci * mirror instance name... defaults to "", but if URL includes 225d4afb5ceSopenharmony_ci * "?mirror=xxx", will be "xxx" 226d4afb5ceSopenharmony_ci */ 227d4afb5ceSopenharmony_ci 228d4afb5ceSopenharmony_ci if (lws_get_urlarg_by_name_safe(wsi, "mirror", name, 229d4afb5ceSopenharmony_ci sizeof(name) - 1) < 0) { 230d4afb5ceSopenharmony_ci lwsl_debug("get urlarg failed\n"); 231d4afb5ceSopenharmony_ci name[0] = '\0'; 232d4afb5ceSopenharmony_ci } 233d4afb5ceSopenharmony_ci 234d4afb5ceSopenharmony_ci //lwsl_notice("%s: mirror name '%s'\n", __func__, pn); 235d4afb5ceSopenharmony_ci 236d4afb5ceSopenharmony_ci /* is there already a mirror instance of this name? */ 237d4afb5ceSopenharmony_ci 238d4afb5ceSopenharmony_ci lws_pthread_mutex_lock(&v->lock); /* vhost lock { */ 239d4afb5ceSopenharmony_ci 240d4afb5ceSopenharmony_ci lws_start_foreach_ll(struct mirror_instance *, mi1, 241d4afb5ceSopenharmony_ci v->mi_list) { 242d4afb5ceSopenharmony_ci count_mi++; 243d4afb5ceSopenharmony_ci if (!strcmp(pn, mi1->name)) { 244d4afb5ceSopenharmony_ci /* yes... we will join it */ 245d4afb5ceSopenharmony_ci mi = mi1; 246d4afb5ceSopenharmony_ci break; 247d4afb5ceSopenharmony_ci } 248d4afb5ceSopenharmony_ci } lws_end_foreach_ll(mi1, next); 249d4afb5ceSopenharmony_ci 250d4afb5ceSopenharmony_ci if (!mi) { 251d4afb5ceSopenharmony_ci 252d4afb5ceSopenharmony_ci /* no existing mirror instance for name */ 253d4afb5ceSopenharmony_ci if (count_mi == MAX_MIRROR_INSTANCES) { 254d4afb5ceSopenharmony_ci lws_pthread_mutex_unlock(&v->lock); /* } vh lock */ 255d4afb5ceSopenharmony_ci return -1; 256d4afb5ceSopenharmony_ci } 257d4afb5ceSopenharmony_ci 258d4afb5ceSopenharmony_ci /* create one with this name, and join it */ 259d4afb5ceSopenharmony_ci mi = malloc(sizeof(*mi)); 260d4afb5ceSopenharmony_ci if (!mi) 261d4afb5ceSopenharmony_ci goto bail1; 262d4afb5ceSopenharmony_ci memset(mi, 0, sizeof(*mi)); 263d4afb5ceSopenharmony_ci mi->ring = lws_ring_create(sizeof(struct a_message), 264d4afb5ceSopenharmony_ci QUEUELEN, 265d4afb5ceSopenharmony_ci __mirror_destroy_message); 266d4afb5ceSopenharmony_ci if (!mi->ring) { 267d4afb5ceSopenharmony_ci free(mi); 268d4afb5ceSopenharmony_ci goto bail1; 269d4afb5ceSopenharmony_ci } 270d4afb5ceSopenharmony_ci 271d4afb5ceSopenharmony_ci mi->next = v->mi_list; 272d4afb5ceSopenharmony_ci v->mi_list = mi; 273d4afb5ceSopenharmony_ci lws_snprintf(mi->name, sizeof(mi->name) - 1, "%s", pn); 274d4afb5ceSopenharmony_ci mi->rx_enabled = 1; 275d4afb5ceSopenharmony_ci 276d4afb5ceSopenharmony_ci lws_pthread_mutex_init(&mi->lock); 277d4afb5ceSopenharmony_ci 278d4afb5ceSopenharmony_ci lwsl_notice("Created new mi %p '%s'\n", mi, pn); 279d4afb5ceSopenharmony_ci } 280d4afb5ceSopenharmony_ci 281d4afb5ceSopenharmony_ci /* add our pss to list of guys bound to this mi */ 282d4afb5ceSopenharmony_ci 283d4afb5ceSopenharmony_ci lws_ll_fwd_insert(pss, same_mi_pss_list, mi->same_mi_pss_list); 284d4afb5ceSopenharmony_ci 285d4afb5ceSopenharmony_ci /* init the pss */ 286d4afb5ceSopenharmony_ci 287d4afb5ceSopenharmony_ci pss->mi = mi; 288d4afb5ceSopenharmony_ci pss->tail = lws_ring_get_oldest_tail(mi->ring); 289d4afb5ceSopenharmony_ci pss->wsi = wsi; 290d4afb5ceSopenharmony_ci 291d4afb5ceSopenharmony_ci lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */ 292d4afb5ceSopenharmony_ci break; 293d4afb5ceSopenharmony_ci 294d4afb5ceSopenharmony_cibail1: 295d4afb5ceSopenharmony_ci lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */ 296d4afb5ceSopenharmony_ci return 1; 297d4afb5ceSopenharmony_ci 298d4afb5ceSopenharmony_ci case LWS_CALLBACK_CLOSED: 299d4afb5ceSopenharmony_ci /* detach our pss from the mirror instance */ 300d4afb5ceSopenharmony_ci mi = pss->mi; 301d4afb5ceSopenharmony_ci if (!mi) 302d4afb5ceSopenharmony_ci break; 303d4afb5ceSopenharmony_ci 304d4afb5ceSopenharmony_ci lws_pthread_mutex_lock(&v->lock); /* vhost lock { */ 305d4afb5ceSopenharmony_ci 306d4afb5ceSopenharmony_ci /* remove our closing pss from its mirror instance list */ 307d4afb5ceSopenharmony_ci lws_ll_fwd_remove(struct per_session_data__lws_mirror, 308d4afb5ceSopenharmony_ci same_mi_pss_list, pss, mi->same_mi_pss_list); 309d4afb5ceSopenharmony_ci pss->mi = NULL; 310d4afb5ceSopenharmony_ci 311d4afb5ceSopenharmony_ci if (mi->same_mi_pss_list) { 312d4afb5ceSopenharmony_ci /* 313d4afb5ceSopenharmony_ci * Still other pss using the mirror instance. The pss 314d4afb5ceSopenharmony_ci * going away may have had the oldest tail, reconfirm 315d4afb5ceSopenharmony_ci * using the remaining pss what is the current oldest 316d4afb5ceSopenharmony_ci * tail. If the oldest tail moves on, this call also 317d4afb5ceSopenharmony_ci * will re-enable rx flow control when appropriate. 318d4afb5ceSopenharmony_ci */ 319d4afb5ceSopenharmony_ci lws_pthread_mutex_lock(&mi->lock); /* mi lock { */ 320d4afb5ceSopenharmony_ci __mirror_update_worst_tail(mi); 321d4afb5ceSopenharmony_ci lws_pthread_mutex_unlock(&mi->lock); /* } mi lock */ 322d4afb5ceSopenharmony_ci lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */ 323d4afb5ceSopenharmony_ci break; 324d4afb5ceSopenharmony_ci } 325d4afb5ceSopenharmony_ci 326d4afb5ceSopenharmony_ci /* No more pss using the mirror instance... delete mi */ 327d4afb5ceSopenharmony_ci 328d4afb5ceSopenharmony_ci lws_start_foreach_llp(struct mirror_instance **, 329d4afb5ceSopenharmony_ci pmi, v->mi_list) { 330d4afb5ceSopenharmony_ci if (*pmi == mi) { 331d4afb5ceSopenharmony_ci *pmi = (*pmi)->next; 332d4afb5ceSopenharmony_ci 333d4afb5ceSopenharmony_ci lws_ring_destroy(mi->ring); 334d4afb5ceSopenharmony_ci lws_pthread_mutex_destroy(&mi->lock); 335d4afb5ceSopenharmony_ci 336d4afb5ceSopenharmony_ci free(mi); 337d4afb5ceSopenharmony_ci break; 338d4afb5ceSopenharmony_ci } 339d4afb5ceSopenharmony_ci } lws_end_foreach_llp(pmi, next); 340d4afb5ceSopenharmony_ci 341d4afb5ceSopenharmony_ci lws_pthread_mutex_unlock(&v->lock); /* } vhost lock */ 342d4afb5ceSopenharmony_ci break; 343d4afb5ceSopenharmony_ci 344d4afb5ceSopenharmony_ci case LWS_CALLBACK_CONFIRM_EXTENSION_OKAY: 345d4afb5ceSopenharmony_ci return 1; /* disallow compression */ 346d4afb5ceSopenharmony_ci 347d4afb5ceSopenharmony_ci case LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */ 348d4afb5ceSopenharmony_ci if (!v) { 349d4afb5ceSopenharmony_ci lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 350d4afb5ceSopenharmony_ci lws_get_protocol(wsi), 351d4afb5ceSopenharmony_ci sizeof(struct per_vhost_data__lws_mirror)); 352d4afb5ceSopenharmony_ci v = (struct per_vhost_data__lws_mirror *) 353d4afb5ceSopenharmony_ci lws_protocol_vh_priv_get(lws_get_vhost(wsi), 354d4afb5ceSopenharmony_ci lws_get_protocol(wsi)); 355d4afb5ceSopenharmony_ci if (!v) 356d4afb5ceSopenharmony_ci return 0; 357d4afb5ceSopenharmony_ci lws_pthread_mutex_init(&v->lock); 358d4afb5ceSopenharmony_ci } 359d4afb5ceSopenharmony_ci break; 360d4afb5ceSopenharmony_ci 361d4afb5ceSopenharmony_ci case LWS_CALLBACK_PROTOCOL_DESTROY: 362d4afb5ceSopenharmony_ci lws_pthread_mutex_destroy(&v->lock); 363d4afb5ceSopenharmony_ci break; 364d4afb5ceSopenharmony_ci 365d4afb5ceSopenharmony_ci case LWS_CALLBACK_SERVER_WRITEABLE: 366d4afb5ceSopenharmony_ci lws_pthread_mutex_lock(&pss->mi->lock); /* instance lock { */ 367d4afb5ceSopenharmony_ci oldest_tail = lws_ring_get_oldest_tail(pss->mi->ring); 368d4afb5ceSopenharmony_ci update_worst = oldest_tail == pss->tail; 369d4afb5ceSopenharmony_ci sent_something = 0; 370d4afb5ceSopenharmony_ci 371d4afb5ceSopenharmony_ci do { 372d4afb5ceSopenharmony_ci msg = lws_ring_get_element(pss->mi->ring, &pss->tail); 373d4afb5ceSopenharmony_ci if (!msg) 374d4afb5ceSopenharmony_ci break; 375d4afb5ceSopenharmony_ci 376d4afb5ceSopenharmony_ci if (!msg->payload) { 377d4afb5ceSopenharmony_ci lwsl_err("%s: NULL payload: worst = %d," 378d4afb5ceSopenharmony_ci " pss->tail = %d\n", __func__, 379d4afb5ceSopenharmony_ci oldest_tail, pss->tail); 380d4afb5ceSopenharmony_ci if (lws_ring_consume(pss->mi->ring, &pss->tail, 381d4afb5ceSopenharmony_ci NULL, 1)) 382d4afb5ceSopenharmony_ci continue; 383d4afb5ceSopenharmony_ci break; 384d4afb5ceSopenharmony_ci } 385d4afb5ceSopenharmony_ci 386d4afb5ceSopenharmony_ci n = lws_write(wsi, (unsigned char *)msg->payload + 387d4afb5ceSopenharmony_ci LWS_PRE, msg->len, LWS_WRITE_TEXT); 388d4afb5ceSopenharmony_ci if (n < 0) { 389d4afb5ceSopenharmony_ci lwsl_info("%s: WRITEABLE: %d\n", __func__, n); 390d4afb5ceSopenharmony_ci 391d4afb5ceSopenharmony_ci goto bail2; 392d4afb5ceSopenharmony_ci } 393d4afb5ceSopenharmony_ci sent_something = 1; 394d4afb5ceSopenharmony_ci lws_ring_consume(pss->mi->ring, &pss->tail, NULL, 1); 395d4afb5ceSopenharmony_ci 396d4afb5ceSopenharmony_ci } while (!lws_send_pipe_choked(wsi)); 397d4afb5ceSopenharmony_ci 398d4afb5ceSopenharmony_ci /* if any left for us to send, ask for writeable again */ 399d4afb5ceSopenharmony_ci if (lws_ring_get_count_waiting_elements(pss->mi->ring, 400d4afb5ceSopenharmony_ci &pss->tail)) 401d4afb5ceSopenharmony_ci lws_callback_on_writable(wsi); 402d4afb5ceSopenharmony_ci 403d4afb5ceSopenharmony_ci if (!sent_something || !update_worst) 404d4afb5ceSopenharmony_ci goto done1; 405d4afb5ceSopenharmony_ci 406d4afb5ceSopenharmony_ci /* 407d4afb5ceSopenharmony_ci * We are no longer holding the oldest tail (since we sent 408d4afb5ceSopenharmony_ci * something. So free us of the timeout related to hogging the 409d4afb5ceSopenharmony_ci * oldest tail. 410d4afb5ceSopenharmony_ci */ 411d4afb5ceSopenharmony_ci lws_set_timeout(pss->wsi, NO_PENDING_TIMEOUT, 0); 412d4afb5ceSopenharmony_ci /* 413d4afb5ceSopenharmony_ci * If we were originally at the oldest fifo position of 414d4afb5ceSopenharmony_ci * all the tails, now we used some up we may have 415d4afb5ceSopenharmony_ci * changed the oldest fifo position and made some space. 416d4afb5ceSopenharmony_ci */ 417d4afb5ceSopenharmony_ci __mirror_update_worst_tail(pss->mi); 418d4afb5ceSopenharmony_ci 419d4afb5ceSopenharmony_cidone1: 420d4afb5ceSopenharmony_ci lws_pthread_mutex_unlock(&pss->mi->lock); /* } instance lock */ 421d4afb5ceSopenharmony_ci break; 422d4afb5ceSopenharmony_ci 423d4afb5ceSopenharmony_cibail2: 424d4afb5ceSopenharmony_ci lws_pthread_mutex_unlock(&pss->mi->lock); /* } instance lock */ 425d4afb5ceSopenharmony_ci 426d4afb5ceSopenharmony_ci return -1; 427d4afb5ceSopenharmony_ci 428d4afb5ceSopenharmony_ci case LWS_CALLBACK_RECEIVE: 429d4afb5ceSopenharmony_ci lws_pthread_mutex_lock(&pss->mi->lock); /* mi lock { */ 430d4afb5ceSopenharmony_ci n = (int)lws_ring_get_count_free_elements(pss->mi->ring); 431d4afb5ceSopenharmony_ci if (!n) { 432d4afb5ceSopenharmony_ci lwsl_notice("dropping!\n"); 433d4afb5ceSopenharmony_ci if (pss->mi->rx_enabled) 434d4afb5ceSopenharmony_ci __mirror_rxflow_instance(pss->mi, 0); 435d4afb5ceSopenharmony_ci goto req_writable; 436d4afb5ceSopenharmony_ci } 437d4afb5ceSopenharmony_ci 438d4afb5ceSopenharmony_ci amsg.payload = malloc(LWS_PRE + len); 439d4afb5ceSopenharmony_ci amsg.len = len; 440d4afb5ceSopenharmony_ci if (!amsg.payload) { 441d4afb5ceSopenharmony_ci lwsl_notice("OOM: dropping\n"); 442d4afb5ceSopenharmony_ci goto done2; 443d4afb5ceSopenharmony_ci } 444d4afb5ceSopenharmony_ci 445d4afb5ceSopenharmony_ci memcpy((char *)amsg.payload + LWS_PRE, in, len); 446d4afb5ceSopenharmony_ci if (!lws_ring_insert(pss->mi->ring, &amsg, 1)) { 447d4afb5ceSopenharmony_ci __mirror_destroy_message(&amsg); 448d4afb5ceSopenharmony_ci lwsl_notice("dropping!\n"); 449d4afb5ceSopenharmony_ci if (pss->mi->rx_enabled) 450d4afb5ceSopenharmony_ci __mirror_rxflow_instance(pss->mi, 0); 451d4afb5ceSopenharmony_ci goto req_writable; 452d4afb5ceSopenharmony_ci } 453d4afb5ceSopenharmony_ci 454d4afb5ceSopenharmony_ci if (pss->mi->rx_enabled && 455d4afb5ceSopenharmony_ci lws_ring_get_count_free_elements(pss->mi->ring) < 456d4afb5ceSopenharmony_ci RXFLOW_MIN) 457d4afb5ceSopenharmony_ci __mirror_rxflow_instance(pss->mi, 0); 458d4afb5ceSopenharmony_ci 459d4afb5ceSopenharmony_cireq_writable: 460d4afb5ceSopenharmony_ci __mirror_callback_all_in_mi_on_writable(pss->mi); 461d4afb5ceSopenharmony_ci 462d4afb5ceSopenharmony_cidone2: 463d4afb5ceSopenharmony_ci lws_pthread_mutex_unlock(&pss->mi->lock); /* } mi lock */ 464d4afb5ceSopenharmony_ci break; 465d4afb5ceSopenharmony_ci 466d4afb5ceSopenharmony_ci case LWS_CALLBACK_EVENT_WAIT_CANCELLED: 467d4afb5ceSopenharmony_ci lwsl_info("LWS_CALLBACK_EVENT_WAIT_CANCELLED\n"); 468d4afb5ceSopenharmony_ci break; 469d4afb5ceSopenharmony_ci 470d4afb5ceSopenharmony_ci default: 471d4afb5ceSopenharmony_ci break; 472d4afb5ceSopenharmony_ci } 473d4afb5ceSopenharmony_ci 474d4afb5ceSopenharmony_ci return 0; 475d4afb5ceSopenharmony_ci} 476d4afb5ceSopenharmony_ci 477d4afb5ceSopenharmony_ci#define LWS_PLUGIN_PROTOCOL_MIRROR { \ 478d4afb5ceSopenharmony_ci "lws-mirror-protocol", \ 479d4afb5ceSopenharmony_ci callback_lws_mirror, \ 480d4afb5ceSopenharmony_ci sizeof(struct per_session_data__lws_mirror), \ 481d4afb5ceSopenharmony_ci 4096, /* rx buf size must be >= permessage-deflate rx size */ \ 482d4afb5ceSopenharmony_ci 0, NULL, 0 \ 483d4afb5ceSopenharmony_ci } 484d4afb5ceSopenharmony_ci 485d4afb5ceSopenharmony_ci#if !defined (LWS_PLUGIN_STATIC) 486d4afb5ceSopenharmony_ci 487d4afb5ceSopenharmony_ciLWS_VISIBLE const struct lws_protocols lws_mirror_protocols[] = { 488d4afb5ceSopenharmony_ci LWS_PLUGIN_PROTOCOL_MIRROR 489d4afb5ceSopenharmony_ci}; 490d4afb5ceSopenharmony_ci 491d4afb5ceSopenharmony_ciLWS_VISIBLE const lws_plugin_protocol_t lws_mirror = { 492d4afb5ceSopenharmony_ci .hdr = { 493d4afb5ceSopenharmony_ci "lws mirror", 494d4afb5ceSopenharmony_ci "lws_protocol_plugin", 495d4afb5ceSopenharmony_ci LWS_BUILD_HASH, 496d4afb5ceSopenharmony_ci LWS_PLUGIN_API_MAGIC 497d4afb5ceSopenharmony_ci }, 498d4afb5ceSopenharmony_ci 499d4afb5ceSopenharmony_ci .protocols = lws_mirror_protocols, 500d4afb5ceSopenharmony_ci .count_protocols = LWS_ARRAY_SIZE(lws_mirror_protocols), 501d4afb5ceSopenharmony_ci .extensions = NULL, 502d4afb5ceSopenharmony_ci .count_extensions = 0, 503d4afb5ceSopenharmony_ci}; 504d4afb5ceSopenharmony_ci 505d4afb5ceSopenharmony_ci#endif 506