1/* 2 * nghttp2 - HTTP/2 C Library 3 * 4 * Copyright (c) 2012 Tatsuhiro Tsujikawa 5 * 6 * Permission is hereby granted, free of charge, to any person obtaining 7 * a copy of this software and associated documentation files (the 8 * "Software"), to deal in the Software without restriction, including 9 * without limitation the rights to use, copy, modify, merge, publish, 10 * distribute, sublicense, and/or sell copies of the Software, and to 11 * permit persons to whom the Software is furnished to do so, subject to 12 * the following conditions: 13 * 14 * The above copyright notice and this permission notice shall be 15 * included in all copies or substantial portions of the Software. 16 * 17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 18 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 19 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 20 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 21 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 22 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 23 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 24 */ 25#include "shrpx_downstream_queue.h" 26 27#include <cassert> 28#include <limits> 29 30#include "shrpx_downstream.h" 31 32namespace shrpx { 33 34DownstreamQueue::HostEntry::HostEntry(ImmutableString &&key) 35 : key(std::move(key)), num_active(0) {} 36 37DownstreamQueue::DownstreamQueue(size_t conn_max_per_host, bool unified_host) 38 : conn_max_per_host_(conn_max_per_host == 0 39 ? std::numeric_limits<size_t>::max() 40 : conn_max_per_host), 41 unified_host_(unified_host) {} 42 43DownstreamQueue::~DownstreamQueue() { 44 dlist_delete_all(downstreams_); 45 for (auto &p : host_entries_) { 46 auto &ent = p.second; 47 dlist_delete_all(ent.blocked); 48 } 49} 50 51void DownstreamQueue::add_pending(std::unique_ptr<Downstream> downstream) { 52 downstream->set_dispatch_state(DispatchState::PENDING); 53 downstreams_.append(downstream.release()); 54} 55 56void DownstreamQueue::mark_failure(Downstream *downstream) { 57 downstream->set_dispatch_state(DispatchState::FAILURE); 58} 59 60DownstreamQueue::HostEntry & 61DownstreamQueue::find_host_entry(const StringRef &host) { 62 auto itr = host_entries_.find(host); 63 if (itr == std::end(host_entries_)) { 64 auto key = ImmutableString{std::begin(host), std::end(host)}; 65 auto key_ref = StringRef{key}; 66#ifdef HAVE_STD_MAP_EMPLACE 67 std::tie(itr, std::ignore) = 68 host_entries_.emplace(key_ref, HostEntry(std::move(key))); 69#else // !HAVE_STD_MAP_EMPLACE 70 // for g++-4.7 71 std::tie(itr, std::ignore) = host_entries_.insert( 72 std::make_pair(key_ref, HostEntry(std::move(key)))); 73#endif // !HAVE_STD_MAP_EMPLACE 74 } 75 return (*itr).second; 76} 77 78StringRef DownstreamQueue::make_host_key(const StringRef &host) const { 79 return unified_host_ ? StringRef{} : host; 80} 81 82StringRef DownstreamQueue::make_host_key(Downstream *downstream) const { 83 return make_host_key(downstream->request().authority); 84} 85 86void DownstreamQueue::mark_active(Downstream *downstream) { 87 auto &ent = find_host_entry(make_host_key(downstream)); 88 ++ent.num_active; 89 90 downstream->set_dispatch_state(DispatchState::ACTIVE); 91} 92 93void DownstreamQueue::mark_blocked(Downstream *downstream) { 94 auto &ent = find_host_entry(make_host_key(downstream)); 95 96 downstream->set_dispatch_state(DispatchState::BLOCKED); 97 98 auto link = new BlockedLink{}; 99 downstream->attach_blocked_link(link); 100 ent.blocked.append(link); 101} 102 103bool DownstreamQueue::can_activate(const StringRef &host) const { 104 auto itr = host_entries_.find(make_host_key(host)); 105 if (itr == std::end(host_entries_)) { 106 return true; 107 } 108 auto &ent = (*itr).second; 109 return ent.num_active < conn_max_per_host_; 110} 111 112namespace { 113bool remove_host_entry_if_empty(const DownstreamQueue::HostEntry &ent, 114 DownstreamQueue::HostEntryMap &host_entries, 115 const StringRef &host) { 116 if (ent.blocked.empty() && ent.num_active == 0) { 117 host_entries.erase(host); 118 return true; 119 } 120 return false; 121} 122} // namespace 123 124Downstream *DownstreamQueue::remove_and_get_blocked(Downstream *downstream, 125 bool next_blocked) { 126 // Delete downstream when this function returns. 127 auto delptr = std::unique_ptr<Downstream>(downstream); 128 129 downstreams_.remove(downstream); 130 131 auto host = make_host_key(downstream); 132 auto &ent = find_host_entry(host); 133 134 if (downstream->get_dispatch_state() == DispatchState::ACTIVE) { 135 --ent.num_active; 136 } else { 137 // For those downstreams deleted while in blocked state 138 auto link = downstream->detach_blocked_link(); 139 if (link) { 140 ent.blocked.remove(link); 141 delete link; 142 } 143 } 144 145 if (remove_host_entry_if_empty(ent, host_entries_, host)) { 146 return nullptr; 147 } 148 149 if (!next_blocked || ent.num_active >= conn_max_per_host_) { 150 return nullptr; 151 } 152 153 auto link = ent.blocked.head; 154 155 if (!link) { 156 return nullptr; 157 } 158 159 auto next_downstream = link->downstream; 160 auto link2 = next_downstream->detach_blocked_link(); 161 // This is required with --disable-assert. 162 (void)link2; 163 assert(link2 == link); 164 ent.blocked.remove(link); 165 delete link; 166 remove_host_entry_if_empty(ent, host_entries_, host); 167 168 return next_downstream; 169} 170 171Downstream *DownstreamQueue::get_downstreams() const { 172 return downstreams_.head; 173} 174 175} // namespace shrpx 176