[yarn] Major YARN-Client improvements

- HDFS security token support
- "ship/" directory to transfer files to all TaskManagers (user-files)
- Log4j-based logging (YARN now respects the logging configuration)
- the YARN-client deletes all "temp" files from HDFS.
- The JVMs started by YARN now respect the configured JVM opts in the yaml-file
- the JobManager webinterface shows the log file (e.g it is aware of the YARN-specific log-directory)
- The YARN-client now creates a hidden ".yarn-jobmanager" with the address of the JobManager in YARN. users do not have to specify the -m argument anymore.
- Fix a little bug with the JobManager's "cloud" model for taskManager with less that 1GB memory.
- Tested on Cloudera Hadoop 5 Beta 2
- Tested on Cloduera Hadoop 5 Beta 2 WITH CDH5-B2 Maven Dependencies.
- Tested on Hadoop 2.2.0
- Tested on Amazon EMR
......@@ -38,5 +38,11 @@
......@@ -19,8 +19,10 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
......@@ -30,6 +32,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
......@@ -45,6 +50,8 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.base.Preconditions;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.jobmanager.JobManager;
......@@ -77,26 +84,35 @@ public class ApplicationMaster {
public static void main(String[] args) throws Exception {
// Initialize clients to ResourceManager and NodeManagers
Configuration conf = Utils.initializeYarnConfiguration();
FileSystem fs = FileSystem.get(conf);
Map<String, String> envs = System.getenv();
final String currDir = envs.get(Environment.PWD.key());
final String logDirs = envs.get(Environment.LOG_DIRS.key());
final String ownHostname = envs.get(Environment.NM_HOST.key());
final String appId = envs.get(Client.ENV_APP_ID);
final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
final String clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
final String applicationMasterHost = envs.get(Environment.NM_HOST.key());
final String remoteStratosphereJarPath = envs.get(Client.STRATOSPHERE_JAR_PATH);
final String shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
final int taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
final int heapLimit = (int)((float)memoryPerTaskManager*0.7);
if(currDir == null) throw new RuntimeException("Current directory unknown");
if(ownHostname == null) throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
if(currDir == null) {
throw new RuntimeException("Current directory unknown");
if(ownHostname == null) {
throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
LOG.info("Working directory "+currDir);
// load Stratosphere configuration.
final String localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
......@@ -119,7 +135,10 @@ public class ApplicationMaster {
// just to make sure.
output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
if(localDirs != null) output.append(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY+": "+localDirs+"\n");
output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
if(localDirs != null) {
output.append(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY+": "+localDirs+"\n");
File newConf = new File(currDir+"/stratosphere-conf-modified.yaml");
......@@ -169,31 +188,58 @@ public class ApplicationMaster {
// register Stratosphere Jar with remote HDFS
final Path remoteJarPath = new Path(remoteStratosphereJarPath);
Utils.registerLocalResource(fs, remoteJarPath, stratosphereJar);
// Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/stratosphere.jar"), stratosphereJar);
// register conf with local fs.
Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/stratosphere-conf-modified.yaml"), stratosphereConf);
Path remoteConfPath = Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/stratosphere-conf-modified.yaml"), stratosphereConf, new Path(clientHomeDir));
LOG.info("Prepared localresource for modified yaml: "+stratosphereConf);
boolean hasLog4j = new File(currDir+"/log4j.properties").exists();
// prepare the files to ship
LocalResource[] remoteShipRsc = null;
String[] remoteShipPaths = shipListString.split(",");
if(!shipListString.isEmpty()) {
remoteShipRsc = new LocalResource[remoteShipPaths.length];
{ // scope for i
int i = 0;
for(String remoteShipPathStr : remoteShipPaths) {
if(remoteShipPathStr == null || remoteShipPathStr.isEmpty()) {
remoteShipRsc[i] = Records.newRecord(LocalResource.class);
Path remoteShipPath = new Path(remoteShipPathStr);
Utils.registerLocalResource(fs, remoteShipPath, remoteShipRsc[i]);
// respect custom JVM options in the YAML file
final String javaOpts = GlobalConfiguration.getString(ConfigConstants.STRATOSPHERE_JVM_OPTIONS, "");
// Obtain allocated containers and launch
int allocatedContainers = 0;
int completedContainers = 0;
while (allocatedContainers < taskManagerCount) {
AllocateResponse response = rmClient.allocate(0);
for (Container container : response.getAllocatedContainers()) {
LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
// Launch container by create ContainerLaunchContext
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m "
+ " eu.stratosphere.nephele.taskmanager.TaskManager -configDir . "
String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts ;
if(hasLog4j) {
tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
tmCommand += " eu.stratosphere.nephele.taskmanager.TaskManager -configDir . "
+ " 1>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/stdout"
+ "/taskmanager-stdout.log"
+ " 2>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/stderr";
+ "/taskmanager-stderr.log";
LOG.info("Starting TM with command="+tmCommand);
......@@ -203,19 +249,42 @@ public class ApplicationMaster {
localResources.put("stratosphere.jar", stratosphereJar);
localResources.put("stratosphere-conf.yaml", stratosphereConf);
// add ship resources
if(!shipListString.isEmpty()) {
for( int i = 0; i < remoteShipPaths.length; i++) {
localResources.put(new Path(remoteShipPaths[i]).getName(), remoteShipRsc[i]);
// Setup CLASSPATH for Container (=TaskTracker)
Map<String, String> containerEnv = new HashMap<String, String>();
Utils.setupEnv(conf, containerEnv); //add stratosphere.jar to class path.
UserGroupInformation user = UserGroupInformation.getCurrentUser();
try {
Credentials credentials = user.getCredentials();
DataOutputBuffer dob = new DataOutputBuffer();
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
0, dob.getLength());
} catch (IOException e) {
LOG.warn("Getting current user info failed when trying to launch the container"
+ e.getMessage());
LOG.info("Launching container " + allocatedContainers);
nmClient.startContainer(container, ctx);
for (ContainerStatus status : response.getCompletedContainersStatuses()) {
LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
LOG.info("Diagnostics "+status.getDiagnostics());
......@@ -228,10 +297,11 @@ public class ApplicationMaster {
for (ContainerStatus status : response.getCompletedContainersStatuses()) {
LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
LOG.info("Diagnostics "+status.getDiagnostics());
LOG.info("Shutting down JobManager");
......@@ -20,6 +20,9 @@ import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
......@@ -38,6 +41,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
......@@ -67,6 +73,8 @@ import eu.stratosphere.configuration.GlobalConfiguration;
* https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
* and
* https://github.com/hortonworks/simple-yarn-app
* and
* https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
* The Stratosphere jar is uploaded to HDFS by this client.
* The application master and all the TaskManager containers get the jar file downloaded
......@@ -84,7 +92,8 @@ public class Client {
private static final Option VERBOSE = new Option("v","verbose",false, "Verbose debug mode");
private static final Option GEN_CONF = new Option("g","generateConf",false, "Place default configuration file in current directory");
private static final Option QUEUE = new Option("qu","queue",true, "Specify YARN queue.");
private static final Option STRATOSPHERE_CONF = new Option("c","conf",true, "Path to Stratosphere configuration file");
private static final Option SHIP_PATH = new Option("s","ship",true, "Ship files in the specified directory");
private static final Option STRATOSPHERE_CONF_DIR = new Option("c","confDir",true, "Path to Stratosphere configuration directory");
private static final Option STRATOSPHERE_JAR = new Option("j","jar",true, "Path to Stratosphere jar file");
private static final Option JM_MEMORY = new Option("jm","jobManagerMemory",true, "Memory for JobManager Container [in MB]");
private static final Option TM_MEMORY = new Option("tm","taskManagerMemory",true, "Memory per TaskManager Container [in MB]");
......@@ -101,17 +110,21 @@ public class Client {
public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
public final static String ENV_APP_ID = "_APP_ID";
public final static String STRATOSPHERE_JAR_PATH = "_STRATOSPHERE_JAR_PATH"; // the stratosphere jar resource location (in HDFS).
public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
private static final String CONFIG_FILE_NAME = "stratosphere-conf.yaml";
private Configuration conf;
public void run(String[] args) throws Exception {
// Command Line Options
Options options = new Options();
......@@ -120,6 +133,7 @@ public class Client {
CommandLineParser parser = new PosixParser();
CommandLine cmd = null;
......@@ -167,9 +181,15 @@ public class Client {
// Conf Path
Path confPath = null;
if(cmd.hasOption(STRATOSPHERE_CONF.getOpt())) {
confPath = new Path(cmd.getOptionValue(STRATOSPHERE_CONF.getOpt()));
String confDirPath = "";
if(cmd.hasOption(STRATOSPHERE_CONF_DIR.getOpt())) {
confDirPath = cmd.getOptionValue(STRATOSPHERE_CONF_DIR.getOpt())+"/";
File confFile = new File(confDirPath+CONFIG_FILE_NAME);
if(!confFile.exists()) {
LOG.fatal("Unable to locate configuration file in "+confFile);
confPath = new Path(confFile.getAbsolutePath());
} else {
System.out.println("No configuration file has been specified");
......@@ -197,10 +217,36 @@ public class Client {
List<File> shipFiles = null;
// path to directory to ship
if(cmd.hasOption(SHIP_PATH.getOpt())) {
String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
File shipDir = new File(shipPath);
if(shipDir.isDirectory()) {
shipFiles = new ArrayList<File>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return !(name.equals(".") || name.equals("..") );
} else {
LOG.warn("Ship directory is not a directory!");
boolean hasLog4j = false;
//check if there is a log4j file
if(confDirPath != null) {
File l4j = new File(confDirPath+"/log4j.properties");
if(l4j.exists()) {
hasLog4j = true;
// queue
String queue = "default";
if(cmd.hasOption(QUERY.getOpt())) {
queue = cmd.getOptionValue(QUERY.getOpt());
if(cmd.hasOption(QUEUE.getOpt())) {
queue = cmd.getOptionValue(QUEUE.getOpt());
// JobManager Memory
......@@ -210,7 +256,7 @@ public class Client {
// Task Managers memory
int tmMemory = 1000;
int tmMemory = 1024;
if(cmd.hasOption(TM_MEMORY.getOpt())) {
tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
......@@ -232,9 +278,8 @@ public class Client {
LOG.info("Copy App Master jar from local filesystem and add to local environment");
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(conf);
final FileSystem fs = FileSystem.get(conf);
// Create yarnClient
final YarnClient yarnClient = YarnClient.createYarnClient();
......@@ -278,18 +323,25 @@ public class Client {
// respect custom JVM options in the YAML file
final String javaOpts = GlobalConfiguration.getString(ConfigConstants.STRATOSPHERE_JVM_OPTIONS, "");
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records
final String amCommand = "$JAVA_HOME/bin/java"
+ " -Xmx"+jmMemory+"M" + " eu.stratosphere.yarn.ApplicationMaster" + " "
+ " 1>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/stderr";
String amCommand = "$JAVA_HOME/bin/java"
+ " -Xmx"+jmMemory+"M " +javaOpts;
if(hasLog4j) {
amCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
amCommand += " eu.stratosphere.yarn.ApplicationMaster" + " "
+ " 1>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
// Set-up ApplicationSubmissionContext for the application
......@@ -299,16 +351,41 @@ public class Client {
// Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class);
LocalResource stratosphereConf = Records.newRecord(LocalResource.class);
Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), localJarPath, appMasterJar);
Utils.setupLocalResource(conf, fs, appId.toString(), confPath, stratosphereConf);
Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), localJarPath, appMasterJar, fs.getHomeDirectory());
Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), confPath, stratosphereConf, fs.getHomeDirectory());
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
localResources.put("stratosphere.jar", appMasterJar);
localResources.put("stratosphere-conf.yaml", stratosphereConf);
// setup security tokens (code from apache storm)
final Path[] paths = new Path[3 + shipFiles.size()];
StringBuffer envShipFileList = new StringBuffer();
// upload ship files
for (int i = 0; i < shipFiles.size(); i++) {
File shipFile = shipFiles.get(i);
LocalResource shipResources = Records.newRecord(LocalResource.class);
Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
shipLocalPath, shipResources, fs.getHomeDirectory());
localResources.put(shipFile.getName(), shipResources);
envShipFileList.append(paths[3 + i]);
if(i+1 < shipFiles.size()) {
paths[0] = remotePathJar;
paths[1] = remotePathConf;
paths[2] = new Path(fs.getHomeDirectory(), ".stratosphere/" + appId.toString() + "/");
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
fs.setPermission(paths[2], permission); // set permission for path.
Utils.setTokensFor(amContainer, paths, this.conf);
LOG.debug("Security is enabled: "+ UserGroupInformation.isSecurityEnabled());
// Setup CLASSPATH for ApplicationMaster
Map<String, String> appMasterEnv = new HashMap<String, String>();
......@@ -319,6 +396,8 @@ public class Client {
appMasterEnv.put(Client.ENV_TM_MEMORY, String.valueOf(tmMemory));
appMasterEnv.put(Client.STRATOSPHERE_JAR_PATH, remotePathJar.toString() );
appMasterEnv.put(Client.ENV_APP_ID, appId.toString());
appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
......@@ -326,25 +405,22 @@ public class Client {
Resource capability = Records.newRecord(Resource.class);
appContext.setApplicationName("Stratosphere"); // application name
LOG.info("Submitting application master " + appId);
Runtime.getRuntime().addShutdownHook(new Thread() {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
LOG.info("Killing the Stratosphere-YARN application.");
LOG.info("Deleting files in "+paths[2]);
FileSystem shutFS = FileSystem.get(conf);
shutFS.delete(paths[2], true); // delete conf and jar file.
} catch (Exception e) {
LOG.warn("Exception while killing the YARN application", e);
......@@ -352,30 +428,34 @@ public class Client {
LOG.info("Submitting application master " + appId);
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
boolean told = false;
char[] el = { '/', '|', '\\', '-'};
int i = 0;
while (appState != YarnApplicationState.FINISHED
&& appState != YarnApplicationState.KILLED
&& appState != YarnApplicationState.FAILED) {
if(!told && appState == YarnApplicationState.RUNNING) {
System.err.println("JobManager is now running on "+appReport.getHost()+":"+jmPort);
System.err.println("Stratosphere JobManager is now running on "+appReport.getHost()+":"+jmPort);
System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
// write jobmanager connect information
PrintWriter out = new PrintWriter(confDirPath+".yarn-jobmanager");
told = true;
System.err.println("JobManager is now running on "+appReport.getHost()+":"+jmPort+"\n"
+ "Application report from ASM: \n" +
"\t application identifier: " + appId.toString() + "\n" +
"\t appId: " + appId.getId() + "\n" +
"\t appDiagnostics: " + appReport.getDiagnostics() + "\n" +
"\t appMasterHost: " + appReport.getHost() + "\n" +
"\t appQueue: " + appReport.getQueue() + "\n" +
"\t appMasterRpcPort: " + appReport.getRpcPort() + "\n" +
"\t appStartTime: " + appReport.getStartTime() + "\n" +
"\t yarnAppState: " + appReport.getYarnApplicationState() + "\n" +
"\t distributedFinalState: " + appReport.getFinalApplicationStatus() + "\n" +
"\t appTrackingUrl: " + appReport.getTrackingUrl() + "\n" +
"\t appUser: " + appReport.getUser());
if(!told) {
if(i == el.length) i = 0;
Thread.sleep(500); // wait for the application to switch to RUNNING
} else {
appReport = yarnClient.getApplicationReport(appId);
appState = appReport.getYarnApplicationState();
......@@ -16,11 +16,13 @@ package eu.stratosphere.yarn;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.Map;
import java.util.jar.JarEntry;
......@@ -32,8 +34,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
......@@ -151,27 +157,45 @@ public class Utils {
* @return Path to remote file (usually hdfs)
* @throws IOException
public static Path setupLocalResource(Configuration conf, FileSystem fs, String appId, Path localRsrcPath, LocalResource appMasterJar)
public static Path setupLocalResource(Configuration conf, FileSystem fs, String appId, Path localRsrcPath, LocalResource appMasterJar, Path homedir)
throws IOException {
// copy to HDFS
String suffix = ".stratosphere/" + appId + "/" + localRsrcPath.getName();
Path dst = new Path(fs.getHomeDirectory(), suffix);
LOG.debug("Copying from "+localRsrcPath+" to "+dst );
Path dst = new Path(homedir, suffix);
LOG.info("Copying from "+localRsrcPath+" to "+dst );
fs.copyFromLocalFile(localRsrcPath, dst);
registerLocalResource(fs, dst, appMasterJar);
return dst;
public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource appMasterJar) throws IOException {
public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
DataOutputBuffer dob = new DataOutputBuffer();
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
public static void logFilesInCurrentDirectory(final Log logger) {
new File(".").list(new FilenameFilter() {
public boolean accept(File dir, String name) {
return true;
......@@ -34,6 +34,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.cli.UnrecognizedOptionException;
import org.apache.commons.io.FileUtils;
import eu.stratosphere.api.common.accumulators.AccumulatorHelper;
import eu.stratosphere.client.program.Client;
......@@ -240,7 +241,7 @@ public class CliFrontend {
if( line.hasOption(ADDRESS_OPTION.getOpt())) {
address = line.getOptionValue(ADDRESS_OPTION.getOpt());
// Get jar file
if (line.hasOption(JAR_OPTION.getOpt())) {
......@@ -273,6 +274,16 @@ public class CliFrontend {
programArgs = line.getOptionValues(ARGS_OPTION.getOpt());
// see if there is a file containing the jobManager address.
String loc = getConfigurationDirectory();
File jmAddressFile = new File(loc+"/.yarn-jobmanager");
if(jmAddressFile.exists()) {
try {
address = FileUtils.readFileToString(jmAddressFile).trim();
System.out.println("Found a .yarn-jobmanager file, using \""+address+"\" to connect to the JobManager");
} catch (IOException e) {}
// get wait flag
wait = line.hasOption(WAIT_OPTION.getOpt());
......@@ -691,13 +702,7 @@ public class CliFrontend {
* Reads configuration settings. The default path can be overridden
* by setting the ENV variable "STRATOSPHERE_CONF_DIR".
* @return Stratosphere's global configuration
private Configuration getConfiguration() {
private String getConfigurationDirectory() {
String location = null;
if (System.getenv(ENV_CONFIG_DIRECTORY) != null) {
location = System.getenv(ENV_CONFIG_DIRECTORY);
......@@ -709,7 +714,16 @@ public class CliFrontend {
throw new RuntimeException("The configuration directory was not found. Please configure the '" +
ENV_CONFIG_DIRECTORY + "' environment variable properly.");
return location;
* Reads configuration settings. The default path can be overridden
* by setting the ENV variable "STRATOSPHERE_CONF_DIR".
* @return Stratosphere's global configuration
private Configuration getConfiguration() {
final String location = getConfigurationDirectory();
Configuration config = GlobalConfiguration.getConfiguration();
......@@ -198,6 +198,9 @@ public final class ConfigConstants {
public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";
public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath";
// ------------------------------ Web Client ------------------------------
......@@ -236,6 +239,10 @@ public final class ConfigConstants {
* The key for Stratosphere's base directory path
public static final String STRATOSPHERE_BASE_DIR_PATH_KEY = "stratosphere.base.dir.path";
public static final String STRATOSPHERE_JVM_OPTIONS = "env.java.opts";
......@@ -404,6 +411,7 @@ public final class ConfigConstants {
* The default directory to store uploaded jobs in.
public static final String DEFAULT_WEB_JOB_STORAGE_DIR = DEFAULT_WEB_TMP_DIR + "/webclient-jobs/";
* The default path to the file containing the list of access privileged users and passwords.
......@@ -416,12 +424,13 @@ public final class ConfigConstants {
* The default definition for an instance type, if no other configuration is provided.
public static final String DEFAULT_INSTANCE_TYPE = "default,2,1,1024,10,10";
public static final String DEFAULT_INSTANCE_TYPE = "default,1,1,1,1,0"; // minimalistic instance type until "cloud" model is fully removed.
* The default index for the default instance type.
public static final int DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX = 1;
// ------------------------------------------------------------------------
......@@ -30,6 +30,17 @@
<!-- create an empty ship directory -->
<!-- copy *.txt files -->
......@@ -50,6 +50,9 @@ jobmanager.web.history: 5
# Stratosphere chooses a port automatically, if this value is not set.
# If setting this variable when using YARN, beware that there might be conflicts when multiple
# TaskManagers are running on the same machine.
#taskmanager.rpc.port: 6122
# JVM heap size in MB
......@@ -49,5 +49,5 @@ CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)`
# $log_setting
$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH eu.stratosphere.yarn.Client -j $STRATOSPHERE_LIB_DIR/*yarn-uberjar.jar -c $STRATOSPHERE_CONF_DIR/stratosphere-conf.yaml $*
$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH eu.stratosphere.yarn.Client -ship ship/ -confDir $STRATOSPHERE_CONF_DIR -j $STRATOSPHERE_LIB_DIR/*yarn-uberjar.jar $*
......@@ -238,11 +238,13 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
public String toString() {
String iaString;
String portsString = " (ipcPort="+ipcPort+", dataPort="+dataPort+")";
if (this.hostName != null) {
iaString = this.hostName;
iaString = this.hostName+portsString;
} else {
iaString = inetAddress.toString();
iaString = iaString.replace("/", "");
iaString += portsString;
return iaString;
......@@ -51,7 +51,9 @@ public class LogfileInfoServlet extends HttpServlet {
// Find current stdtout file
for(File f : logDir.listFiles()) {
// contains "jobmanager" ".log" and no number in the end ->needs improvement
if(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".out") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) )) {
if( f.getName().equals("jobmanager-stdout.log") ||
(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".out") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) ))
) {
resp.setContentType("text/plain ");
......@@ -64,7 +66,8 @@ public class LogfileInfoServlet extends HttpServlet {
// Find current logfile
for(File f : logDir.listFiles()) {
// contains "jobmanager" ".log" and no number in the end ->needs improvement
if(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".log") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) )) {
if( f.getName().equals("jobmanager-stderr.log") ||
(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".log") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) ))) {
resp.setContentType("text/plain ");
......@@ -77,6 +77,8 @@ public class WebInfoServer {
// get base path of Stratosphere installation
String basePath = nepheleConfig.getString(ConfigConstants.STRATOSPHERE_BASE_DIR_PATH_KEY, "");
String webDirPath = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ROOT_PATH);
String logDirPath = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY,
File webDir;
if(webDirPath.startsWith("/")) {
......@@ -110,7 +112,7 @@ public class WebInfoServer {
ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager)), "/jobsInfo");
servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(new File(basePath+"/log"))), "/logInfo");
servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(new File(logDirPath))), "/logInfo");
// ----- the handler serving all the static files -----
......@@ -18,6 +18,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.UnknownHostException;
......@@ -147,9 +148,7 @@ public class TaskManager implements TaskOperationProtocol {
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
public TaskManager(final int taskManagersPerJVM) throws Exception {
// IMPORTANT! At this point, the GlobalConfiguration must have been read!
final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
InetSocketAddress jobManagerAddress = null;
if (address == null) {
......@@ -169,12 +168,6 @@ public class TaskManager implements TaskOperationProtocol {
LOG.info("Job manager address: " + jobManagerAddress);
// Determine interface address that is announced to the job manager
final int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
final int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
InetAddress taskManagerAddress = null;
// Try to create local stub for the job manager
......@@ -194,6 +187,14 @@ public class TaskManager implements TaskOperationProtocol {
throw new RuntimeException("The TaskManager failed to determine its own network address", ioe);
int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0);
int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 0);
if(ipcPort == 0) {
ipcPort = getAvailablePort();
if(dataPort == 0) {
dataPort = getAvailablePort();
this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerAddress, ipcPort, dataPort);
LOG.info("Announcing connection information " + this.localInstanceConnectionInfo + " to job manager");
......@@ -229,6 +230,7 @@ public class TaskManager implements TaskOperationProtocol {
this.accumulatorProtocolProxy = accumulatorProtocolStub;
// Start local RPC server
Server taskManagerServer = null;
try {
......@@ -297,13 +299,36 @@ public class TaskManager implements TaskOperationProtocol {
+ " megabytes of memory", rte);
throw rte;
this.ioManager = new IOManager(tmpDirPaths);
// Add shutdown hook for clean up tasks
Runtime.getRuntime().addShutdownHook(new TaskManagerCleanUp(this));
private int getAvailablePort() {
ServerSocket serverSocket = null;
int port = 0;
for(int i = 0; i < 50; i++){
try {
serverSocket = new ServerSocket(0);
port = serverSocket.getLocalPort();
if(port != 0) {
} catch (IOException e) {
LOG.debug("Unable to allocate port "+e.getMessage(), e);
if(!serverSocket.isClosed()) {
try {
} catch (IOException e) {
LOG.debug("error closing port",e);
return port;
* Entry point for the program.
......@@ -341,10 +366,9 @@ public class TaskManager implements TaskOperationProtocol {
LOG.fatal("Taskmanager startup failed:" + StringUtils.stringifyException(e));
// Run the main I/O loop
// Shut down
......@@ -367,6 +391,7 @@ public class TaskManager implements TaskOperationProtocol {
// Send heartbeat
try {
this.jobManager.sendHeartbeat(this.localInstanceConnectionInfo, this.hardwareDescription);
} catch (IOException e) {
......@@ -863,14 +888,12 @@ public class TaskManager implements TaskOperationProtocol {
public void killTaskManager() throws IOException {
// Kill the entire JVM after a delay of 10ms, so this RPC will finish properly before
final Timer timer = new Timer();
final TimerTask timerTask = new TimerTask() {
public void run() {
