提交 26c77948 编写于 作者: T Till Rohrmann

Removed submitTask and cancelTask from Instance. Replaced unnecessary blocking calls.

Fixed ExecutionVertexCancelTest after removing submitTask and cancelTask.
上级 e85b82f8
......@@ -42,10 +42,9 @@ public class AvroExternalJarProgramITCase {
LocalFlinkMiniCluster testMiniCluster = null;
try {
testMiniCluster = new LocalFlinkMiniCluster(null);
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
testMiniCluster.start(config);
testMiniCluster = new LocalFlinkMiniCluster(config);
String jarFile = JAR_FILE;
String testData = getClass().getResource(TEST_DATA_FILE).toString();
......
......@@ -48,7 +48,7 @@ public class ClusterUtil {
Configuration configuration = jobGraph.getJobConfiguration();
LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(null);
LocalFlinkMiniCluster exec = null;
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, degreeOfParallelism);
......@@ -57,7 +57,7 @@ public class ClusterUtil {
}
try {
exec.start(configuration);
exec = new LocalFlinkMiniCluster(configuration);
Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRPCPort()),
configuration, ClusterUtil.class.getClassLoader());
......@@ -71,7 +71,9 @@ public class ClusterUtil {
} catch (Exception e) {
throw e;
} finally {
exec.stop();
if(exec != null) {
exec.stop();
}
}
}
......
......@@ -59,8 +59,6 @@ public class LocalExecutor extends PlanExecutor {
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
private String configDir;
private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
// --------------------------------------------------------------------------------------------
......@@ -92,12 +90,11 @@ public class LocalExecutor extends PlanExecutor {
if (this.flink == null) {
// create the embedded runtime
this.flink = new LocalFlinkMiniCluster(configDir);
Configuration configuration = new Configuration();
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
// start it up
this.flink.start(configuration);
this.flink = new LocalFlinkMiniCluster(configuration);
} else {
throw new IllegalStateException("The local executor was already started.");
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.io;
......
......@@ -57,6 +57,6 @@ if not defined FOUND (
echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% org.apache.flink.runtime.jobmanager.JobManager --configDir %FLINK_CONF_DIR% > "%out%" 2>&1
java %JVM_ARGS% %log_setting% -cp %FLINK_JM_CLASSPATH% org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster --configDir %FLINK_CONF_DIR% > "%out%" 2>&1
endlocal
......@@ -27,9 +27,6 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
import static akka.dispatch.Futures.future;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import akka.dispatch.OnComplete;
......@@ -274,38 +271,31 @@ public class Execution {
// register this execution at the execution graph, to receive call backs
vertex.getExecutionGraph().registerExecution(this);
Future<TaskOperationResult> deployAction = future(new Callable<TaskOperationResult>() {
@Override
public TaskOperationResult call() throws Exception {
Instance instance = slot.getInstance();
// return instance.submitTask(deployment);
//TODO realize as an actor
return (TaskOperationResult)Patterns.ask(instance.getTaskManager(), new TaskManagerMessages
.SubmitTask(deployment), AkkaUtils.FUTURE_TIMEOUT());
}
}, AkkaUtils.globalExecutionContext());
Instance instance = slot.getInstance();
Future<Object> deployAction = Patterns.ask(instance.getTaskManager(),
new TaskManagerMessages.SubmitTask(deployment),AkkaUtils.FUTURE_TIMEOUT());
deployAction.onComplete(new OnComplete<TaskOperationResult>(){
deployAction.onComplete(new OnComplete<Object>(){
@Override
public void onComplete(Throwable failure, TaskOperationResult success) throws Throwable {
public void onComplete(Throwable failure, Object success) throws Throwable {
if(failure != null){
markFailed(failure);
}else{
TaskOperationResult result = (TaskOperationResult) success;
if (success == null) {
markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult was null"));
}
else if (!success.executionID().equals(attemptId)) {
else if (!result.executionID().equals(attemptId)) {
markFailed(new Exception("Answer execution id does not match the request execution id."));
}
else if (success.success()) {
else if (result.success()) {
switchToRunning();
}
else {
// deployment failed :(
markFailed(new Exception("Failed to deploy the task " +
getVertexWithAttempt() + " to slot " + slot + ": " + success
getVertexWithAttempt() + " to slot " + slot + ": " + result
.description()));
}
}
......@@ -591,24 +581,19 @@ public class Execution {
return;
}
Callable<TaskOperationResult> cancelAction = new Callable<TaskOperationResult>() {
@Override
public TaskOperationResult call() throws Exception {
return slot.getInstance().cancelTask(attemptId);
}
};
Future<TaskOperationResult> cancelResult = AkkaUtils.retry(cancelAction,
NUM_CANCEL_CALL_TRIES, AkkaUtils.globalExecutionContext());
Future<Object> cancelResult = AkkaUtils.retry(slot.getInstance().getTaskManager(), new
TaskManagerMessages.CancelTask(attemptId), NUM_CANCEL_CALL_TRIES,
AkkaUtils.globalExecutionContext());
cancelResult.onComplete(new OnComplete<TaskOperationResult>(){
cancelResult.onComplete(new OnComplete<Object>(){
@Override
public void onComplete(Throwable failure, TaskOperationResult success) throws Throwable {
public void onComplete(Throwable failure, Object success) throws Throwable {
if(failure != null){
fail(new Exception("Task could not be canceled.", failure));
}else{
if(!success.success()){
TaskOperationResult result = (TaskOperationResult)success;
if(!result.success()){
LOG.debug("Cancel task call did not find task. Probably akka message call" +
" race.");
}
......
......@@ -18,7 +18,6 @@
package org.apache.flink.runtime.instance;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
......@@ -27,14 +26,8 @@ import java.util.Queue;
import java.util.Set;
import akka.actor.ActorRef;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
/**
* An instance represents a resource that a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on.
......@@ -170,21 +163,6 @@ public class Instance {
}
public TaskOperationResult submitTask(TaskDeploymentDescriptor tdd) throws IOException{
try{
return AkkaUtils.ask(taskManager, new SubmitTask(tdd));
}catch(IOException ioe) {
throw ioe;
}
}
public TaskOperationResult cancelTask(ExecutionAttemptID attemptID) throws IOException {
try {
return AkkaUtils.ask(taskManager, new CancelTask(attemptID));
} catch (IOException ioe) {
throw ioe;
}
}
// --------------------------------------------------------------------------------------------
// Heartbeats
// --------------------------------------------------------------------------------------------
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.minicluster;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import scala.concurrent.Await;
import scala.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
abstract public class FlinkMiniCluster {
private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniCluster.class);
protected static final String HOSTNAME = "localhost";
protected ActorSystem jobManagerActorSystem = null;
protected ActorRef jobManagerActor = null;
protected List<ActorSystem> taskManagerActorSystems = new ArrayList<ActorSystem>();
protected List<ActorRef> taskManagerActors = new ArrayList<ActorRef>();
/**
 * Builds the configuration the cluster is started with.
 *
 * <p>{@link #start(Configuration)} passes in the configuration it received from its caller
 * and uses the returned configuration for every actor system and actor it starts.
 *
 * @param userConfiguration configuration handed to {@code start} by the caller
 * @return the configuration used to start the cluster
 */
protected abstract Configuration generateConfiguration(final Configuration userConfiguration);
/**
 * Starts the job manager actor.
 *
 * @param system actor system created by {@code startJobManagerActorSystem}
 * @param configuration cluster configuration returned by {@code generateConfiguration}
 * @return reference to the started job manager actor
 */
public abstract ActorRef startJobManager(final ActorSystem system, final Configuration configuration);
/**
 * Starts one task manager actor.
 *
 * @param system actor system created by {@code startTaskManagerActorSystem} for this index
 * @param configuration cluster configuration returned by {@code generateConfiguration}
 * @param index zero-based position of the task manager within this mini cluster
 * @return reference to the started task manager actor
 */
public abstract ActorRef startTaskManager(final ActorSystem system, final Configuration configuration,
final int index);
/**
 * Creates the actor system that hosts the job manager.
 *
 * <p>The port is read from {@link ConfigConstants#JOB_MANAGER_IPC_PORT_KEY}, falling back to
 * {@link ConfigConstants#DEFAULT_JOB_MANAGER_IPC_PORT}; the system is bound to
 * {@link #HOSTNAME}.
 *
 * @param configuration cluster configuration
 * @return the newly created actor system
 */
ActorSystem startJobManagerActorSystem(final Configuration configuration) {
    final int jobManagerPort = configuration.getInteger(
            ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
            ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);

    return AkkaUtils.createActorSystem(HOSTNAME, jobManagerPort, configuration);
}
/**
 * Creates the actor system that hosts the task manager with the given index.
 *
 * <p>The base port is read from {@link ConfigConstants#TASK_MANAGER_IPC_PORT_KEY}, falling
 * back to {@link ConfigConstants#DEFAULT_TASK_MANAGER_IPC_PORT}. A base port of 0 is passed
 * on unchanged; any other value is offset by {@code index} so that every task manager gets
 * its own port.
 *
 * @param configuration cluster configuration
 * @param index zero-based position of the task manager within this mini cluster
 * @return the newly created actor system
 */
ActorSystem startTaskManagerActorSystem(final Configuration configuration, int index){
    final int basePort = configuration.getInteger(
            ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
            ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);

    // presumably 0 lets the actor system pick a free port itself - TODO confirm
    final int port = (basePort == 0) ? basePort : basePort + index;

    return AkkaUtils.createActorSystem(HOSTNAME, port, configuration);
}
/**
 * @return the job manager actor; {@code null} until {@link #start(Configuration)} has
 *         assigned it
 */
public ActorRef getJobManager() {
    return this.jobManagerActor;
}
/**
 * @return the internal list of task manager actors (not a copy); it is filled by
 *         {@link #start(Configuration)} and cleared by {@link #stop()}
 */
public List<ActorRef> getTaskManagers() {
    return this.taskManagerActors;
}
// ------------------------------------------------------------------------
// Life cycle and Job Submission
// ------------------------------------------------------------------------
/**
 * Starts the job manager and all task managers, then blocks until the task managers have
 * answered the registration notification request.
 *
 * <p>The number of task managers is read from
 * {@link ConfigConstants#LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER} (default 1) of the
 * configuration produced by {@link #generateConfiguration(Configuration)}.
 *
 * @param configuration user configuration passed on to {@code generateConfiguration}
 */
public void start(final Configuration configuration) {
    final Configuration clusterConfig = generateConfiguration(configuration);

    // the job manager comes first, the task managers are started afterwards
    jobManagerActorSystem = startJobManagerActorSystem(clusterConfig);
    jobManagerActor = startJobManager(jobManagerActorSystem, clusterConfig);

    final int taskManagerCount = clusterConfig.getInteger(
            ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);

    int index = 0;
    while (index < taskManagerCount) {
        final ActorSystem system = startTaskManagerActorSystem(clusterConfig, index);
        final ActorRef taskManager = startTaskManager(system, clusterConfig, index);

        // only remember the pair once both the system and the actor exist
        taskManagerActorSystems.add(system);
        taskManagerActors.add(taskManager);
        index++;
    }

    waitForTaskManagersToBeRegistered();
}
/**
 * Stops the mini cluster: requests shutdown of all actor systems, waits for their
 * termination and then forgets the task manager actors and actor systems.
 */
public void stop() {
    LOG.info("Stopping FlinkMiniCluster.");

    // first ask every actor system to shut down, then block until they are gone
    shutdown();
    awaitTermination();

    // drop the references to the terminated task managers
    taskManagerActors.clear();
    taskManagerActorSystems.clear();
}
/**
 * Requests the shutdown of all task manager actor systems and of the job manager actor
 * system. Does not wait for the systems to terminate; see {@link #awaitTermination()}.
 *
 * <p>Safe to call when the cluster was never started or when {@code start} failed before
 * the job manager actor system was created.
 */
protected void shutdown() {
    for (ActorSystem system : taskManagerActorSystems) {
        system.shutdown();
    }

    // jobManagerActorSystem stays null until start() has created it; without this guard a
    // stop() in a finally block would throw a NullPointerException and hide the real error
    if (jobManagerActorSystem != null) {
        jobManagerActorSystem.shutdown();
    }
}

/**
 * Blocks until all task manager actor systems and the job manager actor system have
 * terminated, waiting at most {@code AkkaUtils.AWAIT_DURATION()} for each of them.
 *
 * <p>Safe to call when the cluster was never started or when {@code start} failed before
 * the job manager actor system was created.
 */
protected void awaitTermination() {
    for (ActorSystem system : taskManagerActorSystems) {
        system.awaitTermination(AkkaUtils.AWAIT_DURATION());
    }

    // same guard as in shutdown(): nothing to wait for if the system was never created
    if (jobManagerActorSystem != null) {
        jobManagerActorSystem.awaitTermination(AkkaUtils.AWAIT_DURATION());
    }
}
// ------------------------------------------------------------------------
// Network utility methods
// ------------------------------------------------------------------------
/**
 * Sends a {@code NotifyWhenRegisteredAtJobManager} request to every task manager and blocks
 * until all of them have answered, waiting at most {@code AkkaUtils.AWAIT_DURATION()}.
 *
 * @throws RuntimeException if a request fails, the wait times out or the calling thread is
 *         interrupted; the original exception is attached as the cause
 */
private void waitForTaskManagersToBeRegistered(){
    final List<Future<Object>> responses =
            new ArrayList<Future<Object>>(taskManagerActors.size());

    for (ActorRef taskManager : taskManagerActors) {
        responses.add(Patterns.ask(taskManager,
                TaskManagerMessages.NotifyWhenRegisteredAtJobManager$.MODULE$,
                AkkaUtils.FUTURE_TIMEOUT()));
    }

    try {
        // Await.result (unlike Await.ready) rethrows the failure of a failed future, so a
        // task manager whose request failed is reported instead of being silently ignored
        Await.result(Futures.sequence(responses, AkkaUtils.globalExecutionContext()),
                AkkaUtils.AWAIT_DURATION());
    } catch (InterruptedException e) {
        // keep the interrupt visible to the caller's thread
        Thread.currentThread().interrupt();
        throw new RuntimeException("Not all task managers could register at the job manager.", e);
    } catch (Exception e) {
        throw new RuntimeException("Not all task managers could register at the job manager.", e);
    }
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.minicluster;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
public class LocalFlinkMiniCluster extends FlinkMiniCluster {
private static final Logger LOG = LoggerFactory.getLogger(LocalFlinkMiniCluster.class);
private Configuration configuration;
private final String configDir;
private ActorSystem actorSystem;
/**
 * Creates a mini cluster that is not yet started.
 *
 * @param configDir directory the global configuration is loaded from when the cluster
 *        configuration is generated; {@code null} selects the built-in default configuration
 */
public LocalFlinkMiniCluster(String configDir){
    // the effective configuration is created lazily by generateConfiguration
    this.configuration = null;
    this.configDir = configDir;
}
// ------------------------------------------------------------------------
// Life cycle and Job Submission
// ------------------------------------------------------------------------
/**
 * Starts a job client actor that points at this cluster's job manager.
 *
 * <p>The client is started in the actor system created by {@link #start(Configuration)}
 * and is configured with {@link #HOSTNAME} and {@link #getJobManagerRPCPort()}.
 *
 * @return reference to the started job client actor
 */
public ActorRef getJobClient() {
    final Configuration clientConfig = new Configuration();
    clientConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME);
    clientConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort());

    return JobClient.startActorWithConfiguration(clientConfig, actorSystem);
}
/**
 * @return the actor system job clients are started in; {@code null} until
 *         {@link #start(Configuration)} has created it
 */
public ActorSystem getJobClientActorSystem(){
    return this.actorSystem;
}
/**
 * Starts the cluster as defined by the base class and afterwards creates the actor system
 * that is used for job clients.
 *
 * @param configuration user configuration for the cluster
 */
@Override
public void start(Configuration configuration){
    super.start(configuration);

    // separate actor system for job clients, created only after the cluster is up
    this.actorSystem = AkkaUtils.createActorSystem();
}
/**
 * Shuts down the cluster actor systems and, if it was created, the job client actor system.
 */
@Override
protected void shutdown() {
    super.shutdown();

    if (actorSystem == null) {
        // start() never got far enough to create the job client actor system
        return;
    }
    actorSystem.shutdown();
}
/**
 * Waits for the job client actor system (if it was created) and then for the cluster
 * actor systems to terminate.
 */
@Override
protected void awaitTermination() {
    final boolean clientSystemExists = actorSystem != null;
    if (clientSystemExists) {
        actorSystem.awaitTermination();
    }

    super.awaitTermination();
}
/**
 * Returns the job manager RPC port stored in the generated cluster configuration.
 *
 * @return the configured port, or -1 if the configuration does not contain the key
 * @throws RuntimeException if the cluster configuration has not been generated yet
 */
public int getJobManagerRPCPort() {
    if (configuration != null) {
        return configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
    }

    throw new RuntimeException("Configuration has not been set.");
}
/**
 * Generates the cluster configuration on first use and returns the cached instance on
 * every later call.
 *
 * <p>The base configuration is loaded from {@code configDir} via
 * {@link GlobalConfiguration}, or created by {@link #getDefaultConfig(Configuration)} when
 * no configuration directory was given. All entries of {@code userConfiguration} are then
 * added on top. If the system property {@code forkNumber} holds an integer other than -1,
 * the job manager RPC, task manager RPC and task manager data ports are replaced by ports
 * derived from that number. Finally the I/O format defaults are re-initialized from the
 * resulting configuration.
 *
 * @param userConfiguration user supplied settings that are added to the base configuration
 * @return the (cached) cluster configuration
 */
@Override
protected Configuration generateConfiguration(final Configuration userConfiguration) {
    if (configuration != null) {
        return configuration;
    }

    final int forkNumber = readForkNumber();

    if (configDir != null) {
        GlobalConfiguration.loadConfiguration(configDir);
        configuration = GlobalConfiguration.getConfiguration();
    } else {
        configuration = getDefaultConfig(userConfiguration);
    }

    configuration.addAll(userConfiguration);

    if (forkNumber != -1) {
        // every fork gets its own block of 300 ports so that parallel runs do not collide
        final int portBase = 1024 + forkNumber * 300;
        configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, portBase);
        configuration.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, portBase + 100);
        configuration.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, portBase + 200);
    }

    initializeIOFormatClasses(configuration);
    return configuration;
}

/**
 * Reads the {@code forkNumber} system property.
 *
 * @return the parsed fork number, or -1 if the property is missing or not an integer
 */
private static int readForkNumber() {
    final String forkNumberString = System.getProperty("forkNumber");
    if (forkNumberString == null) {
        // running inside an IDE, so the forkNumber property is not set
        return -1;
    }

    try {
        return Integer.parseInt(forkNumberString);
    } catch (NumberFormatException ignored) {
        // the property is set but not a number: treat it like a missing property
        return -1;
    }
}
/**
 * Starts the job manager actor in the given actor system.
 *
 * @param system actor system hosting the job manager
 * @param configuration cluster configuration; the job manager receives a clone of it
 * @return reference to the started job manager actor
 */
@Override
public ActorRef startJobManager(final ActorSystem system, final Configuration configuration) {
    // hand over a copy so the job manager does not share the cluster configuration object
    return JobManager.startActor(configuration.clone(), system);
}
/**
 * Starts the task manager actor with the given index in the given actor system.
 *
 * <p>The task manager works on a clone of the cluster configuration. Its RPC and data
 * ports are offset by {@code index} when the configured base port is positive; base ports
 * that are zero or negative are left unchanged.
 *
 * @param system actor system hosting the task manager
 * @param configuration cluster configuration
 * @param index zero-based position of the task manager within this mini cluster
 * @return reference to the started task manager actor
 */
@Override
public ActorRef startTaskManager(final ActorSystem system, final Configuration configuration, final int index) {
    final Configuration taskManagerConfig = configuration.clone();

    final int baseRpcPort = taskManagerConfig.getInteger(
            ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
            ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);
    if (baseRpcPort > 0) {
        taskManagerConfig.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, baseRpcPort + index);
    }

    final int baseDataPort = taskManagerConfig.getInteger(
            ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
            ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
    if (baseDataPort > 0) {
        taskManagerConfig.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, baseDataPort + index);
    }

    return TaskManager.startActorWithConfiguration(HOSTNAME, taskManagerConfig, false, system);
}
/**
 * Re-initializes the globally loaded {@link FileOutputFormat} defaults from the given
 * configuration by reflectively invoking the static method
 * {@code initDefaultsFromConfiguration(Configuration)}.
 *
 * <p>This is best effort: a failure is logged together with its cause and otherwise
 * ignored, in which case the previously loaded defaults stay in effect.
 *
 * @param configuration configuration the defaults are read from
 */
private static void initializeIOFormatClasses(Configuration configuration) {
    try {
        Method om = FileOutputFormat.class.getDeclaredMethod("initDefaultsFromConfiguration",
                Configuration.class);
        // the method is made accessible first, so it is presumably not public
        om.setAccessible(true);
        om.invoke(null, configuration);
    }
    catch (Exception e) {
        // pass the exception to the logger so the reason for the failure is not lost
        LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not follow the specified default behavior.", e);
    }
}
/**
 * Creates the default configuration for a locally running mini cluster.
 *
 * <p>Addresses, ports, the job client polling interval and the file system behavior are
 * taken from the defaults in {@link ConfigConstants}. The number of task managers and the
 * number of slots per task manager are read from {@code userConfiguration}, falling back
 * to 1 and {@link ConfigConstants#DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS}. The memory size per
 * task manager (in megabytes) is derived from the currently free heap memory.
 *
 * @param userConfiguration user settings; dereferenced unconditionally, so it must not be
 *        {@code null}
 * @return a new configuration holding the default values
 */
public static Configuration getDefaultConfig(final Configuration userConfiguration)
{
    final Configuration config = new Configuration();

    // addresses and ports
    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME);
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
    config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);
    config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);

    config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY,
            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION);

    // polling interval
    config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL);

    // file system behavior
    config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);
    config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
            ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY);

    long memorySize = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();

    // at this time, we need to scale down the memory, because we cannot dedicate all free memory to the
    // memory manager. we have to account for the buffer pools as well, and the job manager's data structures.
    // The product is computed in long arithmetic so it cannot overflow before it is widened.
    long bufferMem = (long) ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS
            * ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE;

    int numTaskManager = userConfiguration.getInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER,
            1);
    int taskManagerNumSlots = userConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
            ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS);

    memorySize = memorySize - (bufferMem * numTaskManager);

    // apply the fraction that makes sure memory is left to the heap for other data structures and UDFs.
    memorySize = (long) (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);

    // convert from bytes to megabytes; the signed shift keeps a negative remainder negative
    // instead of turning it into a huge positive size as an unsigned shift would
    memorySize >>= 20;

    // NOTE(review): a user-configured task manager count of 0 makes this division throw an
    // ArithmeticException - confirm whether that value needs to be rejected earlier
    memorySize /= numTaskManager;

    config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager);
    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);

    return config;
}
}
......@@ -22,19 +22,15 @@ import java.io.IOException
import java.util.concurrent.Callable
import akka.actor.{ActorSelection, ActorRef, ActorSystem}
import akka.pattern.Patterns
import akka.pattern.{Patterns, ask => akkaAsk}
import akka.util.Timeout
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.core.io.IOReadableWritable
import org.apache.flink.runtime.akka.serialization.{WritableSerializer,
IOReadableWritableSerializer}
import org.apache.hadoop.io.Writable
import scala.concurrent.{ExecutionContext, Future, Await}
import scala.concurrent.duration._
object AkkaUtils {
implicit val FUTURE_TIMEOUT: Timeout = 1 minute
implicit val FUTURE_TIMEOUT: Timeout = 100 minute
implicit val AWAIT_DURATION: FiniteDuration = 1 minute
implicit val FUTURE_DURATION: FiniteDuration = 1 minute
......@@ -176,4 +172,16 @@ object AkkaUtils {
Future[T] = {
retry(callable.call(), tries)
}
/**
 * Asks `target` with `message` and repeats the ask whenever the returned future fails.
 *
 * The first ask is always sent; `tries` is the number of additional asks that may follow,
 * so at most `tries + 1` messages are sent. Once no tries are left, the returned future
 * fails with the last failure.
 *
 * @param target actor that receives the message
 * @param message message to send
 * @param tries number of retries after the first attempt
 * @param executionContext context the recovery callbacks run in
 * @return future holding the first successful answer or the last failure
 */
def retry(target: ActorRef, message: Any, tries: Int)(implicit executionContext:
ExecutionContext): Future[Any] = {
  val attempt = target ? message

  attempt recoverWith {
    // tries left: ask again with one try less
    case failure: Throwable if tries > 0 =>
      retry(target, message, tries - 1)
    // no tries left: hand the failure through unchanged
    case failure: Throwable =>
      Future.failed(failure)
  }
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.minicluster
import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.util.EnvironmentInformation
import org.slf4j.LoggerFactory
import scala.concurrent.{Future, Await}
abstract class FlinkMiniCluster(userConfiguration: Configuration) {
import FlinkMiniCluster._
// The whole cluster is started while this constructor runs. The order of the following
// statements matters: each one uses values created by the previous ones.
// NOTE(review): generateConfiguration, startJobManager and startTaskManager are abstract and
// are called from here, i.e. before the constructor body of a subclass has run - subclass
// implementations must not rely on their own fields being initialized.

// effective cluster configuration, derived from the user supplied configuration
val configuration = generateConfiguration(userConfiguration)
// actor system and actor of the job manager
val jobManagerActorSystem = startJobManagerActorSystem()
val jobManagerActor = startJobManager(jobManagerActorSystem)
// number of task managers to start, 1 if not configured
val numTaskManagers = configuration.getInteger(ConfigConstants
.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
// one (actor system, task manager actor) pair per task manager index
val actorSystemsTaskManagers = for(i <- 0 until numTaskManagers) yield {
val actorSystem = startTaskManagerActorSystem(i)
(actorSystem, startTaskManager(i)(actorSystem))
}
// split the pairs into two parallel sequences
val (taskManagerActorSystems, taskManagerActors) = actorSystemsTaskManagers.unzip
// block until the task managers have answered the registration notification request
waitForTaskManagersToBeRegistered()
/**
 * Builds the configuration the cluster is started with. Called once while the cluster is
 * constructed.
 *
 * @param userConfiguration configuration passed to the constructor
 * @return the effective cluster configuration
 */
def generateConfiguration(userConfiguration: Configuration): Configuration
/**
 * Starts the job manager actor.
 *
 * @param system actor system created by `startJobManagerActorSystem`
 * @return reference to the started job manager actor
 */
def startJobManager(implicit system: ActorSystem): ActorRef
/**
 * Starts one task manager actor.
 *
 * @param index zero-based position of the task manager within this mini cluster
 * @param system actor system created by `startTaskManagerActorSystem` for this index
 * @return reference to the started task manager actor
 */
def startTaskManager(index: Int)(implicit system: ActorSystem):
ActorRef
/**
 * Creates the actor system that hosts the job manager, bound to `HOSTNAME` and the
 * configured job manager IPC port (or its default).
 *
 * @return the newly created actor system
 */
def startJobManagerActorSystem(): ActorSystem = {
  val jobManagerPort = configuration.getInteger(
    ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
    ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)

  AkkaUtils.createActorSystem(HOSTNAME, jobManagerPort, configuration)
}
/**
 * Creates the actor system that hosts the task manager with the given index.
 *
 * A configured base port of 0 is passed on unchanged; any other base port is offset by
 * `index` so that every task manager gets its own port.
 *
 * @param index zero-based position of the task manager within this mini cluster
 * @return the newly created actor system
 */
def startTaskManagerActorSystem(index: Int): ActorSystem = {
  val basePort = configuration.getInteger(
    ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
    ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)

  val port = basePort match {
    case 0 => 0
    case configured => configured + index
  }

  AkkaUtils.createActorSystem(HOSTNAME, port, configuration)
}
/** @return the job manager actor started during construction */
def getJobManager: ActorRef = jobManagerActor
/** @return the task manager actors started during construction, in index order */
def getTaskManagers = taskManagerActors
/**
 * Stops the mini cluster: requests shutdown of all actor systems and then waits for their
 * termination.
 */
def stop(): Unit = {
  LOG.info("Stopping FlinkMiniCluster.")

  // first request the shutdown, then block until the systems are gone
  shutdown()
  awaitTermination()
}
/**
 * Requests the shutdown of all task manager actor systems and of the job manager actor
 * system without waiting for them to terminate.
 */
def shutdown(): Unit = {
  for (system <- taskManagerActorSystems) {
    system.shutdown()
  }
  jobManagerActorSystem.shutdown()
}
/**
 * Blocks until the task manager actor systems and the job manager actor system have
 * terminated, waiting at most `AkkaUtils.AWAIT_DURATION` for each of them.
 */
def awaitTermination(): Unit = {
  for (system <- taskManagerActorSystems) {
    system.awaitTermination(AkkaUtils.AWAIT_DURATION)
  }
  jobManagerActorSystem.awaitTermination(AkkaUtils.AWAIT_DURATION)
}
/**
 * Sends a `NotifyWhenRegisteredAtJobManager` request to every task manager and blocks
 * until all of them have answered, waiting at most `AkkaUtils.AWAIT_DURATION`.
 *
 * Throws the failure of the first failed request, or a timeout exception if the answers do
 * not arrive in time.
 */
def waitForTaskManagersToBeRegistered(): Unit = {
  implicit val timeout = AkkaUtils.FUTURE_TIMEOUT
  implicit val executionContext = AkkaUtils.globalExecutionContext

  val futures = taskManagerActors map {
    _ ? NotifyWhenRegisteredAtJobManager
  }

  // Await.result (unlike Await.ready) rethrows the failure of a failed future, so a task
  // manager whose request failed is reported instead of being silently ignored
  Await.result(Future.sequence(futures), AkkaUtils.AWAIT_DURATION)
}
}
object FlinkMiniCluster{
val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
val HOSTNAME = "localhost"
/**
 * Re-initializes the globally loaded `FileOutputFormat` defaults from the given
 * configuration by reflectively invoking the static method
 * `initDefaultsFromConfiguration(Configuration)`.
 *
 * This is best effort: a failure is logged together with its cause and otherwise ignored.
 *
 * @param configuration configuration the defaults are read from
 */
def initializeIOFormatClasses(configuration: Configuration): Unit = {
  try{
    val om = classOf[FileOutputFormat[_]].getDeclaredMethod("initDefaultsFromConfiguration",
      classOf[Configuration])
    // the method is made accessible first, so it is presumably not public
    om.setAccessible(true)
    om.invoke(null, configuration)
  }catch {
    case e: Exception =>
      // pass the exception to the logger so the reason for the failure is not lost
      LOG.error("Cannot (re) initialize the globally loaded defaults. Some classes might not " +
        "follow the specified default behaviour.", e)
  }
}
/**
 * Creates the default configuration for a mini cluster with a single task manager.
 *
 * Addresses, ports, the job client polling interval, the file system behavior and the
 * number of task slots are taken from the defaults in `ConfigConstants`. The task manager
 * memory size (in megabytes) is derived from the currently free heap memory.
 *
 * @return a new configuration holding the default values
 */
def getDefaultConfig: Configuration = {
  val config: Configuration = new Configuration

  // addresses and ports
  config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
  config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
    .DEFAULT_JOB_MANAGER_IPC_PORT)
  config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ConfigConstants
    .DEFAULT_TASK_MANAGER_IPC_PORT)
  config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, ConfigConstants
    .DEFAULT_TASK_MANAGER_DATA_PORT)

  config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, ConfigConstants
    .DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION)

  // polling interval
  config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants
    .DEFAULT_JOBCLIENT_POLLING_INTERVAL)

  // file system behaviour
  config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, ConfigConstants
    .DEFAULT_FILESYSTEM_OVERWRITE)
  config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
    ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY)

  var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag

  // memory reserved for the network buffers; widened to Long before multiplying so the
  // product cannot overflow
  val bufferMem: Long = ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS.toLong *
    ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE

  val numTaskManager = 1
  val taskManagerNumSlots: Int = ConfigConstants.DEFAULT_TASK_MANAGER_NUM_TASK_SLOTS

  memorySize = memorySize - (bufferMem * numTaskManager)
  // leave part of the heap to other data structures and user code
  memorySize = (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION).toLong
  // convert from bytes to megabytes; the signed shift keeps a negative remainder negative
  // instead of turning it into a huge positive size as an unsigned shift would
  memorySize >>= 20
  memorySize /= numTaskManager

  config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
  config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager)
  config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots)

  config
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.minicluster
import java.io.File
import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.taskmanager.TaskManager
import org.slf4j.LoggerFactory
import scopt.OptionParser
class LocalFlinkMiniCluster(userConfiguration: Configuration) extends
FlinkMiniCluster(userConfiguration){
val actorSystem = AkkaUtils.createActorSystem()
/**
 * Builds the cluster configuration: the defaults from `FlinkMiniCluster.getDefaultConfig`
 * with all entries of `userConfiguration` added on top.
 *
 * If the system property `forkNumber` holds an integer other than -1, the job manager RPC,
 * task manager RPC and task manager data ports are replaced by ports derived from that
 * number. Finally the I/O format defaults are re-initialized from the result.
 *
 * @param userConfiguration user supplied settings that are added to the defaults
 * @return the effective cluster configuration
 */
override def generateConfiguration(userConfiguration: Configuration): Configuration = {
  // -1 stands for "no usable fork number": property missing or not an integer
  val forkNumber = Option(System.getProperty("forkNumber")) match {
    case Some(raw) =>
      try {
        Integer.parseInt(raw)
      } catch {
        case _: NumberFormatException => -1
      }
    case None => -1
  }

  val config = FlinkMiniCluster.getDefaultConfig
  config.addAll(userConfiguration)

  if (forkNumber != -1) {
    // every fork gets its own block of 300 ports
    val portBase = 1024 + forkNumber * 300
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, portBase)
    config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, portBase + 100)
    config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, portBase + 200)
  }

  FlinkMiniCluster.initializeIOFormatClasses(config)

  config
}
/**
 * Starts the job manager actor in the given actor system, handing it a clone of the
 * cluster configuration.
 *
 * @param system actor system hosting the job manager
 * @return reference to the started job manager actor
 */
override def startJobManager(implicit system: ActorSystem):
ActorRef = {
  val jobManagerConfig = configuration.clone()

  JobManager.startActor(jobManagerConfig)
}
/**
 * Starts the task manager actor with the given index in the given actor system.
 *
 * The task manager works on a clone of the cluster configuration. Its RPC and data ports
 * are offset by `index` when the configured base port is positive; base ports that are
 * zero or negative are left unchanged.
 *
 * @param index zero-based position of the task manager within this mini cluster
 * @param system actor system hosting the task manager
 * @return reference to the started task manager actor
 */
override def startTaskManager(index: Int)(implicit system: ActorSystem): ActorRef = {
  val taskManagerConfig = configuration.clone()

  val baseRpcPort = taskManagerConfig.getInteger(
    ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
    ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
  if (baseRpcPort > 0) {
    taskManagerConfig.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, baseRpcPort + index)
  }

  val baseDataPort = taskManagerConfig.getInteger(
    ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
    ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
  if (baseDataPort > 0) {
    taskManagerConfig.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, baseDataPort + index)
  }

  TaskManager.startActorWithConfiguration(FlinkMiniCluster.HOSTNAME, taskManagerConfig,
    false)(system)
}
def getJobClient(): ActorRef ={
val config = new Configuration()
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, FlinkMiniCluster.HOSTNAME)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)
JobClient.startActorWithConfiguration(config)(actorSystem)
}
def getJobClientActorSystem: ActorSystem = actorSystem
override def shutdown(): Unit = {
super.shutdown()
actorSystem.shutdown()
}
override def awaitTermination(): Unit = {
actorSystem.awaitTermination()
super.awaitTermination()
}
def getJobManagerRPCPort: Int = {
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1)
}
}
/**
 * Companion object providing the command line entry point for [[LocalFlinkMiniCluster]]
 * together with the parsing of its command line arguments.
 */
object LocalFlinkMiniCluster{
  val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])

  // Exit code used when the command line cannot be parsed.
  val FAILURE_RETURN_CODE = 1

  /**
   * Creates a mini cluster from the parsed command line arguments and blocks until it
   * terminates.
   *
   * @param args command line arguments, see [[parseArgs]]
   */
  def main(args: Array[String]): Unit = {
    val miniCluster = new LocalFlinkMiniCluster(parseArgs(args))
    miniCluster.awaitTermination()
  }

  /**
   * Parses the command line and returns the global configuration loaded from the given
   * configuration directory. If that directory exists, the Flink base directory is set to
   * its parent. On a parsing failure the usage text is logged and the JVM exits with
   * [[FAILURE_RETURN_CODE]].
   *
   * @param args command line arguments; supports the "configDir" option
   * @return the loaded configuration
   */
  def parseArgs(args: Array[String]): Configuration = {
    val cli = new OptionParser[LocalFlinkMiniClusterConfiguration]("LocalFlinkMiniCluster") {
      head("LocalFlinkMiniCluster")
      opt[String]("configDir")
        .action((dir, parsed) => parsed.copy(configDir = dir))
        .text("Specify configuration directory.")
    }

    cli.parse(args, LocalFlinkMiniClusterConfiguration()) match {
      case Some(parsed) =>
        val configDir = parsed.configDir
        GlobalConfiguration.loadConfiguration(configDir)
        val loaded = GlobalConfiguration.getConfiguration

        // Only derive the base directory when the config directory really exists.
        val dirExists = configDir != null && new File(configDir).isDirectory
        if (dirExists) {
          loaded.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
        }

        loaded

      case None =>
        LOG.error("CLI parsing failed. Usage: " + cli.usage)
        sys.exit(FAILURE_RETURN_CODE)
    }
  }

  /**
   * Holder for the values parsed from the command line.
   *
   * @param configDir path of the configuration directory, empty by default
   */
  case class LocalFlinkMiniClusterConfiguration(configDir: String = "")
}
......@@ -22,7 +22,6 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
......@@ -32,7 +31,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
......@@ -50,15 +48,10 @@ import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class ExecutionGraphDeploymentTest {
private static ActorSystem system;
......@@ -111,35 +104,26 @@ public class ExecutionGraphDeploymentTest {
ExecutionJobVertex ejv = eg.getAllVertices().get(jid2);
ExecutionVertex vertex = ejv.getTaskVertices()[3];
// just some reference (needs not be atomic)
final AtomicReference<TaskDeploymentDescriptor> reference = new AtomicReference<TaskDeploymentDescriptor>();
// create synchronous task manager
final TestActorRef simpleTaskManager = TestActorRef.create(system,
Props.create(ExecutionGraphTestUtils
.SimpleAcknowledgingTaskManager.class));
final Instance instance = spy(getInstance(simpleTaskManager));
doAnswer(new Answer<TaskOperationResult>() {
ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager tm = (ExecutionGraphTestUtils
.SimpleAcknowledgingTaskManager) simpleTaskManager.underlyingActor();
final Instance instance = getInstance(simpleTaskManager);
@Override
public TaskOperationResult answer(InvocationOnMock invocation) throws Throwable {
TaskDeploymentDescriptor tdd = (TaskDeploymentDescriptor)invocation.getArguments()[0];
reference.set(tdd);
return (TaskOperationResult) invocation.callRealMethod();
}
}).when(instance).submitTask(Matchers
.<TaskDeploymentDescriptor>any());
final AllocatedSlot slot = instance.allocateSlot(jobId);
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
TaskDeploymentDescriptor descr = reference.get();
TaskDeploymentDescriptor descr = tm.lastTDD;
assertNotNull(descr);
assertEquals(jobId, descr.getJobID());
......
......@@ -24,14 +24,13 @@ import static org.mockito.Mockito.spy;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.HardwareDescription;
......@@ -109,11 +108,12 @@ public class ExecutionGraphTestUtils {
}
public static class SimpleAcknowledgingTaskManager extends UntypedActor {
public TaskDeploymentDescriptor lastTDD;
@Override
public void onReceive(Object msg) throws Exception {
if (msg instanceof TaskManagerMessages.SubmitTask) {
TaskManagerMessages.SubmitTask submitTask = (TaskManagerMessages.SubmitTask) msg;
lastTDD = submitTask.tasks();
getSender().tell(new TaskOperationResult(submitTask.tasks().getExecutionId(), true), getSelf());
} else if (msg instanceof TaskManagerMessages.CancelTask) {
......@@ -122,6 +122,23 @@ public class ExecutionGraphTestUtils {
}
}
}
public static final String ERROR_MESSAGE = "test_failure_error_message";
/**
 * Test task manager actor that rejects every task submission.
 *
 * <p>A {@code SubmitTask} message is answered with a {@code TaskOperationResult} built
 * with {@code false} and {@code ERROR_MESSAGE}; a {@code CancelTask} message is answered
 * with a {@code TaskOperationResult} built with {@code true}. Any other message is
 * ignored without a reply.
 */
public static class SimpleFailingTaskManager extends UntypedActor {

    @Override
    public void onReceive(Object msg) throws Exception {
        final TaskOperationResult reply;

        if (msg instanceof TaskManagerMessages.SubmitTask) {
            final TaskManagerMessages.SubmitTask submission = (TaskManagerMessages.SubmitTask) msg;
            reply = new TaskOperationResult(
                    submission.tasks().getExecutionId(), false, ERROR_MESSAGE);
        } else if (msg instanceof TaskManagerMessages.CancelTask) {
            final TaskManagerMessages.CancelTask cancellation = (TaskManagerMessages.CancelTask) msg;
            reply = new TaskOperationResult(cancellation.attemptID(), true);
        } else {
            // Unknown message types are not answered.
            return;
        }

        getSender().tell(reply, getSelf());
    }
}
public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException {
AbstractJobVertex ajv = new AbstractJobVertex("TestVertex", id);
......
......@@ -208,7 +208,8 @@ public class ExecutionVertexCancelTest {
// task manager mock actor
// first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call)
ActorRef taskManager = system.actorOf(Props.create(new CancelSequenceTaskManagerCreator(new
TestActorRef taskManager = TestActorRef.create(system, Props.create(new
CancelSequenceTaskManagerCreator(new
TaskOperationResult(execId, false), new TaskOperationResult(execId, true))));
Instance instance = getInstance(taskManager);
......@@ -228,6 +229,8 @@ public class ExecutionVertexCancelTest {
// cancel call first
cancelAction.run();
// process onComplete callback
actions.triggerNextAction();
// did not find the task, not properly cancelled, stay in canceling
assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
......@@ -241,6 +244,9 @@ public class ExecutionVertexCancelTest {
// trigger the correcting cancel call, should properly set state to cancelled
actions.triggerNextAction();
// process onComplete callback
actions.triggerNextAction();
vertex.getCurrentExecutionAttempt().cancelingComplete();
assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
......
......@@ -22,16 +22,11 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.mockito.Matchers.any;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import akka.testkit.TestActorRef;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
......@@ -43,8 +38,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
public class ExecutionVertexDeploymentTest {
private static ActorSystem system;
......@@ -100,8 +93,11 @@ public class ExecutionVertexDeploymentTest {
TestingUtils.setCallingThreadDispatcher(system);
final JobVertexID jid = new JobVertexID();
final TestActorRef simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleAcknowledgingTaskManager.class));
final Instance instance = spy(getInstance(ActorRef.noSender()));
final Instance instance = getInstance(simpleTaskManager);
final AllocatedSlot slot = instance.allocateSlot(new JobID());
......@@ -109,9 +105,6 @@ public class ExecutionVertexDeploymentTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
doReturn(new TaskOperationResult(vertex.getCurrentExecutionAttempt().getAttemptId(),
true)).when(instance).submitTask(Matchers.<TaskDeploymentDescriptor>any());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
......@@ -125,8 +118,6 @@ public class ExecutionVertexDeploymentTest {
}
catch (IllegalStateException e) {}
verify(instance).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
assertNull(vertex.getFailureCause());
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
......@@ -145,8 +136,11 @@ public class ExecutionVertexDeploymentTest {
public void testDeployWithAsynchronousAnswer() {
try {
final JobVertexID jid = new JobVertexID();
final TestActorRef simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleAcknowledgingTaskManager.class));
final Instance instance = spy(getInstance(ActorRef.noSender()));
final Instance instance = getInstance(simpleTaskManager);
final AllocatedSlot slot = instance.allocateSlot(new JobID());
......@@ -154,9 +148,6 @@ public class ExecutionVertexDeploymentTest {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
doReturn(new TaskOperationResult(vertex.getCurrentExecutionAttempt().getAttemptId(),
true)).when(instance).submitTask(Matchers.<TaskDeploymentDescriptor>any());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
......@@ -184,8 +175,6 @@ public class ExecutionVertexDeploymentTest {
}
catch (IllegalStateException e) {}
verify(instance).submitTask(Matchers.any(TaskDeploymentDescriptor.class));
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) > 0);
......@@ -198,23 +187,21 @@ public class ExecutionVertexDeploymentTest {
@Test
public void testDeployFailedSynchronous() {
final String ERROR_MESSAGE = "test_failure_error_message";
try {
TestingUtils.setCallingThreadDispatcher(system);
final JobVertexID jid = new JobVertexID();
final TestActorRef simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleFailingTaskManager.class));
final Instance instance = spy(getInstance(ActorRef.noSender()));
final Instance instance = getInstance(simpleTaskManager);
final AllocatedSlot slot = instance.allocateSlot(new JobID());
final ExecutionJobVertex ejv = getExecutionVertex(jid);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
doReturn(new TaskOperationResult(vertex.getCurrentExecutionAttempt().getAttemptId(), false,
ERROR_MESSAGE)).when(instance).submitTask(Matchers.<TaskDeploymentDescriptor>any());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
......@@ -237,21 +224,19 @@ public class ExecutionVertexDeploymentTest {
@Test
public void testDeployFailedAsynchronously() {
final String ERROR_MESSAGE = "test_failure_error_message";
try {
final JobVertexID jid = new JobVertexID();
final TestActorRef simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleFailingTaskManager.class));
final Instance instance = spy(getInstance(ActorRef.noSender()));
final Instance instance = getInstance(simpleTaskManager);
final AllocatedSlot slot = instance.allocateSlot(new JobID());
final ExecutionJobVertex ejv = getExecutionVertex(jid);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
doReturn(new TaskOperationResult(vertex.getCurrentExecutionAttempt().getAttemptId(), false,
ERROR_MESSAGE)).when(instance).submitTask(Matchers.<TaskDeploymentDescriptor>any());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
......@@ -290,16 +275,16 @@ public class ExecutionVertexDeploymentTest {
TestingUtils.setExecutionContext(ec);
final Instance instance = spy(getInstance(ActorRef.noSender()));
final TestActorRef simpleTaskManager = TestActorRef.create(system,
Props.create(SimpleAcknowledgingTaskManager.class));
final Instance instance = getInstance(simpleTaskManager);
final AllocatedSlot slot = instance.allocateSlot(new JobID());
final ExecutionJobVertex ejv = getExecutionVertex(jid);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
doReturn(new TaskOperationResult(vertex.getCurrentExecutionAttempt()
.getAttemptId(), true)).when(instance).submitTask(Matchers.<TaskDeploymentDescriptor>any());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
......@@ -327,32 +312,28 @@ public class ExecutionVertexDeploymentTest {
public void testFailCallOvertakesDeploymentAnswer() {
try {
final ActionQueue queue = new ActionQueue();
final JobVertexID jid = new JobVertexID();
final TestingUtils.QueuedActionExecutionContext ec = new TestingUtils
ActionQueue queue = new ActionQueue();
TestingUtils.QueuedActionExecutionContext context = new TestingUtils
.QueuedActionExecutionContext(queue);
TestingUtils.setExecutionContext(ec);
final Instance instance = spy(getInstance(ActorRef.noSender()));
TestingUtils.setExecutionContext(context);
final AllocatedSlot slot = instance.allocateSlot(new JobID());
final JobVertexID jid = new JobVertexID();
final ExecutionJobVertex ejv = getExecutionVertex(jid);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0]);
final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
// the deployment call succeeds regularly
doReturn(new TaskOperationResult(eid, true)).when(instance).submitTask(Matchers
.<TaskDeploymentDescriptor>any());
// first cancel call does not find a task, second one finds it
doReturn(new TaskOperationResult(eid, false)).doReturn(new TaskOperationResult(eid,
true)).when(instance).cancelTask(Matchers.<ExecutionAttemptID>any());
final TestActorRef simpleTaskManager = TestActorRef.create(system, Props.create(new
ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new
TaskOperationResult(eid, false), new TaskOperationResult(eid, true))));
final Instance instance = getInstance(simpleTaskManager);
final AllocatedSlot slot = instance.allocateSlot(new JobID());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
......@@ -360,20 +341,24 @@ public class ExecutionVertexDeploymentTest {
vertex.fail(testError);
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
// now the deploy call returns
// cancel call overtakes deploy call
Runnable deploy = queue.popNextAction();
Runnable cancel1 = queue.popNextAction();
// cancel call overtakes
cancel1.run();
// execute onComplete callback
queue.triggerNextAction();
deploy.run();
assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
// should have sent another cancel call
queue.triggerNextAction();
// execute onComplete callback
queue.triggerNextAction();
assertEquals(testError, vertex.getFailureCause());
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
......@@ -381,10 +366,6 @@ public class ExecutionVertexDeploymentTest {
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
assertTrue(queue.isEmpty());
// should have received two cancel calls
verify(instance, times(2)).cancelTask(eid);
verify(instance, times(1)).submitTask(any(TaskDeploymentDescriptor.class));
}
catch (Exception e) {
e.printStackTrace();
......
......@@ -71,7 +71,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
expectMsg(AllVerticesRunning(jobID))
// kill one task manager
taskManagers.get(0) ! PoisonPill
taskManagers(0) ! PoisonPill
expectMsgType[JobResultFailed]
}
}finally{
......
......@@ -79,7 +79,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
expectMsg(AllVerticesRunning(jobID))
//kill task manager
taskManagers.get(0) ! PoisonPill
taskManagers(0) ! PoisonPill
expectMsgType[JobResultFailed]
}
......
......@@ -26,7 +26,7 @@ import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager
class TestingCluster extends FlinkMiniCluster {
class TestingCluster(userConfiguration: Configuration) extends FlinkMiniCluster(userConfiguration) {
override def generateConfiguration(userConfig: Configuration): Configuration = {
val cfg = new Configuration()
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
......@@ -37,14 +37,14 @@ class TestingCluster extends FlinkMiniCluster {
cfg
}
override def startJobManager(system: ActorSystem, config: Configuration) = {
system.actorOf(Props(new JobManager(config) with TestingJobManager),
override def startJobManager(implicit system: ActorSystem) = {
system.actorOf(Props(new JobManager(configuration) with TestingJobManager),
JobManager.JOB_MANAGER_NAME)
}
override def startTaskManager(system: ActorSystem, config: Configuration, index: Int) = {
override def startTaskManager(index: Int)(implicit system: ActorSystem) = {
val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfig) =
TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, config, true)
TaskManager.parseConfiguration(FlinkMiniCluster.HOSTNAME, configuration, true)
system.actorOf(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig,
networkConnectionConfig)), TaskManager.TASK_MANAGER_NAME + index)
......
......@@ -71,9 +71,7 @@ object TestingUtils {
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
val cluster = new TestingCluster
cluster.start(config)
val cluster = new TestingCluster(config)
cluster
}
......@@ -97,9 +95,9 @@ object TestingUtils {
}
override def execute(runnable: Runnable): Unit = {
if(runnable.getClass.getName.equals("scala.concurrent.impl.CallbackRunnable")) {
if(automaticExecution){
runnable.run()
}else{
}else {
queue.queueAction(runnable)
}
}
......
......@@ -84,14 +84,13 @@ public abstract class AbstractTestBase {
public void startCluster() throws Exception {
Thread.sleep(250);
this.executor = new LocalFlinkMiniCluster(null);
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, true);
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
this.executor.start(config);
this.executor = new LocalFlinkMiniCluster(config);
}
public void stopCluster() throws Exception {
......
......@@ -85,11 +85,10 @@ public abstract class CancellingTestBase {
@Before
public void startCluster() throws Exception {
verifyJvmOptions();
this.executor = new LocalFlinkMiniCluster(null);
Configuration config = new Configuration();
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
this.executor.start(config);
this.executor = new LocalFlinkMiniCluster(config);
}
@After
......
......@@ -36,7 +36,7 @@ public class PackagedProgramEndToEndITCase {
@Test
public void testEverything() {
LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(null);
LocalFlinkMiniCluster cluster = null;
File points = null;
File clusters = null;
......@@ -64,7 +64,7 @@ public class PackagedProgramEndToEndITCase {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
cluster.start(config);
cluster = new LocalFlinkMiniCluster(config);
RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort());
......@@ -92,7 +92,9 @@ public class PackagedProgramEndToEndITCase {
}
try {
cluster.stop();
if(cluster != null) {
cluster.stop();
}
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册