Commit 64b2602d authored by Till Rohrmann

Add proper task resource removal in case that a task submission fails

Parent 04b97c94
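
The diff below centralizes the cleanup that previously lived inline in the task-submission catch block into a single unregisterTask method on the TaskManager. A minimal sketch of that pattern, with hypothetical names (not the actual Flink classes), is:

```scala
// Sketch only: illustrates the "clean up all partially acquired resources in
// one helper when task submission fails" pattern this commit introduces.
import scala.collection.mutable

object TaskSubmissionSketch {
  private val runningTasks = mutable.Map[String, String]()

  def submitTask(executionId: String): Boolean = {
    try {
      runningTasks += executionId -> "task" // register the task first
      // ... instantiate user code, register cached files, start the task;
      // any of these steps may throw ...
      true
    } catch {
      case t: Throwable =>
        // a single helper releases everything that was acquired so far,
        // instead of duplicating cleanup code in the catch block
        unregisterTask(executionId)
        false
    }
  }

  def unregisterTask(executionId: String): Unit = {
    runningTasks.remove(executionId) match {
      case Some(_) =>
        // here the real implementation deletes cached files and unregisters
        // the task from channel manager, profiler, memory manager and
        // library cache manager
        println(s"Unregistered task $executionId.")
      case None =>
        println(s"Cannot find task with ID $executionId to unregister.")
    }
  }
}
```
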
......@@ -18,35 +18,26 @@
package org.apache.flink.test.hadoopcompatibility.mapred;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
......
......@@ -69,7 +69,7 @@ trait YarnJobManager extends ActorLogMessages {
case StopYarnSession(status) =>
log.info("Stopping Yarn Session.")
instanceManager.getAllRegisteredInstances foreach {
instanceManager.getAllRegisteredInstances.asScala foreach {
instance =>
instance.getTaskManager ! StopYarnSession(status)
}
......@@ -196,7 +196,7 @@ trait YarnJobManager extends ActorLogMessages {
case Some(rmClient) => {
val response = rmClient.allocate(completedContainers.toFloat / numTaskManager)
for (container <- response.getAllocatedContainers) {
for (container <- response.getAllocatedContainers.asScala) {
log.info(s"Got new container for TM ${container.getId} on host ${
container.getNodeId.getHost}")
......@@ -220,7 +220,7 @@ trait YarnJobManager extends ActorLogMessages {
}
}
for (status <- response.getCompletedContainersStatuses) {
for (status <- response.getCompletedContainersStatuses.asScala) {
completedContainers += 1
log.info(s"Completed container ${status.getContainerId}. Total completed " +
s"$completedContainers.")
......
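
The changes in this file (and in JobManager/TaskManager below) replace the implicit WrapAsScala mix-in with explicit conversions via scala.collection.JavaConverters. A minimal sketch of that style, using a hypothetical list rather than the YARN API, is:

```scala
// Sketch only: java.util collections returned by Java APIs are converted
// explicitly with .asScala instead of relying on the WrapAsScala trait.
import scala.collection.JavaConverters._

object AsScalaSketch extends App {
  val javaList: java.util.List[String] = new java.util.ArrayList[String]()
  javaList.add("container-1")
  javaList.add("container-2")

  // .asScala wraps the Java list so it can be used in a Scala for-comprehension
  for (container <- javaList.asScala) {
    println(s"Got new container $container")
  }
}
```
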
......@@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParserTest.MyWritable;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.HashMultiset;
......
......@@ -46,13 +46,14 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, He
import org.apache.flink.runtime.profiling.ProfilingUtils
import org.slf4j.LoggerFactory
import scala.collection.convert.WrapAsScala
import scala.concurrent.{Future}
import scala.concurrent.duration._
class JobManager(val configuration: Configuration) extends
Actor with ActorLogMessages with ActorLogging with WrapAsScala {
Actor with ActorLogMessages with ActorLogging {
import context._
import scala.collection.JavaConverters._
implicit val timeout = FiniteDuration(configuration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
......@@ -158,7 +159,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
.getName}}).")
}
for (vertex <- jobGraph.getVertices) {
for (vertex <- jobGraph.getVertices.asScala) {
val executableClass = vertex.getInvokableClassName
if (executableClass == null || executableClass.length == 0) {
throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " +
......
......@@ -58,7 +58,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.util.ExceptionUtils
import org.slf4j.LoggerFactory
import scala.collection.convert.{WrapAsScala, DecorateAsScala}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
......@@ -67,10 +66,11 @@ import scala.util.Success
class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkkaURL: String,
val taskManagerConfig: TaskManagerConfiguration,
val networkConnectionConfig: NetworkConnectionConfiguration)
extends Actor with ActorLogMessages with ActorLogging with DecorateAsScala with WrapAsScala {
extends Actor with ActorLogMessages with ActorLogging {
import context._
import taskManagerConfig.{timeout => tmTimeout, _}
import scala.collection.JavaConverters._
implicit val timeout = tmTimeout
log.info(s"Starting task manager at ${self.path}.")
......@@ -230,7 +230,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val executionID = tdd.getExecutionId
val taskIndex = tdd.getIndexInSubtaskGroup
val numSubtasks = tdd.getCurrentNumberOfSubtasks
var jarsRegistered = false
var startRegisteringTask = 0L
try {
......@@ -243,7 +242,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
log.debug(s"Register task ${executionID} took ${(System.currentTimeMillis() -
startRegisteringTask)/1000.0}s")
}
jarsRegistered = true
val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)
......@@ -285,7 +283,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val cpTasks = new util.HashMap[String, FutureTask[Path]]()
for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration).asScala) {
val cp = fileCache.createTmpFile(entry.getKey, entry.getValue, jobID)
cpTasks.put(entry.getKey, cp)
}
......@@ -299,28 +297,18 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
} catch {
case t: Throwable =>
log.error(t, s"Could not instantiate task with execution ID ${executionID}.")
runningTasks.remove(executionID)
for (entry <- DistributedCache.readFileInfoFromConfig(tdd.getJobConfiguration)) {
fileCache.deleteTmpFile(entry.getKey, entry.getValue, jobID)
}
if (jarsRegistered) {
try {
libraryCacheManager.unregisterTask(jobID, executionID)
} catch {
case ioe: IOException =>
if(log.isDebugEnabled) {
log.debug(s"Unregistering the execution ${executionID} caused an IOException.")
}
}
}
unregisterTask(executionID)
sender ! new TaskOperationResult(executionID, false,
ExceptionUtils.stringifyException(t))
}
}
case UnregisterTask(executionID) => {
unregisterTask(executionID)
}
case SendHeartbeat => {
currentJobManager ! Heartbeat(instanceID)
}
......@@ -348,38 +336,6 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
}
}
case UnregisterTask(executionID) => {
log.info(s"Unregister task with execution ID ${executionID}.")
runningTasks.remove(executionID) match {
case Some(task) =>
if(task.getEnvironment != null) {
for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
.getJobConfiguration)) {
fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
}
}
channelManager foreach {
_.unregister(executionID, task)
}
profiler foreach {
_ ! UnmonitorTask(task.getExecutionId)
}
task.unregisterMemoryManager(memoryManager)
try {
libraryCacheManager.unregisterTask(task.getJobID, executionID)
} catch {
case ioe: IOException =>
log.error(ioe, s"Unregistering the execution ${executionID} caused an IOException.")
}
case None =>
log.error(s"Cannot find task with ID ${executionID} to unregister.")
}
}
case Terminated(jobManager) => {
log.info(s"Job manager ${jobManager.path} is no longer reachable. Try to reregister.")
tryJobManagerRegistration()
......@@ -393,13 +349,11 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
val futureResponse = (currentJobManager ? UpdateTaskExecutionState(new TaskExecutionState
(jobID, executionID, executionState, optionalError)))(timeout)
val receiver = this.self
futureResponse.mapTo[Boolean].onComplete {
case Success(result) =>
if (!result || executionState == ExecutionState.FINISHED || executionState ==
ExecutionState.CANCELED || executionState == ExecutionState.FAILED) {
receiver ! UnregisterTask(executionID)
self ! UnregisterTask(executionID)
}
case Failure(t) =>
log.warning(s"Execution state change notification failed for task ${executionID} " +
......@@ -461,6 +415,33 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
}
}
}
def unregisterTask(executionID: ExecutionAttemptID): Unit = {
log.info(s"Unregister task with execution ID ${executionID}.")
runningTasks.remove(executionID) match {
case Some(task) =>
if(task.getEnvironment != null) {
for (entry <- DistributedCache.readFileInfoFromConfig(task.getEnvironment
.getJobConfiguration).asScala) {
fileCache.deleteTmpFile(entry.getKey, entry.getValue, task.getJobID)
}
}
channelManager foreach {
_.unregister(executionID, task)
}
profiler foreach {
_ ! UnmonitorTask(task.getExecutionId)
}
task.unregisterMemoryManager(memoryManager)
libraryCacheManager.unregisterTask(task.getJobID, executionID)
case None =>
log.error(s"Cannot find task with ID ${executionID} to unregister.")
}
}
}
object TaskManager {
......
......@@ -20,7 +20,6 @@ package org.apache.flink.runtime.testingUtils
import akka.actor.{ActorSystem, Props}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.instance.InstanceConnectionInfo
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
......
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......
......@@ -22,7 +22,7 @@ import akka.actor.{Props, ActorSystem, ActorRef}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.{TestingUtils, TestingTaskManager}
import org.apache.flink.runtime.testingUtils.{TestingTaskManager}
class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean)
extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
......
......@@ -771,76 +771,79 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
}
out.collect(concat.toString());
}
case 27: {
/*
* Test Java collections within pojos ( == test kryo)
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("key")
.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
@Override
public void reduce(
Iterable<CollectionDataSets.PojoWithCollection> values,
Collector<String> out) throws Exception {
StringBuilder concat = new StringBuilder();
concat.append("call");
for(CollectionDataSets.PojoWithCollection value : values) {
concat.append("For key "+value.key+" we got: ");
for(CollectionDataSets.Pojo1 p :value.pojos) {
concat.append("pojo.a="+p.a);
}
}
out.collect(concat.toString());
}
});
reduceDs.writeAsText(resultPath);
env.execute();
// return expected result
return "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
}
}
case 28: {
/*
* Group by generic type
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("bigInt")
.reduceGroup(new GroupReduceFunction<CollectionDataSets.PojoWithCollection, String>() {
@Override
public void reduce(
Iterable<CollectionDataSets.PojoWithCollection> values,
Collector<String> out) throws Exception {
StringBuilder concat = new StringBuilder();
concat.append("call");
for(CollectionDataSets.PojoWithCollection value : values) {
concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
}
out.collect(concat.toString());
}
});
reduceDs.writeAsText(resultPath);
env.execute();
// return expected result
return "call\n" +
"For key 92233720368547758070 we got:\n" +
"PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
"For key 92233720368547758070 we got:\n" +
"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
@Test
public void testJavaCollectionsWithinPojos() throws Exception {
/*
* Test Java collections within pojos ( == test kryo)
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("key")
.reduceGroup(new GroupReducer7());
reduceDs.writeAsText(resultPath);
env.execute();
expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";
}
public static class GroupReducer7 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
@Override
public void reduce(
Iterable<CollectionDataSets.PojoWithCollection> values,
Collector<String> out) throws Exception {
StringBuilder concat = new StringBuilder();
concat.append("call");
for(CollectionDataSets.PojoWithCollection value : values) {
concat.append("For key "+value.key+" we got: ");
for(CollectionDataSets.Pojo1 p :value.pojos) {
concat.append("pojo.a="+p.a);
}
}
out.collect(concat.toString());
}
}
@Test
public void testGroupByGenericType() throws Exception {
/*
* Group by generic type
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(1);
DataSet<CollectionDataSets.PojoWithCollection> ds = CollectionDataSets.getPojoWithCollection(env);
// f0.f0 is first integer
DataSet<String> reduceDs = ds.groupBy("bigInt")
.reduceGroup(new GroupReducer8());
reduceDs.writeAsText(resultPath);
env.execute();
expected = "call\n" +
"For key 92233720368547758070 we got:\n" +
"PojoWithCollection{pojos.size()=2, key=0, sqlDate=2033-05-18, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=10, mixed=[{someKey=1}, /this/is/wrong, uhlala]}\n" +
"For key 92233720368547758070 we got:\n" +
"PojoWithCollection{pojos.size()=2, key=0, sqlDate=1976-05-03, bigInt=92233720368547758070, bigDecimalKeepItNull=null, scalaBigInt=31104000, mixed=null}\n";
}
public static class GroupReducer8 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
@Override
public void reduce(
Iterable<CollectionDataSets.PojoWithCollection> values,
Collector<String> out) throws Exception {
StringBuilder concat = new StringBuilder();
concat.append("call");
for(CollectionDataSets.PojoWithCollection value : values) {
concat.append("\nFor key "+value.bigInt+" we got:\n"+value);
}
out.collect(concat.toString());
}
}
public static class NestedTupleReducer implements GroupReduceFunction<Tuple2<Tuple2<Integer,Integer>,String>, String> {
@Override
public void reduce(
......
......@@ -26,7 +26,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
......