提交 3d5ae611 编写于 作者: S StephanEwen

Fix client and webclient for interactive mode of new java api.

上级 bdafcd8a
...@@ -145,34 +145,21 @@ public class Client { ...@@ -145,34 +145,21 @@ public class Client {
return getOptimizedPlan(prog.getPlan()); return getOptimizedPlan(prog.getPlan());
} }
/** public JobGraph getJobGraph(PackagedProgram prog, OptimizedPlan optPlan) throws ProgramInvocationException {
* Creates the job-graph, which is ready for submission, from a compiled and optimized program. return getJobGraph(optPlan, prog.getAllLibraries());
* The original program is required to access the original jar file. }
*
* @param prog The original program. private JobGraph getJobGraph(OptimizedPlan optPlan, List<File> jarFiles) {
* @param optPlan The optimized plan.
* @return The nephele job graph, generated from the optimized plan.
*/
public JobGraph getJobGraph(JobWithJars prog, OptimizedPlan optPlan) throws ProgramInvocationException {
NepheleJobGraphGenerator gen = new NepheleJobGraphGenerator(); NepheleJobGraphGenerator gen = new NepheleJobGraphGenerator();
JobGraph job = gen.compileJobGraph(optPlan); JobGraph job = gen.compileJobGraph(optPlan);
for (File jar : jarFiles) {
try { job.addJar(new Path(jar.getAbsolutePath()));
List<File> jarFiles = prog.getJarFiles();
for (File jar : jarFiles) {
job.addJar(new Path(jar.getAbsolutePath()));
}
}
catch (IOException ioex) {
throw new ProgramInvocationException("Could not extract the nested libraries: " + ioex.getMessage(), ioex);
} }
return job; return job;
} }
public JobExecutionResult run(PackagedProgram prog, boolean wait) throws ProgramInvocationException { public JobExecutionResult run(PackagedProgram prog, boolean wait) throws ProgramInvocationException {
if (prog.isUsingProgramEntryPoint()) { if (prog.isUsingProgramEntryPoint()) {
return run(prog.getPlanWithJars(), wait); return run(prog.getPlanWithJars(), wait);
...@@ -188,20 +175,9 @@ public class Client { ...@@ -188,20 +175,9 @@ public class Client {
} }
} }
/** public JobExecutionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
* Runs a program on the nephele system whose job-manager is configured in this client's configuration. return run(optimizedPlan, prog.getAllLibraries(), wait);
* This method involves all steps, from compiling, job-graph generation to submission.
*
* @param prog The program to be executed.
* @throws CompilerException Thrown, if the compiler encounters an illegal situation.
* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
* or if the submission failed. That might be either due to an I/O problem,
* i.e. the job-manager is unreachable, or due to the fact that the execution
* on the nephele system failed.
* @throws JobInstantiationException Thrown, if the plan assembler function causes an exception.
*/
public JobExecutionResult run(JobWithJars prog) throws CompilerException, ProgramInvocationException {
return run(prog, false);
} }
/** /**
...@@ -218,64 +194,16 @@ public class Client { ...@@ -218,64 +194,16 @@ public class Client {
* @throws JobInstantiationException Thrown, if the plan assembler function causes an exception. * @throws JobInstantiationException Thrown, if the plan assembler function causes an exception.
*/ */
public JobExecutionResult run(JobWithJars prog, boolean wait) throws CompilerException, ProgramInvocationException { public JobExecutionResult run(JobWithJars prog, boolean wait) throws CompilerException, ProgramInvocationException {
return run(prog, getOptimizedPlan(prog), wait); return run(getOptimizedPlan(prog), prog.getJarFiles(), wait);
}
/**
* Submits the given program to the nephele job-manager for execution. The first step of the compilation process is skipped and
* the given compiled plan is taken.
*
* @param prog The original program.
* @param compiledPlan The optimized plan.
* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
* or if the submission failed. That might be either due to an I/O problem,
* i.e. the job-manager is unreachable, or due to the fact that the execution
* on the nephele system failed.
*/
public JobExecutionResult run(JobWithJars prog, OptimizedPlan compiledPlan) throws ProgramInvocationException {
return run(prog, compiledPlan, false);
} }
/**
* Submits the given program to the nephele job-manager for execution. The first step of the compilation process is skipped and
* the given compiled plan is taken.
*
* @param prog The original program.
* @param compiledPlan The optimized plan.
* @param wait A flag that indicates whether this function call should block until the program execution is done.
* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
* or if the submission failed. That might be either due to an I/O problem,
* i.e. the job-manager is unreachable, or due to the fact that the execution
* on the nephele system failed.
*/
public JobExecutionResult run(JobWithJars prog, OptimizedPlan compiledPlan, boolean wait) throws ProgramInvocationException {
JobGraph job = getJobGraph(prog, compiledPlan);
return run(prog, job, wait);
}
/** public JobExecutionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
* Submits the job-graph to the nephele job-manager for execution. JobGraph job = getJobGraph(compiledPlan, libraries);
* return run(job, wait);
* @param prog The program to be submitted.
* @throws ProgramInvocationException Thrown, if the submission failed. That might be either due to an I/O problem,
* i.e. the job-manager is unreachable, or due to the fact that the execution
* on the nephele system failed.
*/
public JobExecutionResult run(JobWithJars program, JobGraph jobGraph) throws ProgramInvocationException {
return run(program, jobGraph, false);
} }
/**
* Submits the job-graph to the nephele job-manager for execution. public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
*
* @param prog The program to be submitted.
* @param wait Method will block until the job execution is finished if set to true.
* If set to false, the method will directly return after the job is submitted.
* @throws ProgramInvocationException Thrown, if the submission failed. That might be either due to an I/O problem,
* i.e. the job-manager is unreachable, or due to the fact that the execution
* on the nephele system failed.
*/
public JobExecutionResult run(JobWithJars program, JobGraph jobGraph, boolean wait) throws ProgramInvocationException
{
JobClient client; JobClient client;
try { try {
client = new JobClient(jobGraph, configuration); client = new JobClient(jobGraph, configuration);
...@@ -344,12 +272,7 @@ public class Client { ...@@ -344,12 +272,7 @@ public class Client {
} }
private void setAsContext() { private void setAsContext() {
if (isContextEnvironmentSet()) { initializeContextEnvironment(this);
throw new RuntimeException("The context environment has already been initialized.");
}
else {
initializeContextEnvironment(this);
}
} }
} }
......
...@@ -63,11 +63,6 @@ public class ContextEnvironment extends ExecutionEnvironment { ...@@ -63,11 +63,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
} }
public void setAsContext() { public void setAsContext() {
if (isContextEnvironmentSet()) { initializeContextEnvironment(this);
throw new RuntimeException("The context environment has already been initialized.");
}
else {
initializeContextEnvironment(this);
}
} }
} }
...@@ -68,7 +68,7 @@ public class JobWithJars { ...@@ -68,7 +68,7 @@ public class JobWithJars {
/** /**
* Returns list of jar files that need to be submitted with the plan. * Returns list of jar files that need to be submitted with the plan.
*/ */
public List<File> getJarFiles() throws IOException { public List<File> getJarFiles() {
return this.jarFiles; return this.jarFiles;
} }
......
...@@ -626,12 +626,7 @@ public class PackagedProgram { ...@@ -626,12 +626,7 @@ public class PackagedProgram {
} }
private void setAsContext() { private void setAsContext() {
if (isContextEnvironmentSet()) { initializeContextEnvironment(this);
throw new RuntimeException("The context environment has already been initialized.");
}
else {
initializeContextEnvironment(this);
}
} }
} }
} }
...@@ -77,7 +77,7 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -77,7 +77,7 @@ public class JobSubmissionServlet extends HttpServlet {
private final File planDumpDirectory; // the directory to dump the optimizer plans to private final File planDumpDirectory; // the directory to dump the optimizer plans to
private final Map<Long, ProgramJobGraphPair> submittedJobs; // map from UIDs to the running jobs private final Map<Long, JobGraph> submittedJobs; // map from UIDs to the running jobs
private final Random rand; // random number generator for UIDs private final Random rand; // random number generator for UIDs
...@@ -89,7 +89,7 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -89,7 +89,7 @@ public class JobSubmissionServlet extends HttpServlet {
this.jobStoreDirectory = jobDir; this.jobStoreDirectory = jobDir;
this.planDumpDirectory = planDir; this.planDumpDirectory = planDir;
this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, ProgramJobGraphPair>()); this.submittedJobs = Collections.synchronizedMap(new HashMap<Long, JobGraph>());
this.rand = new Random(System.currentTimeMillis()); this.rand = new Random(System.currentTimeMillis());
} }
...@@ -144,19 +144,19 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -144,19 +144,19 @@ public class JobSubmissionServlet extends HttpServlet {
params.remove(0); params.remove(0);
} }
// create the pact plan // create the plan
String[] options = params.isEmpty() ? new String[0] : (String[]) params.toArray(new String[params.size()]); String[] options = params.isEmpty() ? new String[0] : (String[]) params.toArray(new String[params.size()]);
PackagedProgram pactProgram = null; PackagedProgram program;
OptimizedPlan optPlan = null; OptimizedPlan optPlan;
try { try {
if (assemblerClass == null) { if (assemblerClass == null) {
pactProgram = new PackagedProgram(jarFile, options); program = new PackagedProgram(jarFile, options);
} else { } else {
pactProgram = new PackagedProgram(jarFile, assemblerClass, options); program = new PackagedProgram(jarFile, assemblerClass, options);
} }
optPlan = client.getOptimizedPlan(pactProgram.getPlanWithJars()); optPlan = client.getOptimizedPlan(program);
} }
catch (ProgramInvocationException e) { catch (ProgramInvocationException e) {
// collect the stack trace // collect the stack trace
...@@ -210,18 +210,17 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -210,18 +210,17 @@ public class JobSubmissionServlet extends HttpServlet {
// submit the job only, if it should not be suspended // submit the job only, if it should not be suspended
if (!suspend) { if (!suspend) {
try { try {
this.client.run(pactProgram.getPlanWithJars(), optPlan); this.client.run(program, optPlan, false);
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("Error submitting job to the job-manager.", t); LOG.error("Error submitting job to the job-manager.", t);
showErrorPage(resp, t.getMessage()); showErrorPage(resp, t.getMessage());
return; return;
} finally { } finally {
pactProgram.deleteExtractedLibraries(); program.deleteExtractedLibraries();
} }
} else { } else {
try { try {
this.submittedJobs.put(uid, this.submittedJobs.put(uid, this.client.getJobGraph(program, optPlan));
new ProgramJobGraphPair(pactProgram, this.client.getJobGraph(pactProgram.getPlanWithJars(), optPlan)));
} }
catch (ProgramInvocationException piex) { catch (ProgramInvocationException piex) {
LOG.error("Error creating JobGraph from optimized plan.", piex); LOG.error("Error creating JobGraph from optimized plan.", piex);
...@@ -241,7 +240,7 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -241,7 +240,7 @@ public class JobSubmissionServlet extends HttpServlet {
// don't show any plan. directly submit the job and redirect to the // don't show any plan. directly submit the job and redirect to the
// nephele runtime monitor // nephele runtime monitor
try { try {
client.run(pactProgram.getPlanWithJars()); client.run(program, false);
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Error submitting job to the job-manager.", ex); LOG.error("Error submitting job to the job-manager.", ex);
// HACK: Is necessary because Message contains whole stack trace // HACK: Is necessary because Message contains whole stack trace
...@@ -249,7 +248,7 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -249,7 +248,7 @@ public class JobSubmissionServlet extends HttpServlet {
showErrorPage(resp, errorMessage); showErrorPage(resp, errorMessage);
return; return;
} finally { } finally {
pactProgram.deleteExtractedLibraries(); program.deleteExtractedLibraries();
} }
resp.sendRedirect(START_PAGE_URL); resp.sendRedirect(START_PAGE_URL);
} }
...@@ -271,7 +270,7 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -271,7 +270,7 @@ public class JobSubmissionServlet extends HttpServlet {
} }
// get the retained job // get the retained job
ProgramJobGraphPair job = submittedJobs.remove(uid); JobGraph job = submittedJobs.remove(uid);
if (job == null) { if (job == null) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST, resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
"No job with the given uid was retained for later submission."); "No job with the given uid was retained for later submission.");
...@@ -280,7 +279,7 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -280,7 +279,7 @@ public class JobSubmissionServlet extends HttpServlet {
// submit the job // submit the job
try { try {
client.run(job.getProgram().getPlanWithJars(), job.getJobGraph()); client.run(job, false);
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Error submitting job to the job-manager.", ex); LOG.error("Error submitting job to the job-manager.", ex);
resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
...@@ -289,8 +288,6 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -289,8 +288,6 @@ public class JobSubmissionServlet extends HttpServlet {
resp.getWriter().print(errorMessage); resp.getWriter().print(errorMessage);
// resp.sendError(HttpServletResponse.SC_BAD_REQUEST, ex.getMessage()); // resp.sendError(HttpServletResponse.SC_BAD_REQUEST, ex.getMessage());
return; return;
} finally {
job.getProgram().deleteExtractedLibraries();
} }
// redirect to the start page // redirect to the start page
...@@ -428,29 +425,5 @@ public class JobSubmissionServlet extends HttpServlet { ...@@ -428,29 +425,5 @@ public class JobSubmissionServlet extends HttpServlet {
} }
return list; return list;
}
// ============================================================================================
private static final class ProgramJobGraphPair
{
private final PackagedProgram program;
private final JobGraph jobGraph;
public ProgramJobGraphPair(PackagedProgram program, JobGraph jobGraph) {
this.program = program;
this.jobGraph = jobGraph;
}
public PackagedProgram getProgram() {
return program;
}
public JobGraph getJobGraph() {
return jobGraph;
}
} }
} }
...@@ -109,10 +109,10 @@ public class PactJobJSONServlet extends HttpServlet { ...@@ -109,10 +109,10 @@ public class PactJobJSONServlet extends HttpServlet {
} }
if (programDescription != null) { if (programDescription != null) {
wrt.print(", \"description\": \""); wrt.print(", \"description\": \"");
wrt.print(escapeString(programDescription)); wrt.print(escapeString(programDescription));
wrt.print("\"");
} }
wrt.print("\"");
wrt.println("}"); wrt.println("}");
} }
} }
......
...@@ -113,7 +113,7 @@ public class ClientTest { ...@@ -113,7 +113,7 @@ public class ClientTest {
when(jobSubmissionResultMock.getReturnCode()).thenReturn(ReturnCode.SUCCESS); when(jobSubmissionResultMock.getReturnCode()).thenReturn(ReturnCode.SUCCESS);
Client out = new Client(configMock); Client out = new Client(configMock);
out.run(program.getPlanWithJars()); out.run(program.getPlanWithJars(), false);
program.deleteExtractedLibraries(); program.deleteExtractedLibraries();
verify(this.compilerMock, times(1)).compile(planMock); verify(this.compilerMock, times(1)).compile(planMock);
...@@ -130,7 +130,7 @@ public class ClientTest { ...@@ -130,7 +130,7 @@ public class ClientTest {
when(jobSubmissionResultMock.getReturnCode()).thenReturn(ReturnCode.ERROR); when(jobSubmissionResultMock.getReturnCode()).thenReturn(ReturnCode.ERROR);
Client out = new Client(configMock); Client out = new Client(configMock);
out.run(program.getPlanWithJars()); out.run(program.getPlanWithJars(), false);
program.deleteExtractedLibraries(); program.deleteExtractedLibraries();
verify(this.jobClientMock).submitJob(); verify(this.jobClientMock).submitJob();
......
...@@ -293,9 +293,6 @@ public abstract class ExecutionEnvironment { ...@@ -293,9 +293,6 @@ public abstract class ExecutionEnvironment {
} }
protected static void initializeContextEnvironment(ExecutionEnvironment ctx) { protected static void initializeContextEnvironment(ExecutionEnvironment ctx) {
if (contextEnvironment != null)
throw new IllegalStateException("System context has already been initialized.");
contextEnvironment = ctx; contextEnvironment = ctx;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册