提交 d5da8234 编写于 作者: A Arjen Poutsma

Fix race condition for AsynchronousFileChannel

This commit fixes an issue in the DataBufferUtils.write variant that
takes a AsynchronousFileChannel.

Issue: SPR-15798
上级 d904e9ed
...@@ -517,7 +517,7 @@ public abstract class DataBufferUtils { ...@@ -517,7 +517,7 @@ public abstract class DataBufferUtils {
private final AtomicBoolean completed = new AtomicBoolean(); private final AtomicBoolean completed = new AtomicBoolean();
private long position; private final AtomicLong position;
@Nullable @Nullable
private DataBuffer dataBuffer; private DataBuffer dataBuffer;
...@@ -526,7 +526,7 @@ public abstract class DataBufferUtils { ...@@ -526,7 +526,7 @@ public abstract class DataBufferUtils {
FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) { FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {
this.sink = sink; this.sink = sink;
this.channel = channel; this.channel = channel;
this.position = position; this.position = new AtomicLong(position);
} }
@Override @Override
...@@ -539,7 +539,7 @@ public abstract class DataBufferUtils { ...@@ -539,7 +539,7 @@ public abstract class DataBufferUtils {
this.dataBuffer = value; this.dataBuffer = value;
ByteBuffer byteBuffer = value.asByteBuffer(); ByteBuffer byteBuffer = value.asByteBuffer();
this.channel.write(byteBuffer, this.position, byteBuffer, this); this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
} }
@Override @Override
...@@ -550,17 +550,23 @@ public abstract class DataBufferUtils { ...@@ -550,17 +550,23 @@ public abstract class DataBufferUtils {
@Override @Override
protected void hookOnComplete() { protected void hookOnComplete() {
this.completed.set(true); this.completed.set(true);
if (this.dataBuffer == null) {
this.sink.complete();
}
} }
@Override @Override
public void completed(Integer written, ByteBuffer byteBuffer) { public void completed(Integer written, ByteBuffer byteBuffer) {
this.position += written; this.position.addAndGet(written);
if (byteBuffer.hasRemaining()) { if (byteBuffer.hasRemaining()) {
this.channel.write(byteBuffer, this.position, byteBuffer, this); this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
return; return;
} }
else if (this.dataBuffer != null) {
if (this.dataBuffer != null) {
this.sink.next(this.dataBuffer); this.sink.next(this.dataBuffer);
this.dataBuffer = null;
} }
if (this.completed.get()) { if (this.completed.get()) {
this.sink.complete(); this.sink.complete();
......
...@@ -207,7 +207,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { ...@@ -207,7 +207,6 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase {
} }
@Test @Test
@Ignore // SPR-15798
public void writeAsynchronousFileChannel() throws Exception { public void writeAsynchronousFileChannel() throws Exception {
DataBuffer foo = stringBuffer("foo"); DataBuffer foo = stringBuffer("foo");
DataBuffer bar = stringBuffer("bar"); DataBuffer bar = stringBuffer("bar");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册