PaddlePaddle / PaddleRec
Commit 154e5da2

rename to eleps

Author: tangwei
Authored on: Mar 31, 2020
Parent: dc9f2dac

Showing 9 changed files with 92 additions and 51 deletions (+92 -51):

    models/base.py            +3   -3
    models/ctr_dnn/model.py   +32  -0
    models/ctr_dnn/reader.py  +7   -0
    reader/dataset.py         +5   -5
    trainer/ctr_trainer.py    +41  -39
    trainer/trainer.py        +0   -0   (moved from trainer/kagle_trainer.py)
    utils/fs.py               +0   -0   (moved from utils/kagle_fs.py)
    utils/table.py            +0   -0   (moved from utils/kagle_table.py)
    utils/util.py             +4   -4   (moved from utils/kagle_util.py)
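Every change below follows one pattern: absolute imports of the old kagle package (with kagle_-prefixed module names) are replaced by package-relative imports of the renamed modules, and call sites drop the kagle_ prefix. A minimal sketch of the pattern as it would appear in a module inside the renamed package; this is not a standalone script, since relative imports only resolve when the file is loaded as part of a package, and the top-level package name is presumably "eleps", per the commit title:

    # Old style (removed across this commit):
    #   import kagle.utils.kagle_util as kagle_util
    #   import kagle.utils.kagle_fs as kagle_fs
    #   kagle_util.rank0_print("begin save delta model")
    #
    # New style (added across this commit):
    from ..utils import util as util
    from ..utils import fs as fs       # fs.FileHandler(...) is called the same way as before

    util.rank0_print("begin save delta model")   # call sites simply drop the kagle_ prefix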
models/base.py
@@ -5,7 +5,7 @@ import abc
 import copy
 import yaml
 import paddle.fluid as fluid
-import kagle.utils.kagle_table as kagle_table
+from ..utils import table as table
 from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
@@ -187,7 +187,7 @@ class YamlModel(Model):
         if self._build_nodes[phase] is None:
             continue
         for node in self._build_nodes[phase]:
-            exec("""layer=kagle_layer.{}(node)""".format(node['class']))
+            exec("""layer=layer.{}(node)""".format(node['class']))
             layer_output, extend_output = layer.generate(self._config['mode'], self._build_param)
             self._build_param['layer'][node['name']] = layer_output
             self._build_param['layer_extend'][node['name']] = extend_output
@@ -208,7 +208,7 @@ class YamlModel(Model):
             param_name = inference_param['name']
             if param_name not in self._build_param['table']:
                 self._build_param['table'][param_name] = {'params':[]}
-                table_meta = kagle_table.TableMeta.alloc_new_table(inference_param['table_id'])
+                table_meta = table.TableMeta.alloc_new_table(inference_param['table_id'])
                 self._build_param['table'][param_name]['_meta'] = table_meta
             self._build_param['table'][param_name]['params'] += inference_param['params']
         pass
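The only functional edit in this file is the module prefix inside the exec(...) strings and the table helper; the pattern itself (look up a layer class by the name stored in the node config and instantiate it) is easier to see without exec. A self-contained sketch of the equivalent lookup, with a hypothetical DenseLayer class standing in for whatever the imported layer module actually provides:

    import sys


    class DenseLayer(object):
        """Hypothetical layer class; stands in for a class defined in the layer module."""

        def __init__(self, node):
            self.name = node['name']

        def generate(self, mode, build_param):
            # A real layer would emit fluid ops here; return a dummy (output, extend) pair.
            return "output_of_" + self.name, None


    node = {'class': 'DenseLayer', 'name': 'fc1'}

    # Equivalent of exec("""layer=layer.{}(node)""".format(node['class'])), using this
    # module itself as a stand-in for the imported layer module:
    layer_cls = getattr(sys.modules[__name__], node['class'])
    layer = layer_cls(node)
    layer_output, extend_output = layer.generate('train', {})
    print(layer_output)   # -> output_of_fc1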
models/ctr_dnn/model.py (new file)
+class TrainModel(object):
+    def input(self):
+        pass
+
+    def net(self):
+        pass
+
+    def net(self):
+        pass
+
+    def loss(self):
+        pass
+
+    def optimizer(self):
+        pass
+
+
+class InferModel(object):
+    def input(self):
+        pass
+
+    def net(self):
+        pass
+
+    def net(self):
+        pass
+
+    def loss(self):
+        pass
+
+    def optimizer(self):
+        pass
models/ctr_dnn/reader.py
浏览文件 @
154e5da2
def
TrainReader
():
pass
def
InferReader
():
pass
reader/dataset.py
@@ -7,8 +7,8 @@ import yaml
 import time
 import datetime
 import paddle.fluid as fluid
-import kagle.utils.kagle_fs as kagle_fs
-import kagle.utils.kagle_util as kagle_util
+from ..utils import fs as fs
+from ..utils import util as util

 class Dataset(object):
@@ -61,16 +61,16 @@ class TimeSplitDataset(Dataset):
         Dataset.__init__(self, config)
         if 'data_donefile' not in config or config['data_donefile'] is None:
             config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
-        self._path_generator = kagle_util.PathGenerator({'templates': [
+        self._path_generator = util.PathGenerator({'templates': [
             {'name': 'data_path', 'template': config['data_path']},
             {'name': 'donefile_path', 'template': config['data_donefile']}
         ]})
         self._split_interval = config['split_interval']  # data split N mins per dir
-        self._data_file_handler = kagle_fs.FileHandler(config)
+        self._data_file_handler = fs.FileHandler(config)

     def _format_data_time(self, daytime_str, time_window_mins):
         """ """
-        data_time = kagle_util.make_datetime(daytime_str)
+        data_time = util.make_datetime(daytime_str)
         mins_of_day = data_time.hour * 60 + data_time.minute
         begin_stage = mins_of_day / self._split_interval
         end_stage = (mins_of_day + time_window_mins) / self._split_interval
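One thing worth noting while reading the _format_data_time hunk: begin_stage and end_stage are computed with the / operator, which floors for ints on Python 2 (the era of paddle.fluid this code targets); on Python 3 the same arithmetic needs //. A small worked example with assumed values, using floor division explicitly:

    # Worked example of the stage arithmetic in TimeSplitDataset._format_data_time.
    # split_interval, hour, minute and time_window_mins are assumed example values.
    split_interval = 5                 # data split N mins per dir
    hour, minute = 10, 27              # parsed from daytime_str by util.make_datetime
    time_window_mins = 30

    mins_of_day = hour * 60 + minute                                  # 627
    begin_stage = mins_of_day // split_interval                       # 125 (floor division)
    end_stage = (mins_of_day + time_window_mins) // split_interval    # 131

    print(mins_of_day, begin_stage, end_stage)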
trainer/ctr_trainer.py
@@ -12,12 +12,14 @@ import datetime
 import numpy as np
 import paddle.fluid as fluid
-import kagle.utils.kagle_fs as kagle_fs
-import kagle.utils.kagle_util as kagle_util
-import kagle.kagle_model as kagle_model
-import kagle.kagle_metric as kagle_metric
-import kagle.reader.dataset as kagle_dataset
-import kagle.trainer.kagle_trainer as kagle_trainer
+from ..utils import fs as fs
+from ..utils import util as util
+from ..metrics.auc_metrics import AUCMetric
+from ..models import base as model_basic
+from ..reader import dataset
+from . import trainer
 from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
 from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
@@ -62,22 +64,22 @@ def worker_numric_max(value, env="mpi"):
     return wroker_numric_opt(value, env, "max")
-class CtrPaddleTrainer(kagle_trainer.Trainer):
+class CtrPaddleTrainer(trainer.Trainer):
     """R
     """
     def __init__(self, config):
         """R
         """
-        kagle_trainer.Trainer.__init__(self, config)
-        config['output_path'] = kagle_util.get_absolute_path(
+        trainer.Trainer.__init__(self, config)
+        config['output_path'] = util.get_absolute_path(
             config['output_path'], config['io']['afs'])
         self.global_config = config
         self._place = fluid.CPUPlace()
         self._exe = fluid.Executor(self._place)
         self._exector_context = {}
         self._metrics = {}
-        self._path_generator = kagle_util.PathGenerator({
+        self._path_generator = util.PathGenerator({
             'templates': [
                 {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'},
                 {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'},
@@ -116,7 +118,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
             scope = fluid.Scope()
             self._exector_context[executor['name']] = {}
             self._exector_context[executor['name']]['scope'] = scope
-            self._exector_context[executor['name']]['model'] = kagle_model.create(executor)
+            self._exector_context[executor['name']]['model'] = model_basic.create(executor)
             model = self._exector_context[executor['name']]['model']
             self._metrics.update(model.get_metrics())
             runnnable_scope.append(scope)
@@ -127,7 +129,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
                 data_var_list.append(var)
                 data_var_name_dict[var.name] = var
-            optimizer = kagle_model.FluidModel.build_optimizer({
+            optimizer = model_basic.YamlModel.build_optimizer({
                 'metrics': self._metrics,
                 'optimizer_conf': self.global_config['optimizer']
            })
@@ -153,7 +155,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
             dataset_item['data_vars'] = data_var_list
             dataset_item.update(self.global_config['io']['afs'])
             dataset_item["batch_size"] = self.global_config['batch_size']
-            self._dataset[dataset_item['name']] = kagle_dataset.FluidTimeSplitDataset(dataset_item)
+            self._dataset[dataset_item['name']] = dataset.FluidTimeSplitDataset(dataset_item)
         # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass:
         #     util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3)
         fleet.init_worker()
@@ -176,7 +178,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
         """R
         """
         metrics = model.get_metrics()
-        metric_calculator = kagle_metric.AUCMetric(None)
+        metric_calculator = AUCMetric(None)
         for metric in metrics:
             metric_param = {'label': metric, 'metric_dict': metrics[metric]}
             metric_calculator.calculate(scope, metric_param)
@@ -188,13 +190,13 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
     def save_model(self, day, pass_index, base_key):
         """R
        """
-        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
+        cost_printer = util.CostPrinter(util.print_cost,
             {'master': True, 'log_format': 'save model cost %s sec'})
         model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index})
         save_mode = 0  # just save all
         if pass_index < 1:  # batch_model
             save_mode = 3  # unseen_day++, save all
-        kagle_util.rank0_print("going to save_model %s" % model_path)
+        util.rank0_print("going to save_model %s" % model_path)
         fleet.save_persistables(None, model_path, mode=save_mode)
         if fleet._role_maker.is_first_worker():
             self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
@@ -206,11 +208,11 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
        """
        stdout_str = ""
        xbox_patch_id = str(int(time.time()))
-        kagle_util.rank0_print("begin save delta model")
+        util.rank0_print("begin save delta model")
        model_path = ""
        xbox_model_donefile = ""
-        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, \
+        cost_printer = util.CostPrinter(util.print_cost, {'master': True, \
            'log_format': 'save xbox model cost %s sec',
            'stdout': stdout_str})
        if pass_index < 1:
@@ -225,23 +227,23 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
            total_save_num = fleet.save_persistables(None, model_path, mode=save_mode)
            cost_printer.done()
-            cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
+            cost_printer = util.CostPrinter(util.print_cost, {'master': True,
                'log_format': 'save cache model cost %s sec',
                'stdout': stdout_str})
-            model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs'])
+            model_file_handler = fs.FileHandler(self.global_config['io']['afs'])
            if self.global_config['save_cache_model']:
                cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode)
                model_file_handler.write(
                    "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num,
                    model_path + '/000_cache/sparse_cache.meta', 'w')
                cost_printer.done()
-                kagle_util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num)
+                util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num)
            save_env_param = {
                'executor': self._exe,
                'save_combine': True
            }
-            cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
+            cost_printer = util.CostPrinter(util.print_cost, {'master': True,
                'log_format': 'save dense model cost %s sec',
                'stdout': stdout_str})
            if fleet._role_maker.is_first_worker():
@@ -269,8 +271,8 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
                "monitor_data": monitor_data,
                "mpi_size": str(fleet.worker_num()),
                "input": model_path.rstrip("/") + "/000",
-                "job_id": kagle_util.get_env_value("JOB_ID"),
-                "job_name": kagle_util.get_env_value("JOB_NAME")
+                "job_id": util.get_env_value("JOB_ID"),
+                "job_name": util.get_env_value("JOB_NAME")
            }
            if fleet._role_maker.is_first_worker():
                model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
@@ -289,7 +291,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
        scope = self._exector_context[executor_name]['scope']
        model = self._exector_context[executor_name]['model']
        with fluid.scope_guard(scope):
-            kagle_util.rank0_print("Begin " + executor_name + " pass")
+            util.rank0_print("Begin " + executor_name + " pass")
            begin = time.time()
            program = model._build_param['model']['train_program']
            self._exe.train_from_dataset(program, dataset, scope,
@@ -299,12 +301,12 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
            avg_cost = worker_numric_avg(local_cost)
            min_cost = worker_numric_min(local_cost)
            max_cost = worker_numric_max(local_cost)
-            kagle_util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost))
+            util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost))
            self._exector_context[executor_name]['cost'] = max_cost
            monitor_data = ""
            self.print_global_metrics(scope, model, monitor_data, stdout_str)
-            kagle_util.rank0_print("End " + executor_name + " pass")
+            util.rank0_print("End " + executor_name + " pass")
            if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
                stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
            fleet._role_maker._barrier_worker()
@@ -317,9 +319,9 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
            context['status'] = 'wait'
            return
        stdout_str = ""
-        self._train_pass = kagle_util.TimeTrainPass(self.global_config)
+        self._train_pass = util.TimeTrainPass(self.global_config)
        if not self.global_config['cold_start']:
-            cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
+            cost_printer = util.CostPrinter(util.print_cost,
                {'master': True, 'log_format': 'load model cost %s sec',
                'stdout': stdout_str})
            self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True})
@@ -358,8 +360,8 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
            xbox_base_key = int(time.time())
            context['status'] = 'begin_day'
-        kagle_util.rank0_print("shrink table")
-        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
+        util.rank0_print("shrink table")
+        cost_printer = util.CostPrinter(util.print_cost,
            {'master': True, 'log_format': 'shrink table done, cost %s sec'})
        fleet.shrink_sparse_table()
        for executor in self._exector_context:
@@ -370,9 +372,9 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
        cost_printer.done()
        next_date = self._train_pass.date(delta_day=1)
-        kagle_util.rank0_print("going to save xbox base model")
+        util.rank0_print("going to save xbox base model")
        self.save_xbox_model(next_date, 0, xbox_base_key, "")
-        kagle_util.rank0_print("going to save batch model")
+        util.rank0_print("going to save batch model")
        self.save_model(next_date, 0, xbox_base_key)
        self._train_pass._base_key = xbox_base_key
        fleet._role_maker._barrier_worker()
@@ -388,7 +390,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
        self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str})
        train_begin_time = time.time()
-        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \
+        cost_printer = util.CostPrinter(util.print_cost, \
            {'master': True, 'log_format': 'load into memory done, cost %s sec',
            'stdout': stdout_str})
        current_dataset = {}
@@ -400,8 +402,8 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
        fleet._role_maker._barrier_worker()
        cost_printer.done()
-        kagle_util.rank0_print("going to global shuffle")
-        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {
+        util.rank0_print("going to global shuffle")
+        cost_printer = util.CostPrinter(util.print_cost, {
            'master': True, 'stdout': stdout_str,
            'log_format': 'global shuffle done, cost %s sec'})
        for name in current_dataset:
@@ -423,7 +425,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
        pure_train_begin = time.time()
        for executor in self.global_config['executor']:
            self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
-        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \
+        cost_printer = util.CostPrinter(util.print_cost, \
            {'master': True, 'log_format': 'release_memory cost %s sec'})
        for name in current_dataset:
            current_dataset[name].release_memory()
@@ -439,8 +441,8 @@ class CtrPaddleTrainer(kagle_trainer.Trainer):
        for executor in self._exector_context:
            log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']'
        log_str += '[other_cost:' + str(other_cost) + ']'
-        kagle_util.rank0_print(log_str)
-        stdout_str += kagle_util.now_time_str() + log_str
+        util.rank0_print(log_str)
+        stdout_str += util.now_time_str() + log_str
        sys.stdout.write(stdout_str)
        fleet._role_maker._barrier_worker()
        stdout_str = ""
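Most of the edits in this file only change the module prefix on CostPrinter(print_cost, {...}) ... cost_printer.done() pairs, which time a block of work and print an elapsed-time line. The repo's own implementation is not part of this diff; the following is a minimal, hypothetical sketch of a helper with the same call shape (a print callback, a params dict carrying 'log_format', and done() closing the span), just to illustrate how those call sites read. The real util.print_cost presumably also honours 'master' and 'stdout':

    import time


    def print_cost(cost, params):
        # Hypothetical stand-in for util.print_cost: format the elapsed seconds.
        log_str = params['log_format'] % cost
        print(log_str)
        return log_str


    class CostPrinter(object):
        # Hypothetical stand-in for util.CostPrinter: times the span between
        # construction and done(), then hands the elapsed time to the callback.
        def __init__(self, callback, params):
            self._callback = callback
            self._params = params
            self._begin = time.time()

        def done(self):
            elapsed = time.time() - self._begin
            return self._callback(elapsed, self._params)


    cost_printer = CostPrinter(print_cost, {'master': True, 'log_format': 'save model cost %s sec'})
    time.sleep(0.1)   # stands in for the work being timed (e.g. fleet.save_persistables)
    cost_printer.done()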
trainer/kagle_trainer.py → trainer/trainer.py (file moved)

utils/kagle_fs.py → utils/fs.py (file moved)

utils/kagle_table.py → utils/table.py (file moved)
utils/kagle_util.py → utils/util.py
@@ -4,7 +4,7 @@ Util lib
 import os
 import time
 import datetime
-import kagle.utils.kagle_fs as kagle_fs
+from ..utils import fs as fs

 def get_env_value(env_name):
@@ -168,10 +168,10 @@ class TimeTrainPass(object):
         self._pass_donefile_handler = None
         if 'pass_donefile_name' in self._config:
             self._train_pass_donefile = global_config['output_path'] + '/' + self._config['pass_donefile_name']
-            if kagle_fs.is_afs_path(self._train_pass_donefile):
-                self._pass_donefile_handler = kagle_fs.FileHandler(global_config['io']['afs'])
+            if fs.is_afs_path(self._train_pass_donefile):
+                self._pass_donefile_handler = fs.FileHandler(global_config['io']['afs'])
             else:
-                self._pass_donefile_handler = kagle_fs.FileHandler(global_config['io']['local_fs'])
+                self._pass_donefile_handler = fs.FileHandler(global_config['io']['local_fs'])
             last_done = self._pass_donefile_handler.cat(self._train_pass_donefile).strip().split('\n')[-1]
             done_fileds = last_done.split('\t')