Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
41a00693
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
41a00693
编写于
4月 24, 2014
作者:
R
Robert Metzger
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix issues #736 and #733
上级
a17ec901
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
60 addition
and
15 deletion
+60
-15
stratosphere-addons/yarn/pom.xml
stratosphere-addons/yarn/pom.xml
+7
-1
stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java
...src/main/java/eu/stratosphere/yarn/ApplicationMaster.java
+0
-6
stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
...ddons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
+10
-1
stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java
...addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java
+8
-1
stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/YarnTaskManagerRunner.java
...main/java/eu/stratosphere/yarn/YarnTaskManagerRunner.java
+13
-2
stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java
...nts/src/main/java/eu/stratosphere/client/CliFrontend.java
+3
-2
stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
...java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+19
-2
未找到文件。
stratosphere-addons/yarn/pom.xml
浏览文件 @
41a00693
...
@@ -26,7 +26,13 @@
...
@@ -26,7 +26,13 @@
</exclusion>
</exclusion>
</exclusions>
</exclusions>
</dependency>
</dependency>
<dependency>
<groupId>
eu.stratosphere
</groupId>
<artifactId>
stratosphere-clients
</artifactId>
<version>
${project.version}
</version>
</dependency>
<dependency>
<dependency>
<groupId>
org.apache.hadoop
</groupId>
<groupId>
org.apache.hadoop
</groupId>
<artifactId>
hadoop-yarn-client
</artifactId>
<artifactId>
hadoop-yarn-client
</artifactId>
...
...
stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java
浏览文件 @
41a00693
...
@@ -73,7 +73,6 @@ public class ApplicationMaster {
...
@@ -73,7 +73,6 @@ public class ApplicationMaster {
final
String
logDirs
=
envs
.
get
(
Environment
.
LOG_DIRS
.
key
());
final
String
logDirs
=
envs
.
get
(
Environment
.
LOG_DIRS
.
key
());
final
String
ownHostname
=
envs
.
get
(
Environment
.
NM_HOST
.
key
());
final
String
ownHostname
=
envs
.
get
(
Environment
.
NM_HOST
.
key
());
final
String
appId
=
envs
.
get
(
Client
.
ENV_APP_ID
);
final
String
appId
=
envs
.
get
(
Client
.
ENV_APP_ID
);
final
String
localDirs
=
envs
.
get
(
Environment
.
LOCAL_DIRS
.
key
());
final
String
clientHomeDir
=
envs
.
get
(
Client
.
ENV_CLIENT_HOME_DIR
);
final
String
clientHomeDir
=
envs
.
get
(
Client
.
ENV_CLIENT_HOME_DIR
);
final
String
applicationMasterHost
=
envs
.
get
(
Environment
.
NM_HOST
.
key
());
final
String
applicationMasterHost
=
envs
.
get
(
Environment
.
NM_HOST
.
key
());
final
String
remoteStratosphereJarPath
=
envs
.
get
(
Client
.
STRATOSPHERE_JAR_PATH
);
final
String
remoteStratosphereJarPath
=
envs
.
get
(
Client
.
STRATOSPHERE_JAR_PATH
);
...
@@ -108,8 +107,6 @@ public class ApplicationMaster {
...
@@ -108,8 +107,6 @@ public class ApplicationMaster {
output
.
append
(
ConfigConstants
.
JOB_MANAGER_IPC_ADDRESS_KEY
+
": "
+
ownHostname
+
"\n"
);
output
.
append
(
ConfigConstants
.
JOB_MANAGER_IPC_ADDRESS_KEY
+
": "
+
ownHostname
+
"\n"
);
}
else
if
(
line
.
contains
(
ConfigConstants
.
JOB_MANAGER_WEB_ROOT_PATH_KEY
))
{
}
else
if
(
line
.
contains
(
ConfigConstants
.
JOB_MANAGER_WEB_ROOT_PATH_KEY
))
{
output
.
append
(
ConfigConstants
.
JOB_MANAGER_WEB_ROOT_PATH_KEY
+
": "
+
"\n"
);
output
.
append
(
ConfigConstants
.
JOB_MANAGER_WEB_ROOT_PATH_KEY
+
": "
+
"\n"
);
}
else
if
(
localDirs
!=
null
&&
line
.
contains
(
ConfigConstants
.
TASK_MANAGER_TMP_DIR_KEY
))
{
output
.
append
(
ConfigConstants
.
TASK_MANAGER_TMP_DIR_KEY
+
": "
+
localDirs
+
"\n"
);
}
else
{
}
else
{
output
.
append
(
line
+
"\n"
);
output
.
append
(
line
+
"\n"
);
}
}
...
@@ -118,9 +115,6 @@ public class ApplicationMaster {
...
@@ -118,9 +115,6 @@ public class ApplicationMaster {
output
.
append
(
ConfigConstants
.
JOB_MANAGER_IPC_ADDRESS_KEY
+
": "
+
ownHostname
+
"\n"
);
output
.
append
(
ConfigConstants
.
JOB_MANAGER_IPC_ADDRESS_KEY
+
": "
+
ownHostname
+
"\n"
);
output
.
append
(
ConfigConstants
.
JOB_MANAGER_WEB_ROOT_PATH_KEY
+
": "
+
localWebInterfaceDir
+
"\n"
);
output
.
append
(
ConfigConstants
.
JOB_MANAGER_WEB_ROOT_PATH_KEY
+
": "
+
localWebInterfaceDir
+
"\n"
);
output
.
append
(
ConfigConstants
.
JOB_MANAGER_WEB_LOG_PATH_KEY
+
": "
+
logDirs
+
"\n"
);
output
.
append
(
ConfigConstants
.
JOB_MANAGER_WEB_LOG_PATH_KEY
+
": "
+
logDirs
+
"\n"
);
if
(
localDirs
!=
null
)
{
output
.
append
(
ConfigConstants
.
TASK_MANAGER_TMP_DIR_KEY
+
": "
+
localDirs
+
"\n"
);
}
output
.
close
();
output
.
close
();
br
.
close
();
br
.
close
();
File
newConf
=
new
File
(
currDir
+
"/stratosphere-conf-modified.yaml"
);
File
newConf
=
new
File
(
currDir
+
"/stratosphere-conf-modified.yaml"
);
...
...
stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
浏览文件 @
41a00693
...
@@ -65,6 +65,7 @@ import org.apache.log4j.Level;
...
@@ -65,6 +65,7 @@ import org.apache.log4j.Level;
import
org.apache.log4j.Logger
;
import
org.apache.log4j.Logger
;
import
org.apache.log4j.PatternLayout
;
import
org.apache.log4j.PatternLayout
;
import
eu.stratosphere.client.CliFrontend
;
import
eu.stratosphere.configuration.ConfigConstants
;
import
eu.stratosphere.configuration.ConfigConstants
;
import
eu.stratosphere.configuration.GlobalConfiguration
;
import
eu.stratosphere.configuration.GlobalConfiguration
;
...
@@ -288,6 +289,12 @@ public class Client {
...
@@ -288,6 +289,12 @@ public class Client {
// Create a local resource to point to the destination jar path
// Create a local resource to point to the destination jar path
final
FileSystem
fs
=
FileSystem
.
get
(
conf
);
final
FileSystem
fs
=
FileSystem
.
get
(
conf
);
if
(
fs
.
getScheme
().
startsWith
(
"file"
))
{
LOG
.
warn
(
"The file system scheme is '"
+
fs
.
getScheme
()
+
"'. This indicates that the "
+
"specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
+
"The Stratosphere YARN client needs to store its files in a distributed file system"
);
}
// Create yarnClient
// Create yarnClient
final
YarnClient
yarnClient
=
YarnClient
.
createYarnClient
();
final
YarnClient
yarnClient
=
YarnClient
.
createYarnClient
();
yarnClient
.
init
(
conf
);
yarnClient
.
init
(
conf
);
...
@@ -452,9 +459,11 @@ public class Client {
...
@@ -452,9 +459,11 @@ public class Client {
System
.
err
.
println
(
"Stratosphere JobManager is now running on "
+
appReport
.
getHost
()+
":"
+
jmPort
);
System
.
err
.
println
(
"Stratosphere JobManager is now running on "
+
appReport
.
getHost
()+
":"
+
jmPort
);
System
.
err
.
println
(
"JobManager Web Interface: "
+
appReport
.
getTrackingUrl
());
System
.
err
.
println
(
"JobManager Web Interface: "
+
appReport
.
getTrackingUrl
());
// write jobmanager connect information
// write jobmanager connect information
PrintWriter
out
=
new
PrintWriter
(
confDirPath
+
".yarn-jobmanager"
);
File
addrFile
=
new
File
(
confDirPath
+
CliFrontend
.
JOBMANAGER_ADDRESS_FILE
);
PrintWriter
out
=
new
PrintWriter
(
addrFile
);
out
.
println
(
appReport
.
getHost
()+
":"
+
jmPort
);
out
.
println
(
appReport
.
getHost
()+
":"
+
jmPort
);
out
.
close
();
out
.
close
();
addrFile
.
setReadable
(
true
,
false
);
// readable for all.
told
=
true
;
told
=
true
;
}
}
if
(!
told
)
{
if
(!
told
)
{
...
...
stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java
浏览文件 @
41a00693
...
@@ -104,7 +104,7 @@ public class Utils {
...
@@ -104,7 +104,7 @@ public class Utils {
try
{
try
{
fileUrl
=
path
.
toURL
();
fileUrl
=
path
.
toURL
();
}
catch
(
MalformedURLException
e
)
{
}
catch
(
MalformedURLException
e
)
{
e
.
printStackTrace
(
);
throw
new
RuntimeException
(
"Erroneous config file path"
,
e
);
}
}
URL
[]
urls
=
{
fileUrl
};
URL
[]
urls
=
{
fileUrl
};
ClassLoader
cl
=
new
URLClassLoader
(
urls
,
conf
.
getClassLoader
());
ClassLoader
cl
=
new
URLClassLoader
(
urls
,
conf
.
getClassLoader
());
...
@@ -120,6 +120,13 @@ public class Utils {
...
@@ -120,6 +120,13 @@ public class Utils {
}
}
public
static
Configuration
initializeYarnConfiguration
()
{
public
static
Configuration
initializeYarnConfiguration
()
{
Configuration
conf
=
new
YarnConfiguration
();
Configuration
conf
=
new
YarnConfiguration
();
String
configuredHadoopConfig
=
GlobalConfiguration
.
getString
(
ConfigConstants
.
PATH_HADOOP_CONFIG
,
null
);
if
(
configuredHadoopConfig
!=
null
)
{
LOG
.
info
(
"Using hadoop configuration path from "
+
ConfigConstants
.
PATH_HADOOP_CONFIG
+
" setting."
);
addPathToConfig
(
conf
,
new
File
(
configuredHadoopConfig
));
setDefaultConfValues
(
conf
);
return
conf
;
}
String
envs
[]
=
{
"YARN_CONF_DIR"
,
"HADOOP_CONF_DIR"
,
"HADOOP_CONF_PATH"
};
String
envs
[]
=
{
"YARN_CONF_DIR"
,
"HADOOP_CONF_DIR"
,
"HADOOP_CONF_PATH"
};
for
(
int
i
=
0
;
i
<
envs
.
length
;
++
i
)
{
for
(
int
i
=
0
;
i
<
envs
.
length
;
++
i
)
{
String
confPath
=
System
.
getenv
(
envs
[
i
]);
String
confPath
=
System
.
getenv
(
envs
[
i
]);
...
...
stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/YarnTaskManagerRunner.java
浏览文件 @
41a00693
...
@@ -16,12 +16,15 @@ package eu.stratosphere.yarn;
...
@@ -16,12 +16,15 @@ package eu.stratosphere.yarn;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.security.PrivilegedAction
;
import
java.security.PrivilegedAction
;
import
java.util.Arrays
;
import
java.util.Map
;
import
org.apache.commons.logging.Log
;
import
org.apache.commons.logging.Log
;
import
org.apache.commons.logging.LogFactory
;
import
org.apache.commons.logging.LogFactory
;
import
org.apache.hadoop.security.UserGroupInformation
;
import
org.apache.hadoop.security.UserGroupInformation
;
import
org.apache.hadoop.security.token.Token
;
import
org.apache.hadoop.security.token.Token
;
import
org.apache.hadoop.security.token.TokenIdentifier
;
import
org.apache.hadoop.security.token.TokenIdentifier
;
import
org.apache.hadoop.yarn.api.ApplicationConstants.Environment
;
import
eu.stratosphere.nephele.taskmanager.TaskManager
;
import
eu.stratosphere.nephele.taskmanager.TaskManager
;
...
@@ -30,7 +33,15 @@ public class YarnTaskManagerRunner {
...
@@ -30,7 +33,15 @@ public class YarnTaskManagerRunner {
private
static
final
Log
LOG
=
LogFactory
.
getLog
(
YarnTaskManagerRunner
.
class
);
private
static
final
Log
LOG
=
LogFactory
.
getLog
(
YarnTaskManagerRunner
.
class
);
public
static
void
main
(
final
String
[]
args
)
throws
IOException
{
public
static
void
main
(
final
String
[]
args
)
throws
IOException
{
final
String
yarnClientUsername
=
System
.
getenv
(
Client
.
ENV_CLIENT_USERNAME
);
Map
<
String
,
String
>
envs
=
System
.
getenv
();
final
String
yarnClientUsername
=
envs
.
get
(
Client
.
ENV_CLIENT_USERNAME
);
final
String
localDirs
=
envs
.
get
(
Environment
.
LOCAL_DIRS
.
key
());
// configure local directory
final
String
[]
newArgs
=
Arrays
.
copyOf
(
args
,
args
.
length
+
2
);
newArgs
[
newArgs
.
length
-
2
]
=
"-"
+
TaskManager
.
ARG_CONF_DIR
;
newArgs
[
newArgs
.
length
-
1
]
=
localDirs
;
LOG
.
info
(
"Setting log path "
+
localDirs
);
LOG
.
info
(
"YARN daemon runs as '"
+
UserGroupInformation
.
getCurrentUser
().
getShortUserName
()+
"' setting"
LOG
.
info
(
"YARN daemon runs as '"
+
UserGroupInformation
.
getCurrentUser
().
getShortUserName
()+
"' setting"
+
" user to execute Stratosphere TaskManager to '"
+
yarnClientUsername
+
"'"
);
+
" user to execute Stratosphere TaskManager to '"
+
yarnClientUsername
+
"'"
);
UserGroupInformation
ugi
=
UserGroupInformation
.
createRemoteUser
(
yarnClientUsername
);
UserGroupInformation
ugi
=
UserGroupInformation
.
createRemoteUser
(
yarnClientUsername
);
...
@@ -41,7 +52,7 @@ public class YarnTaskManagerRunner {
...
@@ -41,7 +52,7 @@ public class YarnTaskManagerRunner {
@Override
@Override
public
Object
run
()
{
public
Object
run
()
{
try
{
try
{
TaskManager
.
main
(
a
rgs
);
TaskManager
.
main
(
newA
rgs
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
}
}
...
...
stratosphere-clients/src/main/java/eu/stratosphere/client/CliFrontend.java
浏览文件 @
41a00693
...
@@ -105,6 +105,7 @@ public class CliFrontend {
...
@@ -105,6 +105,7 @@ public class CliFrontend {
private
static
final
String
ENV_CONFIG_DIRECTORY
=
"STRATOSPHERE_CONF_DIR"
;
private
static
final
String
ENV_CONFIG_DIRECTORY
=
"STRATOSPHERE_CONF_DIR"
;
private
static
final
String
CONFIG_DIRECTORY_FALLBACK_1
=
"../conf"
;
private
static
final
String
CONFIG_DIRECTORY_FALLBACK_1
=
"../conf"
;
private
static
final
String
CONFIG_DIRECTORY_FALLBACK_2
=
"conf"
;
private
static
final
String
CONFIG_DIRECTORY_FALLBACK_2
=
"conf"
;
public
static
final
String
JOBMANAGER_ADDRESS_FILE
=
".yarn-jobmanager"
;
private
CommandLineParser
parser
;
private
CommandLineParser
parser
;
...
@@ -294,11 +295,11 @@ public class CliFrontend {
...
@@ -294,11 +295,11 @@ public class CliFrontend {
// see if there is a file containing the jobManager address.
// see if there is a file containing the jobManager address.
String
loc
=
getConfigurationDirectory
();
String
loc
=
getConfigurationDirectory
();
File
jmAddressFile
=
new
File
(
loc
+
"/.yarn-jobmanager"
);
File
jmAddressFile
=
new
File
(
loc
+
"/"
+
JOBMANAGER_ADDRESS_FILE
);
if
(
jmAddressFile
.
exists
())
{
if
(
jmAddressFile
.
exists
())
{
try
{
try
{
address
=
FileUtils
.
readFileToString
(
jmAddressFile
).
trim
();
address
=
FileUtils
.
readFileToString
(
jmAddressFile
).
trim
();
System
.
out
.
println
(
"Found a
.yarn-jobmanager
file, using \""
+
address
+
"\" to connect to the JobManager"
);
System
.
out
.
println
(
"Found a
"
+
JOBMANAGER_ADDRESS_FILE
+
"
file, using \""
+
address
+
"\" to connect to the JobManager"
);
}
catch
(
IOException
e
)
{}
}
catch
(
IOException
e
)
{}
}
}
...
...
stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
浏览文件 @
41a00693
...
@@ -104,6 +104,7 @@ public class TaskManager implements TaskOperationProtocol {
...
@@ -104,6 +104,7 @@ public class TaskManager implements TaskOperationProtocol {
private
static
final
int
IPC_HANDLER_COUNT
=
1
;
private
static
final
int
IPC_HANDLER_COUNT
=
1
;
public
final
static
String
ARG_CONF_DIR
=
"tempDir"
;
private
final
JobManagerProtocol
jobManager
;
private
final
JobManagerProtocol
jobManager
;
...
@@ -371,9 +372,16 @@ public class TaskManager implements TaskOperationProtocol {
...
@@ -371,9 +372,16 @@ public class TaskManager implements TaskOperationProtocol {
public
static
void
main
(
String
[]
args
)
throws
IOException
{
public
static
void
main
(
String
[]
args
)
throws
IOException
{
Option
configDirOpt
=
OptionBuilder
.
withArgName
(
"config directory"
).
hasArg
().
withDescription
(
Option
configDirOpt
=
OptionBuilder
.
withArgName
(
"config directory"
).
hasArg
().
withDescription
(
"Specify configuration directory."
).
create
(
"configDir"
);
"Specify configuration directory."
).
create
(
"configDir"
);
// tempDir option is used by the YARN client.
Option
tempDir
=
OptionBuilder
.
withArgName
(
"temporary directory (overwrites configured option)"
)
.
hasArg
().
withDescription
(
"Specify temporary directory."
).
create
(
ARG_CONF_DIR
);
configDirOpt
.
setRequired
(
true
);
configDirOpt
.
setRequired
(
true
);
tempDir
.
setRequired
(
false
);
Options
options
=
new
Options
();
Options
options
=
new
Options
();
options
.
addOption
(
configDirOpt
);
options
.
addOption
(
configDirOpt
);
options
.
addOption
(
tempDir
);
CommandLineParser
parser
=
new
GnuParser
();
CommandLineParser
parser
=
new
GnuParser
();
CommandLine
line
=
null
;
CommandLine
line
=
null
;
...
@@ -385,10 +393,19 @@ public class TaskManager implements TaskOperationProtocol {
...
@@ -385,10 +393,19 @@ public class TaskManager implements TaskOperationProtocol {
}
}
String
configDir
=
line
.
getOptionValue
(
configDirOpt
.
getOpt
(),
null
);
String
configDir
=
line
.
getOptionValue
(
configDirOpt
.
getOpt
(),
null
);
String
tempDirVal
=
line
.
getOptionValue
(
tempDir
.
getOpt
(),
null
);
// First, try to load global configuration
// First, try to load global configuration
GlobalConfiguration
.
loadConfiguration
(
configDir
);
GlobalConfiguration
.
loadConfiguration
(
configDir
);
if
(
tempDirVal
!=
null
// the YARN TM runner has set a value for the temp dir
// the configuration does not contain a temp direcory
&&
GlobalConfiguration
.
getString
(
ConfigConstants
.
TASK_MANAGER_TMP_DIR_KEY
,
null
)
==
null
)
{
Configuration
c
=
GlobalConfiguration
.
getConfiguration
();
c
.
setString
(
ConfigConstants
.
TASK_MANAGER_TMP_DIR_KEY
,
tempDirVal
);
LOG
.
info
(
"Setting temporary directory to "
+
tempDirVal
);
GlobalConfiguration
.
includeConfiguration
(
c
);
}
System
.
err
.
println
(
"Configuration "
+
GlobalConfiguration
.
getConfiguration
());
LOG
.
info
(
"Current user "
+
UserGroupInformation
.
getCurrentUser
().
getShortUserName
());
LOG
.
info
(
"Current user "
+
UserGroupInformation
.
getCurrentUser
().
getShortUserName
());
// Create a new task manager object
// Create a new task manager object
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录