Skip to content

Refactor stream #8376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 57 additions & 33 deletions packages/firestore/src/remote/persistent_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ export abstract class PersistentStream<
this.backoff = new ExponentialBackoff(queue, connectionTimerId);
}

/**
* Count of response messages received.
*/
protected responseCount: number = 0;

/**
* Returns true if start() has been called and no error has occurred. True
* indicates the stream is open or in the process of opening (which
Expand Down Expand Up @@ -246,6 +251,7 @@ export abstract class PersistentStream<
* When start returns, isStarted() will return true.
*/
start(): void {
this.responseCount = 0;
if (this.state === PersistentStreamState.Error) {
this.performBackoff();
return;
Expand Down Expand Up @@ -429,11 +435,18 @@ export abstract class PersistentStream<
): Stream<SendType, ReceiveType>;

/**
* Called after the stream has received a message. The function will be
* called on the right queue and must return a Promise.
* Called when the stream receives first message.
* The function will be called on the right queue and must return a Promise.
* @param message - The message received from the stream.
*/
protected abstract onFirst(message: ReceiveType): Promise<void>;

/**
* Called on subsequent messages after the stream has received first message.
* The function will be called on the right queue and must return a Promise.
* @param message - The message received from the stream.
*/
protected abstract onMessage(message: ReceiveType): Promise<void>;
protected abstract onNext(message: ReceiveType): Promise<void>;

private auth(): void {
debugAssert(
Expand Down Expand Up @@ -522,7 +535,11 @@ export abstract class PersistentStream<
});
this.stream.onMessage((msg: ReceiveType) => {
dispatchIfNotClosed(() => {
return this.onMessage(msg);
if (++this.responseCount === 1) {
return this.onFirst(msg);
} else {
return this.onNext(msg);
}
});
});
}
Expand Down Expand Up @@ -643,7 +660,11 @@ export class PersistentListenStream extends PersistentStream<
);
}

protected onMessage(watchChangeProto: ProtoListenResponse): Promise<void> {
protected onFirst(watchChangeProto: ProtoListenResponse): Promise<void> {
return this.onNext(watchChangeProto);
}

protected onNext(watchChangeProto: ProtoListenResponse): Promise<void> {
// A successful response means the stream is healthy
this.backoff.reset();

Expand Down Expand Up @@ -723,8 +744,6 @@ export class PersistentWriteStream extends PersistentStream<
ProtoWriteResponse,
WriteStreamListener
> {
private handshakeComplete_ = false;

constructor(
queue: AsyncQueue,
connection: Connection,
Expand Down Expand Up @@ -760,18 +779,17 @@ export class PersistentWriteStream extends PersistentStream<
* the stream is ready to accept mutations.
*/
get handshakeComplete(): boolean {
return this.handshakeComplete_;
return this.responseCount > 0;
}

// Override of PersistentStream.start
start(): void {
this.handshakeComplete_ = false;
this.lastStreamToken = undefined;
super.start();
}

protected tearDown(): void {
if (this.handshakeComplete_) {
if (this.handshakeComplete) {
this.writeMutations([]);
}
}
Expand All @@ -787,35 +805,41 @@ export class PersistentWriteStream extends PersistentStream<
);
}

protected onMessage(responseProto: ProtoWriteResponse): Promise<void> {
protected onFirst(responseProto: ProtoWriteResponse): Promise<void> {
// Always capture the last stream token.
hardAssert(
!!responseProto.streamToken,
'Got a write handshake response without a stream token'
);
this.lastStreamToken = responseProto.streamToken;

// The first response is always the handshake response
hardAssert(
!responseProto.writeResults || responseProto.writeResults.length === 0,
'Got mutation results for handshake'
);
return this.listener!.onHandshakeComplete();
}

protected onNext(responseProto: ProtoWriteResponse): Promise<void> {
// Always capture the last stream token.
hardAssert(
!!responseProto.streamToken,
'Got a write response without a stream token'
);
this.lastStreamToken = responseProto.streamToken;

if (!this.handshakeComplete_) {
// The first response is always the handshake response
hardAssert(
!responseProto.writeResults || responseProto.writeResults.length === 0,
'Got mutation results for handshake'
);
this.handshakeComplete_ = true;
return this.listener!.onHandshakeComplete();
} else {
// A successful first write response means the stream is healthy,
// Note, that we could consider a successful handshake healthy, however,
// the write itself might be causing an error we want to back off from.
this.backoff.reset();
// A successful first write response means the stream is healthy,
// Note, that we could consider a successful handshake healthy, however,
// the write itself might be causing an error we want to back off from.
this.backoff.reset();

const results = fromWriteResults(
responseProto.writeResults,
responseProto.commitTime
);
const commitVersion = fromVersion(responseProto.commitTime!);
return this.listener!.onMutationResult(commitVersion, results);
}
const results = fromWriteResults(
responseProto.writeResults,
responseProto.commitTime
);
const commitVersion = fromVersion(responseProto.commitTime!);
return this.listener!.onMutationResult(commitVersion, results);
}

/**
Expand All @@ -825,7 +849,7 @@ export class PersistentWriteStream extends PersistentStream<
*/
writeHandshake(): void {
debugAssert(this.isOpen(), 'Writing handshake requires an opened stream');
debugAssert(!this.handshakeComplete_, 'Handshake already completed');
debugAssert(!this.handshakeComplete, 'Handshake already completed');
debugAssert(
!this.lastStreamToken,
'Stream token should be empty during handshake'
Expand All @@ -841,7 +865,7 @@ export class PersistentWriteStream extends PersistentStream<
writeMutations(mutations: Mutation[]): void {
debugAssert(this.isOpen(), 'Writing mutations requires an opened stream');
debugAssert(
this.handshakeComplete_,
this.handshakeComplete,
'Handshake must be complete before writing mutations'
);
debugAssert(
Expand Down
Loading