1/*
2 * nghttp2 - HTTP/2 C Library
3 *
4 * Copyright (c) 2012 Tatsuhiro Tsujikawa
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining
7 * a copy of this software and associated documentation files (the
8 * "Software"), to deal in the Software without restriction, including
9 * without limitation the rights to use, copy, modify, merge, publish,
10 * distribute, sublicense, and/or sell copies of the Software, and to
11 * permit persons to whom the Software is furnished to do so, subject to
12 * the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be
15 * included in all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 */
25#include "shrpx_connection_handler.h"
26
27#ifdef HAVE_UNISTD_H
28#  include <unistd.h>
29#endif // HAVE_UNISTD_H
30#include <sys/types.h>
31#include <sys/wait.h>
32
33#include <cerrno>
34#include <thread>
35#include <random>
36
37#include "shrpx_client_handler.h"
38#include "shrpx_tls.h"
39#include "shrpx_worker.h"
40#include "shrpx_config.h"
41#include "shrpx_http2_session.h"
42#include "shrpx_connect_blocker.h"
43#include "shrpx_downstream_connection.h"
44#include "shrpx_accept_handler.h"
45#include "shrpx_memcached_dispatcher.h"
46#include "shrpx_signal.h"
47#include "shrpx_log.h"
48#include "xsi_strerror.h"
49#include "util.h"
50#include "template.h"
51
52using namespace nghttp2;
53
54namespace shrpx {
55
56namespace {
57void acceptor_disable_cb(struct ev_loop *loop, ev_timer *w, int revent) {
58  auto h = static_cast<ConnectionHandler *>(w->data);
59
60  // If we are in graceful shutdown period, we must not enable
61  // acceptors again.
62  if (h->get_graceful_shutdown()) {
63    return;
64  }
65
66  h->enable_acceptor();
67}
68} // namespace
69
70namespace {
71void ocsp_cb(struct ev_loop *loop, ev_timer *w, int revent) {
72  auto h = static_cast<ConnectionHandler *>(w->data);
73
74  // If we are in graceful shutdown period, we won't do ocsp query.
75  if (h->get_graceful_shutdown()) {
76    return;
77  }
78
79  LOG(NOTICE) << "Start ocsp update";
80
81  h->proceed_next_cert_ocsp();
82}
83} // namespace
84
85namespace {
86void ocsp_read_cb(struct ev_loop *loop, ev_io *w, int revent) {
87  auto h = static_cast<ConnectionHandler *>(w->data);
88
89  h->read_ocsp_chunk();
90}
91} // namespace
92
93namespace {
94void ocsp_chld_cb(struct ev_loop *loop, ev_child *w, int revent) {
95  auto h = static_cast<ConnectionHandler *>(w->data);
96
97  h->handle_ocsp_complete();
98}
99} // namespace
100
101namespace {
102void thread_join_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
103  ev_break(loop);
104}
105} // namespace
106
107namespace {
108void serial_event_async_cb(struct ev_loop *loop, ev_async *w, int revent) {
109  auto h = static_cast<ConnectionHandler *>(w->data);
110
111  h->handle_serial_event();
112}
113} // namespace
114
115ConnectionHandler::ConnectionHandler(struct ev_loop *loop, std::mt19937 &gen)
116    :
117#ifdef ENABLE_HTTP3
118      quic_ipc_fd_(-1),
119#endif // ENABLE_HTTP3
120      gen_(gen),
121      single_worker_(nullptr),
122      loop_(loop),
123#ifdef HAVE_NEVERBLEED
124      nb_(nullptr),
125#endif // HAVE_NEVERBLEED
126      tls_ticket_key_memcached_get_retry_count_(0),
127      tls_ticket_key_memcached_fail_count_(0),
128      worker_round_robin_cnt_(get_config()->api.enabled ? 1 : 0),
129      graceful_shutdown_(false),
130      enable_acceptor_on_ocsp_completion_(false) {
131  ev_timer_init(&disable_acceptor_timer_, acceptor_disable_cb, 0., 0.);
132  disable_acceptor_timer_.data = this;
133
134  ev_timer_init(&ocsp_timer_, ocsp_cb, 0., 0.);
135  ocsp_timer_.data = this;
136
137  ev_io_init(&ocsp_.rev, ocsp_read_cb, -1, EV_READ);
138  ocsp_.rev.data = this;
139
140  ev_async_init(&thread_join_asyncev_, thread_join_async_cb);
141
142  ev_async_init(&serial_event_asyncev_, serial_event_async_cb);
143  serial_event_asyncev_.data = this;
144
145  ev_async_start(loop_, &serial_event_asyncev_);
146
147  ev_child_init(&ocsp_.chldev, ocsp_chld_cb, 0, 0);
148  ocsp_.chldev.data = this;
149
150  ocsp_.next = 0;
151  ocsp_.proc.rfd = -1;
152
153  reset_ocsp();
154}
155
156ConnectionHandler::~ConnectionHandler() {
157  ev_child_stop(loop_, &ocsp_.chldev);
158  ev_async_stop(loop_, &serial_event_asyncev_);
159  ev_async_stop(loop_, &thread_join_asyncev_);
160  ev_io_stop(loop_, &ocsp_.rev);
161  ev_timer_stop(loop_, &ocsp_timer_);
162  ev_timer_stop(loop_, &disable_acceptor_timer_);
163
164#ifdef ENABLE_HTTP3
165  for (auto ssl_ctx : quic_all_ssl_ctx_) {
166    if (ssl_ctx == nullptr) {
167      continue;
168    }
169
170    auto tls_ctx_data =
171        static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
172    delete tls_ctx_data;
173    SSL_CTX_free(ssl_ctx);
174  }
175#endif // ENABLE_HTTP3
176
177  for (auto ssl_ctx : all_ssl_ctx_) {
178    auto tls_ctx_data =
179        static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
180    delete tls_ctx_data;
181    SSL_CTX_free(ssl_ctx);
182  }
183
184  // Free workers before destroying ev_loop
185  workers_.clear();
186
187  for (auto loop : worker_loops_) {
188    ev_loop_destroy(loop);
189  }
190}
191
192void ConnectionHandler::set_ticket_keys_to_worker(
193    const std::shared_ptr<TicketKeys> &ticket_keys) {
194  for (auto &worker : workers_) {
195    worker->set_ticket_keys(ticket_keys);
196  }
197}
198
199void ConnectionHandler::worker_reopen_log_files() {
200  for (auto &worker : workers_) {
201    WorkerEvent wev{};
202
203    wev.type = WorkerEventType::REOPEN_LOG;
204
205    worker->send(std::move(wev));
206  }
207}
208
209void ConnectionHandler::worker_replace_downstream(
210    std::shared_ptr<DownstreamConfig> downstreamconf) {
211  for (auto &worker : workers_) {
212    WorkerEvent wev{};
213
214    wev.type = WorkerEventType::REPLACE_DOWNSTREAM;
215    wev.downstreamconf = downstreamconf;
216
217    worker->send(std::move(wev));
218  }
219}
220
221int ConnectionHandler::create_single_worker() {
222  cert_tree_ = tls::create_cert_lookup_tree();
223  auto sv_ssl_ctx = tls::setup_server_ssl_context(
224      all_ssl_ctx_, indexed_ssl_ctx_, cert_tree_.get()
225#ifdef HAVE_NEVERBLEED
226                                          ,
227      nb_
228#endif // HAVE_NEVERBLEED
229  );
230
231#ifdef ENABLE_HTTP3
232  quic_cert_tree_ = tls::create_cert_lookup_tree();
233  auto quic_sv_ssl_ctx = tls::setup_quic_server_ssl_context(
234      quic_all_ssl_ctx_, quic_indexed_ssl_ctx_, quic_cert_tree_.get()
235#  ifdef HAVE_NEVERBLEED
236                                                    ,
237      nb_
238#  endif // HAVE_NEVERBLEED
239  );
240#endif // ENABLE_HTTP3
241
242  auto cl_ssl_ctx = tls::setup_downstream_client_ssl_context(
243#ifdef HAVE_NEVERBLEED
244      nb_
245#endif // HAVE_NEVERBLEED
246  );
247
248  if (cl_ssl_ctx) {
249    all_ssl_ctx_.push_back(cl_ssl_ctx);
250#ifdef ENABLE_HTTP3
251    quic_all_ssl_ctx_.push_back(nullptr);
252#endif // ENABLE_HTTP3
253  }
254
255  auto config = get_config();
256  auto &tlsconf = config->tls;
257
258  SSL_CTX *session_cache_ssl_ctx = nullptr;
259  {
260    auto &memcachedconf = config->tls.session_cache.memcached;
261    if (memcachedconf.tls) {
262      session_cache_ssl_ctx = tls::create_ssl_client_context(
263#ifdef HAVE_NEVERBLEED
264          nb_,
265#endif // HAVE_NEVERBLEED
266          tlsconf.cacert, memcachedconf.cert_file,
267          memcachedconf.private_key_file, nullptr);
268      all_ssl_ctx_.push_back(session_cache_ssl_ctx);
269#ifdef ENABLE_HTTP3
270      quic_all_ssl_ctx_.push_back(nullptr);
271#endif // ENABLE_HTTP3
272    }
273  }
274
275#if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
276  quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size());
277#endif // ENABLE_HTTP3 && HAVE_LIBBPF
278
279#ifdef ENABLE_HTTP3
280  assert(cid_prefixes_.size() == 1);
281  const auto &cid_prefix = cid_prefixes_[0];
282#endif // ENABLE_HTTP3
283
284  single_worker_ = std::make_unique<Worker>(
285      loop_, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
286#ifdef ENABLE_HTTP3
287      quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(),
288      cid_prefix.size(),
289#  ifdef HAVE_LIBBPF
290      /* index = */ 0,
291#  endif // HAVE_LIBBPF
292#endif   // ENABLE_HTTP3
293      ticket_keys_, this, config->conn.downstream);
294#ifdef HAVE_MRUBY
295  if (single_worker_->create_mruby_context() != 0) {
296    return -1;
297  }
298#endif // HAVE_MRUBY
299
300#ifdef ENABLE_HTTP3
301  if (single_worker_->setup_quic_server_socket() != 0) {
302    return -1;
303  }
304#endif // ENABLE_HTTP3
305
306  return 0;
307}
308
309int ConnectionHandler::create_worker_thread(size_t num) {
310#ifndef NOTHREADS
311  assert(workers_.size() == 0);
312
313  cert_tree_ = tls::create_cert_lookup_tree();
314  auto sv_ssl_ctx = tls::setup_server_ssl_context(
315      all_ssl_ctx_, indexed_ssl_ctx_, cert_tree_.get()
316#  ifdef HAVE_NEVERBLEED
317                                          ,
318      nb_
319#  endif // HAVE_NEVERBLEED
320  );
321
322#  ifdef ENABLE_HTTP3
323  quic_cert_tree_ = tls::create_cert_lookup_tree();
324  auto quic_sv_ssl_ctx = tls::setup_quic_server_ssl_context(
325      quic_all_ssl_ctx_, quic_indexed_ssl_ctx_, quic_cert_tree_.get()
326#    ifdef HAVE_NEVERBLEED
327                                                    ,
328      nb_
329#    endif // HAVE_NEVERBLEED
330  );
331#  endif // ENABLE_HTTP3
332
333  auto cl_ssl_ctx = tls::setup_downstream_client_ssl_context(
334#  ifdef HAVE_NEVERBLEED
335      nb_
336#  endif // HAVE_NEVERBLEED
337  );
338
339  if (cl_ssl_ctx) {
340    all_ssl_ctx_.push_back(cl_ssl_ctx);
341#  ifdef ENABLE_HTTP3
342    quic_all_ssl_ctx_.push_back(nullptr);
343#  endif // ENABLE_HTTP3
344  }
345
346  auto config = get_config();
347  auto &tlsconf = config->tls;
348  auto &apiconf = config->api;
349
350#  if defined(ENABLE_HTTP3) && defined(HAVE_LIBBPF)
351  quic_bpf_refs_.resize(config->conn.quic_listener.addrs.size());
352#  endif // ENABLE_HTTP3 && HAVE_LIBBPF
353
354  // We have dedicated worker for API request processing.
355  if (apiconf.enabled) {
356    ++num;
357  }
358
359  SSL_CTX *session_cache_ssl_ctx = nullptr;
360  {
361    auto &memcachedconf = config->tls.session_cache.memcached;
362
363    if (memcachedconf.tls) {
364      session_cache_ssl_ctx = tls::create_ssl_client_context(
365#  ifdef HAVE_NEVERBLEED
366          nb_,
367#  endif // HAVE_NEVERBLEED
368          tlsconf.cacert, memcachedconf.cert_file,
369          memcachedconf.private_key_file, nullptr);
370      all_ssl_ctx_.push_back(session_cache_ssl_ctx);
371#  ifdef ENABLE_HTTP3
372      quic_all_ssl_ctx_.push_back(nullptr);
373#  endif // ENABLE_HTTP3
374    }
375  }
376
377#  ifdef ENABLE_HTTP3
378  assert(cid_prefixes_.size() == num);
379#  endif // ENABLE_HTTP3
380
381  for (size_t i = 0; i < num; ++i) {
382    auto loop = ev_loop_new(config->ev_loop_flags);
383
384#  ifdef ENABLE_HTTP3
385    const auto &cid_prefix = cid_prefixes_[i];
386#  endif // ENABLE_HTTP3
387
388    auto worker = std::make_unique<Worker>(
389        loop, sv_ssl_ctx, cl_ssl_ctx, session_cache_ssl_ctx, cert_tree_.get(),
390#  ifdef ENABLE_HTTP3
391        quic_sv_ssl_ctx, quic_cert_tree_.get(), cid_prefix.data(),
392        cid_prefix.size(),
393#    ifdef HAVE_LIBBPF
394        i,
395#    endif // HAVE_LIBBPF
396#  endif   // ENABLE_HTTP3
397        ticket_keys_, this, config->conn.downstream);
398#  ifdef HAVE_MRUBY
399    if (worker->create_mruby_context() != 0) {
400      return -1;
401    }
402#  endif // HAVE_MRUBY
403
404#  ifdef ENABLE_HTTP3
405    if ((!apiconf.enabled || i != 0) &&
406        worker->setup_quic_server_socket() != 0) {
407      return -1;
408    }
409#  endif // ENABLE_HTTP3
410
411    workers_.push_back(std::move(worker));
412    worker_loops_.push_back(loop);
413
414    LLOG(NOTICE, this) << "Created worker thread #" << workers_.size() - 1;
415  }
416
417  for (auto &worker : workers_) {
418    worker->run_async();
419  }
420
421#endif // NOTHREADS
422
423  return 0;
424}
425
426void ConnectionHandler::join_worker() {
427#ifndef NOTHREADS
428  int n = 0;
429
430  if (LOG_ENABLED(INFO)) {
431    LLOG(INFO, this) << "Waiting for worker thread to join: n="
432                     << workers_.size();
433  }
434
435  for (auto &worker : workers_) {
436    worker->wait();
437    if (LOG_ENABLED(INFO)) {
438      LLOG(INFO, this) << "Thread #" << n << " joined";
439    }
440    ++n;
441  }
442#endif // NOTHREADS
443}
444
445void ConnectionHandler::graceful_shutdown_worker() {
446  if (single_worker_) {
447    return;
448  }
449
450  if (LOG_ENABLED(INFO)) {
451    LLOG(INFO, this) << "Sending graceful shutdown signal to worker";
452  }
453
454  for (auto &worker : workers_) {
455    WorkerEvent wev{};
456    wev.type = WorkerEventType::GRACEFUL_SHUTDOWN;
457
458    worker->send(std::move(wev));
459  }
460
461#ifndef NOTHREADS
462  ev_async_start(loop_, &thread_join_asyncev_);
463
464  thread_join_fut_ = std::async(std::launch::async, [this]() {
465    (void)reopen_log_files(get_config()->logging);
466    join_worker();
467    ev_async_send(get_loop(), &thread_join_asyncev_);
468    delete_log_config();
469  });
470#endif // NOTHREADS
471}
472
473int ConnectionHandler::handle_connection(int fd, sockaddr *addr, int addrlen,
474                                         const UpstreamAddr *faddr) {
475  if (LOG_ENABLED(INFO)) {
476    LLOG(INFO, this) << "Accepted connection from "
477                     << util::numeric_name(addr, addrlen) << ", fd=" << fd;
478  }
479
480  auto config = get_config();
481
482  if (single_worker_) {
483    auto &upstreamconf = config->conn.upstream;
484    if (single_worker_->get_worker_stat()->num_connections >=
485        upstreamconf.worker_connections) {
486
487      if (LOG_ENABLED(INFO)) {
488        LLOG(INFO, this) << "Too many connections >="
489                         << upstreamconf.worker_connections;
490      }
491
492      close(fd);
493      return -1;
494    }
495
496    auto client =
497        tls::accept_connection(single_worker_.get(), fd, addr, addrlen, faddr);
498    if (!client) {
499      LLOG(ERROR, this) << "ClientHandler creation failed";
500
501      close(fd);
502      return -1;
503    }
504
505    return 0;
506  }
507
508  Worker *worker;
509
510  if (faddr->alt_mode == UpstreamAltMode::API) {
511    worker = workers_[0].get();
512
513    if (LOG_ENABLED(INFO)) {
514      LOG(INFO) << "Dispatch connection to API worker #0";
515    }
516  } else {
517    worker = workers_[worker_round_robin_cnt_].get();
518
519    if (LOG_ENABLED(INFO)) {
520      LOG(INFO) << "Dispatch connection to worker #" << worker_round_robin_cnt_;
521    }
522
523    if (++worker_round_robin_cnt_ == workers_.size()) {
524      auto &apiconf = config->api;
525
526      if (apiconf.enabled) {
527        worker_round_robin_cnt_ = 1;
528      } else {
529        worker_round_robin_cnt_ = 0;
530      }
531    }
532  }
533
534  WorkerEvent wev{};
535  wev.type = WorkerEventType::NEW_CONNECTION;
536  wev.client_fd = fd;
537  memcpy(&wev.client_addr, addr, addrlen);
538  wev.client_addrlen = addrlen;
539  wev.faddr = faddr;
540
541  worker->send(std::move(wev));
542
543  return 0;
544}
545
546struct ev_loop *ConnectionHandler::get_loop() const {
547  return loop_;
548}
549
550Worker *ConnectionHandler::get_single_worker() const {
551  return single_worker_.get();
552}
553
554void ConnectionHandler::add_acceptor(std::unique_ptr<AcceptHandler> h) {
555  acceptors_.push_back(std::move(h));
556}
557
558void ConnectionHandler::delete_acceptor() { acceptors_.clear(); }
559
560void ConnectionHandler::enable_acceptor() {
561  for (auto &a : acceptors_) {
562    a->enable();
563  }
564}
565
566void ConnectionHandler::disable_acceptor() {
567  for (auto &a : acceptors_) {
568    a->disable();
569  }
570}
571
572void ConnectionHandler::sleep_acceptor(ev_tstamp t) {
573  if (t == 0. || ev_is_active(&disable_acceptor_timer_)) {
574    return;
575  }
576
577  disable_acceptor();
578
579  ev_timer_set(&disable_acceptor_timer_, t, 0.);
580  ev_timer_start(loop_, &disable_acceptor_timer_);
581}
582
583void ConnectionHandler::accept_pending_connection() {
584  for (auto &a : acceptors_) {
585    a->accept_connection();
586  }
587}
588
589void ConnectionHandler::set_ticket_keys(
590    std::shared_ptr<TicketKeys> ticket_keys) {
591  ticket_keys_ = std::move(ticket_keys);
592  if (single_worker_) {
593    single_worker_->set_ticket_keys(ticket_keys_);
594  }
595}
596
597const std::shared_ptr<TicketKeys> &ConnectionHandler::get_ticket_keys() const {
598  return ticket_keys_;
599}
600
601void ConnectionHandler::set_graceful_shutdown(bool f) {
602  graceful_shutdown_ = f;
603  if (single_worker_) {
604    single_worker_->set_graceful_shutdown(f);
605  }
606}
607
608bool ConnectionHandler::get_graceful_shutdown() const {
609  return graceful_shutdown_;
610}
611
612void ConnectionHandler::cancel_ocsp_update() {
613  enable_acceptor_on_ocsp_completion_ = false;
614  ev_timer_stop(loop_, &ocsp_timer_);
615
616  if (ocsp_.proc.pid == 0) {
617    return;
618  }
619
620  int rv;
621
622  rv = kill(ocsp_.proc.pid, SIGTERM);
623  if (rv != 0) {
624    auto error = errno;
625    LOG(ERROR) << "Could not send signal to OCSP query process: errno="
626               << error;
627  }
628
629  while ((rv = waitpid(ocsp_.proc.pid, nullptr, 0)) == -1 && errno == EINTR)
630    ;
631  if (rv == -1) {
632    auto error = errno;
633    LOG(ERROR) << "Error occurred while we were waiting for the completion of "
634                  "OCSP query process: errno="
635               << error;
636  }
637}
638
639// inspired by h2o_read_command function from h2o project:
640// https://github.com/h2o/h2o
641int ConnectionHandler::start_ocsp_update(const char *cert_file) {
642  int rv;
643
644  if (LOG_ENABLED(INFO)) {
645    LOG(INFO) << "Start ocsp update for " << cert_file;
646  }
647
648  assert(!ev_is_active(&ocsp_.rev));
649  assert(!ev_is_active(&ocsp_.chldev));
650
651  char *const argv[] = {
652      const_cast<char *>(
653          get_config()->tls.ocsp.fetch_ocsp_response_file.c_str()),
654      const_cast<char *>(cert_file), nullptr};
655
656  Process proc;
657  rv = exec_read_command(proc, argv);
658  if (rv != 0) {
659    return -1;
660  }
661
662  ocsp_.proc = proc;
663
664  ev_io_set(&ocsp_.rev, ocsp_.proc.rfd, EV_READ);
665  ev_io_start(loop_, &ocsp_.rev);
666
667  ev_child_set(&ocsp_.chldev, ocsp_.proc.pid, 0);
668  ev_child_start(loop_, &ocsp_.chldev);
669
670  return 0;
671}
672
673void ConnectionHandler::read_ocsp_chunk() {
674  std::array<uint8_t, 4_k> buf;
675  for (;;) {
676    ssize_t n;
677    while ((n = read(ocsp_.proc.rfd, buf.data(), buf.size())) == -1 &&
678           errno == EINTR)
679      ;
680
681    if (n == -1) {
682      if (errno == EAGAIN || errno == EWOULDBLOCK) {
683        return;
684      }
685      auto error = errno;
686      LOG(WARN) << "Reading from ocsp query command failed: errno=" << error;
687      ocsp_.error = error;
688
689      break;
690    }
691
692    if (n == 0) {
693      break;
694    }
695
696    std::copy_n(std::begin(buf), n, std::back_inserter(ocsp_.resp));
697  }
698
699  ev_io_stop(loop_, &ocsp_.rev);
700}
701
702void ConnectionHandler::handle_ocsp_complete() {
703  ev_io_stop(loop_, &ocsp_.rev);
704  ev_child_stop(loop_, &ocsp_.chldev);
705
706  assert(ocsp_.next < all_ssl_ctx_.size());
707#ifdef ENABLE_HTTP3
708  assert(all_ssl_ctx_.size() == quic_all_ssl_ctx_.size());
709#endif // ENABLE_HTTP3
710
711  auto ssl_ctx = all_ssl_ctx_[ocsp_.next];
712  auto tls_ctx_data =
713      static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
714
715  auto rstatus = ocsp_.chldev.rstatus;
716  auto status = WEXITSTATUS(rstatus);
717  if (ocsp_.error || !WIFEXITED(rstatus) || status != 0) {
718    LOG(WARN) << "ocsp query command for " << tls_ctx_data->cert_file
719              << " failed: error=" << ocsp_.error << ", rstatus=" << log::hex
720              << rstatus << log::dec << ", status=" << status;
721    ++ocsp_.next;
722    proceed_next_cert_ocsp();
723    return;
724  }
725
726  if (LOG_ENABLED(INFO)) {
727    LOG(INFO) << "ocsp update for " << tls_ctx_data->cert_file
728              << " finished successfully";
729  }
730
731  auto config = get_config();
732  auto &tlsconf = config->tls;
733
734  if (tlsconf.ocsp.no_verify ||
735      tls::verify_ocsp_response(ssl_ctx, ocsp_.resp.data(),
736                                ocsp_.resp.size()) == 0) {
737#ifdef ENABLE_HTTP3
738    // We have list of SSL_CTX with the same certificate in
739    // quic_all_ssl_ctx_ as well.  Some SSL_CTXs are missing there in
740    // that case we get nullptr.
741    auto quic_ssl_ctx = quic_all_ssl_ctx_[ocsp_.next];
742    if (quic_ssl_ctx) {
743#  ifndef OPENSSL_IS_BORINGSSL
744      auto quic_tls_ctx_data = static_cast<tls::TLSContextData *>(
745          SSL_CTX_get_app_data(quic_ssl_ctx));
746#    ifdef HAVE_ATOMIC_STD_SHARED_PTR
747      std::atomic_store_explicit(
748          &quic_tls_ctx_data->ocsp_data,
749          std::make_shared<std::vector<uint8_t>>(ocsp_.resp),
750          std::memory_order_release);
751#    else  // !HAVE_ATOMIC_STD_SHARED_PTR
752      std::lock_guard<std::mutex> g(quic_tls_ctx_data->mu);
753      quic_tls_ctx_data->ocsp_data =
754          std::make_shared<std::vector<uint8_t>>(ocsp_.resp);
755#    endif // !HAVE_ATOMIC_STD_SHARED_PTR
756#  else    // OPENSSL_IS_BORINGSSL
757      SSL_CTX_set_ocsp_response(quic_ssl_ctx, ocsp_.resp.data(),
758                                ocsp_.resp.size());
759#  endif   // OPENSSL_IS_BORINGSSL
760    }
761#endif // ENABLE_HTTP3
762
763#ifndef OPENSSL_IS_BORINGSSL
764#  ifdef HAVE_ATOMIC_STD_SHARED_PTR
765    std::atomic_store_explicit(
766        &tls_ctx_data->ocsp_data,
767        std::make_shared<std::vector<uint8_t>>(std::move(ocsp_.resp)),
768        std::memory_order_release);
769#  else  // !HAVE_ATOMIC_STD_SHARED_PTR
770    std::lock_guard<std::mutex> g(tls_ctx_data->mu);
771    tls_ctx_data->ocsp_data =
772        std::make_shared<std::vector<uint8_t>>(std::move(ocsp_.resp));
773#  endif // !HAVE_ATOMIC_STD_SHARED_PTR
774#else    // OPENSSL_IS_BORINGSSL
775    SSL_CTX_set_ocsp_response(ssl_ctx, ocsp_.resp.data(), ocsp_.resp.size());
776#endif   // OPENSSL_IS_BORINGSSL
777  }
778
779  ++ocsp_.next;
780  proceed_next_cert_ocsp();
781}
782
783void ConnectionHandler::reset_ocsp() {
784  if (ocsp_.proc.rfd != -1) {
785    close(ocsp_.proc.rfd);
786  }
787
788  ocsp_.proc.rfd = -1;
789  ocsp_.proc.pid = 0;
790  ocsp_.error = 0;
791  ocsp_.resp = std::vector<uint8_t>();
792}
793
794void ConnectionHandler::proceed_next_cert_ocsp() {
795  for (;;) {
796    reset_ocsp();
797    if (ocsp_.next == all_ssl_ctx_.size()) {
798      ocsp_.next = 0;
799      // We have updated all ocsp response, and schedule next update.
800      ev_timer_set(&ocsp_timer_, get_config()->tls.ocsp.update_interval, 0.);
801      ev_timer_start(loop_, &ocsp_timer_);
802
803      if (enable_acceptor_on_ocsp_completion_) {
804        enable_acceptor_on_ocsp_completion_ = false;
805        enable_acceptor();
806      }
807
808      return;
809    }
810
811    auto ssl_ctx = all_ssl_ctx_[ocsp_.next];
812    auto tls_ctx_data =
813        static_cast<tls::TLSContextData *>(SSL_CTX_get_app_data(ssl_ctx));
814
815    // client SSL_CTX is also included in all_ssl_ctx_, but has no
816    // tls_ctx_data.
817    if (!tls_ctx_data) {
818      ++ocsp_.next;
819      continue;
820    }
821
822    auto cert_file = tls_ctx_data->cert_file;
823
824    if (start_ocsp_update(cert_file) != 0) {
825      ++ocsp_.next;
826      continue;
827    }
828
829    break;
830  }
831}
832
833void ConnectionHandler::set_tls_ticket_key_memcached_dispatcher(
834    std::unique_ptr<MemcachedDispatcher> dispatcher) {
835  tls_ticket_key_memcached_dispatcher_ = std::move(dispatcher);
836}
837
838MemcachedDispatcher *
839ConnectionHandler::get_tls_ticket_key_memcached_dispatcher() const {
840  return tls_ticket_key_memcached_dispatcher_.get();
841}
842
843// Use the similar backoff algorithm described in
844// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
845namespace {
846constexpr size_t MAX_BACKOFF_EXP = 10;
847constexpr auto MULTIPLIER = 3.2;
848constexpr auto JITTER = 0.2;
849} // namespace
850
851void ConnectionHandler::on_tls_ticket_key_network_error(ev_timer *w) {
852  if (++tls_ticket_key_memcached_get_retry_count_ >=
853      get_config()->tls.ticket.memcached.max_retry) {
854    LOG(WARN) << "Memcached: tls ticket get retry all failed "
855              << tls_ticket_key_memcached_get_retry_count_ << " times.";
856
857    on_tls_ticket_key_not_found(w);
858    return;
859  }
860
861  auto base_backoff = util::int_pow(
862      MULTIPLIER,
863      std::min(MAX_BACKOFF_EXP, tls_ticket_key_memcached_get_retry_count_));
864  auto dist = std::uniform_real_distribution<>(-JITTER * base_backoff,
865                                               JITTER * base_backoff);
866
867  auto backoff = base_backoff + dist(gen_);
868
869  LOG(WARN)
870      << "Memcached: tls ticket get failed due to network error, retrying in "
871      << backoff << " seconds";
872
873  ev_timer_set(w, backoff, 0.);
874  ev_timer_start(loop_, w);
875}
876
877void ConnectionHandler::on_tls_ticket_key_not_found(ev_timer *w) {
878  tls_ticket_key_memcached_get_retry_count_ = 0;
879
880  if (++tls_ticket_key_memcached_fail_count_ >=
881      get_config()->tls.ticket.memcached.max_fail) {
882    LOG(WARN) << "Memcached: could not get tls ticket; disable tls ticket";
883
884    tls_ticket_key_memcached_fail_count_ = 0;
885
886    set_ticket_keys(nullptr);
887    set_ticket_keys_to_worker(nullptr);
888  }
889
890  LOG(WARN) << "Memcached: tls ticket get failed, schedule next";
891  schedule_next_tls_ticket_key_memcached_get(w);
892}
893
894void ConnectionHandler::on_tls_ticket_key_get_success(
895    const std::shared_ptr<TicketKeys> &ticket_keys, ev_timer *w) {
896  LOG(NOTICE) << "Memcached: tls ticket get success";
897
898  tls_ticket_key_memcached_get_retry_count_ = 0;
899  tls_ticket_key_memcached_fail_count_ = 0;
900
901  schedule_next_tls_ticket_key_memcached_get(w);
902
903  if (!ticket_keys || ticket_keys->keys.empty()) {
904    LOG(WARN) << "Memcached: tls ticket keys are empty; tls ticket disabled";
905    set_ticket_keys(nullptr);
906    set_ticket_keys_to_worker(nullptr);
907    return;
908  }
909
910  if (LOG_ENABLED(INFO)) {
911    LOG(INFO) << "ticket keys get done";
912    LOG(INFO) << 0 << " enc+dec: "
913              << util::format_hex(ticket_keys->keys[0].data.name);
914    for (size_t i = 1; i < ticket_keys->keys.size(); ++i) {
915      auto &key = ticket_keys->keys[i];
916      LOG(INFO) << i << " dec: " << util::format_hex(key.data.name);
917    }
918  }
919
920  set_ticket_keys(ticket_keys);
921  set_ticket_keys_to_worker(ticket_keys);
922}
923
924void ConnectionHandler::schedule_next_tls_ticket_key_memcached_get(
925    ev_timer *w) {
926  ev_timer_set(w, get_config()->tls.ticket.memcached.interval, 0.);
927  ev_timer_start(loop_, w);
928}
929
930SSL_CTX *ConnectionHandler::create_tls_ticket_key_memcached_ssl_ctx() {
931  auto config = get_config();
932  auto &tlsconf = config->tls;
933  auto &memcachedconf = config->tls.ticket.memcached;
934
935  auto ssl_ctx = tls::create_ssl_client_context(
936#ifdef HAVE_NEVERBLEED
937      nb_,
938#endif // HAVE_NEVERBLEED
939      tlsconf.cacert, memcachedconf.cert_file, memcachedconf.private_key_file,
940      nullptr);
941
942  all_ssl_ctx_.push_back(ssl_ctx);
943#ifdef ENABLE_HTTP3
944  quic_all_ssl_ctx_.push_back(nullptr);
945#endif // ENABLE_HTTP3
946
947  return ssl_ctx;
948}
949
950#ifdef HAVE_NEVERBLEED
951void ConnectionHandler::set_neverbleed(neverbleed_t *nb) { nb_ = nb; }
952#endif // HAVE_NEVERBLEED
953
954void ConnectionHandler::handle_serial_event() {
955  std::vector<SerialEvent> q;
956  {
957    std::lock_guard<std::mutex> g(serial_event_mu_);
958    q.swap(serial_events_);
959  }
960
961  for (auto &sev : q) {
962    switch (sev.type) {
963    case SerialEventType::REPLACE_DOWNSTREAM:
964      // Mmake sure that none of worker uses
965      // get_config()->conn.downstream
966      mod_config()->conn.downstream = sev.downstreamconf;
967
968      if (single_worker_) {
969        single_worker_->replace_downstream_config(sev.downstreamconf);
970
971        break;
972      }
973
974      worker_replace_downstream(sev.downstreamconf);
975
976      break;
977    default:
978      break;
979    }
980  }
981}
982
983void ConnectionHandler::send_replace_downstream(
984    const std::shared_ptr<DownstreamConfig> &downstreamconf) {
985  send_serial_event(
986      SerialEvent(SerialEventType::REPLACE_DOWNSTREAM, downstreamconf));
987}
988
989void ConnectionHandler::send_serial_event(SerialEvent ev) {
990  {
991    std::lock_guard<std::mutex> g(serial_event_mu_);
992
993    serial_events_.push_back(std::move(ev));
994  }
995
996  ev_async_send(loop_, &serial_event_asyncev_);
997}
998
999SSL_CTX *ConnectionHandler::get_ssl_ctx(size_t idx) const {
1000  return all_ssl_ctx_[idx];
1001}
1002
1003const std::vector<SSL_CTX *> &
1004ConnectionHandler::get_indexed_ssl_ctx(size_t idx) const {
1005  return indexed_ssl_ctx_[idx];
1006}
1007
1008#ifdef ENABLE_HTTP3
1009const std::vector<SSL_CTX *> &
1010ConnectionHandler::get_quic_indexed_ssl_ctx(size_t idx) const {
1011  return quic_indexed_ssl_ctx_[idx];
1012}
1013#endif // ENABLE_HTTP3
1014
1015void ConnectionHandler::set_enable_acceptor_on_ocsp_completion(bool f) {
1016  enable_acceptor_on_ocsp_completion_ = f;
1017}
1018
1019#ifdef ENABLE_HTTP3
1020int ConnectionHandler::forward_quic_packet(
1021    const UpstreamAddr *faddr, const Address &remote_addr,
1022    const Address &local_addr, const ngtcp2_pkt_info &pi,
1023    const uint8_t *cid_prefix, const uint8_t *data, size_t datalen) {
1024  assert(!get_config()->single_thread);
1025
1026  for (auto &worker : workers_) {
1027    if (!std::equal(cid_prefix, cid_prefix + SHRPX_QUIC_CID_PREFIXLEN,
1028                    worker->get_cid_prefix())) {
1029      continue;
1030    }
1031
1032    WorkerEvent wev{};
1033    wev.type = WorkerEventType::QUIC_PKT_FORWARD;
1034    wev.quic_pkt = std::make_unique<QUICPacket>(faddr->index, remote_addr,
1035                                                local_addr, pi, data, datalen);
1036
1037    worker->send(std::move(wev));
1038
1039    return 0;
1040  }
1041
1042  return -1;
1043}
1044
1045void ConnectionHandler::set_quic_keying_materials(
1046    std::shared_ptr<QUICKeyingMaterials> qkms) {
1047  quic_keying_materials_ = std::move(qkms);
1048}
1049
1050const std::shared_ptr<QUICKeyingMaterials> &
1051ConnectionHandler::get_quic_keying_materials() const {
1052  return quic_keying_materials_;
1053}
1054
1055void ConnectionHandler::set_cid_prefixes(
1056    const std::vector<std::array<uint8_t, SHRPX_QUIC_CID_PREFIXLEN>>
1057        &cid_prefixes) {
1058  cid_prefixes_ = cid_prefixes;
1059}
1060
1061QUICLingeringWorkerProcess *
1062ConnectionHandler::match_quic_lingering_worker_process_cid_prefix(
1063    const uint8_t *dcid, size_t dcidlen) {
1064  assert(dcidlen >= SHRPX_QUIC_CID_PREFIXLEN);
1065
1066  for (auto &lwps : quic_lingering_worker_processes_) {
1067    for (auto &cid_prefix : lwps.cid_prefixes) {
1068      if (std::equal(std::begin(cid_prefix), std::end(cid_prefix), dcid)) {
1069        return &lwps;
1070      }
1071    }
1072  }
1073
1074  return nullptr;
1075}
1076
1077#  ifdef HAVE_LIBBPF
1078std::vector<BPFRef> &ConnectionHandler::get_quic_bpf_refs() {
1079  return quic_bpf_refs_;
1080}
1081
1082void ConnectionHandler::unload_bpf_objects() {
1083  LOG(NOTICE) << "Unloading BPF objects";
1084
1085  for (auto &ref : quic_bpf_refs_) {
1086    if (ref.obj == nullptr) {
1087      continue;
1088    }
1089
1090    bpf_object__close(ref.obj);
1091
1092    ref.obj = nullptr;
1093  }
1094}
1095#  endif // HAVE_LIBBPF
1096
1097void ConnectionHandler::set_quic_ipc_fd(int fd) { quic_ipc_fd_ = fd; }
1098
1099void ConnectionHandler::set_quic_lingering_worker_processes(
1100    const std::vector<QUICLingeringWorkerProcess> &quic_lwps) {
1101  quic_lingering_worker_processes_ = quic_lwps;
1102}
1103
1104int ConnectionHandler::forward_quic_packet_to_lingering_worker_process(
1105    QUICLingeringWorkerProcess *quic_lwp, const Address &remote_addr,
1106    const Address &local_addr, const ngtcp2_pkt_info &pi, const uint8_t *data,
1107    size_t datalen) {
1108  std::array<uint8_t, 512> header;
1109
1110  assert(header.size() >= 1 + 1 + 1 + 1 + sizeof(sockaddr_storage) * 2);
1111  assert(remote_addr.len > 0);
1112  assert(local_addr.len > 0);
1113
1114  auto p = header.data();
1115
1116  *p++ = static_cast<uint8_t>(QUICIPCType::DGRAM_FORWARD);
1117  *p++ = static_cast<uint8_t>(remote_addr.len - 1);
1118  p = std::copy_n(reinterpret_cast<const uint8_t *>(&remote_addr.su),
1119                  remote_addr.len, p);
1120  *p++ = static_cast<uint8_t>(local_addr.len - 1);
1121  p = std::copy_n(reinterpret_cast<const uint8_t *>(&local_addr.su),
1122                  local_addr.len, p);
1123  *p++ = pi.ecn;
1124
1125  iovec msg_iov[] = {
1126      {
1127          .iov_base = header.data(),
1128          .iov_len = static_cast<size_t>(p - header.data()),
1129      },
1130      {
1131          .iov_base = const_cast<uint8_t *>(data),
1132          .iov_len = datalen,
1133      },
1134  };
1135
1136  msghdr msg{};
1137  msg.msg_iov = msg_iov;
1138  msg.msg_iovlen = array_size(msg_iov);
1139
1140  ssize_t nwrite;
1141
1142  while ((nwrite = sendmsg(quic_lwp->quic_ipc_fd, &msg, 0)) == -1 &&
1143         errno == EINTR)
1144    ;
1145
1146  if (nwrite == -1) {
1147    std::array<char, STRERROR_BUFSIZE> errbuf;
1148
1149    auto error = errno;
1150    LOG(ERROR) << "Failed to send QUIC IPC message: "
1151               << xsi_strerror(error, errbuf.data(), errbuf.size());
1152
1153    return -1;
1154  }
1155
1156  return 0;
1157}
1158
1159int ConnectionHandler::quic_ipc_read() {
1160  std::array<uint8_t, 65536> buf;
1161
1162  ssize_t nread;
1163
1164  while ((nread = recv(quic_ipc_fd_, buf.data(), buf.size(), 0)) == -1 &&
1165         errno == EINTR)
1166    ;
1167
1168  if (nread == -1) {
1169    std::array<char, STRERROR_BUFSIZE> errbuf;
1170
1171    auto error = errno;
1172    LOG(ERROR) << "Failed to read data from QUIC IPC channel: "
1173               << xsi_strerror(error, errbuf.data(), errbuf.size());
1174
1175    return -1;
1176  }
1177
1178  if (nread == 0) {
1179    return 0;
1180  }
1181
1182  size_t len = 1 + 1 + 1 + 1;
1183
1184  // Wire format:
1185  // TYPE(1) REMOTE_ADDRLEN(1) REMOTE_ADDR(N) LOCAL_ADDRLEN(1) LOCAL_ADDR(N)
1186  // ECN(1) DGRAM_PAYLOAD(N)
1187  //
1188  // When encoding, REMOTE_ADDRLEN and LOCAL_ADDRLEN are decremented
1189  // by 1.
1190  if (static_cast<size_t>(nread) < len) {
1191    return 0;
1192  }
1193
1194  auto p = buf.data();
1195  if (*p != static_cast<uint8_t>(QUICIPCType::DGRAM_FORWARD)) {
1196    LOG(ERROR) << "Unknown QUICIPCType: " << static_cast<uint32_t>(*p);
1197
1198    return -1;
1199  }
1200
1201  ++p;
1202
1203  auto pkt = std::make_unique<QUICPacket>();
1204
1205  auto remote_addrlen = static_cast<size_t>(*p++) + 1;
1206  if (remote_addrlen > sizeof(sockaddr_storage)) {
1207    LOG(ERROR) << "The length of remote address is too large: "
1208               << remote_addrlen;
1209
1210    return -1;
1211  }
1212
1213  len += remote_addrlen;
1214
1215  if (static_cast<size_t>(nread) < len) {
1216    LOG(ERROR) << "Insufficient QUIC IPC message length";
1217
1218    return -1;
1219  }
1220
1221  pkt->remote_addr.len = remote_addrlen;
1222  memcpy(&pkt->remote_addr.su, p, remote_addrlen);
1223
1224  p += remote_addrlen;
1225
1226  auto local_addrlen = static_cast<size_t>(*p++) + 1;
1227  if (local_addrlen > sizeof(sockaddr_storage)) {
1228    LOG(ERROR) << "The length of local address is too large: " << local_addrlen;
1229
1230    return -1;
1231  }
1232
1233  len += local_addrlen;
1234
1235  if (static_cast<size_t>(nread) < len) {
1236    LOG(ERROR) << "Insufficient QUIC IPC message length";
1237
1238    return -1;
1239  }
1240
1241  pkt->local_addr.len = local_addrlen;
1242  memcpy(&pkt->local_addr.su, p, local_addrlen);
1243
1244  p += local_addrlen;
1245
1246  pkt->pi.ecn = *p++;
1247
1248  auto datalen = nread - (p - buf.data());
1249
1250  pkt->data.assign(p, p + datalen);
1251
1252  // At the moment, UpstreamAddr index is unknown.
1253  pkt->upstream_addr_index = static_cast<size_t>(-1);
1254
1255  ngtcp2_version_cid vc;
1256
1257  auto rv = ngtcp2_pkt_decode_version_cid(&vc, p, datalen, SHRPX_QUIC_SCIDLEN);
1258  if (rv < 0) {
1259    LOG(ERROR) << "ngtcp2_pkt_decode_version_cid: " << ngtcp2_strerror(rv);
1260
1261    return -1;
1262  }
1263
1264  if (vc.dcidlen != SHRPX_QUIC_SCIDLEN) {
1265    LOG(ERROR) << "DCID length is invalid";
1266    return -1;
1267  }
1268
1269  if (single_worker_) {
1270    auto faddr = single_worker_->find_quic_upstream_addr(pkt->local_addr);
1271    if (faddr == nullptr) {
1272      LOG(ERROR) << "No suitable upstream address found";
1273
1274      return 0;
1275    }
1276
1277    auto quic_conn_handler = single_worker_->get_quic_connection_handler();
1278
1279    // Ignore return value
1280    quic_conn_handler->handle_packet(faddr, pkt->remote_addr, pkt->local_addr,
1281                                     pkt->pi, pkt->data.data(),
1282                                     pkt->data.size());
1283
1284    return 0;
1285  }
1286
1287  auto &qkm = quic_keying_materials_->keying_materials.front();
1288
1289  std::array<uint8_t, SHRPX_QUIC_DECRYPTED_DCIDLEN> decrypted_dcid;
1290
1291  if (decrypt_quic_connection_id(decrypted_dcid.data(),
1292                                 vc.dcid + SHRPX_QUIC_CID_PREFIX_OFFSET,
1293                                 qkm.cid_encryption_key.data()) != 0) {
1294    return -1;
1295  }
1296
1297  for (auto &worker : workers_) {
1298    if (!std::equal(std::begin(decrypted_dcid),
1299                    std::begin(decrypted_dcid) + SHRPX_QUIC_CID_PREFIXLEN,
1300                    worker->get_cid_prefix())) {
1301      continue;
1302    }
1303
1304    WorkerEvent wev{
1305        .type = WorkerEventType::QUIC_PKT_FORWARD,
1306        .quic_pkt = std::move(pkt),
1307    };
1308    worker->send(std::move(wev));
1309
1310    return 0;
1311  }
1312
1313  if (LOG_ENABLED(INFO)) {
1314    LOG(INFO) << "No worker to match CID prefix";
1315  }
1316
1317  return 0;
1318}
1319#endif // ENABLE_HTTP3
1320
1321} // namespace shrpx
1322