Skip to content

Add support for receiving and dispatching NotificationResponse messages #60

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from Dec 12, 2019
Merged

Add support for receiving and dispatching NotificationResponse messages #60

merged 12 commits into from Dec 12, 2019

Conversation

akirchhoff-modular
Copy link

This is an initial stab at trying to fix #28. PostgresNIO already has sufficient support for sending LISTEN and NOTIFY commands, through e.g. simpleQuery, but lacks support for handling the NotificationResponse messages that result when a notification comes through. This PR adds support for handling NotificationResponse messages by adding an additional channel handler before PostgresRequestHandler, that filters out NotificationResponse messages (since PostgresRequests are usually ill-prepared to handle them) and diverts them to handlers registered by channel in a PostgresNotificationHandlerMap that can be accessed on PostgresConnection. I'm not tied to the current API exposed here; I could imagine a couple different alternatives, but this seemed like the simplest thing that could possibly work and looks reasonably ergonomic to use.

@tanner0101 tanner0101 added the enhancement New feature or request label Dec 4, 2019
Copy link
Member

@tanner0101 tanner0101 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could achieve this using PostgresRequestHandler without changing the pipeline at all. The key would be to not use simpleQuery since that is designed for a request / response pattern.

Instead, we could have separate PostgresConnection.listen / notify methods.

conn.listen(channel: "foo") { message, context in
    print(message) // the notify message
    print(context) // context for the current listen
    context.stop() // context can be used to stop listening
}
conn.notify(channel: "foo", message: "bar")

These could be handled by a custom PostgresRequest conformer.

Re: failing CI, see #60 for fix. You may need to merge this into your fork.

@akirchhoff-modular
Copy link
Author

That does look like a nicer API. If it turns into a PostgresRequest though, wouldn't that mean you couldn't run other commands while a listen is active? I understand that some other protocols (namely Redis's) have that restriction, but Postgres doesn't. I can imagine use cases where you'd want to fetch data from the database in response to a notification, for example, which sounds like it would be difficult to do if implemented this way if I'm understanding correctly. (You could use another connection but that seems like kind of a heavy-weight solution.)

@tanner0101
Copy link
Member

Hmm... that's a good point. Maybe LISTEN / NOTIFY could bypass PostgresRequest entirely then. Each LISTEN / conn.listen(...) could append a channel handler to the pipeline listening for notification responses on that channel. When the listen is cancelled, the handler is removed. As long as those handlers are before the PostgresRequestHandler and consume notification responses then we wouldn't need to touch anything there.

@tanner0101
Copy link
Member

@akirchhoff-modular
Copy link
Author

Thanks for the feedback. I've rebased onto the fixed master branch for the CI fixes from #62, switched to a listen-with-closure interface for registering notification response handlers, and switched to adding channel handlers dynamically. In order to handle multiple notification handlers for a given channel, a handler does not consume the notification response, so the request handler was nevertheless modified to ignore notification responses. LISTEN and UNLISTEN queries are not generated currently (still leaving that to the application) as it's not clear to me that generating SQL (incl. escaping the channel names as LISTEN and UNLISTEN would require) is in scope for this project.

Copy link
Member

@tanner0101 tanner0101 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that generating SQL seems out of scope for this project, that's a good point. Though if this method isn't actually going to generate the LISTEN query maybe something like conn.addListener would be a better name.

Besides that, I just have some small nits and ideas for simplification. Overall, really nice work on this, thank you!

let listenContext = PostgresListenContext()
let channelHandler = PostgresNotificationHandler(channel: channel, notificationHandler: notificationHandler, listenContext: listenContext)
let pipeline = self.channel.pipeline
_ = pipeline.addHandler(channelHandler, name: nil, position: .before(requestHandler))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since PostgresRequestHandler is ignoring .notificationResponse now, I think you should be able to simply append the PostgresNotificationHandler. That would make thing simpler not needing to pass around requestHandler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Please use self. whenever you access a property (pretend you're in an escaping closure all the time).

typealias InboundOut = PostgresMessage

let channel: String
let notificationHandler: (PostgresMessage.NotificationResponse, PostgresListenContext) -> Void
Copy link
Member

@tanner0101 tanner0101 Dec 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIO methods tend to pass context first. It might be nice to follow that pattern here.

notificationHandler(notification, listenContext)
}
} catch let error {
errorCaught(context: context, error: error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errorCaught will put the error on the pipeline and eventually close the connection. I'm not sure that's what we want here. It might be better to just log the error to the connection's logger.

@@ -0,0 +1,56 @@
import NIO

public final class PostgresListenContext {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything public should have docblocks.

@akirchhoff-modular
Copy link
Author

Thanks for the additional feedback. I believe I've addressed all your comments. Please take another look.

Copy link
Member

@tanner0101 tanner0101 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great, thank you!

@tanner0101 tanner0101 merged commit 43e3a77 into vapor:master Dec 12, 2019
@duncangroenewald
Copy link

Are there any code examples of using this ?

@akirchhoff-modular
Copy link
Author

Are there any code examples of using this ?

It has been a very long time since I have touched this code, but the added tests do show how this can be used. These tests remain present in the current version of the code:

func testNotificationsEmptyPayload() {
var conn: PostgresConnection?
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow( try conn?.close().wait() ) }
let receivedNotifications = ManagedAtomic<Int>(0)
conn?.addListener(channel: "example") { context, notification in
receivedNotifications.wrappingIncrement(ordering: .relaxed)
XCTAssertEqual(notification.channel, "example")
XCTAssertEqual(notification.payload, "")
}
XCTAssertNoThrow(_ = try conn?.simpleQuery("LISTEN example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY example").wait())
// Notifications are asynchronous, so we should run at least one more query to make sure we'll have received the notification response by then
XCTAssertNoThrow(_ = try conn?.simpleQuery("SELECT 1").wait())
XCTAssertEqual(receivedNotifications.load(ordering: .relaxed), 1)
}
func testNotificationsNonEmptyPayload() {
var conn: PostgresConnection?
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow( try conn?.close().wait() ) }
let receivedNotifications = ManagedAtomic<Int>(0)
conn?.addListener(channel: "example") { context, notification in
receivedNotifications.wrappingIncrement(ordering: .relaxed)
XCTAssertEqual(notification.channel, "example")
XCTAssertEqual(notification.payload, "Notification payload example")
}
XCTAssertNoThrow(_ = try conn?.simpleQuery("LISTEN example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY example, 'Notification payload example'").wait())
// Notifications are asynchronous, so we should run at least one more query to make sure we'll have received the notification response by then
XCTAssertNoThrow(_ = try conn?.simpleQuery("SELECT 1").wait())
XCTAssertEqual(receivedNotifications.load(ordering: .relaxed), 1)
}
func testNotificationsRemoveHandlerWithinHandler() {
var conn: PostgresConnection?
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow( try conn?.close().wait() ) }
let receivedNotifications = ManagedAtomic<Int>(0)
conn?.addListener(channel: "example") { context, notification in
receivedNotifications.wrappingIncrement(ordering: .relaxed)
context.stop()
}
XCTAssertNoThrow(_ = try conn?.simpleQuery("LISTEN example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("SELECT 1").wait())
XCTAssertEqual(receivedNotifications.load(ordering: .relaxed), 1)
}
func testNotificationsRemoveHandlerOutsideHandler() {
var conn: PostgresConnection?
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow( try conn?.close().wait() ) }
let receivedNotifications = ManagedAtomic<Int>(0)
let context = conn?.addListener(channel: "example") { context, notification in
receivedNotifications.wrappingIncrement(ordering: .relaxed)
}
XCTAssertNotNil(context)
XCTAssertNoThrow(_ = try conn?.simpleQuery("LISTEN example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("SELECT 1").wait())
context?.stop()
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("SELECT 1").wait())
XCTAssertEqual(receivedNotifications.load(ordering: .relaxed), 1)
}
func testNotificationsMultipleRegisteredHandlers() {
var conn: PostgresConnection?
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow( try conn?.close().wait() ) }
let receivedNotifications1 = ManagedAtomic<Int>(0)
conn?.addListener(channel: "example") { context, notification in
receivedNotifications1.wrappingIncrement(ordering: .relaxed)
}
let receivedNotifications2 = ManagedAtomic<Int>(0)
conn?.addListener(channel: "example") { context, notification in
receivedNotifications2.wrappingIncrement(ordering: .relaxed)
}
XCTAssertNoThrow(_ = try conn?.simpleQuery("LISTEN example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("SELECT 1").wait())
XCTAssertEqual(receivedNotifications1.load(ordering: .relaxed), 1)
XCTAssertEqual(receivedNotifications2.load(ordering: .relaxed), 1)
}
func testNotificationsMultipleRegisteredHandlersRemoval() throws {
var conn: PostgresConnection?
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow( try conn?.close().wait() ) }
let receivedNotifications1 = ManagedAtomic<Int>(0)
XCTAssertNotNil(conn?.addListener(channel: "example") { context, notification in
receivedNotifications1.wrappingIncrement(ordering: .relaxed)
context.stop()
})
let receivedNotifications2 = ManagedAtomic<Int>(0)
XCTAssertNotNil(conn?.addListener(channel: "example") { context, notification in
receivedNotifications2.wrappingIncrement(ordering: .relaxed)
})
XCTAssertNoThrow(_ = try conn?.simpleQuery("LISTEN example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY example").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("SELECT 1").wait())
XCTAssertEqual(receivedNotifications1.load(ordering: .relaxed), 1)
XCTAssertEqual(receivedNotifications2.load(ordering: .relaxed), 2)
}
func testNotificationHandlerFiltersOnChannel() {
var conn: PostgresConnection?
XCTAssertNoThrow(conn = try PostgresConnection.test(on: eventLoop).wait())
defer { XCTAssertNoThrow( try conn?.close().wait() ) }
XCTAssertNotNil(conn?.addListener(channel: "desired") { context, notification in
XCTFail("Received notification on channel that handler was not registered for")
})
XCTAssertNoThrow(_ = try conn?.simpleQuery("LISTEN undesired").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("NOTIFY undesired").wait())
XCTAssertNoThrow(_ = try conn?.simpleQuery("SELECT 1").wait())
}

Hopefully this helps. I am not likely to be of much further use due to the length of time it has been since I worked on this, and I no longer work on anything that uses this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

LISTEN / NOTIFY support
3 participants