@@ -187,13 +187,8 @@ void GrpcStream::Shutdown() {
187
187
// called, otherwise the real failure cause will be overwritten by status
188
188
// "canceled".)
189
189
context_->TryCancel ();
190
- // All completions issued by this call must be taken off the queue before
191
- // finish operation can be enqueued.
192
- FastFinishCompletionsBlocking ();
193
-
194
- GrpcCompletion* completion = NewCompletion (Type::Finish, {});
195
- call_->Finish (completion->status (), completion);
196
-
190
+ FinishCall ({});
191
+ // Wait until "finish" is off the queue.
197
192
FastFinishCompletionsBlocking ();
198
193
}
199
194
@@ -204,6 +199,14 @@ void GrpcStream::MaybeUnregister() {
204
199
}
205
200
}
206
201
202
+ void GrpcStream::FinishCall (const OnSuccess& callback) {
203
+ // All completions issued by this call must be taken off the queue before
204
+ // finish operation can be enqueued.
205
+ FastFinishCompletionsBlocking ();
206
+ GrpcCompletion* completion = NewCompletion (Type::Finish, callback);
207
+ call_->Finish (completion->status (), completion);
208
+ }
209
+
207
210
void GrpcStream::FastFinishCompletionsBlocking () {
208
211
// TODO(varconst): reset buffered_writer_? Should not be necessary, because it
209
212
// should never be called again after a call to Finish.
@@ -274,20 +277,10 @@ void GrpcStream::OnWrite() {
274
277
}
275
278
276
279
void GrpcStream::OnOperationFailed () {
277
- if (!completions_.empty ()) {
278
- // It is only valid to finish a call once all other completions issued from
279
- // this call have been taken off the queue, so wait until the queue is
280
- // drained. Once a single operation has failed, the rest are guaranteed to
281
- // fail, too.
282
- return ;
283
- }
284
-
285
- GrpcCompletion* completion =
286
- NewCompletion (Type::Finish, [this ](const GrpcCompletion* completion) {
280
+ FinishCall ([this ](const GrpcCompletion* completion) {
287
281
Status status = ConvertStatus (*completion->status ());
288
282
FinishAndNotify (status);
289
283
});
290
- call_->Finish (completion->status (), completion);
291
284
}
292
285
293
286
void GrpcStream::RemoveCompletion (const GrpcCompletion* to_remove) {
@@ -304,7 +297,9 @@ GrpcCompletion* GrpcStream::NewCompletion(Type tag,
304
297
RemoveCompletion (completion);
305
298
306
299
if (ok) {
307
- on_success (completion);
300
+ if (on_success) {
301
+ on_success (completion);
302
+ }
308
303
} else {
309
304
// Use the same error-handling for all operations; all errors are
310
305
// unrecoverable.
0 commit comments