Skip to content

Commit 5520e73

Browse files
committed
DataBufferUtils.read should not take input stream/channel as parameter
Fixed by creating `Callable`-based variants, as explained in the JIRA issue. Issue: SPR-16403
1 parent 09f1f72 commit 5520e73

File tree

3 files changed

+166
-88
lines changed

3 files changed

+166
-88
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 149 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import java.nio.channels.ReadableByteChannel;
2929
import java.nio.channels.WritableByteChannel;
3030
import java.nio.file.StandardOpenOption;
31+
import java.util.concurrent.Callable;
3132
import java.util.concurrent.atomic.AtomicBoolean;
3233
import java.util.concurrent.atomic.AtomicLong;
33-
import java.util.function.BiFunction;
3434
import java.util.function.BinaryOperator;
3535
import java.util.function.Consumer;
3636

@@ -69,92 +69,182 @@ public abstract class DataBufferUtils {
6969
//---------------------------------------------------------------------
7070

7171
/**
72-
* Read the given {@code InputStream} into a {@code Flux} of
72+
* Read the given {@code InputStream} into a <strong>read-once</strong> {@code Flux} of
7373
* {@code DataBuffer}s. Closes the input stream when the flux is terminated.
74+
* <p>The resulting {@code Flux} can only be subscribed to once. See
75+
* {@link #readInputStream(Callable, DataBufferFactory, int)} for a variant that supports
76+
* multiple subscriptions.
7477
* @param inputStream the input stream to read from
7578
* @param dataBufferFactory the factory to create data buffers with
7679
* @param bufferSize the maximum size of the data buffers
7780
* @return a flux of data buffers read from the given channel
81+
* @deprecated as of Spring 5.0.3, in favor of
82+
* {@link #readInputStream(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1
7883
*/
84+
@Deprecated
7985
public static Flux<DataBuffer> read(InputStream inputStream,
8086
DataBufferFactory dataBufferFactory, int bufferSize) {
87+
return readInputStream(() -> inputStream, dataBufferFactory, bufferSize);
88+
}
8189

82-
Assert.notNull(inputStream, "InputStream must not be null");
90+
/**
91+
* Obtain a {@link InputStream} from the given supplier, and read it into a {@code Flux} of
92+
* {@code DataBuffer}s. Closes the input stream when the flux is terminated.
93+
* @param inputStreamSupplier the supplier for the input stream to read from
94+
* @param dataBufferFactory the factory to create data buffers with
95+
* @param bufferSize the maximum size of the data buffers
96+
* @return a flux of data buffers read from the given channel
97+
*/
98+
public static Flux<DataBuffer> readInputStream(Callable<InputStream> inputStreamSupplier,
99+
DataBufferFactory dataBufferFactory, int bufferSize) {
83100

84-
ReadableByteChannel channel = Channels.newChannel(inputStream);
85-
return read(channel, dataBufferFactory, bufferSize);
101+
Assert.notNull(inputStreamSupplier, "'inputStreamSupplier' must not be null");
102+
103+
return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()),
104+
dataBufferFactory, bufferSize);
86105
}
87106

88107
/**
89-
* Read the given {@code ReadableByteChannel} into a {@code Flux} of
108+
* Read the given {@code ReadableByteChannel} into a <strong>read-once</strong> {@code Flux} of
90109
* {@code DataBuffer}s. Closes the channel when the flux is terminated.
110+
* <p>The resulting {@code Flux} can only be subscribed to once. See
111+
* {@link #readByteChannel(Callable, DataBufferFactory, int)} for a variant that supports
112+
* multiple subscriptions.
91113
* @param channel the channel to read from
92114
* @param dataBufferFactory the factory to create data buffers with
93115
* @param bufferSize the maximum size of the data buffers
94116
* @return a flux of data buffers read from the given channel
117+
* @deprecated as of Spring 5.0.3, in favor of
118+
* {@link #readByteChannel(Callable, DataBufferFactory, int)}, to be removed in Spring 5.1
95119
*/
120+
@Deprecated
96121
public static Flux<DataBuffer> read(ReadableByteChannel channel,
97122
DataBufferFactory dataBufferFactory, int bufferSize) {
123+
return readByteChannel(() -> channel, dataBufferFactory, bufferSize);
124+
}
98125

99-
Assert.notNull(channel, "ReadableByteChannel must not be null");
100-
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
126+
/**
127+
* Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
128+
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
129+
* @param channelSupplier the supplier for the channel to read from
130+
* @param dataBufferFactory the factory to create data buffers with
131+
* @param bufferSize the maximum size of the data buffers
132+
* @return a flux of data buffers read from the given channel
133+
*/
134+
public static Flux<DataBuffer> readByteChannel(Callable<ReadableByteChannel> channelSupplier,
135+
DataBufferFactory dataBufferFactory, int bufferSize) {
136+
137+
Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
138+
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
101139
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
102140

103-
return Flux.generate(() -> channel,
104-
new ReadableByteChannelGenerator(dataBufferFactory, bufferSize),
105-
DataBufferUtils::closeChannel);
141+
return Flux.using(channelSupplier,
142+
channel -> {
143+
ReadableByteChannelGenerator generator =
144+
new ReadableByteChannelGenerator(channel, dataBufferFactory,
145+
bufferSize);
146+
return Flux.generate(generator);
147+
},
148+
DataBufferUtils::closeChannel
149+
);
106150
}
107151

108152
/**
109-
* Read the given {@code AsynchronousFileChannel} into a {@code Flux} of
110-
* {@code DataBuffer}s. Closes the channel when the flux is terminated.
153+
* Read the given {@code AsynchronousFileChannel} into a <strong>read-once</strong> {@code Flux}
154+
* of {@code DataBuffer}s. Closes the channel when the flux is terminated.
155+
* <p>The resulting {@code Flux} can only be subscribed to once. See
156+
* {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} for a variant that
157+
* supports multiple subscriptions.
111158
* @param channel the channel to read from
112159
* @param dataBufferFactory the factory to create data buffers with
113160
* @param bufferSize the maximum size of the data buffers
114161
* @return a flux of data buffers read from the given channel
162+
* @deprecated as of Spring 5.0.3, in favor of
163+
* {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)}, to be removed in
164+
* Spring 5.1
115165
*/
166+
@Deprecated
116167
public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
117168
DataBufferFactory dataBufferFactory, int bufferSize) {
118-
return read(channel, 0, dataBufferFactory, bufferSize);
169+
return readAsynchronousFileChannel(() -> channel, dataBufferFactory, bufferSize);
119170
}
120171

121172
/**
122-
* Read the given {@code AsynchronousFileChannel} into a {@code Flux} of
123-
* {@code DataBuffer}s, starting at the given position. Closes the channel when the flux is
173+
* Read the given {@code AsynchronousFileChannel} into a <strong>read-once</strong> {@code Flux}
174+
* of {@code DataBuffer}s, starting at the given position. Closes the channel when the flux is
124175
* terminated.
176+
* <p>The resulting {@code Flux} can only be subscribed to once. See
177+
* {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)} for a variant
178+
* that supports multiple subscriptions.
125179
* @param channel the channel to read from
126180
* @param position the position to start reading from
127181
* @param dataBufferFactory the factory to create data buffers with
128182
* @param bufferSize the maximum size of the data buffers
129183
* @return a flux of data buffers read from the given channel
184+
* @deprecated as of Spring 5.0.3, in favor of
185+
* {@link #readAsynchronousFileChannel(Callable, long, DataBufferFactory, int)}, to be removed
186+
* in Spring 5.1
130187
*/
188+
@Deprecated
131189
public static Flux<DataBuffer> read(AsynchronousFileChannel channel,
132190
long position, DataBufferFactory dataBufferFactory, int bufferSize) {
191+
return readAsynchronousFileChannel(() -> channel, position, dataBufferFactory, bufferSize);
192+
}
133193

134-
Assert.notNull(channel, "'channel' must not be null");
194+
/**
195+
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
196+
* {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
197+
* @param channelSupplier the supplier for the channel to read from
198+
* @param dataBufferFactory the factory to create data buffers with
199+
* @param bufferSize the maximum size of the data buffers
200+
* @return a flux of data buffers read from the given channel
201+
*/
202+
public static Flux<DataBuffer> readAsynchronousFileChannel(
203+
Callable<AsynchronousFileChannel> channelSupplier,
204+
DataBufferFactory dataBufferFactory, int bufferSize) {
205+
206+
return readAsynchronousFileChannel(channelSupplier, 0, dataBufferFactory, bufferSize);
207+
}
208+
209+
/**
210+
* Obtain a {@code AsynchronousFileChannel} from the given supplier, and read it into a
211+
* {@code Flux} of {@code DataBuffer}s, starting at the given position. Closes the channel when
212+
* the flux is terminated.
213+
* @param channelSupplier the supplier for the channel to read from
214+
* @param position the position to start reading from
215+
* @param dataBufferFactory the factory to create data buffers with
216+
* @param bufferSize the maximum size of the data buffers
217+
* @return a flux of data buffers read from the given channel
218+
*/
219+
public static Flux<DataBuffer> readAsynchronousFileChannel(
220+
Callable<AsynchronousFileChannel> channelSupplier,
221+
long position, DataBufferFactory dataBufferFactory, int bufferSize) {
222+
223+
Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
135224
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
136225
Assert.isTrue(position >= 0, "'position' must be >= 0");
137226
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
138227

139228
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
140229
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
141230

142-
return Flux.create(sink -> {
143-
sink.onDispose(() -> closeChannel(channel));
144-
CompletionHandler<Integer, DataBuffer> completionHandler =
145-
new AsynchronousFileChannelReadCompletionHandler(channel, sink, position,
146-
dataBufferFactory, bufferSize);
147-
channel.read(byteBuffer, position, dataBuffer, completionHandler);
148-
});
231+
return Flux.using(channelSupplier,
232+
channel -> Flux.create(sink -> {
233+
CompletionHandler<Integer, DataBuffer> completionHandler =
234+
new AsynchronousFileChannelReadCompletionHandler(channel,
235+
sink, position, dataBufferFactory, bufferSize);
236+
channel.read(byteBuffer, position, dataBuffer, completionHandler);
237+
}),
238+
DataBufferUtils::closeChannel);
149239
}
150240

151241
/**
152242
* Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s.
153243
* <p>If the resource is a file, it is read into an
154244
* {@code AsynchronousFileChannel} and turned to {@code Flux} via
155-
* {@link #read(AsynchronousFileChannel, DataBufferFactory, int)} or else
156-
* fall back on {@link #read(InputStream, DataBufferFactory, int)} closes
157-
* the channel when the flux is terminated.
245+
* {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else
246+
* fall back to {@link #readByteChannel(Callable, DataBufferFactory, int)}.
247+
* Closes the channel when the flux is terminated.
158248
* @param resource the resource to read from
159249
* @param dataBufferFactory the factory to create data buffers with
160250
* @param bufferSize the maximum size of the data buffers
@@ -171,9 +261,9 @@ public static Flux<DataBuffer> read(Resource resource,
171261
* starting at the given position.
172262
* <p>If the resource is a file, it is read into an
173263
* {@code AsynchronousFileChannel} and turned to {@code Flux} via
174-
* {@link #read(AsynchronousFileChannel, DataBufferFactory, int)} or else
175-
* fall back on {@link #read(InputStream, DataBufferFactory, int)}. Closes
176-
* the channel when the flux is terminated.
264+
* {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else
265+
* fall back on {@link #readByteChannel(Callable, DataBufferFactory, int)}.
266+
* Closes the channel when the flux is terminated.
177267
* @param resource the resource to read from
178268
* @param position the position to start reading from
179269
* @param dataBufferFactory the factory to create data buffers with
@@ -186,26 +276,23 @@ public static Flux<DataBuffer> read(Resource resource, long position,
186276
try {
187277
if (resource.isFile()) {
188278
File file = resource.getFile();
189-
AsynchronousFileChannel channel =
190-
AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ);
191-
return DataBufferUtils.read(channel, position, dataBufferFactory, bufferSize);
279+
280+
return readAsynchronousFileChannel(
281+
() -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ),
282+
position, dataBufferFactory, bufferSize);
192283
}
193284
}
194285
catch (IOException ignore) {
195286
// fallback to resource.readableChannel(), below
196287
}
197288

198-
try {
199-
ReadableByteChannel channel = resource.readableChannel();
200-
Flux<DataBuffer> in = DataBufferUtils.read(channel, dataBufferFactory, bufferSize);
201-
return DataBufferUtils.skipUntilByteCount(in, position);
202-
}
203-
catch (IOException ex) {
204-
return Flux.error(ex);
205-
}
289+
Flux<DataBuffer> result = readByteChannel(resource::readableChannel, dataBufferFactory, bufferSize);
290+
return position == 0 ? result : skipUntilByteCount(result, position);
206291
}
207292

208293

294+
295+
209296
//---------------------------------------------------------------------
210297
// Writing
211298
//---------------------------------------------------------------------
@@ -304,7 +391,7 @@ public static Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousF
304391

305392
private static void closeChannel(@Nullable Channel channel) {
306393
try {
307-
if (channel != null) {
394+
if (channel != null && channel.isOpen()) {
308395
channel.close();
309396
}
310397
}
@@ -442,44 +529,49 @@ public static Mono<DataBuffer> compose(Publisher<DataBuffer> publisher) {
442529

443530

444531
private static class ReadableByteChannelGenerator
445-
implements BiFunction<ReadableByteChannel, SynchronousSink<DataBuffer>, ReadableByteChannel> {
532+
implements Consumer<SynchronousSink<DataBuffer>> {
533+
534+
private final ReadableByteChannel channel;
446535

447536
private final DataBufferFactory dataBufferFactory;
448537

449538
private final int bufferSize;
450539

451-
public ReadableByteChannelGenerator(DataBufferFactory dataBufferFactory, int bufferSize) {
540+
541+
public ReadableByteChannelGenerator(ReadableByteChannel channel,
542+
DataBufferFactory dataBufferFactory, int bufferSize) {
543+
544+
this.channel = channel;
452545
this.dataBufferFactory = dataBufferFactory;
453546
this.bufferSize = bufferSize;
454547
}
455548

456549
@Override
457-
public ReadableByteChannel apply(ReadableByteChannel channel,
458-
SynchronousSink<DataBuffer> sub) {
550+
public void accept(SynchronousSink<DataBuffer> sink) {
459551
boolean release = true;
460552
DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
461553
try {
462554
int read;
463555
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity());
464-
if ((read = channel.read(byteBuffer)) >= 0) {
556+
if ((read = this.channel.read(byteBuffer)) >= 0) {
465557
dataBuffer.writePosition(read);
466558
release = false;
467-
sub.next(dataBuffer);
559+
sink.next(dataBuffer);
468560
}
469561
else {
470-
sub.complete();
562+
sink.complete();
471563
}
472564
}
473565
catch (IOException ex) {
474-
sub.error(ex);
566+
sink.error(ex);
475567
}
476568
finally {
477569
if (release) {
478570
release(dataBuffer);
479571
}
480572
}
481-
return channel;
482573
}
574+
483575
}
484576

485577

@@ -494,7 +586,10 @@ private static class AsynchronousFileChannelReadCompletionHandler
494586

495587
private final int bufferSize;
496588

497-
private AtomicLong position;
589+
private final AtomicLong position;
590+
591+
private final AtomicBoolean disposed = new AtomicBoolean();
592+
498593

499594
private AsynchronousFileChannelReadCompletionHandler(
500595
AsynchronousFileChannel channel, FluxSink<DataBuffer> sink,
@@ -513,7 +608,7 @@ public void completed(Integer read, DataBuffer dataBuffer) {
513608
dataBuffer.writePosition(read);
514609
this.sink.next(dataBuffer);
515610

516-
if (!this.sink.isCancelled()) {
611+
if (!this.disposed.get()) {
517612
DataBuffer newDataBuffer =
518613
this.dataBufferFactory.allocateBuffer(this.bufferSize);
519614
ByteBuffer newByteBuffer = newDataBuffer.asByteBuffer(0, this.bufferSize);
@@ -522,17 +617,16 @@ public void completed(Integer read, DataBuffer dataBuffer) {
522617
}
523618
else {
524619
release(dataBuffer);
525-
closeChannel(this.channel);
526620
this.sink.complete();
527621
}
528622
}
529623

530624
@Override
531625
public void failed(Throwable exc, DataBuffer dataBuffer) {
532626
release(dataBuffer);
533-
closeChannel(this.channel);
534627
this.sink.error(exc);
535628
}
629+
536630
}
537631

538632

0 commit comments

Comments
 (0)