1From 1e86ca5834b94cae7d5e6d219056c0fc895cf95d Mon Sep 17 00:00:00 2001 2From: AJ Heller <hork@google.com> 3Date: Wed, 12 Jul 2023 18:42:09 -0700 4Subject: [PATCH] [backport][iomgr][EventEngine] Improve server handling of 5 file descriptor exhaustion (#33672) 6 7Backport of #33656 8--- 9 src/core/lib/iomgr/tcp_server_posix.cc | 46 ++++++++++++++----- 10 src/core/lib/iomgr/tcp_server_utils_posix.h | 13 +++++ 11 .../iomgr/tcp_server_utils_posix_common.cc | 21 ++++++++ 12 3 files changed, 67 insertions(+), 13 deletions(-) 13 14diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc 15index a1db16d916..6804928fe3 100644 16--- a/src/core/lib/iomgr/tcp_server_posix.cc 17+++ b/src/core/lib/iomgr/tcp_server_posix.cc 18@@ -16,13 +16,17 @@ 19 * 20 */ 21 22+#include <grpc/support/port_platform.h> 23+ 24+#include <utility> 25+ 26+#include <grpc/support/atm.h> 27+ 28 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */ 29 #ifndef _GNU_SOURCE 30 #define _GNU_SOURCE 31 #endif 32 33-#include <grpc/support/port_platform.h> 34- 35 #include "src/core/lib/iomgr/port.h" 36 37 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER 38@@ -45,6 +49,7 @@ 39 #include "absl/strings/str_cat.h" 40 #include "absl/strings/str_format.h" 41 42+#include <grpc/event_engine/event_engine.h> 43 #include <grpc/support/alloc.h> 44 #include <grpc/support/log.h> 45 #include <grpc/support/sync.h> 46@@ -350,21 +357,35 @@ static void on_read(void* arg, grpc_error_handle err) { 47 if (fd < 0) { 48 if (errno == EINTR) { 49 continue; 50- } else if (errno == EAGAIN || errno == ECONNABORTED || 51- errno == EWOULDBLOCK) { 52+ } 53+ // When the process runs out of fds, accept4() returns EMFILE. When this 54+ // happens, the connection is left in the accept queue until either a 55+ // read event triggers the on_read callback, or time has passed and the 56+ // accept should be re-tried regardless. This callback is not cancelled, 57+ // so a spurious wakeup may occur even when there's nothing to accept. 58+ // This is not a performant code path, but if an fd limit has been 59+ // reached, the system is likely in an unhappy state regardless. 60+ if (errno == EMFILE) { 61+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); 62+ if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return; 63+ grpc_timer_init(&sp->retry_timer, 64+ grpc_core::ExecCtx::Get()->Now() + 1 * GPR_MS_PER_SEC, 65+ &sp->retry_closure); 66+ return; 67+ } 68+ if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) { 69 grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); 70 return; 71+ } 72+ gpr_mu_lock(&sp->server->mu); 73+ if (!sp->server->shutdown_listeners) { 74+ gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); 75 } else { 76- gpr_mu_lock(&sp->server->mu); 77- if (!sp->server->shutdown_listeners) { 78- gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); 79- } else { 80- /* if we have shutdown listeners, accept4 could fail, and we 81- needn't notify users */ 82- } 83- gpr_mu_unlock(&sp->server->mu); 84- goto error; 85+ // if we have shutdown listeners, accept4 could fail, and we 86+ // needn't notify users 87 } 88+ gpr_mu_unlock(&sp->server->mu); 89+ goto error; 90 } 91 92 /* For UNIX sockets, the accept call might not fill up the member sun_path 93@@ -558,6 +581,7 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener, 94 sp->port_index = listener->port_index; 95 sp->fd_index = listener->fd_index + count - i; 96 GPR_ASSERT(sp->emfd); 97+ grpc_tcp_server_listener_initialize_retry_timer(sp); 98 while (listener->server->tail->next != nullptr) { 99 listener->server->tail = listener->server->tail->next; 100 } 101@@ -791,6 +815,7 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) { 102 if (s->active_ports) { 103 grpc_tcp_listener* sp; 104 for (sp = s->head; sp; sp = sp->next) { 105+ grpc_timer_cancel(&sp->retry_timer); 106 grpc_fd_shutdown(sp->emfd, 107 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown")); 108 } 109diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h 110index 26cef0209f..de5a888cff 100644 111--- a/src/core/lib/iomgr/tcp_server_utils_posix.h 112+++ b/src/core/lib/iomgr/tcp_server_utils_posix.h 113@@ -30,6 +30,7 @@ 114 #include "src/core/lib/iomgr/resolve_address.h" 115 #include "src/core/lib/iomgr/socket_utils_posix.h" 116 #include "src/core/lib/iomgr/tcp_server.h" 117+#include "src/core/lib/iomgr/timer.h" 118 119 /* one listening port */ 120 typedef struct grpc_tcp_listener { 121@@ -52,6 +53,11 @@ typedef struct grpc_tcp_listener { 122 identified while iterating through 'next'. */ 123 struct grpc_tcp_listener* sibling; 124 int is_sibling; 125+ // If an accept4() call fails, a timer is started to drain the accept queue in 126+ // case no further connection attempts reach the gRPC server. 127+ grpc_closure retry_closure; 128+ grpc_timer retry_timer; 129+ gpr_atm retry_timer_armed; 130 } grpc_tcp_listener; 131 132 /* the overall server */ 133@@ -139,4 +145,10 @@ grpc_error_handle grpc_tcp_server_prepare_socket( 134 /* Ruturn true if the platform supports ifaddrs */ 135 bool grpc_tcp_server_have_ifaddrs(void); 136 137+// Initialize (but don't start) the timer and callback to retry accept4() on a 138+// listening socket after file descriptors have been exhausted. This must be 139+// called when creating a new listener. 140+void grpc_tcp_server_listener_initialize_retry_timer( 141+ grpc_tcp_listener* listener); 142+ 143 #endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */ 144diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc 145index 574fd02d0d..a32f542c4a 100644 146--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc 147+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc 148@@ -18,6 +18,8 @@ 149 150 #include <grpc/support/port_platform.h> 151 152+#include <grpc/support/atm.h> 153+ 154 #include "src/core/lib/iomgr/port.h" 155 156 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 157@@ -81,6 +83,24 @@ static int get_max_accept_queue_size(void) { 158 return s_max_accept_queue_size; 159 } 160 161+static void listener_retry_timer_cb(void* arg, grpc_error_handle err) { 162+ // Do nothing if cancelled. 163+ if (err != GRPC_ERROR_NONE) return; 164+ grpc_tcp_listener* listener = static_cast<grpc_tcp_listener*>(arg); 165+ gpr_atm_no_barrier_store(&listener->retry_timer_armed, false); 166+ if (!grpc_fd_is_shutdown(listener->emfd)) { 167+ grpc_fd_set_readable(listener->emfd); 168+ } 169+} 170+ 171+void grpc_tcp_server_listener_initialize_retry_timer( 172+ grpc_tcp_listener* listener) { 173+ gpr_atm_no_barrier_store(&listener->retry_timer_armed, false); 174+ grpc_timer_init_unset(&listener->retry_timer); 175+ GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener, 176+ grpc_schedule_on_exec_ctx); 177+} 178+ 179 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd, 180 const grpc_resolved_address* addr, 181 unsigned port_index, 182@@ -112,6 +132,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd, 183 sp->server = s; 184 sp->fd = fd; 185 sp->emfd = grpc_fd_create(fd, name.c_str(), true); 186+ grpc_tcp_server_listener_initialize_retry_timer(sp); 187 memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); 188 sp->port = port; 189 sp->port_index = port_index; 190-- 1912.33.0 192 193