1/* 2 * lws-minimal-secure-streams-client 3 * 4 * Written in 2010-2020 by Andy Green <andy@warmcat.com> 5 * 6 * This file is made available under the Creative Commons CC0 1.0 7 * Universal Public Domain Dedication. 8 * 9 * 10 * This client does not perform any INET networking... instead it opens a unix 11 * domain socket on a proxy that is listening for it, and that creates the 12 * actual secure stream connection. 13 * 14 * We are able to use the usual secure streams api in the client process, with 15 * payloads and connection state information proxied over the unix domain 16 * socket and fulfilled in the proxy process. 17 * 18 * The public client helper pieces are built as part of lws 19 */ 20#include <private-lib-core.h> 21 22extern const uint32_t ss_state_txn_validity[17]; 23 24int 25lws_ss_check_next_state_sspc(lws_sspc_handle_t *ss, uint8_t *prevstate, 26 lws_ss_constate_t cs) 27{ 28 if (cs >= LWSSSCS_USER_BASE || cs == LWSSSCS_EVENT_WAIT_CANCELLED) 29 /* 30 * we can't judge user or transient states, leave the old state 31 * and just wave them through 32 */ 33 return 0; 34 35 if (cs >= LWS_ARRAY_SIZE(ss_state_txn_validity)) { 36 /* we don't recognize this state as usable */ 37 lwsl_sspc_err(ss, "bad new state %u", cs); 38 assert(0); 39 return 1; 40 } 41 42 if (*prevstate >= LWS_ARRAY_SIZE(ss_state_txn_validity)) { 43 /* existing state is broken */ 44 lwsl_sspc_err(ss, "bad existing state %u", 45 (unsigned int)*prevstate); 46 assert(0); 47 return 1; 48 } 49 50 if (ss_state_txn_validity[*prevstate] & (1u << cs)) { 51 52 lwsl_sspc_notice(ss, "%s -> %s", 53 lws_ss_state_name((int)*prevstate), 54 lws_ss_state_name((int)cs)); 55 56 /* this is explicitly allowed, update old state to new */ 57 *prevstate = (uint8_t)cs; 58 59 return 0; 60 } 61 62 lwsl_sspc_err(ss, "transition from %s -> %s is illegal", 63 lws_ss_state_name((int)*prevstate), 64 lws_ss_state_name((int)cs)); 65 66 assert(0); 67 68 return 1; 69} 70 71lws_ss_state_return_t 72lws_sspc_event_helper(lws_sspc_handle_t *h, lws_ss_constate_t cs, 73 lws_ss_tx_ordinal_t flags) 74{ 75 lws_ss_state_return_t ret; 76 77 if (!h) 78 return LWSSSSRET_OK; 79 80 if (lws_ss_check_next_state_sspc(h, &h->prev_ss_state, cs)) 81 return LWSSSSRET_DESTROY_ME; 82 83 if (!h->ssi.state) 84 return LWSSSSRET_OK; 85 86 h->h_in_svc = h; 87 ret = h->ssi.state((void *)((uint8_t *)(h + 1)), NULL, cs, flags); 88 h->h_in_svc = NULL; 89 90 return ret; 91} 92 93static void 94lws_sspc_sul_retry_cb(lws_sorted_usec_list_t *sul) 95{ 96 lws_sspc_handle_t *h = lws_container_of(sul, lws_sspc_handle_t, sul_retry); 97 static struct lws_client_connect_info i; 98 99 /* 100 * We may have started up before the system proxy, so be prepared with 101 * a sul to retry at 1Hz 102 */ 103 104 memset(&i, 0, sizeof i); 105 i.context = h->context; 106 if (h->context->ss_proxy_port) { /* tcp */ 107 i.address = h->context->ss_proxy_address; 108 i.port = h->context->ss_proxy_port; 109 i.iface = h->context->ss_proxy_bind; 110 } else { 111 if (h->context->ss_proxy_bind) 112 i.address = h->context->ss_proxy_bind; 113 else 114#if defined(__linux__) 115 i.address = "+@proxy.ss.lws"; 116#else 117 i.address = "+/tmp/proxy.ss.lws"; 118#endif 119 } 120 i.host = i.address; 121 i.origin = i.address; 122 i.method = "RAW"; 123 i.protocol = lws_sspc_protocols[0].name; 124 i.local_protocol_name = lws_sspc_protocols[0].name; 125 i.path = ""; 126 i.pwsi = &h->cwsi; 127 i.opaque_user_data = (void *)h; 128 i.ssl_connection = LCCSCF_SECSTREAM_PROXY_LINK; 129 130 lws_metrics_caliper_bind(h->cal_txn, h->context->mt_ss_cliprox_conn); 131#if defined(LWS_WITH_SYS_METRICS) 132 lws_metrics_tag_add(&h->cal_txn.mtags_owner, "ss", h->ssi.streamtype); 133#endif 134 135 /* this wsi is the link to the proxy */ 136 137 if (!lws_client_connect_via_info(&i)) { 138 139#if defined(LWS_WITH_SYS_METRICS) 140 /* 141 * If any hanging caliper measurement, dump it, and free any tags 142 */ 143 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL); 144#endif 145 146 lws_sul_schedule(h->context, 0, &h->sul_retry, 147 lws_sspc_sul_retry_cb, LWS_US_PER_SEC); 148 149 return; 150 } 151 152 lwsl_sspc_notice(h, "%s", h->cwsi->lc.gutag); 153} 154 155static int 156lws_sspc_serialize_metadata(lws_sspc_handle_t *h, lws_sspc_metadata_t *md, 157 uint8_t *p, uint8_t *end) 158{ 159 int n, txc; 160 161 if (md->name[0] == '\0') { 162 163 lwsl_info("sending tx credit update %d\n", 164 md->tx_cr_adjust); 165 166 p[0] = LWSSS_SER_TXPRE_TXCR_UPDATE; 167 lws_ser_wu16be(&p[1], 4); 168 lws_ser_wu32be(&p[3], (uint32_t)md->tx_cr_adjust); 169 170 n = 7; 171 172 } else { 173 174 lwsl_sspc_info(h, "sending metadata"); 175 176 p[0] = LWSSS_SER_TXPRE_METADATA; 177 txc = (int)strlen(md->name); 178 n = txc + 1 + (int)md->len; 179 if (n > 0xffff) 180 /* we can't serialize this metadata in 16b length */ 181 return -1; 182 if (n > lws_ptr_diff(end, &p[4])) 183 /* we don't have space for this metadata */ 184 return -1; 185 lws_ser_wu16be(&p[1], (uint16_t)n); 186 p[3] = (uint8_t)txc; 187 memcpy(&p[4], md->name, (unsigned int)txc); 188 memcpy(&p[4 + txc], &md[1], md->len); 189 n = 4 + txc + (int)md->len; 190 } 191 192 lws_dll2_remove(&md->list); 193 lws_free(md); 194 195 return n; 196} 197 198static int 199callback_sspc_client(struct lws *wsi, enum lws_callback_reasons reason, 200 void *user, void *in, size_t len) 201{ 202 lws_sspc_handle_t *h = (lws_sspc_handle_t *)lws_get_opaque_user_data(wsi); 203 size_t pktsize = wsi->a.context->max_http_header_data; 204 void *m = (void *)((uint8_t *)(h + 1)); 205 uint8_t *pkt = NULL, *p = NULL, *end = NULL; 206 lws_ss_state_return_t r; 207 uint64_t interval; 208 const uint8_t *cp; 209 uint8_t s[64]; 210 lws_usec_t us; 211 int flags, n; 212 213 switch (reason) { 214 215 case LWS_CALLBACK_CONNECTING: 216 /* 217 * In our particular case, we want CCEs even inside the 218 * initial connect loop time 219 */ 220 wsi->client_suppress_CONNECTION_ERROR = 0; 221 break; 222 223 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: 224 lwsl_warn("%s: CCE: %s\n", __func__, 225 in ? (const char *)in : "null"); 226#if defined(LWS_WITH_SYS_METRICS) 227 /* 228 * If any hanging caliper measurement, dump it, and free any tags 229 */ 230 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL); 231#endif 232 lws_set_opaque_user_data(wsi, NULL); 233 h->cwsi = NULL; 234 lws_sul_schedule(h->context, 0, &h->sul_retry, 235 lws_sspc_sul_retry_cb, LWS_US_PER_SEC); 236 if (h->ssi.state) { 237 interval = (uint64_t)(lws_now_usecs() - h->us_start_upstream) / 238 LWS_US_PER_MS; 239 if (interval > 0xffffffffull) 240 interval = 0xffffffffull; 241 r = h->ssi.state(lws_sspc_to_user_object(h), NULL, 242 LWSSSCS_UPSTREAM_LINK_RETRY, 243 (uint32_t)interval); 244 if (r == LWSSSSRET_DESTROY_ME) 245 lws_sspc_destroy(&h); 246 } 247 break; 248 249 case LWS_CALLBACK_RAW_CONNECTED: 250 if (!h || lws_fi(&h->fic, "sspc_fail_on_linkup")) 251 return -1; 252 lwsl_sspc_info(h, "CONNECTED (%s)", h->ssi.streamtype); 253 254 h->state = LPCSCLI_SENDING_INITIAL_TX; 255 /* 256 * We create the dsh at the response to the initial tx, which 257 * will let us know the policy's max size for it... let's 258 * protect the connection with a promise to complete the 259 * SS serialization streamtype negotation within a short period, 260 * we will cancel this timeout when we have the proxy's ack 261 * of the streamtype serialization, eg, it exists in the proxy 262 * policy etc 263 */ 264 lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3); 265 lws_callback_on_writable(wsi); 266 h->us_start_upstream = 0; 267 break; 268 269 case LWS_CALLBACK_RAW_CLOSE: 270 /* 271 * our ss proxy Unix Domain socket has closed... 272 */ 273 if (!h) { 274 lwsl_info("%s: no sspc on client proxy link close", __func__); 275 break; 276 } 277 lwsl_sspc_info(h, "LWS_CALLBACK_RAW_CLOSE: proxy conn down, wsi %s", 278 lws_wsi_tag(wsi)); 279 280 lws_dsh_destroy(&h->dsh); 281 if (h->ss_dangling_connected && h->ssi.state) { 282 283 lwsl_sspc_notice(h, "setting _DISCONNECTED"); 284 h->ss_dangling_connected = 0; 285 h->prev_ss_state = LWSSSCS_DISCONNECTED; 286 r = h->ssi.state(ss_to_userobj(h), NULL, 287 LWSSSCS_DISCONNECTED, 0); 288 if (r == LWSSSSRET_DESTROY_ME) { 289 h->cwsi = NULL; 290 lws_set_opaque_user_data(wsi, NULL); 291 lws_sspc_destroy(&h); 292 break; 293 } 294 } 295 296 h->cwsi = NULL; 297 /* 298 * schedule a reconnect in 1s 299 */ 300 lws_sul_schedule(h->context, 0, &h->sul_retry, 301 lws_sspc_sul_retry_cb, LWS_US_PER_SEC); 302 303 break; 304 305 case LWS_CALLBACK_RAW_RX: 306 /* 307 * ie, the proxy has sent us something 308 */ 309 310 if (!h || !h->cwsi) { 311 lwsl_info("%s: rx when client ss destroyed\n", __func__); 312 313 return -1; 314 } 315 316 lwsl_sspc_info(h, "%s: RAW_RX: rx %d\n", __func__, (int)len); 317 318 if (!len) { 319 lwsl_sspc_notice(h, "RAW_RX: zero len"); 320 321 return -1; 322 } 323 324 if (lws_fi(&h->fic, "sspc_fake_rxparse_disconnect_me")) 325 n = LWSSSSRET_DISCONNECT_ME; 326 else 327 if (lws_fi(&h->fic, "sspc_fake_rxparse_destroy_me")) 328 n = LWSSSSRET_DESTROY_ME; 329 else 330 n = lws_ss_deserialize_parse(&h->parser, 331 lws_get_context(wsi), 332 h->dsh, in, len, 333 &h->state, h, 334 (lws_ss_handle_t **)m, 335 &h->ssi, 1); 336 switch (n) { 337 case LWSSSSRET_OK: 338 break; 339 case LWSSSSRET_DISCONNECT_ME: 340 lwsl_info("%s: proxlicent RX ended with DISCONNECT_ME\n", 341 __func__); 342 return -1; 343 case LWSSSSRET_DESTROY_ME: 344 lwsl_info("%s: proxlicent RX ended with DESTROY_ME\n", 345 __func__); 346 lws_set_opaque_user_data(wsi, NULL); 347 lws_sspc_destroy(&h); 348 return -1; 349 } 350 351 if (h->state == LPCSCLI_LOCAL_CONNECTED || 352 h->state == LPCSCLI_ONWARD_CONNECT) 353 lws_set_timeout(wsi, 0, 0); 354 355 break; 356 357 case LWS_CALLBACK_RAW_WRITEABLE: 358 359 /* 360 * We can transmit something to the proxy... 361 */ 362 363 if (!h) 364 break; 365 366 lwsl_sspc_debug(h, "WRITEABLE %s, state %d", 367 wsi->lc.gutag, h->state); 368 369 /* 370 * Management of ss timeout can happen any time and doesn't 371 * depend on wsi existence or state 372 */ 373 374 n = 0; 375 cp = s; 376 377 if (h->pending_timeout_update) { 378 s[0] = LWSSS_SER_TXPRE_TIMEOUT_UPDATE; 379 s[1] = 0; 380 s[2] = 4; 381 /* 382 * 0: use policy timeout value 383 * 0xffffffff: cancel the timeout 384 */ 385 lws_ser_wu32be(&s[3], h->timeout_ms); 386 /* in case anything else to write */ 387 lws_callback_on_writable(h->cwsi); 388 h->pending_timeout_update = 0; 389 n = 7; 390 goto do_write; 391 } 392 393 s[1] = 0; 394 /* 395 * This is the state of the link that connects us to the onward 396 * proxy 397 */ 398 switch (h->state) { 399 case LPCSCLI_SENDING_INITIAL_TX: 400 /* 401 * We are negotating the opening of a particular 402 * streamtype 403 */ 404 n = (int)strlen(h->ssi.streamtype) + 1 + 4 + 4; 405 406 s[0] = LWSSS_SER_TXPRE_STREAMTYPE; 407 lws_ser_wu16be(&s[1], (uint16_t)n); 408 /* SSSv1: add protocol version byte (initially 1) */ 409 s[3] = (uint8_t)LWS_SSS_CLIENT_PROTOCOL_VERSION; 410 lws_ser_wu32be(&s[4], (uint32_t)getpid()); 411 lws_ser_wu32be(&s[8], (uint32_t)h->txc.peer_tx_cr_est); 412 //h->txcr_out = txc; 413 lws_strncpy((char *)&s[12], h->ssi.streamtype, sizeof(s) - 12); 414 n += 3; 415 h->state = LPCSCLI_WAITING_CREATE_RESULT; 416 417 break; 418 419 case LPCSCLI_LOCAL_CONNECTED: 420 421 // lwsl_notice("%s: LPCSCLI_LOCAL_CONNECTED\n", __func__); 422 423 /* 424 * Do we need to prioritize sending any metadata 425 * changes? 426 */ 427 428 if (h->metadata_owner.count) { 429 lws_sspc_metadata_t *md = lws_container_of( 430 lws_dll2_get_tail(&h->metadata_owner), 431 lws_sspc_metadata_t, list); 432 433 pkt = lws_malloc(pktsize + LWS_PRE, __func__); 434 if (!pkt) 435 goto hangup; 436 cp = p = pkt + LWS_PRE; 437 end = p + pktsize; 438 439 n = lws_sspc_serialize_metadata(h, md, p, end); 440 if (n < 0) 441 goto metadata_hangup; 442 443 lwsl_sspc_debug(h, "(local_conn) metadata"); 444 445 goto req_write_and_issue; 446 } 447 448 if (h->pending_writeable_len) { 449 lwsl_sspc_debug(h, "(local_conn) PAYLOAD_LENGTH_HINT %u", 450 (unsigned int)h->writeable_len); 451 s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT; 452 lws_ser_wu16be(&s[1], 4); 453 lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len); 454 h->pending_writeable_len = 0; 455 n = 7; 456 goto req_write_and_issue; 457 } 458 459 if (h->conn_req_state >= LWSSSPC_ONW_ONGOING) { 460 lwsl_sspc_info(h, "conn_req_state %d", 461 h->conn_req_state); 462 break; 463 } 464 465 lwsl_sspc_info(h, "(local_conn) onward connect"); 466 467 h->conn_req_state = LWSSSPC_ONW_ONGOING; 468 469 s[0] = LWSSS_SER_TXPRE_ONWARD_CONNECT; 470 s[1] = 0; 471 s[2] = 0; 472 n = 3; 473 break; 474 475 case LPCSCLI_OPERATIONAL: 476 477 /* 478 * 479 * - Do we need to prioritize sending any metadata 480 * changes? (includes txcr updates) 481 * 482 * - Do we need to forward a hint about the payload 483 * length? 484 */ 485 486 pkt = lws_malloc(pktsize + LWS_PRE, __func__); 487 if (!pkt) 488 goto hangup; 489 cp = p = pkt + LWS_PRE; 490 end = p + pktsize; 491 492 if (h->metadata_owner.count) { 493 lws_sspc_metadata_t *md = lws_container_of( 494 lws_dll2_get_tail(&h->metadata_owner), 495 lws_sspc_metadata_t, list); 496 497 n = lws_sspc_serialize_metadata(h, md, p, end); 498 if (n < 0) 499 goto metadata_hangup; 500 501 goto req_write_and_issue; 502 } 503 504 if (h->pending_writeable_len) { 505 lwsl_sspc_info(h, "PAYLOAD_LENGTH_HINT %u", 506 (unsigned int)h->writeable_len); 507 s[0] = LWSSS_SER_TXPRE_PAYLOAD_LENGTH_HINT; 508 lws_ser_wu16be(&s[1], 4); 509 lws_ser_wu32be(&s[3], (uint32_t)h->writeable_len); 510 h->pending_writeable_len = 0; 511 n = 7; 512 goto req_write_and_issue; 513 } 514 515 /* we can't write anything if we don't have credit */ 516 if (!h->ignore_txc && h->txc.tx_cr <= 0) { 517 lwsl_sspc_info(h, "WRITEABLE / OPERATIONAL:" 518 " lack credit (%d)", 519 h->txc.tx_cr); 520 // break; 521 } 522 523 len = pktsize - LWS_PRE - 19; 524 flags = 0; 525 if (!h->ssi.tx) { 526 n = 0; 527 goto do_write_nz; 528 } 529 530 n = h->ssi.tx(m, h->ord++, pkt + LWS_PRE + 19, &len, 531 &flags); 532 switch (n) { 533 case LWSSSSRET_TX_DONT_SEND: 534 n = 0; 535 goto do_write_nz; 536 537 case LWSSSSRET_DISCONNECT_ME: 538 case LWSSSSRET_DESTROY_ME: 539 lwsl_notice("%s: sspc tx DISCONNECT/DESTROY unimplemented\n", __func__); 540 break; 541 default: 542 break; 543 } 544 545 h->txc.tx_cr = h->txc.tx_cr - (int)len; 546 547 cp = p; 548 n = (int)(len + 19); 549 us = lws_now_usecs(); 550 p[0] = LWSSS_SER_TXPRE_TX_PAYLOAD; 551 lws_ser_wu16be(&p[1], (uint16_t)(len + 19 - 3)); 552 lws_ser_wu32be(&p[3], (uint32_t)flags); 553 /* time spent here waiting to send this */ 554 lws_ser_wu32be(&p[7], (uint32_t)(us - h->us_earliest_write_req)); 555 /* ust that the client write happened */ 556 lws_ser_wu64be(&p[11], (uint64_t)us); 557 h->us_earliest_write_req = 0; 558 559 if (flags & LWSSS_FLAG_EOM) 560 if (h->rsidx + 1 < (int)LWS_ARRAY_SIZE(h->rideshare_ofs) && 561 h->rideshare_ofs[h->rsidx + 1]) 562 h->rsidx++; 563 564 break; 565 default: 566 break; 567 } 568 569do_write_nz: 570 571 if (!n) 572 break; 573 574do_write: 575 if (lws_fi(&h->fic, "sspc_link_write_fail")) 576 n = -1; 577 else 578 n = lws_write(wsi, (uint8_t *)cp, (unsigned int)n, LWS_WRITE_RAW); 579 if (n < 0) { 580 lwsl_sspc_notice(h, "WRITEABLE: %d", n); 581 582 goto hangup; 583 } 584 break; 585 586 default: 587 break; 588 } 589 590 lws_free(pkt); 591 592 return lws_callback_http_dummy(wsi, reason, user, in, len); 593 594metadata_hangup: 595 lwsl_sspc_err(h, "metadata too large"); 596 597hangup: 598 lws_free(pkt); 599 lwsl_warn("hangup\n"); 600 /* hang up on him */ 601 return -1; 602 603req_write_and_issue: 604 /* in case anything else to write */ 605 lws_callback_on_writable(h->cwsi); 606 goto do_write_nz; 607} 608 609const struct lws_protocols lws_sspc_protocols[] = { 610 { 611 "ssproxy-protocol", 612 callback_sspc_client, 613 0, 614 2048, 2048, NULL, 0 615 }, 616 { NULL, NULL, 0, 0, 0, NULL, 0 } 617}; 618 619int 620lws_sspc_create(struct lws_context *context, int tsi, const lws_ss_info_t *ssi, 621 void *opaque_user_data, lws_sspc_handle_t **ppss, 622 struct lws_sequencer *seq_owner, const char **ppayload_fmt) 623{ 624 lws_sspc_handle_t *h; 625 uint8_t *ua; 626 char *p; 627 628 lws_service_assert_loop_thread(context, tsi); 629 630 /* allocate the handle (including ssi), the user alloc, 631 * and the streamname */ 632 633 h = malloc(sizeof(lws_sspc_handle_t) + ssi->user_alloc + 634 strlen(ssi->streamtype) + 1); 635 if (!h) 636 return 1; 637 memset(h, 0, sizeof(*h)); 638 639 h->lc.log_cx = context->log_cx; 640 641#if defined(LWS_WITH_SYS_FAULT_INJECTION) 642 h->fic.name = "sspc"; 643 lws_xos_init(&h->fic.xos, lws_xos(&context->fic.xos)); 644 if (ssi->fic.fi_owner.count) 645 lws_fi_import(&h->fic, &ssi->fic); 646 647 lws_fi_inherit_copy(&h->fic, &context->fic, "ss", ssi->streamtype); 648#endif 649 650 if (lws_fi(&h->fic, "sspc_create_oom")) { 651 /* 652 * We have to do this a litte later, so we can cleanly inherit 653 * the OOM pieces and drain the info fic 654 */ 655 lws_fi_destroy(&h->fic); 656 free(h); 657 return 1; 658 } 659 660 __lws_lc_tag(context, &context->lcg[LWSLCG_SSP_CLIENT], &h->lc, 661 ssi->streamtype); 662 663 memcpy(&h->ssi, ssi, sizeof(*ssi)); 664 ua = (uint8_t *)(h + 1); 665 memset(ua, 0, ssi->user_alloc); 666 p = (char *)ua + ssi->user_alloc; 667 memcpy(p, ssi->streamtype, strlen(ssi->streamtype) + 1); 668 h->ssi.streamtype = (const char *)p; 669 h->context = context; 670 h->us_start_upstream = lws_now_usecs(); 671 672 if (!ssi->manual_initial_tx_credit) 673 h->txc.peer_tx_cr_est = 500000000; 674 else 675 h->txc.peer_tx_cr_est = ssi->manual_initial_tx_credit; 676 677 if (!strcmp(ssi->streamtype, LWS_SMD_STREAMTYPENAME)) 678 h->ignore_txc = 1; 679 680 lws_dll2_add_head(&h->client_list, &context->pt[tsi].ss_client_owner); 681 682 /* fill in the things the real api does for the caller */ 683 684 *((void **)(ua + ssi->opaque_user_data_offset)) = opaque_user_data; 685 *((void **)(ua + ssi->handle_offset)) = h; 686 687 if (ppss) 688 *ppss = h; 689 690 /* try the actual connect */ 691 692 lws_sspc_sul_retry_cb(&h->sul_retry); 693 694 return 0; 695} 696 697/* used on context destroy when iterating listed lws_ss on a pt */ 698 699int 700lws_sspc_destroy_dll(struct lws_dll2 *d, void *user) 701{ 702 lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list); 703 704 lws_sspc_destroy(&h); 705 706 return 0; 707} 708 709void 710lws_sspc_rxmetadata_destroy(lws_sspc_handle_t *h) 711{ 712 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1, 713 lws_dll2_get_head(&h->metadata_owner_rx)) { 714 lws_sspc_metadata_t *md = 715 lws_container_of(d, lws_sspc_metadata_t, list); 716 717 lws_dll2_remove(&md->list); 718 lws_free(md); 719 720 } lws_end_foreach_dll_safe(d, d1); 721} 722 723void 724lws_sspc_destroy(lws_sspc_handle_t **ph) 725{ 726 lws_sspc_handle_t *h; 727 728 if (!*ph) 729 return; 730 731 h = *ph; 732 if (h == h->h_in_svc) { 733 lwsl_err("%s: illegal destroy, return LWSSSSRET_DESTROY_ME instead\n", 734 __func__); 735 assert(0); 736 return; 737 } 738 739 lws_service_assert_loop_thread(h->context, 0); 740 741 if (h->destroying) 742 return; 743 744 h->destroying = 1; 745 746 /* if this caliper is still dangling at destroy, we failed */ 747#if defined(LWS_WITH_SYS_METRICS) 748 /* 749 * If any hanging caliper measurement, dump it, and free any tags 750 */ 751 lws_metrics_caliper_report_hist(h->cal_txn, (struct lws *)NULL); 752#endif 753 if (h->ss_dangling_connected && h->ssi.state) { 754 lws_sspc_event_helper(h, LWSSSCS_DISCONNECTED, 0); 755 h->ss_dangling_connected = 0; 756 } 757 758#if defined(LWS_WITH_SYS_FAULT_INJECTION) 759 lws_fi_destroy(&h->fic); 760#endif 761 762 lws_sul_cancel(&h->sul_retry); 763 lws_dll2_remove(&h->client_list); 764 765 if (h->dsh) 766 lws_dsh_destroy(&h->dsh); 767 if (h->cwsi) { 768 lws_set_opaque_user_data(h->cwsi, NULL); 769 lws_wsi_close(h->cwsi, LWS_TO_KILL_ASYNC); 770 h->cwsi = NULL; 771 } 772 773 /* clean out any pending metadata changes that didn't make it */ 774 775 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1, 776 lws_dll2_get_head(&(*ph)->metadata_owner)) { 777 lws_sspc_metadata_t *md = 778 lws_container_of(d, lws_sspc_metadata_t, list); 779 780 lws_dll2_remove(&md->list); 781 lws_free(md); 782 783 } lws_end_foreach_dll_safe(d, d1); 784 785 lws_sspc_rxmetadata_destroy(h); 786 787 lws_sspc_event_helper(h, LWSSSCS_DESTROYING, 0); 788 *ph = NULL; 789 790 lws_sul_cancel(&h->sul_retry); 791 792 793 /* confirm no sul left scheduled in handle or user allocation object */ 794 lws_sul_debug_zombies(h->context, h, sizeof(*h) + h->ssi.user_alloc, 795 __func__); 796 797 __lws_lc_untag(h->context, &h->lc); 798 799 free(h); 800} 801 802lws_ss_state_return_t 803lws_sspc_request_tx(lws_sspc_handle_t *h) 804{ 805 if (!h || !h->cwsi) 806 return LWSSSSRET_OK; 807 808 lws_service_assert_loop_thread(h->context, 0); 809 810 if (!h->us_earliest_write_req) 811 h->us_earliest_write_req = lws_now_usecs(); 812 813 if (h->state == LPCSCLI_LOCAL_CONNECTED && 814 h->conn_req_state == LWSSSPC_ONW_NONE) 815 h->conn_req_state = LWSSSPC_ONW_REQ; 816 817 lws_callback_on_writable(h->cwsi); 818 819 return LWSSSSRET_OK; 820} 821 822/* 823 * Currently we fulfil the writeable part locally by just enabling POLLOUT on 824 * the UDS link, without serialization footprint, which is reasonable as far as 825 * it goes. 826 * 827 * But for the ..._len() variant, the expected payload length hint we are being 828 * told is something that must be serialized to the onward peer, since either 829 * that guy or someone upstream of him is the guy who will compose the framing 830 * with it that actually goes out. 831 * 832 * This information is needed at the upstream guy before we have sent any 833 * payload, eg, for http POST, he has to prepare the content-length in the 834 * headers, before any payload. So we have to issue a serialization of the 835 * length at this point. 836 */ 837 838lws_ss_state_return_t 839lws_sspc_request_tx_len(lws_sspc_handle_t *h, unsigned long len) 840{ 841 /* 842 * for client conns, they cannot even complete creation of the handle 843 * without the onwared connection to the proxy, it's not legal to start 844 * using it until it's operation and has the onward connection (and the 845 * link has called CREATED state) 846 */ 847 848 if (!h) 849 return LWSSSSRET_OK; 850 851 lws_service_assert_loop_thread(h->context, 0); 852 853 lwsl_sspc_notice(h, "setting writeable_len %u", (unsigned int)len); 854 h->writeable_len = len; 855 h->pending_writeable_len = 1; 856 857 if (!h->us_earliest_write_req) 858 h->us_earliest_write_req = lws_now_usecs(); 859 860 if (h->state == LPCSCLI_LOCAL_CONNECTED && 861 h->conn_req_state == LWSSSPC_ONW_NONE) 862 h->conn_req_state = LWSSSPC_ONW_REQ; 863 864 /* 865 * We're going to use this up with serializing h->writeable_len... that 866 * will request again. 867 */ 868 869 if (h->cwsi) 870 lws_callback_on_writable(h->cwsi); 871 872 return LWSSSSRET_OK; 873} 874 875int 876lws_sspc_client_connect(lws_sspc_handle_t *h) 877{ 878 if (!h || h->state == LPCSCLI_OPERATIONAL) 879 return 0; 880 881 lws_service_assert_loop_thread(h->context, 0); 882 883 assert(h->state == LPCSCLI_LOCAL_CONNECTED); 884 if (h->state == LPCSCLI_LOCAL_CONNECTED && 885 h->conn_req_state == LWSSSPC_ONW_NONE) 886 h->conn_req_state = LWSSSPC_ONW_REQ; 887 if (h->cwsi) 888 lws_callback_on_writable(h->cwsi); 889 890 return 0; 891} 892 893struct lws_context * 894lws_sspc_get_context(struct lws_sspc_handle *h) 895{ 896 return h->context; 897} 898 899const char * 900lws_sspc_rideshare(struct lws_sspc_handle *h) 901{ 902 /* 903 * ...the serialized RX rideshare name if any... 904 */ 905 906 if (h->parser.rideshare[0]) { 907 lwsl_sspc_info(h, "parser %s", h->parser.rideshare); 908 909 return h->parser.rideshare; 910 } 911 912 /* 913 * The tx rideshare index 914 */ 915 916 if (h->rideshare_list[0]) { 917 lwsl_sspc_info(h, "tx list %s", 918 &h->rideshare_list[h->rideshare_ofs[h->rsidx]]); 919 return &h->rideshare_list[h->rideshare_ofs[h->rsidx]]; 920 } 921 922 /* 923 * ... otherwise default to our stream type name 924 */ 925 926 lwsl_sspc_info(h, "def %s\n", h->ssi.streamtype); 927 928 return h->ssi.streamtype; 929} 930 931static int 932_lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name, 933 const void *value, size_t len, int tx_cr_adjust) 934{ 935 lws_sspc_metadata_t *md; 936 937 lws_service_assert_loop_thread(h->context, 0); 938 939 /* 940 * Are we replacing a pending metadata of the same name? It's not 941 * efficient to do this but user code can do what it likes... let's 942 * optimize away the old one. 943 * 944 * Tx credit adjust always has name "" 945 */ 946 947 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1, 948 lws_dll2_get_head(&h->metadata_owner)) { 949 md = lws_container_of(d, lws_sspc_metadata_t, list); 950 951 if (!strcmp(name, md->name)) { 952 lws_dll2_remove(&md->list); 953 lws_free(md); 954 break; 955 } 956 957 } lws_end_foreach_dll_safe(d, d1); 958 959 /* 960 * We have to stash the metadata and pass it to the proxy 961 */ 962 963 if (lws_fi(&h->fic, "sspc_fail_metadata_set")) 964 md = NULL; 965 else 966 md = lws_malloc(sizeof(*md) + len, "set metadata"); 967 if (!md) { 968 lwsl_sspc_err(h, "OOM"); 969 970 return 1; 971 } 972 973 memset(md, 0, sizeof(*md)); 974 975 md->tx_cr_adjust = tx_cr_adjust; 976 h->txc.peer_tx_cr_est += tx_cr_adjust; 977 978 lws_strncpy(md->name, name, sizeof(md->name)); 979 md->len = len; 980 if (len) 981 memcpy(&md[1], value, len); 982 983 lws_dll2_add_tail(&md->list, &h->metadata_owner); 984 985 if (len) { 986 lwsl_sspc_info(h, "set metadata %s", name); 987 lwsl_hexdump_sspc_info(h, value, len); 988 } else 989 lwsl_sspc_info(h, "serializing tx cr adj %d", 990 (int)tx_cr_adjust); 991 992 if (h->cwsi) 993 lws_callback_on_writable(h->cwsi); 994 995 return 0; 996} 997 998int 999lws_sspc_set_metadata(struct lws_sspc_handle *h, const char *name, 1000 const void *value, size_t len) 1001{ 1002 return _lws_sspc_set_metadata(h, name, value, len, 0); 1003} 1004 1005int 1006lws_sspc_get_metadata(struct lws_sspc_handle *h, const char *name, 1007 const void **value, size_t *len) 1008{ 1009 lws_sspc_metadata_t *md; 1010 1011 /* 1012 * client side does not have access to policy 1013 * and any metadata are new to it each time, 1014 * we allocate them, removing any existing with 1015 * the same name first 1016 */ 1017 1018 lws_service_assert_loop_thread(h->context, 0); 1019 1020 lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1, 1021 lws_dll2_get_head(&h->metadata_owner_rx)) { 1022 md = lws_container_of(d, 1023 lws_sspc_metadata_t, list); 1024 1025 if (!strcmp(md->name, name)) { 1026 *len = md->len; 1027 *value = &md[1]; 1028 1029 return 0; 1030 } 1031 1032 } lws_end_foreach_dll_safe(d, d1); 1033 1034 return 1; 1035} 1036 1037int 1038lws_sspc_add_peer_tx_credit(struct lws_sspc_handle *h, int32_t bump) 1039{ 1040 lws_service_assert_loop_thread(h->context, 0); 1041 lwsl_sspc_notice(h, "%d\n", bump); 1042 return _lws_sspc_set_metadata(h, "", NULL, 0, (int)bump); 1043} 1044 1045int 1046lws_sspc_get_est_peer_tx_credit(struct lws_sspc_handle *h) 1047{ 1048 lws_service_assert_loop_thread(h->context, 0); 1049 return h->txc.peer_tx_cr_est; 1050} 1051 1052void 1053lws_sspc_start_timeout(struct lws_sspc_handle *h, unsigned int timeout_ms) 1054{ 1055 lws_service_assert_loop_thread(h->context, 0); 1056 if (!h->cwsi) 1057 /* we can't fulfil it */ 1058 return; 1059 h->timeout_ms = (uint32_t)timeout_ms; 1060 h->pending_timeout_update = 1; 1061 lws_callback_on_writable(h->cwsi); 1062} 1063 1064void 1065lws_sspc_cancel_timeout(struct lws_sspc_handle *h) 1066{ 1067 lws_sspc_start_timeout(h, (unsigned int)-1); 1068} 1069 1070void * 1071lws_sspc_to_user_object(struct lws_sspc_handle *h) 1072{ 1073 return (void *)(h + 1); 1074} 1075 1076void 1077lws_sspc_change_handlers(struct lws_sspc_handle *h, 1078 lws_ss_state_return_t (*rx)(void *userobj, const uint8_t *buf, size_t len, int flags), 1079 lws_ss_state_return_t (*tx)(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf, 1080 size_t *len, int *flags), 1081 lws_ss_state_return_t (*state)(void *userobj, void *h_src /* ss handle type */, 1082 lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)) 1083{ 1084 if (rx) 1085 h->ssi.rx = rx; 1086 if (tx) 1087 h->ssi.tx = tx; 1088 if (state) 1089 h->ssi.state = state; 1090} 1091 1092const char * 1093lws_sspc_tag(struct lws_sspc_handle *h) 1094{ 1095 if (!h) 1096 return "[null sspc]"; 1097 return lws_lc_tag(&h->lc); 1098} 1099 1100int 1101lws_sspc_cancel_notify_dll(struct lws_dll2 *d, void *user) 1102{ 1103 lws_sspc_handle_t *h = lws_container_of(d, lws_sspc_handle_t, client_list); 1104 1105 lws_sspc_event_helper(h, LWSSSCS_EVENT_WAIT_CANCELLED, 0); 1106 1107 return 0; 1108} 1109 1110