Skip to content

Commit e0520e6

Browse files
committed
Refactor stream
1 parent fd8bd4b commit e0520e6

File tree

1 file changed

+53
-33
lines changed

1 file changed

+53
-33
lines changed

packages/firestore/src/remote/persistent_stream.ts

Lines changed: 53 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,11 @@ export abstract class PersistentStream<
212212
this.backoff = new ExponentialBackoff(queue, connectionTimerId);
213213
}
214214

215+
/**
216+
* Count of response messages received.
217+
*/
218+
protected responseCount: number = 0;
219+
215220
/**
216221
* Returns true if start() has been called and no error has occurred. True
217222
* indicates the stream is open or in the process of opening (which
@@ -246,6 +251,7 @@ export abstract class PersistentStream<
246251
* When start returns, isStarted() will return true.
247252
*/
248253
start(): void {
254+
this.responseCount = 0;
249255
if (this.state === PersistentStreamState.Error) {
250256
this.performBackoff();
251257
return;
@@ -429,11 +435,18 @@ export abstract class PersistentStream<
429435
): Stream<SendType, ReceiveType>;
430436

431437
/**
432-
* Called after the stream has received a message. The function will be
433-
* called on the right queue and must return a Promise.
438+
* Called when the stream receives first message.
439+
* The function will be called on the right queue and must return a Promise.
440+
* @param message - The message received from the stream.
441+
*/
442+
protected abstract onFirst(message: ReceiveType): Promise<void>;
443+
444+
/**
445+
* Called on subsequent messages after the stream has received first message.
446+
* The function will be called on the right queue and must return a Promise.
434447
* @param message - The message received from the stream.
435448
*/
436-
protected abstract onMessage(message: ReceiveType): Promise<void>;
449+
protected abstract onNext(message: ReceiveType): Promise<void>;
437450

438451
private auth(): void {
439452
debugAssert(
@@ -522,7 +535,7 @@ export abstract class PersistentStream<
522535
});
523536
this.stream.onMessage((msg: ReceiveType) => {
524537
dispatchIfNotClosed(() => {
525-
return this.onMessage(msg);
538+
return ++this.responseCount === 1 ? this.onFirst(msg) : this.onNext(msg);
526539
});
527540
});
528541
}
@@ -643,7 +656,11 @@ export class PersistentListenStream extends PersistentStream<
643656
);
644657
}
645658

646-
protected onMessage(watchChangeProto: ProtoListenResponse): Promise<void> {
659+
protected onFirst(watchChangeProto: ProtoListenResponse): Promise<void> {
660+
return this.onNext(watchChangeProto);
661+
}
662+
663+
protected onNext(watchChangeProto: ProtoListenResponse): Promise<void> {
647664
// A successful response means the stream is healthy
648665
this.backoff.reset();
649666

@@ -723,8 +740,6 @@ export class PersistentWriteStream extends PersistentStream<
723740
ProtoWriteResponse,
724741
WriteStreamListener
725742
> {
726-
private handshakeComplete_ = false;
727-
728743
constructor(
729744
queue: AsyncQueue,
730745
connection: Connection,
@@ -760,18 +775,17 @@ export class PersistentWriteStream extends PersistentStream<
760775
* the stream is ready to accept mutations.
761776
*/
762777
get handshakeComplete(): boolean {
763-
return this.handshakeComplete_;
778+
return this.responseCount > 0;
764779
}
765780

766781
// Override of PersistentStream.start
767782
start(): void {
768-
this.handshakeComplete_ = false;
769783
this.lastStreamToken = undefined;
770784
super.start();
771785
}
772786

773787
protected tearDown(): void {
774-
if (this.handshakeComplete_) {
788+
if (this.handshakeComplete) {
775789
this.writeMutations([]);
776790
}
777791
}
@@ -787,35 +801,41 @@ export class PersistentWriteStream extends PersistentStream<
787801
);
788802
}
789803

790-
protected onMessage(responseProto: ProtoWriteResponse): Promise<void> {
804+
protected onFirst(responseProto: ProtoWriteResponse): Promise<void> {
805+
// Always capture the last stream token.
806+
hardAssert(
807+
!!responseProto.streamToken,
808+
'Got a write handshake response without a stream token'
809+
);
810+
this.lastStreamToken = responseProto.streamToken;
811+
812+
// The first response is always the handshake response
813+
hardAssert(
814+
!responseProto.writeResults || responseProto.writeResults.length === 0,
815+
'Got mutation results for handshake'
816+
);
817+
return this.listener!.onHandshakeComplete();
818+
}
819+
820+
protected onNext(responseProto: ProtoWriteResponse): Promise<void> {
791821
// Always capture the last stream token.
792822
hardAssert(
793823
!!responseProto.streamToken,
794824
'Got a write response without a stream token'
795825
);
796826
this.lastStreamToken = responseProto.streamToken;
797827

798-
if (!this.handshakeComplete_) {
799-
// The first response is always the handshake response
800-
hardAssert(
801-
!responseProto.writeResults || responseProto.writeResults.length === 0,
802-
'Got mutation results for handshake'
803-
);
804-
this.handshakeComplete_ = true;
805-
return this.listener!.onHandshakeComplete();
806-
} else {
807-
// A successful first write response means the stream is healthy,
808-
// Note, that we could consider a successful handshake healthy, however,
809-
// the write itself might be causing an error we want to back off from.
810-
this.backoff.reset();
828+
// A successful first write response means the stream is healthy,
829+
// Note, that we could consider a successful handshake healthy, however,
830+
// the write itself might be causing an error we want to back off from.
831+
this.backoff.reset();
811832

812-
const results = fromWriteResults(
813-
responseProto.writeResults,
814-
responseProto.commitTime
815-
);
816-
const commitVersion = fromVersion(responseProto.commitTime!);
817-
return this.listener!.onMutationResult(commitVersion, results);
818-
}
833+
const results = fromWriteResults(
834+
responseProto.writeResults,
835+
responseProto.commitTime
836+
);
837+
const commitVersion = fromVersion(responseProto.commitTime!);
838+
return this.listener!.onMutationResult(commitVersion, results);
819839
}
820840

821841
/**
@@ -825,7 +845,7 @@ export class PersistentWriteStream extends PersistentStream<
825845
*/
826846
writeHandshake(): void {
827847
debugAssert(this.isOpen(), 'Writing handshake requires an opened stream');
828-
debugAssert(!this.handshakeComplete_, 'Handshake already completed');
848+
debugAssert(!this.handshakeComplete, 'Handshake already completed');
829849
debugAssert(
830850
!this.lastStreamToken,
831851
'Stream token should be empty during handshake'
@@ -841,7 +861,7 @@ export class PersistentWriteStream extends PersistentStream<
841861
writeMutations(mutations: Mutation[]): void {
842862
debugAssert(this.isOpen(), 'Writing mutations requires an opened stream');
843863
debugAssert(
844-
this.handshakeComplete_,
864+
this.handshakeComplete,
845865
'Handshake must be complete before writing mutations'
846866
);
847867
debugAssert(

0 commit comments

Comments
 (0)