1/*
2 * lws-minimal-secure-streams-client
3 *
4 * Written in 2010-2020 by Andy Green <andy@warmcat.com>
5 *
6 * This file is made available under the Creative Commons CC0 1.0
7 * Universal Public Domain Dedication.
8 *
9 *
10 * This client does not perform any INET networking... instead it opens a unix
11 * domain socket on a proxy that is listening for it, and that creates the
12 * actual secure stream connection.
13 *
14 * We are able to use the usual secure streams api in the client process, with
15 * payloads and connection state information proxied over the unix domain
16 * socket and fulfilled in the proxy process.
17 *
18 * The public client helper pieces are built as part of lws
19 */
20#include <private-lib-core.h>
21
22extern const uint32_t ss_state_txn_validity[17];
23
24int
25lws_ss_check_next_state_sspc(lws_sspc_handle_t *ss, uint8_t *prevstate,
26			     lws_ss_constate_t cs)
27{
28	if (cs >= LWSSSCS_USER_BASE || cs == LWSSSCS_EVENT_WAIT_CANCELLED)
29		/*
30		 * we can't judge user or transient states, leave the old state
31		 * and just wave them through
32		 */
33		return 0;
34
35	if (cs >= LWS_ARRAY_SIZE(ss_state_txn_validity)) {
36		/* we don't recognize this state as usable */
37		lwsl_sspc_err(ss, "bad new state %u", cs);
38		assert(0);
39		return 1;
40	}
41
42	if (*prevstate >= LWS_ARRAY_SIZE(ss_state_txn_validity)) {
43		/* existing state is broken */
44		lwsl_sspc_err(ss, "bad existing state %u",
45				(unsigned int)*prevstate);
46		assert(0);
47		return 1;
48	}
49
50	if (ss_state_txn_validity[*prevstate] & (1u << cs)) {
51
52		lwsl_sspc_notice(ss, "%s -> %s",
53			       lws_ss_state_name((int)*prevstate),
54			       lws_ss_state_name((int)cs));
55
56		/* this is explicitly allowed, update old state to new */
57		*prevstate = (uint8_t)cs;
58
59		return 0;
60	}
61
62	lwsl_sspc_err(ss, "transition from %s -> %s is illegal",
63		    lws_ss_state_name((int)*prevstate),
64		    lws_ss_state_name((int)cs));
65
66	assert(0);
67
68	return 1;
69}
70
71lws_ss_state_return_t
72lws_sspc_event_helper(lws_sspc_handle_t *h, lws_ss_constate_t cs,
73		      lws_ss_tx_ordinal_t flags)
74{
75	lws_ss_state_return_t ret;
76
77	if (!h)
78		return LWSSSSRET_OK;
79
80	if (lws_ss_check_next_state_sspc(h, &h->prev_ss_state, cs))
81		return LWSSSSRET_DESTROY_ME;
82
83	if (!h->ssi.state)
84		return LWSSSSRET_OK;
85
86	h->h_in_svc = h;
87	ret = h->ssi.state((void *)((uint8_t *)(h + 1)), NULL, cs, flags);
88	h->h_in_svc = NULL;
89
90	return ret;
91}
92
93static void
94lws_sspc_sul_retry_cb(lws_sorted_usec_list_t *sul)
95{
96	lws_sspc_handle_t *h = lws_container_of(sul, lws_sspc_handle_t, sul_retry);
97	static struct lws_client_connect_info i;
98
99	/*
100	 * We may have started up before the system proxy, so be prepared with
101	 * a sul to retry at 1Hz
102	 */
103
104	memset(&i, 0, sizeof i);
105	i.context = h->context;
106	if (h->context->ss_proxy_port) { /* tcp */
107		i.address = h->context->ss_proxy_address;
108		i.port = h->context->ss_proxy_port;
109		i.iface = h->context->ss_proxy_bind;
110	} else {
111		if (h->context->ss_proxy_bind)
112			i.address = h->context->ss_proxy_bind;
113		else
114#if defined(__linux__)
115			i.address = "+@proxy.ss.lws";
116#else
117			i.address = "+/tmp/proxy.ss.lws";
118#endif
119	}
120	i.host = i.address;
121	i.origin = i.address;
122	i.method = "RAW";
123	i.protocol = lws_sspc_protocols[0].name;
124	i.local_protocol_name = lws_sspc_protocols[0].name;
125	i.path = "";
126	i.pwsi = &h->cwsi;
127	i.opaque_user_data = (void *)h;
128	i.ssl_connection = LCCSCF_SECSTREAM_PROXY_LINK;
129
130	lws_metrics_caliper_bind(h->cal_txn, h->context->mt_ss_cliprox_conn);
131#if defined(LWS_WITH_SYS_METRICS)
132	lws_metrics_tag_add(&h->cal_txn.mtags_owner, "ss", h->ssi.streamtype);
133#endif
134
135	/* this wsi is the link to the proxy */
136
137	if (!lws_client_connect_via_info(&i)) {
138
139#if defined(LWS_WITH_SYS_METRICS)
140		/*
141		 * If any hanging caliper measurement, dump it, and free any tags
142		 */
143		lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
144#endif
145
146		lws_sul_schedule(h->context, 0, &h->sul_retry,
147				 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
148
149		return;
150	}
151
152	lwsl_sspc_notice(h, "%s", h->cwsi->lc.gutag);
153}
154
155static int
156lws_sspc_serialize_metadata(lws_sspc_handle_t *h, lws_sspc_metadata_t *md,
157				uint8_t *p, uint8_t *end)
158{
159	int n, txc;
160
161	if (md->name[0] == '\0') {
162
163		lwsl_info("sending tx credit update %d\n",
164				md->tx_cr_adjust);
165
166		p[0] = LWSSS_SER_TXPRE_TXCR_UPDATE;
167		lws_ser_wu16be(&p[1], 4);
168		lws_ser_wu32be(&p[3], (uint32_t)md->tx_cr_adjust);
169
170		n = 7;
171
172	} else {
173
174		lwsl_sspc_info(h, "sending metadata");
175
176		p[0] = LWSSS_SER_TXPRE_METADATA;
177		txc = (int)strlen(md->name);
178		n = txc + 1 + (int)md->len;
179		if (n > 0xffff)
180			/* we can't serialize this metadata in 16b length */
181			return -1;
182		if (n > lws_ptr_diff(end, &p[4]))
183			/* we don't have space for this metadata */
184			return -1;
185		lws_ser_wu16be(&p[1], (uint16_t)n);
186		p[3] = (uint8_t)txc;
187		memcpy(&p[4], md->name, (unsigned int)txc);
188		memcpy(&p[4 + txc], &md[1], md->len);
189		n = 4 + txc + (int)md->len;
190	}
191
192	lws_dll2_remove(&md->list);
193	lws_free(md);
194
195	return n;
196}
197
198static int
199callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason,
200		     void *user, void *in, size_t len)
201{
202	lws_sspc_handle_t *h = (lws_sspc_handle_t *)lws_get_opaque_user_data(wsi);
203	size_t pktsize = wsi->a.context->max_http_header_data;
204	void *m = (void *)((uint8_t *)(h + 1));
205	uint8_t *pkt = NULL, *p = NULL, *end = NULL;
206	lws_ss_state_return_t r;
207	uint64_t interval;
208	const uint8_t *cp;
209	uint8_t s[64];
210	lws_usec_t us;
211	int flags, n;
212
213	switch (reason) {
214
215	case LWS_CALLBACK_CONNECTING:
216		/*
217		 * In our particular case, we want CCEs even inside the
218		 * initial connect loop time
219		 */
220		wsi->client_suppress_CONNECTION_ERROR = 0;
221		break;
222
223	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
224		lwsl_warn("%s: CCE: %s\n", __func__,
225			  in ? (const char *)in : "null");
226#if defined(LWS_WITH_SYS_METRICS)
227		/*
228		 * If any hanging caliper measurement, dump it, and free any tags
229		 */
230		lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
231#endif
232		lws_set_opaque_user_data(wsi, NULL);
233		h->cwsi = NULL;
234		lws_sul_schedule(h->context, 0, &h->sul_retry,
235				 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
236		if (h->ssi.state) {
237			interval = (uint64_t)(lws_now_usecs() - h->us_start_upstream) /
238								LWS_US_PER_MS;
239			if (interval > 0xffffffffull)
240				interval = 0xffffffffull;
241			r = h->ssi.state(lws_sspc_to_user_object(h), NULL,
242					  LWSSSCS_UPSTREAM_LINK_RETRY,
243					  (uint32_t)interval);
244			if (r == LWSSSSRET_DESTROY_ME)
245				lws_sspc_destroy(&h);
246		}
247		break;
248
249        case LWS_CALLBACK_RAW_CONNECTED:
250		if (!h || lws_fi(&h->fic, "sspc_fail_on_linkup"))
251			return -1;
252		lwsl_sspc_info(h, "CONNECTED (%s)", h->ssi.streamtype);
253
254		h->state = LPCSCLI_SENDING_INITIAL_TX;
255		/*
256		 * We create the dsh at the response to the initial tx, which
257		 * will let us know the policy's max size for it... let's
258		 * protect the connection with a promise to complete the
259		 * SS serialization streamtype negotation within a short period,
260		 * we will cancel this timeout when we have the proxy's ack
261		 * of the streamtype serialization, eg, it exists in the proxy
262		 * policy etc
263		 */
264		lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
265		lws_callback_on_writable(wsi);
266		h->us_start_upstream = 0;
267                break;
268
269	case LWS_CALLBACK_RAW_CLOSE:
270		/*
271		 * our ss proxy Unix Domain socket has closed...
272		 */
273		if (!h) {
274			lwsl_info("%s: no sspc on client proxy link close", __func__);
275			break;
276		}
277		lwsl_sspc_info(h, "LWS_CALLBACK_RAW_CLOSE: proxy conn down, wsi %s",
278				lws_wsi_tag(wsi));
279
280		lws_dsh_destroy(&h->dsh);
281		if (h->ss_dangling_connected && h->ssi.state) {
282
283			lwsl_sspc_notice(h, "setting _DISCONNECTED");
284			h->ss_dangling_connected = 0;
285			h->prev_ss_state = LWSSSCS_DISCONNECTED;
286			r = h->ssi.state(ss_to_userobj(h), NULL,
287						 LWSSSCS_DISCONNECTED, 0);
288			if (r == LWSSSSRET_DESTROY_ME) {
289				h->cwsi = NULL;
290				lws_set_opaque_user_data(wsi, NULL);
291				lws_sspc_destroy(&h);
292				break;
293			}
294		}
295
296		h->cwsi = NULL;
297		/*
298		 * schedule a reconnect in 1s
299		 */
300		lws_sul_schedule(h->context, 0, &h->sul_retry,
301				 lws_sspc_sul_retry_cb, LWS_US_PER_SEC);
302
303		break;
304
305	case LWS_CALLBACK_RAW_RX:
306		/*
307		 * ie, the proxy has sent us something
308		 */
309
310		if (!h || !h->cwsi) {
311			lwsl_info("%s: rx when client ss destroyed\n", __func__);
312
313			return -1;
314		}
315
316		lwsl_sspc_info(h, "%s: RAW_RX: rx %d\n", __func__, (int)len);
317
318		if (!len) {
319			lwsl_sspc_notice(h, "RAW_RX: zero len");
320
321			return -1;
322		}
323
324		if (lws_fi(&h->fic, "sspc_fake_rxparse_disconnect_me"))
325			n = LWSSSSRET_DISCONNECT_ME;
326		else
327			if (lws_fi(&h->fic, "sspc_fake_rxparse_destroy_me"))
328				n = LWSSSSRET_DESTROY_ME;
329			else
330				n = lws_ss_deserialize_parse(&h->parser,
331							     lws_get_context(wsi),
332							     h->dsh, in, len,
333							     &h->state, h,
334							     (lws_ss_handle_t **)m,
335							     &h->ssi, 1);
336		switch (n) {
337		case LWSSSSRET_OK:
338			break;
339		case LWSSSSRET_DISCONNECT_ME:
340			lwsl_info("%s: proxlicent RX ended with DISCONNECT_ME\n",
341					__func__);
342			return -1;
343		case LWSSSSRET_DESTROY_ME:
344			lwsl_info("%s: proxlicent RX ended with DESTROY_ME\n",
345					__func__);
346			lws_set_opaque_user_data(wsi, NULL);
347			lws_sspc_destroy(&h);
348			return -1;
349		}
350
351		if (h->state == LPCSCLI_LOCAL_CONNECTED ||
352		    h->state == LPCSCLI_ONWARD_CONNECT)
353			lws_set_timeout(wsi, 0, 0);
354
355		break;
356
357	case LWS_CALLBACK_RAW_WRITEABLE:
358
359		/*
360		 * We can transmit something to the proxy...
361		 */
362
363		if (!h)
364			break;
365
366		lwsl_sspc_debug(h, "WRITEABLE %s, state %d",
367				wsi->lc.gutag, h->state);
368
369		/*
370		 * Management of ss timeout can happen any time and doesn't
371		 * depend on wsi existence or state
372		 */
373
374		n = 0;
375		cp = s;
376
377		if (h->pending_timeout_update) {
378			s[0] = LWSSS_SER_TXPRE_TIMEOUT_UPDATE;
379			s[1] = 0;
380			s[2] = 4;
381			/*
382			 *          0: use policy timeout value
383			 * 0xffffffff: cancel the timeout
384			 */
385			lws_ser_wu32be(&s[3], h->timeout_ms);
386			/* in case anything else to write */
387			lws_callback_on_writable(h->cwsi);
388			h->pending_timeout_update = 0;
389			n = 7;
390			goto do_write;
391		}
392
393		s[1] = 0;
394		/*
395		 * This is the state of the link that connects us to the onward
396		 * proxy
397		 */
398		switch (h->state) {
399		case LPCSCLI_SENDING_INITIAL_TX:
400			/*
401			 * We are negotating the opening of a particular
402			 * streamtype
403			 */
404			n = (int)strlen(h->ssi.streamtype) + 1 + 4 + 4;
405
406			s[0] = LWSSS_SER_TXPRE_STREAMTYPE;
407			lws_ser_wu16be(&s[1], (uint16_t)n);
408			/* SSSv1: add protocol version byte (initially 1) */
409			s[3] = (uint8_t)LWS_SSS_CLIENT_PROTOCOL_VERSION;
410			lws_ser_wu32be(&s[4], (uint32_t)getpid());
411			lws_ser_wu32be(&s[8], (uint32_t)h->txc.peer_tx_cr_est);
412			//h->txcr_out = txc;
413			lws_strncpy((char *)&s[12], h->ssi.streamtype, sizeof(s) - 12);
414			n += 3;
415			h->state = LPCSCLI_WAITING_CREATE_RESULT;
416
417			break;
418
419		case LPCSCLI_LOCAL_CONNECTED:
420
421			// lwsl_notice("%s: LPCSCLI_LOCAL_CONNECTED\n", __func__);
422
423			/*
424			 * Do we need to prioritize sending any metadata
425			 * changes?
426			 */
427
428			if (h->metadata_owner.count) {
429				lws_sspc_metadata_t *md = lws_container_of(
430					lws_dll2_get_tail(&h->metadata_owner),
431					lws_sspc_metadata_t, list);
432
433				pkt = lws_malloc(pktsize + LWS_PRE, __func__);
434				if (!pkt)
435					goto hangup;
436				cp = p = pkt + LWS_PRE;
437				end = p + pktsize;
438
439				n = lws_sspc_serialize_metadata(h, md, p, end);
440				if (n < 0)
441					goto metadata_hangup;
442
443				lwsl_sspc_debug(h, "(local_conn) metadata");
444
445				goto req_write_and_issue;
446			}
447
448			if (h->pending_writeable_len) {
449				lwsl_sspc_debug(h, "(local_conn) PAYLOAD_LENGTH_HINT %u",
450					   (unsigned int)h->writeable_len);
451				s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
452				lws_ser_wu16be(&s[1], 4);
453				lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
454				h->pending_writeable_len = 0;
455				n = 7;
456				goto req_write_and_issue;
457			}
458
459			if (h->conn_req_state >= LWSSSPC_ONW_ONGOING) {
460				lwsl_sspc_info(h, "conn_req_state %d",
461						h->conn_req_state);
462				break;
463			}
464
465			lwsl_sspc_info(h, "(local_conn) onward connect");
466
467			h->conn_req_state = LWSSSPC_ONW_ONGOING;
468
469			s[0] = LWSSS_SER_TXPRE_ONWARD_CONNECT;
470			s[1] = 0;
471			s[2] = 0;
472			n = 3;
473			break;
474
475		case LPCSCLI_OPERATIONAL:
476
477			/*
478			 *
479			 * - Do we need to prioritize sending any metadata
480			 *   changes?  (includes txcr updates)
481			 *
482			 * - Do we need to forward a hint about the payload
483			 *   length?
484			 */
485
486			pkt = lws_malloc(pktsize + LWS_PRE, __func__);
487			if (!pkt)
488				goto hangup;
489			cp = p = pkt + LWS_PRE;
490			end = p + pktsize;
491
492			if (h->metadata_owner.count) {
493				lws_sspc_metadata_t *md = lws_container_of(
494					lws_dll2_get_tail(&h->metadata_owner),
495					lws_sspc_metadata_t, list);
496
497				n = lws_sspc_serialize_metadata(h, md, p, end);
498				if (n < 0)
499					goto metadata_hangup;
500
501				goto req_write_and_issue;
502			}
503
504			if (h->pending_writeable_len) {
505				lwsl_sspc_info(h, "PAYLOAD_LENGTH_HINT %u",
506					  (unsigned int)h->writeable_len);
507				s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT;
508				lws_ser_wu16be(&s[1], 4);
509				lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len);
510				h->pending_writeable_len = 0;
511				n = 7;
512				goto req_write_and_issue;
513			}
514
515			/* we can't write anything if we don't have credit */
516			if (!h->ignore_txc && h->txc.tx_cr <= 0) {
517				lwsl_sspc_info(h, "WRITEABLE / OPERATIONAL:"
518					    " lack credit (%d)",
519					    h->txc.tx_cr);
520				// break;
521			}
522
523			len = pktsize - LWS_PRE - 19;
524			flags = 0;
525			if (!h->ssi.tx) {
526				n = 0;
527				goto do_write_nz;
528			}
529
530			n = h->ssi.tx(m, h->ord++, pkt + LWS_PRE + 19, &len,
531				      &flags);
532			switch (n) {
533			case LWSSSSRET_TX_DONT_SEND:
534				n = 0;
535				goto do_write_nz;
536
537			case LWSSSSRET_DISCONNECT_ME:
538			case LWSSSSRET_DESTROY_ME:
539				lwsl_notice("%s: sspc tx DISCONNECT/DESTROY unimplemented\n", __func__);
540				break;
541			default:
542				break;
543			}
544
545			h->txc.tx_cr = h->txc.tx_cr - (int)len;
546
547			cp = p;
548			n = (int)(len + 19);
549			us = lws_now_usecs();
550			p[0] = LWSSS_SER_TXPRE_TX_PAYLOAD;
551			lws_ser_wu16be(&p[1], (uint16_t)(len + 19 - 3));
552			lws_ser_wu32be(&p[3], (uint32_t)flags);
553			/* time spent here waiting to send this */
554			lws_ser_wu32be(&p[7], (uint32_t)(us - h->us_earliest_write_req));
555			/* ust that the client write happened */
556			lws_ser_wu64be(&p[11], (uint64_t)us);
557			h->us_earliest_write_req = 0;
558
559			if (flags & LWSSS_FLAG_EOM)
560				if (h->rsidx + 1 < (int)LWS_ARRAY_SIZE(h->rideshare_ofs) &&
561				    h->rideshare_ofs[h->rsidx + 1])
562					h->rsidx++;
563
564			break;
565		default:
566			break;
567		}
568
569do_write_nz:
570
571		if (!n)
572			break;
573
574do_write:
575		if (lws_fi(&h->fic, "sspc_link_write_fail"))
576			n = -1;
577		else
578			n = lws_write(wsi, (uint8_t *)cp, (unsigned int)n, LWS_WRITE_RAW);
579		if (n < 0) {
580			lwsl_sspc_notice(h, "WRITEABLE: %d", n);
581
582			goto hangup;
583		}
584		break;
585
586	default:
587		break;
588	}
589
590	lws_free(pkt);
591
592	return lws_callback_http_dummy(wsi, reason, user, in, len);
593
594metadata_hangup:
595	lwsl_sspc_err(h, "metadata too large");
596
597hangup:
598	lws_free(pkt);
599	lwsl_warn("hangup\n");
600	/* hang up on him */
601	return -1;
602
603req_write_and_issue:
604	/* in case anything else to write */
605	lws_callback_on_writable(h->cwsi);
606	goto do_write_nz;
607}
608
/*
 * Protocol table for the client's link to the SS proxy.
 *
 * The one real entry dispatches to callback_sspc_client(); its name is what
 * lws_sspc_sul_retry_cb() passes as both the protocol and the local protocol
 * name when it opens the link.  The trailing all-NULL / zero entry is
 * presumably the list terminator.
 */
const struct lws_protocols lws_sspc_protocols[] = {
	{
		"ssproxy-protocol",
		callback_sspc_client,
		0,
		2048, 2048, NULL, 0
	},
	{ NULL, NULL, 0, 0, 0, NULL, 0 }
};
618
619int
620lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi,
621	        void *opaque_user_data, lws_sspc_handle_t **ppss,
622	        struct lws_sequencer *seq_owner, const char **ppayload_fmt)
623{
624	lws_sspc_handle_t *h;
625	uint8_t *ua;
626	char *p;
627
628	lws_service_assert_loop_thread(context, tsi);
629
630	/* allocate the handle (including ssi), the user alloc,
631	 * and the streamname */
632
633	h = malloc(sizeof(lws_sspc_handle_t) + ssi->user_alloc +
634				strlen(ssi->streamtype) + 1);
635	if (!h)
636		return 1;
637	memset(h, 0, sizeof(*h));
638
639	h->lc.log_cx = context->log_cx;
640
641#if defined(LWS_WITH_SYS_FAULT_INJECTION)
642	h->fic.name = "sspc";
643	lws_xos_init(&h->fic.xos, lws_xos(&context->fic.xos));
644	if (ssi->fic.fi_owner.count)
645		lws_fi_import(&h->fic, &ssi->fic);
646
647	lws_fi_inherit_copy(&h->fic, &context->fic, "ss", ssi->streamtype);
648#endif
649
650	if (lws_fi(&h->fic, "sspc_create_oom")) {
651		/*
652		 * We have to do this a litte later, so we can cleanly inherit
653		 * the OOM pieces and drain the info fic
654		 */
655		lws_fi_destroy(&h->fic);
656		free(h);
657		return 1;
658	}
659
660	__lws_lc_tag(context, &context->lcg[LWSLCG_SSP_CLIENT], &h->lc,
661			ssi->streamtype);
662
663	memcpy(&h->ssi, ssi, sizeof(*ssi));
664	ua = (uint8_t *)(h + 1);
665	memset(ua, 0, ssi->user_alloc);
666	p = (char *)ua + ssi->user_alloc;
667	memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1);
668	h->ssi.streamtype = (const char *)p;
669	h->context = context;
670	h->us_start_upstream = lws_now_usecs();
671
672	if (!ssi->manual_initial_tx_credit)
673		h->txc.peer_tx_cr_est = 500000000;
674	else
675		h->txc.peer_tx_cr_est = ssi->manual_initial_tx_credit;
676
677	if (!strcmp(ssi->streamtype, LWS_SMD_STREAMTYPENAME))
678		h->ignore_txc = 1;
679
680	lws_dll2_add_head(&h->client_list, &context->pt[tsi].ss_client_owner);
681
682	/* fill in the things the real api does for the caller */
683
684	*((void **)(ua + ssi->opaque_user_data_offset)) = opaque_user_data;
685	*((void **)(ua + ssi->handle_offset)) = h;
686
687	if (ppss)
688		*ppss = h;
689
690	/* try the actual connect */
691
692	lws_sspc_sul_retry_cb(&h->sul_retry);
693
694	return 0;
695}
696
697/* used on context destroy when iterating listed lws_ss on a pt */
698
699int
700lws_sspc_destroy_dll(struct lws_dll2 *d, void *user)
701{
702	lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list);
703
704	lws_sspc_destroy(&h);
705
706	return 0;
707}
708
709void
710lws_sspc_rxmetadata_destroy(lws_sspc_handle_t *h)
711{
712	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
713			lws_dll2_get_head(&h->metadata_owner_rx)) {
714		lws_sspc_metadata_t *md =
715				lws_container_of(d, lws_sspc_metadata_t, list);
716
717		lws_dll2_remove(&md->list);
718		lws_free(md);
719
720	} lws_end_foreach_dll_safe(d, d1);
721}
722
/*
 * Tear down a proxied secure stream client handle.
 *
 * ph: pointer to the caller's handle pointer; *ph is set to NULL once the
 *     handle has been torn down.  Nothing happens if *ph is already NULL.
 *
 * It is illegal to call this from inside the user state callback issued via
 * lws_sspc_event_helper() (that sets h->h_in_svc): such callers must return
 * LWSSSSRET_DESTROY_ME instead.  That case is logged, asserts, and otherwise
 * returns without destroying anything.  A handle already marked as
 * destroying is also left alone.
 */
void
lws_sspc_destroy(lws_sspc_handle_t **ph)
{
	lws_sspc_handle_t *h;

	if (!*ph)
		return;

	h = *ph;
	/* refuse to destroy the handle from under its own state callback */
	if (h == h->h_in_svc) {
		lwsl_err("%s: illegal destroy, return LWSSSSRET_DESTROY_ME instead\n",
				__func__);
		assert(0);
		return;
	}

	lws_service_assert_loop_thread(h->context, 0);

	/* guard against re-entry while we are already tearing down */
	if (h->destroying)
		return;

	h->destroying = 1;

	/* if this caliper is still dangling at destroy, we failed */
#if defined(LWS_WITH_SYS_METRICS)
	/*
	 * If any hanging caliper measurement, dump it, and free any tags
	 */
	lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL);
#endif
	/* the user still thinks he is connected: let him know he is not */
	if (h->ss_dangling_connected && h->ssi.state) {
		lws_sspc_event_helper(h, LWSSSCS_DISCONNECTED, 0);
		h->ss_dangling_connected = 0;
	}

#if defined(LWS_WITH_SYS_FAULT_INJECTION)
	lws_fi_destroy(&h->fic);
#endif

	/* no more reconnect attempts, and take us off the list of clients */
	lws_sul_cancel(&h->sul_retry);
	lws_dll2_remove(&h->client_list);

	if (h->dsh)
		lws_dsh_destroy(&h->dsh);
	/*
	 * Detach from the link wsi before closing it, so callbacks still
	 * arriving on that wsi find no handle
	 */
	if (h->cwsi) {
		lws_set_opaque_user_data(h->cwsi, NULL);
		lws_wsi_close(h->cwsi, LWS_TO_KILL_ASYNC);
		h->cwsi = NULL;
	}

	/* clean out any pending metadata changes that didn't make it */

	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
			lws_dll2_get_head(&(*ph)->metadata_owner)) {
		lws_sspc_metadata_t *md =
				lws_container_of(d, lws_sspc_metadata_t, list);

		lws_dll2_remove(&md->list);
		lws_free(md);

	} lws_end_foreach_dll_safe(d, d1);

	/* ...and any metadata we received */
	lws_sspc_rxmetadata_destroy(h);

	/* last user-visible event for this handle; return value is ignored */
	lws_sspc_event_helper(h, LWSSSCS_DESTROYING, 0);
	*ph = NULL;

	/* cancelled a second time, after the user callback above has run */
	lws_sul_cancel(&h->sul_retry);


	/* confirm no sul left scheduled in handle or user allocation object */
	lws_sul_debug_zombies(h->context, h, sizeof(*h) + h->ssi.user_alloc,
			      __func__);

	__lws_lc_untag(h->context, &h->lc);

	/* the handle was allocated with malloc() in lws_sspc_create() */
	free(h);
}
801
802lws_ss_state_return_t
803lws_sspc_request_tx(lws_sspc_handle_t *h)
804{
805	if (!h || !h->cwsi)
806		return LWSSSSRET_OK;
807
808	lws_service_assert_loop_thread(h->context, 0);
809
810	if (!h->us_earliest_write_req)
811		h->us_earliest_write_req = lws_now_usecs();
812
813	if (h->state == LPCSCLI_LOCAL_CONNECTED &&
814	    h->conn_req_state == LWSSSPC_ONW_NONE)
815		h->conn_req_state = LWSSSPC_ONW_REQ;
816
817	lws_callback_on_writable(h->cwsi);
818
819	return LWSSSSRET_OK;
820}
821
822/*
823 * Currently we fulfil the writeable part locally by just enabling POLLOUT on
824 * the UDS link, without serialization footprint, which is reasonable as far as
825 * it goes.
826 *
827 * But for the ..._len() variant, the expected payload length hint we are being
828 * told is something that must be serialized to the onward peer, since either
829 * that guy or someone upstream of him is the guy who will compose the framing
830 * with it that actually goes out.
831 *
832 * This information is needed at the upstream guy before we have sent any
833 * payload, eg, for http POST, he has to prepare the content-length in the
834 * headers, before any payload.  So we have to issue a serialization of the
835 * length at this point.
836 */
837
838lws_ss_state_return_t
839lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len)
840{
841	/*
842	 * for client conns, they cannot even complete creation of the handle
843	 * without the onwared connection to the proxy, it's not legal to start
844	 * using it until it's operation and has the onward connection (and the
845	 * link has called CREATED state)
846	 */
847
848	if (!h)
849		return LWSSSSRET_OK;
850
851	lws_service_assert_loop_thread(h->context, 0);
852
853	lwsl_sspc_notice(h, "setting writeable_len %u", (unsigned int)len);
854	h->writeable_len = len;
855	h->pending_writeable_len = 1;
856
857	if (!h->us_earliest_write_req)
858		h->us_earliest_write_req = lws_now_usecs();
859
860	if (h->state == LPCSCLI_LOCAL_CONNECTED &&
861	    h->conn_req_state == LWSSSPC_ONW_NONE)
862		h->conn_req_state = LWSSSPC_ONW_REQ;
863
864	/*
865	 * We're going to use this up with serializing h->writeable_len... that
866	 * will request again.
867	 */
868
869	if (h->cwsi)
870		lws_callback_on_writable(h->cwsi);
871
872	return LWSSSSRET_OK;
873}
874
875int
876lws_sspc_client_connect(lws_sspc_handle_t *h)
877{
878	if (!h || h->state == LPCSCLI_OPERATIONAL)
879		return 0;
880
881	lws_service_assert_loop_thread(h->context, 0);
882
883	assert(h->state == LPCSCLI_LOCAL_CONNECTED);
884	if (h->state == LPCSCLI_LOCAL_CONNECTED &&
885	    h->conn_req_state == LWSSSPC_ONW_NONE)
886		h->conn_req_state = LWSSSPC_ONW_REQ;
887	if (h->cwsi)
888		lws_callback_on_writable(h->cwsi);
889
890	return 0;
891}
892
893struct lws_context *
894lws_sspc_get_context(struct lws_sspc_handle *h)
895{
896	return h->context;
897}
898
899const char *
900lws_sspc_rideshare(struct lws_sspc_handle *h)
901{
902	/*
903	 * ...the serialized RX rideshare name if any...
904	 */
905
906	if (h->parser.rideshare[0]) {
907		lwsl_sspc_info(h, "parser %s", h->parser.rideshare);
908
909		return h->parser.rideshare;
910	}
911
912	/*
913	 * The tx rideshare index
914	 */
915
916	if (h->rideshare_list[0]) {
917		lwsl_sspc_info(h, "tx list %s",
918			  &h->rideshare_list[h->rideshare_ofs[h->rsidx]]);
919		return &h->rideshare_list[h->rideshare_ofs[h->rsidx]];
920	}
921
922	/*
923	 * ... otherwise default to our stream type name
924	 */
925
926	lwsl_sspc_info(h, "def %s\n", h->ssi.streamtype);
927
928	return h->ssi.streamtype;
929}
930
931static int
932_lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
933		       const void *value, size_t len, int tx_cr_adjust)
934{
935	lws_sspc_metadata_t *md;
936
937	lws_service_assert_loop_thread(h->context, 0);
938
939	/*
940	 * Are we replacing a pending metadata of the same name?  It's not
941	 * efficient to do this but user code can do what it likes... let's
942	 * optimize away the old one.
943	 *
944	 * Tx credit adjust always has name ""
945	 */
946
947	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
948				   lws_dll2_get_head(&h->metadata_owner)) {
949		md = lws_container_of(d, lws_sspc_metadata_t, list);
950
951		if (!strcmp(name, md->name)) {
952			lws_dll2_remove(&md->list);
953			lws_free(md);
954			break;
955		}
956
957	} lws_end_foreach_dll_safe(d, d1);
958
959	/*
960	 * We have to stash the metadata and pass it to the proxy
961	 */
962
963	if (lws_fi(&h->fic, "sspc_fail_metadata_set"))
964		md = NULL;
965	else
966		md = lws_malloc(sizeof(*md) + len, "set metadata");
967	if (!md) {
968		lwsl_sspc_err(h, "OOM");
969
970		return 1;
971	}
972
973	memset(md, 0, sizeof(*md));
974
975	md->tx_cr_adjust = tx_cr_adjust;
976	h->txc.peer_tx_cr_est += tx_cr_adjust;
977
978	lws_strncpy(md->name, name, sizeof(md->name));
979	md->len = len;
980	if (len)
981		memcpy(&md[1], value, len);
982
983	lws_dll2_add_tail(&md->list, &h->metadata_owner);
984
985	if (len) {
986		lwsl_sspc_info(h, "set metadata %s", name);
987		lwsl_hexdump_sspc_info(h, value, len);
988	} else
989		lwsl_sspc_info(h, "serializing tx cr adj %d",
990			    (int)tx_cr_adjust);
991
992	if (h->cwsi)
993		lws_callback_on_writable(h->cwsi);
994
995	return 0;
996}
997
/*
 * Queue a named metadata value to be sent to the proxy.
 *
 * This is _lws_sspc_set_metadata() with no tx credit adjustment; the return
 * value is passed straight through (0 on success, 1 on allocation failure).
 */
int
lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name,
		      const void *value, size_t len)
{
	int ret = _lws_sspc_set_metadata(h, name, value, len, 0);

	return ret;
}
1004
1005int
1006lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name,
1007		      const void **value, size_t *len)
1008{
1009	lws_sspc_metadata_t *md;
1010
1011	/*
1012	 * client side does not have access to policy
1013	 * and any metadata are new to it each time,
1014	 * we allocate them, removing any existing with
1015	 * the same name first
1016	 */
1017
1018	lws_service_assert_loop_thread(h->context, 0);
1019
1020	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
1021			lws_dll2_get_head(&h->metadata_owner_rx)) {
1022		md = lws_container_of(d,
1023			   lws_sspc_metadata_t, list);
1024
1025		if (!strcmp(md->name, name)) {
1026			*len = md->len;
1027			*value = &md[1];
1028
1029			return 0;
1030		}
1031
1032	} lws_end_foreach_dll_safe(d, d1);
1033
1034	return 1;
1035}
1036
1037int
1038lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump)
1039{
1040	lws_service_assert_loop_thread(h->context, 0);
1041	lwsl_sspc_notice(h, "%d\n", bump);
1042	return _lws_sspc_set_metadata(h, "", NULL, 0, (int)bump);
1043}
1044
1045int
1046lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle *h)
1047{
1048	lws_service_assert_loop_thread(h->context, 0);
1049	return h->txc.peer_tx_cr_est;
1050}
1051
1052void
1053lws_sspc_start_timeout(struct lws_sspc_handle *h, unsigned int timeout_ms)
1054{
1055	lws_service_assert_loop_thread(h->context, 0);
1056	if (!h->cwsi)
1057		/* we can't fulfil it */
1058		return;
1059	h->timeout_ms = (uint32_t)timeout_ms;
1060	h->pending_timeout_update = 1;
1061	lws_callback_on_writable(h->cwsi);
1062}
1063
/*
 * Cancel the stream timeout.
 *
 * This is a timeout update carrying the all-ones value, which the serialized
 * timeout update treats as "cancel the timeout".
 */
void
lws_sspc_cancel_timeout(struct lws_sspc_handle *h)
{
	const unsigned int cancel = ~0u;

	lws_sspc_start_timeout(h, cancel);
}
1069
1070void *
1071lws_sspc_to_user_object(struct lws_sspc_handle *h)
1072{
1073	return (void *)(h + 1);
1074}
1075
1076void
1077lws_sspc_change_handlers(struct lws_sspc_handle *h,
1078	lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags),
1079	lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
1080		  size_t *len, int *flags),
1081	lws_ss_state_return_t (*state)(void *userobj, void *h_src /* ss handle type */,
1082		     lws_ss_constate_t state, lws_ss_tx_ordinal_t ack))
1083{
1084	if (rx)
1085		h->ssi.rx = rx;
1086	if (tx)
1087		h->ssi.tx = tx;
1088	if (state)
1089		h->ssi.state = state;
1090}
1091
1092const char *
1093lws_sspc_tag(struct lws_sspc_handle *h)
1094{
1095	if (!h)
1096		return "[null sspc]";
1097	return lws_lc_tag(&h->lc);
1098}
1099
1100int
1101lws_sspc_cancel_notify_dll(struct lws_dll2 *d, void *user)
1102{
1103	lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list);
1104
1105	lws_sspc_event_helper(h, LWSSSCS_EVENT_WAIT_CANCELLED, 0);
1106
1107	return 0;
1108}
1109
1110