Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
0d0803f5
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
0d0803f5
编写于
6月 27, 2014
作者:
S
Stephan Ewen
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Fix the managed memory planning after patch
9ce62930
上级
9ce62930
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
30 addition
and
19 deletion
+30
-19
stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
...u/stratosphere/client/minicluster/NepheleMiniCluster.java
+6
-3
stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
...java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+24
-16
未找到文件。
stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
浏览文件 @
0d0803f5
...
@@ -272,7 +272,7 @@ public class NepheleMiniCluster {
...
@@ -272,7 +272,7 @@ public class NepheleMiniCluster {
config
.
setBoolean
(
ConfigConstants
.
FILESYSTEM_DEFAULT_OVERWRITE_KEY
,
defaultOverwriteFiles
);
config
.
setBoolean
(
ConfigConstants
.
FILESYSTEM_DEFAULT_OVERWRITE_KEY
,
defaultOverwriteFiles
);
config
.
setBoolean
(
ConfigConstants
.
FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY
,
defaultAlwaysCreateDirectory
);
config
.
setBoolean
(
ConfigConstants
.
FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY
,
defaultAlwaysCreateDirectory
);
if
(
memorySize
<
0
){
if
(
memorySize
<
0
){
memorySize
=
HardwareDescriptionFactory
.
extractFromSystem
().
getSizeOfFreeMemory
();
memorySize
=
HardwareDescriptionFactory
.
extractFromSystem
().
getSizeOfFreeMemory
();
// at this time, we need to scale down the memory, because we cannot dedicate all free memory to the
// at this time, we need to scale down the memory, because we cannot dedicate all free memory to the
...
@@ -282,9 +282,12 @@ public class NepheleMiniCluster {
...
@@ -282,9 +282,12 @@ public class NepheleMiniCluster {
GlobalConfiguration
.
getLong
(
ConfigConstants
.
TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY
,
GlobalConfiguration
.
getLong
(
ConfigConstants
.
TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY
,
ConfigConstants
.
DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
);
ConfigConstants
.
DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
);
memorySize
=
(
long
)
(
0.8
*
(
memorySize
-
bufferMem
));
memorySize
=
memorySize
-
bufferMem
;
// apply the fraction that makes sure memory is left to the heap for other data structures and UDFs.
memorySize
=
(
long
)
(
memorySize
*
ConfigConstants
.
DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION
);
//convert from bytes to mega
bytes
//convert from bytes to megabytes
memorySize
>>>=
20
;
memorySize
>>>=
20
;
}
}
...
...
stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
浏览文件 @
0d0803f5
...
@@ -334,9 +334,8 @@ public class TaskManager implements TaskOperationProtocol {
...
@@ -334,9 +334,8 @@ public class TaskManager implements TaskOperationProtocol {
throw
new
Exception
(
"Failed to instantiate ChannelManager."
,
ioe
);
throw
new
Exception
(
"Failed to instantiate ChannelManager."
,
ioe
);
}
}
// initialize the number of slots
{
{
HardwareDescription
resources
=
HardwareDescriptionFactory
.
extractFromSystem
();
int
slots
=
GlobalConfiguration
.
getInteger
(
ConfigConstants
.
TASK_MANAGER_NUM_TASK_SLOTS
,
-
1
);
int
slots
=
GlobalConfiguration
.
getInteger
(
ConfigConstants
.
TASK_MANAGER_NUM_TASK_SLOTS
,
-
1
);
if
(
slots
==
-
1
)
{
if
(
slots
==
-
1
)
{
slots
=
1
;
slots
=
1
;
...
@@ -347,23 +346,34 @@ public class TaskManager implements TaskOperationProtocol {
...
@@ -347,23 +346,34 @@ public class TaskManager implements TaskOperationProtocol {
LOG
.
info
(
"Creating "
+
slots
+
" task slot(s)."
);
LOG
.
info
(
"Creating "
+
slots
+
" task slot(s)."
);
}
}
this
.
numberOfSlots
=
slots
;
this
.
numberOfSlots
=
slots
;
}
this
.
hardwareDescription
=
HardwareDescriptionFactory
.
extractFromSystem
();
// initialize the memory manager
{
// Check whether the memory size has been explicitly configured.
final
long
configuredMemorySize
=
GlobalConfiguration
.
getInteger
(
ConfigConstants
.
TASK_MANAGER_MEMORY_SIZE_KEY
,
-
1
);
final
long
memorySize
;
// Check whether the memory size has been explicitly configured. if so that overrides the default mechanism
if
(
configuredMemorySize
==
-
1
)
{
// of taking as much as is mentioned in the hardware description
// no manually configured memory. take a relative fraction of the free heap space
long
memorySize
=
GlobalConfiguration
.
getInteger
(
ConfigConstants
.
TASK_MANAGER_MEMORY_SIZE_KEY
,
-
1
);
float
fraction
=
GlobalConfiguration
.
getFloat
(
ConfigConstants
.
TASK_MANAGER_MEMORY_FRACTION_KEY
,
ConfigConstants
.
DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION
);
memorySize
=
(
long
)
(
this
.
hardwareDescription
.
getSizeOfFreeMemory
()
*
fraction
);
if
(
memorySize
>
0
)
{
LOG
.
info
(
"Using "
+
fraction
+
" of the free heap space for managed memory."
);
// manually configured memory size. override the value in the hardware config
}
resources
=
HardwareDescriptionFactory
.
construct
(
resources
.
getNumberOfCPUCores
(),
else
if
(
configuredMemorySize
<=
0
)
{
resources
.
getSizeOfPhysicalMemory
(),
memorySize
*
1024L
*
1024L
);
throw
new
Exception
(
"Invalid value for Memory Manager memory size: "
+
configuredMemorySize
);
}
else
{
memorySize
=
configuredMemorySize
<<
20
;
}
}
this
.
hardwareDescription
=
resources
;
final
int
pageSize
=
GlobalConfiguration
.
getInteger
(
ConfigConstants
.
TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY
,
final
int
pageSize
=
GlobalConfiguration
.
getInteger
(
ConfigConstants
.
TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY
,
ConfigConstants
.
DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
);
ConfigConstants
.
DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE
);
// Initialize the memory manager
// Initialize the memory manager
LOG
.
info
(
"Initializing memory manager with "
+
(
resources
.
getSizeOfFreeMemory
()
>>>
20
)
+
" megabytes of memory. "
+
LOG
.
info
(
"Initializing memory manager with "
+
(
memorySize
>>>
20
)
+
" megabytes of memory. "
+
"Page size is "
+
pageSize
+
" bytes."
);
"Page size is "
+
pageSize
+
" bytes."
);
try
{
try
{
...
@@ -371,11 +381,9 @@ public class TaskManager implements TaskOperationProtocol {
...
@@ -371,11 +381,9 @@ public class TaskManager implements TaskOperationProtocol {
final
boolean
lazyAllocation
=
GlobalConfiguration
.
getBoolean
(
ConfigConstants
.
TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY
,
final
boolean
lazyAllocation
=
GlobalConfiguration
.
getBoolean
(
ConfigConstants
.
TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY
,
ConfigConstants
.
DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION
);
ConfigConstants
.
DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION
);
this
.
memoryManager
=
new
DefaultMemoryManager
(
resources
.
getSizeOfFreeMemory
(),
this
.
numberOfSlots
,
this
.
memoryManager
=
new
DefaultMemoryManager
(
memorySize
,
this
.
numberOfSlots
,
pageSize
);
pageSize
);
}
catch
(
Throwable
t
)
{
}
catch
(
Throwable
t
)
{
LOG
.
fatal
(
"Unable to initialize memory manager with "
+
(
resources
.
getSizeOfFreeMemory
()
>>>
20
)
LOG
.
fatal
(
"Unable to initialize memory manager with "
+
(
memorySize
>>>
20
)
+
" megabytes of memory."
,
t
);
+
" megabytes of memory."
,
t
);
throw
new
Exception
(
"Unable to initialize memory manager."
,
t
);
throw
new
Exception
(
"Unable to initialize memory manager."
,
t
);
}
}
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录