1diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/chttp2_transport.cc third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/chttp2_transport.cc 2--- grpc-1.41.1/src/core/ext/transport/chttp2/transport/chttp2_transport.cc 2021-10-20 04:14:40.000000000 +0800 3+++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/chttp2_transport.cc 2024-07-31 17:54:56.767196600 +0800 4@@ -35,6 +35,7 @@ 5 #include "src/core/ext/transport/chttp2/transport/frame_data.h" 6 #include "src/core/ext/transport/chttp2/transport/internal.h" 7 #include "src/core/ext/transport/chttp2/transport/varint.h" 8+#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" 9 #include "src/core/lib/channel/channel_args.h" 10 #include "src/core/lib/compression/stream_compression.h" 11 #include "src/core/lib/debug/stats.h" 12@@ -378,6 +379,9 @@ 13 if (value >= 0) { 14 queue_setting_update(t, settings_map[j].setting_id, 15 static_cast<uint32_t>(value)); 16+ if (settings_map[j].setting_id == GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) { 17+ t->max_concurrent_streams_policy.SetTarget(value); 18+ } 19 } 20 } 21 break; 22diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/internal.h third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/internal.h 23--- grpc-1.41.1/src/core/ext/transport/chttp2/transport/internal.h 2021-10-20 04:14:40.000000000 +0800 24+++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/internal.h 2024-07-31 17:16:44.925139300 +0800 25@@ -36,6 +36,7 @@ 26 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" 27 #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" 28 #include "src/core/ext/transport/chttp2/transport/stream_map.h" 29+#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" 30 #include "src/core/lib/channel/channelz.h" 31 #include "src/core/lib/compression/stream_compression.h" 32 #include "src/core/lib/gprpp/manual_constructor.h" 33@@ -387,6 +388,8 @@ 34 uint64_t ping_ctr = 0; /* unique id for pings */ 35 grpc_closure retry_initiate_ping_locked; 36 37+ grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy; 38+ 39 /** ping acks */ 40 size_t ping_ack_count = 0; 41 size_t ping_ack_capacity = 0; 42@@ -482,6 +485,8 @@ 43 /** grace period for a ping to complete before watchdog kicks in */ 44 grpc_millis keepalive_timeout; 45 /** if keepalive pings are allowed when there's no outstanding streams */ 46+ /// number of stream objects currently allocated by this transport 47+ std::atomic<size_t> streams_allocated{0}; 48 bool keepalive_permit_without_calls = false; 49 /** If start_keepalive_ping_locked has been called */ 50 bool keepalive_ping_started = false; 51diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc 52--- grpc-1.41.1/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc 1970-01-01 08:00:00.000000000 +0800 53+++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc 2024-07-31 17:16:44.925139300 +0800 54@@ -0,0 +1,44 @@ 55+// Copyright 2023 gRPC authors. 56+// 57+// Licensed under the Apache License, Version 2.0 (the "License"); 58+// you may not use this file except in compliance with the License. 59+// You may obtain a copy of the License at 60+// 61+// http://www.apache.org/licenses/LICENSE-2.0 62+// 63+// Unless required by applicable law or agreed to in writing, software 64+// distributed under the License is distributed on an "AS IS" BASIS, 65+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 66+// See the License for the specific language governing permissions and 67+// limitations under the License. 68+ 69+#include <grpc/support/port_platform.h> 70+ 71+#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" 72+ 73+#include <utility> 74+ 75+#include <grpc/support/log.h> 76+ 77+namespace grpc_core { 78+ 79+void Chttp2MaxConcurrentStreamsPolicy::AddDemerit() { 80+ ++new_demerits_; 81+ ++unacked_demerits_; 82+} 83+ 84+void Chttp2MaxConcurrentStreamsPolicy::FlushedSettings() { 85+ sent_demerits_ += std::exchange(new_demerits_, 0); 86+} 87+ 88+void Chttp2MaxConcurrentStreamsPolicy::AckLastSend() { 89+ GPR_ASSERT(unacked_demerits_ >= sent_demerits_); 90+ unacked_demerits_ -= std::exchange(sent_demerits_, 0); 91+} 92+ 93+uint32_t Chttp2MaxConcurrentStreamsPolicy::AdvertiseValue() const { 94+ if (target_ < unacked_demerits_) return 0; 95+ return target_ - unacked_demerits_; 96+} 97+ 98+} // namespace grpc_core 99diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h 100--- grpc-1.41.1/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h 1970-01-01 08:00:00.000000000 +0800 101+++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h 2024-07-31 17:16:44.925139300 +0800 102@@ -0,0 +1,67 @@ 103+// Copyright 2023 gRPC authors. 104+// 105+// Licensed under the Apache License, Version 2.0 (the "License"); 106+// you may not use this file except in compliance with the License. 107+// You may obtain a copy of the License at 108+// 109+// http://www.apache.org/licenses/LICENSE-2.0 110+// 111+// Unless required by applicable law or agreed to in writing, software 112+// distributed under the License is distributed on an "AS IS" BASIS, 113+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 114+// See the License for the specific language governing permissions and 115+// limitations under the License. 116+ 117+#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H 118+#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H 119+ 120+#include <grpc/support/port_platform.h> 121+ 122+#include <cstdint> 123+#include <limits> 124+ 125+namespace grpc_core { 126+ 127+class Chttp2MaxConcurrentStreamsPolicy { 128+ public: 129+ // Set the target number of concurrent streams. 130+ // If everything is idle we should advertise this number. 131+ void SetTarget(uint32_t target) { target_ = target; } 132+ 133+ // Add one demerit to the current target. 134+ // We need to do one full settings round trip after this to clear this 135+ // demerit. 136+ // It will reduce our advertised max concurrent streams by one. 137+ void AddDemerit(); 138+ 139+ // Notify the policy that we've sent a settings frame. 140+ // Newly added demerits since the last settings frame was sent will be cleared 141+ // once that settings frame is acknowledged. 142+ void FlushedSettings(); 143+ 144+ // Notify the policy that we've received an acknowledgement for the last 145+ // settings frame we sent. 146+ void AckLastSend(); 147+ 148+ // Returns what we should advertise as max concurrent streams. 149+ uint32_t AdvertiseValue() const; 150+ 151+ private: 152+ uint32_t target_ = std::numeric_limits<int32_t>::max(); 153+ // Demerit flow: 154+ // When we add a demerit, we add to both new & unacked. 155+ // When we flush settings, we move new to sent. 156+ // When we ack settings, we remove what we sent from unacked. 157+ // eg: 158+ // we add 10 demerits - now new=10, sent=0, unacked=10 159+ // we send settings - now new=0, sent=10, unacked=10 160+ // we add 5 demerits - now new=5, sent=10, unacked=15 161+ // we get the settings ack - now new=5, sent=0, unacked=5 162+ uint32_t new_demerits_ = 0; 163+ uint32_t sent_demerits_ = 0; 164+ uint32_t unacked_demerits_ = 0; 165+}; 166+ 167+} // namespace grpc_core 168+ 169+#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_MAX_CONCURRENT_STREAMS_POLICY_H 170diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/parsing.cc third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/parsing.cc 171--- grpc-1.41.1/src/core/ext/transport/chttp2/transport/parsing.cc 2021-10-20 04:14:40.000000000 +0800 172+++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/parsing.cc 2024-07-31 17:16:44.925139300 +0800 173@@ -27,6 +27,7 @@ 174 #include <grpc/support/log.h> 175 176 #include "src/core/ext/transport/chttp2/transport/internal.h" 177+#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" 178 #include "src/core/lib/profiling/timers.h" 179 #include "src/core/lib/slice/slice_string_helpers.h" 180 #include "src/core/lib/slice/slice_utils.h" 181@@ -636,6 +637,17 @@ 182 t->settings[GRPC_ACKED_SETTINGS] 183 [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS])) { 184 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Max stream count exceeded"); 185+ } else if (GPR_UNLIKELY( 186+ t->streams_allocated.load(std::memory_order_relaxed) > 187+ t->max_concurrent_streams_policy.AdvertiseValue())) { 188+ // We have more streams allocated than we'd like, so apply some pushback 189+ // by refusing this stream. 190+ ++t->num_pending_induced_frames; 191+ grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_rst_stream_create( 192+ t->incoming_stream_id, 193+ GRPC_HTTP2_REFUSED_STREAM, nullptr)); 194+ grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM); 195+ return init_header_skip_frame_parser(t, priority_type); 196 } 197 t->last_new_stream_id = t->incoming_stream_id; 198 s = t->incoming_stream = 199@@ -756,6 +768,7 @@ 200 return err; 201 } 202 if (t->incoming_frame_flags & GRPC_CHTTP2_FLAG_ACK) { 203+ t->max_concurrent_streams_policy.AckLastSend(); 204 memcpy(t->settings[GRPC_ACKED_SETTINGS], t->settings[GRPC_SENT_SETTINGS], 205 GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t)); 206 t->hpack_parser.hpack_table()->SetMaxBytes( 207@@ -765,6 +778,9 @@ 208 } 209 t->parser = grpc_chttp2_settings_parser_parse; 210 t->parser_data = &t->simple.settings; 211+ if (!t->is_client) { 212+ t->max_concurrent_streams_policy.AddDemerit(); 213+ } 214 return GRPC_ERROR_NONE; 215 } 216 217diff -Naur grpc-1.41.1/src/core/ext/transport/chttp2/transport/writing.cc third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/writing.cc 218--- grpc-1.41.1/src/core/ext/transport/chttp2/transport/writing.cc 2021-10-20 04:14:40.000000000 +0800 219+++ third_party_grpc_sxy/src/core/ext/transport/chttp2/transport/writing.cc 2024-07-31 17:16:44.925139300 +0800 220@@ -25,6 +25,7 @@ 221 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" 222 #include "src/core/ext/transport/chttp2/transport/context_list.h" 223 #include "src/core/ext/transport/chttp2/transport/internal.h" 224+#include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h" 225 #include "src/core/lib/compression/stream_compression.h" 226 #include "src/core/lib/debug/stats.h" 227 #include "src/core/lib/profiling/timers.h" 228@@ -217,7 +218,15 @@ 229 } 230 231 void FlushSettings() { 232- if (t_->dirtied_local_settings && !t_->sent_local_settings) { 233+ const bool dirty = 234+ t_->dirtied_local_settings || 235+ t_->settings[GRPC_SENT_SETTINGS] 236+ [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] != 237+ t_->max_concurrent_streams_policy.AdvertiseValue(); 238+ if (dirty && !t_->sent_local_settings) { 239+ t_->settings[GRPC_LOCAL_SETTINGS] 240+ [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] = 241+ t_->max_concurrent_streams_policy.AdvertiseValue(); 242 grpc_slice_buffer_add( 243 &t_->outbuf, grpc_chttp2_settings_create( 244 t_->settings[GRPC_SENT_SETTINGS], 245@@ -226,6 +235,7 @@ 246 t_->force_send_settings = false; 247 t_->dirtied_local_settings = false; 248 t_->sent_local_settings = true; 249+ t_->max_concurrent_streams_policy.FlushedSettings(); 250 GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(); 251 } 252 } 253diff -Naur grpc-1.41.1/src/python/grpcio/grpc_core_dependencies.py third_party_grpc_sxy/src/python/grpcio/grpc_core_dependencies.py 254--- grpc-1.41.1/src/python/grpcio/grpc_core_dependencies.py 2021-10-20 04:14:40.000000000 +0800 255+++ third_party_grpc_sxy/src/python/grpcio/grpc_core_dependencies.py 2024-07-31 17:16:44.925139300 +0800 256@@ -121,6 +121,7 @@ 257 'src/core/ext/transport/chttp2/transport/http2_settings.cc', 258 'src/core/ext/transport/chttp2/transport/huffsyms.cc', 259 'src/core/ext/transport/chttp2/transport/incoming_metadata.cc', 260+ 'src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.cc', 261 'src/core/ext/transport/chttp2/transport/parsing.cc', 262 'src/core/ext/transport/chttp2/transport/stream_lists.cc', 263 'src/core/ext/transport/chttp2/transport/stream_map.cc', 264