Skip to content

Commit 8cbc71c

Browse files
committed
Call didSendRequestPart at the right time
# Motivation Right now, we call `didSendRequestPart` after passing the write to the executor. However, this does not mean that the write hit the socket. To implement proper backpressure using the delegate, we should only call this method once the write was successful. # Modification Pass a promise to the actual channel write and only call the delegate once that promise succeeds. # Result The delegate method `didSendRequestPart` is only called after the write was successful.
1 parent 89b0da2 commit 8cbc71c

11 files changed

+332
-193
lines changed

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ final class Transaction: @unchecked Sendable {
6363

6464
switch writeAction {
6565
case .writeAndWait(let executor), .writeAndContinue(let executor):
66-
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self)
66+
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self, promise: nil)
6767

6868
case .fail:
6969
// an error/cancellation has happened. we don't need to continue here
@@ -105,14 +105,14 @@ final class Transaction: @unchecked Sendable {
105105
switch self.state.writeNextRequestPart() {
106106
case .writeAndContinue(let executor):
107107
self.stateLock.unlock()
108-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
108+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
109109

110110
case .writeAndWait(let executor):
111111
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
112112
self.state.waitForRequestBodyDemand(continuation: continuation)
113113
self.stateLock.unlock()
114114

115-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
115+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
116116
}
117117

118118
case .fail:
@@ -132,7 +132,7 @@ final class Transaction: @unchecked Sendable {
132132
break
133133

134134
case .forwardStreamFinished(let executor, let succeedContinuation):
135-
executor.finishRequestBodyStream(self)
135+
executor.finishRequestBodyStream(self, promise: nil)
136136
succeedContinuation?.resume(returning: nil)
137137
}
138138
return

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
185185
case .sendRequestHead(let head, startBody: let startBody):
186186
self.sendRequestHead(head, startBody: startBody, context: context)
187187

188-
case .sendBodyPart(let part):
189-
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
188+
case .sendBodyPart(let part, let writePromise):
189+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)
190190

191-
case .sendRequestEnd:
192-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
191+
case .sendRequestEnd(let writePromise):
192+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
193193

194194
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
195195
self.runTimeoutAction(timeoutAction, context: context)
@@ -258,36 +258,58 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
258258
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
259259

260260
switch finalAction {
261-
case .close:
261+
case .close(let writePromise):
262262
context.close(promise: nil)
263-
case .sendRequestEnd:
264-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
265-
case .informConnectionIsIdle:
263+
writePromise?.succeed(())
264+
oldRequest.succeedRequest(buffer)
265+
case .sendRequestEnd(let writePromise):
266+
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
267+
// We need to defer succeeding the old request to avoid ordering issues
268+
writePromise.futureResult.whenComplete { _ in
269+
oldRequest.succeedRequest(buffer)
270+
}
271+
272+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
273+
case .informConnectionIsIdle(let writePromise):
266274
self.connection.taskCompleted()
275+
writePromise?.succeed(())
276+
oldRequest.succeedRequest(buffer)
267277
case .none:
268-
break
278+
oldRequest.succeedRequest(buffer)
269279
}
270280

271-
oldRequest.succeedRequest(buffer)
272-
273281
case .failRequest(let error, let finalAction):
274282
// see comment in the `succeedRequest` case.
275283
let oldRequest = self.request!
276284
self.request = nil
277285
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
278286

279287
switch finalAction {
280-
case .close:
281-
context.close(promise: nil)
282-
case .sendRequestEnd:
283-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
284-
case .informConnectionIsIdle:
288+
case .close(let writePromise):
289+
context.close(promise: writePromise)
290+
writePromise?.fail(error)
291+
oldRequest.fail(error)
292+
293+
case .sendRequestEnd(let writePromise):
294+
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
295+
// We need to defer failing the old request to avoid ordering issues
296+
writePromise.futureResult.whenComplete { _ in
297+
oldRequest.fail(error)
298+
}
299+
300+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
301+
302+
case .informConnectionIsIdle(let writePromise):
285303
self.connection.taskCompleted()
304+
writePromise?.fail(error)
305+
oldRequest.fail(error)
306+
286307
case .none:
287-
break
308+
oldRequest.fail(error)
288309
}
289310

290-
oldRequest.fail(error)
311+
case .failSendBodyPart(let error, let writePromise), .failSendStreamFinished(let error, let writePromise):
312+
writePromise?.fail(error)
291313
}
292314
}
293315

@@ -355,27 +377,28 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
355377

356378
// MARK: Private HTTPRequestExecutor
357379

358-
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
380+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
359381
guard self.request === request, let context = self.channelContext else {
360382
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
361383
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
362384
// the request has been popped by the state machine or the ChannelHandler has been
363385
// removed from the Channel pipeline. This is a normal threading issue, noone has
364386
// screwed up.
387+
promise?.fail(HTTPClientError.requestStreamCancelled)
365388
return
366389
}
367390

368-
let action = self.state.requestStreamPartReceived(data)
391+
let action = self.state.requestStreamPartReceived(data, promise: promise)
369392
self.run(action, context: context)
370393
}
371394

372-
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest) {
395+
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
373396
guard self.request === request, let context = self.channelContext else {
374397
// See code comment in `writeRequestBodyPart0`
375398
return
376399
}
377400

378-
let action = self.state.requestStreamFinished()
401+
let action = self.state.requestStreamFinished(promise: promise)
379402
self.run(action, context: context)
380403
}
381404

@@ -405,22 +428,22 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
405428
}
406429

407430
extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
408-
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest) {
431+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
409432
if self.eventLoop.inEventLoop {
410-
self.writeRequestBodyPart0(data, request: request)
433+
self.writeRequestBodyPart0(data, request: request, promise: promise)
411434
} else {
412435
self.eventLoop.execute {
413-
self.writeRequestBodyPart0(data, request: request)
436+
self.writeRequestBodyPart0(data, request: request, promise: promise)
414437
}
415438
}
416439
}
417440

418-
func finishRequestBodyStream(_ request: HTTPExecutableRequest) {
441+
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
419442
if self.eventLoop.inEventLoop {
420-
self.finishRequestBodyStream0(request)
443+
self.finishRequestBodyStream0(request, promise: promise)
421444
} else {
422445
self.eventLoop.execute {
423-
self.finishRequestBodyStream0(request)
446+
self.finishRequestBodyStream0(request, promise: promise)
424447
}
425448
}
426449
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,29 @@ struct HTTP1ConnectionStateMachine {
3030
/// A action to execute, when we consider a request "done".
3131
enum FinalStreamAction {
3232
/// Close the connection
33-
case close
33+
///
34+
/// The promise is an optional write promise.
35+
case close(EventLoopPromise<Void>?)
3436
/// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded,
3537
/// as soon as we wrote the request end onto the wire.
36-
case sendRequestEnd
38+
///
39+
/// The promise is an optional write promise.
40+
case sendRequestEnd(EventLoopPromise<Void>?)
3741
/// Inform an observer that the connection has become idle
38-
case informConnectionIsIdle
42+
///
43+
/// The promise is an optional write promise.
44+
case informConnectionIsIdle(EventLoopPromise<Void>?)
3945
/// Do nothing.
40-
case none
46+
///
47+
/// The promise is an optional write promise.
48+
case none(EventLoopPromise<Void>?)
4149
}
4250

4351
case sendRequestHead(HTTPRequestHead, startBody: Bool)
44-
case sendBodyPart(IOData)
45-
case sendRequestEnd
52+
case sendBodyPart(IOData, EventLoopPromise<Void>?)
53+
case sendRequestEnd(EventLoopPromise<Void>?)
54+
case failSendBodyPart(Error, EventLoopPromise<Void>?)
55+
case failSendStreamFinished(Error, EventLoopPromise<Void>?)
4656

4757
case pauseRequestBodyStream
4858
case resumeRequestBodyStream
@@ -173,7 +183,7 @@ struct HTTP1ConnectionStateMachine {
173183
// as closed.
174184
//
175185
// TODO: AHC should support a fast rescheduling mechanism here.
176-
return .failRequest(HTTPClientError.remoteConnectionClosed, .none)
186+
return .failRequest(HTTPClientError.remoteConnectionClosed, .none(nil))
177187

178188
case .idle:
179189
var requestStateMachine = HTTPRequestStateMachine(isChannelWritable: self.isChannelWritable)
@@ -189,25 +199,25 @@ struct HTTP1ConnectionStateMachine {
189199
}
190200
}
191201

192-
mutating func requestStreamPartReceived(_ part: IOData) -> Action {
202+
mutating func requestStreamPartReceived(_ part: IOData, promise: EventLoopPromise<Void>?) -> Action {
193203
guard case .inRequest(var requestStateMachine, let close) = self.state else {
194204
preconditionFailure("Invalid state: \(self.state)")
195205
}
196206

197207
return self.avoidingStateMachineCoW { state -> Action in
198-
let action = requestStateMachine.requestStreamPartReceived(part)
208+
let action = requestStateMachine.requestStreamPartReceived(part, promise: promise)
199209
state = .inRequest(requestStateMachine, close: close)
200210
return state.modify(with: action)
201211
}
202212
}
203213

204-
mutating func requestStreamFinished() -> Action {
214+
mutating func requestStreamFinished(promise: EventLoopPromise<Void>?) -> Action {
205215
guard case .inRequest(var requestStateMachine, let close) = self.state else {
206216
preconditionFailure("Invalid state: \(self.state)")
207217
}
208218

209219
return self.avoidingStateMachineCoW { state -> Action in
210-
let action = requestStateMachine.requestStreamFinished()
220+
let action = requestStateMachine.requestStreamFinished(promise: promise)
211221
state = .inRequest(requestStateMachine, close: close)
212222
return state.modify(with: action)
213223
}
@@ -377,10 +387,10 @@ extension HTTP1ConnectionStateMachine.State {
377387
return .pauseRequestBodyStream
378388
case .resumeRequestBodyStream:
379389
return .resumeRequestBodyStream
380-
case .sendBodyPart(let part):
381-
return .sendBodyPart(part)
382-
case .sendRequestEnd:
383-
return .sendRequestEnd
390+
case .sendBodyPart(let part, let writePromise):
391+
return .sendBodyPart(part, writePromise)
392+
case .sendRequestEnd(let writePromise):
393+
return .sendRequestEnd(writePromise)
384394
case .forwardResponseHead(let head, let pauseRequestBodyStream):
385395
return .forwardResponseHead(head, pauseRequestBodyStream: pauseRequestBodyStream)
386396
case .forwardResponseBodyParts(let parts):
@@ -392,39 +402,63 @@ extension HTTP1ConnectionStateMachine.State {
392402

393403
let newFinalAction: HTTP1ConnectionStateMachine.Action.FinalStreamAction
394404
switch finalAction {
395-
case .close:
405+
case .close(let writePromise):
396406
self = .closing
397-
newFinalAction = .close
398-
case .sendRequestEnd:
399-
newFinalAction = .sendRequestEnd
407+
newFinalAction = .close(writePromise)
408+
case .sendRequestEnd(let writePromise):
409+
newFinalAction = .sendRequestEnd(writePromise)
400410
case .none:
401411
self = .idle
402-
newFinalAction = close ? .close : .informConnectionIsIdle
412+
newFinalAction = close ? .close(nil) : .informConnectionIsIdle(nil)
403413
}
404414
return .succeedRequest(newFinalAction, finalParts)
405415

406416
case .failRequest(let error, let finalAction):
407-
switch self {
408-
case .initialized:
417+
switch (self, finalAction) {
418+
case (.initialized, _):
409419
preconditionFailure("Invalid state: \(self)")
410-
case .idle:
420+
421+
case (.idle, _):
411422
preconditionFailure("How can we fail a task, if we are idle")
412-
case .inRequest(_, close: let close):
413-
if close || finalAction == .close {
414-
self = .closing
415-
return .failRequest(error, .close)
416-
} else {
417-
self = .idle
418-
return .failRequest(error, .informConnectionIsIdle)
419-
}
420423

421-
case .closing:
422-
return .failRequest(error, .none)
423-
case .closed:
424+
// If we are either in .inRequest(_, close: true) or the final action is .close
425+
// we have to fail the request with .close()
426+
case (.inRequest(_, let close), .sendRequestEnd(let writePromise)) where close:
427+
self = .closing
428+
return .failRequest(error, .close(writePromise))
429+
430+
case (.inRequest(_, let close), .none) where close:
431+
self = .closing
432+
return .failRequest(error, .close(nil))
433+
434+
case (.inRequest(_, _), .close(let writePromise)):
435+
self = .closing
436+
return .failRequest(error, .close(writePromise))
437+
438+
// otherwise we fail with .informConnectionIsIdle
439+
case (.inRequest(_, _), .sendRequestEnd(let writePromise)):
440+
self = .idle
441+
return .failRequest(error, .informConnectionIsIdle(writePromise))
442+
443+
case (.inRequest(_, _), .none):
444+
self = .idle
445+
return .failRequest(error, .informConnectionIsIdle(nil))
446+
447+
case (.closing, .close(let writePromise)), (.closing, .sendRequestEnd(let writePromise)):
448+
return .failRequest(error, .none(writePromise))
449+
450+
case (.closing, .none):
451+
return .failRequest(error, .none(nil))
452+
453+
case (.closed, .close(let writePromise)), (.closed, .sendRequestEnd(let writePromise)):
424454
// this state can be reached, if the connection was unexpectedly closed by remote
425-
return .failRequest(error, .none)
455+
return .failRequest(error, .none(writePromise))
426456

427-
case .modifying:
457+
case (.closed, .none):
458+
// this state can be reached, if the connection was unexpectedly closed by remote
459+
return .failRequest(error, .none(nil))
460+
461+
case (.modifying, _):
428462
preconditionFailure("Invalid state: \(self)")
429463
}
430464

@@ -433,6 +467,12 @@ extension HTTP1ConnectionStateMachine.State {
433467

434468
case .wait:
435469
return .wait
470+
471+
case .failSendBodyPart(let error, let writePromise):
472+
return .failSendBodyPart(error, writePromise)
473+
474+
case .failSendStreamFinished(let error, let writePromise):
475+
return .failSendStreamFinished(error, writePromise)
436476
}
437477
}
438478
}

0 commit comments

Comments
 (0)