提交 0df34948 编写于 作者: D Daniel Warneke

Worked on handling of new InterruptedException for RPC calls

上级 1f84ad42
......@@ -117,15 +117,13 @@ public class JobClient {
+ ":\tJobClient is shutting down, canceling job...");
this.jobClient.cancelJob();
}
// Close the RPC object
this.jobClient.close();
} catch (IOException ioe) {
LOG.warn(StringUtils.stringifyException(ioe));
} catch (Exception e) {
LOG.debug(StringUtils.stringifyException(e));
}
}
// Close the RPC object
this.jobClient.close();
}
}
/**
......@@ -216,8 +214,10 @@ public class JobClient {
* @return a <code>JobSubmissionResult</code> object encapsulating the results of the job submission
* @throws IOException
* thrown in case of submission errors while transmitting the data to the job manager
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the response of the remote procedure call
*/
public JobSubmissionResult submitJob() throws IOException {
public JobSubmissionResult submitJob() throws IOException, InterruptedException {
return this.jobSubmitClient.submitJob(this.jobGraph);
}
......@@ -228,8 +228,10 @@ public class JobClient {
* @return a <code>JobCancelResult</code> object encapsulating the result of the job cancel request
* @throws IOException
* thrown if an error occurred while transmitting the request to the job manager
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the response of the remote procedure call
*/
public JobCancelResult cancelJob() throws IOException {
public JobCancelResult cancelJob() throws IOException, InterruptedException {
return this.jobSubmitClient.cancelJob(this.jobGraph.getJobID());
}
......@@ -240,8 +242,10 @@ public class JobClient {
* @return a <code>JobProgressResult</code> object including the current job progress
* @throws IOException
* thrown if an error occurred while transmitting the request
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the response of the remote procedure call
*/
public JobProgressResult getJobProgress() throws IOException {
public JobProgressResult getJobProgress() throws IOException, InterruptedException {
return this.jobSubmitClient.getJobProgress(this.jobGraph.getJobID());
}
......@@ -253,10 +257,12 @@ public class JobClient {
* @return the duration of the job execution in milliseconds
* @throws IOException
* thrown if an error occurred while transmitting the request
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the response of the remote procedure call
* @throws JobExecutionException
* thrown if the job has been aborted either by the user or as a result of an error
*/
public long submitJobAndWait() throws IOException, JobExecutionException {
public long submitJobAndWait() throws IOException, InterruptedException, JobExecutionException {
final JobSubmissionResult submissionResult = this.jobSubmitClient.submitJob(this.jobGraph);
if (submissionResult.getReturnCode() == AbstractJobResult.ReturnCode.ERROR) {
......@@ -364,8 +370,10 @@ public class JobClient {
* @return the interval in seconds
* @throws IOException
* thrown if an error occurred while transmitting the request
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the response of the remote procedure call
*/
public int getRecommendedPollingInterval() throws IOException {
public int getRecommendedPollingInterval() throws IOException, InterruptedException {
return this.jobSubmitClient.getRecommendedPollingInterval();
}
......
......@@ -40,8 +40,10 @@ public interface JobManagementProtocol extends RPCProtocol {
* @return a protocol of the job submission including the success status
* @throws IOException
* thrown if an error occurred while transmitting the submit request
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the response of the remote procedure call
*/
JobSubmissionResult submitJob(JobGraph job) throws IOException;
JobSubmissionResult submitJob(JobGraph job) throws IOException, InterruptedException;
/**
* Retrieves the current status of the job specified by the given ID. Consecutive
......@@ -53,8 +55,10 @@ public interface JobManagementProtocol extends RPCProtocol {
* @return a {@link JobProgressResult} object including the current job progress
* @throws IOException
* thrown if an error occurred while transmitting the request
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the response of the remote procedure call
*/
JobProgressResult getJobProgress(JobID jobID) throws IOException;
JobProgressResult getJobProgress(JobID jobID) throws IOException, InterruptedException;
/**
* Requests to cancel the job specified by the given ID.
......@@ -64,8 +68,10 @@ public interface JobManagementProtocol extends RPCProtocol {
* @return a {@link JobCancelResult} containing the result of the cancel request
* @throws IOException
* thrown if an error occurred while transmitting the request
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the response of the remote procedure call
*/
JobCancelResult cancelJob(JobID jobID) throws IOException;
JobCancelResult cancelJob(JobID jobID) throws IOException, InterruptedException;
/**
* Returns the recommended interval in seconds in which a client
......@@ -74,6 +80,8 @@ public interface JobManagementProtocol extends RPCProtocol {
* @return the interval in seconds
* @throws IOException
* thrown if an error occurred while transmitting the request
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the response of the remote procedure call
*/
int getRecommendedPollingInterval() throws IOException;
int getRecommendedPollingInterval() throws IOException, InterruptedException;
}
......@@ -202,7 +202,7 @@ public class BroadcastJob {
* writer object to write the duration results for each run
*/
private static void runJob(final int run, final BufferedWriter throughputWriter, final BufferedWriter durationWriter)
throws JobGraphDefinitionException, IOException, JobExecutionException {
throws JobGraphDefinitionException, IOException, InterruptedException, JobExecutionException {
// Construct job graph
final JobGraph jobGraph = new JobGraph("Broadcast Job (Run " + run + ")");
......
......@@ -15,10 +15,7 @@
package eu.stratosphere.nephele.example.compression;
import java.io.IOException;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionException;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.fs.Path;
import eu.stratosphere.nephele.io.channels.ChannelType;
......@@ -79,9 +76,7 @@ public class CompressionTest {
jobClient.submitJobAndWait();
} catch (IOException e) {
e.printStackTrace();
} catch (JobExecutionException e) {
} catch (Exception e) {
e.printStackTrace();
}
}
......
......@@ -46,8 +46,6 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import java.io.IOException;
/**
* @author casp
*/
......@@ -91,8 +89,8 @@ public class EventExample {
JobClient jobClient = new JobClient(jobGraph, conf);
JobSubmissionResult result = jobClient.submitJob();
System.out.println(result.getDescription());
} catch (IOException ioe) {
ioe.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
......
......@@ -16,10 +16,8 @@
package eu.stratosphere.nephele.example.speedtest;
import java.io.File;
import java.io.IOException;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionException;
import eu.stratosphere.nephele.configuration.ConfigConstants;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.fs.Path;
......@@ -196,14 +194,8 @@ public final class SpeedTest {
// Calculate throughput in MBit/s and output it
System.out.print("Job finished with a throughput of " + toMBitPerSecond(amountOfDataToSend, executionTime));
} catch (IOException ioe) {
ioe.printStackTrace();
System.exit(1);
return;
} catch (JobExecutionException jee) {
jee.printStackTrace();
System.exit(1);
return;
} catch (Exception e) {
e.printStackTrace();
}
}
......
......@@ -42,8 +42,10 @@ public interface ChannelLookupProtocol extends RPCProtocol {
* the ID of the channel to resolve
* @return the lookup response containing the connection info and a return code
* @throws IOException
* thrown if an error occurs during the IPC call
* thrown if an error occurs during the remote procedure call
* @throws InterruptedException
* thrown if the caller is interrupted while waiting for the result of the remote procedure call
*/
ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID,
ChannelID sourceChannelID) throws IOException;
ChannelID sourceChannelID) throws IOException, InterruptedException;
}
......@@ -565,9 +565,7 @@ public class FailingJobITCase {
try {
jobClient = new JobClient(jobGraph, configuration);
jobClient.submitJobAndWait();
} catch (IOException ioe) {
fail(StringUtils.stringifyException(ioe));
} catch (JobExecutionException e) {
} catch (Exception e) {
fail(StringUtils.stringifyException(e));
} finally {
if (jobClient != null) {
......@@ -624,9 +622,7 @@ public class FailingJobITCase {
try {
jobClient = new JobClient(jobGraph, configuration);
jobClient.submitJobAndWait();
} catch (IOException ioe) {
fail(StringUtils.stringifyException(ioe));
} catch (JobExecutionException e) {
} catch (Exception e) {
fail(StringUtils.stringifyException(e));
} finally {
if (jobClient != null) {
......@@ -690,9 +686,7 @@ public class FailingJobITCase {
try {
jobClient = new JobClient(jobGraph, configuration);
jobClient.submitJobAndWait();
} catch (IOException ioe) {
fail(StringUtils.stringifyException(ioe));
} catch (JobExecutionException e) {
} catch (Exception e) {
fail(StringUtils.stringifyException(e));
} finally {
if (jobClient != null) {
......@@ -763,9 +757,7 @@ public class FailingJobITCase {
try {
jobClient = new JobClient(jobGraph, configuration);
jobClient.submitJobAndWait();
} catch (IOException ioe) {
fail(StringUtils.stringifyException(ioe));
} catch (JobExecutionException e) {
} catch (Exception e) {
fail(StringUtils.stringifyException(e));
} finally {
if (jobClient != null) {
......@@ -840,9 +832,7 @@ public class FailingJobITCase {
try {
jobClient = new JobClient(jobGraph, configuration);
jobClient.submitJobAndWait();
} catch (IOException ioe) {
fail(StringUtils.stringifyException(ioe));
} catch (JobExecutionException e) {
} catch (Exception e) {
fail(StringUtils.stringifyException(e));
} finally {
if (jobClient != null) {
......@@ -917,6 +907,8 @@ public class FailingJobITCase {
jobClient.submitJobAndWait();
} catch (IOException ioe) {
fail(StringUtils.stringifyException(ioe));
} catch (InterruptedException ie) {
fail(StringUtils.stringifyException(ie));
} catch (JobExecutionException e) {
// This is expected here
assert (e.isJobCanceledByUser() == false);
......@@ -995,9 +987,7 @@ public class FailingJobITCase {
try {
jobClient = new JobClient(jobGraph, configuration);
jobClient.submitJobAndWait();
} catch (IOException ioe) {
fail(StringUtils.stringifyException(ioe));
} catch (JobExecutionException e) {
} catch (Exception e) {
fail(StringUtils.stringifyException(e));
} finally {
if (jobClient != null) {
......@@ -1070,9 +1060,7 @@ public class FailingJobITCase {
try {
jobClient = new JobClient(jobGraph, configuration);
jobClient.submitJobAndWait();
} catch (IOException ioe) {
fail(StringUtils.stringifyException(ioe));
} catch (JobExecutionException e) {
} catch (Exception e) {
fail(StringUtils.stringifyException(e));
} finally {
if (jobClient != null) {
......
......@@ -291,15 +291,9 @@ public class JobManagerITCase {
bufferedReader.close();
} catch (NumberFormatException e) {
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
} catch (JobExecutionException e) {
e.printStackTrace();
fail(e.getMessage());
} catch (IOException ioe) {
ioe.printStackTrace();
fail(ioe.getMessage());
} finally {
// Remove temporary files
if (inputFile1 != null) {
......@@ -396,6 +390,8 @@ public class JobManagerITCase {
fail(jgde.getMessage());
} catch (IOException ioe) {
fail(ioe.getMessage());
} catch (InterruptedException ie) {
fail(ie.getMessage());
} finally {
// Remove temporary files
......@@ -486,6 +482,8 @@ public class JobManagerITCase {
fail(jgde.getMessage());
} catch (IOException ioe) {
fail(ioe.getMessage());
} catch (InterruptedException ie) {
fail(ie.getMessage());
} finally {
// Remove temporary files
......@@ -587,6 +585,8 @@ public class JobManagerITCase {
} catch (IOException ioe) {
ioe.printStackTrace();
fail(ioe.getMessage());
} catch (InterruptedException ie) {
fail(ie.getMessage());
} finally {
if (jobClient != null) {
jobClient.close();
......@@ -607,6 +607,9 @@ public class JobManagerITCase {
private void test(final int limit) {
JobClient jobClient = null;
File inputFile = null;
File outputFile = null;
File jarFile = null;
try {
......@@ -614,10 +617,10 @@ public class JobManagerITCase {
final String forwardClassName = ForwardTask.class.getSimpleName();
// Create input and jar files
final File inputFile = ServerTestUtils.createInputFile(limit);
final File outputFile = new File(ServerTestUtils.getTempDir() + File.separator
inputFile = ServerTestUtils.createInputFile(limit);
outputFile = new File(ServerTestUtils.getTempDir() + File.separator
+ ServerTestUtils.getRandomFilename());
final File jarFile = ServerTestUtils.createJarFile(forwardClassName);
jarFile = ServerTestUtils.createJarFile(forwardClassName);
// Create job graph
final JobGraph jg = new JobGraph("Job Graph 1");
......@@ -678,15 +681,26 @@ public class JobManagerITCase {
bufferedReader.close();
// Remove temporary files
inputFile.delete();
outputFile.delete();
jarFile.delete();
} catch (IOException ioe) {
ioe.printStackTrace();
fail(ioe.getMessage());
} catch (InterruptedException ie) {
fail(ie.getMessage());
} finally {
// Remove temporary files
if (inputFile != null) {
inputFile.delete();
}
if (outputFile != null) {
outputFile.delete();
}
if (jarFile != null) {
jarFile.delete();
}
if (jobClient != null) {
jobClient.close();
}
......@@ -749,12 +763,8 @@ public class JobManagerITCase {
jobClient.submitJobAndWait();
} catch (JobExecutionException e) {
} catch (Exception e) {
fail(e.getMessage());
} catch (JobGraphDefinitionException jgde) {
fail(jgde.getMessage());
} catch (IOException ioe) {
fail(ioe.getMessage());
} finally {
// Remove temporary files
......@@ -821,12 +831,8 @@ public class JobManagerITCase {
jobClient = new JobClient(jg, configuration);
jobClient.submitJobAndWait();
} catch (JobExecutionException e) {
} catch (Exception e) {
fail(e.getMessage());
} catch (JobGraphDefinitionException jgde) {
fail(jgde.getMessage());
} catch (IOException ioe) {
fail(ioe.getMessage());
} finally {
// Remove temporary files
......@@ -981,6 +987,8 @@ public class JobManagerITCase {
fail(jgde.getMessage());
} catch (IOException ioe) {
fail(ioe.getMessage());
} catch (InterruptedException ie) {
fail(ie.getMessage());
} finally {
// Remove temporary files
......@@ -1092,6 +1100,8 @@ public class JobManagerITCase {
fail(jgde.getMessage());
} catch (IOException ioe) {
fail(ioe.getMessage());
} catch (InterruptedException ie) {
fail(ie.getMessage());
} finally {
// Remove temporary files
......
......@@ -90,7 +90,7 @@ public class SWTVisualization {
// Get the query interval
queryInterval = jobManager.getRecommendedPollingInterval();
} catch (IOException e) {
} catch (Exception e) {
e.printStackTrace();
rpcService.shutDown();
System.exit(1);
......
......@@ -864,6 +864,8 @@ public class SWTVisualizationGUI implements SelectionListener, Runnable {
msgBox.setText("Canceling job " + visualizationData.getJobID() + " failed");
msgBox.setMessage("Canceling job " + visualizationData.getJobID()
+ " failed:\r\n\r\n" + ioe.getMessage());
} catch (InterruptedException ie) {
return;
}
}
......
......@@ -266,6 +266,9 @@ public class Client {
throw new ProgramInvocationException("The program execution failed: " + jex.getMessage());
}
}
catch (InterruptedException ie) {
throw new ProgramInvocationException("The program has been interrupted: " + ie.getMessage());
}
finally {
program.deleteExtractedLibraries();
}
......
......@@ -683,6 +683,8 @@ public class TestPlan implements Closeable {
e1.printStackTrace();
} catch (JobExecutionException e1) {
e1.printStackTrace();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
......
......@@ -108,7 +108,7 @@ public class ClientTest {
}
@Test
public void shouldSubmitToJobClient() throws ProgramInvocationException, ErrorInPlanAssemblerException, IOException
public void shouldSubmitToJobClient() throws ProgramInvocationException, ErrorInPlanAssemblerException, IOException, InterruptedException
{
when(jobSubmissionResultMock.getReturnCode()).thenReturn(ReturnCode.SUCCESS);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册