1/* 2 * ws protocol handler plugin for "lws-minimal" 3 * 4 * Written in 2010-2019 by Andy Green <andy@warmcat.com> 5 * 6 * This file is made available under the Creative Commons CC0 1.0 7 * Universal Public Domain Dedication. 8 * 9 * This version uses an lws_ring ringbuffer to cache up to 8 messages at a time, 10 * so it's not so easy to lose messages. 11 * 12 * This also demonstrates how to "cull", ie, kill, connections that can't 13 * keep up for some reason. 14 */ 15 16#if !defined (LWS_PLUGIN_STATIC) 17#define LWS_DLL 18#define LWS_INTERNAL 19#include <libwebsockets.h> 20#endif 21 22#include <string.h> 23 24/* one of these created for each message */ 25 26struct msg { 27 void *payload; /* is malloc'd */ 28 size_t len; 29}; 30 31/* one of these is created for each client connecting to us */ 32 33struct per_session_data__minimal { 34 struct per_session_data__minimal *pss_list; 35 struct lws *wsi; 36 uint32_t tail; 37 38 unsigned int culled:1; 39}; 40 41/* one of these is created for each vhost our protocol is used with */ 42 43struct per_vhost_data__minimal { 44 struct lws_context *context; 45 struct lws_vhost *vhost; 46 const struct lws_protocols *protocol; 47 48 struct per_session_data__minimal *pss_list; /* linked-list of live pss*/ 49 50 struct lws_ring *ring; /* ringbuffer holding unsent messages */ 51}; 52 53static void 54cull_lagging_clients(struct per_vhost_data__minimal *vhd) 55{ 56 uint32_t oldest_tail = lws_ring_get_oldest_tail(vhd->ring); 57 struct per_session_data__minimal *old_pss = NULL; 58 int most = 0, before = (int)lws_ring_get_count_waiting_elements(vhd->ring, 59 &oldest_tail), m; 60 61 /* 62 * At least one guy with the oldest tail has lagged too far, filling 63 * the ringbuffer with stuff waiting for them, while new stuff is 64 * coming in, and they must close, freeing up ringbuffer entries. 65 */ 66 67 lws_start_foreach_llp_safe(struct per_session_data__minimal **, 68 ppss, vhd->pss_list, pss_list) { 69 70 if ((*ppss)->tail == oldest_tail) { 71 old_pss = *ppss; 72 73 lwsl_user("Killing lagging client %p\n", (*ppss)->wsi); 74 75 lws_set_timeout((*ppss)->wsi, PENDING_TIMEOUT_LAGGING, 76 /* 77 * we may kill the wsi we came in on, 78 * so the actual close is deferred 79 */ 80 LWS_TO_KILL_ASYNC); 81 82 /* 83 * We might try to write something before we get a 84 * chance to close. But this pss is now detached 85 * from the ring buffer. Mark this pss as culled so we 86 * don't try to do anything more with it. 87 */ 88 89 (*ppss)->culled = 1; 90 91 /* 92 * Because we can't kill it synchronously, but we 93 * know it's closing momentarily and don't want its 94 * participation any more, remove its pss from the 95 * vhd pss list early. (This is safe to repeat 96 * uselessly later in the close flow). 97 * 98 * Notice this changes *ppss! 99 */ 100 101 lws_ll_fwd_remove(struct per_session_data__minimal, 102 pss_list, (*ppss), vhd->pss_list); 103 104 /* use the changed *ppss so we won't skip anything */ 105 106 continue; 107 108 } else { 109 /* 110 * so this guy is a survivor of the cull. Let's track 111 * what is the largest number of pending ring elements 112 * for any survivor. 113 */ 114 m = (int)lws_ring_get_count_waiting_elements(vhd->ring, 115 &((*ppss)->tail)); 116 if (m > most) 117 most = m; 118 } 119 120 } lws_end_foreach_llp_safe(ppss); 121 122 /* it would mean we lost track of oldest... but Coverity insists */ 123 if (!old_pss) 124 return; 125 126 /* 127 * Let's recover (ie, free up) all the ring slots between the 128 * original oldest's last one and the "worst" survivor. 129 */ 130 131 lws_ring_consume_and_update_oldest_tail(vhd->ring, 132 struct per_session_data__minimal, &old_pss->tail, (size_t)(before - most), 133 vhd->pss_list, tail, pss_list); 134 135 lwsl_user("%s: shrunk ring from %d to %d\n", __func__, before, most); 136} 137 138/* destroys the message when everyone has had a copy of it */ 139 140static void 141__minimal_destroy_message(void *_msg) 142{ 143 struct msg *msg = _msg; 144 145 free(msg->payload); 146 msg->payload = NULL; 147 msg->len = 0; 148} 149 150static int 151callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, 152 void *user, void *in, size_t len) 153{ 154 struct per_session_data__minimal *pss = 155 (struct per_session_data__minimal *)user; 156 struct per_vhost_data__minimal *vhd = 157 (struct per_vhost_data__minimal *) 158 lws_protocol_vh_priv_get(lws_get_vhost(wsi), 159 lws_get_protocol(wsi)); 160 const struct msg *pmsg; 161 struct msg amsg; 162 int n, m; 163 164 switch (reason) { 165 case LWS_CALLBACK_PROTOCOL_INIT: 166 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 167 lws_get_protocol(wsi), 168 sizeof(struct per_vhost_data__minimal)); 169 vhd->context = lws_get_context(wsi); 170 vhd->protocol = lws_get_protocol(wsi); 171 vhd->vhost = lws_get_vhost(wsi); 172 173 vhd->ring = lws_ring_create(sizeof(struct msg), 8, 174 __minimal_destroy_message); 175 if (!vhd->ring) 176 return 1; 177 break; 178 179 case LWS_CALLBACK_PROTOCOL_DESTROY: 180 lws_ring_destroy(vhd->ring); 181 break; 182 183 case LWS_CALLBACK_ESTABLISHED: 184 /* add ourselves to the list of live pss held in the vhd */ 185 lwsl_user("LWS_CALLBACK_ESTABLISHED: wsi %p\n", wsi); 186 lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); 187 pss->tail = lws_ring_get_oldest_tail(vhd->ring); 188 pss->wsi = wsi; 189 break; 190 191 case LWS_CALLBACK_CLOSED: 192 lwsl_user("LWS_CALLBACK_CLOSED: wsi %p\n", wsi); 193 /* remove our closing pss from the list of live pss */ 194 lws_ll_fwd_remove(struct per_session_data__minimal, pss_list, 195 pss, vhd->pss_list); 196 break; 197 198 case LWS_CALLBACK_SERVER_WRITEABLE: 199 if (pss->culled) 200 break; 201 pmsg = lws_ring_get_element(vhd->ring, &pss->tail); 202 if (!pmsg) 203 break; 204 205 /* notice we allowed for LWS_PRE in the payload already */ 206 m = lws_write(wsi, ((unsigned char *)pmsg->payload) + 207 LWS_PRE, pmsg->len, LWS_WRITE_TEXT); 208 if (m < (int)pmsg->len) { 209 lwsl_err("ERROR %d writing to ws socket\n", m); 210 return -1; 211 } 212 213 lws_ring_consume_and_update_oldest_tail( 214 vhd->ring, /* lws_ring object */ 215 struct per_session_data__minimal, /* type of objects with tails */ 216 &pss->tail, /* tail of guy doing the consuming */ 217 1, /* number of payload objects being consumed */ 218 vhd->pss_list, /* head of list of objects with tails */ 219 tail, /* member name of tail in objects with tails */ 220 pss_list /* member name of next object in objects with tails */ 221 ); 222 223 /* more to do for us? */ 224 if (lws_ring_get_element(vhd->ring, &pss->tail)) 225 /* come back as soon as we can write more */ 226 lws_callback_on_writable(pss->wsi); 227 break; 228 229 case LWS_CALLBACK_RECEIVE: 230 n = (int)lws_ring_get_count_free_elements(vhd->ring); 231 if (!n) { 232 /* forcibly make space */ 233 cull_lagging_clients(vhd); 234 n = (int)lws_ring_get_count_free_elements(vhd->ring); 235 } 236 if (!n) 237 break; 238 239 lwsl_user("LWS_CALLBACK_RECEIVE: free space %d\n", n); 240 241 amsg.len = len; 242 /* notice we over-allocate by LWS_PRE... */ 243 amsg.payload = malloc(LWS_PRE + len); 244 if (!amsg.payload) { 245 lwsl_user("OOM: dropping\n"); 246 break; 247 } 248 249 /* ...and we copy the payload in at +LWS_PRE */ 250 memcpy((char *)amsg.payload + LWS_PRE, in, len); 251 if (!lws_ring_insert(vhd->ring, &amsg, 1)) { 252 __minimal_destroy_message(&amsg); 253 lwsl_user("dropping!\n"); 254 break; 255 } 256 257 /* 258 * let everybody know we want to write something on them 259 * as soon as they are ready 260 */ 261 lws_start_foreach_llp(struct per_session_data__minimal **, 262 ppss, vhd->pss_list) { 263 lws_callback_on_writable((*ppss)->wsi); 264 } lws_end_foreach_llp(ppss, pss_list); 265 break; 266 267 default: 268 break; 269 } 270 271 return 0; 272} 273 274#define LWS_PLUGIN_PROTOCOL_MINIMAL \ 275 { \ 276 "lws-minimal", \ 277 callback_minimal, \ 278 sizeof(struct per_session_data__minimal), \ 279 0, \ 280 0, NULL, 0 \ 281 } 282