-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Closing the write and watch stream after 60s of idleness #388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
eef7419
to
7280673
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a pretty big change and there are a few parts that I'm not a huge fan of.
The first problem is that it conflates the notion of the stream delegate (callbacks for events emitted from the stream) with the GRXWriteable
we pass to gRPC. The stream delegate is a part of the API of stream while the writeable is an implementation detail. Let's split these concepts.
I'm also not super wild about bleeding the stream adapting classes into the FSTRemoteStore. This is a wart in the Android implementation that I was hoping to get rid of there. Porting that to iOS seems like a reversion, especially because these classes cannot be anonymous here as they are on Android. We really don't need two layers of adapters between the stream and the remote store: We could define these as abstract dispatchStreamOpen methods on FSTStream and have the FSTWatchStream adapt as required.
If you're up for it you've mixed some non-controversial/structural changes in with the functional one that could be worth splitting into a separate PR (or PRs). For example, consolidating the FSTBetaStreams and rewriting the stream test (before adding additional tests for idleness) could be independent of this. I also understand if you want to press on.
I've split up this change into the test rewrite (#391) and this, which is now set to merge against #391 branch (for easier review). I also split up the delegate handling for GRPC callbacks and Remote Store callbacks and reverted the porting the Stream callback structure from Android. By renaming the handleStreamOpen/handleStreamClose to notify*, I think was able to overcome the part of the design that I did not really fancy. FYI: I still kept the generic type on FSTStream, since I need to pass this generic delegate type to start:. It doesn't seem like is possible to specify the generic type in the subclasses like it is in Java, so I think as is I don't have any type safety (which might be an argument in favor of just accepting 'id' in start:, but moving the delegate instances back to FSTWatchStream and FSTWriteStream). |
We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for the commit author(s). If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google. |
f70f00d
to
132e636
Compare
CLAs look good, thanks! |
@wilhuff I merged in the recent changes and updated this PR. |
@@ -76,10 +72,15 @@ - (instancetype)initWithDatabase:(FSTDatabaseInfo *)database | |||
|
|||
#pragma mark - Overridden FSTWatchStream methods. | |||
|
|||
- (void)start { | |||
- (void)start:(id<FSTWatchStreamDelegate>)delegate { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Labels should always reflect the arguments passed, so this should be startWithDelegate:
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
[self.delegate watchStreamDidOpen]; | ||
} | ||
|
||
- (void)notifyStreamInterrupted:(NSError *_Nullable)error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming: notifyStreamInterruptedWithError:
Also prefer the nullable
attribute to the _Nullable
qualifier.
This should be
- (void)notifyStreamInterruptedWithError:(nullable NSError *)error
@interface FSTStream : NSObject | ||
@interface FSTStream <__covariant FSTStreamDelegate> : NSObject | ||
|
||
@property (nonatomic, weak, readwrite, nullable) FSTStreamDelegate delegate; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the delegate
need to be publicly declared? Should anyone outside the class access it? If not, consider just moving this into the implementation.
If someone needs to read it you can declare it readonly
here and redeclare it internally as readwrite
. Note that conventionally we haven't been marking properties readwrite
since that's the default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't need to be but it means that we have to redeclare it in the Mock implementations (unless we expose an test only interface)
@@ -175,7 +177,7 @@ NS_ASSUME_NONNULL_BEGIN | |||
* | |||
* When start returns, -isStarted will return YES. | |||
*/ | |||
- (void)start; | |||
- (void)start:(FSTStreamDelegate)delegate; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming: startWithDelegate:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* | ||
* NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" | ||
* on FSTStream for details. | ||
*/ | ||
- (void)watchStreamDidClose:(NSError *_Nullable)error; | ||
- (void)watchStreamWasInterrupted:(NSError *_Nullable)error; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming: watchStreamWasInterruptedWithError:
(the old name was wrong too)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -522,14 +572,16 @@ - (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error { | |||
[FSTDatastore prepareHeadersForRPC:_rpc | |||
databaseID:self.databaseInfo.databaseID | |||
token:token.token]; | |||
[_rpc startWithWriteable:self]; | |||
FSTAssert(_grxFilter == nil, @"GRX Filter must be nil"); | |||
_grxFilter = [[GRXFilter alloc] initWithStream:self]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, the "grx" prefix is something we should avoid. Maybe _callbackFilter
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed to _callbackFilter to match FSTCallbackFilter.
} | ||
|
||
/** Backs off after an error. */ | ||
- (void)performBackoff { | ||
- (void)performBackoff:(id)delegate { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming: performBackoffWithDelegate:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, done.
}]; | ||
} | ||
|
||
/** Resumes stream start after backing off. */ | ||
- (void)resumeStartFromBackoff { | ||
- (void)resumeStartFromBackoff:(id)delegate { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming: resumeStartFromBackoffWithDelegate:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
// If the caller explicitly requested a stream stop, don't notify them of a closing stream (it | ||
// could trigger undesirable recovery logic, etc.). | ||
if (finalState != FSTStreamStateStopped) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should avoid sending messages to delegates in the middle of operations like these because they may attempt to perform operations on the stream as a result of the callback. If that happened here the stream would be in an inconsistent state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved some of the clean up logic above. I am not able to move the clearing of the delegate out, since it needs to be available in notifyStreamInterruptedWithError.
[self.workerDispatchQueue dispatchAsyncAllowingSameQueue:^() { | ||
[self handleIdleCloseTimer]; | ||
} | ||
after:kIdleTimeout]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not wild about this API. It's conflating two separate concerns: which queue to dispatch on and when.
This should be more like the backoff implementation: the dispatch queue should be wrapped by the idle timer, and we can supply an alternate implementation of the idle timer for tests. The idle timer could take the dispatch queue and idle timeout as initializer parameters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, we discussed offline, and this does make sense because we're modeling dispatch_after
on iOS and a ScheduledExecutorService
on Android.
I'm still not crazy about how this API ends up reading (especially because of the way clang format munges it). Maybe if we reversed the argument order it wouldn't look so nuts.
How about dispatchAfter:block:
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed to dispatchAfterDelay:block:
and got rid of "allowsamequeue" as well (it's the default).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/** | ||
* Closes the stream and cleans up as necessary: | ||
* | ||
* <ul> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this isn't javadoc, so probably make this a markdown-list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
* @param error the NSError the connection was closed with. | ||
*/ | ||
- (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *)error { | ||
FSTAssert(finalState == FSTStreamStateError || error == nil, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This allows the error to be nil if the finalState is FSTStreamStateError. Is that desirable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is directly called from GRPC and we don't want to crash the client here if the server cleanly closes our connection for some unknown reason.
This is port of the Android CLs cr/167321880 and cr/168294709.
This change is unfortunately pretty large: