提交 8a98a0b6 编写于 作者: S sewen

Changed PactProgram to not instantiate plan multiple times.

Removed obsolete reflection visitor.
Cleaned up context checker.
上级 0a4ef24d
......@@ -53,6 +53,9 @@ public class PactProgram {
* Property name of the pact assembler definition in the JAR manifest file.
*/
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "Pact-Assembler-Class";
private static final Pattern BREAK_TAGS = Pattern.compile("<(b|B)(r|R) */?>");
private static final Pattern REMOVE_TAGS = Pattern.compile("<.+?>");
// --------------------------------------------------------------------------------------------
......@@ -63,6 +66,8 @@ public class PactProgram {
private final String[] args;
private File[] extractedTempLibraries;
private Plan plan;
/**
* Creates an instance that wraps the plan defined in the jar file using the given
......@@ -120,7 +125,10 @@ public class PactProgram {
* missing parameters for generation.
*/
public Plan getPlan() throws ProgramInvocationException, ErrorInPlanAssemblerException {
return createPlanFromJar(assemblerClass, args);
if (this.plan == null) {
this.plan = createPlanFromJar(this.assemblerClass, this.args);
}
return this.plan;
}
/**
......@@ -167,7 +175,7 @@ public class PactProgram {
* @return The jar-file of the PactProgram.
*/
public File getJarFile() {
return jarFile;
return this.jarFile;
}
/**
......@@ -183,7 +191,7 @@ public class PactProgram {
* missing parameters for generation.
*/
public String getDescription() throws ProgramInvocationException {
PlanAssembler assembler = createAssemblerFromJar(assemblerClass);
PlanAssembler assembler = instantiateAssemblerFromClass(this.assemblerClass);
if (assembler instanceof PlanAssemblerDescription) {
return ((PlanAssemblerDescription) assembler).getDescription();
} else {
......@@ -209,8 +217,7 @@ public class PactProgram {
if (descr == null || descr.length() == 0) {
return null;
} else {
final Pattern BREAK_TAGS = Pattern.compile("<(b|B)(r|R) */?>");
final Pattern REMOVE_TAGS = Pattern.compile("<.+?>");
Matcher m = BREAK_TAGS.matcher(descr);
descr = m.replaceAll("\n");
m = REMOVE_TAGS.matcher(descr);
......@@ -328,12 +335,10 @@ public class PactProgram {
* @throws ErrorInPlanAssemblerException
* Thrown, if an error occurred in the user-provided pact assembler.
*/
protected Plan createPlanFromJar(Class<? extends PlanAssembler> clazz, String[] options)
protected static Plan createPlanFromJar(Class<? extends PlanAssembler> clazz, String[] options)
throws ProgramInvocationException, ErrorInPlanAssemblerException
{
PlanAssembler assembler = createAssemblerFromJar(clazz);
// run the user-provided assembler class
PlanAssembler assembler = instantiateAssemblerFromClass(clazz);
try {
return assembler.getPlan(options);
} catch (Throwable t) {
......@@ -351,27 +356,22 @@ public class PactProgram {
* @throws ProgramInvocationException
* is thrown if class can't be found or instantiated
*/
protected PlanAssembler createAssemblerFromJar(Class<? extends PlanAssembler> clazz)
protected static PlanAssembler instantiateAssemblerFromClass(Class<? extends PlanAssembler> clazz)
throws ProgramInvocationException
{
// we have the class. now create a classloader that can load the
// contents of the jar
PlanAssembler assembler = null;
try {
assembler = clazz.newInstance();
return clazz.newInstance();
} catch (InstantiationException e) {
throw new ProgramInvocationException("ERROR: The pact plan assembler class could not be instantiated. "
+ "Make sure that the class is a proper class (not abstract) and has a "
+ "Make sure that the class is a proper class (not abstract/interface) and has a "
+ "public constructor with no arguments.", e);
} catch (IllegalAccessException e) {
throw new ProgramInvocationException("ERROR: The pact plan assembler class could not be instantiated. "
+ "Make sure that the class has a public constructor with no arguments.", e);
} catch (Throwable t) {
throw new ProgramInvocationException("An unknown problem ocurred during the instantiation of the "
throw new ProgramInvocationException("An error ocurred during the instantiation of the "
+ "program assembler: " + t.getMessage(), t);
}
return assembler;
}
private Class<? extends PlanAssembler> getPactAssemblerFromJar(File jarFile)
......@@ -467,12 +467,10 @@ public class PactProgram {
private void checkJarFile(File jar) throws ProgramInvocationException {
if (!jar.exists()) {
throw new ProgramInvocationException("JAR file does not exist '" + jarFile.getPath() + "'");
}
if (!jar.canRead()) {
throw new ProgramInvocationException("JAR file can't be read '" + jarFile.getPath() + "'");
}
// TODO: Check if proper JAR file
}
}
......@@ -31,9 +31,6 @@ import eu.stratosphere.pact.generic.contract.SingleInputContract;
/**
* Traverses a plan and checks whether all Contracts are correctly connected to
* their input contracts.
*
* @author Max Heimel
* @author Fabian Hueske
*/
public class ContextChecker implements Visitor<Contract> {
......@@ -41,19 +38,12 @@ public class ContextChecker implements Visitor<Contract> {
* A set of all already visited nodes during DAG traversal. Is used
* to avoid processing one node multiple times.
*/
public Set<Contract> visitedNodes = new HashSet<Contract>();
/**
* Used for checking whether a contract does not have any children.
*/
//private Map<Contract, Set<Contract>> contractChildren =
// new HashMap<Contract, Set<Contract>>();
private final Set<Contract> visitedNodes = new HashSet<Contract>();
/**
* Default constructor
*/
public ContextChecker() {
}
public ContextChecker() {}
/**
* Checks whether the given plan is valid. In particular it is checked that
......@@ -74,13 +64,10 @@ public class ContextChecker implements Visitor<Contract> {
*/
@Override
public boolean preVisit(Contract node) {
// check if node was already visited
if (this.visitedNodes.contains(node)) {
if (!this.visitedNodes.add(node)) {
return false;
}
//contractChildren.put(node, new HashSet<Contract>());
// apply the appropriate check method
if (node instanceof FileDataSink) {
......@@ -94,24 +81,11 @@ public class ContextChecker implements Visitor<Contract> {
} else if (node instanceof DualInputContract<?>) {
checkDualInputContract((DualInputContract<?>) node);
}
// Data sources must not be checked, since correctness of input type is
// checked.
// mark node as visited
this.visitedNodes.add(node);
return true;
}
@Override
public void postVisit(Contract node) {
/* if (contractChildren.get(node).size() == 0) {
// we did not visit any node that had this node
// as input
throw new MissingChildException("Node " +
node.getName() + " does not have any childern.");
}*/
}
public void postVisit(Contract node) {}
/**
* Checks if DataSinkContract is correctly connected. In case that the
......@@ -127,8 +101,6 @@ public class ContextChecker implements Visitor<Contract> {
if (input == null) {
throw new MissingChildException();
}
//contractChildren.get(input).add(dataSinkContract);
}
/**
......@@ -189,9 +161,6 @@ public class ContextChecker implements Visitor<Contract> {
if (input.size() == 0) {
throw new MissingChildException();
}
/* for (Contract in : input) {
contractChildren.get(in).add(singleInputContract);
}*/
}
/**
......@@ -209,12 +178,5 @@ public class ContextChecker implements Visitor<Contract> {
if (input1.size() == 0 || input2.size() == 0) {
throw new MissingChildException();
}
/* for (Contract in : input1) {
contractChildren.get(in).add(dualInputContract);
}
for (Contract in : input2) {
contractChildren.get(in).add(dualInputContract);
}*/
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.contextcheck;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.pact.common.plan.Visitable;
import eu.stratosphere.pact.common.plan.Visitor;
public abstract class ReflectionVisitor<T extends Visitable<T>> implements Visitor<T> {
private final Map<Class<?>, Method> preVisits = new HashMap<Class<?>, Method>();
private final Map<Class<?>, Method> postVisits = new HashMap<Class<?>, Method>();
protected ReflectionVisitor() {
{
Method[] preVisitsMethods = getClass().getDeclaredMethods();
for (Method method : preVisitsMethods) {
if (method.getName().startsWith("preVisit"))
preVisits.put(method.getParameterTypes()[0], method);
}
}
{
Method[] prostVisitsMethods = getClass().getDeclaredMethods();
for (Method method : prostVisitsMethods) {
if (method.getName().startsWith("postVisit"))
postVisits.put(method.getParameterTypes()[0], method);
}
}
}
public boolean preVisit(T node) {
try {
Class<?> clazz = node.getClass();
while (Arrays.asList(clazz.getInterfaces()).contains(Visitable.class)) {
if (preVisits.containsKey(clazz)) {
preVisits.get(clazz).invoke(this, node);
return true;
} else {
clazz = clazz.getSuperclass();
}
}
preVisitDefault(node);
} catch (Exception e) {
throw new RuntimeException(e);
}
return true;
}
public void postVisit(T node) {
try {
Class<?> clazz = node.getClass();
while (Arrays.asList(clazz.getInterfaces()).contains(Visitable.class)) {
if (postVisits.containsKey(clazz)) {
postVisits.get(clazz).invoke(this, node);
return;
} else {
clazz = clazz.getSuperclass();
}
}
postVisitDefault(node);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
void preVisitDefault(T node) {
}
void postVisitDefault(T node) {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册