1From 1e86ca5834b94cae7d5e6d219056c0fc895cf95d Mon Sep 17 00:00:00 2001
2From: AJ Heller <hork@google.com>
3Date: Wed, 12 Jul 2023 18:42:09 -0700
4Subject: [PATCH] [backport][iomgr][EventEngine] Improve server handling of
5 file descriptor exhaustion (#33672)
6
7Backport of #33656
8---
9 src/core/lib/iomgr/tcp_server_posix.cc        | 46 ++++++++++++++-----
10 src/core/lib/iomgr/tcp_server_utils_posix.h   | 13 +++++
11 .../iomgr/tcp_server_utils_posix_common.cc    | 21 ++++++++
12 3 files changed, 67 insertions(+), 13 deletions(-)
13
14diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
15index a1db16d916..6804928fe3 100644
16--- a/src/core/lib/iomgr/tcp_server_posix.cc
17+++ b/src/core/lib/iomgr/tcp_server_posix.cc
18@@ -16,13 +16,17 @@
19  *
20  */
21 
22+#include <grpc/support/port_platform.h>
23+
24+#include <utility>
25+
26+#include <grpc/support/atm.h>
27+
28 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
29 #ifndef _GNU_SOURCE
30 #define _GNU_SOURCE
31 #endif
32 
33-#include <grpc/support/port_platform.h>
34-
35 #include "src/core/lib/iomgr/port.h"
36 
37 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
38@@ -45,6 +49,7 @@
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_format.h"
41 
42+#include <grpc/event_engine/event_engine.h>
43 #include <grpc/support/alloc.h>
44 #include <grpc/support/log.h>
45 #include <grpc/support/sync.h>
46@@ -350,21 +357,35 @@ static void on_read(void* arg, grpc_error_handle err) {
47     if (fd < 0) {
48       if (errno == EINTR) {
49         continue;
50-      } else if (errno == EAGAIN || errno == ECONNABORTED ||
51-                 errno == EWOULDBLOCK) {
52+      }
53+      // When the process runs out of fds, accept4() returns EMFILE. When this
54+      // happens, the connection is left in the accept queue until either a
55+      // read event triggers the on_read callback, or the retry timer fires and
56+      // the accept is re-tried regardless. The retry timer is not cancelled by
57+      // a read event, so a spurious wakeup may occur with nothing to accept.
58+      // This is not a performant code path, but if an fd limit has been
59+      // reached, the system is likely in an unhappy state regardless.
60+      if (errno == EMFILE) {
61+        grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
62+        if (gpr_atm_full_xchg(&sp->retry_timer_armed, true)) return;
63+        grpc_timer_init(&sp->retry_timer,
64+                        grpc_core::ExecCtx::Get()->Now() + 1 * GPR_MS_PER_SEC,
65+                        &sp->retry_closure);
66+        return;
67+      }
68+      if (errno == EAGAIN || errno == ECONNABORTED || errno == EWOULDBLOCK) {
69         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
70         return;
71+      }
72+      gpr_mu_lock(&sp->server->mu);
73+      if (!sp->server->shutdown_listeners) {
74+        gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
75       } else {
76-        gpr_mu_lock(&sp->server->mu);
77-        if (!sp->server->shutdown_listeners) {
78-          gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
79-        } else {
80-          /* if we have shutdown listeners, accept4 could fail, and we
81-             needn't notify users */
82-        }
83-        gpr_mu_unlock(&sp->server->mu);
84-        goto error;
85+        // if we have shutdown listeners, accept4 could fail, and we
86+        // needn't notify users
87       }
88+      gpr_mu_unlock(&sp->server->mu);
89+      goto error;
90     }
91 
92     /* For UNIX sockets, the accept call might not fill up the member sun_path
93@@ -558,6 +581,7 @@ static grpc_error_handle clone_port(grpc_tcp_listener* listener,
94     sp->port_index = listener->port_index;
95     sp->fd_index = listener->fd_index + count - i;
96     GPR_ASSERT(sp->emfd);
97+    grpc_tcp_server_listener_initialize_retry_timer(sp);
98     while (listener->server->tail->next != nullptr) {
99       listener->server->tail = listener->server->tail->next;
100     }
101@@ -791,6 +815,7 @@ static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
102   if (s->active_ports) {
103     grpc_tcp_listener* sp;
104     for (sp = s->head; sp; sp = sp->next) {
105+      grpc_timer_cancel(&sp->retry_timer);
106       grpc_fd_shutdown(sp->emfd,
107                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"));
108     }
109diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h
110index 26cef0209f..de5a888cff 100644
111--- a/src/core/lib/iomgr/tcp_server_utils_posix.h
112+++ b/src/core/lib/iomgr/tcp_server_utils_posix.h
113@@ -30,6 +30,7 @@
114 #include "src/core/lib/iomgr/resolve_address.h"
115 #include "src/core/lib/iomgr/socket_utils_posix.h"
116 #include "src/core/lib/iomgr/tcp_server.h"
117+#include "src/core/lib/iomgr/timer.h"
118 
119 /* one listening port */
120 typedef struct grpc_tcp_listener {
121@@ -52,6 +53,11 @@ typedef struct grpc_tcp_listener {
122      identified while iterating through 'next'. */
123   struct grpc_tcp_listener* sibling;
124   int is_sibling;
125+  // If accept4() fails with EMFILE, a timer is started to retry the accept and
126+  // drain the queue in case no further connection attempts reach the server.
127+  grpc_closure retry_closure;
128+  grpc_timer retry_timer;
129+  gpr_atm retry_timer_armed;
130 } grpc_tcp_listener;
131 
132 /* the overall server */
133@@ -139,4 +145,10 @@ grpc_error_handle grpc_tcp_server_prepare_socket(
134 /* Ruturn true if the platform supports ifaddrs */
135 bool grpc_tcp_server_have_ifaddrs(void);
136 
137+// Initialize (but don't start) the timer and callback to retry accept4() on a
138+// listening socket after file descriptors have been exhausted. This must be
139+// called when creating a new listener.
140+void grpc_tcp_server_listener_initialize_retry_timer(
141+    grpc_tcp_listener* listener);
142+
143 #endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */
144diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
145index 574fd02d0d..a32f542c4a 100644
146--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
147+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
148@@ -18,6 +18,8 @@
149 
150 #include <grpc/support/port_platform.h>
151 
152+#include <grpc/support/atm.h>
153+
154 #include "src/core/lib/iomgr/port.h"
155 
156 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
157@@ -81,6 +83,24 @@ static int get_max_accept_queue_size(void) {
158   return s_max_accept_queue_size;
159 }
160 
161+static void listener_retry_timer_cb(void* arg, grpc_error_handle err) {
162+  // Do nothing if cancelled.
163+  if (err != GRPC_ERROR_NONE) return;
164+  grpc_tcp_listener* listener = static_cast<grpc_tcp_listener*>(arg);
165+  gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
166+  if (!grpc_fd_is_shutdown(listener->emfd)) {
167+    grpc_fd_set_readable(listener->emfd);
168+  }
169+}
170+
171+void grpc_tcp_server_listener_initialize_retry_timer(
172+    grpc_tcp_listener* listener) {
173+  gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
174+  grpc_timer_init_unset(&listener->retry_timer);
175+  GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener,
176+                    grpc_schedule_on_exec_ctx);
177+}
178+
179 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
180                                               const grpc_resolved_address* addr,
181                                               unsigned port_index,
182@@ -112,6 +132,7 @@ static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
183     sp->server = s;
184     sp->fd = fd;
185     sp->emfd = grpc_fd_create(fd, name.c_str(), true);
186+    grpc_tcp_server_listener_initialize_retry_timer(sp);
187     memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
188     sp->port = port;
189     sp->port_index = port_index;
190-- 
1912.33.0
192
193