1/*
2 * libwebsockets - small server side websockets and web server implementation
3 *
4 * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com>
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to
8 * deal in the Software without restriction, including without limitation the
9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 * sell copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 * IN THE SOFTWARE.
23 */
24
25#if !defined (LWS_PLUGIN_STATIC)
26#if !defined(LWS_DLL)
27#define LWS_DLL
28#endif
29#if !defined(LWS_INTERNAL)
30#define LWS_INTERNAL
31#endif
32#include <libwebsockets.h>
33#endif
34
35#include <string.h>
36#include <sys/types.h>
37#include <fcntl.h>
38
39#define RING_DEPTH 8
40
41struct packet {
42	void *payload;
43	uint32_t len;
44	uint32_t ticket;
45};
46
47enum {
48	ACC,
49	ONW
50};
51
52/*
53 * Because both sides of the connection want to share this, we allocate it
54 * during accepted adoption and both sides have a pss that is just a wrapper
55 * pointing to this.
56 *
57 * The last one of the accepted side and the onward side to close frees it.
58 * This removes any chance of one side or the other having an invalidated
59 * pointer to the pss.
60 */
61
62struct conn {
63	struct lws *wsi[2];
64
65	/* rings containing unsent rx from accepted and onward sides */
66	struct lws_ring *r[2];
67	uint32_t t[2]; /* ring tail */
68
69	uint32_t ticket_next;
70	uint32_t ticket_retired;
71
72	char rx_enabled[2];
73	char closed[2];
74	char established[2];
75};
76
77struct raw_pss {
78	struct conn *conn;
79};
80
81/* one of these is created for each vhost our protocol is used with */
82
83struct raw_vhd {
84	char addr[128];
85	uint16_t port;
86	char ipv6;
87};
88
89static void
90__destroy_packet(void *_pkt)
91{
92	struct packet *pkt = _pkt;
93
94	free(pkt->payload);
95	pkt->payload = NULL;
96	pkt->len = 0;
97}
98
99static void
100destroy_conn(struct raw_vhd *vhd, struct raw_pss *pss)
101{
102	struct conn *conn = pss->conn;
103
104	if (conn->r[ACC])
105		lws_ring_destroy(conn->r[ACC]);
106	if (conn->r[ONW])
107		lws_ring_destroy(conn->r[ONW]);
108
109	pss->conn = NULL;
110
111	free(conn);
112}
113
114static int
115connect_client(struct raw_vhd *vhd, struct raw_pss *pss)
116{
117	struct lws_client_connect_info i;
118	char host[128];
119	struct lws *cwsi;
120
121	lws_snprintf(host, sizeof(host), "%s:%u", vhd->addr, vhd->port);
122
123	memset(&i, 0, sizeof(i));
124
125	i.method = "RAW";
126	i.context = lws_get_context(pss->conn->wsi[ACC]);
127	i.port = vhd->port;
128	i.address = vhd->addr;
129	i.host = host;
130	i.origin = host;
131	i.ssl_connection = 0;
132	i.vhost = lws_get_vhost(pss->conn->wsi[ACC]);
133	i.local_protocol_name = "raw-proxy";
134	i.protocol = "raw-proxy";
135	i.path = "/";
136	/*
137	 * The "onward" client wsi has its own pss but shares the "conn"
138	 * created when the inbound connection was accepted.  We need to stash
139	 * the address of the shared conn and apply it to the client psss
140	 * when the client connection completes.
141	 */
142	i.opaque_user_data = pss->conn;
143	i.pwsi = &pss->conn->wsi[ONW];
144
145	lwsl_info("%s: onward: %s:%d%s\n", __func__, i.address, i.port, i.path);
146
147	cwsi = lws_client_connect_via_info(&i);
148	if (!cwsi)
149		lwsl_err("%s: client connect failed early\n", __func__);
150
151	return !cwsi;
152}
153
154static int
155flow_control(struct conn *conn, int side, int enable)
156{
157	if (conn->closed[side] ||
158	    enable == conn->rx_enabled[side] ||
159	    !conn->established[side])
160		return 0;
161
162	if (lws_rx_flow_control(conn->wsi[side], enable))
163		return 1;
164
165	conn->rx_enabled[side] = (char)enable;
166	lwsl_info("%s: %s side: %s\n", __func__, side ? "ONW" : "ACC",
167		  enable ? "rx enabled" : "rx flow controlled");
168
169	return 0;
170}
171
172static int
173callback_raw_proxy(struct lws *wsi, enum lws_callback_reasons reason,
174		   void *user, void *in, size_t len)
175{
176	struct raw_pss *pss = (struct raw_pss *)user;
177	struct raw_vhd *vhd = (struct raw_vhd *)lws_protocol_vh_priv_get(
178				     lws_get_vhost(wsi), lws_get_protocol(wsi));
179	const struct packet *ppkt;
180	struct conn *conn = NULL;
181	struct lws_tokenize ts;
182	lws_tokenize_elem e;
183	struct packet pkt;
184	const char *cp;
185	int n;
186
187	if (pss)
188		conn = pss->conn;
189
190	switch (reason) {
191	case LWS_CALLBACK_PROTOCOL_INIT:
192		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
193				lws_get_protocol(wsi), sizeof(struct raw_vhd));
194		if (!vhd)
195			return 0;
196		if (lws_pvo_get_str(in, "onward", &cp)) {
197			lwsl_warn("%s: vh %s: pvo 'onward' required\n", __func__,
198				 lws_get_vhost_name(lws_get_vhost(wsi)));
199
200			return 0;
201		}
202		lws_tokenize_init(&ts, cp, LWS_TOKENIZE_F_DOT_NONTERM |
203					   LWS_TOKENIZE_F_MINUS_NONTERM |
204					   LWS_TOKENIZE_F_NO_FLOATS);
205		ts.len = strlen(cp);
206
207		if (lws_tokenize(&ts) != LWS_TOKZE_TOKEN)
208			goto bad_onward;
209		if (!strncmp(ts.token, "ipv6", ts.token_len))
210			vhd->ipv6 = 1;
211		else
212			if (strncmp(ts.token, "ipv4", ts.token_len))
213				goto bad_onward;
214
215		/* then the colon */
216		if (lws_tokenize(&ts) != LWS_TOKZE_DELIMITER)
217			goto bad_onward;
218
219		e = lws_tokenize(&ts);
220		if (!vhd->ipv6) {
221			if (e != LWS_TOKZE_TOKEN ||
222			    ts.token_len + 1 >= (int)sizeof(vhd->addr))
223				goto bad_onward;
224
225			lws_strncpy(vhd->addr, ts.token, ts.token_len + 1);
226			e = lws_tokenize(&ts);
227			if (e == LWS_TOKZE_DELIMITER) {
228				/* there should be a port then */
229				e = lws_tokenize(&ts);
230				if (e != LWS_TOKZE_INTEGER)
231					goto bad_onward;
232				vhd->port = (uint16_t)atoi(ts.token);
233				e = lws_tokenize(&ts);
234			}
235			if (e != LWS_TOKZE_ENDED)
236				goto bad_onward;
237		} else
238			lws_strncpy(vhd->addr, ts.token, sizeof(vhd->addr));
239
240		lwsl_notice("%s: vh %s: onward %s:%s:%d\n", __func__,
241			    lws_get_vhost_name(lws_get_vhost(wsi)),
242			    vhd->ipv6 ? "ipv6": "ipv4", vhd->addr, vhd->port);
243		break;
244
245bad_onward:
246		lwsl_err("%s: onward pvo format must be ipv4:addr[:port] "
247			 " or ipv6:addr, not '%s'\n", __func__, cp);
248		return -1;
249
250	case LWS_CALLBACK_PROTOCOL_DESTROY:
251		break;
252
253	/* callbacks related to client "onward side" */
254
255	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
256		lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
257			 in ? (char *)in : "(null)");
258		break;
259
260        case LWS_CALLBACK_RAW_PROXY_CLI_ADOPT:
261		lwsl_debug("%s: %p: LWS_CALLBACK_RAW_CLI_ADOPT: pss %p\n", __func__, wsi, pss);
262		if (conn || !pss)
263			break;
264		conn = pss->conn = lws_get_opaque_user_data(wsi);
265		if (!conn)
266			break;
267		conn->established[ONW] = 1;
268		/* they start enabled */
269		conn->rx_enabled[ACC] = 1;
270		conn->rx_enabled[ONW] = 1;
271
272		/* he disabled his rx while waiting for use to be established */
273		flow_control(conn, ACC, 1);
274
275		lws_callback_on_writable(wsi);
276		lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
277		break;
278
279	case LWS_CALLBACK_RAW_PROXY_CLI_CLOSE:
280		lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_CLOSE\n");
281		if (!conn)
282			break;
283
284		conn->closed[ONW] = 1;
285
286		if (conn->closed[ACC])
287			destroy_conn(vhd, pss);
288
289		break;
290
291	case LWS_CALLBACK_RAW_PROXY_CLI_RX:
292		lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_RX: %d\n", (int)len);
293
294		if (!conn)
295			return 0;
296
297		if (!pss || !conn->wsi[ACC] || conn->closed[ACC]) {
298			lwsl_info(" pss %p, wsi[ACC] %p, closed[ACC] %d\n",
299				  pss, conn->wsi[ACC], conn->closed[ACC]);
300			return -1;
301		}
302		pkt.payload = malloc(len);
303		if (!pkt.payload) {
304			lwsl_notice("OOM: dropping\n");
305			return -1;
306		}
307		pkt.len = (uint32_t)len;
308		pkt.ticket = conn->ticket_next++;
309
310		memcpy(pkt.payload, in, len);
311		if (!lws_ring_insert(conn->r[ONW], &pkt, 1)) {
312			__destroy_packet(&pkt);
313			lwsl_notice("dropping!\n");
314			return -1;
315		}
316
317		lwsl_debug("After onward RX: acc free: %d...\n",
318			   (int)lws_ring_get_count_free_elements(conn->r[ONW]));
319
320		if (conn->rx_enabled[ONW] &&
321		    lws_ring_get_count_free_elements(conn->r[ONW]) < 2)
322			flow_control(conn, ONW, 0);
323
324		if (!conn->closed[ACC])
325			lws_callback_on_writable(conn->wsi[ACC]);
326		break;
327
328	case LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE:
329		lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE\n");
330
331		if (!conn)
332			break;
333
334		ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
335		if (!ppkt) {
336			lwsl_info("%s: CLI_WRITABLE had nothing in acc ring\n",
337				  __func__);
338			break;
339		}
340
341		if (ppkt->ticket != conn->ticket_retired + 1) {
342			lwsl_info("%s: acc ring has %d but next %d\n", __func__,
343				  ppkt->ticket, conn->ticket_retired + 1);
344			lws_callback_on_writable(conn->wsi[ACC]);
345			break;
346		}
347
348		n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
349		if (n < 0) {
350			lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
351
352			return -1;
353		}
354
355		conn->ticket_retired = ppkt->ticket;
356		lws_ring_consume(conn->r[ACC], &conn->t[ACC], NULL, 1);
357		lws_ring_update_oldest_tail(conn->r[ACC], conn->t[ACC]);
358
359		lwsl_debug("acc free: %d...\n",
360			  (int)lws_ring_get_count_free_elements(conn->r[ACC]));
361
362		if (!conn->rx_enabled[ACC] &&
363		    lws_ring_get_count_free_elements(conn->r[ACC]) > 2)
364			flow_control(conn, ACC, 1);
365
366		ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
367		lwsl_debug("%s: CLI_WRITABLE: next acc pkt %p idx %d vs %d\n",
368			   __func__, ppkt, ppkt ? ppkt->ticket : 0,
369					   conn->ticket_retired + 1);
370
371		if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
372			lws_callback_on_writable(wsi);
373		else {
374			/*
375			 * defer checking for accepted side closing until we
376			 * sent everything in the ring to onward
377			 */
378			if (conn->closed[ACC])
379				/*
380				 * there is never going to be any more... but
381				 * we may have some tx still in tx buflist /
382				 * partial
383				 */
384				return lws_raw_transaction_completed(wsi);
385
386			if (lws_ring_get_element(conn->r[ONW], &conn->t[ONW]))
387				lws_callback_on_writable(conn->wsi[ACC]);
388		}
389		break;
390
391	/* callbacks related to raw socket descriptor "accepted side" */
392
393        case LWS_CALLBACK_RAW_PROXY_SRV_ADOPT:
394		lwsl_debug("LWS_CALLBACK_RAW_SRV_ADOPT\n");
395		if (!pss)
396			return -1;
397		conn = pss->conn = malloc(sizeof(struct conn));
398		if (!pss->conn)
399			return -1;
400		memset(conn, 0, sizeof(*conn));
401
402		conn->wsi[ACC] = wsi;
403		conn->ticket_next = 1;
404
405		conn->r[ACC] = lws_ring_create(sizeof(struct packet),
406					       RING_DEPTH, __destroy_packet);
407		if (!conn->r[ACC]) {
408			lwsl_err("%s: OOM\n", __func__);
409			return -1;
410		}
411		conn->r[ONW] = lws_ring_create(sizeof(struct packet),
412					       RING_DEPTH, __destroy_packet);
413		if (!conn->r[ONW]) {
414			lws_ring_destroy(conn->r[ACC]);
415			conn->r[ACC] = NULL;
416			lwsl_err("%s: OOM\n", __func__);
417
418			return -1;
419		}
420
421		conn->established[ACC] = 1;
422
423		/* disable any rx until the client side is up */
424		flow_control(conn, ACC, 0);
425
426		lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
427
428		/* try to create the onward client connection */
429		connect_client(vhd, pss);
430                break;
431
432	case LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:
433		lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:\n");
434
435		if (!conn)
436			break;
437
438		conn->closed[ACC] = 1;
439		if (conn->closed[ONW])
440			destroy_conn(vhd, pss);
441		break;
442
443	case LWS_CALLBACK_RAW_PROXY_SRV_RX:
444		lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_RX: rx %d\n", (int)len);
445
446		if (!conn || !conn->wsi[ONW]) {
447			lwsl_err("%s: LWS_CALLBACK_RAW_PROXY_SRV_RX: "
448				 "conn->wsi[ONW] NULL\n", __func__);
449			return -1;
450		}
451		if (conn->closed[ONW]) {
452			lwsl_info(" closed[ONW] %d\n", conn->closed[ONW]);
453			return -1;
454		}
455
456		if (!len)
457			return 0;
458
459		pkt.payload = malloc(len);
460		if (!pkt.payload) {
461			lwsl_notice("OOM: dropping\n");
462			return -1;
463		}
464		pkt.len = (uint32_t)len;
465		pkt.ticket = conn->ticket_next++;
466
467		memcpy(pkt.payload, in, len);
468		if (!lws_ring_insert(conn->r[ACC], &pkt, 1)) {
469			__destroy_packet(&pkt);
470			lwsl_notice("dropping!\n");
471			return -1;
472		}
473
474		lwsl_debug("After acc RX: acc free: %d...\n",
475			   (int)lws_ring_get_count_free_elements(conn->r[ACC]));
476
477		if (conn->rx_enabled[ACC] &&
478		    lws_ring_get_count_free_elements(conn->r[ACC]) <= 2)
479			flow_control(conn, ACC, 0);
480
481		if (conn->established[ONW] && !conn->closed[ONW])
482			lws_callback_on_writable(conn->wsi[ONW]);
483		break;
484
485	case LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE:
486		lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE\n");
487
488		if (!conn || !conn->established[ONW] || conn->closed[ONW])
489			break;
490
491		ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
492		if (!ppkt) {
493			lwsl_info("%s: SRV_WRITABLE nothing in onw ring\n",
494				  __func__);
495			break;
496		}
497
498		if (ppkt->ticket != conn->ticket_retired + 1) {
499			lwsl_info("%s: onw ring has %d but next %d\n", __func__,
500				  ppkt->ticket, conn->ticket_retired + 1);
501			lws_callback_on_writable(conn->wsi[ONW]);
502			break;
503		}
504
505		n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
506		if (n < 0) {
507			lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
508
509			return -1;
510		}
511
512		conn->ticket_retired = ppkt->ticket;
513		lws_ring_consume(conn->r[ONW], &conn->t[ONW], NULL, 1);
514		lws_ring_update_oldest_tail(conn->r[ONW], conn->t[ONW]);
515
516		lwsl_debug("onward free: %d... waiting %d\n",
517			  (int)lws_ring_get_count_free_elements(conn->r[ONW]),
518			  (int)lws_ring_get_count_waiting_elements(conn->r[ONW],
519								&conn->t[ONW]));
520
521		if (!conn->rx_enabled[ONW] &&
522		    lws_ring_get_count_free_elements(conn->r[ONW]) > 2)
523			flow_control(conn, ONW, 1);
524
525		ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
526		lwsl_debug("%s: SRV_WRITABLE: next onw pkt %p idx %d vs %d\n",
527			   __func__, ppkt, ppkt ? ppkt->ticket : 0,
528					   conn->ticket_retired + 1);
529
530		if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
531			lws_callback_on_writable(wsi);
532		else {
533			/*
534			 * defer checking for onward side closing until we
535			 * sent everything in the ring to accepted side
536			 */
537			if (conn->closed[ONW])
538				/*
539				 * there is never going to be any more... but
540				 * we may have some tx still in tx buflist /
541				 * partial
542				 */
543				return lws_raw_transaction_completed(wsi);
544
545		if (lws_ring_get_element(conn->r[ACC], &conn->t[ACC]))
546			lws_callback_on_writable(conn->wsi[ONW]);
547		}
548		break;
549
550	default:
551		break;
552	}
553
554	return lws_callback_http_dummy(wsi, reason, user, in, len);
555}
556
557#define LWS_PLUGIN_PROTOCOL_RAW_PROXY { \
558		"raw-proxy", \
559		callback_raw_proxy, \
560		sizeof(struct raw_pss), \
561		8192, \
562		8192, NULL, 0 \
563	}
564
565#if !defined (LWS_PLUGIN_STATIC)
566
567LWS_VISIBLE const struct lws_protocols lws_raw_proxy_protocols[] = {
568	LWS_PLUGIN_PROTOCOL_RAW_PROXY
569};
570
571LWS_VISIBLE const lws_plugin_protocol_t lws_raw_proxy = {
572	.hdr = {
573		"raw proxy",
574		"lws_protocol_plugin",
575		LWS_BUILD_HASH,
576		LWS_PLUGIN_API_MAGIC
577	},
578
579	.protocols = lws_raw_proxy_protocols,
580	.count_protocols = LWS_ARRAY_SIZE(lws_raw_proxy_protocols),
581	.extensions = NULL,
582	.count_extensions = 0,
583};
584#endif
585
586
587