/*
 * ws protocol handler plugin for "lws-minimal-broker"
 *
 * Written in 2010-2019 by Andy Green <andy@warmcat.com>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 *
 * This implements a minimal "broker", for systems that look like this
 *
 * [ publisher  ws client ] <-> [ ws server  broker ws server ] <-> [ ws client subscriber ]
 *
 * The "publisher" role is to add data to the broker.
 *
 * The "subscriber" role is to hear about all data added to the system.
 *
 * The "broker" role is to manage incoming data from publishers and pass it out
 * to subscribers.
 *
 * Any number of publishers and subscribers are supported.
 *
 * This example implements a single ws server, using one ws protocol, that
 * treats each ws connection as being in publisher or subscriber mode according
 * to the URL the connection was made to.  Connections to the "/publisher" URL
 * are understood to be publishing data; connections to any other URL are
 * understood to be subscribing.
 */

#if !defined (LWS_PLUGIN_STATIC)
#define LWS_DLL
#define LWS_INTERNAL
#include <libwebsockets.h>
#endif

#include <stdlib.h>
#include <string.h>

/*
 * One of these is created for each message queued in the ring.
 *
 * The receive path allocates LWS_PRE + len bytes for payload and copies the
 * received data in at offset LWS_PRE, so the data can later be handed to
 * lws_write() starting at payload + LWS_PRE.
 */

struct msg {
	void *payload;	/* malloc'd: LWS_PRE bytes of headroom, then len data bytes */
	size_t len;	/* count of data bytes, not including the LWS_PRE headroom */
};

/* One of these is created for each client connecting to us. */

struct per_session_data__minimal {
	/* next entry in the vhost's list of live sessions (NULL at the end) */
	struct per_session_data__minimal *pss_list;
	/* the connection this session belongs to */
	struct lws *wsi;
	/* this session's own read position in the shared message ring */
	uint32_t tail;
	/* nonzero: peer is publishing to us; zero: peer is a subscriber */
	char publishing;
};

/* One of these is created for each vhost our protocol is used with. */

struct per_vhost_data__minimal {
	struct lws_context *context;
	struct lws_vhost *vhost;
	const struct lws_protocols *protocol;

	/*
	 * Head of the linked list of live sessions.  Only subscriber
	 * sessions are inserted; publishers are never placed on this list.
	 */
	struct per_session_data__minimal *pss_list;

	/* ringbuffer holding messages not yet sent to every subscriber */
	struct lws_ring *ring;
};

64d4afb5ceSopenharmony_ci/* destroys the message when everyone has had a copy of it */
65d4afb5ceSopenharmony_ci
66d4afb5ceSopenharmony_cistatic void
67d4afb5ceSopenharmony_ci__minimal_destroy_message(void *_msg)
68d4afb5ceSopenharmony_ci{
69d4afb5ceSopenharmony_ci	struct msg *msg = _msg;
70d4afb5ceSopenharmony_ci
71d4afb5ceSopenharmony_ci	free(msg->payload);
72d4afb5ceSopenharmony_ci	msg->payload = NULL;
73d4afb5ceSopenharmony_ci	msg->len = 0;
74d4afb5ceSopenharmony_ci}
75d4afb5ceSopenharmony_ci
76d4afb5ceSopenharmony_cistatic int
77d4afb5ceSopenharmony_cicallback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
78d4afb5ceSopenharmony_ci			void *user, void *in, size_t len)
79d4afb5ceSopenharmony_ci{
80d4afb5ceSopenharmony_ci	struct per_session_data__minimal *pss =
81d4afb5ceSopenharmony_ci			(struct per_session_data__minimal *)user;
82d4afb5ceSopenharmony_ci	struct per_vhost_data__minimal *vhd =
83d4afb5ceSopenharmony_ci			(struct per_vhost_data__minimal *)
84d4afb5ceSopenharmony_ci			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
85d4afb5ceSopenharmony_ci					lws_get_protocol(wsi));
86d4afb5ceSopenharmony_ci	const struct msg *pmsg;
87d4afb5ceSopenharmony_ci	struct msg amsg;
88d4afb5ceSopenharmony_ci	char buf[32];
89d4afb5ceSopenharmony_ci	int n, m;
90d4afb5ceSopenharmony_ci
91d4afb5ceSopenharmony_ci	switch (reason) {
92d4afb5ceSopenharmony_ci	case LWS_CALLBACK_PROTOCOL_INIT:
93d4afb5ceSopenharmony_ci		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
94d4afb5ceSopenharmony_ci				lws_get_protocol(wsi),
95d4afb5ceSopenharmony_ci				sizeof(struct per_vhost_data__minimal));
96d4afb5ceSopenharmony_ci		vhd->context = lws_get_context(wsi);
97d4afb5ceSopenharmony_ci		vhd->protocol = lws_get_protocol(wsi);
98d4afb5ceSopenharmony_ci		vhd->vhost = lws_get_vhost(wsi);
99d4afb5ceSopenharmony_ci
100d4afb5ceSopenharmony_ci		vhd->ring = lws_ring_create(sizeof(struct msg), 8,
101d4afb5ceSopenharmony_ci					    __minimal_destroy_message);
102d4afb5ceSopenharmony_ci		if (!vhd->ring)
103d4afb5ceSopenharmony_ci			return 1;
104d4afb5ceSopenharmony_ci		break;
105d4afb5ceSopenharmony_ci
106d4afb5ceSopenharmony_ci	case LWS_CALLBACK_PROTOCOL_DESTROY:
107d4afb5ceSopenharmony_ci		lws_ring_destroy(vhd->ring);
108d4afb5ceSopenharmony_ci		break;
109d4afb5ceSopenharmony_ci
110d4afb5ceSopenharmony_ci	case LWS_CALLBACK_ESTABLISHED:
111d4afb5ceSopenharmony_ci		pss->tail = lws_ring_get_oldest_tail(vhd->ring);
112d4afb5ceSopenharmony_ci		pss->wsi = wsi;
113d4afb5ceSopenharmony_ci		if (lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI) > 0)
114d4afb5ceSopenharmony_ci			pss->publishing = !strcmp(buf, "/publisher");
115d4afb5ceSopenharmony_ci		if (!pss->publishing)
116d4afb5ceSopenharmony_ci			/* add subscribers to the list of live pss held in the vhd */
117d4afb5ceSopenharmony_ci			lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
118d4afb5ceSopenharmony_ci		break;
119d4afb5ceSopenharmony_ci
120d4afb5ceSopenharmony_ci	case LWS_CALLBACK_CLOSED:
121d4afb5ceSopenharmony_ci		/* remove our closing pss from the list of live pss */
122d4afb5ceSopenharmony_ci		lws_ll_fwd_remove(struct per_session_data__minimal, pss_list,
123d4afb5ceSopenharmony_ci				  pss, vhd->pss_list);
124d4afb5ceSopenharmony_ci		break;
125d4afb5ceSopenharmony_ci
126d4afb5ceSopenharmony_ci	case LWS_CALLBACK_SERVER_WRITEABLE:
127d4afb5ceSopenharmony_ci
128d4afb5ceSopenharmony_ci		if (pss->publishing)
129d4afb5ceSopenharmony_ci			break;
130d4afb5ceSopenharmony_ci
131d4afb5ceSopenharmony_ci		pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
132d4afb5ceSopenharmony_ci		if (!pmsg)
133d4afb5ceSopenharmony_ci			break;
134d4afb5ceSopenharmony_ci
135d4afb5ceSopenharmony_ci		/* notice we allowed for LWS_PRE in the payload already */
136d4afb5ceSopenharmony_ci		m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE,
137d4afb5ceSopenharmony_ci			      pmsg->len, LWS_WRITE_TEXT);
138d4afb5ceSopenharmony_ci		if (m < (int)pmsg->len) {
139d4afb5ceSopenharmony_ci			lwsl_err("ERROR %d writing to ws socket\n", m);
140d4afb5ceSopenharmony_ci			return -1;
141d4afb5ceSopenharmony_ci		}
142d4afb5ceSopenharmony_ci
143d4afb5ceSopenharmony_ci		lws_ring_consume_and_update_oldest_tail(
144d4afb5ceSopenharmony_ci			vhd->ring,	/* lws_ring object */
145d4afb5ceSopenharmony_ci			struct per_session_data__minimal, /* type of objects with tails */
146d4afb5ceSopenharmony_ci			&pss->tail,	/* tail of guy doing the consuming */
147d4afb5ceSopenharmony_ci			1,		/* number of payload objects being consumed */
148d4afb5ceSopenharmony_ci			vhd->pss_list,	/* head of list of objects with tails */
149d4afb5ceSopenharmony_ci			tail,		/* member name of tail in objects with tails */
150d4afb5ceSopenharmony_ci			pss_list	/* member name of next object in objects with tails */
151d4afb5ceSopenharmony_ci		);
152d4afb5ceSopenharmony_ci
153d4afb5ceSopenharmony_ci		/* more to do? */
154d4afb5ceSopenharmony_ci		if (lws_ring_get_element(vhd->ring, &pss->tail))
155d4afb5ceSopenharmony_ci			/* come back as soon as we can write more */
156d4afb5ceSopenharmony_ci			lws_callback_on_writable(pss->wsi);
157d4afb5ceSopenharmony_ci		break;
158d4afb5ceSopenharmony_ci
159d4afb5ceSopenharmony_ci	case LWS_CALLBACK_RECEIVE:
160d4afb5ceSopenharmony_ci
161d4afb5ceSopenharmony_ci		if (!pss->publishing)
162d4afb5ceSopenharmony_ci			break;
163d4afb5ceSopenharmony_ci
164d4afb5ceSopenharmony_ci		/*
165d4afb5ceSopenharmony_ci		 * For test, our policy is ignore publishing when there are
166d4afb5ceSopenharmony_ci		 * no subscribers connected.
167d4afb5ceSopenharmony_ci		 */
168d4afb5ceSopenharmony_ci		if (!vhd->pss_list)
169d4afb5ceSopenharmony_ci			break;
170d4afb5ceSopenharmony_ci
171d4afb5ceSopenharmony_ci		n = (int)lws_ring_get_count_free_elements(vhd->ring);
172d4afb5ceSopenharmony_ci		if (!n) {
173d4afb5ceSopenharmony_ci			lwsl_user("dropping!\n");
174d4afb5ceSopenharmony_ci			break;
175d4afb5ceSopenharmony_ci		}
176d4afb5ceSopenharmony_ci
177d4afb5ceSopenharmony_ci		amsg.len = len;
178d4afb5ceSopenharmony_ci		/* notice we over-allocate by LWS_PRE */
179d4afb5ceSopenharmony_ci		amsg.payload = malloc(LWS_PRE + len);
180d4afb5ceSopenharmony_ci		if (!amsg.payload) {
181d4afb5ceSopenharmony_ci			lwsl_user("OOM: dropping\n");
182d4afb5ceSopenharmony_ci			break;
183d4afb5ceSopenharmony_ci		}
184d4afb5ceSopenharmony_ci
185d4afb5ceSopenharmony_ci		memcpy((char *)amsg.payload + LWS_PRE, in, len);
186d4afb5ceSopenharmony_ci		if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
187d4afb5ceSopenharmony_ci			__minimal_destroy_message(&amsg);
188d4afb5ceSopenharmony_ci			lwsl_user("dropping 2!\n");
189d4afb5ceSopenharmony_ci			break;
190d4afb5ceSopenharmony_ci		}
191d4afb5ceSopenharmony_ci
192d4afb5ceSopenharmony_ci		/*
193d4afb5ceSopenharmony_ci		 * let every subscriber know we want to write something
194d4afb5ceSopenharmony_ci		 * on them as soon as they are ready
195d4afb5ceSopenharmony_ci		 */
196d4afb5ceSopenharmony_ci		lws_start_foreach_llp(struct per_session_data__minimal **,
197d4afb5ceSopenharmony_ci				      ppss, vhd->pss_list) {
198d4afb5ceSopenharmony_ci			if (!(*ppss)->publishing)
199d4afb5ceSopenharmony_ci				lws_callback_on_writable((*ppss)->wsi);
200d4afb5ceSopenharmony_ci		} lws_end_foreach_llp(ppss, pss_list);
201d4afb5ceSopenharmony_ci		break;
202d4afb5ceSopenharmony_ci
203d4afb5ceSopenharmony_ci	default:
204d4afb5ceSopenharmony_ci		break;
205d4afb5ceSopenharmony_ci	}
206d4afb5ceSopenharmony_ci
207d4afb5ceSopenharmony_ci	return 0;
208d4afb5ceSopenharmony_ci}
209d4afb5ceSopenharmony_ci
/*
 * Initializer for this plugin's protocol table entry: the protocol name, the
 * callback, and the size of the per-session storage the callback receives as
 * its user pointer.
 *
 * NOTE(review): the remaining positional values (128, then 0, NULL, 0) are
 * presumably the rx buffer size, id, user pointer and tx packet size members
 * of struct lws_protocols -- confirm against the libwebsockets headers in use.
 */
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
	{ \
		"lws-minimal-broker", \
		callback_minimal, \
		sizeof(struct per_session_data__minimal), \
		128, \
		0, NULL, 0 \
	}