diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java index d478e0f175aaedc3944b32ad666967a0cc0e98fb..76f8bbd83744161a1d17ac5f543bd751c8f6ff20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java @@ -34,7 +34,7 @@ public interface ConnectionManager { /** * Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}. */ - PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException; + PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException; /** * Closes opened ChannelConnections in case of a resource release diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java index 5d03c1580d05e9530bfdbb72fcb02687d27f00bf..260ea7e4e46a5a05a42a5ae6f5eba2fc4f9c6b44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java @@ -49,7 +49,7 @@ public class NettyConnectionManager implements ConnectionManager { } @Override - public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException { + public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException { return partitionRequestClientFactory.createPartitionRequestClient(remoteAddress); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java index d7e6efd9e884fe49342cc0713f71cdd7f4db1363..d4c022b6ee0bc56dbebb0d90228d3787dc98d39b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java @@ -48,7 +48,7 @@ class PartitionRequestClientFactory { * Atomically establishes a TCP connection to the given remote address and * creates a {@link PartitionRequestClient} instance for this connection. */ - PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException { + PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException { Object entry; PartitionRequestClient client = null; @@ -182,15 +182,10 @@ class PartitionRequestClientFactory { private volatile Throwable error; - private PartitionRequestClient waitForChannel() throws IOException { + private PartitionRequestClient waitForChannel() throws IOException, InterruptedException { synchronized (connectLock) { while (error == null && partitionRequestClient == null) { - try { - connectLock.wait(2000); - } - catch (InterruptedException e) { - throw new RuntimeException("Wait for channel connection interrupted."); - } + connectLock.wait(2000); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 31b67ca659a56b10931beb5b47164bb940fd6729..7173566de3e29636f5e8d2b4e1d661a6f137bc96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -92,7 +92,7 @@ public abstract class InputChannel { * The queue index to request depends on which sub task the channel belongs * to and is specified by the consumer of this channel. */ - public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException; + public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException; /** * Returns the next buffer from the consumed subpartition. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 8d28084fa4cb7b26a11661edc175225b6d2f3e22..43cdd298b418cce322cfa9a7e806bab4f10d64d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -29,7 +29,7 @@ public interface InputGate { public boolean isFinished(); - public void requestPartitions() throws IOException; + public void requestPartitions() throws IOException, InterruptedException; public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index daf94e6a57db32f19b4cf334f179eadad75f2593..d50ddc239c829c2831481f57f01a942d454b2e0f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -85,7 +85,7 @@ public class RemoteInputChannel extends InputChannel { // ------------------------------------------------------------------------ @Override - public void requestIntermediateResultPartition(int queueIndex) throws IOException { + public void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException { if (partitionRequestClient == null) { LOG.debug("Requesting REMOTE queue {} from partition {} produced by {}.", queueIndex, partitionId, producerExecutionId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 0383cca1bc8c3d58ab70ce2e2d8ce41d8db78138..19898c662061d9baf0ba1027590abf99a712ab64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -192,7 +192,7 @@ public class SingleInputGate implements InputGate { } } - public void updateInputChannel(PartitionInfo partitionInfo) throws IOException { + public void updateInputChannel(PartitionInfo partitionInfo) throws IOException, InterruptedException { synchronized (requestLock) { if (releasedResourcesFlag) { // There was a race with a task failure/cancel @@ -273,7 +273,7 @@ public class SingleInputGate implements InputGate { } @Override - public void requestPartitions() throws IOException { + public void requestPartitions() throws IOException, InterruptedException { if (!requestedPartitionsFlag) { // Sanity check if (numberOfInputChannels != inputChannels.size()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 4994f130298328fe7dfade1d1f5731a706b04cca..5a7a5b0141718ee35c1b84c413852f557c516a31 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -120,7 +120,7 @@ public class UnionInputGate implements InputGate { } @Override - public void requestPartitions() throws IOException { + public void requestPartitions() throws IOException, InterruptedException { if (!requestedPartitionsFlag) { for (InputGate inputGate : inputGates) { inputGate.requestPartitions(); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java index bb3a6598879cb1fad1061994175a7c540dfacfde..84b08f74890d264485af0c6f95f6c6f80412803f 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java @@ -78,7 +78,7 @@ public class CoRecordReader