diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index f2619e8aae3473183f29ec243917407b62e73c1d..6c013504d7c56cc3a030b122765b9119a550f26f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -174,11 +173,6 @@ public class NetworkEnvironment {
public void registerTask(Task task) throws IOException {
final ResultPartition[] producedPartitions = task.getProducedPartitions();
- final ResultPartitionWriter[] writers = task.getAllWriters();
-
- if (writers.length != producedPartitions.length) {
- throw new IllegalStateException("Unequal number of writers and partitions.");
- }
synchronized (lock) {
if (isShutdown) {
@@ -187,7 +181,6 @@ public class NetworkEnvironment {
for (int i = 0; i < producedPartitions.length; i++) {
final ResultPartition partition = producedPartitions[i];
- final ResultPartitionWriter writer = writers[i];
// Buffer pool for the partition
BufferPool bufferPool = null;
@@ -214,7 +207,7 @@ public class NetworkEnvironment {
}
// Register writer with task event dispatcher
- taskEventDispatcher.registerPartition(writer.getPartitionId());
+ taskEventDispatcher.registerPartition(partition.getPartitionId());
}
// Setup the buffer pool for each buffer reader
@@ -263,18 +256,9 @@ public class NetworkEnvironment {
resultPartitionManager.releasePartitionsProducedBy(executionId, task.getFailureCause());
}
- ResultPartitionWriter[] writers = task.getAllWriters();
- if (writers != null) {
- for (ResultPartitionWriter writer : writers) {
- taskEventDispatcher.unregisterPartition(writer.getPartitionId());
- }
- }
-
- ResultPartition[] partitions = task.getProducedPartitions();
- if (partitions != null) {
- for (ResultPartition partition : partitions) {
- partition.destroyBufferPool();
- }
+ for (ResultPartition partition : task.getProducedPartitions()) {
+ taskEventDispatcher.unregisterPartition(partition.getPartitionId());
+ partition.destroyBufferPool();
}
final SingleInputGate[] inputGates = task.getAllInputGates();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 623dc62b204b0cde948b179a228adad38db3cd5d..4729800dfab65adb5f8154bf50e4a781299106af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -36,11 +36,11 @@ import static org.apache.flink.runtime.io.network.api.serialization.RecordSerial
/**
* A record-oriented runtime result writer.
- *
- * The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and takes care of
+ *
+ *
The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and takes care of
* serializing records into buffers.
- *
- * Important: it is necessary to call {@link #flush()} after
+ *
+ *
Important: it is necessary to call {@link #flush()} after
* all records have been written with {@link #emit(IOReadableWritable)}. This
* ensures that all produced records are written to the output stream (incl.
* partially filled ones).
@@ -71,7 +71,7 @@ public class RecordWriter {
this.targetPartition = writer;
this.channelSelector = channelSelector;
- this.numChannels = writer.getNumberOfOutputChannels();
+ this.numChannels = writer.getNumberOfSubpartitions();
/**
* The runtime exposes a channel abstraction for the produced results
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 777c7ad7eb02ae152ae979ba96d6a23ca8ab45bf..3a66e53dc5144c8021e8a0e2f1789bfa9749eb8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -20,73 +20,50 @@ package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import java.io.IOException;
/**
- * A buffer-oriented runtime result writer.
- *
- * The {@link ResultPartitionWriter} is the runtime API for producing results. It
- * supports two kinds of data to be sent: buffers and events.
+ * A buffer-oriented runtime result writer API for producing results.
*/
-public class ResultPartitionWriter {
+public interface ResultPartitionWriter {
- private final ResultPartition partition;
+ BufferProvider getBufferProvider();
- public ResultPartitionWriter(ResultPartition partition) {
- this.partition = partition;
- }
-
- // ------------------------------------------------------------------------
- // Attributes
- // ------------------------------------------------------------------------
+ ResultPartitionID getPartitionId();
- public ResultPartitionID getPartitionId() {
- return partition.getPartitionId();
- }
+ int getNumberOfSubpartitions();
- public BufferProvider getBufferProvider() {
- return partition.getBufferProvider();
- }
+ int getNumTargetKeyGroups();
- public int getNumberOfOutputChannels() {
- return partition.getNumberOfSubpartitions();
- }
-
- public int getNumTargetKeyGroups() {
- return partition.getNumTargetKeyGroups();
- }
-
- // ------------------------------------------------------------------------
- // Data processing
- // ------------------------------------------------------------------------
-
- public void writeBuffer(Buffer buffer, int targetChannel) throws IOException {
- partition.add(buffer, targetChannel);
- }
+ /**
+ * Adds a buffer to the subpartition with the given index.
+ *
+ *
For PIPELINED {@link org.apache.flink.runtime.io.network.partition.ResultPartitionType}s,
+ * this will trigger the deployment of consuming tasks after the first buffer has been added.
+ */
+ void writeBuffer(Buffer buffer, int subpartitionIndex) throws IOException;
/**
- * Writes the given buffer to all available target channels.
+ * Writes the given buffer to all available target subpartitions.
*
- * The buffer is taken over and used for each of the channels.
+ *
The buffer is taken over and used for each of the channels.
* It will be recycled afterwards.
*
- * @param eventBuffer the buffer to write
- * @throws IOException
+ * @param buffer the buffer to write
*/
- public void writeBufferToAllChannels(final Buffer eventBuffer) throws IOException {
+ default void writeBufferToAllSubpartitions(final Buffer buffer) throws IOException {
try {
- for (int targetChannel = 0; targetChannel < partition.getNumberOfSubpartitions(); targetChannel++) {
+ for (int subpartition = 0; subpartition < getNumberOfSubpartitions(); subpartition++) {
// retain the buffer so that it can be recycled by each channel of targetPartition
- eventBuffer.retain();
- writeBuffer(eventBuffer, targetChannel);
+ buffer.retain();
+ writeBuffer(buffer, subpartition);
}
} finally {
// we do not need to further retain the eventBuffer
// (it will be recycled after the last channel stops using it)
- eventBuffer.recycle();
+ buffer.recycle();
}
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 9b02e4d603c30c84f00d7ffe0bcf274cc8bcccb0..be050b33125bb38c8336848355dad572c29517d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -74,7 +75,7 @@ import static org.apache.flink.util.Preconditions.checkState;
*
*
State management
*/
-public class ResultPartition implements BufferPoolOwner {
+public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
@@ -209,10 +210,12 @@ public class ResultPartition implements BufferPoolOwner {
return partitionId;
}
+ @Override
public int getNumberOfSubpartitions() {
return subpartitions.length;
}
+ @Override
public BufferProvider getBufferProvider() {
return bufferPool;
}
@@ -260,13 +263,8 @@ public class ResultPartition implements BufferPoolOwner {
// ------------------------------------------------------------------------
- /**
- * Adds a buffer to the subpartition with the given index.
- *
- * For PIPELINED results, this will trigger the deployment of consuming tasks after the
- * first buffer has been added.
- */
- public void add(Buffer buffer, int subpartitionIndex) throws IOException {
+ @Override
+ public void writeBuffer(Buffer buffer, int subpartitionIndex) throws IOException {
boolean success = false;
try {
@@ -381,6 +379,7 @@ public class ResultPartition implements BufferPoolOwner {
return cause;
}
+ @Override
public int getNumTargetKeyGroups() {
return numTargetKeyGroups;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 65dd8ac56a85db799acacb12bad3eea97ea2f098..2a95a65111139c5ec0a99c4b28e7ddfa439de64b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -441,6 +441,6 @@ public class IterationHeadTask extends AbstractIte
log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
}
- this.toSync.writeBufferToAllChannels(EventSerializer.toBuffer(event));
+ this.toSync.writeBufferToAllSubpartitions(EventSerializer.toBuffer(event));
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e54adb9fcbb69761317dd1d3bdf92b833abbe3a0..a049063a549663621192541e71bd1a7bdd3900a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -51,7 +51,6 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -187,8 +186,6 @@ public class Task implements Runnable, TaskActions {
private final ResultPartition[] producedPartitions;
- private final ResultPartitionWriter[] writers;
-
private final SingleInputGate[] inputGates;
private final Map inputGatesById;
@@ -360,7 +357,6 @@ public class Task implements Runnable, TaskActions {
// Produced intermediate result partitions
this.producedPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()];
- this.writers = new ResultPartitionWriter[resultPartitionDeploymentDescriptors.size()];
int counter = 0;
@@ -380,8 +376,6 @@ public class Task implements Runnable, TaskActions {
ioManager,
desc.sendScheduleOrUpdateConsumersMessage());
- writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
-
++counter;
}
@@ -445,10 +439,6 @@ public class Task implements Runnable, TaskActions {
return this.taskConfiguration;
}
- public ResultPartitionWriter[] getAllWriters() {
- return writers;
- }
-
public SingleInputGate[] getAllInputGates() {
return inputGates;
}
@@ -682,7 +672,7 @@ public class Task implements Runnable, TaskActions {
kvStateRegistry,
inputSplitProvider,
distributedCacheEntries,
- writers,
+ producedPartitions,
inputGates,
network.getTaskEventDispatcher(),
checkpointResponder,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 123082f4fdf9c129ba9e2386b7ec48086d448da8..ba92bdf80278b582c1e3c6c98e9e05d9925f0edd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -37,8 +37,6 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import java.io.IOException;
-
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -82,9 +80,6 @@ public class NetworkEnvironmentTest {
ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 8);
final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
- final ResultPartitionWriter[] resultPartitionWriters = new ResultPartitionWriter[] {
- new ResultPartitionWriter(rp1), new ResultPartitionWriter(rp2),
- new ResultPartitionWriter(rp3), new ResultPartitionWriter(rp4)};
// input gates
SingleInputGate ig1 = createSingleInputGateMock(ResultPartitionType.PIPELINED, 2);
@@ -96,7 +91,6 @@ public class NetworkEnvironmentTest {
// overall task to register
Task task = mock(Task.class);
when(task.getProducedPartitions()).thenReturn(resultPartitions);
- when(task.getAllWriters()).thenReturn(resultPartitionWriters);
when(task.getAllInputGates()).thenReturn(inputGates);
network.registerTask(task);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ff001c219c550ffd343bea49ba33caab3b3e3053..95090130cc2a45bb078802b3c0630849ad188912 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -181,7 +181,7 @@ public class RecordWriterTest {
ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));
- when(partitionWriter.getNumberOfOutputChannels()).thenReturn(1);
+ when(partitionWriter.getNumberOfSubpartitions()).thenReturn(1);
// Recycle buffer and throw Exception
doAnswer(new Answer() {
@@ -454,7 +454,7 @@ public class RecordWriterTest {
ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferProvider));
- when(partitionWriter.getNumberOfOutputChannels()).thenReturn(numChannels);
+ when(partitionWriter.getNumberOfSubpartitions()).thenReturn(numChannels);
doAnswer(new Answer() {
@Override
@@ -512,7 +512,7 @@ public class RecordWriterTest {
ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferProvider));
- when(partitionWriter.getNumberOfOutputChannels()).thenReturn(1);
+ when(partitionWriter.getNumberOfSubpartitions()).thenReturn(1);
// Recycle each written buffer.
doAnswer(new Answer() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
deleted file mode 100644
index 3b54247c082e37502599409ccb287fc4889b27e6..0000000000000000000000000000000000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.writer;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class ResultPartitionWriterTest {
-
- // ---------------------------------------------------------------------------------------------
- // Resource release tests
- // ---------------------------------------------------------------------------------------------
-
- /**
- * Tests that event buffers are properly recycled when broadcasting events
- * to multiple channels.
- *
- * @throws Exception
- */
- @Test
- public void testWriteBufferToAllChannelsReferenceCounting() throws Exception {
- Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-
- ResultPartition partition = new ResultPartition(
- "TestTask",
- mock(TaskActions.class),
- new JobID(),
- new ResultPartitionID(),
- ResultPartitionType.PIPELINED,
- 2,
- 2,
- mock(ResultPartitionManager.class),
- mock(ResultPartitionConsumableNotifier.class),
- mock(IOManager.class),
- false);
- ResultPartitionWriter partitionWriter =
- new ResultPartitionWriter(
- partition);
-
- partitionWriter.writeBufferToAllChannels(buffer);
-
- // Verify added to all queues, i.e. two buffers in total
- assertEquals(2, partition.getTotalNumberOfBuffers());
- // release the buffers in the partition
- partition.release();
-
- assertTrue(buffer.isRecycled());
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 0cd359197efcb7cee40595ecb07af51b053e66d4..9fb7fd36add3badc1ba1c45e75723800b04dd7a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -20,12 +20,16 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -44,7 +48,7 @@ public class ResultPartitionTest {
// Pipelined, send message => notify
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true);
- partition.add(TestBufferFactory.createBuffer(), 0);
+ partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
verify(notifier, times(1)).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
}
@@ -52,7 +56,7 @@ public class ResultPartitionTest {
// Pipelined, don't send message => don't notify
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, false);
- partition.add(TestBufferFactory.createBuffer(), 0);
+ partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
}
@@ -60,7 +64,7 @@ public class ResultPartitionTest {
// Blocking, send message => don't notify
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, true);
- partition.add(TestBufferFactory.createBuffer(), 0);
+ partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
}
@@ -68,7 +72,7 @@ public class ResultPartitionTest {
// Blocking, don't send message => don't notify
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, false);
- partition.add(TestBufferFactory.createBuffer(), 0);
+ partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
}
}
@@ -84,7 +88,7 @@ public class ResultPartitionTest {
}
/**
- * Tests {@link ResultPartition#add} on a partition which has already finished.
+ * Tests {@link ResultPartition#writeBuffer} on a partition which has already finished.
*
* @param pipelined the result partition type to set up
*/
@@ -97,7 +101,7 @@ public class ResultPartitionTest {
partition.finish();
reset(notifier);
// partition.add() should fail
- partition.add(buffer, 0);
+ partition.writeBuffer(buffer, 0);
Assert.fail("exception expected");
} catch (IllegalStateException e) {
// expected => ignored
@@ -122,7 +126,7 @@ public class ResultPartitionTest {
}
/**
- * Tests {@link ResultPartition#add} on a partition which has already been released.
+ * Tests {@link ResultPartition#writeBuffer} on a partition which has already been released.
*
* @param pipelined the result partition type to set up
*/
@@ -134,7 +138,7 @@ public class ResultPartitionTest {
ResultPartition partition = createPartition(notifier, pipelined, true);
partition.release();
// partition.add() silently drops the buffer but recycles it
- partition.add(buffer, 0);
+ partition.writeBuffer(buffer, 0);
} finally {
if (!buffer.isRecycled()) {
Assert.fail("buffer not recycled");
@@ -145,6 +149,37 @@ public class ResultPartitionTest {
}
}
+ /**
+ * Tests that event buffers are properly added and recycled when broadcasting events
+ * to multiple channels.
+ */
+ @Test
+ public void testWriteBufferToAllSubpartitionsReferenceCounting() throws Exception {
+ Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+ ResultPartition partition = new ResultPartition(
+ "TestTask",
+ mock(TaskActions.class),
+ new JobID(),
+ new ResultPartitionID(),
+ ResultPartitionType.PIPELINED,
+ 2,
+ 2,
+ mock(ResultPartitionManager.class),
+ mock(ResultPartitionConsumableNotifier.class),
+ mock(IOManager.class),
+ false);
+
+ partition.writeBufferToAllSubpartitions(buffer);
+
+ // Verify added to all queues, i.e. two buffers in total
+ assertEquals(2, partition.getTotalNumberOfBuffers());
+ // release the buffers in the partition
+ partition.release();
+
+ assertTrue(buffer.isRecycled());
+ }
+
// ------------------------------------------------------------------------
private static ResultPartition createPartition(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
index e694dfe99ea0bba85dba998a88713be9bb0fb146..53f95c45419092c183ef7dd31e49414699a7c4e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
@@ -77,12 +77,12 @@ public class TestPartitionProducer implements Callable {
int targetChannelIndex = bufferOrEvent.getChannelIndex();
if (bufferOrEvent.isBuffer()) {
- partition.add(bufferOrEvent.getBuffer(), targetChannelIndex);
+ partition.writeBuffer(bufferOrEvent.getBuffer(), targetChannelIndex);
}
else if (bufferOrEvent.isEvent()) {
final Buffer buffer = EventSerializer.toBuffer(bufferOrEvent.getEvent());
- partition.add(buffer, targetChannelIndex);
+ partition.writeBuffer(buffer, targetChannelIndex);
}
else {
throw new IllegalStateException("BufferOrEvent instance w/o buffer nor event.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index c8ca6541a149753689231aa52080aff85040b418..f655b127d6abfc8176e46ad6779ee3b7bc5e07e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -204,7 +204,7 @@ public class MockEnvironment implements Environment {
});
ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
- when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
+ when(mockWriter.getNumberOfSubpartitions()).thenReturn(1);
when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
final Record record = new Record();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index eff8a291fb7912dad5f13d465d526e22eda98268..a3b16e6dab973d22d61c7c88a292024278a18e1d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -632,7 +632,7 @@ public abstract class StreamTask>
for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
try {
- output.writeBufferToAllChannels(EventSerializer.toBuffer(message));
+ output.writeBufferToAllSubpartitions(EventSerializer.toBuffer(message));
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index b1b86b127a0480769d41082a716a5567a86f7f54..78d4303c4524fcb33f793313f364a3ff179993ba 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -98,7 +98,7 @@ public class StreamRecordWriterTest {
ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
- when(mockWriter.getNumberOfOutputChannels()).thenReturn(numPartitions);
+ when(mockWriter.getNumberOfSubpartitions()).thenReturn(numPartitions);
return mockWriter;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 6b6506ab043db10e66cf6862270219c734d9ec1f..277ca51d7c5fc3637fe6e3180236636ddbf1c83d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -161,7 +161,7 @@ public class StreamMockEnvironment implements Environment {
});
ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
- when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
+ when(mockWriter.getNumberOfSubpartitions()).thenReturn(1);
when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
final RecordDeserializer> recordDeserializer = new AdaptiveSpanningRecordDeserializer>();
@@ -186,7 +186,7 @@ public class StreamMockEnvironment implements Environment {
addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
return null;
}
- }).when(mockWriter).writeBufferToAllChannels(any(Buffer.class));
+ }).when(mockWriter).writeBufferToAllSubpartitions(any(Buffer.class));
outputs.add(mockWriter);
}