@@ -1142,6 +1142,45 @@ public void testWorkaround858() {
1142
1142
rule .assertHasNoLeaks ();
1143
1143
}
1144
1144
1145
+ @ Test
1146
+ // see https://github.com/rsocket/rsocket-java/issues/959
1147
+ public void testWorkaround959 () {
1148
+ for (int i = 1 ; i < 100000 ; i += 2 ) {
1149
+ ByteBuf buffer = rule .alloc ().buffer ();
1150
+ buffer .writeCharSequence ("test" , CharsetUtil .UTF_8 );
1151
+
1152
+ final AssertSubscriber <Payload > assertSubscriber = new AssertSubscriber <>(3 );
1153
+ rule .socket .requestStream (ByteBufPayload .create (buffer )).subscribe (assertSubscriber );
1154
+
1155
+ final ByteBuf payloadFrame =
1156
+ PayloadFrameCodec .encode (
1157
+ rule .alloc (), i , false , false , true , Unpooled .EMPTY_BUFFER , Unpooled .EMPTY_BUFFER );
1158
+
1159
+ RaceTestUtils .race (
1160
+ () -> {
1161
+ rule .connection .addToReceivedBuffer (payloadFrame .copy ());
1162
+ rule .connection .addToReceivedBuffer (payloadFrame .copy ());
1163
+ rule .connection .addToReceivedBuffer (payloadFrame );
1164
+ },
1165
+ () -> {
1166
+ assertSubscriber .request (1 );
1167
+ assertSubscriber .request (1 );
1168
+ assertSubscriber .request (1 );
1169
+ });
1170
+
1171
+ Assertions .assertThat (rule .connection .getSent ())
1172
+ .allMatch (ByteBuf ::release );
1173
+
1174
+ Assertions .assertThat (rule .socket .isDisposed ()).isFalse ();
1175
+
1176
+ assertSubscriber .values ().forEach (ReferenceCountUtil ::safeRelease );
1177
+ assertSubscriber .assertNoError ();
1178
+
1179
+ rule .connection .clearSendReceiveBuffers ();
1180
+ rule .assertHasNoLeaks ();
1181
+ }
1182
+ }
1183
+
1145
1184
public static class ClientSocketRule extends AbstractSocketRule <RSocketRequester > {
1146
1185
@ Override
1147
1186
protected RSocketRequester newRSocket () {
0 commit comments