diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh index 273cdd37d0568554a5de1b4b27f1dd1696bad8da..0f1e4fad57399a803f1d7034f16da59eaf9f59db 100755 --- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh @@ -20,6 +20,7 @@ STARTSTOP=$1 EXECUTIONMODE=$2 +STREAMINGMODE=$3 bin=`dirname "$0"` bin=`cd "$bin"; pwd` @@ -80,7 +81,7 @@ case $STARTSTOP in rotateLogFile $out echo "Starting Job Manager" - $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --executionMode $EXECUTIONMODE --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null & + $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" --executionMode $EXECUTIONMODE --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null & echo $! > $pid ;; @@ -99,7 +100,7 @@ case $STARTSTOP in ;; (*) - echo "Please specify 'start (cluster|local)' or stop" + echo "Please specify 'start (cluster|local) [batch|streaming]' or 'stop'" ;; esac diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh new file mode 100755 index 0000000000000000000000000000000000000000..86a87cd5b708a75bef9742613e28e766ba9f7f66 --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +HOSTLIST=$FLINK_SLAVES + +if [ "$HOSTLIST" = "" ]; then + HOSTLIST="${FLINK_CONF_DIR}/slaves" +fi + +if [ ! 
-f "$HOSTLIST" ]; then + echo $HOSTLIST is not a valid slave list + exit 1 +fi + +# cluster mode, bring up job manager locally and a task manager on every slave host +"$FLINK_BIN_DIR"/jobmanager.sh start cluster streaming + +GOON=true +while $GOON +do + read line || GOON=false + if [ -n "$line" ]; then + HOST=$( extractHostName $line) + ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start streaming &" + fi +done < "$HOSTLIST" diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh b/flink-dist/src/main/flink-bin/bin/start-cluster.sh index db65032e57169b30eeb60df7730e1ae5e916a4df..666edeb6e998b1e855f92b949e62492e5d5d05fb 100755 --- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh +++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh @@ -43,6 +43,6 @@ do read line || GOON=false if [ -n "$line" ]; then HOST=$( extractHostName $line) - ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start &" + ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start batch &" fi done < "$HOSTLIST" diff --git a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh new file mode 100755 index 0000000000000000000000000000000000000000..2cb4d4a41cf536ee01494555325f3557f04291bf --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +# local mode, only bring up job manager. The job manager will start an internal task manager +"$FLINK_BIN_DIR"/jobmanager.sh start local streaming diff --git a/flink-dist/src/main/flink-bin/bin/start-local.bat b/flink-dist/src/main/flink-bin/bin/start-local.bat index 386a6311d8e49e7ec72dbaa4d8799baa641290ab..202c7d9b4eb86d8fafdd4ceaf8155c6b751b64dc 100644 --- a/flink-dist/src/main/flink-bin/bin/start-local.bat +++ b/flink-dist/src/main/flink-bin/bin/start-local.bat @@ -57,6 +57,6 @@ if not defined FOUND ( echo Starting Flink job manager. Webinterface by default on http://localhost:8081/. echo Don't close this batch window. Stop job manager by pressing Ctrl+C. 
-java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --executionMode local --configDir "%FLINK_CONF_DIR%" > "%out%" 2>&1 +java %JVM_ARGS% %log_setting% -cp "%FLINK_JM_CLASSPATH%"; org.apache.flink.runtime.jobmanager.JobManager --configDir "%FLINK_CONF_DIR%" --executionMode local --streamingMode batch > "%out%" 2>&1 endlocal diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh b/flink-dist/src/main/flink-bin/bin/start-local.sh index f382763b1ec9aa7b47342867453b3e033a106c06..7ea3ff400f90c7a65ee41a9b7a3d671d64c0c976 100755 --- a/flink-dist/src/main/flink-bin/bin/start-local.sh +++ b/flink-dist/src/main/flink-bin/bin/start-local.sh @@ -24,4 +24,4 @@ bin=`cd "$bin"; pwd` . "$bin"/config.sh # local mode, only bring up job manager. The job manager will start an internal task manager -"$FLINK_BIN_DIR"/jobmanager.sh start local +"$FLINK_BIN_DIR"/jobmanager.sh start local batch diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh index 557ea2b8a279ba6aa89e4271c0cd6da7e7788a56..a99d39d0f40ea3e514b5275d6ec1d008cdf5cd11 100755 --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh @@ -19,6 +19,7 @@ STARTSTOP=$1 +STREAMINGMODE=$2 bin=`dirname "$0"` bin=`cd "$bin"; pwd` @@ -68,7 +69,7 @@ case $STARTSTOP in rotateLogFile $out echo Starting task manager on host $HOSTNAME - $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" > "$out" 2>&1 < /dev/null & + $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null & echo $! > $pid ;; @@ -87,7 +88,7 @@ case $STARTSTOP in ;; (*) - echo Please specify start or stop + echo "Please specify 'start [batch|streaming]' or 'stop'" ;; esac diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala b/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java similarity index 65% rename from flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala rename to flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java index 4cd02c5e72ce7c5ff780f120c9085d0583bb583c..bdcbcf932e959ce2bd7ccc8776f9be4cc2184c13 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/StreamingMode.java @@ -16,13 +16,19 @@ * limitations under the License. */ -package org.apache.flink.runtime.jobmanager +package org.apache.flink.runtime; /** - * Holder for command line parameters of the JobManager. - * - * @param configDir The directory to load the configuration from. - * @param executionMode Mode for the JobManager. + * The streaming mode defines whether the system starts in streaming mode, + * or in pure batch mode. Note that streaming mode can execute batch programs + * as well. 
*/ -case class JobManagerCLIConfiguration(configDir: String = null, - executionMode: JobManagerMode = null) {} +public enum StreamingMode { + + /** This mode indicates the system can run streaming tasks, of which batch + * tasks are a special case. */ + STREAMING, + + /** This mode indicates that the system can run only batch tasks */ + BATCH_ONLY; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java new file mode 100644 index 0000000000000000000000000000000000000000..988e3a7756b55b9399f97150c6078cfdaf107980 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.runtime.StreamingMode; + +/** + * The command line parameters passed to the JobManager. + */ +public class JobManagerCliOptions { + + private String configDir; + + private JobManagerMode jobManagerMode; + + private StreamingMode streamingMode = StreamingMode.BATCH_ONLY; + + // ------------------------------------------------------------------------ + + public String getConfigDir() { + return configDir; + } + + public void setConfigDir(String configDir) { + this.configDir = configDir; + } + + public JobManagerMode getJobManagerMode() { + return jobManagerMode; + } + + public void setJobManagerMode(String modeName) { + if (modeName.equalsIgnoreCase("cluster")) { + this.jobManagerMode = JobManagerMode.CLUSTER; + } + else if (modeName.equalsIgnoreCase("local")) { + this.jobManagerMode = JobManagerMode.LOCAL; + } + else { + throw new IllegalArgumentException( + "Unknown execution mode. Execution mode must be one of 'cluster' or 'local'."); + } + } + + public StreamingMode getStreamingMode() { + return streamingMode; + } + + public void setStreamingMode(String modeName) { + if (modeName.equalsIgnoreCase("streaming")) { + this.streamingMode = StreamingMode.STREAMING; + } + else if (modeName.equalsIgnoreCase("batch")) { + this.streamingMode = StreamingMode.BATCH_ONLY; + } + else { + throw new IllegalArgumentException( + "Unknown streaming mode. 
Streaming mode must be one of 'BATCH' or 'STREAMING'."); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 51ce91f3821c73464e04d44e78f580c181af8921..40198dc94f4a8471f6e4ad56e3f342a7707568de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -74,7 +74,7 @@ import static com.google.common.base.Preconditions.checkNotNull; /** * The Task represents one execution of a parallel subtask on a TaskManager. * A Task wraps a Flink operator (which may be a user function) and - * runs it, providing all service necessary for example to consume input data, + * runs it, providing all services necessary for example to consume input data, * produce its results (intermediate result partitions) and communicate * with the JobManager. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java new file mode 100644 index 0000000000000000000000000000000000000000..a648cafa12fcb1d865fed794bd16050247d2bbe8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerCliOptions.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.runtime.StreamingMode; + +/** + * The command line parameters passed to the TaskManager. 
+ */ +public class TaskManagerCliOptions { + + private String configDir; + + private StreamingMode mode = StreamingMode.BATCH_ONLY; + + // ------------------------------------------------------------------------ + + public String getConfigDir() { + return configDir; + } + + public void setConfigDir(String configDir) { + this.configDir = configDir; + } + + public StreamingMode getMode() { + return mode; + } + + public void setMode(String modeName) { + if (modeName.equalsIgnoreCase("streaming")) { + this.mode = StreamingMode.STREAMING; + } + else if (modeName.equalsIgnoreCase("batch")) { + this.mode = StreamingMode.BATCH_ONLY; + } + else { + throw new IllegalArgumentException("Mode must be one of 'BATCH' or 'STREAMING'."); + } + } +} \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java index 546e14231d49c14cf312314a9b2f9e999004eadb..bcd3dc4a84789b41388131f9546f1f8dc57849da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java @@ -21,7 +21,7 @@ import org.slf4j.Logger; import sun.misc.Signal; /** - * This signal handler / signal logger is based on Apache Hadoops org.apache.hadoop.util.SignalLogger. + * This signal handler / signal logger is based on Apache Hadoop's org.apache.hadoop.util.SignalLogger. */ public class SignalHandler { private static boolean registered = false; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0c71938aeba8681a42e5c68eb2e073e3a8b80645..ba819ca56b0b223dc721cfe908f92e02494ed6a3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -45,7 +45,7 @@ import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation} -import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} +import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager @@ -99,7 +99,8 @@ class JobManager(protected val flinkConfiguration: Configuration, protected val accumulatorManager: AccumulatorManager, protected val defaultExecutionRetries: Int, protected val delayBetweenRetries: Long, - protected val timeout: FiniteDuration) + protected val timeout: FiniteDuration, + protected val mode: StreamingMode) extends Actor with ActorLogMessages with ActorSynchronousLogging { /** List of current jobs running jobs */ @@ -759,10 +760,11 @@ object JobManager { val STARTUP_FAILURE_RETURN_CODE = 1 val RUNTIME_FAILURE_RETURN_CODE = 2 + /** Name of the JobManager actor */ val JOB_MANAGER_NAME = "jobmanager" - val EVENT_COLLECTOR_NAME = "eventcollector" + + /** Name of the archive actor */ val ARCHIVE_NAME = "archive" - val PROFILER_NAME = "profiler" /** @@ -778,6 +780,7 @@ object JobManager { // parsing the command line arguments val (configuration: Configuration, 
executionMode: JobManagerMode, + streamingMode: StreamingMode, listeningHost: String, listeningPort: Int) = try { parseArgs(args) @@ -814,13 +817,15 @@ object JobManager { LOG.info("Security is enabled. Starting secure JobManager.") SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] { override def run(): Unit = { - runJobManager(configuration, executionMode, listeningHost, listeningPort) + runJobManager(configuration, executionMode, streamingMode, + listeningHost, listeningPort) } }) } else { LOG.info("Security is not enabled. Starting non-authenticated JobManager.") - runJobManager(configuration, executionMode, listeningHost, listeningPort) + runJobManager(configuration, executionMode, streamingMode, + listeningHost, listeningPort) } } catch { @@ -842,11 +847,13 @@ object JobManager { * @param configuration The configuration object for the JobManager. * @param executionMode The execution mode in which to run. Execution mode LOCAL will spawn an * an additional TaskManager in the same process. + * @param streamingMode The streaming mode to run the system in (streaming vs. batch-only) * @param listeningAddress The hostname where the JobManager should listen for messages. * @param listeningPort The port where the JobManager should listen for messages. */ def runJobManager(configuration: Configuration, executionMode: JobManagerMode, + streamingMode: StreamingMode, listeningAddress: String, listeningPort: Int) : Unit = { @@ -880,7 +887,8 @@ object JobManager { try { // bring up the job manager actor LOG.info("Starting JobManager actor") - val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem) + val (jobManager, archiver) = startJobManagerActors(configuration, + jobManagerSystem, streamingMode) // start a process reaper that watches the JobManager. If the JobManager actor dies, // the process reaper will kill the JVM process (to ensure easy failure detection) @@ -898,7 +906,8 @@ object JobManager { listeningAddress, Some(TaskManager.TASK_MANAGER_NAME), Some(jobManager.path.toString), - true, classOf[TaskManager]) + true, streamingMode, + classOf[TaskManager]) LOG.debug("Starting TaskManager process reaper") jobManagerSystem.actorOf( @@ -936,61 +945,59 @@ object JobManager { * @param args command line arguments * @return Quadruple of configuration, execution mode and an optional listening address */ - def parseArgs(args: Array[String]): (Configuration, JobManagerMode, String, Int) = { - val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") { + def parseArgs(args: Array[String]): (Configuration, JobManagerMode, StreamingMode, String, Int) = { + val parser = new scopt.OptionParser[JobManagerCliOptions]("JobManager") { head("Flink JobManager") - opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text { - "The configuration directory." } - - opt[String]("executionMode") action { (arg, c) => - val argLower = arg.toLowerCase() - var result: JobManagerCLIConfiguration = null - - for (mode <- JobManagerMode.values() if result == null) { - val modeName = mode.name().toLowerCase() - - if (modeName.equals(argLower)) { - result = c.copy(executionMode = mode) - } - } + opt[String]("configDir") action { (arg, conf) => + conf.setConfigDir(arg) + conf + } text { + "The configuration directory." 
+ } - if (result == null) { - throw new Exception("Unknown execution mode: " + arg) - } else { - result - } + opt[String]("executionMode") action { (arg, conf) => + conf.setJobManagerMode(arg) + conf } text { "The execution mode of the JobManager (CLUSTER / LOCAL)" } - } - parser.parse(args, JobManagerCLIConfiguration()) map { - config => + opt[String]("streamingMode").optional().action { (arg, conf) => + conf.setStreamingMode(arg) + conf + } text { + "The streaming mode of the JobManager (STREAMING / BATCH)" + } + } - if (config.configDir == null) { - throw new Exception("Missing parameter '--configDir'") - } - if (config.executionMode == null) { - throw new Exception("Missing parameter '--executionMode'") - } + val config = parser.parse(args, new JobManagerCliOptions()).getOrElse { + throw new Exception( + s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}") + } + + val configDir = config.getConfigDir() + + if (configDir == null) { + throw new Exception("Missing parameter '--configDir'") + } + if (config.getJobManagerMode() == null) { + throw new Exception("Missing parameter '--executionMode'") + } - LOG.info("Loading configuration from " + config.configDir) - GlobalConfiguration.loadConfiguration(config.configDir) - val configuration = GlobalConfiguration.getConfiguration + LOG.info("Loading configuration from " + configDir) + GlobalConfiguration.loadConfiguration(configDir) + val configuration = GlobalConfiguration.getConfiguration() - if (config.configDir != null && new File(config.configDir).isDirectory) { - configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, config.configDir + "/..") - } + if (new File(configDir).isDirectory) { + configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..") + } - val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) - val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - (configuration, config.executionMode, hostname, port) - } getOrElse { - throw new Exception("Invalid command line arguments: " + parser.usage) - } + (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port) } /** @@ -1082,9 +1089,12 @@ object JobManager { * @return A tuple of references (JobManager Ref, Archiver Ref) */ def startJobManagerActors(configuration: Configuration, - actorSystem: ActorSystem): (ActorRef, ActorRef) = { + actorSystem: ActorSystem, + streamingMode: StreamingMode): (ActorRef, ActorRef) = { - startJobManagerActors(configuration,actorSystem, Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME)) + startJobManagerActors(configuration, actorSystem, + Some(JOB_MANAGER_NAME), Some(ARCHIVE_NAME), + streamingMode) } /** * Starts the JobManager and job archiver based on the given configuration, in the @@ -1096,18 +1106,21 @@ object JobManager { * the actor will have the name generated by the actor system. * @param archiverActorName Optionally the name of the archive actor. If none is given, * the actor will have the name generated by the actor system. + * @param streamingMode The mode to run the system in (streaming vs. 
batch-only) + * * @return A tuple of references (JobManager Ref, Archiver Ref) */ def startJobManagerActors(configuration: Configuration, actorSystem: ActorSystem, jobMangerActorName: Option[String], - archiverActorName: Option[String]): (ActorRef, ActorRef) = { + archiverActorName: Option[String], + streamingMode: StreamingMode): (ActorRef, ActorRef) = { val (instanceManager, scheduler, libraryCacheManager, archiveProps, accumulatorManager, executionRetries, delayBetweenRetries, timeout, _) = createJobManagerComponents(configuration) - // start the archiver wither with the given name, or without (avoid name conflicts) + // start the archiver with the given name, or without (avoid name conflicts) val archiver: ActorRef = archiverActorName match { case Some(actorName) => actorSystem.actorOf(archiveProps, actorName) case None => actorSystem.actorOf(archiveProps) @@ -1115,7 +1128,7 @@ object JobManager { val jobManagerProps = Props(classOf[JobManager], configuration, instanceManager, scheduler, libraryCacheManager, archiver, accumulatorManager, executionRetries, - delayBetweenRetries, timeout) + delayBetweenRetries, timeout, streamingMode) val jobManager: ActorRef = jobMangerActorName match { case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 8a6c39461e4b63f66fc2a9a0ff3b1d1e44b9cfc0..edc12a1176ec00799fa0015f0944c2ecb7ed867c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -26,6 +26,7 @@ import akka.actor.{ActorRef, ActorSystem} import com.typesafe.config.Config import org.apache.flink.api.common.JobSubmissionResult import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.{JobExecutionException, JobClient, SerializedJobExecutionResult} import org.apache.flink.runtime.jobgraph.JobGraph @@ -44,10 +45,16 @@ import scala.concurrent.{Future, Await} * @param userConfiguration Configuration object with the user provided configuration values * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same * [[ActorSystem]], otherwise false + * @param streamingMode True, if the system should be started in streaming mode, false if + * in pure batch mode. 
*/ abstract class FlinkMiniCluster(val userConfiguration: Configuration, - val singleActorSystem: Boolean) { + val singleActorSystem: Boolean, + val streamingMode: StreamingMode) { + def this(userConfiguration: Configuration, singleActorSystem: Boolean) + = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY) + protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster]) // -------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index e2d7cc18c388413ae63a6cae4bf9c7cac4a65e3c..663307db3179a8c121eceec71feeecd1ce9e09a3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -18,18 +18,19 @@ package org.apache.flink.runtime.minicluster -import akka.actor.{ActorRef, ActorSystem} +import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} + import org.apache.flink.api.common.io.FileOutputFormat import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.StreamingMode import org.apache.flink.runtime.client.JobClient import org.apache.flink.runtime.io.network.netty.NettyConfig import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.EnvironmentInformation + import org.slf4j.LoggerFactory -import akka.actor.ExtendedActorSystem /** * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same @@ -41,9 +42,20 @@ import akka.actor.ExtendedActorSystem * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same * [[ActorSystem]], otherwise false */ -class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true) - extends FlinkMiniCluster(userConfiguration, singleActorSystem) { - +class LocalFlinkMiniCluster(userConfiguration: Configuration, + singleActorSystem: Boolean, + streamingMode: StreamingMode) + extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) { + + + def this(userConfiguration: Configuration, singleActorSystem: Boolean) + = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY) + + def this(userConfiguration: Configuration) = this(userConfiguration, true) + + // -------------------------------------------------------------------------- + + val jobClientActorSystem = if (singleActorSystem) { jobManagerActorSystem } else { @@ -64,7 +76,9 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: override def startJobManager(system: ActorSystem): ActorRef = { val config = configuration.clone() - val (jobManager, archiver) = JobManager.startJobManagerActors(config, system) + + val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode) + if (config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false)) { val webServer = new WebInfoServer(configuration, jobManager, archiver) webServer.start() @@ -103,12 +117,13 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: } else { None } - + 
TaskManager.startTaskManagerComponentsAndActor(config, system, HOSTNAME, // network interface to bind to Some(taskManagerActorName), // actor name jobManagerPath, // job manager akka URL localExecution, // start network stack? + streamingMode, classOf[TaskManager]) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index a95b5cb18ed7423eb9318133e0b5b2c2819b2a8c..8a45fa426b8fbbb93dbf49576875904c98eb5b00 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -37,7 +37,7 @@ import grizzled.slf4j.Logger import org.apache.flink.configuration._ import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage} -import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages} +import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.blob.{BlobService, BlobCache} import org.apache.flink.runtime.broadcast.BroadcastVariableManager @@ -999,8 +999,8 @@ object TaskManager { /** Return code for critical errors during the runtime */ val RUNTIME_FAILURE_RETURN_CODE = 2 + /** The name of the TaskManager actor */ val TASK_MANAGER_NAME = "taskmanager" - val PROFILER_NAME = "profiler" /** Maximum time (msecs) that the TaskManager will spend searching for a * suitable network interface to use for communication */ @@ -1033,7 +1033,8 @@ object TaskManager { EnvironmentInformation.checkJavaVersion() // try to parse the command line arguments - val configuration = try { + val (configuration: Configuration, + mode: StreamingMode) = try { parseArgsAndLoadConfig(args) } catch { @@ -1050,13 +1051,13 @@ object TaskManager { LOG.info("Security is enabled. Starting secure TaskManager.") SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] { override def run(): Unit = { - selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager]) + selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager]) } }) } else { LOG.info("Security is not enabled. Starting non-authenticated TaskManager.") - selectNetworkInterfaceAndRunTaskManager(configuration, classOf[TaskManager]) + selectNetworkInterfaceAndRunTaskManager(configuration, mode, classOf[TaskManager]) } } catch { @@ -1074,31 +1075,44 @@ object TaskManager { * @return The parsed configuration. */ @throws(classOf[Exception]) - def parseArgsAndLoadConfig(args: Array[String]): Configuration = { - + def parseArgsAndLoadConfig(args: Array[String]): (Configuration, StreamingMode) = { + // set up the command line parser - val parser = new scopt.OptionParser[TaskManagerCLIConfiguration]("taskmanager") { - head("flink task manager") - opt[String]("configDir") action { (x, c) => - c.copy(configDir = x) - } text "Specify configuration directory." + val parser = new scopt.OptionParser[TaskManagerCliOptions]("TaskManager") { + head("Flink TaskManager") + + opt[String]("configDir") action { (param, conf) => + conf.setConfigDir(param) + conf + } text { + "Specify configuration directory." 
+ } + + opt[String]("streamingMode").optional().action { (param, conf) => + conf.setMode(param) + conf + } text { + "The streaming mode of the JobManager (STREAMING / BATCH)" + } } // parse the CLI arguments - val cliConfig = parser.parse(args, TaskManagerCLIConfiguration()).getOrElse { + val cliConfig = parser.parse(args, new TaskManagerCliOptions()).getOrElse { throw new Exception( s"Invalid command line agruments: ${args.mkString(" ")}. Usage: ${parser.usage}") } // load the configuration - try { - LOG.info("Loading configuration from " + cliConfig.configDir) - GlobalConfiguration.loadConfiguration(cliConfig.configDir) + val conf: Configuration = try { + LOG.info("Loading configuration from " + cliConfig.getConfigDir()) + GlobalConfiguration.loadConfiguration(cliConfig.getConfigDir()) GlobalConfiguration.getConfiguration() } catch { case e: Exception => throw new Exception("Could not load configuration", e) } + + (conf, cliConfig.getMode) } // -------------------------------------------------------------------------- @@ -1120,11 +1134,13 @@ object TaskManager { * (library cache, shuffle network stack, ...), and starts the TaskManager itself. * @param configuration The configuration for the TaskManager. + * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only) * @param taskManagerClass The actor class to instantiate. * Allows to use TaskManager subclasses for example for YARN. */ @throws(classOf[Exception]) def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration, + streamingMode: StreamingMode, taskManagerClass: Class[_ <: TaskManager]) : Unit = { val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration) @@ -1132,7 +1148,8 @@ object TaskManager { val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort) - runTaskManager(taskManagerHostname, actorSystemPort, configuration, taskManagerClass) + runTaskManager(taskManagerHostname, actorSystemPort, configuration, + streamingMode, taskManagerClass) } @throws(classOf[IOException]) @@ -1196,14 +1213,17 @@ object TaskManager { * @param taskManagerHostname The hostname/address of the interface where the actor system * will communicate. * @param actorSystemPort The port at which the actor system will communicate. + * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only) * @param configuration The configuration for the TaskManager. */ @throws(classOf[Exception]) def runTaskManager(taskManagerHostname: String, - actorSystemPort: Int, - configuration: Configuration) : Unit = { + actorSystemPort: Int, + configuration: Configuration, + streamingMode: StreamingMode) : Unit = { - runTaskManager(taskManagerHostname, actorSystemPort, configuration, classOf[TaskManager]) + runTaskManager(taskManagerHostname, actorSystemPort, configuration, + streamingMode, classOf[TaskManager]) } /** @@ -1218,6 +1238,7 @@ object TaskManager { * will communicate. * @param actorSystemPort The port at which the actor system will communicate. * @param configuration The configuration for the TaskManager. + * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only) * @param taskManagerClass The actor class to instantiate. Allows the use of TaskManager * subclasses for example for YARN. 
*/ @@ -1225,6 +1246,7 @@ object TaskManager { def runTaskManager(taskManagerHostname: String, actorSystemPort: Int, configuration: Configuration, + streamingMode: StreamingMode, taskManagerClass: Class[_ <: TaskManager]) : Unit = { LOG.info("Starting TaskManager") @@ -1264,6 +1286,7 @@ object TaskManager { taskManagerHostname, Some(TASK_MANAGER_NAME), None, false, + streamingMode, taskManagerClass) // start a process reaper that watches the JobManager. If the JobManager actor dies, @@ -1317,6 +1340,7 @@ object TaskManager { * JobManager hostname an port specified in the configuration. * @param localTaskManagerCommunication If true, the TaskManager will not initiate the * TCP network stack. + * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only) * @param taskManagerClass The class of the TaskManager actor. May be used to give * subclasses that understand additional actor messages. * @@ -1339,6 +1363,7 @@ object TaskManager { taskManagerActorName: Option[String], jobManagerPath: Option[String], localTaskManagerCommunication: Boolean, + streamingMode: StreamingMode, taskManagerClass: Class[_ <: TaskManager]): ActorRef = { // get and check the JobManager config @@ -1391,13 +1416,15 @@ object TaskManager { relativeMemSize } + + val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY // now start the memory manager val memoryManager = try { new DefaultMemoryManager(memorySize, taskManagerConfig.numberOfSlots, netConfig.networkBufferSize, - true) + preAllocateMemory) } catch { case e: OutOfMemoryError => throw new Exception( diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala deleted file mode 100644 index 5c71f5e0c5897ba980f3e76a80457241f1ee2375..0000000000000000000000000000000000000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerCLIConfiguration.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager - -/** - * Command line configuration object for the [[TaskManager]] - * - * @param configDir Path to configuration directory - */ -case class TaskManagerCLIConfiguration(configDir: String = null)
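For reference, the scripts and flags introduced by this patch are invoked as follows; this is a usage sketch derived from the changes above (paths are relative to the Flink distribution directory, and the slave list is read from ${FLINK_CONF_DIR}/slaves unless FLINK_SLAVES is set):

    # start a whole cluster in streaming mode: JobManager locally, one TaskManager per slave
    bin/start-cluster-streaming.sh

    # or start the daemons individually; the extra argument selects the mode (batch | streaming)
    bin/jobmanager.sh start cluster streaming
    bin/taskmanager.sh start streaming

    # the existing batch entry points keep their behavior by passing "batch" explicitly,
    # e.g. start-local.sh now runs:
    bin/jobmanager.sh start local batch

    # the scripts forward the mode to the JVM entry points through the new --streamingMode flag:
    #   ... org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" \
    #       --executionMode cluster --streamingMode streaming
    # when --streamingMode is omitted, JobManagerCliOptions and TaskManagerCliOptions default to
    # StreamingMode.BATCH_ONLY; in that mode the TaskManager pre-allocates its managed memory.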