提交 efec2297 编写于 作者: S Stephan Ewen

[FLINK-2084] [core] Add an option to start Flink in streaming mode

 - Streaming mode sets the memory manager to lazy memory allocation to ensure
   heap is not blocked by batch memory manager
上级 ea60678e
......@@ -20,6 +20,7 @@
STARTSTOP=$1
EXECUTIONMODE=$2
STREAMINGMODE=$3
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
......@@ -80,7 +81,7 @@ case $STARTSTOP in
rotateLogFile $out
echo "Starting Job Manager"
$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --executionMode $EXECUTIONMODE --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null &
$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" --executionMode $EXECUTIONMODE --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
echo $! > $pid
;;
......@@ -99,7 +100,7 @@ case $STARTSTOP in
;;
(*)
echo "Please specify 'start (cluster|local)' or stop"
echo "Please specify 'start (cluster|local) [batch|streaming]' or 'stop'"
;;
esac
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
HOSTLIST=$FLINK_SLAVES
if [ "$HOSTLIST" = "" ]; then
HOSTLIST="${FLINK_CONF_DIR}/slaves"
fi
if [ ! -f "$HOSTLIST" ]; then
echo $HOSTLIST is not a valid slave list
exit 1
fi
# cluster mode, bring up job manager locally and a task manager on every slave host
"$FLINK_BIN_DIR"/jobmanager.sh start cluster streaming
GOON=true
while $GOON
do
read line || GOON=false
if [ -n "$line" ]; then
HOST=$( extractHostName $line)
ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start streaming &"
fi
done < "$HOSTLIST"
......@@ -43,6 +43,6 @@ do
read line || GOON=false
if [ -n "$line" ]; then
HOST=$( extractHostName $line)
ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start &"
ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start batch &"
fi
done < "$HOSTLIST"
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# local mode, only bring up job manager. The job manager will start an internal task manager
"$FLINK_BIN_DIR"/jobmanager.sh start local streaming
......@@ -57,6 +57,6 @@ if not defined FOUND (
echo Starting Flink job manager. Webinterface by default on http://localhost:8081/.
echo Don't close this batch window. Stop job manager by pressing Ctrl+C.
java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --executionMode local --configDir "%FLINK_CONF_DIR%" > "%out%" 2>&1
java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --configDir "%FLINK_CONF_DIR%" --executionMode local --streamingMode batch > "%out%" 2>&1
endlocal
......@@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# local mode, only bring up job manager. The job manager will start an internal task manager
"$FLINK_BIN_DIR"/jobmanager.sh start local
"$FLINK_BIN_DIR"/jobmanager.sh start local batch
......@@ -19,6 +19,7 @@
STARTSTOP=$1
STREAMINGMODE=$2
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
......@@ -68,7 +69,7 @@ case $STARTSTOP in
rotateLogFile $out
echo Starting task manager on host $HOSTNAME
$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null &
$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
echo $! > $pid
;;
......@@ -87,7 +88,7 @@ case $STARTSTOP in
;;
(*)
echo Please specify start or stop
echo "Please specify 'start [batch|streaming]' or 'stop'"
;;
esac
......@@ -16,13 +16,19 @@
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
package org.apache.flink.runtime;
/**
* Holder for command line parameters of the JobManager.
*
* @param configDir The directory to load the configuration from.
* @param executionMode Mode for the JobManager.
* The streaming mode defines whether the system starts in streaming mode,
* or in pure batch mode. Note that streaming mode can execute batch programs
* as well.
*/
case class JobManagerCLIConfiguration(configDir: String = null,
executionMode: JobManagerMode = null) {}
public enum StreamingMode {
/** This mode indicates the system can run streaming tasks, of which batch
* tasks are a special case. */
STREAMING,
/** This mode indicates that the system can run only batch tasks */
BATCH_ONLY;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager;
import org.apache.flink.runtime.StreamingMode;
/**
* The command line parameters passed to the TaskManager.
*/
public class JobManagerCliOptions {
private String configDir;
private JobManagerMode jobManagerMode;
private StreamingMode streamingMode = StreamingMode.BATCH_ONLY;
// ------------------------------------------------------------------------
public String getConfigDir() {
return configDir;
}
public void setConfigDir(String configDir) {
this.configDir = configDir;
}
public JobManagerMode getJobManagerMode() {
return jobManagerMode;
}
public void setJobManagerMode(String modeName) {
if (modeName.equalsIgnoreCase("cluster")) {
this.jobManagerMode = JobManagerMode.CLUSTER;
}
else if (modeName.equalsIgnoreCase("local")) {
this.jobManagerMode = JobManagerMode.LOCAL;
}
else {
throw new IllegalArgumentException(
"Unknown execution mode. Execution mode must be one of 'cluster' or 'local'.");
}
}
public StreamingMode getStreamingMode() {
return streamingMode;
}
public void setStreamingMode(String modeName) {
if (modeName.equalsIgnoreCase("streaming")) {
this.streamingMode = StreamingMode.STREAMING;
}
else if (modeName.equalsIgnoreCase("batch")) {
this.streamingMode = StreamingMode.BATCH_ONLY;
}
else {
throw new IllegalArgumentException(
"Unknown streaming mode. Streaming mode must be one of 'BATCH' or 'STREAMING'.");
}
}
}
......@@ -74,7 +74,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
/**
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and
* runs it, providing all service necessary for example to consume input data,
* runs it, providing all services necessary for example to consume input data,
* produce its results (intermediate result partitions) and communicate
* with the JobManager.
*
......
......@@ -16,11 +16,42 @@
* limitations under the License.
*/
package org.apache.flink.runtime.taskmanager
package org.apache.flink.runtime.taskmanager;
import org.apache.flink.runtime.StreamingMode;
/**
* Command line configuration object for the [[TaskManager]]
*
* @param configDir Path to configuration directory
* The command line parameters passed to the TaskManager.
*/
case class TaskManagerCLIConfiguration(configDir: String = null)
public class TaskManagerCliOptions {
private String configDir;
private StreamingMode mode = StreamingMode.BATCH_ONLY;
// ------------------------------------------------------------------------
public String getConfigDir() {
return configDir;
}
public void setConfigDir(String configDir) {
this.configDir = configDir;
}
public StreamingMode getMode() {
return mode;
}
public void setMode(String modeName) {
if (modeName.equalsIgnoreCase("streaming")) {
this.mode = StreamingMode.STREAMING;
}
else if (modeName.equalsIgnoreCase("batch")) {
this.mode = StreamingMode.BATCH_ONLY;
}
else {
throw new IllegalArgumentException("Mode must be one of 'BATCH' or 'STREAMING'.");
}
}
}
\ No newline at end of file
......@@ -21,7 +21,7 @@ import org.slf4j.Logger;
import sun.misc.Signal;
/**
* This signal handler / signal logger is based on Apache Hadoops org.apache.hadoop.util.SignalLogger.
* This signal handler / signal logger is based on Apache Hadoop's org.apache.hadoop.util.SignalLogger.
*/
public class SignalHandler {
private static boolean registered = false;
......
......@@ -45,7 +45,7 @@ import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.instance.InstanceManager
......@@ -99,7 +99,8 @@ class JobManager(protected val flinkConfiguration: Configuration,
protected val accumulatorManager: AccumulatorManager,
protected val defaultExecutionRetries: Int,
protected val delayBetweenRetries: Long,
protected val timeout: FiniteDuration)
protected val timeout: FiniteDuration,
protected val mode: StreamingMode)
extends Actor with ActorLogMessages with ActorSynchronousLogging {
/** List of current jobs running jobs */
......@@ -759,10 +760,11 @@ object JobManager {
val STARTUP_FAILURE_RETURN_CODE = 1
val RUNTIME_FAILURE_RETURN_CODE = 2
/** Name of the JobManager actor */
val JOB_MANAGER_NAME = "jobmanager"
val EVENT_COLLECTOR_NAME = "eventcollector"
/** Name of the archive actor */
val ARCHIVE_NAME = "archive"
val PROFILER_NAME = "profiler"
/**
......@@ -778,6 +780,7 @@ object JobManager {
// parsing the command line arguments
val (configuration: Configuration,
executionMode: JobManagerMode,
streamingMode: StreamingMode,
listeningHost: String, listeningPort: Int) =
try {
parseArgs(args)
......@@ -814,13 +817,15 @@ object JobManager {
LOG.info("Security is enabled. Starting secure JobManager.")
SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
override def run(): Unit = {
runJobManager(configuration, executionMode, listeningHost, listeningPort)
runJobManager(configuration, executionMode, streamingMode,
listeningHost, listeningPort)
}
})
}
else {
LOG.info("Security is not enabled. Starting non-authenticated JobManager.")
runJobManager(configuration, executionMode, listeningHost, listeningPort)
runJobManager(configuration, executionMode, streamingMode,
listeningHost, listeningPort)
}
}
catch {
......@@ -842,11 +847,13 @@ object JobManager {
* @param configuration The configuration object for the JobManager.
* @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an
* an additional TaskManager in the same process.
* @param streamingMode The streaming mode to run the system in (streaming vs. batch-only)
* @param listeningAddress The hostname where the JobManager should listen for messages.
* @param listeningPort The port where the JobManager should listen for messages.
*/
def runJobManager(configuration: Configuration,
executionMode: JobManagerMode,
streamingMode: StreamingMode,
listeningAddress: String,
listeningPort: Int) : Unit = {
......@@ -880,7 +887,8 @@ object JobManager {
try {
// bring up the job manager actor
LOG.info("Starting JobManager actor")
val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem)
val (jobManager, archiver) = startJobManagerActors(configuration,
jobManagerSystem, streamingMode)
// start a process reaper that watches the JobManager. If the JobManager actor dies,
// the process reaper will kill the JVM process (to ensure easy failure detection)
......@@ -898,7 +906,8 @@ object JobManager {
listeningAddress,
Some(TaskManager.TASK_MANAGER_NAME),
Some(jobManager.path.toString),
true, classOf[TaskManager])
true, streamingMode,
classOf[TaskManager])
LOG.debug("Starting TaskManager process reaper")
jobManagerSystem.actorOf(
......@@ -936,61 +945,59 @@ object JobManager {
* @param args command line arguments
* @return Quadruple of configuration, execution mode and an optional listening address
*/
def parseArgs(args: Array[String]): (Configuration, JobManagerMode, String, Int) = {
val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
def parseArgs(args: Array[String]): (Configuration, JobManagerMode, StreamingMode, String, Int) = {
val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") {
head("Flink JobManager")
opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text {
"The configuration directory." }
opt[String]("executionMode") action { (arg, c) =>
val argLower = arg.toLowerCase()
var result: JobManagerCLIConfiguration = null
for (mode <- JobManagerMode.values() if result == null) {
val modeName = mode.name().toLowerCase()
if (modeName.equals(argLower)) {
result = c.copy(executionMode = mode)
}
}
opt[String]("configDir") action { (arg, conf) =>
conf.setConfigDir(arg)
conf
} text {
"The configuration directory."
}
if (result == null) {
throw new Exception("Unknown execution mode: " + arg)
} else {
result
}
opt[String]("executionMode") action { (arg, conf) =>
conf.setJobManagerMode(arg)
conf
} text {
"The execution mode of the JobManager (CLUSTER / LOCAL)"
}
}
parser.parse(args, JobManagerCLIConfiguration()) map {
config =>
opt[String]("streamingMode").optional().action { (arg, conf) =>
conf.setStreamingMode(arg)
conf
} text {
"The streaming mode of the JobManager (STREAMING / BATCH)"
}
}
if (config.configDir == null) {
throw new Exception("Missing parameter '--configDir'")
}
if (config.executionMode == null) {
throw new Exception("Missing parameter '--executionMode'")
}
val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
throw new Exception(
s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}")
}
val configDir = config.getConfigDir()
if (configDir == null) {
throw new Exception("Missing parameter '--configDir'")
}
if (config.getJobManagerMode() == null) {
throw new Exception("Missing parameter '--executionMode'")
}
LOG.info("Loading configuration from " + config.configDir)
GlobalConfiguration.loadConfiguration(config.configDir)
val configuration = GlobalConfiguration.getConfiguration
LOG.info("Loading configuration from " + configDir)
GlobalConfiguration.loadConfiguration(configDir)
val configuration = GlobalConfiguration.getConfiguration()
if (config.configDir != null && new File(config.configDir).isDirectory) {
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..")
}
if (new File(configDir).isDirectory) {
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
}
val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
(configuration, config.executionMode, hostname, port)
} getOrElse {
throw new Exception("Invalid command line arguments: " + parser.usage)
}
(configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
}
/**
......@@ -1082,9 +1089,12 @@ object JobManager {
* @return A tuple of references (JobManager Ref, Archiver Ref)
*/
def startJobManagerActors(configuration: Configuration,
actorSystem: ActorSystem): (ActorRef, ActorRef) = {
actorSystem: ActorSystem,
streamingMode: StreamingMode): (ActorRef, ActorRef) = {
startJobManagerActors(configuration,actorSystem, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME))
startJobManagerActors(configuration, actorSystem,
Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME),
streamingMode)
}
/**
* Starts the JobManager and job archiver based on the given configuration, in the
......@@ -1096,18 +1106,21 @@ object JobManager {
* the actor will have the name generated by the actor system.
* @param archiverActorName Optionally the name of the archive actor. If none is given,
* the actor will have the name generated by the actor system.
* @param streamingMode The mode to run the system in (streaming vs. batch-only)
*
* @return A tuple of references (JobManager Ref, Archiver Ref)
*/
def startJobManagerActors(configuration: Configuration,
actorSystem: ActorSystem,
jobMangerActorName: Option[String],
archiverActorName: Option[String]): (ActorRef, ActorRef) = {
archiverActorName: Option[String],
streamingMode: StreamingMode): (ActorRef, ActorRef) = {
val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager,
executionRetries, delayBetweenRetries,
timeout, _) = createJobManagerComponents(configuration)
// start the archiver wither with the given name, or without (avoid name conflicts)
// start the archiver with the given name, or without (avoid name conflicts)
val archiver: ActorRef = archiverActorName match {
case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
case None => actorSystem.actorOf(archiveProps)
......@@ -1115,7 +1128,7 @@ object JobManager {
val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler,
libraryCacheManager, archiver, accumulatorManager, executionRetries,
delayBetweenRetries, timeout)
delayBetweenRetries, timeout, streamingMode)
val jobManager: ActorRef = jobMangerActorName match {
case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
......
......@@ -26,6 +26,7 @@ import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.Config
import org.apache.flink.api.common.JobSubmissionResult
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult}
import org.apache.flink.runtime.jobgraph.JobGraph
......@@ -44,10 +45,16 @@ import scala.concurrent.{Future, Await}
* @param userConfiguration Configuration object with the user provided configuration values
* @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
* [[ActorSystem]], otherwise false
* @param streamingMode True, if the system should be started in streaming mode, false if
* in pure batch mode.
*/
abstract class FlinkMiniCluster(val userConfiguration: Configuration,
val singleActorSystem: Boolean) {
val singleActorSystem: Boolean,
val streamingMode: StreamingMode) {
def this(userConfiguration: Configuration, singleActorSystem: Boolean)
= this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
// --------------------------------------------------------------------------
......
......@@ -18,18 +18,19 @@
package org.apache.flink.runtime.minicluster
import akka.actor.{ActorRef, ActorSystem}
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import org.apache.flink.api.common.io.FileOutputFormat
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.io.network.netty.NettyConfig
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.EnvironmentInformation
import org.slf4j.LoggerFactory
import akka.actor.ExtendedActorSystem
/**
* Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
......@@ -41,9 +42,20 @@ import akka.actor.ExtendedActorSystem
* @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
* [[ActorSystem]], otherwise false
*/
class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
extends FlinkMiniCluster(userConfiguration, singleActorSystem) {
class LocalFlinkMiniCluster(userConfiguration: Configuration,
singleActorSystem: Boolean,
streamingMode: StreamingMode)
extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
def this(userConfiguration: Configuration, singleActorSystem: Boolean)
= this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
def this(userConfiguration: Configuration) = this(userConfiguration, true)
// --------------------------------------------------------------------------
val jobClientActorSystem = if (singleActorSystem) {
jobManagerActorSystem
} else {
......@@ -64,7 +76,9 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
override def startJobManager(system: ActorSystem): ActorRef = {
val config = configuration.clone()
val (jobManager, archiver) = JobManager.startJobManagerActors(config, system)
val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode)
if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) {
val webServer = new WebInfoServer(configuration, jobManager, archiver)
webServer.start()
......@@ -103,12 +117,13 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem:
} else {
None
}
TaskManager.startTaskManagerComponentsAndActor(config, system,
HOSTNAME, // network interface to bind to
Some(taskManagerActorName), // actor name
jobManagerPath, // job manager akka URL
localExecution, // start network stack?
streamingMode,
classOf[TaskManager])
}
......
......@@ -37,7 +37,7 @@ import grizzled.slf4j.Logger
import org.apache.flink.configuration._
import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage}
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobService, BlobCache}
import org.apache.flink.runtime.broadcast.BroadcastVariableManager
......@@ -999,8 +999,8 @@ object TaskManager {
/** Return code for critical errors during the runtime */
val RUNTIME_FAILURE_RETURN_CODE = 2
/** The name of the TaskManager actor */
val TASK_MANAGER_NAME = "taskmanager"
val PROFILER_NAME = "profiler"
/** Maximum time (msecs) that the TaskManager will spend searching for a
* suitable network interface to use for communication */
......@@ -1033,7 +1033,8 @@ object TaskManager {
EnvironmentInformation.checkJavaVersion()
// try to parse the command line arguments
val configuration = try {
val (configuration: Configuration,
mode: StreamingMode) = try {
parseArgsAndLoadConfig(args)
}
catch {
......@@ -1050,13 +1051,13 @@ object TaskManager {
LOG.info("Security is enabled. Starting secure TaskManager.")
SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
override def run(): Unit = {
selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
}
})
}
else {
LOG.info("Security is not enabled. Starting non-authenticated TaskManager.")
selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager])
selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager])
}
}
catch {
......@@ -1074,31 +1075,44 @@ object TaskManager {
* @return The parsed configuration.
*/
@throws(classOf[Exception])
def parseArgsAndLoadConfig(args: Array[String]): Configuration = {
def parseArgsAndLoadConfig(args: Array[String]): (Configuration, StreamingMode) = {
// set up the command line parser
val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") {
head("flink task manager")
opt[String]("configDir") action { (x, c) =>
c.copy(configDir = x)
} text "Specify configuration directory."
val parser = new scopt.OptionParser[TaskManagerCliOptions]("TaskManager") {
head("Flink TaskManager")
opt[String]("configDir") action { (param, conf) =>
conf.setConfigDir(param)
conf
} text {
"Specify configuration directory."
}
opt[String]("streamingMode").optional().action { (param, conf) =>
conf.setMode(param)
conf
} text {
"The streaming mode of the JobManager (STREAMING / BATCH)"
}
}
// parse the CLI arguments
val cliConfig = parser.parse(args, TaskManagerCLIConfiguration()).getOrElse {
val cliConfig = parser.parse(args, new TaskManagerCliOptions()).getOrElse {
throw new Exception(
s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}")
}
// load the configuration
try {
LOG.info("Loading configuration from " + cliConfig.configDir)
GlobalConfiguration.loadConfiguration(cliConfig.configDir)
val conf: Configuration = try {
LOG.info("Loading configuration from " + cliConfig.getConfigDir())
GlobalConfiguration.loadConfiguration(cliConfig.getConfigDir())
GlobalConfiguration.getConfiguration()
}
catch {
case e: Exception => throw new Exception("Could not load configuration", e)
}
(conf, cliConfig.getMode)
}
// --------------------------------------------------------------------------
......@@ -1120,11 +1134,13 @@ object TaskManager {
* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
* @param configuration The configuration for the TaskManager.
* @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
* @param taskManagerClass The actor class to instantiate.
* Allows to use TaskManager subclasses for example for YARN.
*/
@throws(classOf[Exception])
def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration,
streamingMode: StreamingMode,
taskManagerClass: Class[_ <: TaskManager]) : Unit = {
val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
......@@ -1132,7 +1148,8 @@ object TaskManager {
val (taskManagerHostname, actorSystemPort) =
selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort)
runTaskManager(taskManagerHostname, actorSystemPort, configuration, taskManagerClass)
runTaskManager(taskManagerHostname, actorSystemPort, configuration,
streamingMode, taskManagerClass)
}
@throws(classOf[IOException])
......@@ -1196,14 +1213,17 @@ object TaskManager {
* @param taskManagerHostname The hostname/address of the interface where the actor system
* will communicate.
* @param actorSystemPort The port at which the actor system will communicate.
* @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
* @param configuration The configuration for the TaskManager.
*/
@throws(classOf[Exception])
def runTaskManager(taskManagerHostname: String,
actorSystemPort: Int,
configuration: Configuration) : Unit = {
actorSystemPort: Int,
configuration: Configuration,
streamingMode: StreamingMode) : Unit = {
runTaskManager(taskManagerHostname, actorSystemPort, configuration, classOf[TaskManager])
runTaskManager(taskManagerHostname, actorSystemPort, configuration,
streamingMode, classOf[TaskManager])
}
/**
......@@ -1218,6 +1238,7 @@ object TaskManager {
* will communicate.
* @param actorSystemPort The port at which the actor system will communicate.
* @param configuration The configuration for the TaskManager.
* @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
* @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager
* subclasses for example for YARN.
*/
......@@ -1225,6 +1246,7 @@ object TaskManager {
def runTaskManager(taskManagerHostname: String,
actorSystemPort: Int,
configuration: Configuration,
streamingMode: StreamingMode,
taskManagerClass: Class[_ <: TaskManager]) : Unit = {
LOG.info("Starting TaskManager")
......@@ -1264,6 +1286,7 @@ object TaskManager {
taskManagerHostname,
Some(TASK_MANAGER_NAME),
None, false,
streamingMode,
taskManagerClass)
// start a process reaper that watches the JobManager. If the JobManager actor dies,
......@@ -1317,6 +1340,7 @@ object TaskManager {
* JobManager hostname an port specified in the configuration.
* @param localTaskManagerCommunication If true, the TaskManager will not initiate the
* TCP network stack.
* @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
* @param taskManagerClass The class of the TaskManager actor. May be used to give
* subclasses that understand additional actor messages.
*
......@@ -1339,6 +1363,7 @@ object TaskManager {
taskManagerActorName: Option[String],
jobManagerPath: Option[String],
localTaskManagerCommunication: Boolean,
streamingMode: StreamingMode,
taskManagerClass: Class[_ <: TaskManager]): ActorRef = {
// get and check the JobManager config
......@@ -1391,13 +1416,15 @@ object TaskManager {
relativeMemSize
}
val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY
// now start the memory manager
val memoryManager = try {
new DefaultMemoryManager(memorySize,
taskManagerConfig.numberOfSlots,
netConfig.networkBufferSize,
true)
preAllocateMemory)
}
catch {
case e: OutOfMemoryError => throw new Exception(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册