提交 7244d04e 编写于 作者: R Robert Metzger

[yarn] set the same user for the services running in the YARN containers.

Fix #584
Fix #568
上级 7fe59e90
......@@ -19,6 +19,12 @@
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-runtime</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>hadoop-core</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
......
......@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
......@@ -35,6 +36,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
......@@ -83,7 +86,8 @@ public class ApplicationMaster {
this.jm.shutdown();
}
}
public static void main(String[] args) throws Exception {
private void run() throws Exception {
//Utils.logFilesInCurrentDirectory(LOG);
// Initialize clients to ResourceManager and NodeManagers
Configuration conf = Utils.initializeYarnConfiguration();
......@@ -98,6 +102,7 @@ public class ApplicationMaster {
final String applicationMasterHost = envs.get(Environment.NM_HOST.key());
final String remoteStratosphereJarPath = envs.get(Client.STRATOSPHERE_JAR_PATH);
final String shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
final int taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
......@@ -111,6 +116,7 @@ public class ApplicationMaster {
throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
}
LOG.info("Working directory "+currDir);
// load Stratosphere configuration.
Utils.getStratosphereConfiguration(currDir);
......@@ -233,7 +239,7 @@ public class ApplicationMaster {
if(hasLog4j) {
tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
}
tmCommand += " eu.stratosphere.nephele.taskmanager.TaskManager -configDir . "
tmCommand += " eu.stratosphere.yarn.YarnTaskManagerRunner -configDir . "
+ " 1>"
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/taskmanager-stdout.log"
......@@ -263,6 +269,8 @@ public class ApplicationMaster {
// Setup CLASSPATH for Container (=TaskTracker)
Map<String, String> containerEnv = new HashMap<String, String>();
Utils.setupEnv(conf, containerEnv); //add stratosphere.jar to class path.
containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
ctx.setEnvironment(containerEnv);
UserGroupInformation user = UserGroupInformation.getCurrentUser();
......@@ -307,5 +315,27 @@ public class ApplicationMaster {
// Un-register with ResourceManager
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
}
public static void main(String[] args) throws Exception {
final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+ " user to execute Stratosphere ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
ugi.addToken(toks);
}
ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
new ApplicationMaster().run();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
});
}
}
......@@ -112,12 +112,20 @@ public class Client {
public final static String STRATOSPHERE_JAR_PATH = "_STRATOSPHERE_JAR_PATH"; // the stratosphere jar resource location (in HDFS).
public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
private static final String CONFIG_FILE_NAME = "stratosphere-conf.yaml";
private Configuration conf;
public void run(String[] args) throws Exception {
if(UserGroupInformation.isSecurityEnabled()) {
throw new RuntimeException("Stratosphere YARN client does not have security support right now."
+ "File a bug, we will fix it asap");
}
//Utils.logFilesInCurrentDirectory(LOG);
//
// Command Line Options
......@@ -217,7 +225,7 @@ public class Client {
}
}
}
List<File> shipFiles = null;
List<File> shipFiles = new ArrayList<File>();
// path to directory to ship
if(cmd.hasOption(SHIP_PATH.getOpt())) {
String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
......@@ -235,7 +243,7 @@ public class Client {
}
boolean hasLog4j = false;
//check if there is a log4j file
if(confDirPath != null) {
if(confDirPath.length() > 0) {
File l4j = new File(confDirPath+"/log4j.properties");
if(l4j.exists()) {
shipFiles.add(l4j);
......@@ -382,7 +390,7 @@ public class Client {
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
fs.setPermission(paths[2], permission); // set permission for path.
Utils.setTokensFor(amContainer, paths, this.conf);
LOG.debug("Security is enabled: "+ UserGroupInformation.isSecurityEnabled());
amContainer.setLocalResources(localResources);
fs.close();
......@@ -398,9 +406,10 @@ public class Client {
appMasterEnv.put(Client.ENV_APP_ID, appId.toString());
appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(jmMemory);
......
......@@ -23,8 +23,10 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Map;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
......@@ -35,8 +37,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
......@@ -47,6 +53,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.base.Preconditions;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.GlobalConfiguration;
......@@ -181,9 +189,21 @@ public class Utils {
public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
for(Token<? extends TokenIdentifier> token : usrTok) {
final Text id = new Text(token.getIdentifier());
LOG.info("Adding user token "+id+" with "+token);
credentials.addToken(id, token);
}
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
LOG.debug("Wrote tokens. Credentials buffer length: "+dob.getLength());
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(securityTokens);
}
......
/***********************************************************************************************************************
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.yarn;
import java.io.IOException;
import java.security.PrivilegedAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import eu.stratosphere.nephele.taskmanager.TaskManager;
public class YarnTaskManagerRunner {
private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
public static void main(final String[] args) throws IOException {
final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+ " user to execute Stratosphere TaskManager to '"+yarnClientUsername+"'");
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
ugi.addToken(toks);
}
ugi.doAs(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
TaskManager.main(args);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
});
}
}
......@@ -304,6 +304,10 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
(FileBaseStatistics) cachedStats : null;
// store properties
final long oldTimeout = this.openTimeout;
final int oldBufferSize = this.bufferSize;
final int oldLineLengthLimit = this.lineLengthLimit;
try {
final Path filePath = this.filePath;
......@@ -342,6 +346,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
throw new RuntimeException("Error: Invalid number of samples: " + numSamples);
}
// make sure that the sampling times out after a while if the file system does not answer in time
this.openTimeout = 10000;
// set a small read buffer size
......@@ -396,6 +401,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
if (LOG.isErrorEnabled())
LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
+ t.getMessage(), t);
} finally {
// restore properties (even on return)
this.openTimeout = oldTimeout;
this.bufferSize = oldBufferSize;
this.lineLengthLimit = oldLineLengthLimit;
}
// no statistics possible
......
......@@ -97,7 +97,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
DEFAULT_OPENING_TIMEOUT = ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT;
} else if (to == 0) {
DEFAULT_OPENING_TIMEOUT = Long.MAX_VALUE;
DEFAULT_OPENING_TIMEOUT = 300000; // 5 minutes
} else {
DEFAULT_OPENING_TIMEOUT = to;
}
......@@ -222,7 +222,6 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
public void setOpenTimeout(long openTimeout) {
if (openTimeout < 0)
throw new IllegalArgumentException("The timeout for opening the input splits must be positive or zero (= infinite).");
this.openTimeout = openTimeout;
}
......@@ -734,9 +733,8 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
for (StackTraceElement e : this.getStackTrace()) {
bld.append("\tat ").append(e.toString()).append('\n');
}
throw new IOException("Input opening request timed out. Opener was " + (stillAlive ? "" : "NOT ") +
" alive. Stack:\n" + bld.toString());
" alive. Stack of split open thread:\n" + bld.toString());
}
}
......
......@@ -44,6 +44,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
......@@ -148,6 +149,8 @@ public class TaskManager implements TaskOperationProtocol {
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
*/
public TaskManager(final int taskManagersPerJVM) throws Exception {
LOG.info("Current user "+UserGroupInformation.getCurrentUser().getShortUserName());
LOG.info("user property: "+System.getProperty("user.name"));
// IMPORTANT! At this point, the GlobalConfiguration must have been read!
final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
InetSocketAddress jobManagerAddress = null;
......@@ -334,10 +337,10 @@ public class TaskManager implements TaskOperationProtocol {
*
* @param args
* arguments from the command line
* @throws IOException
*/
@SuppressWarnings("static-access")
public static void main(String[] args) {
public static void main(String[] args) throws IOException {
Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg().withDescription(
"Specify configuration directory.").create("configDir");
configDirOpt.setRequired(true);;
......@@ -358,6 +361,8 @@ public class TaskManager implements TaskOperationProtocol {
// First, try to load global configuration
GlobalConfiguration.loadConfiguration(configDir);
LOG.info("Current user "+UserGroupInformation.getCurrentUser().getShortUserName());
// Create a new task manager object
TaskManager taskManager = null;
try {
......@@ -371,11 +376,11 @@ public class TaskManager implements TaskOperationProtocol {
// Shut down
taskManager.shutdown();
}
// This method is called by the TaskManagers main thread
public void runIOLoop() {
long interval = GlobalConfiguration.getInteger("taskmanager.setup.periodictaskinterval",
DEFAULTPERIODICTASKSINTERVAL);
......
......@@ -386,7 +386,6 @@ public final class DistributedFileSystem extends FileSystem {
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
new org.apache.hadoop.fs.Path(dst.toString()));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册