提交 b61f63c0 编写于 作者: S Stephan Ewen

[FLINK-1110] Adjusted test base to run programs both with local executor and collection executor

上级 217b03e6
......@@ -191,7 +191,10 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
inputFormat.open(split);
while (!inputFormat.reachedEnd()) {
result.add(inputFormat.nextRecord(serializer.createInstance()));
OUT next = inputFormat.nextRecord(serializer.createInstance());
if (next != null) {
result.add(next);
}
}
inputFormat.close();
......
......@@ -219,6 +219,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
return result;
}
@SuppressWarnings("unchecked")
private <T> TypeComparator<T> getTypeComparator(TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
if (!(inputType instanceof CompositeType)) {
throw new InvalidProgramException("Input types of coGroup must be composite types.");
......
......@@ -43,6 +43,7 @@ public class TypeComparable<T> {
if (!(o instanceof TypeComparable)) {
return false;
}
@SuppressWarnings("unchecked")
TypeComparable<T> other = (TypeComparable<T>) o;
return comparator.compare(elem, other.elem) == 0;
}
......
......@@ -41,9 +41,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
......@@ -82,7 +80,6 @@ public abstract class AbstractTestBase {
// Local Test Cluster Life Cycle
// --------------------------------------------------------------------------------------------
@Before
public void startCluster() throws Exception {
this.executor = new NepheleMiniCluster();
this.executor.setDefaultOverwriteFiles(true);
......@@ -92,8 +89,7 @@ public abstract class AbstractTestBase {
this.executor.setNumTaskManager(this.numTaskManager);
this.executor.start();
}
@After
public void stopCluster() throws Exception {
try {
if (this.executor != null) {
......
......@@ -34,6 +34,7 @@ import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple;
......@@ -84,6 +85,51 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
@Test
public void testJob() throws Exception {
startCluster();
try {
// pre-submit
try {
preSubmit();
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Pre-submit work caused an error: " + e.getMessage());
}
// prepare the test environment
TestEnvironment env = new TestEnvironment(this.executor, this.degreeOfParallelism);
env.setAsContext();
// call the test program
try {
testProgram();
this.latestExecutionResult = env.latestResult;
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Error while calling the test program: " + e.getMessage());
}
Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);
// post-submit
try {
postSubmit();
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Post-submit work caused an error: " + e.getMessage());
}
} finally {
stopCluster();
}
}
@Test
public void testJobCollectionExecution() throws Exception {
// pre-submit
try {
preSubmit();
......@@ -95,7 +141,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
}
// prepare the test environment
TestEnvironment env = new TestEnvironment(this.executor, this.degreeOfParallelism);
CollectionTestEnvironment env = new CollectionTestEnvironment();
env.setAsContext();
// call the test program
......@@ -121,10 +167,6 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
Assert.fail("Post-submit work caused an error: " + e.getMessage());
}
}
protected ExecutionEnvironment getExecutionEnvironment() {
return new TestEnvironment(this.executor, this.degreeOfParallelism);
}
private static final class TestEnvironment extends ExecutionEnvironment {
......@@ -183,6 +225,27 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
}
}
private static final class CollectionTestEnvironment extends CollectionEnvironment {
private JobExecutionResult latestResult;
@Override
public JobExecutionResult execute() throws Exception {
return execute("test job");
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
JobExecutionResult result = super.execute(jobName);
this.latestResult = result;
return result;
}
private void setAsContext() {
initializeContextEnvironment(this);
}
}
public static class TupleComparator<T extends Tuple> implements Comparator<T> {
@SuppressWarnings("unchecked")
......
......@@ -92,48 +92,54 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
@Test
public void testJob() throws Exception {
// pre-submit
try {
preSubmit();
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Pre-submit work caused an error: " + e.getMessage());
}
startCluster();
// submit job
JobGraph jobGraph = null;
try {
jobGraph = getJobGraph();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Failed to obtain JobGraph!");
}
Assert.assertNotNull("Obtained null JobGraph", jobGraph);
try {
JobClient client = this.executor.getJobClient(jobGraph);
client.setConsoleStreamForReporting(getNullPrintStream());
this.jobExecutionResult = client.submitJobAndWait();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Job execution failed!");
}
// post-submit
try {
postSubmit();
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Post-submit work caused an error: " + e.getMessage());
// pre-submit
try {
preSubmit();
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Pre-submit work caused an error: " + e.getMessage());
}
// submit job
JobGraph jobGraph = null;
try {
jobGraph = getJobGraph();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Failed to obtain JobGraph!");
}
Assert.assertNotNull("Obtained null JobGraph", jobGraph);
try {
JobClient client = this.executor.getJobClient(jobGraph);
client.setConsoleStreamForReporting(getNullPrintStream());
this.jobExecutionResult = client.submitJobAndWait();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Job execution failed!");
}
// post-submit
try {
postSubmit();
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Post-submit work caused an error: " + e.getMessage());
}
} finally {
stopCluster();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册