1/* 2 * libwebsockets - small server side websockets and web server implementation 3 * 4 * Copyright (C) 2010 - 2019 Andy Green <andy@warmcat.com> 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining a copy 7 * of this software and associated documentation files (the "Software"), to 8 * deal in the Software without restriction, including without limitation the 9 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 10 * sell copies of the Software, and to permit persons to whom the Software is 11 * furnished to do so, subject to the following conditions: 12 * 13 * The above copyright notice and this permission notice shall be included in 14 * all copies or substantial portions of the Software. 15 * 16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 22 * IN THE SOFTWARE. 23 */ 24 25#if !defined (LWS_PLUGIN_STATIC) 26#if !defined(LWS_DLL) 27#define LWS_DLL 28#endif 29#if !defined(LWS_INTERNAL) 30#define LWS_INTERNAL 31#endif 32#include <libwebsockets.h> 33#endif 34 35#include <string.h> 36#include <sys/types.h> 37#include <fcntl.h> 38 39#define RING_DEPTH 8 40 41struct packet { 42 void *payload; 43 uint32_t len; 44 uint32_t ticket; 45}; 46 47enum { 48 ACC, 49 ONW 50}; 51 52/* 53 * Because both sides of the connection want to share this, we allocate it 54 * during accepted adoption and both sides have a pss that is just a wrapper 55 * pointing to this. 56 * 57 * The last one of the accepted side and the onward side to close frees it. 58 * This removes any chance of one side or the other having an invalidated 59 * pointer to the pss. 60 */ 61 62struct conn { 63 struct lws *wsi[2]; 64 65 /* rings containing unsent rx from accepted and onward sides */ 66 struct lws_ring *r[2]; 67 uint32_t t[2]; /* ring tail */ 68 69 uint32_t ticket_next; 70 uint32_t ticket_retired; 71 72 char rx_enabled[2]; 73 char closed[2]; 74 char established[2]; 75}; 76 77struct raw_pss { 78 struct conn *conn; 79}; 80 81/* one of these is created for each vhost our protocol is used with */ 82 83struct raw_vhd { 84 char addr[128]; 85 uint16_t port; 86 char ipv6; 87}; 88 89static void 90__destroy_packet(void *_pkt) 91{ 92 struct packet *pkt = _pkt; 93 94 free(pkt->payload); 95 pkt->payload = NULL; 96 pkt->len = 0; 97} 98 99static void 100destroy_conn(struct raw_vhd *vhd, struct raw_pss *pss) 101{ 102 struct conn *conn = pss->conn; 103 104 if (conn->r[ACC]) 105 lws_ring_destroy(conn->r[ACC]); 106 if (conn->r[ONW]) 107 lws_ring_destroy(conn->r[ONW]); 108 109 pss->conn = NULL; 110 111 free(conn); 112} 113 114static int 115connect_client(struct raw_vhd *vhd, struct raw_pss *pss) 116{ 117 struct lws_client_connect_info i; 118 char host[128]; 119 struct lws *cwsi; 120 121 lws_snprintf(host, sizeof(host), "%s:%u", vhd->addr, vhd->port); 122 123 memset(&i, 0, sizeof(i)); 124 125 i.method = "RAW"; 126 i.context = lws_get_context(pss->conn->wsi[ACC]); 127 i.port = vhd->port; 128 i.address = vhd->addr; 129 i.host = host; 130 i.origin = host; 131 i.ssl_connection = 0; 132 i.vhost = lws_get_vhost(pss->conn->wsi[ACC]); 133 i.local_protocol_name = "raw-proxy"; 134 i.protocol = "raw-proxy"; 135 i.path = "/"; 136 /* 137 * The "onward" client wsi has its own pss but shares the "conn" 138 * created when the inbound connection was accepted. We need to stash 139 * the address of the shared conn and apply it to the client psss 140 * when the client connection completes. 141 */ 142 i.opaque_user_data = pss->conn; 143 i.pwsi = &pss->conn->wsi[ONW]; 144 145 lwsl_info("%s: onward: %s:%d%s\n", __func__, i.address, i.port, i.path); 146 147 cwsi = lws_client_connect_via_info(&i); 148 if (!cwsi) 149 lwsl_err("%s: client connect failed early\n", __func__); 150 151 return !cwsi; 152} 153 154static int 155flow_control(struct conn *conn, int side, int enable) 156{ 157 if (conn->closed[side] || 158 enable == conn->rx_enabled[side] || 159 !conn->established[side]) 160 return 0; 161 162 if (lws_rx_flow_control(conn->wsi[side], enable)) 163 return 1; 164 165 conn->rx_enabled[side] = (char)enable; 166 lwsl_info("%s: %s side: %s\n", __func__, side ? "ONW" : "ACC", 167 enable ? "rx enabled" : "rx flow controlled"); 168 169 return 0; 170} 171 172static int 173callback_raw_proxy(struct lws *wsi, enum lws_callback_reasons reason, 174 void *user, void *in, size_t len) 175{ 176 struct raw_pss *pss = (struct raw_pss *)user; 177 struct raw_vhd *vhd = (struct raw_vhd *)lws_protocol_vh_priv_get( 178 lws_get_vhost(wsi), lws_get_protocol(wsi)); 179 const struct packet *ppkt; 180 struct conn *conn = NULL; 181 struct lws_tokenize ts; 182 lws_tokenize_elem e; 183 struct packet pkt; 184 const char *cp; 185 int n; 186 187 if (pss) 188 conn = pss->conn; 189 190 switch (reason) { 191 case LWS_CALLBACK_PROTOCOL_INIT: 192 vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), 193 lws_get_protocol(wsi), sizeof(struct raw_vhd)); 194 if (!vhd) 195 return 0; 196 if (lws_pvo_get_str(in, "onward", &cp)) { 197 lwsl_warn("%s: vh %s: pvo 'onward' required\n", __func__, 198 lws_get_vhost_name(lws_get_vhost(wsi))); 199 200 return 0; 201 } 202 lws_tokenize_init(&ts, cp, LWS_TOKENIZE_F_DOT_NONTERM | 203 LWS_TOKENIZE_F_MINUS_NONTERM | 204 LWS_TOKENIZE_F_NO_FLOATS); 205 ts.len = strlen(cp); 206 207 if (lws_tokenize(&ts) != LWS_TOKZE_TOKEN) 208 goto bad_onward; 209 if (!strncmp(ts.token, "ipv6", ts.token_len)) 210 vhd->ipv6 = 1; 211 else 212 if (strncmp(ts.token, "ipv4", ts.token_len)) 213 goto bad_onward; 214 215 /* then the colon */ 216 if (lws_tokenize(&ts) != LWS_TOKZE_DELIMITER) 217 goto bad_onward; 218 219 e = lws_tokenize(&ts); 220 if (!vhd->ipv6) { 221 if (e != LWS_TOKZE_TOKEN || 222 ts.token_len + 1 >= (int)sizeof(vhd->addr)) 223 goto bad_onward; 224 225 lws_strncpy(vhd->addr, ts.token, ts.token_len + 1); 226 e = lws_tokenize(&ts); 227 if (e == LWS_TOKZE_DELIMITER) { 228 /* there should be a port then */ 229 e = lws_tokenize(&ts); 230 if (e != LWS_TOKZE_INTEGER) 231 goto bad_onward; 232 vhd->port = (uint16_t)atoi(ts.token); 233 e = lws_tokenize(&ts); 234 } 235 if (e != LWS_TOKZE_ENDED) 236 goto bad_onward; 237 } else 238 lws_strncpy(vhd->addr, ts.token, sizeof(vhd->addr)); 239 240 lwsl_notice("%s: vh %s: onward %s:%s:%d\n", __func__, 241 lws_get_vhost_name(lws_get_vhost(wsi)), 242 vhd->ipv6 ? "ipv6": "ipv4", vhd->addr, vhd->port); 243 break; 244 245bad_onward: 246 lwsl_err("%s: onward pvo format must be ipv4:addr[:port] " 247 " or ipv6:addr, not '%s'\n", __func__, cp); 248 return -1; 249 250 case LWS_CALLBACK_PROTOCOL_DESTROY: 251 break; 252 253 /* callbacks related to client "onward side" */ 254 255 case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: 256 lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", 257 in ? (char *)in : "(null)"); 258 break; 259 260 case LWS_CALLBACK_RAW_PROXY_CLI_ADOPT: 261 lwsl_debug("%s: %p: LWS_CALLBACK_RAW_CLI_ADOPT: pss %p\n", __func__, wsi, pss); 262 if (conn || !pss) 263 break; 264 conn = pss->conn = lws_get_opaque_user_data(wsi); 265 if (!conn) 266 break; 267 conn->established[ONW] = 1; 268 /* they start enabled */ 269 conn->rx_enabled[ACC] = 1; 270 conn->rx_enabled[ONW] = 1; 271 272 /* he disabled his rx while waiting for use to be established */ 273 flow_control(conn, ACC, 1); 274 275 lws_callback_on_writable(wsi); 276 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0); 277 break; 278 279 case LWS_CALLBACK_RAW_PROXY_CLI_CLOSE: 280 lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_CLOSE\n"); 281 if (!conn) 282 break; 283 284 conn->closed[ONW] = 1; 285 286 if (conn->closed[ACC]) 287 destroy_conn(vhd, pss); 288 289 break; 290 291 case LWS_CALLBACK_RAW_PROXY_CLI_RX: 292 lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_RX: %d\n", (int)len); 293 294 if (!conn) 295 return 0; 296 297 if (!pss || !conn->wsi[ACC] || conn->closed[ACC]) { 298 lwsl_info(" pss %p, wsi[ACC] %p, closed[ACC] %d\n", 299 pss, conn->wsi[ACC], conn->closed[ACC]); 300 return -1; 301 } 302 pkt.payload = malloc(len); 303 if (!pkt.payload) { 304 lwsl_notice("OOM: dropping\n"); 305 return -1; 306 } 307 pkt.len = (uint32_t)len; 308 pkt.ticket = conn->ticket_next++; 309 310 memcpy(pkt.payload, in, len); 311 if (!lws_ring_insert(conn->r[ONW], &pkt, 1)) { 312 __destroy_packet(&pkt); 313 lwsl_notice("dropping!\n"); 314 return -1; 315 } 316 317 lwsl_debug("After onward RX: acc free: %d...\n", 318 (int)lws_ring_get_count_free_elements(conn->r[ONW])); 319 320 if (conn->rx_enabled[ONW] && 321 lws_ring_get_count_free_elements(conn->r[ONW]) < 2) 322 flow_control(conn, ONW, 0); 323 324 if (!conn->closed[ACC]) 325 lws_callback_on_writable(conn->wsi[ACC]); 326 break; 327 328 case LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE: 329 lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE\n"); 330 331 if (!conn) 332 break; 333 334 ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]); 335 if (!ppkt) { 336 lwsl_info("%s: CLI_WRITABLE had nothing in acc ring\n", 337 __func__); 338 break; 339 } 340 341 if (ppkt->ticket != conn->ticket_retired + 1) { 342 lwsl_info("%s: acc ring has %d but next %d\n", __func__, 343 ppkt->ticket, conn->ticket_retired + 1); 344 lws_callback_on_writable(conn->wsi[ACC]); 345 break; 346 } 347 348 n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW); 349 if (n < 0) { 350 lwsl_info("%s: WRITEABLE: %d\n", __func__, n); 351 352 return -1; 353 } 354 355 conn->ticket_retired = ppkt->ticket; 356 lws_ring_consume(conn->r[ACC], &conn->t[ACC], NULL, 1); 357 lws_ring_update_oldest_tail(conn->r[ACC], conn->t[ACC]); 358 359 lwsl_debug("acc free: %d...\n", 360 (int)lws_ring_get_count_free_elements(conn->r[ACC])); 361 362 if (!conn->rx_enabled[ACC] && 363 lws_ring_get_count_free_elements(conn->r[ACC]) > 2) 364 flow_control(conn, ACC, 1); 365 366 ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]); 367 lwsl_debug("%s: CLI_WRITABLE: next acc pkt %p idx %d vs %d\n", 368 __func__, ppkt, ppkt ? ppkt->ticket : 0, 369 conn->ticket_retired + 1); 370 371 if (ppkt && ppkt->ticket == conn->ticket_retired + 1) 372 lws_callback_on_writable(wsi); 373 else { 374 /* 375 * defer checking for accepted side closing until we 376 * sent everything in the ring to onward 377 */ 378 if (conn->closed[ACC]) 379 /* 380 * there is never going to be any more... but 381 * we may have some tx still in tx buflist / 382 * partial 383 */ 384 return lws_raw_transaction_completed(wsi); 385 386 if (lws_ring_get_element(conn->r[ONW], &conn->t[ONW])) 387 lws_callback_on_writable(conn->wsi[ACC]); 388 } 389 break; 390 391 /* callbacks related to raw socket descriptor "accepted side" */ 392 393 case LWS_CALLBACK_RAW_PROXY_SRV_ADOPT: 394 lwsl_debug("LWS_CALLBACK_RAW_SRV_ADOPT\n"); 395 if (!pss) 396 return -1; 397 conn = pss->conn = malloc(sizeof(struct conn)); 398 if (!pss->conn) 399 return -1; 400 memset(conn, 0, sizeof(*conn)); 401 402 conn->wsi[ACC] = wsi; 403 conn->ticket_next = 1; 404 405 conn->r[ACC] = lws_ring_create(sizeof(struct packet), 406 RING_DEPTH, __destroy_packet); 407 if (!conn->r[ACC]) { 408 lwsl_err("%s: OOM\n", __func__); 409 return -1; 410 } 411 conn->r[ONW] = lws_ring_create(sizeof(struct packet), 412 RING_DEPTH, __destroy_packet); 413 if (!conn->r[ONW]) { 414 lws_ring_destroy(conn->r[ACC]); 415 conn->r[ACC] = NULL; 416 lwsl_err("%s: OOM\n", __func__); 417 418 return -1; 419 } 420 421 conn->established[ACC] = 1; 422 423 /* disable any rx until the client side is up */ 424 flow_control(conn, ACC, 0); 425 426 lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0); 427 428 /* try to create the onward client connection */ 429 connect_client(vhd, pss); 430 break; 431 432 case LWS_CALLBACK_RAW_PROXY_SRV_CLOSE: 433 lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:\n"); 434 435 if (!conn) 436 break; 437 438 conn->closed[ACC] = 1; 439 if (conn->closed[ONW]) 440 destroy_conn(vhd, pss); 441 break; 442 443 case LWS_CALLBACK_RAW_PROXY_SRV_RX: 444 lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_RX: rx %d\n", (int)len); 445 446 if (!conn || !conn->wsi[ONW]) { 447 lwsl_err("%s: LWS_CALLBACK_RAW_PROXY_SRV_RX: " 448 "conn->wsi[ONW] NULL\n", __func__); 449 return -1; 450 } 451 if (conn->closed[ONW]) { 452 lwsl_info(" closed[ONW] %d\n", conn->closed[ONW]); 453 return -1; 454 } 455 456 if (!len) 457 return 0; 458 459 pkt.payload = malloc(len); 460 if (!pkt.payload) { 461 lwsl_notice("OOM: dropping\n"); 462 return -1; 463 } 464 pkt.len = (uint32_t)len; 465 pkt.ticket = conn->ticket_next++; 466 467 memcpy(pkt.payload, in, len); 468 if (!lws_ring_insert(conn->r[ACC], &pkt, 1)) { 469 __destroy_packet(&pkt); 470 lwsl_notice("dropping!\n"); 471 return -1; 472 } 473 474 lwsl_debug("After acc RX: acc free: %d...\n", 475 (int)lws_ring_get_count_free_elements(conn->r[ACC])); 476 477 if (conn->rx_enabled[ACC] && 478 lws_ring_get_count_free_elements(conn->r[ACC]) <= 2) 479 flow_control(conn, ACC, 0); 480 481 if (conn->established[ONW] && !conn->closed[ONW]) 482 lws_callback_on_writable(conn->wsi[ONW]); 483 break; 484 485 case LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE: 486 lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE\n"); 487 488 if (!conn || !conn->established[ONW] || conn->closed[ONW]) 489 break; 490 491 ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]); 492 if (!ppkt) { 493 lwsl_info("%s: SRV_WRITABLE nothing in onw ring\n", 494 __func__); 495 break; 496 } 497 498 if (ppkt->ticket != conn->ticket_retired + 1) { 499 lwsl_info("%s: onw ring has %d but next %d\n", __func__, 500 ppkt->ticket, conn->ticket_retired + 1); 501 lws_callback_on_writable(conn->wsi[ONW]); 502 break; 503 } 504 505 n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW); 506 if (n < 0) { 507 lwsl_info("%s: WRITEABLE: %d\n", __func__, n); 508 509 return -1; 510 } 511 512 conn->ticket_retired = ppkt->ticket; 513 lws_ring_consume(conn->r[ONW], &conn->t[ONW], NULL, 1); 514 lws_ring_update_oldest_tail(conn->r[ONW], conn->t[ONW]); 515 516 lwsl_debug("onward free: %d... waiting %d\n", 517 (int)lws_ring_get_count_free_elements(conn->r[ONW]), 518 (int)lws_ring_get_count_waiting_elements(conn->r[ONW], 519 &conn->t[ONW])); 520 521 if (!conn->rx_enabled[ONW] && 522 lws_ring_get_count_free_elements(conn->r[ONW]) > 2) 523 flow_control(conn, ONW, 1); 524 525 ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]); 526 lwsl_debug("%s: SRV_WRITABLE: next onw pkt %p idx %d vs %d\n", 527 __func__, ppkt, ppkt ? ppkt->ticket : 0, 528 conn->ticket_retired + 1); 529 530 if (ppkt && ppkt->ticket == conn->ticket_retired + 1) 531 lws_callback_on_writable(wsi); 532 else { 533 /* 534 * defer checking for onward side closing until we 535 * sent everything in the ring to accepted side 536 */ 537 if (conn->closed[ONW]) 538 /* 539 * there is never going to be any more... but 540 * we may have some tx still in tx buflist / 541 * partial 542 */ 543 return lws_raw_transaction_completed(wsi); 544 545 if (lws_ring_get_element(conn->r[ACC], &conn->t[ACC])) 546 lws_callback_on_writable(conn->wsi[ONW]); 547 } 548 break; 549 550 default: 551 break; 552 } 553 554 return lws_callback_http_dummy(wsi, reason, user, in, len); 555} 556 557#define LWS_PLUGIN_PROTOCOL_RAW_PROXY { \ 558 "raw-proxy", \ 559 callback_raw_proxy, \ 560 sizeof(struct raw_pss), \ 561 8192, \ 562 8192, NULL, 0 \ 563 } 564 565#if !defined (LWS_PLUGIN_STATIC) 566 567LWS_VISIBLE const struct lws_protocols lws_raw_proxy_protocols[] = { 568 LWS_PLUGIN_PROTOCOL_RAW_PROXY 569}; 570 571LWS_VISIBLE const lws_plugin_protocol_t lws_raw_proxy = { 572 .hdr = { 573 "raw proxy", 574 "lws_protocol_plugin", 575 LWS_BUILD_HASH, 576 LWS_PLUGIN_API_MAGIC 577 }, 578 579 .protocols = lws_raw_proxy_protocols, 580 .count_protocols = LWS_ARRAY_SIZE(lws_raw_proxy_protocols), 581 .extensions = NULL, 582 .count_extensions = 0, 583}; 584#endif 585 586 587