Skip to content

feat: resource update server notify and client handler #264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ void testNotifyResourcesListChanged() {
assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
}

@Test
void testNotifyResourcesUpdated() {
var mcpAsyncServer = McpServer.async(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

StepVerifier
.create(mcpAsyncServer
.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI)))
.verifyComplete();

assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
}

@Test
void testAddResource() {
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ void testNotifyResourcesListChanged() {
assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
}

@Test
void testNotifyResourcesUpdated() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

assertThatCode(() -> mcpSyncServer
.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI)))
.doesNotThrowAnyException();

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
}

@Test
void testAddResource() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,18 @@ public class McpAsyncClient {
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED,
asyncResourcesChangeNotificationHandler(resourcesChangeConsumersFinal));

// Resources Update Notification
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumersFinal = new ArrayList<>();
resourcesUpdateConsumersFinal
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Resources updated: {}", notification)));

if (!Utils.isEmpty(features.resourcesUpdateConsumers())) {
resourcesUpdateConsumersFinal.addAll(features.resourcesUpdateConsumers());
}

notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
asyncResourcesUpdatedNotificationHandler(resourcesUpdateConsumersFinal));

// Prompts Change Notification
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumersFinal = new ArrayList<>();
promptsChangeConsumersFinal
Expand Down Expand Up @@ -708,6 +720,24 @@ private NotificationHandler asyncResourcesChangeNotificationHandler(
.then());
}

private NotificationHandler asyncResourcesUpdatedNotificationHandler(
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers) {
return params -> {
McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification = transport.unmarshalFrom(params,
new TypeReference<>() {
});

return readResource(new McpSchema.ReadResourceRequest(resourcesUpdatedNotification.uri()))
.flatMap(readResourceResult -> Flux.fromIterable(resourcesUpdateConsumers)
.flatMap(consumer -> consumer.apply(readResourceResult.contents()))
.onErrorResume(error -> {
logger.error("Error handling resource update notification", error);
return Mono.empty();
})
.then());
};
}

// --------------------------
// Prompts
// --------------------------
Expand Down
29 changes: 25 additions & 4 deletions mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class SyncSpec {

private final List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers = new ArrayList<>();

private final List<Consumer<List<McpSchema.ResourceContents>>> resourcesUpdateConsumers = new ArrayList<>();

private final List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers = new ArrayList<>();

private final List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers = new ArrayList<>();
Expand Down Expand Up @@ -363,8 +365,8 @@ public SyncSpec loggingConsumers(List<Consumer<McpSchema.LoggingMessageNotificat
*/
public McpSyncClient build() {
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
this.loggingConsumers, this.samplingHandler);
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler);

McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);

Expand Down Expand Up @@ -408,6 +410,8 @@ class AsyncSpec {

private final List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers = new ArrayList<>();

private final List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers = new ArrayList<>();

private final List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers = new ArrayList<>();

private final List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers = new ArrayList<>();
Expand Down Expand Up @@ -553,6 +557,23 @@ public AsyncSpec resourcesChangeConsumer(
return this;
}

/**
* Adds a consumer to be notified when a specific resource is updated. This allows
* the client to react to changes in individual resources, such as updates to
* their content or metadata.
* @param resourcesUpdateConsumer A consumer function that processes the updated
* resource and returns a Mono indicating the completion of the processing. Must
* not be null.
* @return This builder instance for method chaining.
* @throws IllegalArgumentException If the resourcesUpdateConsumer is null.
*/
public AsyncSpec resourcesUpdateConsumer(
Function<List<McpSchema.ResourceContents>, Mono<Void>> resourcesUpdateConsumer) {
Assert.notNull(resourcesUpdateConsumer, "Resources update consumer must not be null");
this.resourcesUpdateConsumers.add(resourcesUpdateConsumer);
return this;
}

/**
* Adds a consumer to be notified when the available prompts change. This allows
* the client to react to changes in the server's prompt templates, such as new
Expand Down Expand Up @@ -605,8 +626,8 @@ public AsyncSpec loggingConsumers(
public McpAsyncClient build() {
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
this.loggingConsumers, this.samplingHandler));
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class McpClientFeatures {
record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map<String, McpSchema.Root> roots, List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers,
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers,
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers,
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers,
List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers,
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler) {
Expand All @@ -82,6 +83,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
Map<String, McpSchema.Root> roots,
List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers,
List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers,
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers,
List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers,
List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers,
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler) {
Expand All @@ -96,6 +98,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c

this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of();
this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
this.samplingHandler = samplingHandler;
Expand All @@ -122,8 +125,13 @@ public static Async fromSync(Sync syncSpec) {
.subscribeOn(Schedulers.boundedElastic()));
}

List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers = new ArrayList<>();
List<Function<List<McpSchema.ResourceContents>, Mono<Void>>> resourcesUpdateConsumers = new ArrayList<>();
for (Consumer<List<McpSchema.ResourceContents>> consumer : syncSpec.resourcesUpdateConsumers()) {
resourcesUpdateConsumers.add(r -> Mono.<Void>fromRunnable(() -> consumer.accept(r))
.subscribeOn(Schedulers.boundedElastic()));
}

List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers = new ArrayList<>();
for (Consumer<List<McpSchema.Prompt>> consumer : syncSpec.promptsChangeConsumers()) {
promptsChangeConsumers.add(p -> Mono.<Void>fromRunnable(() -> consumer.accept(p))
.subscribeOn(Schedulers.boundedElastic()));
Expand All @@ -139,8 +147,8 @@ public static Async fromSync(Sync syncSpec) {
.fromCallable(() -> syncSpec.samplingHandler().apply(r))
.subscribeOn(Schedulers.boundedElastic());
return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(),
toolsChangeConsumers, resourcesChangeConsumers, promptsChangeConsumers, loggingConsumers,
samplingHandler);
toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers,
loggingConsumers, samplingHandler);
}
}

Expand All @@ -160,6 +168,7 @@ public static Async fromSync(Sync syncSpec) {
public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map<String, McpSchema.Root> roots, List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers,
List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers,
List<Consumer<List<McpSchema.ResourceContents>>> resourcesUpdateConsumers,
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers,
List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers,
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler) {
Expand All @@ -171,13 +180,15 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
* @param roots the roots.
* @param toolsChangeConsumers the tools change consumers.
* @param resourcesChangeConsumers the resources change consumers.
* @param resourcesUpdateConsumers the resource update consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
* @param samplingHandler the sampling handler.
*/
public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map<String, McpSchema.Root> roots, List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers,
List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers,
List<Consumer<List<McpSchema.ResourceContents>>> resourcesUpdateConsumers,
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers,
List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers,
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler) {
Expand All @@ -192,6 +203,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl

this.toolsChangeConsumers = toolsChangeConsumers != null ? toolsChangeConsumers : List.of();
this.resourcesChangeConsumers = resourcesChangeConsumers != null ? resourcesChangeConsumers : List.of();
this.resourcesUpdateConsumers = resourcesUpdateConsumers != null ? resourcesUpdateConsumers : List.of();
this.promptsChangeConsumers = promptsChangeConsumers != null ? promptsChangeConsumers : List.of();
this.loggingConsumers = loggingConsumers != null ? loggingConsumers : List.of();
this.samplingHandler = samplingHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,15 @@ public Mono<Void> notifyResourcesListChanged() {
return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED, null);
}

/**
* Notifies clients that the resources have updated.
* @return A Mono that completes when all clients have been notified
*/
public Mono<Void> notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) {
return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED,
resourcesUpdatedNotification);
}

private McpServerSession.RequestHandler<McpSchema.ListResourcesResult> resourcesListRequestHandler() {
return (exchange, params) -> {
var resourceList = this.resources.values()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ public void notifyResourcesListChanged() {
this.asyncServer.notifyResourcesListChanged().block();
}

/**
* Notify clients that the resources have updated.
*/
public void notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) {
this.asyncServer.notifyResourcesUpdated(resourcesUpdatedNotification).block();
}

/**
* Notify clients that the list of available prompts has changed.
*/
Expand Down
13 changes: 13 additions & 0 deletions mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ private McpSchema() {

public static final String METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED = "notifications/resources/list_changed";

public static final String METHOD_NOTIFICATION_RESOURCES_UPDATED = "notifications/resources/updated";

public static final String METHOD_RESOURCES_TEMPLATES_LIST = "resources/templates/list";

public static final String METHOD_RESOURCES_SUBSCRIBE = "resources/subscribe";
Expand Down Expand Up @@ -1111,6 +1113,17 @@ public record ProgressNotification(// @formatter:off
@JsonProperty("total") Double total) {
}// @formatter:on

/**
* The Model Context Protocol (MCP) provides a standardized way for servers to send
* resources update message to clients.
*
* @param uri The updated resource uri.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public record ResourcesUpdatedNotification(// @formatter:off
@JsonProperty("uri") String uri) {
}// @formatter:on

/**
* The Model Context Protocol (MCP) provides a standardized way for servers to send
* structured log messages to clients. Clients can control logging verbosity by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,18 @@ void testNotifyResourcesListChanged() {
assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
}

@Test
void testNotifyResourcesUpdated() {
var mcpAsyncServer = McpServer.async(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

StepVerifier
.create(mcpAsyncServer
.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI)))
.verifyComplete();

assertThatCode(() -> mcpAsyncServer.closeGracefully().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
}

@Test
void testAddResource() {
var mcpAsyncServer = McpServer.async(createMcpTransportProvider())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,17 @@ void testNotifyResourcesListChanged() {
assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
}

@Test
void testNotifyResourcesUpdated() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

assertThatCode(() -> mcpSyncServer
.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(TEST_RESOURCE_URI)))
.doesNotThrowAnyException();

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
}

@Test
void testAddResource() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider())
Expand Down