提交 94070415 编写于 作者: U Ufuk Celebi

[distributed runtime] Throw interrupted exception during partition request client creation

上级 6da093a6
......@@ -34,7 +34,7 @@ public interface ConnectionManager {
/**
* Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}.
*/
PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException;
PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException;
/**
* Closes opened ChannelConnections in case of a resource release
......
......@@ -49,7 +49,7 @@ public class NettyConnectionManager implements ConnectionManager {
}
@Override
public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException {
return partitionRequestClientFactory.createPartitionRequestClient(remoteAddress);
}
......
......@@ -48,7 +48,7 @@ class PartitionRequestClientFactory {
* Atomically establishes a TCP connection to the given remote address and
* creates a {@link PartitionRequestClient} instance for this connection.
*/
PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException {
Object entry;
PartitionRequestClient client = null;
......@@ -182,15 +182,10 @@ class PartitionRequestClientFactory {
private volatile Throwable error;
private PartitionRequestClient waitForChannel() throws IOException {
private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
synchronized (connectLock) {
while (error == null && partitionRequestClient == null) {
try {
connectLock.wait(2000);
}
catch (InterruptedException e) {
throw new RuntimeException("Wait for channel connection interrupted.");
}
connectLock.wait(2000);
}
}
......
......@@ -92,7 +92,7 @@ public abstract class InputChannel {
* The queue index to request depends on which sub task the channel belongs
* to and is specified by the consumer of this channel.
*/
public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException;
public abstract void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException;
/**
* Returns the next buffer from the consumed subpartition.
......
......@@ -29,7 +29,7 @@ public interface InputGate {
public boolean isFinished();
public void requestPartitions() throws IOException;
public void requestPartitions() throws IOException, InterruptedException;
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException;
......
......@@ -85,7 +85,7 @@ public class RemoteInputChannel extends InputChannel {
// ------------------------------------------------------------------------
@Override
public void requestIntermediateResultPartition(int queueIndex) throws IOException {
public void requestIntermediateResultPartition(int queueIndex) throws IOException, InterruptedException {
if (partitionRequestClient == null) {
LOG.debug("Requesting REMOTE queue {} from partition {} produced by {}.", queueIndex, partitionId, producerExecutionId);
......
......@@ -192,7 +192,7 @@ public class SingleInputGate implements InputGate {
}
}
public void updateInputChannel(PartitionInfo partitionInfo) throws IOException {
public void updateInputChannel(PartitionInfo partitionInfo) throws IOException, InterruptedException {
synchronized (requestLock) {
if (releasedResourcesFlag) {
// There was a race with a task failure/cancel
......@@ -273,7 +273,7 @@ public class SingleInputGate implements InputGate {
}
@Override
public void requestPartitions() throws IOException {
public void requestPartitions() throws IOException, InterruptedException {
if (!requestedPartitionsFlag) {
// Sanity check
if (numberOfInputChannels != inputChannels.size()) {
......
......@@ -120,7 +120,7 @@ public class UnionInputGate implements InputGate {
}
@Override
public void requestPartitions() throws IOException {
public void requestPartitions() throws IOException, InterruptedException {
if (!requestedPartitionsFlag) {
for (InputGate inputGate : inputGates) {
inputGate.requestPartitions();
......
......@@ -78,7 +78,7 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
bufferReader2.registerListener(this);
}
public void requestPartitionsOnce() throws IOException {
public void requestPartitionsOnce() throws IOException, InterruptedException {
if (!hasRequestedPartitions) {
bufferReader1.requestPartitions();
bufferReader2.requestPartitions();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册