7
7
import io .rsocket .RSocket ;
8
8
import io .rsocket .frame .decoder .PayloadDecoder ;
9
9
import io .rsocket .plugins .RequestInterceptor ;
10
+ import java .util .Objects ;
10
11
import java .util .function .Function ;
11
12
import reactor .util .annotation .Nullable ;
12
13
@@ -118,9 +119,14 @@ public int addAndGetNextStreamId(FrameHandler frameHandler) {
118
119
}
119
120
120
121
public synchronized boolean add (int streamId , FrameHandler frameHandler ) {
121
- final FrameHandler previousHandler = this .activeStreams .putIfAbsent (streamId , frameHandler );
122
-
123
- return previousHandler == null ;
122
+ final IntObjectMap <FrameHandler > activeStreams = this .activeStreams ;
123
+ // copy of Map.putIfAbsent(key, value) without `streamId` boxing
124
+ final FrameHandler previousHandler = activeStreams .get (streamId );
125
+ if (previousHandler == null ) {
126
+ activeStreams .put (streamId , frameHandler );
127
+ return true ;
128
+ }
129
+ return false ;
124
130
}
125
131
126
132
/**
@@ -143,6 +149,13 @@ public synchronized FrameHandler get(int streamId) {
143
149
* instance equals to the passed one
144
150
*/
145
151
public synchronized boolean remove (int streamId , FrameHandler frameHandler ) {
146
- return this .activeStreams .remove (streamId , frameHandler );
152
+ final IntObjectMap <FrameHandler > activeStreams = this .activeStreams ;
153
+ // copy of Map.remove(key, value) without `streamId` boxing
154
+ final FrameHandler curValue = activeStreams .get (streamId );
155
+ if (!Objects .equals (curValue , frameHandler )) {
156
+ return false ;
157
+ }
158
+ activeStreams .remove (streamId );
159
+ return true ;
147
160
}
148
161
}
0 commit comments