/*
 * nghttp2 - HTTP/2 C Library
 *
 * Copyright (c) 2012 Tatsuhiro Tsujikawa
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 */
#include "shrpx_connection_handler.h"

#ifdef HAVE_UNISTD_H
#  include <unistd.h>
#endif // HAVE_UNISTD_H
#include <sys/types.h>
#include <sys/wait.h>

#include <cerrno>
#include <thread>
#include <random>

#include "shrpx_client_handler.h"
#include "shrpx_tls.h"
#include "shrpx_worker.h"
#include "shrpx_config.h"
#include "shrpx_http2_session.h"
#include "shrpx_connect_blocker.h"
#include "shrpx_downstream_connection.h"
#include "shrpx_accept_handler.h"
#include "shrpx_memcached_dispatcher.h"
#include "shrpx_signal.h"
#include "shrpx_log.h"
#include "xsi_strerror.h"
#include "util.h"
#include "template.h"

using namespace nghttp2;

namespace shrpx {

namespace {
void acceptor_disable_cb(struct ev_loop *loop, ev_timer *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  // If we are in graceful shutdown period, we must not enable
  // acceptors again.
  if (h->get_graceful_shutdown()) {
    return;
  }

  h->enable_acceptor();
}
} // namespace

namespace {
void ocsp_cb(struct ev_loop *loop, ev_timer *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  // If we are in graceful shutdown period, we won't do ocsp query.
  if (h->get_graceful_shutdown()) {
    return;
  }

  LOG(NOTICE) << "Start ocsp update";

  h->proceed_next_cert_ocsp();
}
} // namespace

namespace {
void ocsp_read_cb(struct ev_loop *loop, ev_io *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  h->read_ocsp_chunk();
}
} // namespace

namespace {
void ocsp_chld_cb(struct ev_loop *loop, ev_child *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  h->handle_ocsp_complete();
}
} // namespace

namespace {
void thread_join_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
  ev_break(loop);
}
} // namespace

namespace {
void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
  auto h = static_cast<ConnectionHandler *>(w->data);

  h->handle_serial_event();
}
} // namespace

ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen)
    :
#ifdef ENABLE_HTTP3
      quic_ipc_fd_(-1),
#endif // ENABLE_HTTP3
      gen_(gen),
      single_worker_(nullptr),
      loop_(loop),
#ifdef HAVE_NEVERBLEED
      nb_(nullptr),
#endif // HAVE_NEVERBLEED
      tls_ticket_key_memcached_get_retry_count_(0),
      tls_ticket_key_memcached_fail_count_(0),
      worker_round_robin_cnt_(get_config()->api.enabled ? 1 : 0),
      graceful_shutdown_(false),
      enable_acceptor_on_ocsp_completion_(false) {
  ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.);
  disable_acceptor_timer_.data = this;

  ev_timer_init(&ocsp_timer_, ocsp_cb, 0., 0.);
  ocsp_timer_.data = this;

  ev_io_init(&ocsp_.rev, ocsp_read_cb, -1, EV_READ);
  ocsp_.rev.data = this;

  ev_async_init(&thread_join_asyncev_, thread_join_async_cb);

  ev_async_init(&serial_event_asyncev_, serial_event_async_cb);
  serial_event_asyncev_.data = this;

  ev_async_start(loop_, &serial_event_asyncev_);

  ev_child_init(&ocsp_.chldev, ocsp_chld_cb, 0, 0);
  ocsp_.chldev.data = this;

  ocsp_.next = 0;
  ocsp_.proc.rfd = -1;

  reset_ocsp();
}

ConnectionHandler::~ConnectionHandler() {
  ev_child_stop(loop_, &ocsp_.chldev);
  ev_async_stop(loop_, &serial_event_asyncev_);
  ev_async_stop(loop_, &thread_join_asyncev_);
  ev_io_stop(loop_, &ocsp_.rev);
  ev_timer_stop(loop_, &ocsp_timer_);
  ev_timer_stop(loop_, &disable_acceptor_timer_);

#ifdef ENABLE_HTTP3
  for (auto ssl_ctx : quic_all_ssl_ctx_) {
    if (ssl_ctx == nullptr) {
      continue;
    }

    auto tls_ctx_data =
        static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
    delete tls_ctx_data;
    SSL_CTX_free(ssl_ctx);
  }
#endif // ENABLE_HTTP3

  for (auto ssl_ctx : all_ssl_ctx_) {
    auto tls_ctx_data =
        static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
    delete tls_ctx_data;
    SSL_CTX_free(ssl_ctx);
  }

  // Free workers before destroying ev_loop
  workers_.clear();

  for (auto loop : worker_loops_) {
    ev_loop_destroy(loop);
  }
}

void ConnectionHandler::set_ticket_keys_to_worker(
    const std::shared_ptr<TicketKeys> &ticket_keys) {
  for (auto &worker : workers_) {
    worker->set_ticket_keys(ticket_keys);
  }
}

void ConnectionHandler::worker_reopen_log_files() {
  for (auto &worker : workers_) {
    WorkerEvent wev{};

    wev.type = WorkerEventType::REOPEN_LOG;

    worker->send(std::move(wev));
  }
}

void ConnectionHandler::worker_replace_downstream(
    std::shared_ptr<DownstreamConfig> downstreamconf) {
  for (auto &worker : workers_) {
    WorkerEvent wev{};

    wev.type = WorkerEventType::REPLACE_DOWNSTREAM;
    wev.downstreamconf = downstreamconf;

    worker->send(std::move(wev));
  }
}

int ConnectionHandler::create_single_worker() {
  cert_tree_ = tls::create_cert_lookup_tree();
  auto sv_ssl_ctx = tls::setup_server_ssl_context(
      all_ssl_ctx_, indexed_ssl_ctx_, cert_tree_.get()
#ifdef HAVE_NEVERBLEED
      ,
      nb_
#endif // HAVE_NEVERBLEED
  );

#ifdef ENABLE_HTTP3
  quic_cert_tree_ = tls::create_cert_lookup_tree();
  auto quic_sv_ssl_ctx = tls::setup_quic_server_ssl_context(
      quic_all_ssl_ctx_, quic_indexed_ssl_ctx_, quic_cert_tree_.get()
#  ifdef HAVE_NEVERBLEED
      ,
      nb_
#  endif // HAVE_NEVERBLEED
  );
#endif // ENABLE_HTTP3

  auto cl_ssl_ctx = tls::setup_downstream_client_ssl_context(
#ifdef HAVE_NEVERBLEED
      nb_
#endif // HAVE_NEVERBLEED
  );

  if (cl_ssl_ctx) {
    all_ssl_ctx_.push_back(cl_ssl_ctx);
#ifdef ENABLE_HTTP3
    quic_all_ssl_ctx_.push_back(nullptr);
#endif // ENABLE_HTTP3
  }

  auto config = get_config();
  auto &tlsconf = config->tls;

  SSL_CTX *session_cache_ssl_ctx = nullptr;
  {
    auto &memcachedconf = config->tls.session_cache.memcached;
    if (memcachedconf.tls) {
      session_cache_ssl_ctx = tls::create_ssl_client_context(
#ifdef HAVE_NEVERBLEED
          nb_,
#endif // HAVE_NEVERBLEED
          tlsconf.cacert, memcachedconf.cert_file,
          memcachedconf.private_key_file, nullptr);
      all_ssl_ctx_.push_back(session_cache_ssl_ctx);
#ifdef ENABLE_HTTP3
      quic_all_ssl_ctx_.push_back(nullptr);
#endif // ENABLE_HTTP3
    }
  }

#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
  quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size());
#endif // ENABLE_HTTP3 && HAVE_LIBBPF

#ifdef ENABLE_HTTP3
  assert(cid_prefixes_.size() == 1);
  const auto &cid_prefix = cid_prefixes_[0];
#endif // ENABLE_HTTP3

  single_worker_ = std::make_unique<Worker>(
      loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
#ifdef ENABLE_HTTP3
      quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(),
      cid_prefix.size(),
#  ifdef HAVE_LIBBPF
      /* index = */ 0,
#  endif // HAVE_LIBBPF
#endif // ENABLE_HTTP3
      ticket_keys_, this, config->conn.downstream);
#ifdef HAVE_MRUBY
  if (single_worker_->create_mruby_context() != 0) {
    return -1;
  }
#endif // HAVE_MRUBY

#ifdef ENABLE_HTTP3
  if (single_worker_->setup_quic_server_socket() != 0) {
    return -1;
  }
#endif // ENABLE_HTTP3

  return 0;
}

int ConnectionHandler::create_worker_thread(size_t num) {
#ifndef NOTHREADS
  assert(workers_.size() == 0);

  cert_tree_ = tls::create_cert_lookup_tree();
  auto sv_ssl_ctx = tls::setup_server_ssl_context(
      all_ssl_ctx_, indexed_ssl_ctx_, cert_tree_.get()
#  ifdef HAVE_NEVERBLEED
      ,
      nb_
#  endif // HAVE_NEVERBLEED
  );

#  ifdef ENABLE_HTTP3
  quic_cert_tree_ = tls::create_cert_lookup_tree();
  auto quic_sv_ssl_ctx = tls::setup_quic_server_ssl_context(
      quic_all_ssl_ctx_, quic_indexed_ssl_ctx_, quic_cert_tree_.get()
#    ifdef HAVE_NEVERBLEED
      ,
      nb_
#    endif // HAVE_NEVERBLEED
  );
#  endif // ENABLE_HTTP3

  auto cl_ssl_ctx = tls::setup_downstream_client_ssl_context(
#  ifdef HAVE_NEVERBLEED
      nb_
#  endif // HAVE_NEVERBLEED
  );

  if (cl_ssl_ctx) {
    all_ssl_ctx_.push_back(cl_ssl_ctx);
#  ifdef ENABLE_HTTP3
    quic_all_ssl_ctx_.push_back(nullptr);
#  endif // ENABLE_HTTP3
  }

  auto config = get_config();
  auto &tlsconf = config->tls;
  auto &apiconf = config->api;

#  if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
  quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size());
#  endif // ENABLE_HTTP3 && HAVE_LIBBPF

  // We have a dedicated worker for API request processing.
  if (apiconf.enabled) {
    ++num;
  }

  SSL_CTX *session_cache_ssl_ctx = nullptr;
  {
    auto &memcachedconf = config->tls.session_cache.memcached;

    if (memcachedconf.tls) {
      session_cache_ssl_ctx = tls::create_ssl_client_context(
#  ifdef HAVE_NEVERBLEED
          nb_,
#  endif // HAVE_NEVERBLEED
          tlsconf.cacert, memcachedconf.cert_file,
          memcachedconf.private_key_file, nullptr);
      all_ssl_ctx_.push_back(session_cache_ssl_ctx);
#  ifdef ENABLE_HTTP3
      quic_all_ssl_ctx_.push_back(nullptr);
#  endif // ENABLE_HTTP3
    }
  }

#  ifdef ENABLE_HTTP3
  assert(cid_prefixes_.size() == num);
#  endif // ENABLE_HTTP3

  for (size_t i = 0; i < num; ++i) {
    auto loop = ev_loop_new(config->ev_loop_flags);

#  ifdef ENABLE_HTTP3
    const auto &cid_prefix = cid_prefixes_[i];
#  endif // ENABLE_HTTP3

    auto worker = std::make_unique<Worker>(
        loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
#  ifdef ENABLE_HTTP3
        quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(),
        cid_prefix.size(),
#    ifdef HAVE_LIBBPF
        i,
#    endif // HAVE_LIBBPF
#  endif // ENABLE_HTTP3
        ticket_keys_, this, config->conn.downstream);
#  ifdef HAVE_MRUBY
    if (worker->create_mruby_context() != 0) {
      return -1;
    }
#  endif // HAVE_MRUBY

#  ifdef ENABLE_HTTP3
    if ((!apiconf.enabled || i != 0) &&
        worker->setup_quic_server_socket() != 0) {
      return -1;
    }
#  endif // ENABLE_HTTP3

    workers_.push_back(std::move(worker));
    worker_loops_.push_back(loop);

    LLOG(NOTICE, this) << "Created worker thread #" << workers_.size() - 1;
  }

  for (auto &worker : workers_) {
    worker->run_async();
  }

#endif // NOTHREADS

  return 0;
}

void ConnectionHandler::join_worker() {
#ifndef NOTHREADS
  int n = 0;

  if (LOG_ENABLED(INFO)) {
    LLOG(INFO, this) << "Waiting for worker thread to join: n="
                     << workers_.size();
  }

  for (auto &worker : workers_) {
    worker->wait();
    if (LOG_ENABLED(INFO)) {
      LLOG(INFO, this) << "Thread #" << n << " joined";
    }
    ++n;
  }
#endif // NOTHREADS
}

void ConnectionHandler::graceful_shutdown_worker() {
  if (single_worker_) {
    return;
  }

  if (LOG_ENABLED(INFO)) {
    LLOG(INFO, this) << "Sending graceful shutdown signal to worker";
  }

  for (auto &worker : workers_) {
    WorkerEvent wev{};
    wev.type = WorkerEventType::GRACEFUL_SHUTDOWN;

    worker->send(std::move(wev));
  }

#ifndef NOTHREADS
  ev_async_start(loop_, &thread_join_asyncev_);

  thread_join_fut_ = std::async(std::launch::async, [this]() {
    (void)reopen_log_files(get_config()->logging);
    join_worker();
    ev_async_send(get_loop(), &thread_join_asyncev_);
    delete_log_config();
  });
#endif // NOTHREADS
}

int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen,
                                         const UpstreamAddr *faddr) {
  if (LOG_ENABLED(INFO)) {
    LLOG(INFO, this) << "Accepted connection from "
                     << util::numeric_name(addr, addrlen) << ", fd=" << fd;
  }

  auto config = get_config();

  if (single_worker_) {
    auto &upstreamconf = config->conn.upstream;
    if (single_worker_->get_worker_stat()->num_connections >=
        upstreamconf.worker_connections) {

      if (LOG_ENABLED(INFO)) {
        LLOG(INFO, this) << "Too many connections >="
                         << upstreamconf.worker_connections;
      }

      close(fd);
      return -1;
    }

    auto client =
        tls::accept_connection(single_worker_.get(), fd, addr, addrlen, faddr);
    if (!client) {
      LLOG(ERROR, this) << "ClientHandler creation failed";

      close(fd);
      return -1;
    }

    return 0;
  }

  Worker *worker;

  if (faddr->alt_mode == UpstreamAltMode::API) {
    worker = workers_[0].get();

    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Dispatch connection to API worker #0";
    }
  } else {
    worker = workers_[worker_round_robin_cnt_].get();

    if (LOG_ENABLED(INFO)) {
      LOG(INFO) << "Dispatch connection to worker #" << worker_round_robin_cnt_;
    }

    if (++worker_round_robin_cnt_ == workers_.size()) {
      auto &apiconf = config->api;

      if (apiconf.enabled) {
        worker_round_robin_cnt_ = 1;
      } else {
        worker_round_robin_cnt_ = 0;
      }
    }
  }

  WorkerEvent wev{};
  wev.type = WorkerEventType::NEW_CONNECTION;
  wev.client_fd = fd;
  memcpy(&wev.client_addr, addr, addrlen);
  wev.client_addrlen = addrlen;
  wev.faddr = faddr;

  worker->send(std::move(wev));

  return 0;
}

struct ev_loop *ConnectionHandler::get_loop() const {
  return loop_;
}

Worker *ConnectionHandler::get_single_worker() const {
  return single_worker_.get();
}

void ConnectionHandler::add_acceptor(std::unique_ptr<AcceptHandler> h) {
  acceptors_.push_back(std::move(h));
}

void ConnectionHandler::delete_acceptor() { acceptors_.clear(); }

void ConnectionHandler::enable_acceptor() {
  for (auto &a : acceptors_) {
    a->enable();
  }
}

void ConnectionHandler::disable_acceptor() {
  for (auto &a : acceptors_) {
    a->disable();
  }
}

void ConnectionHandler::sleep_acceptor(ev_tstamp t) {
  if (t == 0. || ev_is_active(&disable_acceptor_timer_)) {
    return;
  }

  disable_acceptor();

  ev_timer_set(&disable_acceptor_timer_, t, 0.);
  ev_timer_start(loop_, &disable_acceptor_timer_);
}

void ConnectionHandler::accept_pending_connection() {
  for (auto &a : acceptors_) {
    a->accept_connection();
  }
}

void ConnectionHandler::set_ticket_keys(
    std::shared_ptr<TicketKeys> ticket_keys) {
  ticket_keys_ = std::move(ticket_keys);
  if (single_worker_) {
    single_worker_->set_ticket_keys(ticket_keys_);
  }
}

const std::shared_ptr<TicketKeys> &ConnectionHandler::get_ticket_keys() const {
  return ticket_keys_;
}

void ConnectionHandler::set_graceful_shutdown(bool f) {
  graceful_shutdown_ = f;
  if (single_worker_) {
    single_worker_->set_graceful_shutdown(f);
  }
}

bool ConnectionHandler::get_graceful_shutdown() const {
  return graceful_shutdown_;
}

void ConnectionHandler::cancel_ocsp_update() {
  enable_acceptor_on_ocsp_completion_ = false;
  ev_timer_stop(loop_, &ocsp_timer_);

  if (ocsp_.proc.pid == 0) {
    return;
  }

  int rv;

  rv = kill(ocsp_.proc.pid, SIGTERM);
  if (rv != 0) {
    auto error = errno;
    LOG(ERROR) << "Could not send signal to OCSP query process: errno="
               << error;
  }

  while ((rv = waitpid(ocsp_.proc.pid, nullptr, 0)) == -1 && errno == EINTR)
    ;
  if (rv == -1) {
    auto error = errno;
    LOG(ERROR) << "Error occurred while we were waiting for the completion of "
                  "OCSP query process: errno="
               << error;
  }
}

// Inspired by the h2o_read_command function from the h2o project:
// https://github.com/h2o/h2o
int ConnectionHandler::start_ocsp_update(const char *cert_file) {
  int rv;

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "Start ocsp update for " << cert_file;
  }

  assert(!ev_is_active(&ocsp_.rev));
  assert(!ev_is_active(&ocsp_.chldev));

  char *const argv[] = {
      const_cast<char *>(
          get_config()->tls.ocsp.fetch_ocsp_response_file.c_str()),
      const_cast<char *>(cert_file), nullptr};

  Process proc;
  rv = exec_read_command(proc, argv);
  if (rv != 0) {
    return -1;
  }

  ocsp_.proc = proc;

  ev_io_set(&ocsp_.rev, ocsp_.proc.rfd, EV_READ);
  ev_io_start(loop_, &ocsp_.rev);

  ev_child_set(&ocsp_.chldev, ocsp_.proc.pid, 0);
  ev_child_start(loop_, &ocsp_.chldev);

  return 0;
}

void ConnectionHandler::read_ocsp_chunk() {
  std::array<uint8_t, 4_k> buf;
  for (;;) {
    ssize_t n;
    while ((n = read(ocsp_.proc.rfd, buf.data(), buf.size())) == -1 &&
           errno == EINTR)
      ;

    if (n == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return;
      }
      auto error = errno;
      LOG(WARN) << "Reading from ocsp query command failed: errno=" << error;
      ocsp_.error = error;

      break;
    }

    if (n == 0) {
      break;
    }

    std::copy_n(std::begin(buf), n, std::back_inserter(ocsp_.resp));
  }

  ev_io_stop(loop_, &ocsp_.rev);
}

void ConnectionHandler::handle_ocsp_complete() {
  ev_io_stop(loop_, &ocsp_.rev);
  ev_child_stop(loop_, &ocsp_.chldev);

  assert(ocsp_.next < all_ssl_ctx_.size());
#ifdef ENABLE_HTTP3
  assert(all_ssl_ctx_.size() == quic_all_ssl_ctx_.size());
#endif // ENABLE_HTTP3

  auto ssl_ctx = all_ssl_ctx_[ocsp_.next];
  auto tls_ctx_data =
      static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));

  auto rstatus = ocsp_.chldev.rstatus;
  auto status = WEXITSTATUS(rstatus);
  if (ocsp_.error || !WIFEXITED(rstatus) || status != 0) {
    LOG(WARN) << "ocsp query command for " << tls_ctx_data->cert_file
              << " failed: error=" << ocsp_.error << ", rstatus=" << log::hex
              << rstatus << log::dec << ", status=" << status;
    ++ocsp_.next;
    proceed_next_cert_ocsp();
    return;
  }

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "ocsp update for " << tls_ctx_data->cert_file
              << " finished successfully";
  }

  auto config = get_config();
  auto &tlsconf = config->tls;

  if (tlsconf.ocsp.no_verify ||
      tls::verify_ocsp_response(ssl_ctx, ocsp_.resp.data(),
                                ocsp_.resp.size()) == 0) {
#ifdef ENABLE_HTTP3
    // We also have a list of SSL_CTX with the same certificate in
    // quic_all_ssl_ctx_.  Some SSL_CTXs are missing there; in that
    // case we get nullptr.
    auto quic_ssl_ctx = quic_all_ssl_ctx_[ocsp_.next];
    if (quic_ssl_ctx) {
#  ifndef OPENSSL_IS_BORINGSSL
      auto quic_tls_ctx_data = static_cast<tls::TLSContextData *>(
          SSL_CTX_get_app_data(quic_ssl_ctx));
#    ifdef HAVE_ATOMIC_STD_SHARED_PTR
      std::atomic_store_explicit(
          &quic_tls_ctx_data->ocsp_data,
          std::make_shared<std::vector<uint8_t>>(ocsp_.resp),
          std::memory_order_release);
#    else  // !HAVE_ATOMIC_STD_SHARED_PTR
      std::lock_guard<std::mutex> g(quic_tls_ctx_data->mu);
      quic_tls_ctx_data->ocsp_data =
          std::make_shared<std::vector<uint8_t>>(ocsp_.resp);
#    endif // !HAVE_ATOMIC_STD_SHARED_PTR
#  else  // OPENSSL_IS_BORINGSSL
      SSL_CTX_set_ocsp_response(quic_ssl_ctx, ocsp_.resp.data(),
                                ocsp_.resp.size());
#  endif // OPENSSL_IS_BORINGSSL
    }
#endif // ENABLE_HTTP3

#ifndef OPENSSL_IS_BORINGSSL
#  ifdef HAVE_ATOMIC_STD_SHARED_PTR
    std::atomic_store_explicit(
        &tls_ctx_data->ocsp_data,
        std::make_shared<std::vector<uint8_t>>(std::move(ocsp_.resp)),
        std::memory_order_release);
#  else  // !HAVE_ATOMIC_STD_SHARED_PTR
    std::lock_guard<std::mutex> g(tls_ctx_data->mu);
    tls_ctx_data->ocsp_data =
        std::make_shared<std::vector<uint8_t>>(std::move(ocsp_.resp));
#  endif // !HAVE_ATOMIC_STD_SHARED_PTR
#else  // OPENSSL_IS_BORINGSSL
    SSL_CTX_set_ocsp_response(ssl_ctx, ocsp_.resp.data(), ocsp_.resp.size());
#endif // OPENSSL_IS_BORINGSSL
  }

  ++ocsp_.next;
  proceed_next_cert_ocsp();
}

void ConnectionHandler::reset_ocsp() {
  if (ocsp_.proc.rfd != -1) {
    close(ocsp_.proc.rfd);
  }

  ocsp_.proc.rfd = -1;
  ocsp_.proc.pid = 0;
  ocsp_.error = 0;
  ocsp_.resp = std::vector<uint8_t>();
}

void ConnectionHandler::proceed_next_cert_ocsp() {
  for (;;) {
    reset_ocsp();
    if (ocsp_.next == all_ssl_ctx_.size()) {
      ocsp_.next = 0;
      // We have updated all OCSP responses; schedule the next update.
      ev_timer_set(&ocsp_timer_, get_config()->tls.ocsp.update_interval, 0.);
      ev_timer_start(loop_, &ocsp_timer_);

      if (enable_acceptor_on_ocsp_completion_) {
        enable_acceptor_on_ocsp_completion_ = false;
        enable_acceptor();
      }

      return;
    }

    auto ssl_ctx = all_ssl_ctx_[ocsp_.next];
    auto tls_ctx_data =
        static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));

    // The client SSL_CTX is also included in all_ssl_ctx_, but it has
    // no tls_ctx_data.
    if (!tls_ctx_data) {
      ++ocsp_.next;
      continue;
    }

    auto cert_file = tls_ctx_data->cert_file;

    if (start_ocsp_update(cert_file) != 0) {
      ++ocsp_.next;
      continue;
    }

    break;
  }
}

void ConnectionHandler::set_tls_ticket_key_memcached_dispatcher(
    std::unique_ptr<MemcachedDispatcher> dispatcher) {
  tls_ticket_key_memcached_dispatcher_ = std::move(dispatcher);
}

MemcachedDispatcher *
ConnectionHandler::get_tls_ticket_key_memcached_dispatcher() const {
  return tls_ticket_key_memcached_dispatcher_.get();
}

// Use a backoff algorithm similar to the one described in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
namespace {
constexpr size_t MAX_BACKOFF_EXP = 10;
constexpr auto MULTIPLIER = 3.2;
constexpr auto JITTER = 0.2;
} // namespace

void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) {
  if (++tls_ticket_key_memcached_get_retry_count_ >=
      get_config()->tls.ticket.memcached.max_retry) {
    LOG(WARN) << "Memcached: tls ticket get retry all failed "
              << tls_ticket_key_memcached_get_retry_count_ << " times.";

    on_tls_ticket_key_not_found(w);
    return;
  }

  auto base_backoff = util::int_pow(
      MULTIPLIER,
      std::min(MAX_BACKOFF_EXP, tls_ticket_key_memcached_get_retry_count_));
  auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff,
                                               JITTER * base_backoff);

  auto backoff = base_backoff + dist(gen_);

  LOG(WARN)
      << "Memcached: tls ticket get failed due to network error, retrying in "
      << backoff << " seconds";

  ev_timer_set(w, backoff, 0.);
  ev_timer_start(loop_, w);
}

void ConnectionHandler::on_tls_ticket_key_not_found(ev_timer *w) {
  tls_ticket_key_memcached_get_retry_count_ = 0;

  if (++tls_ticket_key_memcached_fail_count_ >=
      get_config()->tls.ticket.memcached.max_fail) {
    LOG(WARN) << "Memcached: could not get tls ticket; disable tls ticket";

    tls_ticket_key_memcached_fail_count_ = 0;

    set_ticket_keys(nullptr);
    set_ticket_keys_to_worker(nullptr);
  }

  LOG(WARN) << "Memcached: tls ticket get failed, schedule next";
  schedule_next_tls_ticket_key_memcached_get(w);
}

void ConnectionHandler::on_tls_ticket_key_get_success(
    const std::shared_ptr<TicketKeys> &ticket_keys, ev_timer *w) {
  LOG(NOTICE) << "Memcached: tls ticket get success";

  tls_ticket_key_memcached_get_retry_count_ = 0;
  tls_ticket_key_memcached_fail_count_ = 0;

  schedule_next_tls_ticket_key_memcached_get(w);

  if (!ticket_keys || ticket_keys->keys.empty()) {
    LOG(WARN) << "Memcached: tls ticket keys are empty; tls ticket disabled";
    set_ticket_keys(nullptr);
    set_ticket_keys_to_worker(nullptr);
    return;
  }

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "ticket keys get done";
    LOG(INFO) << 0 << " enc+dec: "
              << util::format_hex(ticket_keys->keys[0].data.name);
    for (size_t i = 1; i < ticket_keys->keys.size(); ++i) {
      auto &key = ticket_keys->keys[i];
      LOG(INFO) << i << " dec: " << util::format_hex(key.data.name);
    }
  }

  set_ticket_keys(ticket_keys);
  set_ticket_keys_to_worker(ticket_keys);
}

void ConnectionHandler::schedule_next_tls_ticket_key_memcached_get(
    ev_timer *w) {
  ev_timer_set(w, get_config()->tls.ticket.memcached.interval, 0.);
  ev_timer_start(loop_, w);
}

SSL_CTX *ConnectionHandler::create_tls_ticket_key_memcached_ssl_ctx() {
  auto config = get_config();
  auto &tlsconf = config->tls;
  auto &memcachedconf = config->tls.ticket.memcached;

  auto ssl_ctx = tls::create_ssl_client_context(
#ifdef HAVE_NEVERBLEED
      nb_,
#endif // HAVE_NEVERBLEED
      tlsconf.cacert, memcachedconf.cert_file, memcachedconf.private_key_file,
      nullptr);

  all_ssl_ctx_.push_back(ssl_ctx);
#ifdef ENABLE_HTTP3
  quic_all_ssl_ctx_.push_back(nullptr);
#endif // ENABLE_HTTP3

  return ssl_ctx;
}

#ifdef HAVE_NEVERBLEED
void ConnectionHandler::set_neverbleed(neverbleed_t *nb) { nb_ = nb; }
#endif // HAVE_NEVERBLEED

void ConnectionHandler::handle_serial_event() {
  std::vector<SerialEvent> q;
  {
    std::lock_guard<std::mutex> g(serial_event_mu_);
    q.swap(serial_events_);
  }

  for (auto &sev : q) {
    switch (sev.type) {
    case SerialEventType::REPLACE_DOWNSTREAM:
      // Make sure that none of the workers is still using
      // get_config()->conn.downstream
      mod_config()->conn.downstream = sev.downstreamconf;

      if (single_worker_) {
        single_worker_->replace_downstream_config(sev.downstreamconf);

        break;
      }

      worker_replace_downstream(sev.downstreamconf);

      break;
    default:
      break;
    }
  }
}

void ConnectionHandler::send_replace_downstream(
    const std::shared_ptr<DownstreamConfig> &downstreamconf) {
  send_serial_event(
      SerialEvent(SerialEventType::REPLACE_DOWNSTREAM, downstreamconf));
}

void ConnectionHandler::send_serial_event(SerialEvent ev) {
  {
    std::lock_guard<std::mutex> g(serial_event_mu_);

    serial_events_.push_back(std::move(ev));
  }

  ev_async_send(loop_, &serial_event_asyncev_);
}

SSL_CTX *ConnectionHandler::get_ssl_ctx(size_t idx) const {
  return all_ssl_ctx_[idx];
}

const std::vector<SSL_CTX *> &
ConnectionHandler::get_indexed_ssl_ctx(size_t idx) const {
  return indexed_ssl_ctx_[idx];
}

#ifdef ENABLE_HTTP3
const std::vector<SSL_CTX *> &
ConnectionHandler::get_quic_indexed_ssl_ctx(size_t idx) const {
  return quic_indexed_ssl_ctx_[idx];
}
#endif // ENABLE_HTTP3

void ConnectionHandler::set_enable_acceptor_on_ocsp_completion(bool f) {
  enable_acceptor_on_ocsp_completion_ = f;
}

#ifdef ENABLE_HTTP3
int ConnectionHandler::forward_quic_packet(
    const UpstreamAddr *faddr, const Address &remote_addr,
    const Address &local_addr, const ngtcp2_pkt_info &pi,
    const uint8_t *cid_prefix, const uint8_t *data, size_t datalen) {
  assert(!get_config()->single_thread);

  for (auto &worker : workers_) {
    if (!std::equal(cid_prefix, cid_prefix + SHRPX_QUIC_CID_PREFIXLEN,
                    worker->get_cid_prefix())) {
      continue;
    }

    WorkerEvent wev{};
    wev.type = WorkerEventType::QUIC_PKT_FORWARD;
    wev.quic_pkt = std::make_unique<QUICPacket>(faddr->index, remote_addr,
                                                local_addr, pi, data, datalen);

    worker->send(std::move(wev));

    return 0;
  }

  return -1;
}

void ConnectionHandler::set_quic_keying_materials(
    std::shared_ptr<QUICKeyingMaterials> qkms) {
  quic_keying_materials_ = std::move(qkms);
}

const std::shared_ptr<QUICKeyingMaterials> &
ConnectionHandler::get_quic_keying_materials() const {
  return quic_keying_materials_;
}

void ConnectionHandler::set_cid_prefixes(
    const std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>>
        &cid_prefixes) {
  cid_prefixes_ = cid_prefixes;
cid_prefixes; 1059} 1060 1061QUICLingeringWorkerProcess * 1062ConnectionHandler::match_quic_lingering_worker_process_cid_prefix( 1063 const uint8_t *dcid, size_t dcidlen) { 1064 assert(dcidlen >= SHRPX_QUIC_CID_PREFIXLEN); 1065 1066 for (auto &lwps : quic_lingering_worker_processes_) { 1067 for (auto &cid_prefix : lwps.cid_prefixes) { 1068 if (std::equal(std::begin(cid_prefix), std::end(cid_prefix), dcid)) { 1069 return &lwps; 1070 } 1071 } 1072 } 1073 1074 return nullptr; 1075} 1076 1077# ifdef HAVE_LIBBPF 1078std::vector<BPFRef> &ConnectionHandler::get_quic_bpf_refs() { 1079 return quic_bpf_refs_; 1080} 1081 1082void ConnectionHandler::unload_bpf_objects() { 1083 LOG(NOTICE) << "Unloading BPF objects"; 1084 1085 for (auto &ref : quic_bpf_refs_) { 1086 if (ref.obj == nullptr) { 1087 continue; 1088 } 1089 1090 bpf_object__close(ref.obj); 1091 1092 ref.obj = nullptr; 1093 } 1094} 1095# endif // HAVE_LIBBPF 1096 1097void ConnectionHandler::set_quic_ipc_fd(int fd) { quic_ipc_fd_ = fd; } 1098 1099void ConnectionHandler::set_quic_lingering_worker_processes( 1100 const std::vector<QUICLingeringWorkerProcess> &quic_lwps) { 1101 quic_lingering_worker_processes_ = quic_lwps; 1102} 1103 1104int ConnectionHandler::forward_quic_packet_to_lingering_worker_process( 1105 QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr, 1106 const Address &local_addr, const ngtcp2_pkt_info &pi, const uint8_t *data, 1107 size_t datalen) { 1108 std::array<uint8_t, 512> header; 1109 1110 assert(header.size() >= 1 + 1 + 1 + 1 + sizeof(sockaddr_storage) * 2); 1111 assert(remote_addr.len > 0); 1112 assert(local_addr.len > 0); 1113 1114 auto p = header.data(); 1115 1116 *p++ = static_cast<uint8_t>(QUICIPCType::DGRAM_FORWARD); 1117 *p++ = static_cast<uint8_t>(remote_addr.len - 1); 1118 p = std::copy_n(reinterpret_cast<const uint8_t *>(&remote_addr.su), 1119 remote_addr.len, p); 1120 *p++ = static_cast<uint8_t>(local_addr.len - 1); 1121 p = std::copy_n(reinterpret_cast<const uint8_t *>(&local_addr.su), 1122 local_addr.len, p); 1123 *p++ = pi.ecn; 1124 1125 iovec msg_iov[] = { 1126 { 1127 .iov_base = header.data(), 1128 .iov_len = static_cast<size_t>(p - header.data()), 1129 }, 1130 { 1131 .iov_base = const_cast<uint8_t *>(data), 1132 .iov_len = datalen, 1133 }, 1134 }; 1135 1136 msghdr msg{}; 1137 msg.msg_iov = msg_iov; 1138 msg.msg_iovlen = array_size(msg_iov); 1139 1140 ssize_t nwrite; 1141 1142 while ((nwrite = sendmsg(quic_lwp->quic_ipc_fd, &msg, 0)) == -1 && 1143 errno == EINTR) 1144 ; 1145 1146 if (nwrite == -1) { 1147 std::array<char, STRERROR_BUFSIZE> errbuf; 1148 1149 auto error = errno; 1150 LOG(ERROR) << "Failed to send QUIC IPC message: " 1151 << xsi_strerror(error, errbuf.data(), errbuf.size()); 1152 1153 return -1; 1154 } 1155 1156 return 0; 1157} 1158 1159int ConnectionHandler::quic_ipc_read() { 1160 std::array<uint8_t, 65536> buf; 1161 1162 ssize_t nread; 1163 1164 while ((nread = recv(quic_ipc_fd_, buf.data(), buf.size(), 0)) == -1 && 1165 errno == EINTR) 1166 ; 1167 1168 if (nread == -1) { 1169 std::array<char, STRERROR_BUFSIZE> errbuf; 1170 1171 auto error = errno; 1172 LOG(ERROR) << "Failed to read data from QUIC IPC channel: " 1173 << xsi_strerror(error, errbuf.data(), errbuf.size()); 1174 1175 return -1; 1176 } 1177 1178 if (nread == 0) { 1179 return 0; 1180 } 1181 1182 size_t len = 1 + 1 + 1 + 1; 1183 1184 // Wire format: 1185 // TYPE(1) REMOTE_ADDRLEN(1) REMOTE_ADDR(N) LOCAL_ADDRLEN(1) LOCAL_ADDR(N) 1186 // ECN(1) DGRAM_PAYLOAD(N) 1187 // 1188 // When encoding, REMOTE_ADDRLEN and 
  // by 1.
  if (static_cast<size_t>(nread) < len) {
    return 0;
  }

  auto p = buf.data();
  if (*p != static_cast<uint8_t>(QUICIPCType::DGRAM_FORWARD)) {
    LOG(ERROR) << "Unknown QUICIPCType: " << static_cast<uint32_t>(*p);

    return -1;
  }

  ++p;

  auto pkt = std::make_unique<QUICPacket>();

  auto remote_addrlen = static_cast<size_t>(*p++) + 1;
  if (remote_addrlen > sizeof(sockaddr_storage)) {
    LOG(ERROR) << "The length of remote address is too large: "
               << remote_addrlen;

    return -1;
  }

  len += remote_addrlen;

  if (static_cast<size_t>(nread) < len) {
    LOG(ERROR) << "Insufficient QUIC IPC message length";

    return -1;
  }

  pkt->remote_addr.len = remote_addrlen;
  memcpy(&pkt->remote_addr.su, p, remote_addrlen);

  p += remote_addrlen;

  auto local_addrlen = static_cast<size_t>(*p++) + 1;
  if (local_addrlen > sizeof(sockaddr_storage)) {
    LOG(ERROR) << "The length of local address is too large: " << local_addrlen;

    return -1;
  }

  len += local_addrlen;

  if (static_cast<size_t>(nread) < len) {
    LOG(ERROR) << "Insufficient QUIC IPC message length";

    return -1;
  }

  pkt->local_addr.len = local_addrlen;
  memcpy(&pkt->local_addr.su, p, local_addrlen);

  p += local_addrlen;

  pkt->pi.ecn = *p++;

  auto datalen = nread - (p - buf.data());

  pkt->data.assign(p, p + datalen);

  // At the moment, UpstreamAddr index is unknown.
  pkt->upstream_addr_index = static_cast<size_t>(-1);

  ngtcp2_version_cid vc;

  auto rv = ngtcp2_pkt_decode_version_cid(&vc, p, datalen, SHRPX_QUIC_SCIDLEN);
  if (rv < 0) {
    LOG(ERROR) << "ngtcp2_pkt_decode_version_cid: " << ngtcp2_strerror(rv);

    return -1;
  }

  if (vc.dcidlen != SHRPX_QUIC_SCIDLEN) {
    LOG(ERROR) << "DCID length is invalid";
    return -1;
  }

  if (single_worker_) {
    auto faddr = single_worker_->find_quic_upstream_addr(pkt->local_addr);
    if (faddr == nullptr) {
      LOG(ERROR) << "No suitable upstream address found";

      return 0;
    }

    auto quic_conn_handler = single_worker_->get_quic_connection_handler();

    // Ignore return value
    quic_conn_handler->handle_packet(faddr, pkt->remote_addr, pkt->local_addr,
                                     pkt->pi, pkt->data.data(),
                                     pkt->data.size());

    return 0;
  }

  auto &qkm = quic_keying_materials_->keying_materials.front();

  std::array<uint8_t, SHRPX_QUIC_DECRYPTED_DCIDLEN> decrypted_dcid;

  if (decrypt_quic_connection_id(decrypted_dcid.data(),
                                 vc.dcid + SHRPX_QUIC_CID_PREFIX_OFFSET,
                                 qkm.cid_encryption_key.data()) != 0) {
    return -1;
  }

  for (auto &worker : workers_) {
    if (!std::equal(std::begin(decrypted_dcid),
                    std::begin(decrypted_dcid) + SHRPX_QUIC_CID_PREFIXLEN,
                    worker->get_cid_prefix())) {
      continue;
    }

    WorkerEvent wev{
        .type = WorkerEventType::QUIC_PKT_FORWARD,
        .quic_pkt = std::move(pkt),
    };
    worker->send(std::move(wev));

    return 0;
  }

  if (LOG_ENABLED(INFO)) {
    LOG(INFO) << "No worker to match CID prefix";
  }

  return 0;
}
#endif // ENABLE_HTTP3

} // namespace shrpx