Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Oneflow-Inc
OneFlow-Benchmark
提交
707e226e
O
OneFlow-Benchmark
项目概览
Oneflow-Inc
/
OneFlow-Benchmark
上一次同步 2 年多
通知
1
Star
92
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
OneFlow-Benchmark
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
707e226e
编写于
6月 09, 2021
作者:
X
XIE Xuan
提交者:
GitHub
6月 09, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #199 from Oneflow-Inc/wdl_multi_dataloader_process
Wdl multi dataloader thread
上级
c9a9342a
30f94808
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
38 additions
and
24 deletions
+38
-24
ClickThroughRate/WideDeepLearning/wdl_train_eval.py
ClickThroughRate/WideDeepLearning/wdl_train_eval.py
+38
-24
未找到文件。
ClickThroughRate/WideDeepLearning/wdl_train_eval.py
浏览文件 @
707e226e
...
...
def str_list(x):
    """Parse a comma-separated string into a list of strings.

    Used as an argparse ``type=`` converter elsewhere in this script.
    Note: an empty input yields [''] (split's behavior), not [].
    """
    return x.split(',')
# Command-line options for the WDL train/eval script.  Only the options
# visible in this diff hunk are reconstructed here; the full script defines
# additional options outside this view.
parser = argparse.ArgumentParser()
parser.add_argument('--dataset_format', type=str, default='ofrecord',
                    help='ofrecord or onerec')
# When set, each node runs a single dataloader thread instead of
# num_dataloader_thread_per_gpu * gpu_num_per_node threads.
parser.add_argument("--use_single_dataloader_thread", action="store_true",
                    help="use single dataloader threads per node or not.")
parser.add_argument('--num_dataloader_thread_per_gpu', type=int, default=2)
parser.add_argument('--train_data_dir', type=str, default='')
parser.add_argument('--train_data_part_num', type=int, default=1)
# -1 means "infer the part-name suffix length automatically".
parser.add_argument('--train_part_name_suffix_length', type=int, default=-1)
...
...
@@ -60,16 +66,23 @@ FLAGS = parser.parse_args()
# Width of each hidden layer of the deep tower: hidden_units_num layers,
# all hidden_size wide.  FLAGS (an argparse Namespace, parsed elsewhere in
# this file) must provide hidden_size and hidden_units_num.
DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for _ in range(FLAGS.hidden_units_num)]
def _data_loader(data_dir, data_part_num, batch_size,
                 part_name_suffix_length=-1, shuffle=True):
    """Build the CPU-placed input pipeline and return the decoded blobs.

    Dispatches on FLAGS.dataset_format ('ofrecord', 'onerec' or 'synthetic')
    to the matching loader helper.  The loader ops are placed on CPU devices;
    with --use_single_dataloader_thread a single dataloader thread runs per
    node, otherwise num_dataloader_thread_per_gpu * gpu_num_per_node threads
    run per node.

    Returns flow.identity_n(data), where data is the 4-element list
    [labels, dense_fields, wide_sparse_fields, deep_sparse_fields]
    produced by the selected loader.
    """
    assert FLAGS.num_dataloader_thread_per_gpu >= 1
    if FLAGS.use_single_dataloader_thread:
        # One dataloader thread per node: device range '<node>:0'.
        devices = ['{}:0'.format(i) for i in range(FLAGS.num_nodes)]
    else:
        num_dataloader_thread = (FLAGS.num_dataloader_thread_per_gpu
                                 * FLAGS.gpu_num_per_node)
        # Thread range '<node>:0-<n-1>' spawns n dataloader threads per node.
        devices = ['{}:0-{}'.format(i, num_dataloader_thread - 1)
                   for i in range(FLAGS.num_nodes)]
    with flow.scope.placement("cpu", devices):
        if FLAGS.dataset_format == 'ofrecord':
            data = _data_loader_ofrecord(data_dir, data_part_num, batch_size,
                                         part_name_suffix_length, shuffle)
        elif FLAGS.dataset_format == 'onerec':
            data = _data_loader_onerec(data_dir, batch_size, shuffle)
        elif FLAGS.dataset_format == 'synthetic':
            data = _data_loader_synthetic(batch_size)
        else:
            assert 0, "Please specify dataset_type as `ofrecord`, `onerec` or `synthetic`."
    return flow.identity_n(data)
def
_data_loader_ofrecord
(
data_dir
,
data_part_num
,
batch_size
,
part_name_suffix_length
=-
1
,
...
...
@@ -88,22 +101,20 @@ def _data_loader_ofrecord(data_dir, data_part_num, batch_size, part_name_suffix_
dense_fields
=
_blob_decoder
(
"dense_fields"
,
(
FLAGS
.
num_dense_fields
,),
flow
.
float
)
wide_sparse_fields
=
_blob_decoder
(
"wide_sparse_fields"
,
(
FLAGS
.
num_wide_sparse_fields
,))
deep_sparse_fields
=
_blob_decoder
(
"deep_sparse_fields"
,
(
FLAGS
.
num_deep_sparse_fields
,))
return
flow
.
identity_n
([
labels
,
dense_fields
,
wide_sparse_fields
,
deep_sparse_fields
])
return
[
labels
,
dense_fields
,
wide_sparse_fields
,
deep_sparse_fields
]
def _data_loader_synthetic(batch_size):
    """Generate random synthetic input blobs for benchmarking.

    No placement scope is set here: the caller (_data_loader) already wraps
    this call in a CPU placement.  Returns the list
    [labels, dense_fields, wide_sparse_fields, deep_sparse_fields].
    """
    def _blob_random(shape, dtype=flow.int32,
                     initializer=flow.zeros_initializer(flow.int32)):
        # One randomly-initialized blob of the given shape per batch element.
        return flow.data.decode_random(shape=shape, dtype=dtype,
                                       batch_size=batch_size,
                                       initializer=initializer)

    labels = _blob_random((1,),
                          initializer=flow.random_uniform_initializer(
                              dtype=flow.int32))
    dense_fields = _blob_random((FLAGS.num_dense_fields,), dtype=flow.float,
                                initializer=flow.random_uniform_initializer())
    wide_sparse_fields = _blob_random((FLAGS.num_wide_sparse_fields,))
    deep_sparse_fields = _blob_random((FLAGS.num_deep_sparse_fields,))
    print('use synthetic data')
    return [labels, dense_fields, wide_sparse_fields, deep_sparse_fields]
def
_data_loader_onerec
(
data_dir
,
batch_size
,
shuffle
):
...
...
@@ -112,6 +123,7 @@ def _data_loader_onerec(data_dir, batch_size, shuffle):
files
=
glob
.
glob
(
os
.
path
.
join
(
data_dir
,
'*.onerec'
))
readdata
=
flow
.
data
.
onerec_reader
(
files
=
files
,
batch_size
=
batch_size
,
random_shuffle
=
shuffle
,
verify_example
=
False
,
shuffle_mode
=
"batch"
,
shuffle_buffer_size
=
64
,
shuffle_after_epoch
=
shuffle
)
...
...
@@ -122,7 +134,7 @@ def _data_loader_onerec(data_dir, batch_size, shuffle):
dense_fields
=
_blob_decoder
(
"dense_fields"
,
(
FLAGS
.
num_dense_fields
,),
flow
.
float
)
wide_sparse_fields
=
_blob_decoder
(
"wide_sparse_fields"
,
(
FLAGS
.
num_wide_sparse_fields
,))
deep_sparse_fields
=
_blob_decoder
(
"deep_sparse_fields"
,
(
FLAGS
.
num_deep_sparse_fields
,))
return
flow
.
identity_n
([
labels
,
dense_fields
,
wide_sparse_fields
,
deep_sparse_fields
])
return
[
labels
,
dense_fields
,
wide_sparse_fields
,
deep_sparse_fields
]
def
_model
(
dense_fields
,
wide_sparse_fields
,
deep_sparse_fields
):
...
...
@@ -260,7 +272,9 @@ def main():
flow
.
config
.
gpu_device_num
(
FLAGS
.
gpu_num_per_node
)
flow
.
config
.
enable_model_io_v2
(
True
)
flow
.
config
.
enable_debug_mode
(
True
)
flow
.
config
.
collective_boxing
.
nccl_enable_all_to_all
(
True
)
flow
.
config
.
enable_legacy_model_io
(
True
)
flow
.
config
.
nccl_use_compute_stream
(
True
)
# flow.config.collective_boxing.nccl_enable_all_to_all(True)
#flow.config.enable_numa_aware_cuda_malloc_host(True)
#flow.config.collective_boxing.enable_fusion(False)
check_point
=
flow
.
train
.
CheckPoint
()
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录