|
26 | 26 | import org.junit.AfterClass;
|
27 | 27 | import org.junit.BeforeClass;
|
28 | 28 | import org.junit.Test;
|
29 |
| -import org.junit.runner.RunWith; |
30 | 29 | import org.mockito.InOrder;
|
31 | 30 | import org.mockito.Mockito;
|
32 |
| -import org.mockito.runners.MockitoJUnitRunner; |
33 | 31 |
|
34 | 32 | import java.util.Collections;
|
| 33 | +import java.util.concurrent.CompletableFuture; |
35 | 34 |
|
36 | 35 | import static org.assertj.core.api.Assertions.assertThat;
|
37 | 36 |
|
38 |
| - |
39 | 37 | /**
|
40 | 38 | * Tests for {@link Http2MultiplexedChannelPool}.
|
41 | 39 | */
|
42 |
| -@RunWith(MockitoJUnitRunner.class) |
43 | 40 | public class Http2MultiplexedChannelPoolTest {
|
44 | 41 | private static EventLoopGroup loopGroup;
|
45 | 42 |
|
@@ -93,4 +90,45 @@ public void acquireAfterCloseFails() throws InterruptedException {
|
93 | 90 |
|
94 | 91 | assertThat(h2Pool.acquire().await().isSuccess()).isFalse();
|
95 | 92 | }
|
| 93 | + |
| 94 | + @Test(timeout = 5_000) |
| 95 | + public void interruptDuringClosePreservesFlag() throws InterruptedException { |
| 96 | + SocketChannel channel = new NioSocketChannel(); |
| 97 | + try { |
| 98 | + loopGroup.register(channel).awaitUninterruptibly(); |
| 99 | + Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next()); |
| 100 | + channelPromise.setSuccess(channel); |
| 101 | + |
| 102 | + ChannelPool connectionPool = Mockito.mock(ChannelPool.class); |
| 103 | + Promise<Void> releasePromise = Mockito.spy(new DefaultPromise<>(loopGroup.next())); |
| 104 | + |
| 105 | + Mockito.when(connectionPool.release(Mockito.eq(channel))).thenReturn(releasePromise); |
| 106 | + |
| 107 | + MultiplexedChannelRecord record = new MultiplexedChannelRecord(channelPromise, |
| 108 | + channel, |
| 109 | + 8, |
| 110 | + (ch, rec) -> { |
| 111 | + }); |
| 112 | + Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), 2, Collections.singletonList(record)); |
| 113 | + |
| 114 | + CompletableFuture<Boolean> interrupteFlagPreserved = new CompletableFuture<>(); |
| 115 | + |
| 116 | + Thread t = new Thread(() -> { |
| 117 | + try { |
| 118 | + h2Pool.close(); |
| 119 | + } catch (Exception e) { |
| 120 | + if (e.getCause() instanceof InterruptedException && Thread.currentThread().isInterrupted()) { |
| 121 | + interrupteFlagPreserved.complete(true); |
| 122 | + } |
| 123 | + } |
| 124 | + }); |
| 125 | + |
| 126 | + t.start(); |
| 127 | + t.interrupt(); |
| 128 | + t.join(); |
| 129 | + assertThat(interrupteFlagPreserved.join()).isTrue(); |
| 130 | + } finally { |
| 131 | + channel.close().awaitUninterruptibly(); |
| 132 | + } |
| 133 | + } |
96 | 134 | }
|
0 commit comments