full diff: https://github.com/golang/net/compare/v0.44.0...v0.45.0
From the security announcement:
[security] Vulnerabilities in golang.org/x/net
Hello gophers,
We have tagged version v0.45.0 of golang.org/x/net in order to address two
security issues.
This version fixes two vulnerabilities in the golang.org/x/net/html package
which could result in calls to Parse (and associated functions) executing
unexpectedly slowly relative to the size of the input or never returning when
encountering specific inputs.
These vulnerabilities affect programs which parse untrusted HTML documents.
- The parser implements the HTML specification, which contains a number of
algorithms which are quadratic in complexity by design. This causes the
processing time to scale non-linearly with respect to the size of the input for
some HTML documents. We have imposed a depth limit of 512 for nested HTML tags,
which should be high enough for the vast majority of valid HTML documents, to
address this.
Thanks to Jakub Guido Vranken and Jakub Ciolek for both independently reporting
this issue.
This is CVE-2025-47911 and Go issue https://go.dev/issue/75682.
- The parser also misimplemented a portion of the HTML specification for table
related tags. This could cause the parser to enter an infinite loop when
encountering specific combinations of tags.
Thanks to Guido Vranken for reporting this issue.
This is CVE-2025-58190 and Go issue https://go.dev/issue/70179.
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
| ... | ... |
@@ -103,7 +103,7 @@ require ( |
| 103 | 103 |
go.opentelemetry.io/otel/sdk v1.35.0 |
| 104 | 104 |
go.opentelemetry.io/otel/trace v1.35.0 |
| 105 | 105 |
golang.org/x/mod v0.28.0 |
| 106 |
- golang.org/x/net v0.44.0 |
|
| 106 |
+ golang.org/x/net v0.45.0 |
|
| 107 | 107 |
golang.org/x/sync v0.17.0 |
| 108 | 108 |
golang.org/x/sys v0.36.0 |
| 109 | 109 |
golang.org/x/text v0.29.0 |
| ... | ... |
@@ -698,8 +698,8 @@ golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLL |
| 698 | 698 |
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= |
| 699 | 699 |
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= |
| 700 | 700 |
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= |
| 701 |
-golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= |
|
| 702 |
-golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= |
|
| 701 |
+golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM= |
|
| 702 |
+golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= |
|
| 703 | 703 |
golang.org/x/oauth2 v0.0.0-20170912212905-13449ad91cb2/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= |
| 704 | 704 |
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= |
| 705 | 705 |
golang.org/x/oauth2 v0.29.0 h1:WdYw2tdTK1S8olAzWHdgeqfy+Mtm9XNhv/xJsY65d98= |
| ... | ... |
@@ -27,6 +27,7 @@ import ( |
| 27 | 27 |
// - If the resulting value is zero or out of range, use a default. |
| 28 | 28 |
type http2Config struct {
|
| 29 | 29 |
MaxConcurrentStreams uint32 |
| 30 |
+ StrictMaxConcurrentRequests bool |
|
| 30 | 31 |
MaxDecoderHeaderTableSize uint32 |
| 31 | 32 |
MaxEncoderHeaderTableSize uint32 |
| 32 | 33 |
MaxReadFrameSize uint32 |
| ... | ... |
@@ -64,12 +65,13 @@ func configFromServer(h1 *http.Server, h2 *Server) http2Config {
|
| 64 | 64 |
// (the net/http Transport). |
| 65 | 65 |
func configFromTransport(h2 *Transport) http2Config {
|
| 66 | 66 |
conf := http2Config{
|
| 67 |
- MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize, |
|
| 68 |
- MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize, |
|
| 69 |
- MaxReadFrameSize: h2.MaxReadFrameSize, |
|
| 70 |
- SendPingTimeout: h2.ReadIdleTimeout, |
|
| 71 |
- PingTimeout: h2.PingTimeout, |
|
| 72 |
- WriteByteTimeout: h2.WriteByteTimeout, |
|
| 67 |
+ StrictMaxConcurrentRequests: h2.StrictMaxConcurrentStreams, |
|
| 68 |
+ MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize, |
|
| 69 |
+ MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize, |
|
| 70 |
+ MaxReadFrameSize: h2.MaxReadFrameSize, |
|
| 71 |
+ SendPingTimeout: h2.ReadIdleTimeout, |
|
| 72 |
+ PingTimeout: h2.PingTimeout, |
|
| 73 |
+ WriteByteTimeout: h2.WriteByteTimeout, |
|
| 73 | 74 |
} |
| 74 | 75 |
|
| 75 | 76 |
// Unlike most config fields, where out-of-range values revert to the default, |
| ... | ... |
@@ -128,6 +130,9 @@ func fillNetHTTPConfig(conf *http2Config, h2 *http.HTTP2Config) {
|
| 128 | 128 |
if h2.MaxConcurrentStreams != 0 {
|
| 129 | 129 |
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams) |
| 130 | 130 |
} |
| 131 |
+ if http2ConfigStrictMaxConcurrentRequests(h2) {
|
|
| 132 |
+ conf.StrictMaxConcurrentRequests = true |
|
| 133 |
+ } |
|
| 131 | 134 |
if h2.MaxEncoderHeaderTableSize != 0 {
|
| 132 | 135 |
conf.MaxEncoderHeaderTableSize = uint32(h2.MaxEncoderHeaderTableSize) |
| 133 | 136 |
} |
| 134 | 137 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,15 @@ |
| 0 |
+// Copyright 2025 The Go Authors. All rights reserved. |
|
| 1 |
+// Use of this source code is governed by a BSD-style |
|
| 2 |
+// license that can be found in the LICENSE file. |
|
| 3 |
+ |
|
| 4 |
+//go:build !go1.26 |
|
| 5 |
+ |
|
| 6 |
+package http2 |
|
| 7 |
+ |
|
| 8 |
+import ( |
|
| 9 |
+ "net/http" |
|
| 10 |
+) |
|
| 11 |
+ |
|
| 12 |
+func http2ConfigStrictMaxConcurrentRequests(h2 *http.HTTP2Config) bool {
|
|
| 13 |
+ return false |
|
| 14 |
+} |
| 0 | 15 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,15 @@ |
| 0 |
+// Copyright 2025 The Go Authors. All rights reserved. |
|
| 1 |
+// Use of this source code is governed by a BSD-style |
|
| 2 |
+// license that can be found in the LICENSE file. |
|
| 3 |
+ |
|
| 4 |
+//go:build go1.26 |
|
| 5 |
+ |
|
| 6 |
+package http2 |
|
| 7 |
+ |
|
| 8 |
+import ( |
|
| 9 |
+ "net/http" |
|
| 10 |
+) |
|
| 11 |
+ |
|
| 12 |
+func http2ConfigStrictMaxConcurrentRequests(h2 *http.HTTP2Config) bool {
|
|
| 13 |
+ return h2.StrictMaxConcurrentRequests |
|
| 14 |
+} |
| ... | ... |
@@ -347,7 +347,7 @@ func (fr *Framer) maxHeaderListSize() uint32 {
|
| 347 | 347 |
func (f *Framer) startWrite(ftype FrameType, flags Flags, streamID uint32) {
|
| 348 | 348 |
// Write the FrameHeader. |
| 349 | 349 |
f.wbuf = append(f.wbuf[:0], |
| 350 |
- 0, // 3 bytes of length, filled in in endWrite |
|
| 350 |
+ 0, // 3 bytes of length, filled in endWrite |
|
| 351 | 351 |
0, |
| 352 | 352 |
0, |
| 353 | 353 |
byte(ftype), |
| ... | ... |
@@ -1152,6 +1152,15 @@ type PriorityFrame struct {
|
| 1152 | 1152 |
PriorityParam |
| 1153 | 1153 |
} |
| 1154 | 1154 |
|
| 1155 |
+var defaultRFC9218Priority = PriorityParam{
|
|
| 1156 |
+ incremental: 0, |
|
| 1157 |
+ urgency: 3, |
|
| 1158 |
+} |
|
| 1159 |
+ |
|
| 1160 |
+// Note that HTTP/2 has had two different prioritization schemes, and |
|
| 1161 |
+// PriorityParam struct below is a superset of both schemes. The exported |
|
| 1162 |
+// symbols are from RFC 7540 and the non-exported ones are from RFC 9218. |
|
| 1163 |
+ |
|
| 1155 | 1164 |
// PriorityParam are the stream prioritzation parameters. |
| 1156 | 1165 |
type PriorityParam struct {
|
| 1157 | 1166 |
// StreamDep is a 31-bit stream identifier for the |
| ... | ... |
@@ -1167,6 +1176,20 @@ type PriorityParam struct {
|
| 1167 | 1167 |
// the spec, "Add one to the value to obtain a weight between |
| 1168 | 1168 |
// 1 and 256." |
| 1169 | 1169 |
Weight uint8 |
| 1170 |
+ |
|
| 1171 |
+ // "The urgency (u) parameter value is Integer (see Section 3.3.1 of |
|
| 1172 |
+ // [STRUCTURED-FIELDS]), between 0 and 7 inclusive, in descending order of |
|
| 1173 |
+ // priority. The default is 3." |
|
| 1174 |
+ urgency uint8 |
|
| 1175 |
+ |
|
| 1176 |
+ // "The incremental (i) parameter value is Boolean (see Section 3.3.6 of |
|
| 1177 |
+ // [STRUCTURED-FIELDS]). It indicates if an HTTP response can be processed |
|
| 1178 |
+ // incrementally, i.e., provide some meaningful output as chunks of the |
|
| 1179 |
+ // response arrive." |
|
| 1180 |
+ // |
|
| 1181 |
+ // We use uint8 (i.e. 0 is false, 1 is true) instead of bool so we can |
|
| 1182 |
+ // avoid unnecessary type conversions and because either type takes 1 byte. |
|
| 1183 |
+ incremental uint8 |
|
| 1170 | 1184 |
} |
| 1171 | 1185 |
|
| 1172 | 1186 |
func (p PriorityParam) IsZero() bool {
|
| ... | ... |
@@ -181,6 +181,10 @@ type Server struct {
|
| 181 | 181 |
type serverInternalState struct {
|
| 182 | 182 |
mu sync.Mutex |
| 183 | 183 |
activeConns map[*serverConn]struct{}
|
| 184 |
+ |
|
| 185 |
+ // Pool of error channels. This is per-Server rather than global |
|
| 186 |
+ // because channels can't be reused across synctest bubbles. |
|
| 187 |
+ errChanPool sync.Pool |
|
| 184 | 188 |
} |
| 185 | 189 |
|
| 186 | 190 |
func (s *serverInternalState) registerConn(sc *serverConn) {
|
| ... | ... |
@@ -212,6 +216,27 @@ func (s *serverInternalState) startGracefulShutdown() {
|
| 212 | 212 |
s.mu.Unlock() |
| 213 | 213 |
} |
| 214 | 214 |
|
| 215 |
+// Global error channel pool used for uninitialized Servers. |
|
| 216 |
+// We use a per-Server pool when possible to avoid using channels across synctest bubbles. |
|
| 217 |
+var errChanPool = sync.Pool{
|
|
| 218 |
+ New: func() any { return make(chan error, 1) },
|
|
| 219 |
+} |
|
| 220 |
+ |
|
| 221 |
+func (s *serverInternalState) getErrChan() chan error {
|
|
| 222 |
+ if s == nil {
|
|
| 223 |
+ return errChanPool.Get().(chan error) // Server used without calling ConfigureServer |
|
| 224 |
+ } |
|
| 225 |
+ return s.errChanPool.Get().(chan error) |
|
| 226 |
+} |
|
| 227 |
+ |
|
| 228 |
+func (s *serverInternalState) putErrChan(ch chan error) {
|
|
| 229 |
+ if s == nil {
|
|
| 230 |
+ errChanPool.Put(ch) // Server used without calling ConfigureServer |
|
| 231 |
+ return |
|
| 232 |
+ } |
|
| 233 |
+ s.errChanPool.Put(ch) |
|
| 234 |
+} |
|
| 235 |
+ |
|
| 215 | 236 |
// ConfigureServer adds HTTP/2 support to a net/http Server. |
| 216 | 237 |
// |
| 217 | 238 |
// The configuration conf may be nil. |
| ... | ... |
@@ -224,7 +249,10 @@ func ConfigureServer(s *http.Server, conf *Server) error {
|
| 224 | 224 |
if conf == nil {
|
| 225 | 225 |
conf = new(Server) |
| 226 | 226 |
} |
| 227 |
- conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
|
|
| 227 |
+ conf.state = &serverInternalState{
|
|
| 228 |
+ activeConns: make(map[*serverConn]struct{}),
|
|
| 229 |
+ errChanPool: sync.Pool{New: func() any { return make(chan error, 1) }},
|
|
| 230 |
+ } |
|
| 228 | 231 |
if h1, h2 := s, conf; h2.IdleTimeout == 0 {
|
| 229 | 232 |
if h1.IdleTimeout != 0 {
|
| 230 | 233 |
h2.IdleTimeout = h1.IdleTimeout |
| ... | ... |
@@ -1124,25 +1152,6 @@ func (sc *serverConn) readPreface() error {
|
| 1124 | 1124 |
} |
| 1125 | 1125 |
} |
| 1126 | 1126 |
|
| 1127 |
-var errChanPool = sync.Pool{
|
|
| 1128 |
- New: func() interface{} { return make(chan error, 1) },
|
|
| 1129 |
-} |
|
| 1130 |
- |
|
| 1131 |
-func getErrChan() chan error {
|
|
| 1132 |
- if inTests {
|
|
| 1133 |
- // Channels cannot be reused across synctest tests. |
|
| 1134 |
- return make(chan error, 1) |
|
| 1135 |
- } else {
|
|
| 1136 |
- return errChanPool.Get().(chan error) |
|
| 1137 |
- } |
|
| 1138 |
-} |
|
| 1139 |
- |
|
| 1140 |
-func putErrChan(ch chan error) {
|
|
| 1141 |
- if !inTests {
|
|
| 1142 |
- errChanPool.Put(ch) |
|
| 1143 |
- } |
|
| 1144 |
-} |
|
| 1145 |
- |
|
| 1146 | 1127 |
var writeDataPool = sync.Pool{
|
| 1147 | 1128 |
New: func() interface{} { return new(writeData) },
|
| 1148 | 1129 |
} |
| ... | ... |
@@ -1150,7 +1159,7 @@ var writeDataPool = sync.Pool{
|
| 1150 | 1150 |
// writeDataFromHandler writes DATA response frames from a handler on |
| 1151 | 1151 |
// the given stream. |
| 1152 | 1152 |
func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
|
| 1153 |
- ch := getErrChan() |
|
| 1153 |
+ ch := sc.srv.state.getErrChan() |
|
| 1154 | 1154 |
writeArg := writeDataPool.Get().(*writeData) |
| 1155 | 1155 |
*writeArg = writeData{stream.id, data, endStream}
|
| 1156 | 1156 |
err := sc.writeFrameFromHandler(FrameWriteRequest{
|
| ... | ... |
@@ -1182,7 +1191,7 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStrea |
| 1182 | 1182 |
return errStreamClosed |
| 1183 | 1183 |
} |
| 1184 | 1184 |
} |
| 1185 |
- putErrChan(ch) |
|
| 1185 |
+ sc.srv.state.putErrChan(ch) |
|
| 1186 | 1186 |
if frameWriteDone {
|
| 1187 | 1187 |
writeDataPool.Put(writeArg) |
| 1188 | 1188 |
} |
| ... | ... |
@@ -2436,7 +2445,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro |
| 2436 | 2436 |
// waiting for this frame to be written, so an http.Flush mid-handler |
| 2437 | 2437 |
// writes out the correct value of keys, before a handler later potentially |
| 2438 | 2438 |
// mutates it. |
| 2439 |
- errc = getErrChan() |
|
| 2439 |
+ errc = sc.srv.state.getErrChan() |
|
| 2440 | 2440 |
} |
| 2441 | 2441 |
if err := sc.writeFrameFromHandler(FrameWriteRequest{
|
| 2442 | 2442 |
write: headerData, |
| ... | ... |
@@ -2448,7 +2457,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro |
| 2448 | 2448 |
if errc != nil {
|
| 2449 | 2449 |
select {
|
| 2450 | 2450 |
case err := <-errc: |
| 2451 |
- putErrChan(errc) |
|
| 2451 |
+ sc.srv.state.putErrChan(errc) |
|
| 2452 | 2452 |
return err |
| 2453 | 2453 |
case <-sc.doneServing: |
| 2454 | 2454 |
return errClientDisconnected |
| ... | ... |
@@ -3129,7 +3138,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
|
| 3129 | 3129 |
method: opts.Method, |
| 3130 | 3130 |
url: u, |
| 3131 | 3131 |
header: cloneHeader(opts.Header), |
| 3132 |
- done: getErrChan(), |
|
| 3132 |
+ done: sc.srv.state.getErrChan(), |
|
| 3133 | 3133 |
} |
| 3134 | 3134 |
|
| 3135 | 3135 |
select {
|
| ... | ... |
@@ -3146,7 +3155,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
|
| 3146 | 3146 |
case <-st.cw: |
| 3147 | 3147 |
return errStreamClosed |
| 3148 | 3148 |
case err := <-msg.done: |
| 3149 |
- putErrChan(msg.done) |
|
| 3149 |
+ sc.srv.state.putErrChan(msg.done) |
|
| 3150 | 3150 |
return err |
| 3151 | 3151 |
} |
| 3152 | 3152 |
} |
| ... | ... |
@@ -355,6 +355,7 @@ type ClientConn struct {
|
| 355 | 355 |
readIdleTimeout time.Duration |
| 356 | 356 |
pingTimeout time.Duration |
| 357 | 357 |
extendedConnectAllowed bool |
| 358 |
+ strictMaxConcurrentStreams bool |
|
| 358 | 359 |
|
| 359 | 360 |
// rstStreamPingsBlocked works around an unfortunate gRPC behavior. |
| 360 | 361 |
// gRPC strictly limits the number of PING frames that it will receive. |
| ... | ... |
@@ -784,7 +785,8 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro |
| 784 | 784 |
initialWindowSize: 65535, // spec default |
| 785 | 785 |
initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream, |
| 786 | 786 |
maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings. |
| 787 |
- peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. |
|
| 787 |
+ strictMaxConcurrentStreams: conf.StrictMaxConcurrentRequests, |
|
| 788 |
+ peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. |
|
| 788 | 789 |
streams: make(map[uint32]*clientStream), |
| 789 | 790 |
singleUse: singleUse, |
| 790 | 791 |
seenSettingsChan: make(chan struct{}),
|
| ... | ... |
@@ -1018,7 +1020,7 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
|
| 1018 | 1018 |
return |
| 1019 | 1019 |
} |
| 1020 | 1020 |
var maxConcurrentOkay bool |
| 1021 |
- if cc.t.StrictMaxConcurrentStreams {
|
|
| 1021 |
+ if cc.strictMaxConcurrentStreams {
|
|
| 1022 | 1022 |
// We'll tell the caller we can take a new request to |
| 1023 | 1023 |
// prevent the caller from dialing a new TCP |
| 1024 | 1024 |
// connection, but then we'll block later before |
| ... | ... |
@@ -42,6 +42,8 @@ type OpenStreamOptions struct {
|
| 42 | 42 |
// PusherID is zero if the stream was initiated by the client. Otherwise, |
| 43 | 43 |
// PusherID names the stream that pushed the newly opened stream. |
| 44 | 44 |
PusherID uint32 |
| 45 |
+ // priority is used to set the priority of the newly opened stream. |
|
| 46 |
+ priority PriorityParam |
|
| 45 | 47 |
} |
| 46 | 48 |
|
| 47 | 49 |
// FrameWriteRequest is a request to write a frame. |
| 48 | 50 |
deleted file mode 100644 |
| ... | ... |
@@ -1,451 +0,0 @@ |
| 1 |
-// Copyright 2016 The Go Authors. All rights reserved. |
|
| 2 |
-// Use of this source code is governed by a BSD-style |
|
| 3 |
-// license that can be found in the LICENSE file. |
|
| 4 |
- |
|
| 5 |
-package http2 |
|
| 6 |
- |
|
| 7 |
-import ( |
|
| 8 |
- "fmt" |
|
| 9 |
- "math" |
|
| 10 |
- "sort" |
|
| 11 |
-) |
|
| 12 |
- |
|
| 13 |
-// RFC 7540, Section 5.3.5: the default weight is 16. |
|
| 14 |
-const priorityDefaultWeight = 15 // 16 = 15 + 1 |
|
| 15 |
- |
|
| 16 |
-// PriorityWriteSchedulerConfig configures a priorityWriteScheduler. |
|
| 17 |
-type PriorityWriteSchedulerConfig struct {
|
|
| 18 |
- // MaxClosedNodesInTree controls the maximum number of closed streams to |
|
| 19 |
- // retain in the priority tree. Setting this to zero saves a small amount |
|
| 20 |
- // of memory at the cost of performance. |
|
| 21 |
- // |
|
| 22 |
- // See RFC 7540, Section 5.3.4: |
|
| 23 |
- // "It is possible for a stream to become closed while prioritization |
|
| 24 |
- // information ... is in transit. ... This potentially creates suboptimal |
|
| 25 |
- // prioritization, since the stream could be given a priority that is |
|
| 26 |
- // different from what is intended. To avoid these problems, an endpoint |
|
| 27 |
- // SHOULD retain stream prioritization state for a period after streams |
|
| 28 |
- // become closed. The longer state is retained, the lower the chance that |
|
| 29 |
- // streams are assigned incorrect or default priority values." |
|
| 30 |
- MaxClosedNodesInTree int |
|
| 31 |
- |
|
| 32 |
- // MaxIdleNodesInTree controls the maximum number of idle streams to |
|
| 33 |
- // retain in the priority tree. Setting this to zero saves a small amount |
|
| 34 |
- // of memory at the cost of performance. |
|
| 35 |
- // |
|
| 36 |
- // See RFC 7540, Section 5.3.4: |
|
| 37 |
- // Similarly, streams that are in the "idle" state can be assigned |
|
| 38 |
- // priority or become a parent of other streams. This allows for the |
|
| 39 |
- // creation of a grouping node in the dependency tree, which enables |
|
| 40 |
- // more flexible expressions of priority. Idle streams begin with a |
|
| 41 |
- // default priority (Section 5.3.5). |
|
| 42 |
- MaxIdleNodesInTree int |
|
| 43 |
- |
|
| 44 |
- // ThrottleOutOfOrderWrites enables write throttling to help ensure that |
|
| 45 |
- // data is delivered in priority order. This works around a race where |
|
| 46 |
- // stream B depends on stream A and both streams are about to call Write |
|
| 47 |
- // to queue DATA frames. If B wins the race, a naive scheduler would eagerly |
|
| 48 |
- // write as much data from B as possible, but this is suboptimal because A |
|
| 49 |
- // is a higher-priority stream. With throttling enabled, we write a small |
|
| 50 |
- // amount of data from B to minimize the amount of bandwidth that B can |
|
| 51 |
- // steal from A. |
|
| 52 |
- ThrottleOutOfOrderWrites bool |
|
| 53 |
-} |
|
| 54 |
- |
|
| 55 |
-// NewPriorityWriteScheduler constructs a WriteScheduler that schedules |
|
| 56 |
-// frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3. |
|
| 57 |
-// If cfg is nil, default options are used. |
|
| 58 |
-func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
|
|
| 59 |
- if cfg == nil {
|
|
| 60 |
- // For justification of these defaults, see: |
|
| 61 |
- // https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY |
|
| 62 |
- cfg = &PriorityWriteSchedulerConfig{
|
|
| 63 |
- MaxClosedNodesInTree: 10, |
|
| 64 |
- MaxIdleNodesInTree: 10, |
|
| 65 |
- ThrottleOutOfOrderWrites: false, |
|
| 66 |
- } |
|
| 67 |
- } |
|
| 68 |
- |
|
| 69 |
- ws := &priorityWriteScheduler{
|
|
| 70 |
- nodes: make(map[uint32]*priorityNode), |
|
| 71 |
- maxClosedNodesInTree: cfg.MaxClosedNodesInTree, |
|
| 72 |
- maxIdleNodesInTree: cfg.MaxIdleNodesInTree, |
|
| 73 |
- enableWriteThrottle: cfg.ThrottleOutOfOrderWrites, |
|
| 74 |
- } |
|
| 75 |
- ws.nodes[0] = &ws.root |
|
| 76 |
- if cfg.ThrottleOutOfOrderWrites {
|
|
| 77 |
- ws.writeThrottleLimit = 1024 |
|
| 78 |
- } else {
|
|
| 79 |
- ws.writeThrottleLimit = math.MaxInt32 |
|
| 80 |
- } |
|
| 81 |
- return ws |
|
| 82 |
-} |
|
| 83 |
- |
|
| 84 |
-type priorityNodeState int |
|
| 85 |
- |
|
| 86 |
-const ( |
|
| 87 |
- priorityNodeOpen priorityNodeState = iota |
|
| 88 |
- priorityNodeClosed |
|
| 89 |
- priorityNodeIdle |
|
| 90 |
-) |
|
| 91 |
- |
|
| 92 |
-// priorityNode is a node in an HTTP/2 priority tree. |
|
| 93 |
-// Each node is associated with a single stream ID. |
|
| 94 |
-// See RFC 7540, Section 5.3. |
|
| 95 |
-type priorityNode struct {
|
|
| 96 |
- q writeQueue // queue of pending frames to write |
|
| 97 |
- id uint32 // id of the stream, or 0 for the root of the tree |
|
| 98 |
- weight uint8 // the actual weight is weight+1, so the value is in [1,256] |
|
| 99 |
- state priorityNodeState // open | closed | idle |
|
| 100 |
- bytes int64 // number of bytes written by this node, or 0 if closed |
|
| 101 |
- subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree |
|
| 102 |
- |
|
| 103 |
- // These links form the priority tree. |
|
| 104 |
- parent *priorityNode |
|
| 105 |
- kids *priorityNode // start of the kids list |
|
| 106 |
- prev, next *priorityNode // doubly-linked list of siblings |
|
| 107 |
-} |
|
| 108 |
- |
|
| 109 |
-func (n *priorityNode) setParent(parent *priorityNode) {
|
|
| 110 |
- if n == parent {
|
|
| 111 |
- panic("setParent to self")
|
|
| 112 |
- } |
|
| 113 |
- if n.parent == parent {
|
|
| 114 |
- return |
|
| 115 |
- } |
|
| 116 |
- // Unlink from current parent. |
|
| 117 |
- if parent := n.parent; parent != nil {
|
|
| 118 |
- if n.prev == nil {
|
|
| 119 |
- parent.kids = n.next |
|
| 120 |
- } else {
|
|
| 121 |
- n.prev.next = n.next |
|
| 122 |
- } |
|
| 123 |
- if n.next != nil {
|
|
| 124 |
- n.next.prev = n.prev |
|
| 125 |
- } |
|
| 126 |
- } |
|
| 127 |
- // Link to new parent. |
|
| 128 |
- // If parent=nil, remove n from the tree. |
|
| 129 |
- // Always insert at the head of parent.kids (this is assumed by walkReadyInOrder). |
|
| 130 |
- n.parent = parent |
|
| 131 |
- if parent == nil {
|
|
| 132 |
- n.next = nil |
|
| 133 |
- n.prev = nil |
|
| 134 |
- } else {
|
|
| 135 |
- n.next = parent.kids |
|
| 136 |
- n.prev = nil |
|
| 137 |
- if n.next != nil {
|
|
| 138 |
- n.next.prev = n |
|
| 139 |
- } |
|
| 140 |
- parent.kids = n |
|
| 141 |
- } |
|
| 142 |
-} |
|
| 143 |
- |
|
| 144 |
-func (n *priorityNode) addBytes(b int64) {
|
|
| 145 |
- n.bytes += b |
|
| 146 |
- for ; n != nil; n = n.parent {
|
|
| 147 |
- n.subtreeBytes += b |
|
| 148 |
- } |
|
| 149 |
-} |
|
| 150 |
- |
|
| 151 |
-// walkReadyInOrder iterates over the tree in priority order, calling f for each node |
|
| 152 |
-// with a non-empty write queue. When f returns true, this function returns true and the |
|
| 153 |
-// walk halts. tmp is used as scratch space for sorting. |
|
| 154 |
-// |
|
| 155 |
-// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true |
|
| 156 |
-// if any ancestor p of n is still open (ignoring the root node). |
|
| 157 |
-func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f func(*priorityNode, bool) bool) bool {
|
|
| 158 |
- if !n.q.empty() && f(n, openParent) {
|
|
| 159 |
- return true |
|
| 160 |
- } |
|
| 161 |
- if n.kids == nil {
|
|
| 162 |
- return false |
|
| 163 |
- } |
|
| 164 |
- |
|
| 165 |
- // Don't consider the root "open" when updating openParent since |
|
| 166 |
- // we can't send data frames on the root stream (only control frames). |
|
| 167 |
- if n.id != 0 {
|
|
| 168 |
- openParent = openParent || (n.state == priorityNodeOpen) |
|
| 169 |
- } |
|
| 170 |
- |
|
| 171 |
- // Common case: only one kid or all kids have the same weight. |
|
| 172 |
- // Some clients don't use weights; other clients (like web browsers) |
|
| 173 |
- // use mostly-linear priority trees. |
|
| 174 |
- w := n.kids.weight |
|
| 175 |
- needSort := false |
|
| 176 |
- for k := n.kids.next; k != nil; k = k.next {
|
|
| 177 |
- if k.weight != w {
|
|
| 178 |
- needSort = true |
|
| 179 |
- break |
|
| 180 |
- } |
|
| 181 |
- } |
|
| 182 |
- if !needSort {
|
|
| 183 |
- for k := n.kids; k != nil; k = k.next {
|
|
| 184 |
- if k.walkReadyInOrder(openParent, tmp, f) {
|
|
| 185 |
- return true |
|
| 186 |
- } |
|
| 187 |
- } |
|
| 188 |
- return false |
|
| 189 |
- } |
|
| 190 |
- |
|
| 191 |
- // Uncommon case: sort the child nodes. We remove the kids from the parent, |
|
| 192 |
- // then re-insert after sorting so we can reuse tmp for future sort calls. |
|
| 193 |
- *tmp = (*tmp)[:0] |
|
| 194 |
- for n.kids != nil {
|
|
| 195 |
- *tmp = append(*tmp, n.kids) |
|
| 196 |
- n.kids.setParent(nil) |
|
| 197 |
- } |
|
| 198 |
- sort.Sort(sortPriorityNodeSiblings(*tmp)) |
|
| 199 |
- for i := len(*tmp) - 1; i >= 0; i-- {
|
|
| 200 |
- (*tmp)[i].setParent(n) // setParent inserts at the head of n.kids |
|
| 201 |
- } |
|
| 202 |
- for k := n.kids; k != nil; k = k.next {
|
|
| 203 |
- if k.walkReadyInOrder(openParent, tmp, f) {
|
|
| 204 |
- return true |
|
| 205 |
- } |
|
| 206 |
- } |
|
| 207 |
- return false |
|
| 208 |
-} |
|
| 209 |
- |
|
| 210 |
-type sortPriorityNodeSiblings []*priorityNode |
|
| 211 |
- |
|
| 212 |
-func (z sortPriorityNodeSiblings) Len() int { return len(z) }
|
|
| 213 |
-func (z sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
|
|
| 214 |
-func (z sortPriorityNodeSiblings) Less(i, k int) bool {
|
|
| 215 |
- // Prefer the subtree that has sent fewer bytes relative to its weight. |
|
| 216 |
- // See sections 5.3.2 and 5.3.4. |
|
| 217 |
- wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes) |
|
| 218 |
- wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes) |
|
| 219 |
- if bi == 0 && bk == 0 {
|
|
| 220 |
- return wi >= wk |
|
| 221 |
- } |
|
| 222 |
- if bk == 0 {
|
|
| 223 |
- return false |
|
| 224 |
- } |
|
| 225 |
- return bi/bk <= wi/wk |
|
| 226 |
-} |
|
| 227 |
- |
|
| 228 |
-type priorityWriteScheduler struct {
|
|
| 229 |
- // root is the root of the priority tree, where root.id = 0. |
|
| 230 |
- // The root queues control frames that are not associated with any stream. |
|
| 231 |
- root priorityNode |
|
| 232 |
- |
|
| 233 |
- // nodes maps stream ids to priority tree nodes. |
|
| 234 |
- nodes map[uint32]*priorityNode |
|
| 235 |
- |
|
| 236 |
- // maxID is the maximum stream id in nodes. |
|
| 237 |
- maxID uint32 |
|
| 238 |
- |
|
| 239 |
- // lists of nodes that have been closed or are idle, but are kept in |
|
| 240 |
- // the tree for improved prioritization. When the lengths exceed either |
|
| 241 |
- // maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded. |
|
| 242 |
- closedNodes, idleNodes []*priorityNode |
|
| 243 |
- |
|
| 244 |
- // From the config. |
|
| 245 |
- maxClosedNodesInTree int |
|
| 246 |
- maxIdleNodesInTree int |
|
| 247 |
- writeThrottleLimit int32 |
|
| 248 |
- enableWriteThrottle bool |
|
| 249 |
- |
|
| 250 |
- // tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations. |
|
| 251 |
- tmp []*priorityNode |
|
| 252 |
- |
|
| 253 |
- // pool of empty queues for reuse. |
|
| 254 |
- queuePool writeQueuePool |
|
| 255 |
-} |
|
| 256 |
- |
|
| 257 |
-func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
|
|
| 258 |
- // The stream may be currently idle but cannot be opened or closed. |
|
| 259 |
- if curr := ws.nodes[streamID]; curr != nil {
|
|
| 260 |
- if curr.state != priorityNodeIdle {
|
|
| 261 |
- panic(fmt.Sprintf("stream %d already opened", streamID))
|
|
| 262 |
- } |
|
| 263 |
- curr.state = priorityNodeOpen |
|
| 264 |
- return |
|
| 265 |
- } |
|
| 266 |
- |
|
| 267 |
- // RFC 7540, Section 5.3.5: |
|
| 268 |
- // "All streams are initially assigned a non-exclusive dependency on stream 0x0. |
|
| 269 |
- // Pushed streams initially depend on their associated stream. In both cases, |
|
| 270 |
- // streams are assigned a default weight of 16." |
|
| 271 |
- parent := ws.nodes[options.PusherID] |
|
| 272 |
- if parent == nil {
|
|
| 273 |
- parent = &ws.root |
|
| 274 |
- } |
|
| 275 |
- n := &priorityNode{
|
|
| 276 |
- q: *ws.queuePool.get(), |
|
| 277 |
- id: streamID, |
|
| 278 |
- weight: priorityDefaultWeight, |
|
| 279 |
- state: priorityNodeOpen, |
|
| 280 |
- } |
|
| 281 |
- n.setParent(parent) |
|
| 282 |
- ws.nodes[streamID] = n |
|
| 283 |
- if streamID > ws.maxID {
|
|
| 284 |
- ws.maxID = streamID |
|
| 285 |
- } |
|
| 286 |
-} |
|
| 287 |
- |
|
| 288 |
-func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
|
|
| 289 |
- if streamID == 0 {
|
|
| 290 |
- panic("violation of WriteScheduler interface: cannot close stream 0")
|
|
| 291 |
- } |
|
| 292 |
- if ws.nodes[streamID] == nil {
|
|
| 293 |
- panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
|
|
| 294 |
- } |
|
| 295 |
- if ws.nodes[streamID].state != priorityNodeOpen {
|
|
| 296 |
- panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
|
|
| 297 |
- } |
|
| 298 |
- |
|
| 299 |
- n := ws.nodes[streamID] |
|
| 300 |
- n.state = priorityNodeClosed |
|
| 301 |
- n.addBytes(-n.bytes) |
|
| 302 |
- |
|
| 303 |
- q := n.q |
|
| 304 |
- ws.queuePool.put(&q) |
|
| 305 |
- n.q.s = nil |
|
| 306 |
- if ws.maxClosedNodesInTree > 0 {
|
|
| 307 |
- ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n) |
|
| 308 |
- } else {
|
|
| 309 |
- ws.removeNode(n) |
|
| 310 |
- } |
|
| 311 |
-} |
|
| 312 |
- |
|
| 313 |
-func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
|
|
| 314 |
- if streamID == 0 {
|
|
| 315 |
- panic("adjustPriority on root")
|
|
| 316 |
- } |
|
| 317 |
- |
|
| 318 |
- // If streamID does not exist, there are two cases: |
|
| 319 |
- // - A closed stream that has been removed (this will have ID <= maxID) |
|
| 320 |
- // - An idle stream that is being used for "grouping" (this will have ID > maxID) |
|
| 321 |
- n := ws.nodes[streamID] |
|
| 322 |
- if n == nil {
|
|
| 323 |
- if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
|
|
| 324 |
- return |
|
| 325 |
- } |
|
| 326 |
- ws.maxID = streamID |
|
| 327 |
- n = &priorityNode{
|
|
| 328 |
- q: *ws.queuePool.get(), |
|
| 329 |
- id: streamID, |
|
| 330 |
- weight: priorityDefaultWeight, |
|
| 331 |
- state: priorityNodeIdle, |
|
| 332 |
- } |
|
| 333 |
- n.setParent(&ws.root) |
|
| 334 |
- ws.nodes[streamID] = n |
|
| 335 |
- ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n) |
|
| 336 |
- } |
|
| 337 |
- |
|
| 338 |
- // Section 5.3.1: A dependency on a stream that is not currently in the tree |
|
| 339 |
- // results in that stream being given a default priority (Section 5.3.5). |
|
| 340 |
- parent := ws.nodes[priority.StreamDep] |
|
| 341 |
- if parent == nil {
|
|
| 342 |
- n.setParent(&ws.root) |
|
| 343 |
- n.weight = priorityDefaultWeight |
|
| 344 |
- return |
|
| 345 |
- } |
|
| 346 |
- |
|
| 347 |
- // Ignore if the client tries to make a node its own parent. |
|
| 348 |
- if n == parent {
|
|
| 349 |
- return |
|
| 350 |
- } |
|
| 351 |
- |
|
| 352 |
- // Section 5.3.3: |
|
| 353 |
- // "If a stream is made dependent on one of its own dependencies, the |
|
| 354 |
- // formerly dependent stream is first moved to be dependent on the |
|
| 355 |
- // reprioritized stream's previous parent. The moved dependency retains |
|
| 356 |
- // its weight." |
|
| 357 |
- // |
|
| 358 |
- // That is: if parent depends on n, move parent to depend on n.parent. |
|
| 359 |
- for x := parent.parent; x != nil; x = x.parent {
|
|
| 360 |
- if x == n {
|
|
| 361 |
- parent.setParent(n.parent) |
|
| 362 |
- break |
|
| 363 |
- } |
|
| 364 |
- } |
|
| 365 |
- |
|
| 366 |
- // Section 5.3.3: The exclusive flag causes the stream to become the sole |
|
| 367 |
- // dependency of its parent stream, causing other dependencies to become |
|
| 368 |
- // dependent on the exclusive stream. |
|
| 369 |
- if priority.Exclusive {
|
|
| 370 |
- k := parent.kids |
|
| 371 |
- for k != nil {
|
|
| 372 |
- next := k.next |
|
| 373 |
- if k != n {
|
|
| 374 |
- k.setParent(n) |
|
| 375 |
- } |
|
| 376 |
- k = next |
|
| 377 |
- } |
|
| 378 |
- } |
|
| 379 |
- |
|
| 380 |
- n.setParent(parent) |
|
| 381 |
- n.weight = priority.Weight |
|
| 382 |
-} |
|
| 383 |
- |
|
| 384 |
-func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
|
|
| 385 |
- var n *priorityNode |
|
| 386 |
- if wr.isControl() {
|
|
| 387 |
- n = &ws.root |
|
| 388 |
- } else {
|
|
| 389 |
- id := wr.StreamID() |
|
| 390 |
- n = ws.nodes[id] |
|
| 391 |
- if n == nil {
|
|
| 392 |
- // id is an idle or closed stream. wr should not be a HEADERS or |
|
| 393 |
- // DATA frame. In other case, we push wr onto the root, rather |
|
| 394 |
- // than creating a new priorityNode. |
|
| 395 |
- if wr.DataSize() > 0 {
|
|
| 396 |
- panic("add DATA on non-open stream")
|
|
| 397 |
- } |
|
| 398 |
- n = &ws.root |
|
| 399 |
- } |
|
| 400 |
- } |
|
| 401 |
- n.q.push(wr) |
|
| 402 |
-} |
|
| 403 |
- |
|
| 404 |
-func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
|
|
| 405 |
- ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNode, openParent bool) bool {
|
|
| 406 |
- limit := int32(math.MaxInt32) |
|
| 407 |
- if openParent {
|
|
| 408 |
- limit = ws.writeThrottleLimit |
|
| 409 |
- } |
|
| 410 |
- wr, ok = n.q.consume(limit) |
|
| 411 |
- if !ok {
|
|
| 412 |
- return false |
|
| 413 |
- } |
|
| 414 |
- n.addBytes(int64(wr.DataSize())) |
|
| 415 |
- // If B depends on A and B continuously has data available but A |
|
| 416 |
- // does not, gradually increase the throttling limit to allow B to |
|
| 417 |
- // steal more and more bandwidth from A. |
|
| 418 |
- if openParent {
|
|
| 419 |
- ws.writeThrottleLimit += 1024 |
|
| 420 |
- if ws.writeThrottleLimit < 0 {
|
|
| 421 |
- ws.writeThrottleLimit = math.MaxInt32 |
|
| 422 |
- } |
|
| 423 |
- } else if ws.enableWriteThrottle {
|
|
| 424 |
- ws.writeThrottleLimit = 1024 |
|
| 425 |
- } |
|
| 426 |
- return true |
|
| 427 |
- }) |
|
| 428 |
- return wr, ok |
|
| 429 |
-} |
|
| 430 |
- |
|
| 431 |
-func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, maxSize int, n *priorityNode) {
|
|
| 432 |
- if maxSize == 0 {
|
|
| 433 |
- return |
|
| 434 |
- } |
|
| 435 |
- if len(*list) == maxSize {
|
|
| 436 |
- // Remove the oldest node, then shift left. |
|
| 437 |
- ws.removeNode((*list)[0]) |
|
| 438 |
- x := (*list)[1:] |
|
| 439 |
- copy(*list, x) |
|
| 440 |
- *list = (*list)[:len(x)] |
|
| 441 |
- } |
|
| 442 |
- *list = append(*list, n) |
|
| 443 |
-} |
|
| 444 |
- |
|
| 445 |
-func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
|
|
| 446 |
- for n.kids != nil {
|
|
| 447 |
- n.kids.setParent(n.parent) |
|
| 448 |
- } |
|
| 449 |
- n.setParent(nil) |
|
| 450 |
- delete(ws.nodes, n.id) |
|
| 451 |
-} |
| 452 | 1 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,451 @@ |
| 0 |
+// Copyright 2016 The Go Authors. All rights reserved. |
|
| 1 |
+// Use of this source code is governed by a BSD-style |
|
| 2 |
+// license that can be found in the LICENSE file. |
|
| 3 |
+ |
|
| 4 |
+package http2 |
|
| 5 |
+ |
|
| 6 |
+import ( |
|
| 7 |
+ "fmt" |
|
| 8 |
+ "math" |
|
| 9 |
+ "sort" |
|
| 10 |
+) |
|
| 11 |
+ |
|
| 12 |
+// RFC 7540, Section 5.3.5: the default weight is 16. |
|
| 13 |
+const priorityDefaultWeightRFC7540 = 15 // 16 = 15 + 1 |
|
| 14 |
+ |
|
| 15 |
+// PriorityWriteSchedulerConfig configures a priorityWriteScheduler. |
|
| 16 |
+type PriorityWriteSchedulerConfig struct {
|
|
| 17 |
+ // MaxClosedNodesInTree controls the maximum number of closed streams to |
|
| 18 |
+ // retain in the priority tree. Setting this to zero saves a small amount |
|
| 19 |
+ // of memory at the cost of performance. |
|
| 20 |
+ // |
|
| 21 |
+ // See RFC 7540, Section 5.3.4: |
|
| 22 |
+ // "It is possible for a stream to become closed while prioritization |
|
| 23 |
+ // information ... is in transit. ... This potentially creates suboptimal |
|
| 24 |
+ // prioritization, since the stream could be given a priority that is |
|
| 25 |
+ // different from what is intended. To avoid these problems, an endpoint |
|
| 26 |
+ // SHOULD retain stream prioritization state for a period after streams |
|
| 27 |
+ // become closed. The longer state is retained, the lower the chance that |
|
| 28 |
+ // streams are assigned incorrect or default priority values." |
|
| 29 |
+ MaxClosedNodesInTree int |
|
| 30 |
+ |
|
| 31 |
+ // MaxIdleNodesInTree controls the maximum number of idle streams to |
|
| 32 |
+ // retain in the priority tree. Setting this to zero saves a small amount |
|
| 33 |
+ // of memory at the cost of performance. |
|
| 34 |
+ // |
|
| 35 |
+ // See RFC 7540, Section 5.3.4: |
|
| 36 |
+ // Similarly, streams that are in the "idle" state can be assigned |
|
| 37 |
+ // priority or become a parent of other streams. This allows for the |
|
| 38 |
+ // creation of a grouping node in the dependency tree, which enables |
|
| 39 |
+ // more flexible expressions of priority. Idle streams begin with a |
|
| 40 |
+ // default priority (Section 5.3.5). |
|
| 41 |
+ MaxIdleNodesInTree int |
|
| 42 |
+ |
|
| 43 |
+ // ThrottleOutOfOrderWrites enables write throttling to help ensure that |
|
| 44 |
+ // data is delivered in priority order. This works around a race where |
|
| 45 |
+ // stream B depends on stream A and both streams are about to call Write |
|
| 46 |
+ // to queue DATA frames. If B wins the race, a naive scheduler would eagerly |
|
| 47 |
+ // write as much data from B as possible, but this is suboptimal because A |
|
| 48 |
+ // is a higher-priority stream. With throttling enabled, we write a small |
|
| 49 |
+ // amount of data from B to minimize the amount of bandwidth that B can |
|
| 50 |
+ // steal from A. |
|
| 51 |
+ ThrottleOutOfOrderWrites bool |
|
| 52 |
+} |
|
| 53 |
+ |
|
| 54 |
+// NewPriorityWriteScheduler constructs a WriteScheduler that schedules |
|
| 55 |
+// frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3. |
|
| 56 |
+// If cfg is nil, default options are used. |
|
| 57 |
+func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
|
|
| 58 |
+ if cfg == nil {
|
|
| 59 |
+ // For justification of these defaults, see: |
|
| 60 |
+ // https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY |
|
| 61 |
+ cfg = &PriorityWriteSchedulerConfig{
|
|
| 62 |
+ MaxClosedNodesInTree: 10, |
|
| 63 |
+ MaxIdleNodesInTree: 10, |
|
| 64 |
+ ThrottleOutOfOrderWrites: false, |
|
| 65 |
+ } |
|
| 66 |
+ } |
|
| 67 |
+ |
|
| 68 |
+ ws := &priorityWriteSchedulerRFC7540{
|
|
| 69 |
+ nodes: make(map[uint32]*priorityNodeRFC7540), |
|
| 70 |
+ maxClosedNodesInTree: cfg.MaxClosedNodesInTree, |
|
| 71 |
+ maxIdleNodesInTree: cfg.MaxIdleNodesInTree, |
|
| 72 |
+ enableWriteThrottle: cfg.ThrottleOutOfOrderWrites, |
|
| 73 |
+ } |
|
| 74 |
+ ws.nodes[0] = &ws.root |
|
| 75 |
+ if cfg.ThrottleOutOfOrderWrites {
|
|
| 76 |
+ ws.writeThrottleLimit = 1024 |
|
| 77 |
+ } else {
|
|
| 78 |
+ ws.writeThrottleLimit = math.MaxInt32 |
|
| 79 |
+ } |
|
| 80 |
+ return ws |
|
| 81 |
+} |
|
| 82 |
+ |
|
| 83 |
+type priorityNodeStateRFC7540 int |
|
| 84 |
+ |
|
| 85 |
+const ( |
|
| 86 |
+ priorityNodeOpenRFC7540 priorityNodeStateRFC7540 = iota |
|
| 87 |
+ priorityNodeClosedRFC7540 |
|
| 88 |
+ priorityNodeIdleRFC7540 |
|
| 89 |
+) |
|
| 90 |
+ |
|
| 91 |
+// priorityNodeRFC7540 is a node in an HTTP/2 priority tree. |
|
| 92 |
+// Each node is associated with a single stream ID. |
|
| 93 |
+// See RFC 7540, Section 5.3. |
|
| 94 |
+type priorityNodeRFC7540 struct {
|
|
| 95 |
+ q writeQueue // queue of pending frames to write |
|
| 96 |
+ id uint32 // id of the stream, or 0 for the root of the tree |
|
| 97 |
+ weight uint8 // the actual weight is weight+1, so the value is in [1,256] |
|
| 98 |
+ state priorityNodeStateRFC7540 // open | closed | idle |
|
| 99 |
+ bytes int64 // number of bytes written by this node, or 0 if closed |
|
| 100 |
+ subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree |
|
| 101 |
+ |
|
| 102 |
+ // These links form the priority tree. |
|
| 103 |
+ parent *priorityNodeRFC7540 |
|
| 104 |
+ kids *priorityNodeRFC7540 // start of the kids list |
|
| 105 |
+ prev, next *priorityNodeRFC7540 // doubly-linked list of siblings |
|
| 106 |
+} |
|
| 107 |
+ |
|
| 108 |
+func (n *priorityNodeRFC7540) setParent(parent *priorityNodeRFC7540) {
|
|
| 109 |
+ if n == parent {
|
|
| 110 |
+ panic("setParent to self")
|
|
| 111 |
+ } |
|
| 112 |
+ if n.parent == parent {
|
|
| 113 |
+ return |
|
| 114 |
+ } |
|
| 115 |
+ // Unlink from current parent. |
|
| 116 |
+ if parent := n.parent; parent != nil {
|
|
| 117 |
+ if n.prev == nil {
|
|
| 118 |
+ parent.kids = n.next |
|
| 119 |
+ } else {
|
|
| 120 |
+ n.prev.next = n.next |
|
| 121 |
+ } |
|
| 122 |
+ if n.next != nil {
|
|
| 123 |
+ n.next.prev = n.prev |
|
| 124 |
+ } |
|
| 125 |
+ } |
|
| 126 |
+ // Link to new parent. |
|
| 127 |
+ // If parent=nil, remove n from the tree. |
|
| 128 |
+ // Always insert at the head of parent.kids (this is assumed by walkReadyInOrder). |
|
| 129 |
+ n.parent = parent |
|
| 130 |
+ if parent == nil {
|
|
| 131 |
+ n.next = nil |
|
| 132 |
+ n.prev = nil |
|
| 133 |
+ } else {
|
|
| 134 |
+ n.next = parent.kids |
|
| 135 |
+ n.prev = nil |
|
| 136 |
+ if n.next != nil {
|
|
| 137 |
+ n.next.prev = n |
|
| 138 |
+ } |
|
| 139 |
+ parent.kids = n |
|
| 140 |
+ } |
|
| 141 |
+} |
|
| 142 |
+ |
|
| 143 |
+func (n *priorityNodeRFC7540) addBytes(b int64) {
|
|
| 144 |
+ n.bytes += b |
|
| 145 |
+ for ; n != nil; n = n.parent {
|
|
| 146 |
+ n.subtreeBytes += b |
|
| 147 |
+ } |
|
| 148 |
+} |
|
| 149 |
+ |
|
| 150 |
+// walkReadyInOrder iterates over the tree in priority order, calling f for each node |
|
| 151 |
+// with a non-empty write queue. When f returns true, this function returns true and the |
|
| 152 |
+// walk halts. tmp is used as scratch space for sorting. |
|
| 153 |
+// |
|
| 154 |
+// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true |
|
| 155 |
+// if any ancestor p of n is still open (ignoring the root node). |
|
| 156 |
+func (n *priorityNodeRFC7540) walkReadyInOrder(openParent bool, tmp *[]*priorityNodeRFC7540, f func(*priorityNodeRFC7540, bool) bool) bool {
|
|
| 157 |
+ if !n.q.empty() && f(n, openParent) {
|
|
| 158 |
+ return true |
|
| 159 |
+ } |
|
| 160 |
+ if n.kids == nil {
|
|
| 161 |
+ return false |
|
| 162 |
+ } |
|
| 163 |
+ |
|
| 164 |
+ // Don't consider the root "open" when updating openParent since |
|
| 165 |
+ // we can't send data frames on the root stream (only control frames). |
|
| 166 |
+ if n.id != 0 {
|
|
| 167 |
+ openParent = openParent || (n.state == priorityNodeOpenRFC7540) |
|
| 168 |
+ } |
|
| 169 |
+ |
|
| 170 |
+ // Common case: only one kid or all kids have the same weight. |
|
| 171 |
+ // Some clients don't use weights; other clients (like web browsers) |
|
| 172 |
+ // use mostly-linear priority trees. |
|
| 173 |
+ w := n.kids.weight |
|
| 174 |
+ needSort := false |
|
| 175 |
+ for k := n.kids.next; k != nil; k = k.next {
|
|
| 176 |
+ if k.weight != w {
|
|
| 177 |
+ needSort = true |
|
| 178 |
+ break |
|
| 179 |
+ } |
|
| 180 |
+ } |
|
| 181 |
+ if !needSort {
|
|
| 182 |
+ for k := n.kids; k != nil; k = k.next {
|
|
| 183 |
+ if k.walkReadyInOrder(openParent, tmp, f) {
|
|
| 184 |
+ return true |
|
| 185 |
+ } |
|
| 186 |
+ } |
|
| 187 |
+ return false |
|
| 188 |
+ } |
|
| 189 |
+ |
|
| 190 |
+ // Uncommon case: sort the child nodes. We remove the kids from the parent, |
|
| 191 |
+ // then re-insert after sorting so we can reuse tmp for future sort calls. |
|
| 192 |
+ *tmp = (*tmp)[:0] |
|
| 193 |
+ for n.kids != nil {
|
|
| 194 |
+ *tmp = append(*tmp, n.kids) |
|
| 195 |
+ n.kids.setParent(nil) |
|
| 196 |
+ } |
|
| 197 |
+ sort.Sort(sortPriorityNodeSiblingsRFC7540(*tmp)) |
|
| 198 |
+ for i := len(*tmp) - 1; i >= 0; i-- {
|
|
| 199 |
+ (*tmp)[i].setParent(n) // setParent inserts at the head of n.kids |
|
| 200 |
+ } |
|
| 201 |
+ for k := n.kids; k != nil; k = k.next {
|
|
| 202 |
+ if k.walkReadyInOrder(openParent, tmp, f) {
|
|
| 203 |
+ return true |
|
| 204 |
+ } |
|
| 205 |
+ } |
|
| 206 |
+ return false |
|
| 207 |
+} |
|
| 208 |
+ |
|
| 209 |
+type sortPriorityNodeSiblingsRFC7540 []*priorityNodeRFC7540 |
|
| 210 |
+ |
|
| 211 |
+func (z sortPriorityNodeSiblingsRFC7540) Len() int { return len(z) }
|
|
| 212 |
+func (z sortPriorityNodeSiblingsRFC7540) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
|
|
| 213 |
+func (z sortPriorityNodeSiblingsRFC7540) Less(i, k int) bool {
|
|
| 214 |
+ // Prefer the subtree that has sent fewer bytes relative to its weight. |
|
| 215 |
+ // See sections 5.3.2 and 5.3.4. |
|
| 216 |
+ wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes) |
|
| 217 |
+ wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes) |
|
| 218 |
+ if bi == 0 && bk == 0 {
|
|
| 219 |
+ return wi >= wk |
|
| 220 |
+ } |
|
| 221 |
+ if bk == 0 {
|
|
| 222 |
+ return false |
|
| 223 |
+ } |
|
| 224 |
+ return bi/bk <= wi/wk |
|
| 225 |
+} |
|
| 226 |
+ |
|
| 227 |
+type priorityWriteSchedulerRFC7540 struct {
|
|
| 228 |
+ // root is the root of the priority tree, where root.id = 0. |
|
| 229 |
+ // The root queues control frames that are not associated with any stream. |
|
| 230 |
+ root priorityNodeRFC7540 |
|
| 231 |
+ |
|
| 232 |
+ // nodes maps stream ids to priority tree nodes. |
|
| 233 |
+ nodes map[uint32]*priorityNodeRFC7540 |
|
| 234 |
+ |
|
| 235 |
+ // maxID is the maximum stream id in nodes. |
|
| 236 |
+ maxID uint32 |
|
| 237 |
+ |
|
| 238 |
+ // lists of nodes that have been closed or are idle, but are kept in |
|
| 239 |
+ // the tree for improved prioritization. When the lengths exceed either |
|
| 240 |
+ // maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded. |
|
| 241 |
+ closedNodes, idleNodes []*priorityNodeRFC7540 |
|
| 242 |
+ |
|
| 243 |
+ // From the config. |
|
| 244 |
+ maxClosedNodesInTree int |
|
| 245 |
+ maxIdleNodesInTree int |
|
| 246 |
+ writeThrottleLimit int32 |
|
| 247 |
+ enableWriteThrottle bool |
|
| 248 |
+ |
|
| 249 |
+ // tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations. |
|
| 250 |
+ tmp []*priorityNodeRFC7540 |
|
| 251 |
+ |
|
| 252 |
+ // pool of empty queues for reuse. |
|
| 253 |
+ queuePool writeQueuePool |
|
| 254 |
+} |
|
| 255 |
+ |
|
| 256 |
+func (ws *priorityWriteSchedulerRFC7540) OpenStream(streamID uint32, options OpenStreamOptions) {
|
|
| 257 |
+ // The stream may be currently idle but cannot be opened or closed. |
|
| 258 |
+ if curr := ws.nodes[streamID]; curr != nil {
|
|
| 259 |
+ if curr.state != priorityNodeIdleRFC7540 {
|
|
| 260 |
+ panic(fmt.Sprintf("stream %d already opened", streamID))
|
|
| 261 |
+ } |
|
| 262 |
+ curr.state = priorityNodeOpenRFC7540 |
|
| 263 |
+ return |
|
| 264 |
+ } |
|
| 265 |
+ |
|
| 266 |
+ // RFC 7540, Section 5.3.5: |
|
| 267 |
+ // "All streams are initially assigned a non-exclusive dependency on stream 0x0. |
|
| 268 |
+ // Pushed streams initially depend on their associated stream. In both cases, |
|
| 269 |
+ // streams are assigned a default weight of 16." |
|
| 270 |
+ parent := ws.nodes[options.PusherID] |
|
| 271 |
+ if parent == nil {
|
|
| 272 |
+ parent = &ws.root |
|
| 273 |
+ } |
|
| 274 |
+ n := &priorityNodeRFC7540{
|
|
| 275 |
+ q: *ws.queuePool.get(), |
|
| 276 |
+ id: streamID, |
|
| 277 |
+ weight: priorityDefaultWeightRFC7540, |
|
| 278 |
+ state: priorityNodeOpenRFC7540, |
|
| 279 |
+ } |
|
| 280 |
+ n.setParent(parent) |
|
| 281 |
+ ws.nodes[streamID] = n |
|
| 282 |
+ if streamID > ws.maxID {
|
|
| 283 |
+ ws.maxID = streamID |
|
| 284 |
+ } |
|
| 285 |
+} |
|
| 286 |
+ |
|
| 287 |
+func (ws *priorityWriteSchedulerRFC7540) CloseStream(streamID uint32) {
|
|
| 288 |
+ if streamID == 0 {
|
|
| 289 |
+ panic("violation of WriteScheduler interface: cannot close stream 0")
|
|
| 290 |
+ } |
|
| 291 |
+ if ws.nodes[streamID] == nil {
|
|
| 292 |
+ panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
|
|
| 293 |
+ } |
|
| 294 |
+ if ws.nodes[streamID].state != priorityNodeOpenRFC7540 {
|
|
| 295 |
+ panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
|
|
| 296 |
+ } |
|
| 297 |
+ |
|
| 298 |
+ n := ws.nodes[streamID] |
|
| 299 |
+ n.state = priorityNodeClosedRFC7540 |
|
| 300 |
+ n.addBytes(-n.bytes) |
|
| 301 |
+ |
|
| 302 |
+ q := n.q |
|
| 303 |
+ ws.queuePool.put(&q) |
|
| 304 |
+ n.q.s = nil |
|
| 305 |
+ if ws.maxClosedNodesInTree > 0 {
|
|
| 306 |
+ ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n) |
|
| 307 |
+ } else {
|
|
| 308 |
+ ws.removeNode(n) |
|
| 309 |
+ } |
|
| 310 |
+} |
|
| 311 |
+ |
|
| 312 |
+func (ws *priorityWriteSchedulerRFC7540) AdjustStream(streamID uint32, priority PriorityParam) {
|
|
| 313 |
+ if streamID == 0 {
|
|
| 314 |
+ panic("adjustPriority on root")
|
|
| 315 |
+ } |
|
| 316 |
+ |
|
| 317 |
+ // If streamID does not exist, there are two cases: |
|
| 318 |
+ // - A closed stream that has been removed (this will have ID <= maxID) |
|
| 319 |
+ // - An idle stream that is being used for "grouping" (this will have ID > maxID) |
|
| 320 |
+ n := ws.nodes[streamID] |
|
| 321 |
+ if n == nil {
|
|
| 322 |
+ if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
|
|
| 323 |
+ return |
|
| 324 |
+ } |
|
| 325 |
+ ws.maxID = streamID |
|
| 326 |
+ n = &priorityNodeRFC7540{
|
|
| 327 |
+ q: *ws.queuePool.get(), |
|
| 328 |
+ id: streamID, |
|
| 329 |
+ weight: priorityDefaultWeightRFC7540, |
|
| 330 |
+ state: priorityNodeIdleRFC7540, |
|
| 331 |
+ } |
|
| 332 |
+ n.setParent(&ws.root) |
|
| 333 |
+ ws.nodes[streamID] = n |
|
| 334 |
+ ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n) |
|
| 335 |
+ } |
|
| 336 |
+ |
|
| 337 |
+ // Section 5.3.1: A dependency on a stream that is not currently in the tree |
|
| 338 |
+ // results in that stream being given a default priority (Section 5.3.5). |
|
| 339 |
+ parent := ws.nodes[priority.StreamDep] |
|
| 340 |
+ if parent == nil {
|
|
| 341 |
+ n.setParent(&ws.root) |
|
| 342 |
+ n.weight = priorityDefaultWeightRFC7540 |
|
| 343 |
+ return |
|
| 344 |
+ } |
|
| 345 |
+ |
|
| 346 |
+ // Ignore if the client tries to make a node its own parent. |
|
| 347 |
+ if n == parent {
|
|
| 348 |
+ return |
|
| 349 |
+ } |
|
| 350 |
+ |
|
| 351 |
+ // Section 5.3.3: |
|
| 352 |
+ // "If a stream is made dependent on one of its own dependencies, the |
|
| 353 |
+ // formerly dependent stream is first moved to be dependent on the |
|
| 354 |
+ // reprioritized stream's previous parent. The moved dependency retains |
|
| 355 |
+ // its weight." |
|
| 356 |
+ // |
|
| 357 |
+ // That is: if parent depends on n, move parent to depend on n.parent. |
|
| 358 |
+ for x := parent.parent; x != nil; x = x.parent {
|
|
| 359 |
+ if x == n {
|
|
| 360 |
+ parent.setParent(n.parent) |
|
| 361 |
+ break |
|
| 362 |
+ } |
|
| 363 |
+ } |
|
| 364 |
+ |
|
| 365 |
+ // Section 5.3.3: The exclusive flag causes the stream to become the sole |
|
| 366 |
+ // dependency of its parent stream, causing other dependencies to become |
|
| 367 |
+ // dependent on the exclusive stream. |
|
| 368 |
+ if priority.Exclusive {
|
|
| 369 |
+ k := parent.kids |
|
| 370 |
+ for k != nil {
|
|
| 371 |
+ next := k.next |
|
| 372 |
+ if k != n {
|
|
| 373 |
+ k.setParent(n) |
|
| 374 |
+ } |
|
| 375 |
+ k = next |
|
| 376 |
+ } |
|
| 377 |
+ } |
|
| 378 |
+ |
|
| 379 |
+ n.setParent(parent) |
|
| 380 |
+ n.weight = priority.Weight |
|
| 381 |
+} |
|
| 382 |
+ |
|
| 383 |
+func (ws *priorityWriteSchedulerRFC7540) Push(wr FrameWriteRequest) {
|
|
| 384 |
+ var n *priorityNodeRFC7540 |
|
| 385 |
+ if wr.isControl() {
|
|
| 386 |
+ n = &ws.root |
|
| 387 |
+ } else {
|
|
| 388 |
+ id := wr.StreamID() |
|
| 389 |
+ n = ws.nodes[id] |
|
| 390 |
+ if n == nil {
|
|
| 391 |
+ // id is an idle or closed stream. wr should not be a HEADERS or |
|
| 392 |
+ // DATA frame. In other case, we push wr onto the root, rather |
|
| 393 |
+ // than creating a new priorityNode. |
|
| 394 |
+ if wr.DataSize() > 0 {
|
|
| 395 |
+ panic("add DATA on non-open stream")
|
|
| 396 |
+ } |
|
| 397 |
+ n = &ws.root |
|
| 398 |
+ } |
|
| 399 |
+ } |
|
| 400 |
+ n.q.push(wr) |
|
| 401 |
+} |
|
| 402 |
+ |
|
| 403 |
+func (ws *priorityWriteSchedulerRFC7540) Pop() (wr FrameWriteRequest, ok bool) {
|
|
| 404 |
+ ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNodeRFC7540, openParent bool) bool {
|
|
| 405 |
+ limit := int32(math.MaxInt32) |
|
| 406 |
+ if openParent {
|
|
| 407 |
+ limit = ws.writeThrottleLimit |
|
| 408 |
+ } |
|
| 409 |
+ wr, ok = n.q.consume(limit) |
|
| 410 |
+ if !ok {
|
|
| 411 |
+ return false |
|
| 412 |
+ } |
|
| 413 |
+ n.addBytes(int64(wr.DataSize())) |
|
| 414 |
+ // If B depends on A and B continuously has data available but A |
|
| 415 |
+ // does not, gradually increase the throttling limit to allow B to |
|
| 416 |
+ // steal more and more bandwidth from A. |
|
| 417 |
+ if openParent {
|
|
| 418 |
+ ws.writeThrottleLimit += 1024 |
|
| 419 |
+ if ws.writeThrottleLimit < 0 {
|
|
| 420 |
+ ws.writeThrottleLimit = math.MaxInt32 |
|
| 421 |
+ } |
|
| 422 |
+ } else if ws.enableWriteThrottle {
|
|
| 423 |
+ ws.writeThrottleLimit = 1024 |
|
| 424 |
+ } |
|
| 425 |
+ return true |
|
| 426 |
+ }) |
|
| 427 |
+ return wr, ok |
|
| 428 |
+} |
|
| 429 |
+ |
|
| 430 |
+func (ws *priorityWriteSchedulerRFC7540) addClosedOrIdleNode(list *[]*priorityNodeRFC7540, maxSize int, n *priorityNodeRFC7540) {
|
|
| 431 |
+ if maxSize == 0 {
|
|
| 432 |
+ return |
|
| 433 |
+ } |
|
| 434 |
+ if len(*list) == maxSize {
|
|
| 435 |
+ // Remove the oldest node, then shift left. |
|
| 436 |
+ ws.removeNode((*list)[0]) |
|
| 437 |
+ x := (*list)[1:] |
|
| 438 |
+ copy(*list, x) |
|
| 439 |
+ *list = (*list)[:len(x)] |
|
| 440 |
+ } |
|
| 441 |
+ *list = append(*list, n) |
|
| 442 |
+} |
|
| 443 |
+ |
|
| 444 |
+func (ws *priorityWriteSchedulerRFC7540) removeNode(n *priorityNodeRFC7540) {
|
|
| 445 |
+ for n.kids != nil {
|
|
| 446 |
+ n.kids.setParent(n.parent) |
|
| 447 |
+ } |
|
| 448 |
+ n.setParent(nil) |
|
| 449 |
+ delete(ws.nodes, n.id) |
|
| 450 |
+} |
| 0 | 451 |
new file mode 100644 |
| ... | ... |
@@ -0,0 +1,209 @@ |
| 0 |
+// Copyright 2025 The Go Authors. All rights reserved. |
|
| 1 |
+// Use of this source code is governed by a BSD-style |
|
| 2 |
+// license that can be found in the LICENSE file. |
|
| 3 |
+ |
|
| 4 |
+package http2 |
|
| 5 |
+ |
|
| 6 |
+import ( |
|
| 7 |
+ "fmt" |
|
| 8 |
+ "math" |
|
| 9 |
+) |
|
| 10 |
+ |
|
| 11 |
+type streamMetadata struct {
|
|
| 12 |
+ location *writeQueue |
|
| 13 |
+ priority PriorityParam |
|
| 14 |
+} |
|
| 15 |
+ |
|
| 16 |
+type priorityWriteSchedulerRFC9218 struct {
|
|
| 17 |
+ // control contains control frames (SETTINGS, PING, etc.). |
|
| 18 |
+ control writeQueue |
|
| 19 |
+ |
|
| 20 |
+ // heads contain the head of a circular list of streams. |
|
| 21 |
+ // We put these heads within a nested array that represents urgency and |
|
| 22 |
+ // incremental, as defined in |
|
| 23 |
+ // https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters. |
|
| 24 |
+ // 8 represents u=0 up to u=7, and 2 represents i=false and i=true. |
|
| 25 |
+ heads [8][2]*writeQueue |
|
| 26 |
+ |
|
| 27 |
+ // streams contains a mapping between each stream ID and their metadata, so |
|
| 28 |
+ // we can quickly locate them when needing to, for example, adjust their |
|
| 29 |
+ // priority. |
|
| 30 |
+ streams map[uint32]streamMetadata |
|
| 31 |
+ |
|
| 32 |
+ // queuePool are empty queues for reuse. |
|
| 33 |
+ queuePool writeQueuePool |
|
| 34 |
+ |
|
| 35 |
+ // prioritizeIncremental is used to determine whether we should prioritize |
|
| 36 |
+ // incremental streams or not, when urgency is the same in a given Pop() |
|
| 37 |
+ // call. |
|
| 38 |
+ prioritizeIncremental bool |
|
| 39 |
+} |
|
| 40 |
+ |
|
| 41 |
+func newPriorityWriteSchedulerRFC9128() WriteScheduler {
|
|
| 42 |
+ ws := &priorityWriteSchedulerRFC9218{
|
|
| 43 |
+ streams: make(map[uint32]streamMetadata), |
|
| 44 |
+ } |
|
| 45 |
+ return ws |
|
| 46 |
+} |
|
| 47 |
+ |
|
| 48 |
+func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) {
|
|
| 49 |
+ if ws.streams[streamID].location != nil {
|
|
| 50 |
+ panic(fmt.Errorf("stream %d already opened", streamID))
|
|
| 51 |
+ } |
|
| 52 |
+ q := ws.queuePool.get() |
|
| 53 |
+ ws.streams[streamID] = streamMetadata{
|
|
| 54 |
+ location: q, |
|
| 55 |
+ priority: opt.priority, |
|
| 56 |
+ } |
|
| 57 |
+ |
|
| 58 |
+ u, i := opt.priority.urgency, opt.priority.incremental |
|
| 59 |
+ if ws.heads[u][i] == nil {
|
|
| 60 |
+ ws.heads[u][i] = q |
|
| 61 |
+ q.next = q |
|
| 62 |
+ q.prev = q |
|
| 63 |
+ } else {
|
|
| 64 |
+ // Queues are stored in a ring. |
|
| 65 |
+ // Insert the new stream before ws.head, putting it at the end of the list. |
|
| 66 |
+ q.prev = ws.heads[u][i].prev |
|
| 67 |
+ q.next = ws.heads[u][i] |
|
| 68 |
+ q.prev.next = q |
|
| 69 |
+ q.next.prev = q |
|
| 70 |
+ } |
|
| 71 |
+} |
|
| 72 |
+ |
|
| 73 |
+func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
|
|
| 74 |
+ metadata := ws.streams[streamID] |
|
| 75 |
+ q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental |
|
| 76 |
+ if q == nil {
|
|
| 77 |
+ return |
|
| 78 |
+ } |
|
| 79 |
+ if q.next == q {
|
|
| 80 |
+ // This was the only open stream. |
|
| 81 |
+ ws.heads[u][i] = nil |
|
| 82 |
+ } else {
|
|
| 83 |
+ q.prev.next = q.next |
|
| 84 |
+ q.next.prev = q.prev |
|
| 85 |
+ if ws.heads[u][i] == q {
|
|
| 86 |
+ ws.heads[u][i] = q.next |
|
| 87 |
+ } |
|
| 88 |
+ } |
|
| 89 |
+ delete(ws.streams, streamID) |
|
| 90 |
+ ws.queuePool.put(q) |
|
| 91 |
+} |
|
| 92 |
+ |
|
| 93 |
+func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) {
|
|
| 94 |
+ metadata := ws.streams[streamID] |
|
| 95 |
+ q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental |
|
| 96 |
+ if q == nil {
|
|
| 97 |
+ return |
|
| 98 |
+ } |
|
| 99 |
+ |
|
| 100 |
+ // Remove stream from current location. |
|
| 101 |
+ if q.next == q {
|
|
| 102 |
+ // This was the only open stream. |
|
| 103 |
+ ws.heads[u][i] = nil |
|
| 104 |
+ } else {
|
|
| 105 |
+ q.prev.next = q.next |
|
| 106 |
+ q.next.prev = q.prev |
|
| 107 |
+ if ws.heads[u][i] == q {
|
|
| 108 |
+ ws.heads[u][i] = q.next |
|
| 109 |
+ } |
|
| 110 |
+ } |
|
| 111 |
+ |
|
| 112 |
+ // Insert stream to the new queue. |
|
| 113 |
+ u, i = priority.urgency, priority.incremental |
|
| 114 |
+ if ws.heads[u][i] == nil {
|
|
| 115 |
+ ws.heads[u][i] = q |
|
| 116 |
+ q.next = q |
|
| 117 |
+ q.prev = q |
|
| 118 |
+ } else {
|
|
| 119 |
+ // Queues are stored in a ring. |
|
| 120 |
+ // Insert the new stream before ws.head, putting it at the end of the list. |
|
| 121 |
+ q.prev = ws.heads[u][i].prev |
|
| 122 |
+ q.next = ws.heads[u][i] |
|
| 123 |
+ q.prev.next = q |
|
| 124 |
+ q.next.prev = q |
|
| 125 |
+ } |
|
| 126 |
+ |
|
| 127 |
+ // Update the metadata. |
|
| 128 |
+ ws.streams[streamID] = streamMetadata{
|
|
| 129 |
+ location: q, |
|
| 130 |
+ priority: priority, |
|
| 131 |
+ } |
|
| 132 |
+} |
|
| 133 |
+ |
|
| 134 |
+func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) {
|
|
| 135 |
+ if wr.isControl() {
|
|
| 136 |
+ ws.control.push(wr) |
|
| 137 |
+ return |
|
| 138 |
+ } |
|
| 139 |
+ q := ws.streams[wr.StreamID()].location |
|
| 140 |
+ if q == nil {
|
|
| 141 |
+ // This is a closed stream. |
|
| 142 |
+ // wr should not be a HEADERS or DATA frame. |
|
| 143 |
+ // We push the request onto the control queue. |
|
| 144 |
+ if wr.DataSize() > 0 {
|
|
| 145 |
+ panic("add DATA on non-open stream")
|
|
| 146 |
+ } |
|
| 147 |
+ ws.control.push(wr) |
|
| 148 |
+ return |
|
| 149 |
+ } |
|
| 150 |
+ q.push(wr) |
|
| 151 |
+} |
|
| 152 |
+ |
|
| 153 |
+func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) {
|
|
| 154 |
+ // Control and RST_STREAM frames first. |
|
| 155 |
+ if !ws.control.empty() {
|
|
| 156 |
+ return ws.control.shift(), true |
|
| 157 |
+ } |
|
| 158 |
+ |
|
| 159 |
+ // On the next Pop(), we want to prioritize incremental if we prioritized |
|
| 160 |
+ // non-incremental request of the same urgency this time. Vice-versa. |
|
| 161 |
+ // i.e. when there are incremental and non-incremental requests at the same |
|
| 162 |
+ // priority, we give 50% of our bandwidth to the incremental ones in |
|
| 163 |
+ // aggregate and 50% to the first non-incremental one (since |
|
| 164 |
+ // non-incremental streams do not use round-robin writes). |
|
| 165 |
+ ws.prioritizeIncremental = !ws.prioritizeIncremental |
|
| 166 |
+ |
|
| 167 |
+ // Always prioritize lowest u (i.e. highest urgency level). |
|
| 168 |
+ for u := range ws.heads {
|
|
| 169 |
+ for i := range ws.heads[u] {
|
|
| 170 |
+ // When we want to prioritize incremental, we try to pop i=true |
|
| 171 |
+ // first before i=false when u is the same. |
|
| 172 |
+ if ws.prioritizeIncremental {
|
|
| 173 |
+ i = (i + 1) % 2 |
|
| 174 |
+ } |
|
| 175 |
+ q := ws.heads[u][i] |
|
| 176 |
+ if q == nil {
|
|
| 177 |
+ continue |
|
| 178 |
+ } |
|
| 179 |
+ for {
|
|
| 180 |
+ if wr, ok := q.consume(math.MaxInt32); ok {
|
|
| 181 |
+ if i == 1 {
|
|
| 182 |
+ // For incremental streams, we update head to q.next so |
|
| 183 |
+ // we can round-robin between multiple streams that can |
|
| 184 |
+ // immediately benefit from partial writes. |
|
| 185 |
+ ws.heads[u][i] = q.next |
|
| 186 |
+ } else {
|
|
| 187 |
+ // For non-incremental streams, we try to finish one to |
|
| 188 |
+ // completion rather than doing round-robin. However, |
|
| 189 |
+ // we update head here so that if q.consume() is !ok |
|
| 190 |
+ // (e.g. the stream has no more frame to consume), head |
|
| 191 |
+ // is updated to the next q that has frames to consume |
|
| 192 |
+ // on future iterations. This way, we do not prioritize |
|
| 193 |
+ // writing to unavailable stream on next Pop() calls, |
|
| 194 |
+ // preventing head-of-line blocking. |
|
| 195 |
+ ws.heads[u][i] = q |
|
| 196 |
+ } |
|
| 197 |
+ return wr, true |
|
| 198 |
+ } |
|
| 199 |
+ q = q.next |
|
| 200 |
+ if q == ws.heads[u][i] {
|
|
| 201 |
+ break |
|
| 202 |
+ } |
|
| 203 |
+ } |
|
| 204 |
+ |
|
| 205 |
+ } |
|
| 206 |
+ } |
|
| 207 |
+ return FrameWriteRequest{}, false
|
|
| 208 |
+} |
| ... | ... |
@@ -25,7 +25,7 @@ type roundRobinWriteScheduler struct {
|
| 25 | 25 |
} |
| 26 | 26 |
|
| 27 | 27 |
// newRoundRobinWriteScheduler constructs a new write scheduler. |
| 28 |
-// The round robin scheduler priorizes control frames |
|
| 28 |
+// The round robin scheduler prioritizes control frames |
|
| 29 | 29 |
// like SETTINGS and PING over DATA frames. |
| 30 | 30 |
// When there are no control frames to send, it performs a round-robin |
| 31 | 31 |
// selection from the ready streams. |
| ... | ... |
@@ -51,7 +51,7 @@ type EncodeHeadersParam struct {
|
| 51 | 51 |
DefaultUserAgent string |
| 52 | 52 |
} |
| 53 | 53 |
|
| 54 |
-// EncodeHeadersParam is the result of EncodeHeaders. |
|
| 54 |
+// EncodeHeadersResult is the result of EncodeHeaders. |
|
| 55 | 55 |
type EncodeHeadersResult struct {
|
| 56 | 56 |
HasBody bool |
| 57 | 57 |
HasTrailers bool |
| ... | ... |
@@ -399,7 +399,7 @@ type ServerRequestResult struct {
|
| 399 | 399 |
|
| 400 | 400 |
// If the request should be rejected, this is a short string suitable for passing |
| 401 | 401 |
// to the http2 package's CountError function. |
| 402 |
- // It might be a bit odd to return errors this way rather than returing an error, |
|
| 402 |
+ // It might be a bit odd to return errors this way rather than returning an error, |
|
| 403 | 403 |
// but this ensures we don't forget to include a CountError reason. |
| 404 | 404 |
InvalidReason string |
| 405 | 405 |
} |
| ... | ... |
@@ -1476,7 +1476,7 @@ golang.org/x/crypto/ssh/internal/bcrypt_pbkdf |
| 1476 | 1476 |
golang.org/x/mod/internal/lazyregexp |
| 1477 | 1477 |
golang.org/x/mod/module |
| 1478 | 1478 |
golang.org/x/mod/semver |
| 1479 |
-# golang.org/x/net v0.44.0 |
|
| 1479 |
+# golang.org/x/net v0.45.0 |
|
| 1480 | 1480 |
## explicit; go 1.24.0 |
| 1481 | 1481 |
golang.org/x/net/bpf |
| 1482 | 1482 |
golang.org/x/net/context |