Skip to content

Commit 2e7dd08

Browse files
author
Tomas Drabek
committed
DefaultSubscriptionRegistry: Reduced thread contention
* DestinationCache is now synchronized on multiple 'destination' locks (previously a single shared lock) * DestinationCache keeps destinations without any subscriptions (previously such destinations were recomputed over and over) * SessionSubscriptionRegistry is now a 'sessionId -> subscriptionId -> (destination,selector)' map for faster lookups (previously 'sessionId -> destination -> set of (subscriptionId,selector)') closes gh-24395
1 parent ed648b7 commit 2e7dd08

File tree

4 files changed

+402
-227
lines changed

4 files changed

+402
-227
lines changed

spring-messaging/spring-messaging.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,5 @@ dependencies {
3434
testRuntime("com.sun.xml.bind:jaxb-core")
3535
testRuntime("com.sun.xml.bind:jaxb-impl")
3636
testRuntime("com.sun.activation:javax.activation")
37+
testRuntime(project(":spring-context"))
3738
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright 2002-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp.broker;
18+
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.stream.IntStream;
21+
22+
import org.openjdk.jmh.annotations.Benchmark;
23+
import org.openjdk.jmh.annotations.BenchmarkMode;
24+
import org.openjdk.jmh.annotations.Level;
25+
import org.openjdk.jmh.annotations.Mode;
26+
import org.openjdk.jmh.annotations.Param;
27+
import org.openjdk.jmh.annotations.Scope;
28+
import org.openjdk.jmh.annotations.Setup;
29+
import org.openjdk.jmh.annotations.State;
30+
import org.openjdk.jmh.infra.Blackhole;
31+
32+
import org.springframework.messaging.Message;
33+
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
34+
import org.springframework.messaging.simp.SimpMessageType;
35+
import org.springframework.messaging.support.MessageBuilder;
36+
import org.springframework.util.MultiValueMap;
37+
38+
@BenchmarkMode(Mode.Throughput)
39+
public class DefaultSubscriptionRegistryBenchmark {
40+
41+
@State(Scope.Benchmark)
42+
public static class ServerState {
43+
@Param("1000")
44+
public int sessions;
45+
46+
@Param("10")
47+
public int destinations;
48+
49+
@Param({"0", "1024"})
50+
int cacheSizeLimit;
51+
52+
@Param({"none", "patternSubscriptions", "selectorHeaders"})
53+
String specialization;
54+
55+
public DefaultSubscriptionRegistry registry;
56+
57+
public String[] destinationIds;
58+
59+
public String[] sessionIds;
60+
61+
public AtomicInteger uniqueIdGenerator;
62+
63+
public Message<?> findMessage;
64+
65+
@Setup(Level.Trial)
66+
public void doSetup() {
67+
this.findMessage = MessageBuilder.createMessage("", SimpMessageHeaderAccessor.create().getMessageHeaders());
68+
this.uniqueIdGenerator = new AtomicInteger();
69+
70+
this.registry = new DefaultSubscriptionRegistry();
71+
this.registry.setCacheLimit(this.cacheSizeLimit);
72+
this.registry.setSelectorHeaderName("selectorHeaders".equals(this.specialization) ? "someSelector" : null);
73+
74+
this.destinationIds = IntStream.range(0, this.destinations)
75+
.mapToObj(i -> "/some/destination/" + i)
76+
.toArray(String[]::new);
77+
78+
this.sessionIds = IntStream.range(0, this.sessions)
79+
.mapToObj(i -> "sessionId_" + i)
80+
.toArray(String[]::new);
81+
82+
for (String sessionId : this.sessionIds) {
83+
for (String destinationId : this.destinationIds) {
84+
registerSubscriptions(sessionId, destinationId);
85+
}
86+
}
87+
}
88+
89+
public void registerSubscriptions(String sessionId, String destination) {
90+
if ("patternSubscriptions".equals(this.specialization)) {
91+
destination = "/**/" + destination;
92+
}
93+
String subscriptionId = "subscription_" + this.uniqueIdGenerator.incrementAndGet();
94+
this.registry.registerSubscription(subscribeMessage(sessionId, subscriptionId, destination));
95+
}
96+
}
97+
98+
@State(Scope.Thread)
99+
public static class Requests {
100+
@Param({"none", "sameDestination", "sameSession"})
101+
String contention;
102+
103+
public String session;
104+
105+
public Message<?> subscribe;
106+
107+
public String findDestination;
108+
109+
public Message<?> unsubscribe;
110+
111+
@Setup(Level.Trial)
112+
public void doSetup(ServerState serverState) {
113+
int uniqueNumber = serverState.uniqueIdGenerator.incrementAndGet();
114+
115+
if ("sameDestination".equals(this.contention)) {
116+
this.findDestination = serverState.destinationIds[0];
117+
}
118+
else {
119+
this.findDestination = serverState.destinationIds[uniqueNumber % serverState.destinationIds.length];
120+
}
121+
122+
if ("sameSession".equals(this.contention)) {
123+
this.session = serverState.sessionIds[0];
124+
}
125+
else {
126+
this.session = serverState.sessionIds[uniqueNumber % serverState.sessionIds.length];
127+
}
128+
129+
String subscription = String.valueOf(uniqueNumber);
130+
String subscribeDestination = "patternSubscriptions".equals(serverState.specialization) ?
131+
"/**/" + this.findDestination : this.findDestination;
132+
this.subscribe = subscribeMessage(this.session, subscription, subscribeDestination);
133+
134+
this.unsubscribe = unsubscribeMessage(this.session, subscription);
135+
}
136+
}
137+
138+
@State(Scope.Thread)
139+
public static class FindRequest {
140+
@Param({"none", "noSubscribers", "sameDestination"})
141+
String contention;
142+
143+
public String destination;
144+
145+
@Setup(Level.Trial)
146+
public void doSetup(ServerState serverState) {
147+
switch (this.contention) {
148+
case "noSubscribers":
149+
this.destination = "someDestination_withNoSubscribers_" + serverState.uniqueIdGenerator.incrementAndGet();
150+
break;
151+
case "sameDestination":
152+
this.destination = serverState.destinationIds[0];
153+
break;
154+
case "none":
155+
int uniqueNumber = serverState.uniqueIdGenerator.getAndIncrement();
156+
this.destination = serverState.destinationIds[uniqueNumber % serverState.destinationIds.length];
157+
break;
158+
default:
159+
throw new IllegalStateException();
160+
}
161+
}
162+
}
163+
164+
@Benchmark
165+
public void registerUnregister(ServerState serverState, Requests request, Blackhole blackhole) {
166+
serverState.registry.registerSubscription(request.subscribe);
167+
blackhole.consume(serverState.registry.findSubscriptionsInternal(request.findDestination, serverState.findMessage));
168+
serverState.registry.unregisterSubscription(request.unsubscribe);
169+
blackhole.consume(serverState.registry.findSubscriptionsInternal(request.findDestination, serverState.findMessage));
170+
}
171+
172+
@Benchmark
173+
public MultiValueMap<String, String> find(ServerState serverState, FindRequest request) {
174+
return serverState.registry.findSubscriptionsInternal(request.destination, serverState.findMessage);
175+
}
176+
177+
public static Message<?> subscribeMessage(String sessionId, String subscriptionId, String dest) {
178+
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.SUBSCRIBE);
179+
accessor.setSessionId(sessionId);
180+
accessor.setSubscriptionId(subscriptionId);
181+
accessor.setDestination(dest);
182+
accessor.setNativeHeader("someSelector", "true");
183+
return MessageBuilder.createMessage("", accessor.getMessageHeaders());
184+
}
185+
186+
public static Message<?> unsubscribeMessage(String sessionId, String subscriptionId) {
187+
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.UNSUBSCRIBE);
188+
accessor.setSessionId(sessionId);
189+
accessor.setSubscriptionId(subscriptionId);
190+
return MessageBuilder.createMessage("", accessor.getMessageHeaders());
191+
}
192+
}

0 commit comments

Comments
 (0)