1/* 2 * ws protocol handler plugin for "lws-minimal-broker" 3 * 4 * Written in 2010-2019 by Andy Green <andy@warmcat.com> 5 * 6 * This file is made available under the Creative Commons CC0 1.0 7 * Universal Public Domain Dedication. 8 * 9 * This implements a minimal "broker", for systems that look like this 10 * 11 * [ publisher ws client ] <-> [ ws server broker ws server ] <-> [ ws client subscriber ] 12 * 13 * The "publisher" role is to add data to the broker. 14 * 15 * The "subscriber" role is to hear about all data added to the system. 16 * 17 * The "broker" role is to manage incoming data from publishers and pass it out 18 * to subscribers. 19 * 20 * Any number of publishers and subscribers are supported. 21 * 22 * This example implements a single ws server, using one ws protocol, that treats ws 23 * connections as being in publisher or subscriber mode according to the URL the ws 24 * connection was made to. ws connections to "/publisher" URL are understood to be 25 * publishing data and to any other URL, subscribing. 26 */ 27 28#if !defined (LWS_PLUGIN_STATIC) 29#define LWS_DLL 30#define LWS_INTERNAL 31#include <libwebsockets.h> 32#endif 33 34#include <string.h> 35 36/* one of these created for each message */ 37 38struct msg { 39 void *payload; /* is malloc'd */ 40 size_t len; 41}; 42 43/* one of these is created for each client connecting to us */ 44 45struct per_session_data__minimal { 46 struct per_session_data__minimal *pss_list; 47 struct lws *wsi; 48 uint32_t tail; 49 char publishing; /* nonzero: peer is publishing to us */ 50}; 51 52/* one of these is created for each vhost our protocol is used with */ 53 54struct per_vhost_data__minimal { 55 struct lws_context *context; 56 struct lws_vhost *vhost; 57 const struct lws_protocols *protocol; 58 59 struct per_session_data__minimal *pss_list; /* linked-list of live pss*/ 60 61 struct lws_ring *ring; /* ringbuffer holding unsent messages */ 62}; 63 64/* destroys the message when everyone has had a copy of it */ 65 66static void 67__minimal_destroy_message(void *_msg) 68{ 69 struct msg *msg = _msg; 70 71 free(msg->payload); 72 msg->payload = NULL; 73 msg->len = 0; 74} 75 76static int 77callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, 78 void *user, void *in, size_t len) 79{ 80 struct per_session_data__minimal *pss = 81 (struct per_session_data__minimal *)user; 82 struct per_vhost_data__minimal *vhd = 83 (struct per_vhost_data__minimal *) 84 lws_protocol_vh_priv_get(lws_get_vhost(wsi), 85 lws_get_protocol(wsi)); 86 const struct msg *pmsg; 87 struct msg amsg; 88 char buf[32]; 89 int n, m; 90 91 switch (reason) { 92 case LWS_CALLBACK_PROTOCOL_INIT: 93 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 94 lws_get_protocol(wsi), 95 sizeof(struct per_vhost_data__minimal)); 96 vhd->context = lws_get_context(wsi); 97 vhd->protocol = lws_get_protocol(wsi); 98 vhd->vhost = lws_get_vhost(wsi); 99 100 vhd->ring = lws_ring_create(sizeof(struct msg), 8, 101 __minimal_destroy_message); 102 if (!vhd->ring) 103 return 1; 104 break; 105 106 case LWS_CALLBACK_PROTOCOL_DESTROY: 107 lws_ring_destroy(vhd->ring); 108 break; 109 110 case LWS_CALLBACK_ESTABLISHED: 111 pss->tail = lws_ring_get_oldest_tail(vhd->ring); 112 pss->wsi = wsi; 113 if (lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI) > 0) 114 pss->publishing = !strcmp(buf, "/publisher"); 115 if (!pss->publishing) 116 /* add subscribers to the list of live pss held in the vhd */ 117 lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); 118 break; 119 120 case LWS_CALLBACK_CLOSED: 121 /* remove our closing pss from the list of live pss */ 122 lws_ll_fwd_remove(struct per_session_data__minimal, pss_list, 123 pss, vhd->pss_list); 124 break; 125 126 case LWS_CALLBACK_SERVER_WRITEABLE: 127 128 if (pss->publishing) 129 break; 130 131 pmsg = lws_ring_get_element(vhd->ring, &pss->tail); 132 if (!pmsg) 133 break; 134 135 /* notice we allowed for LWS_PRE in the payload already */ 136 m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE, 137 pmsg->len, LWS_WRITE_TEXT); 138 if (m < (int)pmsg->len) { 139 lwsl_err("ERROR %d writing to ws socket\n", m); 140 return -1; 141 } 142 143 lws_ring_consume_and_update_oldest_tail( 144 vhd->ring, /* lws_ring object */ 145 struct per_session_data__minimal, /* type of objects with tails */ 146 &pss->tail, /* tail of guy doing the consuming */ 147 1, /* number of payload objects being consumed */ 148 vhd->pss_list, /* head of list of objects with tails */ 149 tail, /* member name of tail in objects with tails */ 150 pss_list /* member name of next object in objects with tails */ 151 ); 152 153 /* more to do? */ 154 if (lws_ring_get_element(vhd->ring, &pss->tail)) 155 /* come back as soon as we can write more */ 156 lws_callback_on_writable(pss->wsi); 157 break; 158 159 case LWS_CALLBACK_RECEIVE: 160 161 if (!pss->publishing) 162 break; 163 164 /* 165 * For test, our policy is ignore publishing when there are 166 * no subscribers connected. 167 */ 168 if (!vhd->pss_list) 169 break; 170 171 n = (int)lws_ring_get_count_free_elements(vhd->ring); 172 if (!n) { 173 lwsl_user("dropping!\n"); 174 break; 175 } 176 177 amsg.len = len; 178 /* notice we over-allocate by LWS_PRE */ 179 amsg.payload = malloc(LWS_PRE + len); 180 if (!amsg.payload) { 181 lwsl_user("OOM: dropping\n"); 182 break; 183 } 184 185 memcpy((char *)amsg.payload + LWS_PRE, in, len); 186 if (!lws_ring_insert(vhd->ring, &amsg, 1)) { 187 __minimal_destroy_message(&amsg); 188 lwsl_user("dropping 2!\n"); 189 break; 190 } 191 192 /* 193 * let every subscriber know we want to write something 194 * on them as soon as they are ready 195 */ 196 lws_start_foreach_llp(struct per_session_data__minimal **, 197 ppss, vhd->pss_list) { 198 if (!(*ppss)->publishing) 199 lws_callback_on_writable((*ppss)->wsi); 200 } lws_end_foreach_llp(ppss, pss_list); 201 break; 202 203 default: 204 break; 205 } 206 207 return 0; 208} 209 210#define LWS_PLUGIN_PROTOCOL_MINIMAL \ 211 { \ 212 "lws-minimal-broker", \ 213 callback_minimal, \ 214 sizeof(struct per_session_data__minimal), \ 215 128, \ 216 0, NULL, 0 \ 217 } 218