Commit c175ebe8 authored by Till Rohrmann

Fixed JobManagerITCase to properly wait for task managers to deregister their...

Fixed JobManagerITCase to properly wait for task managers to deregister their tasks. Replaced the scheduler's execution service with Akka's futures. Introduced TestStreamEnvironment to use ForkableFlinkMiniCluster for test execution.
Parent bd4ee47b
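The commit touches three areas: waiting for task deregistration in JobManagerITCase, switching the Scheduler from an injected ExecutorService to Akka futures, and a new TestStreamEnvironment backed by ForkableFlinkMiniCluster. As a minimal sketch of the scheduler pattern, assuming only what the Scheduler hunk below shows (the enclosing class and method names here are illustrative, not the real Scheduler):

import java.util.concurrent.Callable;

import akka.dispatch.Futures;
import org.apache.flink.runtime.akka.AkkaUtils;

public class AsyncSlotHandlingSketch {

	// Previously a Runnable was submitted to an ExecutorService (or run
	// synchronously in tests). Now the work is dispatched on Akka's global
	// execution context, so the scheduler no longer owns a thread pool.
	public void onNewSlotAvailable() {
		Futures.future(new Callable<Object>() {
			@Override
			public Object call() throws Exception {
				handleNewSlot(); // runs asynchronously, avoiding deadlocks when scheduling is fast
				return null;
			}
		}, AkkaUtils.globalExecutionContext());
	}

	private void handleNewSlot() {
		// slot assignment logic of the real Scheduler goes here
	}
}

Tests swap the execution context instead of passing an executor: TestingUtils.setCallingThreadDispatcher(system) makes such futures run synchronously on the calling thread, and TestingUtils.setGlobalExecutionContext() restores the asynchronous behaviour (see the scheduler test hunks below).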
......@@ -25,7 +25,7 @@ import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
......@@ -39,12 +39,12 @@ public class AvroExternalJarProgramITCase {
@Test
public void testExternalProgram() {
LocalFlinkMiniCluster testMiniCluster = null;
ForkableFlinkMiniCluster testMiniCluster = null;
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
testMiniCluster = new LocalFlinkMiniCluster(config);
testMiniCluster = new ForkableFlinkMiniCluster(config);
String jarFile = JAR_FILE;
String testData = getClass().getResource(TEST_DATA_FILE).toString();
......
......@@ -46,6 +46,13 @@ under the License.
<artifactId>commons-math</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
......
......@@ -44,9 +44,4 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName),
getDegreeOfParallelism());
}
public void executeTest(long memorySize) throws Exception {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism(),
memorySize);
}
}
......@@ -90,7 +90,7 @@ public abstract class StreamExecutionEnvironment {
/**
* Gets the degree of parallelism with which operations are executed by
* default. Operations can individually override this value to use a
* specific degree of parallelism via {@link DataStream#setParallelism}.
* specific degree of parallelism.
*
* @return The degree of parallelism used by operations, unless they
* override that value.
......
......@@ -81,5 +81,4 @@ public class ClusterUtil {
public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
runOnMiniCluster(jobGraph, numOfSlots, -1);
}
}
......@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
......@@ -73,7 +74,7 @@ public class IterateTest {
@Test
public void test() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.setBufferTimeout(10);
......@@ -86,7 +87,7 @@ public class IterateTest {
iteration.closeWith(increment).addSink(new MySink());
env.executeTest(MEMORYSIZE);
env.execute();
assertTrue(iterated);
......
......@@ -21,8 +21,8 @@ import java.io.Serializable;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
public class PrintTest implements Serializable {
......@@ -50,9 +50,8 @@ public class PrintTest implements Serializable {
@Test
public void test() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
env.executeTest(MEMORYSIZE);
env.execute();
}
}
......@@ -25,10 +25,10 @@ import java.util.ArrayList;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
public class WindowCrossJoinTest implements Serializable {
......@@ -45,7 +45,7 @@ public class WindowCrossJoinTest implements Serializable {
@Test
public void test() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.setBufferTimeout(1);
ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>();
......@@ -111,7 +111,7 @@ public class WindowCrossJoinTest implements Serializable {
})
.addSink(new CrossResultSink());
env.executeTest(MEMORYSIZE);
env.execute();
assertEquals(joinExpectedResults, joinResults);
assertEquals(crossExpectedResults, crossResults);
......
......@@ -28,9 +28,9 @@ import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -132,7 +132,7 @@ public class WriteAsCsvTest {
@Test
public void test() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsCsv(PREFIX + "test1.txt");
......@@ -159,7 +159,7 @@ public class WriteAsCsvTest {
fillExpected5();
env.executeTest(MEMORYSIZE);
env.execute();
readFile(PREFIX + "test1.txt", result1);
readFile(PREFIX + "test2.txt", result2);
......
......@@ -28,9 +28,9 @@ import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -132,7 +132,7 @@ public class WriteAsTextTest {
@Test
public void test() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
@SuppressWarnings("unused")
DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1()).writeAsText(PREFIX + "test1.txt");
......@@ -159,7 +159,7 @@ public class WriteAsTextTest {
fillExpected5();
env.executeTest(MEMORYSIZE);
env.execute();
readFile(PREFIX + "test1.txt", result1);
readFile(PREFIX + "test2.txt", result2);
......
......@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
public class DirectedOutputTest {
......@@ -104,8 +105,7 @@ public class DirectedOutputTest {
@Test
public void outputSelectorTest() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamExecutionEnvironment env = new TestStreamEnvironment(1, 128);
SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
source.select(EVEN).addSink(new ListSink(EVEN));
......@@ -113,7 +113,7 @@ public class DirectedOutputTest {
source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
source.selectAll().addSink(new ListSink(ALL));
env.executeTest(128);
env.execute();
assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
......
......@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
......@@ -145,8 +146,7 @@ public class StreamVertexTest {
@Test
public void coTest() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(SOURCE_PARALELISM);
StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
DataStream<String> fromStringElements = env.fromElements("aa", "bb", "cc");
DataStream<Long> generatedSequence = env.generateSequence(0, 3);
......@@ -154,7 +154,7 @@ public class StreamVertexTest {
fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
resultSet = new HashSet<String>();
env.executeTest(MEMORYSIZE);
env.execute();
HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
"2", "3"));
......@@ -163,12 +163,11 @@ public class StreamVertexTest {
@Test
public void runStream() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(SOURCE_PARALELISM);
StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
env.executeTest(MEMORYSIZE);
env.execute();
assertEquals(10, data.keySet().size());
for (Integer k : data.keySet()) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.util;
import akka.actor.ActorRef;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
public class TestStreamEnvironment extends StreamExecutionEnvironment {
private static final String DEFAULT_JOBNAME = "TestStreamingJob";
private static final String CANNOT_EXECUTE_EMPTY_JOB = "Cannot execute empty job";
private long memorySize;
public TestStreamEnvironment(int degreeOfParallelism, long memorySize){
this.setDegreeOfParallelism(degreeOfParallelism);
this.memorySize = memorySize;
}
@Override
public void execute() throws Exception {
execute(DEFAULT_JOBNAME);
}
@Override
public void execute(String jobName) throws Exception {
JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
Configuration configuration = jobGraph.getJobConfiguration();
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
getDegreeOfParallelism());
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
try{
ActorRef client = cluster.getJobClient();
JobClient.submitJobAndWait(jobGraph, false, client);
}catch(JobExecutionException e){
if(e.getMessage().contains("GraphConversionException")){
throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);
}else{
throw e;
}
}finally{
cluster.stop();
}
}
}
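For reference, a minimal usage sketch of the new environment as the streaming tests above employ it; MEMORYSIZE stands for a per-test constant, not part of the environment's API:

StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
env.generateSequence(1, 10).print();
env.execute(); // builds the job graph, runs it on a fresh ForkableFlinkMiniCluster, then stops the cluster

Compared with the removed LocalStreamEnvironment.executeTest(memorySize), the memory size moves into the constructor and execution goes through the standard execute() entry point, so tests no longer need a special method.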
......@@ -68,7 +68,7 @@ public class Client {
private final PactCompiler compiler; // the compiler to compile the jobs
private boolean printStatusDuringExecution;
private boolean printStatusDuringExecution = false;
// ------------------------------------------------------------------------
// Construction
......
......@@ -598,7 +598,7 @@ public final class ConfigConstants {
public static String DEFAULT_AKKA_FRAMESIZE = "10485760b";
public static String DEFAULT_AKKA_LOG_LEVEL = "OFF";
public static String DEFAULT_AKKA_LOG_LEVEL = "ERROR";
public static int DEFAULT_AKKA_ASK_TIMEOUT = 100;
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -25,9 +25,11 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import akka.dispatch.Futures;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
......@@ -48,9 +50,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
private final Object globalLock = new Object();
private final ExecutorService executor;
/** All instances that the scheduler can deploy to */
private final Set<Instance> allInstances = new HashSet<Instance>();
......@@ -69,13 +68,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
private int nonLocalizedAssignments;
public Scheduler() {
this(null);
}
public Scheduler(ExecutorService executorService) {
this.executor = executorService;
this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
}
......@@ -395,19 +388,14 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
// that leads with a high probability to deadlocks, when scheduling fast
this.newlyAvailableInstances.add(instance);
if (this.executor != null) {
this.executor.execute(new Runnable() {
@Override
public void run() {
handleNewSlot();
}
});
}
else {
// for tests, we use the synchronous variant
handleNewSlot();
}
Futures.future(new Callable<Object>() {
@Override
public Object call() throws Exception {
handleNewSlot();
return null;
}
}, AkkaUtils.globalExecutionContext());
}
private void handleNewSlot() {
......
......@@ -20,6 +20,7 @@ package org.apache.flink.runtime.client
import java.io.IOException
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
import akka.actor.Status.Failure
import akka.actor._
......@@ -83,11 +84,13 @@ object JobClient{
def startActorSystemAndActor(config: Configuration): (ActorSystem, ActorRef) = {
implicit val actorSystem = AkkaUtils.createActorSystem(host = "localhost",
port =0, configuration = config)
(actorSystem, startActorWithConfiguration(config))
}
def startActor(jobManagerURL: String)(implicit actorSystem: ActorSystem): ActorRef = {
actorSystem.actorOf(Props(classOf[JobClient], jobManagerURL), JOB_CLIENT_NAME)
def startActor(jobManagerURL: String)(implicit actorSystem: ActorSystem, timeout: FiniteDuration):
ActorRef = {
actorSystem.actorOf(Props(classOf[JobClient], jobManagerURL, timeout), JOB_CLIENT_NAME)
}
def parseConfiguration(configuration: Configuration): String = {
......@@ -109,7 +112,10 @@ object JobClient{
}
def startActorWithConfiguration(config: Configuration)(implicit actorSystem: ActorSystem):
ActorRef= {
ActorRef = {
implicit val timeout = FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
startActor(parseConfiguration(config))
}
......
......@@ -421,7 +421,6 @@ object JobManager {
}
jobManagerSystem.awaitTermination()
println("Shutting down.")
}
def parseArgs(args: Array[String]): (String, Int, Configuration, ExecutionMode) = {
......
......@@ -22,12 +22,9 @@ import java.util.concurrent.TimeUnit
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.util.EnvironmentInformation
import org.slf4j.LoggerFactory
import scala.concurrent.duration.FiniteDuration
......@@ -36,6 +33,8 @@ import scala.concurrent.{Future, Await}
abstract class FlinkMiniCluster(userConfiguration: Configuration) {
import FlinkMiniCluster._
val HOSTNAME = "localhost"
implicit val timeout = FiniteDuration(userConfiguration.getInteger(ConfigConstants
.AKKA_ASK_TIMEOUT, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
......@@ -54,8 +53,6 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
val (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip
val jobClientActorSystem = AkkaUtils.createActorSystem()
waitForTaskManagersToBeRegistered()
def generateConfiguration(userConfiguration: Configuration): Configuration
......@@ -79,21 +76,6 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
configuration)
}
def getJobClient(): ActorRef ={
val config = new Configuration()
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, FlinkMiniCluster.HOSTNAME)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
}
def getJobClientActorSystem: ActorSystem = jobClientActorSystem
def getJobManagerRPCPort: Int = {
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1)
}
def getJobManager: ActorRef = {
jobManagerActor
}
......@@ -116,13 +98,11 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
def shutdown(): Unit = {
taskManagerActorSystems foreach { _.shutdown() }
jobManagerActorSystem.shutdown()
jobClientActorSystem.shutdown()
}
def awaitTermination(): Unit = {
jobClientActorSystem.awaitTermination()
taskManagerActorSystems foreach { _.awaitTermination()}
jobManagerActorSystem.awaitTermination()
taskManagerActorSystems foreach { _.awaitTermination()}
}
def waitForTaskManagersToBeRegistered(): Unit = {
......@@ -138,51 +118,4 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
object FlinkMiniCluster{
val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
val HOSTNAME = "localhost"
def initializeIOFormatClasses(configuration: Configuration): Unit = {
try{
val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration",
classOf[Configuration])
om.setAccessible(true)
om.invoke(null, configuration)
}catch {
case e: Exception =>
LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not " +
"follow the specified default behaviour.")
}
}
def getDefaultConfig: Configuration = {
val config: Configuration = new Configuration
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
.DEFAULT_JOB_MANAGER_IPC_PORT)
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
.DEFAULT_TASK_MANAGER_IPC_PORT)
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
.DEFAULT_TASK_MANAGER_DATA_PORT)
config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, ConfigConstants
.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)
config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants
.DEFAULT_JOBCLIENT_POLLING_INTERVAL)
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants
.DEFAULT_FILESYSTEM_OVERWRITE)
config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY)
var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
val bufferMem: Long = ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS *
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
val numTaskManager = 1
val taskManagerNumSlots: Int = ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS
memorySize = memorySize - (bufferMem * numTaskManager)
memorySize = (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION).toLong
memorySize >>>= 20
memorySize /= numTaskManager
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager)
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots)
config
}
}
......@@ -19,41 +19,29 @@
package org.apache.flink.runtime.minicluster
import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.EnvironmentInformation
import org.slf4j.LoggerFactory
class LocalFlinkMiniCluster(userConfiguration: Configuration) extends
FlinkMiniCluster(userConfiguration){
import LocalFlinkMiniCluster._
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
val forNumberString = System.getProperty("forkNumber")
val forkNumber = try {
Integer.parseInt(forNumberString)
}catch{
case e: NumberFormatException => -1
}
val jobClientActorSystem = AkkaUtils.createActorSystem()
val config = FlinkMiniCluster.getDefaultConfig
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
val config = getDefaultConfig
config.addAll(userConfiguration)
if(forkNumber != -1){
val jobManagerRPC = 1024 + forkNumber*300
val taskManagerRPC = 1024 + forkNumber*300 + 100
val taskManagerData = 1024 + forkNumber*300 + 200
setMemory(config)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
}
FlinkMiniCluster.initializeIOFormatClasses(config)
initializeIOFormatClasses(config)
config
}
......@@ -80,7 +68,84 @@ FlinkMiniCluster(userConfiguration){
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
}
TaskManager.startActorWithConfiguration(FlinkMiniCluster.HOSTNAME, config, false)(system)
TaskManager.startActorWithConfiguration(HOSTNAME, config, false)(system)
}
def getJobClient(): ActorRef ={
val config = new Configuration()
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
}
def getJobClientActorSystem: ActorSystem = jobClientActorSystem
def getJobManagerRPCPort: Int = {
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1)
}
override def shutdown(): Unit = {
super.shutdown()
jobClientActorSystem.shutdown()
}
override def awaitTermination(): Unit = {
jobClientActorSystem.awaitTermination()
super.awaitTermination()
}
def initializeIOFormatClasses(configuration: Configuration): Unit = {
try{
val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration",
classOf[Configuration])
om.setAccessible(true)
om.invoke(null, configuration)
}catch {
case e: Exception =>
LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not " +
"follow the specified default behaviour.")
}
}
def setMemory(config: Configuration): Unit = {
var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
val bufferMem: Long = ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS *
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
val numTaskManager = config.getInteger(ConfigConstants
.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
val taskManagerNumSlots: Int = ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS
memorySize = memorySize - (bufferMem * numTaskManager)
memorySize = (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION).toLong
memorySize >>>= 20
memorySize /= numTaskManager
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
}
def getDefaultConfig: Configuration = {
val config: Configuration = new Configuration
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
.DEFAULT_JOB_MANAGER_IPC_PORT)
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
.DEFAULT_TASK_MANAGER_IPC_PORT)
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
.DEFAULT_TASK_MANAGER_DATA_PORT)
config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, ConfigConstants
.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)
config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants
.DEFAULT_JOBCLIENT_POLLING_INTERVAL)
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants
.DEFAULT_FILESYSTEM_OVERWRITE)
config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY)
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1)
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, ConfigConstants
.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS)
config
}
}
......
......@@ -290,7 +290,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
} catch {
case t: Throwable =>
log.error(t, s"Could not instantiate task with execution ID ${executionID}.")
runningTasks.remove(executionID)
for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
......@@ -336,9 +335,11 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
log.info(s"Unregister task with execution ID ${executionID}.")
runningTasks.remove(executionID) match {
case Some(task) =>
for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
.getJobConfiguration)) {
fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
if(task.getEnvironment != null) {
for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
.getJobConfiguration)) {
fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
}
}
channelManager foreach {
......@@ -377,6 +378,10 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val receiver = this.self
val taskName = runningTasks(executionID).getTaskName
val numberOfSubtasks = runningTasks(executionID).getNumberOfSubtasks
val indexOfSubtask = runningTasks(executionID).getSubtaskIndex
futureResponse.mapTo[Boolean].onComplete {
case Success(result) =>
if (!result || executionState == ExecutionState.FINISHED || executionState ==
......@@ -384,7 +389,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
receiver ! UnregisterTask(executionID)
}
case Failure(t) =>
log.error(t, "Execution state change notification failed.")
log.error(t, s"Execution state change notification failed for task ${executionID} " +
s"($indexOfSubtask/$numberOfSubtasks) of job ${jobID}.")
}
}
......
......@@ -135,8 +135,9 @@ public class ExecutionVertexCancelTest {
setVertexState(vertex, ExecutionState.SCHEDULED);
assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
ActorRef taskManager = system.actorOf(Props.create(new CancelSequenceTaskManagerCreator(new
TaskOperationResult(execId, true), new TaskOperationResult(execId, false))));
ActorRef taskManager = TestActorRef.create(system, Props.create(new
CancelSequenceTaskManagerCreator(new TaskOperationResult(execId, true),
new TaskOperationResult(execId, false))));
Instance instance = getInstance(taskManager);
AllocatedSlot slot = instance.allocateSlot(new JobID());
......
......@@ -55,9 +55,13 @@ public class ExecutionVertexDeploymentTest {
public void testDeployCall() {
try {
final JobVertexID jid = new JobVertexID();
TestingUtils.setCallingThreadDispatcher(system);
ActorRef tm = TestActorRef.create(system, Props.create(SimpleAcknowledgingTaskManager
.class));
// mock taskmanager to simply accept the call
Instance instance = getInstance(ActorRef.noSender());
Instance instance = getInstance(tm);
final AllocatedSlot slot = instance.allocateSlot(new JobID());
......@@ -67,7 +71,7 @@ public class ExecutionVertexDeploymentTest {
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
// no repeated scheduling
try {
......@@ -84,6 +88,8 @@ public class ExecutionVertexDeploymentTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}finally{
TestingUtils.setGlobalExecutionContext();
}
}
......
......@@ -26,13 +26,32 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class ScheduleWithCoLocationHintTest {
private static ActorSystem system;
@BeforeClass
public static void setup(){
system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
TestingUtils.setCallingThreadDispatcher(system);
}
@AfterClass
public static void teardown(){
TestingUtils.setGlobalExecutionContext();
JavaTestKit.shutdownActorSystem(system);
}
@Test
public void scheduleAllSharedAndCoLocated() {
try {
......
......@@ -24,6 +24,11 @@ import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.g
import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
import static org.junit.Assert.*;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
......@@ -46,6 +51,19 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
* Tests for the {@link Scheduler} when scheduling individual tasks.
*/
public class SchedulerIsolatedTasksTest {
private static ActorSystem system;
@BeforeClass
public static void setup(){
system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
TestingUtils.setCallingThreadDispatcher(system);
}
@AfterClass
public static void teardown(){
TestingUtils.setGlobalExecutionContext();
JavaTestKit.shutdownActorSystem(system);
}
@Test
public void testAddAndRemoveInstance() {
......@@ -182,13 +200,13 @@ public class SchedulerIsolatedTasksTest {
final int NUM_INSTANCES = 50;
final int NUM_SLOTS_PER_INSTANCE = 3;
final int NUM_TASKS_TO_SCHEDULE = 2000;
final ExecutorService executor = Executors.newFixedThreadPool(4, ExecutorThreadFactory.INSTANCE);
TestingUtils.setGlobalExecutionContext();
try {
// note: since this test asynchronously releases slots, the executor needs to release workers.
// doing the release call synchronously can lead to a deadlock
Scheduler scheduler = new Scheduler(executor);
Scheduler scheduler = new Scheduler();
for (int i = 0;i < NUM_INSTANCES; i++) {
scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1));
......@@ -274,9 +292,8 @@ public class SchedulerIsolatedTasksTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
finally {
executor.shutdownNow();
}finally{
TestingUtils.setCallingThreadDispatcher(system);
}
}
......
......@@ -31,6 +31,11 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
......@@ -40,6 +45,20 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
* Tests for the scheduler when scheduling tasks in slot sharing groups.
*/
public class SchedulerSlotSharingTest {
private static ActorSystem system;
@BeforeClass
public static void setup(){
system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
TestingUtils.setCallingThreadDispatcher(system);
}
@AfterClass
public static void teardown(){
TestingUtils.setGlobalExecutionContext();
JavaTestKit.shutdownActorSystem(system);
}
@Test
public void scheduleSingleVertexType() {
......@@ -776,7 +795,7 @@ public class SchedulerSlotSharingTest {
@Test
public void testSequentialAllocateAndRelease() {
final ExecutorService exec = Executors.newFixedThreadPool(8);
TestingUtils.setGlobalExecutionContext();
try {
final JobVertexID jid1 = new JobVertexID();
final JobVertexID jid2 = new JobVertexID();
......@@ -785,7 +804,7 @@ public class SchedulerSlotSharingTest {
final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
final Scheduler scheduler = new Scheduler(exec);
final Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(getRandomInstance(4));
// allocate something from group 1 and 2 interleaved with schedule for group 3
......@@ -834,15 +853,15 @@ public class SchedulerSlotSharingTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
finally {
exec.shutdownNow();
}finally{
TestingUtils.setCallingThreadDispatcher(system);
}
}
@Test
public void testConcurrentAllocateAndRelease() {
final ExecutorService executor = Executors.newFixedThreadPool(20);
TestingUtils.setGlobalExecutionContext();
try {
for (int run = 0; run < 50; run++) {
final JobVertexID jid1 = new JobVertexID();
......@@ -852,7 +871,7 @@ public class SchedulerSlotSharingTest {
final SlotSharingGroup sharingGroup = new SlotSharingGroup(jid1, jid2, jid3, jid4);
final Scheduler scheduler = new Scheduler(executor);
final Scheduler scheduler = new Scheduler();
scheduler.newInstanceAvailable(getRandomInstance(4));
final AtomicInteger enumerator1 = new AtomicInteger();
......@@ -1012,6 +1031,7 @@ public class SchedulerSlotSharingTest {
}
finally {
executor.shutdownNow();
TestingUtils.setCallingThreadDispatcher(system);
}
}
......
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph
import akka.actor.{Props, ActorSystem}
import akka.testkit.{TestKit}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex}
import org.apache.flink.runtime.jobmanager.Tasks
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
......
......@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph
import akka.actor.{Props, ActorSystem}
import akka.testkit.TestKit
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphTestUtils}
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
.SimpleAcknowledgingTaskManager
import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex}
......
......@@ -44,7 +44,7 @@ class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(
override def startTaskManager(index: Int)(implicit system: ActorSystem) = {
val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =
TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, configuration, true)
TaskManager.parseConfiguration(HOSTNAME, configuration, true)
system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index)
......
......@@ -19,15 +19,18 @@
package org.apache.flink.runtime.testingUtils
import akka.actor.{ActorRef, Props}
import akka.pattern.{ask, pipe}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning,
WaitForAllVerticesToBeRunning, ExecutionGraphFound, RequestExecutionGraph}
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusChanged,
ExecutionStateChanged}
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenTaskRemoved
import scala.collection.convert.WrapAsScala
import scala.concurrent.{Await, Future}
trait TestingJobManager extends ActorLogMessages with WrapAsScala {
......@@ -72,6 +75,22 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala {
if(cleanup){
waitForAllVerticesToBeRunning.remove(jobID)
}
case NotifyWhenJobRemoved(jobID) => {
val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager)
val responses = tms.map{
tm =>
(tm ? NotifyWhenJobRemoved(jobID))(timeout).mapTo[Boolean]
}
import context.dispatcher
val f = Future.sequence(responses)
val t = Await.result(f, timeout)
sender() ! true
// Future.fold(responses)(true)(_ & _) pipeTo sender()
}
}
......
......@@ -37,4 +37,5 @@ object TestingJobManagerMessages {
case class WaitForAllVerticesToBeRunning(jobID: JobID)
case class AllVerticesRunning(jobID: JobID)
case class NotifyWhenJobRemoved(jobID: JobID)
}
......@@ -19,16 +19,20 @@
package org.apache.flink.runtime.testingUtils
import akka.actor.ActorRef
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
import org.apache.flink.runtime.{ActorLogMessages}
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
import scala.concurrent.duration._
trait TestingTaskManager extends ActorLogMessages {
self: TaskManager =>
that: TaskManager =>
val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
val waitForJobRemoval = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
abstract override def receiveWithLogMessages = {
receiveTestMessages orElse super.receiveWithLogMessages
......@@ -51,7 +55,32 @@ trait TestingTaskManager extends ActorLogMessages {
case None =>
}
case RequestBroadcastVariablesWithReferences => {
sender() ! ResponseBroadcastVariablesWithReferences(bcVarManager.getNumberOfVariablesWithReferences)
sender() ! ResponseBroadcastVariablesWithReferences(
bcVarManager.getNumberOfVariablesWithReferences)
}
case NotifyWhenJobRemoved(jobID) => {
if(runningTasks.values.exists(_.getJobID == jobID)){
val set = waitForJobRemoval.getOrElse(jobID, Set())
waitForJobRemoval += (jobID -> (set + sender()))
import context.dispatcher
context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID))
}else{
waitForJobRemoval.get(jobID) match {
case Some(listeners) => (listeners + sender()) foreach (_ ! true)
case None => sender() ! true
}
}
}
case CheckIfJobRemoved(jobID) => {
if(runningTasks.values.forall(_.getJobID != jobID)){
waitForJobRemoval.get(jobID) match {
case Some(listeners) => listeners foreach (_ ! true)
case None =>
}
}else{
import context.dispatcher
context.system.scheduler.scheduleOnce(200 milliseconds, this.self, CheckIfJobRemoved(jobID))
}
}
}
}
......@@ -19,10 +19,12 @@
package org.apache.flink.runtime.testingUtils
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.taskmanager.Task
object TestingTaskManagerMessages{
case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
case object RequestRunningTasks
case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
import collection.JavaConverters._
......@@ -30,4 +32,6 @@ object TestingTaskManagerMessages{
}
case object RequestBroadcastVariablesWithReferences
case class ResponseBroadcastVariablesWithReferences(number: Int)
case class CheckIfJobRemoved(jobID: JobID)
}
......@@ -208,7 +208,7 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
......@@ -232,7 +232,7 @@ under the License.
<outputEncoding>UTF-8</outputEncoding>
</configuration>
</plugin>
</plugins>
</build>
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -80,4 +80,138 @@ under the License.
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.4</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmArgs>
<jvmArg>-Xms128m</jvmArg>
<jvmArg>-Xmx512m</jvmArg>
</jvmArgs>
<compilerPlugins>
<compilerPlugin>
<groupId>org.scalamacros</groupId>
<artifactId>paradise_${scala.version}</artifactId>
<version>${scala.macros.version}</version>
</compilerPlugin>
</compilerPlugins>
</configuration>
</plugin>
<!-- Eclipse Integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<projectnatures>
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
</projectnatures>
<buildcommands>
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<classpathContainers>
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
</classpathContainers>
<excludes>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
</excludes>
<sourceIncludes>
<sourceInclude>**/*.scala</sourceInclude>
<sourceInclude>**/*.java</sourceInclude>
</sourceIncludes>
</configuration>
</plugin>
<!-- Adding scala source directories to build path -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>0.5.0</version>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
......@@ -46,7 +46,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assert;
......@@ -68,7 +67,7 @@ public abstract class AbstractTestBase {
protected final Configuration config;
protected TestingCluster executor;
protected ForkableFlinkMiniCluster executor;
private final List<File> tempFiles;
......@@ -97,16 +96,15 @@ public abstract class AbstractTestBase {
// --------------------------------------------------------------------------------------------
public void startCluster() throws Exception {
Thread.sleep(250);
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
this.executor = new TestingCluster(config);
this.executor = new ForkableFlinkMiniCluster(config);
}
public void stopCluster() throws Exception {
try {
......
......@@ -31,7 +31,6 @@ import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.java.CollectionEnvironment;
......@@ -194,12 +193,12 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
private static final class TestEnvironment extends ExecutionEnvironment {
private final FlinkMiniCluster executor;
private final ForkableFlinkMiniCluster executor;
private JobExecutionResult latestResult;
private TestEnvironment(FlinkMiniCluster executor, int degreeOfParallelism) {
private TestEnvironment(ForkableFlinkMiniCluster executor, int degreeOfParallelism) {
this.executor = executor;
setDegreeOfParallelism(degreeOfParallelism);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.util
import akka.actor.{Props, ActorSystem, ActorRef}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingTaskManager
class ForkableFlinkMiniCluster(userConfiguration: Configuration) extends
LocalFlinkMiniCluster(userConfiguration) {
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
val forNumberString = System.getProperty("forkNumber")
val forkNumber = try {
Integer.parseInt(forNumberString)
}catch{
case e: NumberFormatException => -1
}
val config = userConfiguration.clone()
if(forkNumber != -1){
val jobManagerRPC = 1024 + forkNumber*300
val taskManagerRPC = 1024 + forkNumber*300 + 100
val taskManagerData = 1024 + forkNumber*300 + 200
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
}
super.generateConfiguration(config)
}
override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
val config = configuration.clone()
val rpcPort = config.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
.DEFAULT_TASK_MANAGER_IPC_PORT)
val dataPort = config.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
.DEFAULT_TASK_MANAGER_DATA_PORT)
if(rpcPort > 0){
config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + index)
}
if(dataPort > 0){
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + index)
}
val (connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConnectionConfig) =
TaskManager.parseConfiguration(HOSTNAME, config, false)
system.actorOf(Props(new TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig,
networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index)
}
}
......@@ -32,6 +32,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.slf4j.Logger;
......@@ -69,7 +70,7 @@ public abstract class CancellingTestBase {
// --------------------------------------------------------------------------------------------
protected LocalFlinkMiniCluster executor;
protected ForkableFlinkMiniCluster executor;
protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
......@@ -87,7 +88,7 @@ public abstract class CancellingTestBase {
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
this.executor = new LocalFlinkMiniCluster(config);
this.executor = new ForkableFlinkMiniCluster(config);
}
@After
......
......@@ -24,8 +24,8 @@ import java.io.FileWriter;
import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.testdata.KMeansData;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
......@@ -36,7 +36,7 @@ public class PackagedProgramEndToEndITCase {
@Test
public void testEverything() {
LocalFlinkMiniCluster cluster = null;
ForkableFlinkMiniCluster cluster = null;
File points = null;
File clusters = null;
......@@ -64,7 +64,7 @@ public class PackagedProgramEndToEndITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
cluster = new LocalFlinkMiniCluster(config);
cluster = new ForkableFlinkMiniCluster(config);
RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort());
......
......@@ -19,7 +19,6 @@
package org.apache.flink.test.util;
import akka.actor.ActorRef;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.junit.Assert;
import org.apache.flink.runtime.client.JobClient;
......@@ -120,7 +119,7 @@ public abstract class FailingTestBase extends RecordAPITestBase {
// reference to the timeout thread
private final Thread timeoutThread;
// cluster to submit the job to.
private final FlinkMiniCluster executor;
private final ForkableFlinkMiniCluster executor;
// job graph of the failing job (submitted first)
private final JobGraph failingJob;
// job graph of the working job (submitted after return from failing job)
......@@ -129,8 +128,8 @@ public abstract class FailingTestBase extends RecordAPITestBase {
private volatile Exception error;
public SubmissionThread(Thread timeoutThread, FlinkMiniCluster executor, JobGraph failingJob,
JobGraph job) {
public SubmissionThread(Thread timeoutThread, ForkableFlinkMiniCluster executor, JobGraph
failingJob, JobGraph job) {
this.timeoutThread = timeoutThread;
this.executor = executor;
this.failingJob = failingJob;
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......