Commit 4f841fc2 authored by Till Rohrmann

Reworked the EventCollector and the ArchiveListener as actors.

Replacing the EventCollector and the MemoryArchivist.

Finished EventCollector and adjusted ExecutionGraph and ExecutionVertex to register actors as listeners.
Parent 1e74b4c3
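The core pattern in this rework replaces callback interfaces with ActorRefs that receive immutable messages. A minimal sketch of that idea, assuming the Akka 2.3 Java API (the class and message names below are illustrative, not taken from the commit):

import java.util.concurrent.CopyOnWriteArrayList;
import akka.actor.ActorRef;

class ListenerSketch {
    // Immutable, serializable message sent to listener actors instead of a callback.
    static final class StateChanged implements java.io.Serializable {
        final String vertexId;
        final String newState;
        StateChanged(String vertexId, String newState) {
            this.vertexId = vertexId;
            this.newState = newState;
        }
    }

    // CopyOnWriteArrayList mirrors the listener lists used in ExecutionVertex further below.
    private final CopyOnWriteArrayList<ActorRef> listeners = new CopyOnWriteArrayList<ActorRef>();

    void register(ActorRef listener) {
        listeners.addIfAbsent(listener);
    }

    void notifyStateChange(String vertexId, String newState) {
        StateChanged message = new StateChanged(vertexId, newState);
        for (ActorRef listener : listeners) {
            // Fire-and-forget: no locks are held while listeners process the event.
            listener.tell(message, ActorRef.noSender());
        }
    }
}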
......@@ -19,7 +19,7 @@ under the License.
-->
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
http://maven.apache.org/xsd/settings-1.0.0.xsd">
<servers>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -16,7 +16,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -18,7 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -32,7 +32,7 @@ import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.instance.LocalInstanceManager;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobmanagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskManager;
import java.lang.reflect.Method;
......@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.util.EnvironmentInformation;
import scala.concurrent.Await;
import scala.concurrent.Future;
......@@ -225,8 +226,7 @@ public class NepheleMiniCluster {
Configuration configuration = GlobalConfiguration.getConfiguration();
// start the job manager
jobManager = JobManager.startActorSystemAndActor("flink", HOSTNAME, jobManagerRpcPort, "jobmanager",
configuration);
jobManager = JobManager.startActorSystemAndActor(HOSTNAME, jobManagerRpcPort, configuration);
int tmRPCPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);
......@@ -237,8 +237,7 @@ public class NepheleMiniCluster {
Configuration tmConfiguration = GlobalConfiguration.getConfiguration();
tmConfiguration.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, tmRPCPort + i);
tmConfiguration.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, tmDataPort + i);
ActorSystem taskManager = TaskManager.startActorSystemAndActor("flink", HOSTNAME, tmRPCPort+i,
"taskmanager" + (i+1), configuration);
ActorSystem taskManager = TaskManager.startActorSystemAndActor(HOSTNAME, tmRPCPort+i, configuration);
taskManagers.add(taskManager);
}
......@@ -275,19 +274,22 @@ public class NepheleMiniCluster {
// ------------------------------------------------------------------------
private void waitForJobManagerToBecomeReady(int numTaskManagers) throws Exception {
LOG.debug("Wait until " + numTaskManagers + " task managers are ready.");
boolean notReady = true;
ActorSelection jobmanagerSelection = jobManager.actorSelection("jobmanager");
Timeout timeout = new Timeout(1L, TimeUnit.MINUTES);
ActorSelection jobManagerSelection = jobManager.actorSelection("/user/jobmanager");
while(notReady){
Future<Object> futureNumTaskManagers = Patterns.ask(jobmanagerSelection,
JobmanagerMessages.RequestNumberRegisteredTaskManager$.MODULE$, timeout);
Future<Object> futureNumTaskManagers = Patterns.ask(jobManagerSelection,
JobManagerMessages.RequestNumberRegisteredTaskManager$.MODULE$, timeout);
int numRegisteredTaskManagers = (Integer)Await.result(futureNumTaskManagers, timeout.duration());
LOG.debug("Number of registered task manager: " + numRegisteredTaskManagers);
if(numRegisteredTaskManagers < numTaskManagers){
Thread.sleep(50);
Thread.sleep(500);
}
// make sure that not just the jobmanager has the slots, but also the taskmanager
......
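The polling loop above is built on Akka's ask pattern: the request returns a Future that is blocked on with Await. A self-contained sketch of the same pattern, assuming Akka 2.3 and a hypothetical request message object:

import java.util.concurrent.TimeUnit;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
import scala.concurrent.Await;
import scala.concurrent.Future;

public class PollingSketch {
    // Ask the JobManager actor for the number of registered task managers.
    public static int askNumberOfTaskManagers(ActorSystem system, Object request) throws Exception {
        Timeout timeout = new Timeout(1L, TimeUnit.MINUTES);
        ActorSelection jobManager = system.actorSelection("/user/jobmanager");
        Future<Object> response = Patterns.ask(jobManager, request, timeout);
        // Await.result blocks the calling thread; acceptable in startup/test code only.
        return (Integer) Await.result(response, timeout.duration());
    }
}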
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,39 +17,132 @@
*/
import java.io.File
import java.net.InetAddress
import akka.actor._
import org.apache.flink.configuration.Configuration
import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.messages.JobmanagerMessages.RequestNumberRegisteredTaskManager
import org.apache.flink.runtime.executiongraph.{ExecutionGraph}
import org.apache.flink.runtime.instance.{InstanceManager}
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler
import org.apache.flink.runtime.messages.EventCollectorMessages.RegisterArchiveListener
import org.apache.flink.runtime.messages.JobManagerMessages.{CancelJob, SubmitJob, RequestNumberRegisteredTaskManager}
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
import org.apache.flink.runtime.profiling.ProfilingUtils
import org.apache.flink.runtime.profiling.impl.JobManagerProfilerImpl
import org.slf4j.LoggerFactory
class JobManager(archiveCount: Int, profiling: Boolean, recommendedPollingInterval: Int) extends Actor with
ActorLogMessages with ActorLogging {
val profiler = if(profiling){
new JobManagerProfilerImpl(InetAddress.getByName(self.path.address.host.getOrElse("localhost")))
}else{
null
}
// will be removed
val archive = context.actorOf(Props(classOf[MemoryArchivist], archiveCount), "archive")
val eventCollector = context.actorOf(Props(classOf[EventCollector], recommendedPollingInterval), "eventcollector")
val accumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
val instanceManager = new InstanceManager()
val scheduler = new DefaultScheduler()
val webserver = null
import scala.collection.mutable
val currentJobs = scala.collection.concurrent.TrieMap[JobID, ExecutionGraph]()
class JobManager extends Actor with ActorLogging {
eventCollector ! RegisterArchiveListener(archive)
val taskManagers = new mutable.HashSet[ActorRef]()
instanceManager.addInstanceListener(scheduler)
override def receive: Receive = {
case RegisterTaskManager(hardwareInformation) =>
override def postStop(): Unit = {
instanceManager.shutdown()
scheduler.shutdown()
}
override def receiveWithLogMessages: Receive = {
case RegisterTaskManager(hardwareInformation, numberOfSlots) =>
val taskManager = sender()
taskManagers += taskManager
val instanceID = instanceManager.registerTaskManager(taskManager, hardwareInformation, numberOfSlots)
context.watch(taskManager);
taskManager ! AcknowledgeRegistration
taskManager ! AcknowledgeRegistration(instanceID)
case RequestNumberRegisteredTaskManager =>
sender() ! taskManagers.size
sender() ! instanceManager.getNumberOfRegisteredTaskManagers
case SubmitJob(jobGraph) =>
case CancelJob(jobID) =>
case Heartbeat(instanceID) =>
instanceManager.reportHeartBeat(instanceID)
}
}
object JobManager{
def startActorSystemAndActor(systemName: String, hostname: String, port: Int, actorName: String,
configuration: Configuration): ActorSystem = {
val actorSystem = AkkaUtils.createActorSystem(systemName, hostname, port, configuration)
startActor(actorSystem, actorName)
val LOG = LoggerFactory.getLogger(classOf[JobManager])
val FAILURE_RETURN_CODE = 1
def main(args: Array[String]):Unit = {
val (hostname, port, configuration) = initialize(args)
val jobManagerSystem = startActorSystemAndActor(hostname, port, configuration)
jobManagerSystem.awaitTermination()
}
def initialize(args: Array[String]):(String, Int, Configuration) = {
val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager"){
head("flink jobmanager")
opt[String]("configDir") action { (x, c) =>
c.copy(configDir = x)
} text("Specify configuration directory.")
}
parser.parse(args, JobManagerCLIConfiguration()) map {
config =>
GlobalConfiguration.loadConfiguration(config.configDir)
val configuration = GlobalConfiguration.getConfiguration()
if(config.configDir != null && new File(config.configDir).isDirectory){
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
}
val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
(hostname, port, configuration)
} getOrElse {
LOG.error("CLI Parsing failed. Usage: " + parser.usage)
sys.exit(FAILURE_RETURN_CODE)
}
}
def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration): ActorSystem = {
val actorSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
startActor(actorSystem, configuration)
actorSystem
}
def startActor(actorSystem: ActorSystem, actorName: String): ActorRef = {
actorSystem.actorOf(Props(classOf[JobManager]), actorName)
def startActor(actorSystem: ActorSystem, configuration: Configuration): ActorRef = {
val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)
val profilingEnabled = configuration.getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)
val recommendedPollingInterval = configuration.getInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY,
ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL)
actorSystem.actorOf(Props(classOf[JobManager], archiveCount, profilingEnabled, recommendedPollingInterval), "jobmanager")
}
def getAkkaURL(address: String): String = {
s"akka.tcp://flink@${address}/user/jobmanager"
}
}
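getAkkaURL above builds the remote address under which the JobManager actor is reachable. A hedged sketch of the matching client-side lookup (the system name "flink" and the actor path "/user/jobmanager" are taken from the diff; the helper class itself is hypothetical):

import akka.actor.ActorSelection;
import akka.actor.ActorSystem;

public class JobManagerLookup {
    // Resolve the remote JobManager actor from a host:port address string.
    public static ActorSelection lookup(ActorSystem clientSystem, String address) {
        String url = "akka.tcp://flink@" + address + "/user/jobmanager";
        return clientSystem.actorSelection(url);
    }
}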
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -18,7 +18,7 @@ under the License.
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>bin</id>
<formats>
......
......@@ -18,7 +18,7 @@ under the License.
-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<!-- Note: This file has been copied and adapted from: http://maven.apache.org/plugins/maven-assembly-plugin/descriptor-refs.html#jar-with-dependencies -->
......
......@@ -19,7 +19,7 @@ under the License.
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>yarn</id>
<formats>
......
......@@ -191,7 +191,7 @@ JVM_ARGS=""
CLASSPATH=`manglePathList $( echo $FLINK_LIB_DIR/*.jar . | sed 's/ /:/g' )`
# Auxiliary function which extracts the name of the host from a line which
# also potentially includes topology information and the instance type
extractHostName() {
# extract first part of string (before any whitespace characters)
SLAVE=$1
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -18,7 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -16,7 +16,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -16,7 +16,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -16,7 +16,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -16,7 +16,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......@@ -114,6 +114,18 @@ under the License.
<artifactId>akka-remote_2.10</artifactId>
<version>2.3.5</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.10</artifactId>
<version>2.3.5</version>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_2.10</artifactId>
<version>3.2.0</version>
</dependency>
</dependencies>
......
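The newly added akka-slf4j dependency only takes effect once the actor system is configured to log through SLF4J. A hedged sketch of wiring that up programmatically (the akka.loggers key is a standard Akka 2.3 setting; the helper class is hypothetical):

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class LoggingSetup {
    // Create an actor system that routes Akka's internal logging to SLF4J.
    public static ActorSystem create() {
        Config config = ConfigFactory.parseString(
                "akka.loggers = [\"akka.event.slf4j.Slf4jLogger\"]\n" +
                "akka.loglevel = \"INFO\"");
        return ActorSystem.create("flink", config.withFallback(ConfigFactory.load()));
    }
}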
......@@ -73,8 +73,101 @@ public class ExecutionVertex {
// --------------------------------------------------------------------------------------------
public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets) {
this(jobVertex, subTaskIndex, producedDataSets, System.currentTimeMillis());
/**
* The ID of the vertex.
*/
private final ExecutionVertexID vertexID;
/**
* The group vertex this vertex belongs to.
*/
private final ExecutionGroupVertex groupVertex;
/**
* The execution graph this vertex belongs to.
*/
private final ExecutionGraph executionGraph;
/**
* The allocated resources assigned to this vertex.
*/
private final AtomicReference<AllocatedResource> allocatedResource = new AtomicReference<AllocatedResource>(null);
/**
* The allocation ID identifying the allocated resources used by this vertex
* within the instance.
*/
private volatile AllocationID allocationID = null;
/**
* A list of {@link VertexAssignmentListener} objects to be notified about changes in the instance assignment.
*/
private final CopyOnWriteArrayList<VertexAssignmentListener> vertexAssignmentListeners = new CopyOnWriteArrayList<VertexAssignmentListener>();
private final CopyOnWriteArrayList<ActorRef> vertexAssignmentListenerActors = new
CopyOnWriteArrayList<ActorRef>();
/**
* A map of {@link ExecutionListener} objects to be notified about the state changes of a vertex.
*/
private final ConcurrentMap<Integer, ExecutionListener> executionListeners = new ConcurrentSkipListMap<Integer, ExecutionListener>();
private final CopyOnWriteArrayList<ActorRef> executionListenerActors = new CopyOnWriteArrayList<ActorRef>();
/**
* The current execution state of the task represented by this vertex
*/
private final AtomicEnum<ExecutionState> executionState = new AtomicEnum<ExecutionState>(ExecutionState.CREATED);
/**
* The output gates attached to this vertex.
*/
private final ExecutionGate[] outputGates;
/**
* The input gates attached to this vertex.
*/
private final ExecutionGate[] inputGates;
/**
* The index of this vertex in the vertex group.
*/
private volatile int indexInVertexGroup = 0;
/**
* Stores the number of times the vertex may still be started before the corresponding task is considered to be
* failed.
*/
private final AtomicInteger retriesLeft;
/**
* The execution pipeline this vertex is part of.
*/
private final AtomicReference<ExecutionPipeline> executionPipeline = new AtomicReference<ExecutionPipeline>(null);
/**
* Flag to indicate whether the vertex has been requested to cancel while in state STARTING
*/
private final AtomicBoolean cancelRequested = new AtomicBoolean(false);
/**
* Create a new execution vertex and instantiates its environment.
*
* @param executionGraph
* the execution graph the new vertex belongs to
* @param groupVertex
* the group vertex the new vertex belongs to
* @param numberOfOutputGates
* the number of output gates attached to this vertex
* @param numberOfInputGates
* the number of input gates attached to this vertex
*/
public ExecutionVertex(final ExecutionGraph executionGraph, final ExecutionGroupVertex groupVertex,
final int numberOfOutputGates, final int numberOfInputGates) {
this(new ExecutionVertexID(), executionGraph, groupVertex, numberOfOutputGates, numberOfInputGates);
this.groupVertex.addInitialSubtask(this);
}
public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, IntermediateResult[] producedDataSets, long createTimestamp) {
......@@ -122,22 +215,19 @@ public class ExecutionVertex {
public String getTaskName() {
return this.jobVertex.getJobVertex().getName();
}
public int getTotalNumberOfParallelSubtasks() {
return this.jobVertex.getParallelism();
}
public int getParallelSubtaskIndex() {
return this.subTaskIndex;
}
public int getNumberOfInputs() {
return this.inputEdges.length;
}
public ExecutionEdge[] getInputEdges(int input) {
if (input < 0 || input >= this.inputEdges.length) {
throw new IllegalArgumentException(String.format("Input %d is out of range [0..%d)", input, this.inputEdges.length));
/**
* Inserts the input gate at the given position.
*
* @param pos
* the position to insert the input gate
* @param inputGate
* the input gate to be inserted
*/
void insertInputGate(final int pos, final ExecutionGate inputGate) {
if (this.inputGates[pos] != null) {
throw new IllegalStateException("Input gate at position " + pos + " is not null");
}
return inputEdges[input];
}
......@@ -165,9 +255,62 @@ public class ExecutionVertex {
public AllocatedSlot getCurrentAssignedResource() {
return currentExecution.getAssignedResource();
}
public ExecutionGraph getExecutionGraph() {
return this.jobVertex.getGraph();
/**
* Updates the vertex's current execution state.
*
* @param newExecutionState
* the new execution state
* @param optionalMessage
* an optional message related to the state change
*/
public ExecutionState updateExecutionState(ExecutionState newExecutionState, final String optionalMessage) {
if (newExecutionState == null) {
throw new IllegalArgumentException("Argument newExecutionState must not be null");
}
final ExecutionState currentExecutionState = this.executionState.get();
if (currentExecutionState == ExecutionState.CANCELING) {
// If we are in CANCELING, ignore state changes to FINISHING
if (newExecutionState == ExecutionState.FINISHING) {
return currentExecutionState;
}
// Rewrite FINISHED to CANCELED if the task has been marked to be canceled
if (newExecutionState == ExecutionState.FINISHED) {
LOG.info("Received transition from CANCELING to FINISHED for vertex " + toString()
+ ", converting it to CANCELED");
newExecutionState = ExecutionState.CANCELED;
}
}
// Check and save the new execution state
final ExecutionState previousState = this.executionState.getAndSet(newExecutionState);
if (previousState == newExecutionState) {
return previousState;
}
// Check the transition
ExecutionStateTransition.checkTransition(true, toString(), previousState, newExecutionState);
// Notify the listener objects
final Iterator<ExecutionListener> it = this.executionListeners.values().iterator();
while (it.hasNext()) {
it.next().executionStateChanged(this.executionGraph.getJobID(), this.vertexID, newExecutionState,
optionalMessage);
}
for(ActorRef actor: executionListenerActors){
actor.tell(new ExecutionGraphMessages.ExecutionStateChanged(this.executionGraph.getJobID(),
this.vertexID, newExecutionState, optionalMessage), ActorRef.noSender());
}
// The vertex was requested to be canceled by another thread
checkCancelRequestedFlag();
return previousState;
}
// --------------------------------------------------------------------------------------------
......@@ -206,6 +349,26 @@ public class ExecutionVertex {
ee.getSource().addConsumer(ee, consumerNumber);
graph.registerExecutionEdge(ee);
}
// Check the transition
ExecutionStateTransition.checkTransition(true, toString(), expected, update);
// Notify the listener objects
final Iterator<ExecutionListener> it = this.executionListeners.values().iterator();
while (it.hasNext()) {
it.next().executionStateChanged(this.executionGraph.getJobID(), this.vertexID, update,
null);
}
for(ActorRef actor: executionListenerActors){
actor.tell(new ExecutionGraphMessages.ExecutionStateChanged(this.executionGraph.getJobID(),
this.vertexID, update, null), ActorRef.noSender());
}
// Check if the vertex was requested to be canceled by another thread
checkCancelRequestedFlag();
return true;
}
private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
......@@ -218,14 +381,17 @@ public class ExecutionVertex {
return edges;
}
private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
final int numSources = sourcePartitions.length;
final int parallelism = getTotalNumberOfParallelSubtasks();
// simple case same number of sources as targets
if (numSources == parallelism) {
return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
/**
* Assigns the execution vertex with an {@link org.apache.flink.runtime.instance.AllocatedResource}.
*
* @param allocatedResource
* the resources which are supposed to be allocated to this vertex
*/
public void setAllocatedResource(final AllocatedResource allocatedResource) {
if (allocatedResource == null) {
throw new IllegalArgumentException("Argument allocatedResource must not be null");
}
else if (numSources < parallelism) {
......@@ -274,6 +440,11 @@ public class ExecutionVertex {
return edges;
}
}
for(ActorRef actor: vertexAssignmentListenerActors){
actor.tell(new ExecutionGraphMessages.VertexAssignmentChanged(this.getExecutionGraph().getJobID(),vertexID,
allocatedResource), ActorRef.noSender());
}
}
/**
......@@ -384,12 +555,50 @@ public class ExecutionVertex {
void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newState, Throwable error) {
getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
}
TaskDeploymentDescriptor createDeploymentDescriptor(ExecutionAttemptID executionId, AllocatedSlot slot) {
// create the input gate deployment descriptors
List<GateDeploymentDescriptor> inputGates = new ArrayList<GateDeploymentDescriptor>(inputEdges.length);
for (ExecutionEdge[] channels : inputEdges) {
inputGates.add(GateDeploymentDescriptor.fromEdges(channels));
/**
* Registers the {@link VertexAssignmentListener} object for this vertex. This object
* will be notified about reassignments of this vertex to another instance.
*
* @param vertexAssignmentListener
* the object to be notified about reassignments of this vertex to another instance
*/
public void registerVertexAssignmentListener(final VertexAssignmentListener vertexAssignmentListener) {
this.vertexAssignmentListeners.addIfAbsent(vertexAssignmentListener);
}
public void registerVertexAssignmentListener(final ActorRef vertexAssignmentListener){
this.vertexAssignmentListenerActors.addIfAbsent(vertexAssignmentListener);
}
/**
* Unregisters the {@link VertexAssignmentListener} object for this vertex. This object
* will no longer be notified about reassignments of this vertex to another instance.
*
* @param vertexAssignmentListener
* the listener to be unregistered
*/
public void unregisterVertexAssignmentListener(final VertexAssignmentListener vertexAssignmentListener) {
this.vertexAssignmentListeners.remove(vertexAssignmentListener);
}
/**
* Registers the {@link ExecutionListener} object for this vertex. This object
* will be notified about particular events during the vertex's lifetime.
*
* @param executionListener
* the object to be notified about particular events during the vertex's lifetime
*/
public void registerExecutionListener(final ExecutionListener executionListener) {
final Integer priority = Integer.valueOf(executionListener.getPriority());
if (priority.intValue() < 0) {
LOG.error("Priority for execution listener " + executionListener.getClass() + " must be non-negative.");
return;
}
// create the output gate deployment descriptors
......@@ -407,14 +616,21 @@ public class ExecutionVertex {
getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(),
jobVertex.getJobVertex().getInvokableClassName(), outputGates, inputGates, jarFiles, slot.getSlotNumber());
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
public void execute(Runnable action) {
this.jobVertex.execute(action);
public void registerExecutionListener(final ActorRef executionListener){
this.executionListenerActors.addIfAbsent(executionListener);
}
/**
* Unregisters the {@link ExecutionListener} object for this vertex. This object
* will no longer be notified about particular events during the vertex's lifetime.
*
* @param executionListener
* the object to be unregistered
*/
public void unregisterExecutionListener(final ExecutionListener executionListener) {
this.executionListeners.remove(Integer.valueOf(executionListener.getPriority()));
}
/**
......
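The registerExecutionListener(ActorRef) and registerVertexAssignmentListener(ActorRef) overloads above let any actor observe vertex events. A minimal sketch of such a listener actor, assuming the Akka 2.3 Java API (the logging actor itself is hypothetical):

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class ExecutionStateLogger extends UntypedActor {
    @Override
    public void onReceive(Object message) {
        // In the diff, ExecutionGraphMessages.ExecutionStateChanged and
        // VertexAssignmentChanged are the messages delivered here; this sketch only logs them.
        System.out.println("execution event: " + message);
    }

    // Usage: create the actor and hand its ActorRef to registerExecutionListener(...).
    public static ActorRef create(ActorSystem system) {
        return system.actorOf(Props.create(ExecutionStateLogger.class), "executionStateLogger");
    }
}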
......@@ -18,16 +18,12 @@
package org.apache.flink.runtime.instance;
import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.Serializable;
/**
* A hardware description describes the resources available to a task manager.
*/
public final class HardwareDescription implements IOReadableWritable, java.io.Serializable {
public final class HardwareDescription implements Serializable {
private static final long serialVersionUID = 3380016608300325361L;
......@@ -100,26 +96,6 @@ public final class HardwareDescription implements IOReadableWritable, java.io.Se
return this.sizeOfManagedMemory;
}
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
@Override
public void write(DataOutputView out) throws IOException {
out.writeInt(this.numberOfCPUCores);
out.writeLong(this.sizeOfPhysicalMemory);
out.writeLong(this.sizeOfJvmHeap);
out.writeLong(this.sizeOfManagedMemory);
}
@Override
public void read(DataInputView in) throws IOException {
this.numberOfCPUCores = in.readInt();
this.sizeOfPhysicalMemory = in.readLong();
this.sizeOfJvmHeap = in.readLong();
this.sizeOfManagedMemory = in.readLong();
}
// --------------------------------------------------------------------------------------------
// Utils
// --------------------------------------------------------------------------------------------
......
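With the custom write/read methods removed, HardwareDescription travels through plain Java serialization, which is what Akka uses for messages by default. A minimal round-trip sketch of that mechanism (a standalone check, not Flink code):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationCheck {
    // Serialize a value the way a Java-serialization-based transport would.
    public static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bos);
        out.writeObject(value);
        out.close();
        return bos.toByteArray();
    }

    // Deserialize the value back; requires a serialVersionUID-compatible class on this side.
    public static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        return new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
    }
}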
......@@ -19,7 +19,6 @@
package org.apache.flink.runtime.instance;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
......@@ -27,28 +26,35 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.runtime.ipc.RPC;
import akka.actor.ActorRef;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.eclipse.jetty.util.log.Log;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
import scala.concurrent.Await;
import scala.concurrent.Future;
/**
* An instance represents a resource a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on.
*/
public class Instance {
/** The lock on which to synchronize allocations and failure state changes */
private final Object instanceLock = new Object();
/** The connection info to connect to the task manager represented by this instance. */
private final InstanceConnectionInfo instanceConnectionInfo;
/** The actor ref to the task manager represented by this instance. */
private final ActorRef taskManager;
/** A description of the resources of the task manager */
private final HardwareDescription resources;
/** The ID identifying the instance. */
private final InstanceID instanceId;
/** The number of task slots available on the node */
......@@ -57,19 +63,15 @@ public class Instance {
/** A list of available slot positions */
private final Queue<Integer> availableSlots;
/** Allocated slots on this instance */
private final Set<AllocatedSlot> allocatedSlots = new HashSet<AllocatedSlot>();
/** A listener to be notified upon new slot availability */
private SlotAvailabilityListener slotAvailabilityListener;
/** The RPC proxy to send calls to the task manager represented by this instance */
private volatile TaskOperationProtocol taskManager;
/**
* Time when the last heartbeat has been received from the task manager running on this instance.
*/
private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
......@@ -78,15 +80,15 @@ public class Instance {
// --------------------------------------------------------------------------------------------
/**
* Constructs an abstract instance object.
*
* @param instanceConnectionInfo The connection info under which to reach the TaskManager instance.
* @param id The id under which the instance is registered.
* @param taskManager The actor reference of the represented task manager.
* @param resources The resources available on the machine.
* @param numberOfSlots The number of task slots offered by this instance.
*/
public Instance(InstanceConnectionInfo instanceConnectionInfo, InstanceID id, HardwareDescription resources, int numberOfSlots) {
this.instanceConnectionInfo = instanceConnectionInfo;
public Instance(ActorRef taskManager, InstanceID id, HardwareDescription resources, int numberOfSlots) {
this.taskManager = taskManager;
this.instanceId = id;
this.resources = resources;
this.numberOfSlots = numberOfSlots;
......@@ -113,15 +115,6 @@ public class Instance {
return numberOfSlots;
}
/**
* Returns the instance's connection information object.
*
* @return the instance's connection information object
*/
public InstanceConnectionInfo getInstanceConnectionInfo() {
return this.instanceConnectionInfo;
}
// --------------------------------------------------------------------------------------------
// Life and Death
// --------------------------------------------------------------------------------------------
......@@ -173,49 +166,72 @@ public class Instance {
allocatedSlots.clear();
availableSlots.clear();
}
destroyTaskManagerProxy();
}
// --------------------------------------------------------------------------------------------
// Connection to the TaskManager
// --------------------------------------------------------------------------------------------
public TaskOperationProtocol getTaskManagerProxy() throws IOException {
if (isDead) {
throw new IOException("Instance has died");
public void checkLibraryAvailability(final JobID jobID) throws IOException {
final String[] requiredLibraries = LibraryCacheManager.getRequiredJarFiles(jobID);
if (requiredLibraries == null) {
throw new IOException("No entry of required libraries for job " + jobID);
}
LibraryCacheProfileRequest request = new LibraryCacheProfileRequest();
request.setRequiredLibraries(requiredLibraries);
Future<Object> futureResponse = Patterns.ask(taskManager, new TaskManagerMessages.RequestLibraryCacheProfile
(request), AkkaUtils.FUTURE_TIMEOUT());
TaskOperationProtocol tm = this.taskManager;
if (tm == null) {
synchronized (this) {
if (this.taskManager == null) {
this.taskManager = RPC.getProxy(TaskOperationProtocol.class,
new InetSocketAddress(getInstanceConnectionInfo().address(),
getInstanceConnectionInfo().ipcPort()), NetUtils.getSocketFactory());
Future<Iterable<Object>> updateFuture = futureResponse.flatMap(new Mapper<Object, Future<Iterable<Object>>>() {
public Future<Iterable<Object>> apply(final Object o) {
LibraryCacheProfileResponse response = (LibraryCacheProfileResponse) o;
List<Future<Object>> futureAcks = new ArrayList<Future<Object>>();
for (int i = 0; i < requiredLibraries.length; i++) {
if (!response.isCached(i)) {
LibraryCacheUpdate update = new LibraryCacheUpdate(requiredLibraries[i]);
Future<Object> future = Patterns.ask(taskManager, update, AkkaUtils.FUTURE_TIMEOUT());
futureAcks.add(future);
}
}
tm = this.taskManager;
return Futures.sequence(futureAcks, ExecutionContexts.global());
}
}, ExecutionContexts.global());
try {
Await.result(updateFuture, AkkaUtils.AWAIT_DURATION());
}catch(IOException ioe){
throw ioe;
}catch(Exception e){
throw new RuntimeException("Encountered exception while updating library cache.", e);
}
return tm;
}
/** Destroys and removes the RPC stub object for this instance's task manager. */
private void destroyTaskManagerProxy() {
synchronized (this) {
if (this.taskManager != null) {
try {
RPC.stopProxy(this.taskManager);
} catch (Throwable t) {
Log.debug("Error shutting down RPC proxy.", t);
}
}
public TaskOperationResult submitTask(TaskDeploymentDescriptor tdd) throws IOException{
Future<Object> futureResponse = Patterns.ask(taskManager, new TaskManagerMessages.SubmitTask(tdd),
AkkaUtils.FUTURE_TIMEOUT());
try{
return (TaskOperationResult) Await.result(futureResponse, AkkaUtils.AWAIT_DURATION());
}catch(IOException ioe){
throw ioe;
}catch(Exception e){
throw new RuntimeException("Caught exception while submitting task.", e);
}
}
public TaskOperationResult cancelTask(JobVertexID jobVertexID, int subtaskIndex) throws IOException{
Future<Object> futureResponse = Patterns.ask(taskManager, new TaskManagerMessages.CancelTask(jobVertexID,
subtaskIndex), AkkaUtils.FUTURE_TIMEOUT());
try{
return (TaskOperationResult) Await.result(futureResponse, AkkaUtils.AWAIT_DURATION());
}catch(IOException ioe){
throw ioe;
}catch(Exception e){
throw new RuntimeException("Caught exception while cancelling task.", e);
}
// --------------------------------------------------------------------------------------------
// Heartbeats
......@@ -243,7 +259,7 @@ public class Instance {
*
* @param now The timestamp representing the current time.
* @param cleanUpInterval The maximum time (in msecs) that the last heartbeat may lie in the past.
* @return True, if this instance is considered alive, false otherwise.
*/
public boolean isStillAlive(long now, long cleanUpInterval) {
return this.lastReceivedHeartBeat + cleanUpInterval > now;
......@@ -275,9 +291,9 @@ public class Instance {
}
public boolean returnAllocatedSlot(AllocatedSlot slot) {
// the slot needs to be in the returned to instance state
if (slot == null || slot.getInstance() != this) {
throw new IllegalArgumentException("Slot is null or belongs to the wrong instance.");
throw new IllegalArgumentException("Slot is null or belongs to the wrong taskManager.");
}
if (slot.isAlive()) {
throw new IllegalArgumentException("Slot is still alive");
......@@ -298,7 +314,7 @@ public class Instance {
return true;
} else {
throw new IllegalArgumentException("Slot was not allocated from the instance.");
throw new IllegalArgumentException("Slot was not allocated from the taskManager.");
}
}
} else {
......@@ -317,6 +333,14 @@ public class Instance {
allocatedSlots.clear();
}
}
public ActorRef getTaskManager() {
return taskManager;
}
public String getPath(){
return taskManager.path().toString();
}
public int getNumberOfAvailableSlots() {
return this.availableSlots.size();
......@@ -356,6 +380,6 @@ public class Instance {
@Override
public String toString() {
return instanceId + " @" + this.instanceConnectionInfo + ' ' + numberOfSlots + " slots";
return instanceId + " @" + taskManager.path() + ' ' + numberOfSlots + " slots";
}
}
......@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import akka.actor.ActorRef;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.slf4j.Logger;
......@@ -51,10 +52,10 @@ public class InstanceManager {
private final Map<InstanceID, Instance> registeredHostsById;
/** Set of hosts known to run a task manager that are thus able to execute tasks (by connection). */
private final Map<InstanceConnectionInfo, Instance> registeredHostsByConnection;
private final Map<ActorRef, Instance> registeredHostsByConnection;
/** Set of hosts that were present once and have died */
private final Set<InstanceConnectionInfo> deadHosts;
private final Set<ActorRef> deadHosts;
/** Listeners that want to be notified about availability and disappearance of instances */
private final List<InstanceListener> instanceListeners = new ArrayList<InstanceListener>();
......@@ -92,8 +93,8 @@ public class InstanceManager {
}
this.registeredHostsById = new HashMap<InstanceID, Instance>();
this.registeredHostsByConnection = new HashMap<InstanceConnectionInfo, Instance>();
this.deadHosts = new HashSet<InstanceConnectionInfo>();
this.registeredHostsByConnection = new HashMap<ActorRef, Instance>();
this.deadHosts = new HashSet<ActorRef>();
this.heartbeatTimeout = heartbeatTimeout;
new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
......@@ -159,22 +160,23 @@ public class InstanceManager {
}
}
public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription resources, int numberOfSlots){
public InstanceID registerTaskManager(ActorRef taskManager, HardwareDescription resources,
int numberOfSlots){
synchronized(this.lock){
if (this.shutdown) {
throw new IllegalStateException("InstanceManager is shut down.");
}
Instance prior = registeredHostsByConnection.get(instanceConnectionInfo);
Instance prior = registeredHostsByConnection.get(taskManager);
if (prior != null) {
LOG.error("Registration attempt from TaskManager with connection info " + instanceConnectionInfo +
LOG.error("Registration attempt from TaskManager at " + taskManager.path() +
". This connection is already registered under ID " + prior.getId());
return null;
}
boolean wasDead = this.deadHosts.remove(instanceConnectionInfo);
boolean wasDead = this.deadHosts.remove(taskManager);
if (wasDead) {
LOG.info("Registering TaskManager with connection info " + instanceConnectionInfo +
LOG.info("Registering TaskManager at " + taskManager.path() +
" which was marked as dead earlier because of a heart-beat timeout.");
}
......@@ -184,16 +186,16 @@ public class InstanceManager {
} while (registeredHostsById.containsKey(id));
Instance host = new Instance(instanceConnectionInfo, id, resources, numberOfSlots);
Instance host = new Instance(taskManager, id, resources, numberOfSlots);
registeredHostsById.put(id, host);
registeredHostsByConnection.put(instanceConnectionInfo, host);
registeredHostsByConnection.put(taskManager, host);
totalNumberOfAliveTaskSlots += numberOfSlots;
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Registered TaskManager at %s as %s. Current number of registered hosts is %d.",
instanceConnectionInfo, id, registeredHostsById.size()));
taskManager.path(), id, registeredHostsById.size()));
}
host.reportHeartBeat();
......@@ -284,17 +286,17 @@ public class InstanceManager {
// remove from the living
entries.remove();
registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
registeredHostsByConnection.remove(host.getTaskManager());
// add to the dead
deadHosts.add(host.getInstanceConnectionInfo());
deadHosts.add(host.getTaskManager());
host.markDead();
totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
host.getId(), host.getInstanceConnectionInfo(), heartbeatTimeout, registeredHostsById.size()));
host.getId(), host.getPath(), heartbeatTimeout, registeredHostsById.size()));
// report to all listeners
notifyDeadInstance(host);
......
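The registration maps above are now keyed by ActorRef rather than by connection info; this works because an ActorRef's equality is based on the actor's path and UID, making it a stable map key. A minimal sketch of the idea (a hypothetical registry, not the Flink class):

import java.util.HashMap;
import java.util.Map;
import akka.actor.ActorRef;

class TaskManagerRegistry {
    // Mirrors registeredHostsByConnection in the diff: one entry per task manager actor.
    private final Map<ActorRef, String> registeredHosts = new HashMap<ActorRef, String>();

    void register(ActorRef taskManager, String instanceId) {
        registeredHosts.put(taskManager, instanceId);
    }

    boolean isRegistered(ActorRef taskManager) {
        return registeredHosts.containsKey(taskManager);
    }
}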
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.slf4j.LoggerFactory;
/**
* A variant of the {@link InstanceManager} that internally spawn task managers as instances, rather than waiting for external
* TaskManagers to register.
*/
public class LocalInstanceManager extends InstanceManager {
private final List<TaskManager> taskManagers = new ArrayList<TaskManager>();
public LocalInstanceManager(int numTaskManagers) throws Exception {
ExecutionMode execMode = numTaskManagers == 1 ? ExecutionMode.LOCAL : ExecutionMode.CLUSTER;
final int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
final int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, -1);
for (int i = 0; i < numTaskManagers; i++) {
// configure ports, if necessary
if (ipcPort > 0 || dataPort > 0) {
Configuration tm = new Configuration();
if (ipcPort > 0) {
tm.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ipcPort + i);
}
if (dataPort > 0) {
tm.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + i);
}
GlobalConfiguration.includeConfiguration(tm);
}
taskManagers.add(TaskManager.createTaskManager(execMode));
}
}
@Override
public void shutdown() {
try {
for (TaskManager taskManager: taskManagers){
try {
taskManager.shutdown();
}
catch (Throwable t) {
// log and continue in any case
// we initialize the log lazily, because this is the only place we log
// and most likely we never log.
LoggerFactory.getLogger(LocalInstanceManager.class).error("Error shutting down local embedded TaskManager.", t);
}
}
} finally {
this.taskManagers.clear();
super.shutdown();
}
}
public TaskManager[] getTaskManagers() {
return (TaskManager[]) this.taskManagers.toArray(new TaskManager[this.taskManagers.size()]);
}
}
......@@ -77,7 +77,7 @@ public class DataInputDeserializer implements DataInputView {
this.buffer = buffer;
this.position = start;
this.end = start * len;
this.end = start + len;
}
// ----------------------------------------------------------------------------------------
......
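The one-character fix above corrects the end offset of the deserializer's window from start * len to start + len, so the view covers buffer[start, start + len). A standalone illustration of the intended invariant (a hypothetical check, not Flink code):

public class RangeCheck {
    public static void main(String[] args) {
        byte[] buffer = new byte[16];
        int start = 4;
        int len = 8;
        int end = start + len; // the fix: start * len would place the limit at 32, past the buffer
        assert end <= buffer.length : "view must stay inside the backing array"; // run with -ea
        System.out.println("valid range: [" + start + ", " + end + ")");
    }
}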
......@@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
* for being fetched by a client. The collected events have an expiration time. In a configurable interval
* the event collector removes all events which are older than the interval.
*/
public final class EventCollector extends TimerTask implements ProfilingListener {
public final class EventCollector2 extends TimerTask implements ProfilingListener {
private static final Logger LOG = LoggerFactory.getLogger(EventCollector.class);
......@@ -91,7 +91,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
final VertexEvent vertexEvent = new VertexEvent(timestamp, vertexId, taskName, totalNumberOfSubtasks,
subtask, executionId, newExecutionState, optionalMessage);
this.eventCollector.addEvent(jobID, vertexEvent);
this.eventCollector2.addEvent(jobID, vertexEvent);
final ExecutionStateChangeEvent executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp, vertexId, subtask,
executionId, newExecutionState);
......@@ -126,7 +126,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
/**
* Constructs a new job status listener wrapper.
*
* @param eventCollector
* @param eventCollector2
* the event collector to forward the events to
* @param jobName
* the name of the job
......@@ -194,7 +194,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener
* @param clientQueryInterval
* the interval with which clients query for events
*/
public EventCollector(final int clientQueryInterval) {
public EventCollector2(final int clientQueryInterval) {
this.timerTaskInterval = clientQueryInterval * 1000L * 2L; // Double the interval, clients will take care of
// duplicate notifications
......
......@@ -82,8 +82,8 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager;
import org.apache.flink.runtime.jobmanager.archive.ArchiveListener;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist2;
import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
import org.apache.flink.runtime.jobmanager.web.WebInfoServer;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
......@@ -187,7 +187,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT);
// Load the job progress collector
this.eventCollector = new EventCollector(this.recommendedClientPollingInterval);
this.eventCollector2 = new EventCollector2(this.recommendedClientPollingInterval);
this.libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(),
GlobalConfiguration.getConfiguration());
......@@ -196,8 +196,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
int archived_items = GlobalConfiguration.getInteger(
ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT);
if (archived_items > 0) {
this.archive = new MemoryArchivist(archived_items);
this.eventCollector.registerArchivist(archive);
this.archive = new MemoryArchivist2(archived_items);
this.eventCollector2.registerArchivist(archive);
}
else {
this.archive = null;
......@@ -286,8 +286,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
}
// Stop and clean up the job progress collector
if (this.eventCollector != null) {
this.eventCollector.shutdown();
if (this.eventCollector2 != null) {
this.eventCollector2.shutdown();
}
// Finally, shut down the scheduler
......@@ -541,7 +541,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
}
final SerializableArrayList<AbstractEvent> eventList = new SerializableArrayList<AbstractEvent>();
this.eventCollector.getEventsForJob(jobID, eventList, false);
this.eventCollector2.getEventsForJob(jobID, eventList, false);
return new JobProgressResult(ReturnCode.SUCCESS, null, eventList);
}
......@@ -586,11 +586,11 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
final List<RecentJobEvent> eventList = new SerializableArrayList<RecentJobEvent>();
if (this.eventCollector == null) {
if (this.eventCollector2 == null) {
throw new IOException("No instance of the event collector found");
}
this.eventCollector.getRecentJobs(eventList);
this.eventCollector2.getRecentJobs(eventList);
return eventList;
}
......@@ -601,11 +601,11 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
final List<AbstractEvent> eventList = new SerializableArrayList<AbstractEvent>();
if (this.eventCollector == null) {
if (this.eventCollector2 == null) {
throw new IOException("No instance of the event collector found");
}
this.eventCollector.getEventsForJob(jobID, eventList, true);
this.eventCollector2.getEventsForJob(jobID, eventList, true);
return eventList;
}
......
......@@ -35,7 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobID;
* This class must be thread safe, because it is accessed by the JobManager events and by the
* web server concurrently.
*/
public class MemoryArchivist implements ArchiveListener {
public class MemoryArchivist2 implements ArchiveListener {
/** The global lock */
private final Object lock = new Object();
......
......@@ -39,8 +39,8 @@ public class JsonFactory {
json.append("\"vertexname\": \"" + StringUtils.escapeHtml(vertex.getSimpleName()) + "\",");
json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
AllocatedSlot slot = vertex.getCurrentAssignedResource();
String instanceName = slot == null ? "(null)" : slot.getInstance().getInstanceConnectionInfo().getFQDNHostname();
AllocatedSlot slot = vertex.getAssignedSlot();
String instanceName = slot == null ? "(null)" : slot.getInstance().getPath();
json.append("\"vertexinstancename\": \"" + instanceName + "\"");
json.append("}");
......
......@@ -32,14 +32,19 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import scala.concurrent.Await;
import scala.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -57,10 +62,10 @@ public class SetupInfoServlet extends HttpServlet {
private Configuration globalC;
private JobManager2 jobmanager;
private ActorRef jobmanager;
public SetupInfoServlet(JobManager jm) {
public SetupInfoServlet(ActorRef jm) {
globalC = GlobalConfiguration.getConfiguration();
this.jobmanager = jm;
}
......@@ -100,51 +105,53 @@ public class SetupInfoServlet extends HttpServlet {
private void writeTaskmanagers(HttpServletResponse resp) throws IOException {
List<Instance> instances = new ArrayList<Instance>(jobmanager.getInstanceManager().getAllRegisteredInstances().values());
Collections.sort(instances, INSTANCE_SORTER);
JSONObject obj = new JSONObject();
JSONArray array = new JSONArray();
for (Instance instance : instances) {
JSONObject objInner = new JSONObject();
long time = new Date().getTime() - instance.getLastHeartBeat();
try {
objInner.put("inetAdress", instance.getInstanceConnectionInfo().getInetAdress());
objInner.put("ipcPort", instance.getInstanceConnectionInfo().ipcPort());
objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
objInner.put("timeSinceLastHeartbeat", time / 1000);
objInner.put("slotsNumber", instance.getTotalNumberOfSlots());
objInner.put("freeSlots", instance.getNumberOfAvailableSlots());
objInner.put("cpuCores", instance.getResources().getNumberOfCPUCores());
objInner.put("physicalMemory", instance.getResources().getSizeOfPhysicalMemory() >>> 20);
objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
array.put(objInner);
}
catch (JSONException e) {
LOG.warn("Json object creation failed", e);
}
}
try {
obj.put("taskmanagers", array);
} catch (JSONException e) {
LOG.warn("Json object creation failed", e);
}
PrintWriter w = resp.getWriter();
w.write(obj.toString());
// List<Instance> instances = new ArrayList<Instance>(jobmanager.getInstanceManager().getAllRegisteredInstances().values());
//
// Collections.sort(instances, INSTANCE_SORTER);
//
// JSONObject obj = new JSONObject();
// JSONArray array = new JSONArray();
// for (Instance instance : instances) {
// JSONObject objInner = new JSONObject();
//
// long time = new Date().getTime() - instance.getLastHeartBeat();
//
// try {
// objInner.put("inetAdress", instance.getInstanceConnectionInfo().getInetAdress());
// objInner.put("ipcPort", instance.getInstanceConnectionInfo().ipcPort());
// objInner.put("dataPort", instance.getInstanceConnectionInfo().dataPort());
// objInner.put("timeSinceLastHeartbeat", time / 1000);
// objInner.put("slotsNumber", instance.getTotalNumberOfSlots());
// objInner.put("freeSlots", instance.getNumberOfAvailableSlots());
// objInner.put("cpuCores", instance.getResources().getNumberOfCPUCores());
// objInner.put("physicalMemory", instance.getResources().getSizeOfPhysicalMemory() >>> 20);
// objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
// objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
// array.put(objInner);
// }
// catch (JSONException e) {
// LOG.warn("Json object creation failed", e);
// }
// obj.put("taskmanagers", array);
//
// PrintWriter w = resp.getWriter();
// w.write(obj.toString());
// }catch (JSONException e) {
// LOG.warn("Aggregated JSON object creation failed", e);
// }catch(IOException ioe){
// throw ioe;
// }catch(Exception e){
// throw new RuntimeException("Caught exception while requesting instances from jobmanager.", e);
// }
}
// --------------------------------------------------------------------------------------------
private static final Comparator<Instance> INSTANCE_SORTER = new Comparator<Instance>() {
@Override
public int compare(Instance o1, Instance o2) {
return o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo());
}
};
// private static final Comparator<Instance> INSTANCE_SORTER = new Comparator<Instance>() {
// @Override
// public int compare(Instance o1, Instance o2) {
// return o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo());
// }
// };
}
......@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobmanager.JobManager2;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.eclipse.jetty.http.security.Constraint;
import org.eclipse.jetty.security.ConstraintMapping;
import org.eclipse.jetty.security.ConstraintSecurityHandler;
......@@ -73,7 +73,7 @@ public class WebInfoServer {
* @throws IOException
* Thrown, if the server setup failed for an I/O related reason.
*/
public WebInfoServer(Configuration nepheleConfig, int port, JobManager2 jobmanager) throws IOException {
public WebInfoServer(Configuration nepheleConfig, int port, ActorRef jobmanager) throws IOException {
this.port = port;
// if no explicit configuration is given, use the global configuration
......
......@@ -28,8 +28,10 @@ package org.apache.flink.runtime.net;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URI;
......@@ -38,6 +40,7 @@ import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -328,4 +331,128 @@ public class NetUtils {
}
return hostNames;
}
/**
* The states of address detection mechanism.
* There is only a state transition if the current state failed to determine the address.
*/
private enum AddressDetectionState {
ADDRESS(50), //detect own IP based on the JobManager's IP address. Look for a common prefix
FAST_CONNECT(50), //try to connect to the JobManager on all interfaces and all their addresses.
//this state uses a low timeout (say 50 ms) for fast detection.
SLOW_CONNECT(1000), //same as FAST_CONNECT, but with a timeout of 1000 ms (1s).
HEURISTIC(0);
private int timeout;
AddressDetectionState(int timeout) {
this.timeout = timeout;
}
public int getTimeout() {
return timeout;
}
}
/**
* Find out the TaskManager's own IP address.
*/
public static InetAddress resolveAddress(InetSocketAddress jobManagerAddress) throws IOException {
AddressDetectionState strategy = jobManagerAddress != null ? AddressDetectionState.ADDRESS : AddressDetectionState.HEURISTIC;
while (true) {
Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
while (e.hasMoreElements()) {
NetworkInterface n = e.nextElement();
Enumeration<InetAddress> ee = n.getInetAddresses();
while (ee.hasMoreElements()) {
InetAddress i = ee.nextElement();
switch (strategy) {
case ADDRESS:
if (hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
if (tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
LOG.info("Determined " + i + " as the TaskTracker's own IP address");
return i;
}
}
break;
case FAST_CONNECT:
case SLOW_CONNECT:
boolean correct = tryToConnect(i, jobManagerAddress, strategy.getTimeout());
if (correct) {
LOG.info("Determined " + i + " as the TaskTracker's own IP address");
return i;
}
break;
case HEURISTIC:
if(!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
"loopback address. Using instead " + i.getHostAddress() + " on network " +
"interface " + n.getName() + ".");
return i;
}
break;
default:
throw new RuntimeException("Unkown address detection strategy: " + strategy);
}
}
}
// state control
switch (strategy) {
case ADDRESS:
strategy = AddressDetectionState.FAST_CONNECT;
break;
case FAST_CONNECT:
strategy = AddressDetectionState.SLOW_CONNECT;
break;
case SLOW_CONNECT:
if(!InetAddress.getLocalHost().isLoopbackAddress()){
return InetAddress.getLocalHost();
}else {
strategy = AddressDetectionState.HEURISTIC;
break;
}
case HEURISTIC:
throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"').");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Defaulting to detection strategy " + strategy);
}
}
}
/**
* Checks if two addresses have a common prefix (first 2 bytes).
* Example: 192.168.???.???
* Also works with IPv6, but will probably accept too many addresses.
*/
private static boolean hasCommonPrefix(byte[] address, byte[] address2) {
return address[0] == address2[0] && address[1] == address2[1];
}
public static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket, int timeout) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to connect to JobManager (" + toSocket + ") from local address " + fromAddress
+ " with timeout " + timeout);
}
boolean connectable = true;
Socket socket = null;
try {
socket = new Socket();
SocketAddress bindP = new InetSocketAddress(fromAddress, 0); // 0 = let the OS choose the port on this
// machine
socket.bind(bindP);
socket.connect(toSocket, timeout);
} catch (Exception ex) {
LOG.info("Failed to connect to JobManager from address '" + fromAddress + "': " + ex.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Failed with exception", ex);
}
connectable = false;
} finally {
if (socket != null) {
socket.close();
}
}
return connectable;
}
}
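To illustrate how the detection strategies are meant to be used, a minimal hypothetical sketch (not part of this commit; the hostname is a placeholder):

import java.net.{InetAddress, InetSocketAddress}

import org.apache.flink.runtime.net.NetUtils

object ResolveOwnAddressSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical JobManager endpoint; in this commit it is read from
    // ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY / JOB_MANAGER_IPC_PORT_KEY.
    val jobManagerAddress = new InetSocketAddress("jobmanager.example.invalid", 6123)

    // resolveAddress walks ADDRESS -> FAST_CONNECT -> SLOW_CONNECT -> HEURISTIC,
    // returning the first local address from which the JobManager is reachable.
    val ownAddress: InetAddress = NetUtils.resolveAddress(jobManagerAddress)
    println(s"TaskManager would announce itself as ${ownAddress.getHostAddress}")
  }
}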
......@@ -37,6 +37,7 @@ public class DeserializationDelegate<T> implements IOReadableWritable {
public DeserializationDelegate(TypeSerializer<T> serializer) {
this.serializer = serializer;
}
public void setInstance(T instance) {
this.instance = instance;
......
......@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import akka.actor.ActorRef;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.profiling.ProfilingException;
import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
......@@ -50,7 +51,7 @@ public class InstanceProfiler {
private static final int PERCENT = 100;
private final InstanceConnectionInfo instanceConnectionInfo;
private final String instancePath;
private long lastTimestamp = 0;
......@@ -76,10 +77,10 @@ public class InstanceProfiler {
private long firstTimestamp;
public InstanceProfiler(InstanceConnectionInfo instanceConnectionInfo)
public InstanceProfiler(String instancePath)
throws ProfilingException {
this.instanceConnectionInfo = instanceConnectionInfo;
this.instancePath = instancePath;
this.firstTimestamp = System.currentTimeMillis();
// Initialize counters by calling generateProfilingData once and ignore the return value
generateProfilingData(this.firstTimestamp);
......@@ -90,7 +91,7 @@ public class InstanceProfiler {
final long profilingInterval = timestamp - lastTimestamp;
final InternalInstanceProfilingData profilingData = new InternalInstanceProfilingData(
this.instanceConnectionInfo, (int) profilingInterval);
this.instancePath, (int) profilingInterval);
updateCPUUtilization(profilingData);
updateMemoryUtilization(profilingData);
......@@ -332,7 +333,7 @@ public class InstanceProfiler {
final long profilingInterval = System.currentTimeMillis() - this.firstTimestamp;
final InternalInstanceProfilingData profilingData = new InternalInstanceProfilingData(
this.instanceConnectionInfo, (int) profilingInterval);
this.instancePath, (int) profilingInterval);
updateCPUUtilization(profilingData);
updateMemoryUtilization(profilingData);
......
......@@ -162,8 +162,8 @@ public class JobManagerProfilerImpl implements JobManagerProfiler, ProfilerImplP
profilingData.getSoftIrqCPU(), profilingData.getTotalMemory(), profilingData.getFreeMemory(),
profilingData.getBufferedMemory(), profilingData.getCachedMemory(), profilingData
.getCachedSwapMemory(), profilingData.getReceivedBytes(), profilingData.getTransmittedBytes(),
jobID, timestamp, timestamp - jobProfilingData.getProfilingStart(), profilingData
.getInstanceConnectionInfo().toString());
jobID, timestamp, timestamp - jobProfilingData.getProfilingStart(),
profilingData.getInstancePath());
synchronized (this.registeredListeners) {
......
......@@ -28,7 +28,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent;
......@@ -39,7 +38,8 @@ public class JobProfilingData {
private final long profilingStart;
private final Map<InstanceConnectionInfo, InternalInstanceProfilingData> collectedInstanceProfilingData = new HashMap<InstanceConnectionInfo, InternalInstanceProfilingData>();
private final Map<String, InternalInstanceProfilingData> collectedInstanceProfilingData = new
HashMap<String, InternalInstanceProfilingData>();
public JobProfilingData(ExecutionGraph executionGraph) {
......@@ -58,12 +58,12 @@ public class JobProfilingData {
public boolean addIfInstanceIsAllocatedByJob(InternalInstanceProfilingData instanceProfilingData) {
for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) {
AllocatedSlot slot = executionVertex.getCurrentAssignedResource();
if (slot != null && slot.getInstance().getInstanceConnectionInfo().equals(
instanceProfilingData.getInstanceConnectionInfo()))
for (ExecutionVertex2 executionVertex : this.executionGraph.getAllExecutionVertices()) {
AllocatedSlot slot = executionVertex.getAssignedSlot();
if (slot != null && slot.getInstance().getPath().equals(
instanceProfilingData.getInstancePath()))
{
this.collectedInstanceProfilingData.put(instanceProfilingData.getInstanceConnectionInfo(), instanceProfilingData);
this.collectedInstanceProfilingData.put(instanceProfilingData.getInstancePath(), instanceProfilingData);
return true;
}
}
......@@ -75,8 +75,8 @@ public class JobProfilingData {
final Set<Instance> tempSet = new HashSet<Instance>();
for (ExecutionVertex executionVertex : this.executionGraph.getAllExecutionVertices()) {
AllocatedSlot slot = executionVertex.getCurrentAssignedResource();
for (ExecutionVertex2 executionVertex : this.executionGraph.getAllExecutionVertices()) {
AllocatedSlot slot = executionVertex.getAssignedSlot();
if (slot != null) {
tempSet.add(slot.getInstance());
}
......@@ -95,7 +95,7 @@ public class JobProfilingData {
final int numberOfInstances = this.collectedInstanceProfilingData.size();
final Iterator<InstanceConnectionInfo> instanceIterator = this.collectedInstanceProfilingData.keySet().iterator();
final Iterator<String> instanceIterator = this.collectedInstanceProfilingData.keySet().iterator();
long freeMemorySum = 0;
long totalMemorySum = 0;
......
......@@ -36,6 +36,7 @@ import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingDa
import org.apache.flink.runtime.profiling.impl.types.ProfilingDataContainer;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.StringUtils;
import akka.actor.ActorRef;
import java.io.IOException;
import java.lang.management.ManagementFactory;
......@@ -66,7 +67,7 @@ public class TaskManagerProfilerImpl extends TimerTask implements TaskManagerPro
private final Map<Environment, EnvironmentThreadSet> monitoredThreads = new HashMap<Environment, EnvironmentThreadSet>();
public TaskManagerProfilerImpl(InetAddress jobManagerAddress, InstanceConnectionInfo instanceConnectionInfo)
public TaskManagerProfilerImpl(InetAddress jobManagerAddress, String instancePath)
throws ProfilingException {
// Create RPC stub for communication with job manager's profiling component.
......@@ -90,7 +91,7 @@ public class TaskManagerProfilerImpl extends TimerTask implements TaskManagerPro
}
// Create instance profiler
this.instanceProfiler = new InstanceProfiler(instanceConnectionInfo);
this.instanceProfiler = new InstanceProfiler(instancePath);
// Set and trigger timer
this.timerInterval = (long) (GlobalConfiguration.getInteger(ProfilingUtils.TASKMANAGER_REPORTINTERVAL_KEY,
......
......@@ -22,11 +22,10 @@ import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
public class InternalInstanceProfilingData implements InternalProfilingData {
private InstanceConnectionInfo instanceConnectionInfo;
private String instancePath;
private int profilingInterval;
......@@ -60,7 +59,7 @@ public class InternalInstanceProfilingData implements InternalProfilingData {
this.freeMemory = -1;
this.ioWaitCPU = -1;
this.idleCPU = -1;
this.instanceConnectionInfo = new InstanceConnectionInfo();
this.instancePath = "";
this.profilingInterval = -1;
this.systemCPU = -1;
this.totalMemory = -1;
......@@ -72,9 +71,9 @@ public class InternalInstanceProfilingData implements InternalProfilingData {
this.transmittedBytes = -1;
}
public InternalInstanceProfilingData(InstanceConnectionInfo instanceConnectionInfo, int profilingInterval) {
public InternalInstanceProfilingData(String instancePath, int profilingInterval) {
this.instanceConnectionInfo = instanceConnectionInfo;
this.instancePath = instancePath;
this.profilingInterval = profilingInterval;
this.freeMemory = -1;
this.ioWaitCPU = -1;
......@@ -109,8 +108,8 @@ public class InternalInstanceProfilingData implements InternalProfilingData {
return this.softIrqCPU;
}
public InstanceConnectionInfo getInstanceConnectionInfo() {
return this.instanceConnectionInfo;
public String getInstancePath() {
return this.instancePath;
}
public int getProfilingInterval() {
......@@ -155,7 +154,7 @@ public class InternalInstanceProfilingData implements InternalProfilingData {
this.freeMemory = in.readLong();
this.ioWaitCPU = in.readInt();
this.idleCPU = in.readInt();
this.instanceConnectionInfo.read(in);
this.instancePath = in.readUTF();
this.profilingInterval = in.readInt();
this.systemCPU = in.readInt();
this.totalMemory = in.readLong();
......@@ -176,7 +175,7 @@ public class InternalInstanceProfilingData implements InternalProfilingData {
out.writeLong(this.freeMemory);
out.writeInt(this.ioWaitCPU);
out.writeInt(this.idleCPU);
this.instanceConnectionInfo.write(out);
out.writeUTF(instancePath);
out.writeInt(this.profilingInterval);
out.writeInt(this.systemCPU);
out.writeLong(this.totalMemory);
......
......@@ -33,6 +33,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.profiling.TaskManagerProfiler;
import org.apache.flink.util.ExceptionUtils;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime
import _root_.akka.actor.Actor
import _root_.akka.event.LoggingAdapter
trait ActorLogMessages {
self: Actor =>
override def receive: Receive = new Actor.Receive {
private val _receiveWithLogMessages = receiveWithLogMessages
override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
override def apply(x: Any):Unit = {
log.debug(s"Received message $x from ${self.sender}.")
val start = System.nanoTime()
_receiveWithLogMessages(x)
val duration = (System.nanoTime() - start) / 1000000
log.debug(s"Handled message $x in $duration ms from ${self.sender}.")
}
}
def receiveWithLogMessages: Receive
protected def log: LoggingAdapter
}
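For reference, a minimal sketch (not part of this commit) of an actor mixing in ActorLogMessages: the only obligations are to implement receiveWithLogMessages and to provide a LoggingAdapter, which ActorLogging already does.

import akka.actor.{Actor, ActorLogging}

import org.apache.flink.runtime.ActorLogMessages

class EchoActor extends Actor with ActorLogMessages with ActorLogging {
  // Handlers go here instead of receive; the trait wraps them so that every
  // message is logged on receipt together with its handling time.
  override def receiveWithLogMessages: Receive = {
    case msg: String => sender() ! msg
  }
}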
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.akka.serialization
import akka.serialization.JSerializer
import org.apache.flink.core.io.IOReadableWritable
import org.apache.flink.runtime.io.network.serialization.{DataInputDeserializer, DataOutputSerializer}
import org.apache.flink.util.InstantiationUtil
class IOReadableWritableSerializer extends JSerializer {
val INITIAL_BUFFER_SIZE = 8096
override protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef = {
val in = new DataInputDeserializer(bytes, 0, bytes.length)
val instance = InstantiationUtil.instantiate(manifest)
if(!instance.isInstanceOf[IOReadableWritable]){
throw new RuntimeException(s"Class $manifest is not of type IOReadableWritable.")
}
val ioRW = instance.asInstanceOf[IOReadableWritable]
ioRW.read(in)
ioRW
}
override def includeManifest: Boolean = true
override def toBinary(o: AnyRef): Array[Byte] = {
if(!o.isInstanceOf[IOReadableWritable]){
throw new RuntimeException("Object is not of type IOReadableWritable.")
}
val ioRW = o.asInstanceOf[IOReadableWritable]
val out = new DataOutputSerializer(INITIAL_BUFFER_SIZE)
ioRW.write(out)
out.wrapAsByteBuffer().array()
}
override def identifier: Int = 1337
}
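A small round-trip sketch of the serializer (not part of this commit; it assumes JobID implements IOReadableWritable via AbstractID and has the no-arg constructor that InstantiationUtil.instantiate requires):

import org.apache.flink.runtime.akka.serialization.IOReadableWritableSerializer
import org.apache.flink.runtime.jobgraph.JobID

object SerializerRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val serializer = new IOReadableWritableSerializer
    val original = new JobID()

    val bytes = serializer.toBinary(original)
    // fromBinary(bytes, clazz) delegates to the protected fromBinaryJava above
    val restored = serializer.fromBinary(bytes, classOf[JobID]).asInstanceOf[JobID]

    assert(restored == original, "JobID should survive the round trip")
  }
}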
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
import akka.actor.{Terminated, ActorRef, Actor, ActorLogging}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.event.job._
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.executiongraph._
import org.apache.flink.runtime.jobgraph.{JobVertexID, JobStatus, JobID}
import org.apache.flink.runtime.messages.ArchiveMessages.{ArchiveExecutionGraph, ArchiveJobEvent, ArchiveEvent}
import org.apache.flink.runtime.messages.EventCollectorMessages._
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{JobStatusChanged, ExecutionStateChanged}
import org.apache.hadoop.net.NetworkTopology
import scala.collection.convert.{WrapAsScala, DecorateAsJava}
import scala.concurrent.duration._
class EventCollector(val timerTaskInterval: Int) extends Actor with ActorLogMessages with ActorLogging with
DecorateAsJava with WrapAsScala {
import context.dispatcher
val collectedEvents = collection.mutable.HashMap[JobID, List[AbstractEvent]]()
val recentJobs = collection.mutable.HashMap[JobID, RecentJobEvent]()
val recentExecutionGraphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
val archiveListeners = collection.mutable.HashSet[ActorRef]()
val jobInformation = collection.mutable.HashMap[JobID, (String, Boolean, Long)]()
override def preStart():Unit = {
startArchiveExpiredEvent()
}
override def postStop(): Unit ={
collectedEvents.clear()
recentJobs.clear()
recentExecutionGraphs.clear()
archiveListeners.clear()
jobInformation.clear()
}
def startArchiveExpiredEvent():Unit = {
val schedulerDuration = FiniteDuration(2*timerTaskInterval, SECONDS)
context.system.scheduler.schedule(schedulerDuration, schedulerDuration, self, ArchiveExpiredEvents)
}
override def receiveWithLogMessages: Receive = {
case ArchiveExpiredEvents =>
val currentTime = System.currentTimeMillis()
collectedEvents.retain{
(jobID, events) =>
val (outdatedElements, currentElements) = events.partition{
event => event.getTimestamp + timerTaskInterval < currentTime
}
outdatedElements foreach ( archiveEvent(jobID, _) )
currentElements.nonEmpty
}
recentJobs.retain{
(jobID, recentJobEvent) =>
import JobStatus._
val status = recentJobEvent.getJobStatus
// only remove jobs which have stopped running
if((status == FINISHED || status == CANCELED || status == FAILED) &&
recentJobEvent.getTimestamp + timerTaskInterval < currentTime){
archiveJobEvent(jobID, recentJobEvent)
archiveExecutionGraph(jobID, recentExecutionGraphs.remove(jobID).get)
jobInformation.remove(jobID)
false
}else{
true
}
}
case RequestJobEvents(jobID, includeManagementEvents) =>
val events = collectedEvents.getOrElse(jobID, List())
val filteredEvents = events filter { event => !event.isInstanceOf[ManagementEvent] || includeManagementEvents}
sender() ! filteredEvents.asJava
case RequestRecentJobs =>
sender() ! recentJobs.values.asJavaCollection
case RegisterJob(executionGraph, profilingAvailable, submissionTimestamp) =>
val jobID = executionGraph.getJobID
executionGraph.registerExecutionListener(self)
executionGraph.registerJobStatusListener(self)
jobInformation += jobID -> (executionGraph.getJobName, profilingAvailable, submissionTimestamp)
case ExecutionStateChanged(jobID, vertexID, subtask, executionID, newExecutionState, optionalMessage) =>
val timestamp = System.currentTimeMillis()
recentExecutionGraphs.get(jobID) match {
case Some(graph) =>
val vertex = graph.getJobVertex(vertexID)
val taskName = if(vertex != null) vertex.getJobVertex.getName else "(null)"
val totalNumberOfSubtasks = if(vertex != null) vertex.getParallelism else -1
val vertexEvent = new VertexEvent(timestamp, vertexID, taskName, totalNumberOfSubtasks, subtask, executionID,
newExecutionState, optionalMessage)
val events = collectedEvents.getOrElse(jobID, List())
val executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp, vertexID, subtask,
executionID, newExecutionState)
collectedEvents += jobID -> (executionStateChangeEvent :: vertexEvent :: events)
case None =>
log.warning(s"Could not find execution graph with jobID ${jobID}.")
}
case JobStatusChanged(executionGraph, newJobStatus, optionalMessage) =>
val jobID = executionGraph.getJobID()
if(newJobStatus == JobStatus.RUNNING){
this.recentExecutionGraphs += jobID -> executionGraph
}
val currentTime = System.currentTimeMillis()
val (jobName, isProfilingEnabled, submissionTimestamp) = jobInformation(jobID)
recentJobs.put(jobID, new RecentJobEvent(jobID, jobName, newJobStatus, isProfilingEnabled, submissionTimestamp,
currentTime))
val events = collectedEvents.getOrElse(jobID, List())
collectedEvents += jobID -> ((new JobEvent(currentTime, newJobStatus, optionalMessage))::events)
case ProcessProfilingEvent(profilingEvent) =>
val events = collectedEvents.getOrElse(profilingEvent.getJobID, List())
collectedEvents += profilingEvent.getJobID -> (profilingEvent::events)
case RegisterArchiveListener(actorListener) =>
context.watch(actorListener)
archiveListeners += actorListener
case Terminated(terminatedListener) =>
archiveListeners -= terminatedListener
}
private def archiveEvent(jobID: JobID, event: AbstractEvent): Unit = {
for(listener <- archiveListeners){
listener ! ArchiveEvent(jobID, event)
}
}
private def archiveJobEvent(jobID: JobID, event: RecentJobEvent): Unit = {
for(listener <- archiveListeners){
listener ! ArchiveJobEvent(jobID, event)
}
}
private def archiveExecutionGraph(jobID: JobID, graph: ExecutionGraph): Unit = {
for(listener <- archiveListeners){
listener ! ArchiveExecutionGraph(jobID, graph)
}
}
}
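The collector is now addressed purely through messages. A hedged usage sketch (not part of this commit), using the EventCollectorMessages defined further below:

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import org.apache.flink.runtime.jobmanager.EventCollector
import org.apache.flink.runtime.messages.EventCollectorMessages.RequestRecentJobs

import scala.concurrent.Await
import scala.concurrent.duration._

object EventCollectorClientSketch {
  def main(args: Array[String]): Unit = {
    implicit val timeout: Timeout = Timeout(10.seconds)
    val system = ActorSystem("flink")

    // The constructor argument is the client query interval in seconds.
    val collector: ActorRef = system.actorOf(Props(classOf[EventCollector], 2), "eventcollector")

    // Replies with a java.util.Collection of RecentJobEvent instances.
    val recentJobs = Await.result(collector ? RequestRecentJobs, timeout.duration)
    println(recentJobs)

    system.shutdown()
  }
}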
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
case class JobManagerCLIConfiguration(configDir: String = null) {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
import akka.actor.{ActorLogging, Actor}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.event.job.{RecentJobEvent, AbstractEvent}
import org.apache.flink.runtime.executiongraph.ExecutionGraph
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.messages.ArchiveMessages._
import scala.collection.convert.DecorateAsJava
import scala.collection.mutable.ListBuffer
class MemoryArchivist(private val max_entries: Int) extends Actor with ActorLogMessages with ActorLogging with
DecorateAsJava {
/**
* The map which stores all collected events until they are either
* fetched by the client or discarded.
*/
private val collectedEvents = collection.mutable.HashMap[JobID, ListBuffer[AbstractEvent]]()
/**
* Map of recently started jobs with the time stamp of the last received job event.
*/
private val oldJobs = collection.mutable.HashMap[JobID, RecentJobEvent]()
/**
* Map of execution graphs belonging to recently started jobs with the time stamp of the last received job event.
*/
private val graphs = collection.mutable.HashMap[JobID, ExecutionGraph]()
private val lru = collection.mutable.Queue[JobID]()
override def receiveWithLogMessages: Receive = {
case ArchiveEvent(jobID, event) =>
val list = collectedEvents.getOrElseUpdate(jobID, ListBuffer())
list += event
cleanup(jobID)
case ArchiveJobEvent(jobID, event) =>
oldJobs.update(jobID, event)
cleanup(jobID)
case ArchiveExecutionGraph(jobID, graph) =>
graphs.update(jobID, graph)
cleanup(jobID)
case GetJobs =>
sender() ! oldJobs.values.toSeq.asJava
case GetJob(jobID) =>
sender() ! oldJobs.get(jobID)
case GetEvents(jobID) =>
sender() ! collectedEvents.get(jobID)
case GetExecutionGraph(jobID) =>
sender() ! (graphs.get(jobID) match{
case Some(graph) => graph
case None => akka.actor.Status.Failure(new IllegalArgumentException(s"Could not find execution graph for job " +
s"id $jobID."))
})
}
def cleanup(jobID: JobID): Unit = {
if(!lru.contains(jobID)){
lru.enqueue(jobID)
}
while(lru.size > max_entries){
val removedJobID = lru.dequeue()
collectedEvents.remove(removedJobID)
oldJobs.remove(removedJobID)
graphs.remove(removedJobID)
}
}
}
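A minimal wiring sketch (not part of this commit): registering a MemoryArchivist with the EventCollector and querying the archive afterwards. GetJobs replies to the sender, so it composes with the ask pattern.

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import org.apache.flink.runtime.jobmanager.{EventCollector, MemoryArchivist}
import org.apache.flink.runtime.messages.ArchiveMessages.GetJobs
import org.apache.flink.runtime.messages.EventCollectorMessages.RegisterArchiveListener

import scala.concurrent.Await
import scala.concurrent.duration._

object ArchivistWiringSketch {
  def main(args: Array[String]): Unit = {
    implicit val timeout: Timeout = Timeout(10.seconds)
    val system = ActorSystem("flink")

    // Keep at most 5 jobs; older entries are evicted in LRU order by cleanup().
    val archivist = system.actorOf(Props(classOf[MemoryArchivist], 5), "archive")
    val collector = system.actorOf(Props(classOf[EventCollector], 2), "eventcollector")

    // Expired events are now forwarded to the archivist; the collector also
    // watches it and deregisters it on termination.
    collector ! RegisterArchiveListener(archivist)

    val jobs = Await.result(archivist ? GetJobs, timeout.duration)
    println(jobs)

    system.shutdown()
  }
}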
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.messages
import org.apache.flink.runtime.event.job.{RecentJobEvent, AbstractEvent}
import org.apache.flink.runtime.executiongraph.ExecutionGraph
import org.apache.flink.runtime.jobgraph.JobID
object ArchiveMessages {
case class ArchiveEvent(jobID: JobID, event: AbstractEvent)
case class ArchiveJobEvent(jobID: JobID, event: RecentJobEvent)
case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph)
case class GetJob(jobID: JobID)
case class GetExecutionGraph(jobID: JobID)
case class GetEvents(jobID: JobID)
case object GetJobs
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.messages
import akka.actor.ActorRef
import org.apache.flink.runtime.executiongraph.ExecutionGraph
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.profiling.types.ProfilingEvent
object EventCollectorMessages {
case class ProcessProfilingEvent(profilingEvent: ProfilingEvent)
case class RegisterArchiveListener(listener: ActorRef)
case class RequestJobEvents(jobID: JobID, includeManagementEvents: Boolean)
case class RegisterJob(executionGraph: ExecutionGraph, profilingAvailable: Boolean, submissionTimestamp: Long)
case object ArchiveExpiredEvents
case object RequestRecentJobs
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.messages
import org.apache.flink.runtime.execution.{ExecutionState2}
import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID, JobID}
object ExecutionGraphMessages {
case class ExecutionStateChanged(jobID: JobID, vertexID: JobVertexID, subtask: Int, executionID: ExecutionAttemptID,
newExecutionState: ExecutionState2, optionalMessage: String)
case class JobStatusChanged(executionGraph: ExecutionGraph, newJobStatus: JobStatus, optionalMessage: String)
}
......@@ -18,9 +18,12 @@
package org.apache.flink.runtime.messages
import org.apache.flink.runtime.instance.HardwareDescription
import org.apache.flink.runtime.jobgraph.{JobID, JobGraph}
object RegistrationMessages {
case class RegisterTaskManager(hardwareDescription: HardwareDescription)
case object AcknowledgeRegistration
object JobManagerMessages {
case class SubmitJob(jobGraph: JobGraph)
case class CancelJob(jobID: JobID)
case object RequestInstances
case object RequestNumberRegisteredTaskManager
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.messages
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
import org.apache.flink.runtime.execution.librarycache.LibraryCacheProfileRequest
import org.apache.flink.runtime.instance.InstanceID
import org.apache.flink.runtime.jobgraph.JobVertexID
object TaskManagerMessages {
case class RequestLibraryCacheProfile(request: LibraryCacheProfileRequest)
case class CancelTask(jobVertexID: JobVertexID, subtaskIndex: Int)
case class SubmitTask(tasks: TaskDeploymentDescriptor)
case class Heartbeat(instanceID: InstanceID)
case object RegisterAtMaster
case object SendHeartbeat
case object AcknowledgeLibraryCacheUpdate
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.taskmanager
import org.apache.flink.runtime.instance.{InstanceID, HardwareDescription}
object RegistrationMessages {
case class RegisterTaskManager(hardwareDescription: HardwareDescription, numberOfSlots: Int)
case class AcknowledgeRegistration(instanceID: InstanceID)
}
......@@ -17,18 +17,122 @@
*/
import java.net.InetSocketAddress
import akka.actor._
import org.apache.flink.configuration.Configuration
import akka.pattern.ask
import org.apache.flink.configuration.{GlobalConfiguration, ConfigConstants, Configuration}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, RegisteredTaskManager}
import org.apache.flink.runtime.execution.ExecutionState2
import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
import org.apache.flink.runtime.instance.{HardwareDescription, InstanceID}
import org.apache.flink.runtime.jobgraph.JobID
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
import org.apache.flink.runtime.messages.RegistrationMessages.{RegisterTaskManager, AcknowledgeRegistration}
import org.apache.flink.runtime.messages.TaskManagerMessages.{AcknowledgeLibraryCacheUpdate, SendHeartbeat, Heartbeat, RegisterAtMaster}
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.runtime.util.EnvironmentInformation
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.util.Failure
class TaskManager(jobManagerURL: String, numberOfSlots: Int, memorySize: Long,
pageSize: Int) extends Actor with ActorLogMessages with
ActorLogging {
import context.dispatcher
import AkkaUtils.FUTURE_TIMEOUT
val REGISTRATION_DELAY = 0 seconds
val REGISTRATION_INTERVAL = 10 seconds
val MAX_REGISTRATION_ATTEMPTS = 1
val HEARTBEAT_INTERVAL = 200 millisecond
val memoryManager = new DefaultMemoryManager(memorySize, numberOfSlots, pageSize)
val hardwareDescription = HardwareDescription.extractFromSystem(memoryManager.getMemorySize)
var registrationScheduler: Option[Cancellable] = None
var registrationAttempts: Int = 0
var registered: Boolean = false
var currentJobManager = ActorRef.noSender
var instanceID: InstanceID = null;
override def preStart(): Unit = {
tryJobManagerRegistration()
}
def tryJobManagerRegistration(): Unit = {
registrationAttempts = 0
import context.dispatcher
registrationScheduler = Some(context.system.scheduler.schedule(REGISTRATION_DELAY, REGISTRATION_INTERVAL,
self, RegisterAtMaster))
}
override def receiveWithLogMessages: Receive = {
case RegisterAtMaster =>
registrationAttempts += 1
if(registered){
registrationScheduler.foreach(_.cancel())
} else if(registrationAttempts <= MAX_REGISTRATION_ATTEMPTS){
val jobManagerURL = getJobManagerURL
log.info(s"Try to register at master ${jobManagerURL}. ${registrationAttempts}. Attempt")
val jobManager = context.actorSelection(jobManagerURL)
jobManager ! RegisterTaskManager(hardwareDescription, numberOfSlots)
}else{
log.error("TaskManager could not register at JobManager.");
throw new RuntimeException("TaskManager could not register at JobManager");
}
case AcknowledgeRegistration(id) =>
registered = true
currentJobManager = sender()
instanceID = id
val jobManagerAddress = currentJobManager.path.toString()
log.info(s"TaskManager successfully registered at JobManager $jobManagerAddress.")
context.system.scheduler.schedule(HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, self, SendHeartbeat)
case SendHeartbeat =>
currentJobManager ! Heartbeat(instanceID)
case x:LibraryCacheUpdate =>
log.info(s"Registered library ${x.getLibraryFileName}.")
sender() ! AcknowledgeLibraryCacheUpdate
}
def notifyExecutionStateChange(jobID: JobID, executionID: ExecutionAttemptID, executionState : ExecutionState2,
message: String): Unit = {
val futureResponse = currentJobManager ? new TaskExecutionState(jobID, executionID, executionState, message)
futureResponse.onComplete{
x =>
x match {
case Failure(ex) =>
log.error(ex, "Error sending task state update to JobManager.")
case _ =>
}
if(executionState == ExecutionState2.FINISHED || executionState == ExecutionState2.CANCELED || executionState ==
ExecutionState2.FAILED){
unregisterTask(executionID)
}
}
}
def unregisterTask(executionID: ExecutionAttemptID): Unit = {
override def receive: Receive = {
case AcknowledgeRegistration =>
println("Got registered at " + sender().toString())
Thread.sleep(1000)
self ! PoisonPill
}
private def getJobManagerURL: String = {
JobManager.getAkkaURL(jobManagerURL)
}
}
......@@ -37,14 +141,92 @@ public class IntPairPairComparator extends TypePairComparator<IntPair, IntPair>
private int key;
def startActorSystemAndActor(systemName: String, hostname: String, port: Int, actorName: String,
configuration: Configuration) = {
val actorSystem = AkkaUtils.createActorSystem(systemName, hostname, port, configuration)
startActor(actorSystem, actorName)
val LOG = LoggerFactory.getLogger(classOf[TaskManager])
val FAILURE_RETURN_CODE = -1
def main(args: Array[String]): Unit = {
val (hostname, port, configuration) = initialize(args)
val taskManagerSystem = startActorSystemAndActor(hostname, port, configuration)
taskManagerSystem.awaitTermination()
}
private def initialize(args: Array[String]):(String, Int, Configuration) = {
val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager"){
head("flink task manager")
opt[String]("configDir") action { (x, c) =>
c.copy(configDir = x)
} text("Specify configuration directory.")
opt[String]("tempDir") action { (x, c) =>
c.copy(tmpDir = x)
} text("Specify temporary directory.")
}
parser.parse(args, TaskManagerCLIConfiguration()) map {
config =>
GlobalConfiguration.loadConfiguration(config.configDir)
val configuration = GlobalConfiguration.getConfiguration()
if(config.tmpDir != null && GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
null) == null){
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, config.tmpDir)
}
val jobManagerHostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
val jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort);
val port = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
val hostname = NetUtils.resolveAddress(jobManagerAddress).getHostName;
(hostname, port, configuration)
} getOrElse {
LOG.error("CLI parsing failed. Usage: " + parser.usage)
sys.exit(FAILURE_RETURN_CODE)
}
}
def startActorSystemAndActor(hostname: String, port: Int, configuration: Configuration) = {
val actorSystem = AkkaUtils.createActorSystem(hostname, port, configuration)
startActor(actorSystem, configuration)
actorSystem
}
def startActor(actorSystem: ActorSystem, actorName: String): ActorRef = {
actorSystem.actorOf(Props(classOf[TaskManager]), actorName);
def startActor(actorSystem: ActorSystem, configuration: Configuration): ActorRef = {
val jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
val jobManagerRPCPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
if(jobManagerAddress == null){
throw new RuntimeException("JobManager address has not been specified in the configuration.")
}
val jobManagerURL = jobManagerAddress + ":" + jobManagerRPCPort
val slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)
val numberOfSlots = if(slots > 0) slots else 1
val configuredMemory:Long = configuration.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1)
val memorySize = if(configuredMemory > 0){
configuredMemory << 20
} else{
val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag * fraction).toLong
}
val pageSize = configuration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE)
actorSystem.actorOf(Props(classOf[TaskManager], jobManagerURL, numberOfSlots, memorySize, pageSize), "taskmanager");
}
def getAkkaURL(address: String): String = {
s"akka.tcp://flink@${address}/user/taskmanager"
}
}
......@@ -32,9 +32,9 @@ import java.io.BufferedReader;
import java.io.FileReader;
import java.net.InetAddress;
import akka.actor.ActorRef;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.profiling.ProfilingException;
import org.apache.flink.runtime.profiling.impl.InstanceProfiler;
import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
import org.junit.Before;
import org.junit.Test;
......@@ -127,7 +127,7 @@ public class InstanceProfilerTest {
PowerMockito.mockStatic(System.class);
when(System.currentTimeMillis()).thenReturn(0L);
this.out = new InstanceProfiler(this.infoMock);
this.out = new InstanceProfiler(ActorRef.noSender());
}
@Test
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-taskManager" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,7 +17,7 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-taskManager"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -18,7 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-taskManager"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......
......@@ -17,14 +17,21 @@
*/
import akka.actor.{ExtendedActorSystem, ActorSystem}
import akka.actor.ActorSystem
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.core.io.IOReadableWritable
import org.apache.flink.runtime.akka.serialization.IOReadableWritableSerializer
import scala.concurrent.duration._
object AkkaUtils {
def createActorSystem(name: String, host: String, port: Int, configuration: Configuration): ActorSystem = {
implicit val FUTURE_TIMEOUT: Timeout = 1 minute
implicit val AWAIT_DURATION: Duration = 1 minute
def createActorSystem(host: String, port: Int, configuration: Configuration): ActorSystem = {
val akkaConfig = ConfigFactory.parseString(AkkaUtils.getConfigString(host, port, configuration))
val actorSystem = ActorSystem.create(name, akkaConfig)
val actorSystem = ActorSystem.create("flink", akkaConfig)
actorSystem
}
......@@ -52,6 +59,8 @@ object AkkaUtils {
val logLifecycleEvents = if(lifecycleEvents) "on" else "off"
val ioRWSerializerClass = classOf[IOReadableWritableSerializer].getCanonicalName
val ioRWClass = classOf[IOReadableWritable].getCanonicalName
s"""akka.daemonic = on
|akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
......@@ -77,6 +86,12 @@ object AkkaUtils {
|akka.remote.log-remote-lifecycle-events = $logLifecycleEvents
|akka.log-dead-letters = $logLifecycleEvents
|akka.log-dead-letters-during-shutdown = $logLifecycleEvents
|akka.actor.serializers {
| IOReadableWritable = "$ioRWSerializerClass"
|}
|akka.actor.serialization-bindings {
| "$ioRWClass" = IOReadableWritable
|}
""".stripMargin
}
}
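A short bootstrap sketch (not part of this commit): creating the "flink" ActorSystem from a default Configuration. Port 0 lets the OS choose a free port, and the generated config string already contains the IOReadableWritable serializer bindings shown above.

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.akka.AkkaUtils

object ActorSystemBootstrapSketch {
  def main(args: Array[String]): Unit = {
    val system = AkkaUtils.createActorSystem("localhost", 0, new Configuration())

    println(s"ActorSystem started: $system")

    system.shutdown()
    system.awaitTermination()
  }
}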
......@@ -18,7 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-taskManager">
<parent>
<groupId>org.apache</groupId>
......@@ -60,11 +60,12 @@ under the License.
<module>flink-compiler</module>
<module>flink-examples</module>
<module>flink-clients</module>
<module>flink-tests</module>
<module>flink-test-utils</module>
<module>flink-addons</module>
<module>flink-quickstart</module>
<module>flink-dist</module>
<!--<module>flink-tests</module>-->
<!--<module>flink-test-utils</module>-->
<!--<module>flink-addons</module>-->
<!--<module>flink-quickstart</module>-->
<!--<module>flink-dist</module>-->
<module>testing</module>
</modules>
<properties>
......