@@ -38,19 +38,19 @@ export class NodeHttp2Handler implements HttpHandler {
38
38
private readonly disableConcurrentStreams ?: boolean ;
39
39
40
40
public readonly metadata = { handlerProtocol : "h2" } ;
41
- private sessionList : ClientHttp2Session [ ] ;
42
- private sessionCache : Map < string , ClientHttp2Session > ;
41
+ private sessionCache : Map < string , ClientHttp2Session [ ] > ;
43
42
44
43
constructor ( { requestTimeout, sessionTimeout, disableConcurrentStreams } : NodeHttp2HandlerOptions = { } ) {
45
44
this . requestTimeout = requestTimeout ;
46
45
this . sessionTimeout = sessionTimeout ;
47
46
this . disableConcurrentStreams = disableConcurrentStreams ;
48
- this . sessionList = [ ] ;
49
- this . sessionCache = new Map < string , ClientHttp2Session > ( ) ;
47
+ this . sessionCache = new Map < string , ClientHttp2Session [ ] > ( ) ;
50
48
}
51
49
52
50
destroy ( ) : void {
53
- this . sessionList . forEach ( ( session ) => this . destroySession ( session ) ) ;
51
+ for ( const sessions of this . sessionCache . values ( ) ) {
52
+ sessions . forEach ( ( session ) => this . destroySession ( session ) ) ;
53
+ }
54
54
this . sessionCache . clear ( ) ;
55
55
}
56
56
@@ -71,7 +71,7 @@ export class NodeHttp2Handler implements HttpHandler {
71
71
72
72
const { hostname, method, port, protocol, path, query } = request ;
73
73
const authority = `${ protocol } //${ hostname } ${ port ? `:${ port } ` : "" } ` ;
74
- const session = this . disableConcurrentStreams ? this . getSession ( authority ) : this . getSessionFromCache ( authority ) ;
74
+ const session = this . getSession ( authority , this . disableConcurrentStreams || false ) ;
75
75
76
76
const reject = ( err : Error ) => {
77
77
if ( this . disableConcurrentStreams ) {
@@ -101,6 +101,7 @@ export class NodeHttp2Handler implements HttpHandler {
101
101
// Gracefully closes the Http2Session, allowing any existing streams to complete
102
102
// on their own and preventing new Http2Stream instances from being created.
103
103
session . close ( ) ;
104
+ this . deleteSessionFromCache ( authority , session ) ;
104
105
}
105
106
} ) ;
106
107
@@ -133,66 +134,49 @@ export class NodeHttp2Handler implements HttpHandler {
133
134
// http2stream.rstCode property. If the code is any value other than NGHTTP2_NO_ERROR (0),
134
135
// an 'error' event will have also been emitted.
135
136
req . on ( "close" , ( ) => {
137
+ if ( this . disableConcurrentStreams ) {
138
+ session . destroy ( ) ;
139
+ }
136
140
if ( ! fulfilled ) {
137
141
reject ( new Error ( "Unexpected error: http2 request did not get a response" ) ) ;
138
142
}
139
143
} ) ;
144
+
140
145
writeRequestBody ( req , request ) ;
141
146
} ) ;
142
147
}
143
148
144
149
/**
145
- * Returns a new session for the given URL.
150
+ * Returns a session for the given URL.
151
+ *
146
152
* @param authority The URL to create a session for.
147
- * @returns A new session for the given URL.
153
+ * @param disableConcurrentStreams If true, a new session will be created for each request.
154
+ * @returns A session for the given URL.
148
155
*/
149
- private getSession ( authority : string ) : ClientHttp2Session {
156
+ private getSession ( authority : string , disableConcurrentStreams : boolean ) : ClientHttp2Session {
157
+ const sessionCache = this . sessionCache ;
158
+ const existingSessions = sessionCache . get ( authority ) || [ ] ;
159
+
160
+ // If concurrent streams are not disabled, we can use the existing session.
161
+ if ( existingSessions . length > 0 && ! disableConcurrentStreams ) return existingSessions [ 0 ] ;
162
+
150
163
const newSession = connect ( authority ) ;
151
164
const destroySessionCb = ( ) => {
152
165
this . destroySession ( newSession ) ;
166
+ this . deleteSessionFromCache ( authority , newSession ) ;
153
167
} ;
154
168
newSession . on ( "goaway" , destroySessionCb ) ;
155
169
newSession . on ( "error" , destroySessionCb ) ;
156
170
newSession . on ( "frameError" , destroySessionCb ) ;
157
171
158
172
const sessionTimeout = this . sessionTimeout ;
159
173
if ( sessionTimeout ) {
160
- newSession . setTimeout ( sessionTimeout , ( ) => {
161
- this . destroySession ( newSession ) ;
162
- } ) ;
174
+ newSession . setTimeout ( sessionTimeout , destroySessionCb ) ;
163
175
}
164
176
165
- this . sessionList . push ( newSession ) ;
166
- return newSession ;
167
- }
168
-
169
- /**
170
- * Returns a session for the given URL. If the session is not cached, it will be created.
171
- * @param authority The URL to create a session for.
172
- * @returns A session for the given URL.
173
- */
174
- private getSessionFromCache ( authority : string ) : ClientHttp2Session {
175
- const connectionPool = this . sessionCache ;
176
- const existingSession = connectionPool . get ( authority ) ;
177
- if ( existingSession ) return existingSession ;
178
-
179
- const newSession = this . getSession ( authority ) ;
180
- connectionPool . set ( authority , newSession ) ;
181
- const destroySessionCb = ( ) => {
182
- this . deleteSessionFromCache ( authority , newSession ) ;
183
- } ;
184
- newSession . on ( "goaway" , destroySessionCb ) ;
185
- newSession . on ( "error" , destroySessionCb ) ;
186
- newSession . on ( "frameError" , destroySessionCb ) ;
177
+ existingSessions . push ( newSession ) ;
178
+ sessionCache . set ( authority , existingSessions ) ;
187
179
188
- const sessionTimeout = this . sessionTimeout ;
189
- if ( sessionTimeout ) {
190
- newSession . setTimeout ( sessionTimeout , ( ) => {
191
- if ( connectionPool . get ( authority ) === newSession ) {
192
- connectionPool . delete ( authority ) ;
193
- }
194
- } ) ;
195
- }
196
180
return newSession ;
197
181
}
198
182
@@ -204,7 +188,6 @@ export class NodeHttp2Handler implements HttpHandler {
204
188
if ( ! session . destroyed ) {
205
189
session . destroy ( ) ;
206
190
}
207
- this . sessionList = this . sessionList . filter ( ( s ) => s !== session ) ;
208
191
}
209
192
210
193
/**
@@ -213,10 +196,14 @@ export class NodeHttp2Handler implements HttpHandler {
213
196
* @param session The session to delete.
214
197
*/
215
198
private deleteSessionFromCache ( authority : string , session : ClientHttp2Session ) : void {
216
- if ( this . sessionCache . get ( authority ) !== session ) {
217
- // If the session is not in the pool, it has already been deleted.
199
+ const existingSessions = this . sessionCache . get ( authority ) || [ ] ;
200
+ if ( ! existingSessions . includes ( session ) ) {
201
+ // If the session is not in the cache, it has already been deleted.
218
202
return ;
219
203
}
220
- this . sessionCache . delete ( authority ) ;
204
+ this . sessionCache . set (
205
+ authority ,
206
+ existingSessions . filter ( ( s ) => s !== session )
207
+ ) ;
221
208
}
222
209
}
0 commit comments