1/*
2 * ws protocol handler plugin for "lws-minimal-broker"
3 *
4 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 * This implements a minimal "broker", for systems that look like this
10 *
11 * [ publisher  ws client ] <-> [ ws server  broker ws server ] <-> [ ws client subscriber ]
12 *
13 * The "publisher" role is to add data to the broker.
14 *
15 * The "subscriber" role is to hear about all data added to the system.
16 *
17 * The "broker" role is to manage incoming data from publishers and pass it out
18 * to subscribers.
19 *
20 * Any number of publishers and subscribers are supported.
21 *
22 * This example implements a single ws server, using one ws protocol, that treats ws
23 * connections as being in publisher or subscriber mode according to the URL the ws
24 * connection was made to.  ws connections to "/publisher" URL are understood to be
25 * publishing data and to any other URL, subscribing.
26 */
27
28#if !defined (LWS_PLUGIN_STATIC)
29#define LWS_DLL
30#define LWS_INTERNAL
31#include <libwebsockets.h>
32#endif
33
34#include <string.h>
35
36/* one of these created for each message */
37
struct msg {
	/*
	 * Heap buffer owned by this message.  The receive path allocates it
	 * with malloc(LWS_PRE + len) and copies the received bytes in at
	 * offset LWS_PRE; the ring's destroy hook releases it with free().
	 */
	void *payload;

	/* Count of received bytes stored after the LWS_PRE headroom. */
	size_t len;
};
42
43/* one of these is created for each client connecting to us */
44
struct per_session_data__minimal {
	/* Next entry in the vhost's singly linked list of live subscribers. */
	struct per_session_data__minimal *pss_list;

	/* The connection this session data belongs to (set on ESTABLISHED). */
	struct lws *wsi;

	/* This connection's own read position (tail) in the shared ring. */
	uint32_t tail;

	/*
	 * Nonzero when the peer connected to "/publisher" and so sends data
	 * to us; zero for every other URL, i.e. the peer is a subscriber.
	 */
	char publishing;
};
51
52/* one of these is created for each vhost our protocol is used with */
53
struct per_vhost_data__minimal {
	/* Handles captured once, when the protocol is initialised. */
	struct lws_context *context;
	struct lws_vhost *vhost;
	const struct lws_protocols *protocol;

	/* Head of the linked list of live subscriber sessions. */
	struct per_session_data__minimal *pss_list;

	/* Ringbuffer of struct msg elements not yet sent to every subscriber. */
	struct lws_ring *ring;
};
63
64/* destroys the message when everyone has had a copy of it */
65
66static void
67__minimal_destroy_message(void *_msg)
68{
69	struct msg *msg = _msg;
70
71	free(msg->payload);
72	msg->payload = NULL;
73	msg->len = 0;
74}
75
76static int
77callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
78			void *user, void *in, size_t len)
79{
80	struct per_session_data__minimal *pss =
81			(struct per_session_data__minimal *)user;
82	struct per_vhost_data__minimal *vhd =
83			(struct per_vhost_data__minimal *)
84			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
85					lws_get_protocol(wsi));
86	const struct msg *pmsg;
87	struct msg amsg;
88	char buf[32];
89	int n, m;
90
91	switch (reason) {
92	case LWS_CALLBACK_PROTOCOL_INIT:
93		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
94				lws_get_protocol(wsi),
95				sizeof(struct per_vhost_data__minimal));
96		vhd->context = lws_get_context(wsi);
97		vhd->protocol = lws_get_protocol(wsi);
98		vhd->vhost = lws_get_vhost(wsi);
99
100		vhd->ring = lws_ring_create(sizeof(struct msg), 8,
101					    __minimal_destroy_message);
102		if (!vhd->ring)
103			return 1;
104		break;
105
106	case LWS_CALLBACK_PROTOCOL_DESTROY:
107		lws_ring_destroy(vhd->ring);
108		break;
109
110	case LWS_CALLBACK_ESTABLISHED:
111		pss->tail = lws_ring_get_oldest_tail(vhd->ring);
112		pss->wsi = wsi;
113		if (lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI) > 0)
114			pss->publishing = !strcmp(buf, "/publisher");
115		if (!pss->publishing)
116			/* add subscribers to the list of live pss held in the vhd */
117			lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
118		break;
119
120	case LWS_CALLBACK_CLOSED:
121		/* remove our closing pss from the list of live pss */
122		lws_ll_fwd_remove(struct per_session_data__minimal, pss_list,
123				  pss, vhd->pss_list);
124		break;
125
126	case LWS_CALLBACK_SERVER_WRITEABLE:
127
128		if (pss->publishing)
129			break;
130
131		pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
132		if (!pmsg)
133			break;
134
135		/* notice we allowed for LWS_PRE in the payload already */
136		m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE,
137			      pmsg->len, LWS_WRITE_TEXT);
138		if (m < (int)pmsg->len) {
139			lwsl_err("ERROR %d writing to ws socket\n", m);
140			return -1;
141		}
142
143		lws_ring_consume_and_update_oldest_tail(
144			vhd->ring,	/* lws_ring object */
145			struct per_session_data__minimal, /* type of objects with tails */
146			&pss->tail,	/* tail of guy doing the consuming */
147			1,		/* number of payload objects being consumed */
148			vhd->pss_list,	/* head of list of objects with tails */
149			tail,		/* member name of tail in objects with tails */
150			pss_list	/* member name of next object in objects with tails */
151		);
152
153		/* more to do? */
154		if (lws_ring_get_element(vhd->ring, &pss->tail))
155			/* come back as soon as we can write more */
156			lws_callback_on_writable(pss->wsi);
157		break;
158
159	case LWS_CALLBACK_RECEIVE:
160
161		if (!pss->publishing)
162			break;
163
164		/*
165		 * For test, our policy is ignore publishing when there are
166		 * no subscribers connected.
167		 */
168		if (!vhd->pss_list)
169			break;
170
171		n = (int)lws_ring_get_count_free_elements(vhd->ring);
172		if (!n) {
173			lwsl_user("dropping!\n");
174			break;
175		}
176
177		amsg.len = len;
178		/* notice we over-allocate by LWS_PRE */
179		amsg.payload = malloc(LWS_PRE + len);
180		if (!amsg.payload) {
181			lwsl_user("OOM: dropping\n");
182			break;
183		}
184
185		memcpy((char *)amsg.payload + LWS_PRE, in, len);
186		if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
187			__minimal_destroy_message(&amsg);
188			lwsl_user("dropping 2!\n");
189			break;
190		}
191
192		/*
193		 * let every subscriber know we want to write something
194		 * on them as soon as they are ready
195		 */
196		lws_start_foreach_llp(struct per_session_data__minimal **,
197				      ppss, vhd->pss_list) {
198			if (!(*ppss)->publishing)
199				lws_callback_on_writable((*ppss)->wsi);
200		} lws_end_foreach_llp(ppss, pss_list);
201		break;
202
203	default:
204		break;
205	}
206
207	return 0;
208}
209
210#define LWS_PLUGIN_PROTOCOL_MINIMAL \
211	{ \
212		"lws-minimal-broker", \
213		callback_minimal, \
214		sizeof(struct per_session_data__minimal), \
215		128, \
216		0, NULL, 0 \
217	}
218