From 41a0069318dc4dfdc67ebb1b9ea6587933d4afd1 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Thu, 24 Apr 2014 09:19:32 +0200 Subject: [PATCH] fix issues #736 and #733 --- stratosphere-addons/yarn/pom.xml | 8 ++++++- .../stratosphere/yarn/ApplicationMaster.java | 6 ------ .../java/eu/stratosphere/yarn/Client.java | 11 +++++++++- .../main/java/eu/stratosphere/yarn/Utils.java | 9 +++++++- .../yarn/YarnTaskManagerRunner.java | 15 +++++++++++-- .../eu/stratosphere/client/CliFrontend.java | 5 +++-- .../nephele/taskmanager/TaskManager.java | 21 +++++++++++++++++-- 7 files changed, 60 insertions(+), 15 deletions(-) diff --git a/stratosphere-addons/yarn/pom.xml b/stratosphere-addons/yarn/pom.xml index 3076e70468e..272a64c0adf 100644 --- a/stratosphere-addons/yarn/pom.xml +++ b/stratosphere-addons/yarn/pom.xml @@ -26,7 +26,13 @@ - + + + eu.stratosphere + stratosphere-clients + ${project.version} + + org.apache.hadoop hadoop-yarn-client diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java index 4e74539ba19..b319c68a8d7 100644 --- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java +++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java @@ -73,7 +73,6 @@ public class ApplicationMaster { final String logDirs = envs.get(Environment.LOG_DIRS.key()); final String ownHostname = envs.get(Environment.NM_HOST.key()); final String appId = envs.get(Client.ENV_APP_ID); - final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); final String clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR); final String applicationMasterHost = envs.get(Environment.NM_HOST.key()); final String remoteStratosphereJarPath = envs.get(Client.STRATOSPHERE_JAR_PATH); @@ -108,8 +107,6 @@ public class ApplicationMaster { output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n"); } else 
if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) { output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n"); - } else if(localDirs != null && line.contains(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY)) { - output.append(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY+": "+localDirs+"\n"); } else { output.append(line+"\n"); } @@ -118,9 +115,6 @@ public class ApplicationMaster { output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n"); output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n"); output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n"); - if(localDirs != null) { - output.append(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY+": "+localDirs+"\n"); - } output.close(); br.close(); File newConf = new File(currDir+"/stratosphere-conf-modified.yaml"); diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java index 953fce73bd1..2f6ad13968a 100644 --- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java +++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java @@ -65,6 +65,7 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; +import eu.stratosphere.client.CliFrontend; import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.GlobalConfiguration; @@ -288,6 +289,12 @@ public class Client { // Create a local resource to point to the destination jar path final FileSystem fs = FileSystem.get(conf); + if(fs.getScheme().startsWith("file")) { + LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the " + + "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values." 
+ + "The Stratosphere YARN client needs to store its files in a distributed file system"); + } + // Create yarnClient final YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); @@ -452,9 +459,11 @@ public class Client { System.err.println("Stratosphere JobManager is now running on "+appReport.getHost()+":"+jmPort); System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl()); // write jobmanager connect information - PrintWriter out = new PrintWriter(confDirPath+".yarn-jobmanager"); + File addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE); + PrintWriter out = new PrintWriter(addrFile); out.println(appReport.getHost()+":"+jmPort); out.close(); + addrFile.setReadable(true, false); // readable for all. told = true; } if(!told) { diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java index 3203b1c7617..526b7c3d1dc 100644 --- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java +++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java @@ -104,7 +104,7 @@ public class Utils { try { fileUrl = path.toURL(); } catch (MalformedURLException e) { - e.printStackTrace(); + throw new RuntimeException("Erroneous config file path", e); } URL[] urls = {fileUrl}; ClassLoader cl = new URLClassLoader(urls, conf.getClassLoader()); @@ -120,6 +120,13 @@ public class Utils { } public static Configuration initializeYarnConfiguration() { Configuration conf = new YarnConfiguration(); + String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + if(configuredHadoopConfig != null) { + LOG.info("Using hadoop configuration path from " + ConfigConstants.PATH_HADOOP_CONFIG + " setting."); + addPathToConfig(conf, new File(configuredHadoopConfig)); + setDefaultConfValues(conf); + return conf; + } String envs[] = { "YARN_CONF_DIR", "HADOOP_CONF_DIR", 
"HADOOP_CONF_PATH" }; for(int i = 0; i < envs.length; ++i) { String confPath = System.getenv(envs[i]); diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/YarnTaskManagerRunner.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/YarnTaskManagerRunner.java index b239ebe63f5..b73e7bc49d4 100644 --- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/YarnTaskManagerRunner.java +++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/YarnTaskManagerRunner.java @@ -16,12 +16,15 @@ package eu.stratosphere.yarn; import java.io.IOException; import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import eu.stratosphere.nephele.taskmanager.TaskManager; @@ -30,7 +33,15 @@ public class YarnTaskManagerRunner { private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class); public static void main(final String[] args) throws IOException { - final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME); + Map envs = System.getenv(); + final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + + // configure local directory + final String[] newArgs = Arrays.copyOf(args, args.length + 2); + newArgs[newArgs.length-2] = "-"+TaskManager.ARG_CONF_DIR; + newArgs[newArgs.length-1] = localDirs; + LOG.info("Setting log path "+localDirs); LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting" + " user to execute Stratosphere TaskManager to '"+yarnClientUsername+"'"); UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(yarnClientUsername); @@ -41,7 +52,7 @@ public class YarnTaskManagerRunner { @Override public Object run() { try { - TaskManager.main(args); + TaskManager.main(newArgs); } catch (Exception e) { e.printStackTrace(); } diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java index 3fd5c967063..2b483aeebea 100644 --- a/stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java +++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java @@ -105,6 +105,7 @@ public class CliFrontend { private static final String ENV_CONFIG_DIRECTORY = "STRATOSPHERE_CONF_DIR"; private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf"; private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf"; + public static final String JOBMANAGER_ADDRESS_FILE = ".yarn-jobmanager"; private CommandLineParser parser; @@ -294,11 +295,11 @@ public class CliFrontend { // see if there is a file containing the jobManager address. 
String loc = getConfigurationDirectory(); - File jmAddressFile = new File(loc+"/.yarn-jobmanager"); + File jmAddressFile = new File(loc + "/" + JOBMANAGER_ADDRESS_FILE); if (jmAddressFile.exists()) { try { address = FileUtils.readFileToString(jmAddressFile).trim(); - System.out.println("Found a .yarn-jobmanager file, using \""+address+"\" to connect to the JobManager"); + System.out.println("Found a " + JOBMANAGER_ADDRESS_FILE + " file, using \""+address+"\" to connect to the JobManager"); } catch (IOException e) {} } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index bcbf064984e..e954e72a338 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -104,6 +104,7 @@ public class TaskManager implements TaskOperationProtocol { private static final int IPC_HANDLER_COUNT = 1; + public final static String ARG_CONF_DIR = "tempDir"; private final JobManagerProtocol jobManager; @@ -371,9 +372,16 @@ public class TaskManager implements TaskOperationProtocol { public static void main(String[] args) throws IOException { Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg().withDescription( "Specify configuration directory.").create("configDir"); + // tempDir option is used by the YARN client. 
+ Option tempDir = OptionBuilder.withArgName("temporary directory (overwrites configured option)") + .hasArg().withDescription( + "Specify temporary directory.").create(ARG_CONF_DIR); configDirOpt.setRequired(true); + tempDir.setRequired(false); Options options = new Options(); options.addOption(configDirOpt); + options.addOption(tempDir); + CommandLineParser parser = new GnuParser(); CommandLine line = null; @@ -385,10 +393,19 @@ public class TaskManager implements TaskOperationProtocol { } String configDir = line.getOptionValue(configDirOpt.getOpt(), null); - + String tempDirVal = line.getOptionValue(tempDir.getOpt(), null); + // First, try to load global configuration GlobalConfiguration.loadConfiguration(configDir); - + if(tempDirVal != null // the YARN TM runner has set a value for the temp dir + // the configuration does not contain a temp directory + && GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null) == null) { + Configuration c = GlobalConfiguration.getConfiguration(); + c.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tempDirVal); + LOG.info("Setting temporary directory to "+tempDirVal); + GlobalConfiguration.includeConfiguration(c); + } + System.err.println("Configuration "+GlobalConfiguration.getConfiguration()); LOG.info("Current user "+UserGroupInformation.getCurrentUser().getShortUserName()); // Create a new task manager object -- GitLab