Commit c68f2694 authored by tangwei

fix code style

Parent 66e1859f
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.

 """
 trainer implement.
@@ -22,5 +21,3 @@ Trainer
   ↘ (for online learning training) OnlineLearningTrainer
 """
@@ -59,8 +59,10 @@ class CtrTrainer(Trainer):
         reader_class = envs.get_global_env("class", None, namespace)
         abs_dir = os.path.dirname(os.path.abspath(__file__))
         reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
-        pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "TRAIN", self._config_yaml)
-        train_data_path = envs.get_global_env("train_data_path", None, namespace)
+        pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "TRAIN",
+                                               self._config_yaml)
+        train_data_path = envs.get_global_env("train_data_path", None,
+                                              namespace)

         dataset = fluid.DatasetFactory().create_dataset()
         dataset.set_use_var(inputs)
@@ -87,7 +89,8 @@ class CtrTrainer(Trainer):
         self.model.train_net()
         optimizer = self.model.optimizer()
-        optimizer = fleet.distributed_optimizer(optimizer, strategy={"use_cvm": False})
+        optimizer = fleet.distributed_optimizer(
+            optimizer, strategy={"use_cvm": False})
         optimizer.minimize(self.model.get_avg_cost())

         if fleet.is_server():
@@ -118,12 +121,14 @@ class CtrTrainer(Trainer):
         gs = shuf * 0
         fleet._role_maker._node_type_comm.Allreduce(shuf, gs)
-        print("trainer id: {}, trainers: {}, gs: {}".format(fleet.worker_index(), fleet.worker_num(), gs))
+        print("trainer id: {}, trainers: {}, gs: {}".format(fleet.worker_index(
+        ), fleet.worker_num(), gs))

         epochs = envs.get_global_env("train.epochs")
         for i in range(epochs):
-            self._exe.train_from_dataset(program=fluid.default_main_program(),
-                                         dataset=dataset,
-                                         fetch_list=self.fetch_vars,
-                                         fetch_info=self.fetch_alias,
+            self._exe.train_from_dataset(
+                program=fluid.default_main_program(),
+                dataset=dataset,
+                fetch_list=self.fetch_vars,
+                fetch_info=self.fetch_alias,
......
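For context on the pattern above: the trainer builds a shell command that pipes raw data files through a reader script, then hands that command to a fluid dataset. A minimal sketch of that wiring, with hypothetical reader-script and file names:

```python
import paddle.fluid as fluid

# One int64 slot; the reader script and file list below are placeholders.
data = fluid.data(name="input", shape=[None, 1], dtype="int64")

dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var([data])                      # variables each sample fills
dataset.set_pipe_command("python my_reader.py")  # files are piped through this
dataset.set_batch_size(32)
dataset.set_thread(4)
dataset.set_filelist(["train_data/part-0"])      # hypothetical shard
```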
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

 import datetime
 import json
 import sys
@@ -23,7 +22,6 @@ import paddle.fluid as fluid
 from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
 from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker

 from paddlerec.core.utils import fs as fs
 from paddlerec.core.utils import util as util
 from paddlerec.core.metrics.auc_metrics import AUCMetric
@@ -80,20 +78,31 @@ class CtrTrainer(Trainer):
         """R
         """
         Trainer.__init__(self, config)
-        config['output_path'] = util.get_absolute_path(
-            config['output_path'], config['io']['afs'])
+        config['output_path'] = util.get_absolute_path(config['output_path'],
+                                                       config['io']['afs'])

         self.global_config = config
         self._metrics = {}
         self._path_generator = util.PathGenerator({
-            'templates': [
-                {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'},
-                {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'},
-                {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'},
-                {'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'},
-                {'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'}
-            ]
+            'templates': [{
+                'name': 'xbox_base_done',
+                'template': config['output_path'] + '/xbox_base_done.txt'
+            }, {
+                'name': 'xbox_delta_done',
+                'template': config['output_path'] + '/xbox_patch_done.txt'
+            }, {
+                'name': 'xbox_base',
+                'template': config['output_path'] + '/xbox/{day}/base/'
+            }, {
+                'name': 'xbox_delta',
+                'template':
+                config['output_path'] + '/xbox/{day}/delta-{pass_id}/'
+            }, {
+                'name': 'batch_model',
+                'template':
+                config['output_path'] + '/batch_model/{day}/{pass_id}/'
+            }]
         })
         if 'path_generator' in config:
             self._path_generator.add_path_template(config['path_generator'])
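The 'templates' registered above are ordinary brace-style format strings keyed by a name. Assuming `util.PathGenerator` fills them with str.format-like substitution (the util module is not part of this diff), `generate_path('xbox_delta', {...})` behaves like:

```python
# Stand-in for util.PathGenerator, which this diff does not show.
templates = {"xbox_delta": "/output/xbox/{day}/delta-{pass_id}/"}


def generate_path(name, params):
    return templates[name].format(**params)


print(generate_path("xbox_delta", {"day": "20200401", "pass_id": 3}))
# -> /output/xbox/20200401/delta-3/
```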
@@ -111,9 +120,11 @@ class CtrTrainer(Trainer):
         if self.global_config.get('process_mode', 'mpi') == 'brilliant_cpu':
             afs_config = self.global_config['io']['afs']
             role_maker = GeneralRoleMaker(
-                hdfs_name=afs_config['fs_name'], hdfs_ugi=afs_config['fs_ugi'],
+                hdfs_name=afs_config['fs_name'],
+                hdfs_ugi=afs_config['fs_ugi'],
                 path=self.global_config['output_path'] + "/gloo",
-                init_timeout_seconds=1200, run_timeout_seconds=1200)
+                init_timeout_seconds=1200,
+                run_timeout_seconds=1200)
             fleet.init(role_maker)

         data_var_list = []
         data_var_name_dict = {}
@@ -125,7 +136,8 @@ class CtrTrainer(Trainer):
             scope = fluid.Scope()
             self._exector_context[executor['name']] = {}
             self._exector_context[executor['name']]['scope'] = scope
-            self._exector_context[executor['name']]['model'] = model_basic.create(executor)
+            self._exector_context[executor['name']][
+                'model'] = model_basic.create(executor)
             model = self._exector_context[executor['name']]['model']
             self._metrics.update(model.get_metrics())
             runnnable_scope.append(scope)
@@ -146,9 +158,12 @@ class CtrTrainer(Trainer):
             model = self._exector_context[executor['name']]['model']
             program = model._build_param['model']['train_program']
             if not executor['is_update_sparse']:
-                program._fleet_opt["program_configs"][str(id(model.get_avg_cost().block.program))]["push_sparse"] = []
+                program._fleet_opt["program_configs"][str(
+                    id(model.get_avg_cost().block.program))][
+                        "push_sparse"] = []
             if 'train_thread_num' not in executor:
-                executor['train_thread_num'] = self.global_config['train_thread_num']
+                executor['train_thread_num'] = self.global_config[
+                    'train_thread_num']
             with fluid.scope_guard(scope):
                 self._exe.run(model._build_param['model']['startup_program'])
             model.dump_model_program('./')
@@ -162,7 +177,8 @@ class CtrTrainer(Trainer):
             dataset_item['data_vars'] = data_var_list
             dataset_item.update(self.global_config['io']['afs'])
             dataset_item["batch_size"] = self.global_config['batch_size']
-            self._dataset[dataset_item['name']] = dataset.FluidTimeSplitDataset(dataset_item)
+            self._dataset[dataset_item[
+                'name']] = dataset.FluidTimeSplitDataset(dataset_item)
         # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass:
         #    util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3)
         fleet.init_worker()
@@ -190,23 +206,30 @@ class CtrTrainer(Trainer):
             metric_param = {'label': metric, 'metric_dict': metrics[metric]}
             metric_calculator.calculate(scope, metric_param)
             metric_result = metric_calculator.get_result_to_string()
-            self.print_log(metric_result, {'master': True, 'stdout': stdout_str})
+            self.print_log(metric_result,
+                           {'master': True,
+                            'stdout': stdout_str})
             monitor_data += metric_result
             metric_calculator.clear(scope, metric_param)

     def save_model(self, day, pass_index, base_key):
         """R
         """
-        cost_printer = util.CostPrinter(util.print_cost,
-                                        {'master': True, 'log_format': 'save model cost %s sec'})
-        model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index})
+        cost_printer = util.CostPrinter(util.print_cost, {
+            'master': True,
+            'log_format': 'save model cost %s sec'
+        })
+        model_path = self._path_generator.generate_path(
+            'batch_model', {'day': day,
+                            'pass_id': pass_index})
         save_mode = 0  # just save all
         if pass_index < 1:  # batch_model
             save_mode = 3  # unseen_day++, save all
         util.rank0_print("going to save_model %s" % model_path)
         fleet.save_persistables(None, model_path, mode=save_mode)
         if fleet._role_maker.is_first_worker():
-            self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
+            self._train_pass.save_train_progress(
+                day, pass_index, base_key, model_path, is_checkpoint=True)
         cost_printer.done()
         return model_path
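As a reading aid, the save-mode convention can be summarized as below. This mapping is inferred from the inline comments here and in save_xbox_model further down, not from pslib documentation:

```python
def checkpoint_save_mode(pass_index):
    """Hypothetical helper, for reading only: mirrors the comments above.

    0 -> regular checkpoint, save all parameters
    3 -> daily batch model, save all and advance the unseen-day counter
    (save_xbox_model below additionally uses 2 for a base dump, 1 for a delta)
    """
    return 3 if pass_index < 1 else 0
```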
@@ -225,46 +248,58 @@ class CtrTrainer(Trainer):
         if pass_index < 1:
             save_mode = 2
             xbox_patch_id = xbox_base_key
-            model_path = self._path_generator.generate_path('xbox_base', {'day': day})
-            xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day': day})
+            model_path = self._path_generator.generate_path('xbox_base',
+                                                            {'day': day})
+            xbox_model_donefile = self._path_generator.generate_path(
+                'xbox_base_done', {'day': day})
         else:
             save_mode = 1
-            model_path = self._path_generator.generate_path('xbox_delta', {'day': day, 'pass_id': pass_index})
-            xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day': day})
-        total_save_num = fleet.save_persistables(None, model_path, mode=save_mode)
+            model_path = self._path_generator.generate_path(
+                'xbox_delta', {'day': day,
+                               'pass_id': pass_index})
+            xbox_model_donefile = self._path_generator.generate_path(
+                'xbox_delta_done', {'day': day})
+        total_save_num = fleet.save_persistables(
+            None, model_path, mode=save_mode)
         cost_printer.done()

-        cost_printer = util.CostPrinter(util.print_cost, {'master': True,
-                                                          'log_format': 'save cache model cost %s sec',
-                                                          'stdout': stdout_str})
+        cost_printer = util.CostPrinter(util.print_cost, {
+            'master': True,
+            'log_format': 'save cache model cost %s sec',
+            'stdout': stdout_str
+        })
         model_file_handler = fs.FileHandler(self.global_config['io']['afs'])
         if self.global_config['save_cache_model']:
-            cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode)
+            cache_save_num = fleet.save_cache_model(
+                None, model_path, mode=save_mode)
             model_file_handler.write(
                 "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num,
                 model_path + '/000_cache/sparse_cache.meta', 'w')
             cost_printer.done()
-            util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num)
+            util.rank0_print("save xbox cache model done, key_num=%s" %
+                             cache_save_num)

-        save_env_param = {
-            'executor': self._exe,
-            'save_combine': True
-        }
-        cost_printer = util.CostPrinter(util.print_cost, {'master': True,
-                                                          'log_format': 'save dense model cost %s sec',
-                                                          'stdout': stdout_str})
+        save_env_param = {'executor': self._exe, 'save_combine': True}
+        cost_printer = util.CostPrinter(util.print_cost, {
+            'master': True,
+            'log_format': 'save dense model cost %s sec',
+            'stdout': stdout_str
+        })
         if fleet._role_maker.is_first_worker():
             for executor in self.global_config['executor']:
                 if 'layer_for_inference' not in executor:
                     continue
                 executor_name = executor['name']
                 model = self._exector_context[executor_name]['model']
-                save_env_param['inference_list'] = executor['layer_for_inference']
-                save_env_param['scope'] = self._exector_context[executor_name]['scope']
+                save_env_param['inference_list'] = executor[
+                    'layer_for_inference']
+                save_env_param['scope'] = self._exector_context[executor_name][
+                    'scope']
                 model.dump_inference_param(save_env_param)
                 for dnn_layer in executor['layer_for_inference']:
                     model_file_handler.cp(dnn_layer['save_file_name'],
-                                          model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
+                                          model_path + '/dnn_plugin/' +
+                                          dnn_layer['save_file_name'])
         fleet._role_maker._barrier_worker()
         cost_printer.done()

@@ -282,9 +317,15 @@ class CtrTrainer(Trainer):
             "job_name": util.get_env_value("JOB_NAME")
         }
         if fleet._role_maker.is_first_worker():
-            model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
+            model_file_handler.write(
+                json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
             if pass_index > 0:
-                self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
+                self._train_pass.save_train_progress(
+                    day,
+                    pass_index,
+                    xbox_base_key,
+                    model_path,
+                    is_checkpoint=False)
         fleet._role_maker._barrier_worker()
         return stdout_str
@@ -301,21 +342,28 @@ class CtrTrainer(Trainer):
         util.rank0_print("Begin " + executor_name + " pass")
         begin = time.time()
         program = model._build_param['model']['train_program']
-        self._exe.train_from_dataset(program, dataset, scope,
-                                     thread=executor_config['train_thread_num'], debug=self.global_config['debug'])
+        self._exe.train_from_dataset(
+            program,
+            dataset,
+            scope,
+            thread=executor_config['train_thread_num'],
+            debug=self.global_config['debug'])
         end = time.time()
         local_cost = (end - begin) / 60.0
         avg_cost = worker_numric_avg(local_cost)
         min_cost = worker_numric_min(local_cost)
         max_cost = worker_numric_max(local_cost)
-        util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost))
+        util.rank0_print("avg train time %s mins, min %s mins, max %s mins"
+                         % (avg_cost, min_cost, max_cost))
         self._exector_context[executor_name]['cost'] = max_cost

         monitor_data = ""
         self.print_global_metrics(scope, model, monitor_data, stdout_str)
         util.rank0_print("End " + executor_name + " pass")
-        if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
-            stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
+        if self._train_pass.need_dump_inference(
+                pass_id) and executor_config['dump_inference_model']:
+            stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key,
+                                               monitor_data)
         fleet._role_maker._barrier_worker()

     def startup(self, context):
@@ -328,10 +376,14 @@ class CtrTrainer(Trainer):
         stdout_str = ""
         self._train_pass = util.TimeTrainPass(self.global_config)
         if not self.global_config['cold_start']:
-            cost_printer = util.CostPrinter(util.print_cost,
-                                            {'master': True, 'log_format': 'load model cost %s sec',
-                                             'stdout': stdout_str})
-            self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True})
+            cost_printer = util.CostPrinter(util.print_cost, {
+                'master': True,
+                'log_format': 'load model cost %s sec',
+                'stdout': stdout_str
+            })
+            self.print_log("going to load model %s" %
+                           self._train_pass._checkpoint_model_path,
+                           {'master': True})
             # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date()
             #     and config.reqi_dnn_plugin_pass >= self._pass_id:
             #    fleet.load_one_table(0, self._train_pass._checkpoint_model_path)
@@ -340,9 +392,12 @@ class CtrTrainer(Trainer):
             cost_printer.done()
         if self.global_config['save_first_base']:
             self.print_log("save_first_base=True", {'master': True})
-            self.print_log("going to save xbox base model", {'master': True, 'stdout': stdout_str})
+            self.print_log("going to save xbox base model",
+                           {'master': True,
+                            'stdout': stdout_str})
             self._train_pass._base_key = int(time.time())
-            stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "")
+            stdout_str += self.save_xbox_model(self._train_pass.date(), 0,
+                                               self._train_pass._base_key, "")
         context['status'] = 'begin_day'

     def begin_day(self, context):
@@ -353,7 +408,9 @@ class CtrTrainer(Trainer):
             context['is_exit'] = True
         day = self._train_pass.date()
         pass_id = self._train_pass._pass_id
-        self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout': stdout_str})
+        self.print_log("======== BEGIN DAY:%s ========" % day,
+                       {'master': True,
+                        'stdout': stdout_str})
         if pass_id == self._train_pass.max_pass_num_day():
             context['status'] = 'end_day'
         else:
@@ -368,8 +425,10 @@ class CtrTrainer(Trainer):
         context['status'] = 'begin_day'

         util.rank0_print("shrink table")
-        cost_printer = util.CostPrinter(util.print_cost,
-                                        {'master': True, 'log_format': 'shrink table done, cost %s sec'})
+        cost_printer = util.CostPrinter(util.print_cost, {
+            'master': True,
+            'log_format': 'shrink table done, cost %s sec'
+        })
         fleet.shrink_sparse_table()
         for executor in self._exector_context:
             self._exector_context[executor]['model'].shrink({
@@ -394,7 +453,9 @@ class CtrTrainer(Trainer):
         pass_id = self._train_pass._pass_id
         base_key = self._train_pass._base_key
         pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M")
-        self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str})
+        self.print_log(" ==== begin delta:%s ========" % pass_id,
+                       {'master': True,
+                        'stdout': stdout_str})
         train_begin_time = time.time()

         cost_printer = util.CostPrinter(util.print_cost, \
@@ -403,35 +464,46 @@ class CtrTrainer(Trainer):
         current_dataset = {}
         for name in self._dataset:
             current_dataset[name] = self._dataset[name].load_dataset({
-                'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
-                'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
+                'node_num': fleet.worker_num(),
+                'node_idx': fleet.worker_index(),
+                'begin_time': pass_time,
+                'time_window_min': self._train_pass._interval_per_pass
             })
         fleet._role_maker._barrier_worker()
         cost_printer.done()

         util.rank0_print("going to global shuffle")
         cost_printer = util.CostPrinter(util.print_cost, {
-            'master': True, 'stdout': stdout_str,
-            'log_format': 'global shuffle done, cost %s sec'})
+            'master': True,
+            'stdout': stdout_str,
+            'log_format': 'global shuffle done, cost %s sec'
+        })
         for name in current_dataset:
-            current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
+            current_dataset[name].global_shuffle(
+                fleet, self.global_config['dataset']['shuffle_thread'])
         cost_printer.done()
         # str(dataset.get_shuffle_data_size(fleet))
         fleet._role_maker._barrier_worker()

         if self.global_config['prefetch_data']:
-            next_pass_time = (self._train_pass._current_train_time +
-                              datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M")
+            next_pass_time = (
+                self._train_pass._current_train_time + datetime.timedelta(
+                    minutes=self._train_pass._interval_per_pass)
+            ).strftime("%Y%m%d%H%M")
             for name in self._dataset:
                 self._dataset[name].preload_dataset({
-                    'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
-                    'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
+                    'node_num': fleet.worker_num(),
+                    'node_idx': fleet.worker_index(),
+                    'begin_time': next_pass_time,
+                    'time_window_min': self._train_pass._interval_per_pass
                 })
         fleet._role_maker._barrier_worker()

         pure_train_begin = time.time()
         for executor in self.global_config['executor']:
-            self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
+            self.run_executor(executor,
+                              current_dataset[executor['dataset_name']],
+                              stdout_str)
         cost_printer = util.CostPrinter(util.print_cost, \
                 {'master': True, 'log_format': 'release_memory cost %s sec'})
         for name in current_dataset:
@@ -444,9 +516,11 @@ class CtrTrainer(Trainer):
         train_end_time = time.time()
         train_cost = train_end_time - train_begin_time
         other_cost = train_cost - pure_train_cost
-        log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost)
+        log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (
+            day, pass_id, train_cost)
         for executor in self._exector_context:
-            log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']'
+            log_str += '[' + executor + ':' + str(self._exector_context[
+                executor]['cost']) + ']'
         log_str += '[other_cost:' + str(other_cost) + ']'
         util.rank0_print(log_str)
         stdout_str += util.now_time_str() + log_str
......
@@ -7,5 +7,3 @@
 ### Distributed training on a K8S cluster
 > Placeholder
@@ -12,4 +12,3 @@
 | Multi-task | [ESMM]() | ✓ | x | ✓ | x | ✓ | ✓ |
 | Matching | [DSSM]() | ✓ | x | ✓ | x | ✓ | ✓ |
 | Matching | [Multiview-Simnet]() | ✓ | x | ✓ | x | ✓ | ✓ |
@@ -5,4 +5,3 @@
 ## [Parameter Server Training](https://www.paddlepaddle.org.cn/tutorials/projectdetail/464839)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -37,4 +37,3 @@ train:
     dirname: "inference"
     epoch_interval: 100
     save_last: True
@@ -31,7 +31,8 @@ class Model(ModelBase):
     def train_net(self):
         """ network definition """
-        data = fluid.data(name="input", shape=[None, self.max_len], dtype='int64')
+        data = fluid.data(
+            name="input", shape=[None, self.max_len], dtype='int64')
         label = fluid.data(name="label", shape=[None, 1], dtype='int64')
         seq_len = fluid.data(name="seq_len", shape=[None], dtype='int64')
@@ -51,7 +52,9 @@ class Model(ModelBase):
         # full connect layer
         fc_1 = fluid.layers.fc(input=[conv], size=self.hid_dim)
         # softmax layer
-        prediction = fluid.layers.fc(input=[fc_1], size=self.class_dim, act="softmax")
+        prediction = fluid.layers.fc(input=[fc_1],
+                                     size=self.class_dim,
+                                     act="softmax")
         cost = fluid.layers.cross_entropy(input=prediction, label=label)
         avg_cost = fluid.layers.mean(x=cost)
         acc = fluid.layers.accuracy(input=prediction, label=label)
......
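For orientation, the lines being rewrapped here form a standard text-classification head in the fluid 1.x API: an FC projection, a softmax FC, cross-entropy loss, mean reduction, and an accuracy metric. A self-contained sketch with illustrative sizes (the real `hid_dim`/`class_dim` come from the model config):

```python
import paddle.fluid as fluid

hid_dim, class_dim = 128, 2  # illustrative; the configured values differ

feat = fluid.data(name="feat", shape=[None, hid_dim], dtype="float32")
label = fluid.data(name="label", shape=[None, 1], dtype="int64")

fc_1 = fluid.layers.fc(input=feat, size=hid_dim)
prediction = fluid.layers.fc(input=fc_1, size=class_dim, act="softmax")
cost = fluid.layers.cross_entropy(input=prediction, label=label)
avg_cost = fluid.layers.mean(x=cost)
acc = fluid.layers.accuracy(input=prediction, label=label)
```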
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

 import sys

 from paddlerec.core.reader import Reader
@@ -38,7 +37,8 @@ class TrainReader(Reader):
             data = [int(i) for i in data]
             label = [int(i) for i in label]
             seq_len = [int(i) for i in seq_len]
-            print >> sys.stderr, str([('data', data), ('label', label), ('seq_len', seq_len)])
+            print >> sys.stderr, str(
+                [('data', data), ('label', label), ('seq_len', seq_len)])
             yield [('data', data), ('label', label), ('seq_len', seq_len)]

         return data_iter
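A side note on the line rewrapped above: `print >> sys.stderr, ...` is Python 2 syntax, so this reader only runs under Python 2. The Python 3 equivalent of that debug print (not part of this commit) would be:

```python
import sys

data, label, seq_len = [3, 7], [1], [2]  # placeholder values for illustration
print(str([('data', data), ('label', label), ('seq_len', seq_len)]),
      file=sys.stderr)
```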
...@@ -87,4 +87,3 @@ python -m paddlerec.run -m paddlerec.models.contentunderstanding.classification ...@@ -87,4 +87,3 @@ python -m paddlerec.run -m paddlerec.models.contentunderstanding.classification
| :------------------: | :--------------------: | :---------: |:---------: | :---------: |:---------: | | :------------------: | :--------------------: | :---------: |:---------: | :---------: |:---------: |
| ag news dataset | TagSpace | -- | -- | -- | -- | | ag news dataset | TagSpace | -- | -- | -- | -- |
| -- | Classification | -- | -- | -- | -- | | -- | Classification | -- | -- | -- | -- |
@@ -47,4 +47,3 @@ train:
     dirname: "inference"
     epoch_interval: 100
     save_last: True
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -37,4 +37,3 @@
 python -m paddlerec.run -m paddlerec.models.match.dssm # dssm
 python -m paddlerec.run -m paddlerec.models.match.multiview-simnet # multiview-simnet
 ```
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -56,4 +56,3 @@ python -m paddlerec.run -m paddlerec.models.multitask.esmm # esmm
 | Census-income Data | Share-Bottom | -- | 0.93120/0.99256 |
 | Census-income Data | MMoE | -- | 0.94465/0.99324 |
 | Ali-CCP | ESMM | -- | 0.97181/0.49967 |
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 import os
 import sys
 import io
......
@@ -26,8 +26,8 @@ from collections import Counter
 import os
 import paddle.fluid.incubate.data_generator as dg

-class TrainReader(dg.MultiSlotDataGenerator):
+
+class TrainReader(dg.MultiSlotDataGenerator):
     def __init__(self, config):
         dg.MultiSlotDataGenerator.__init__(self)
@@ -109,6 +109,7 @@ class TrainReader(dg.MultiSlotDataGenerator):
         return data_iter

+
 reader = TrainReader("../config.yaml")
 reader.init()
 reader.run_from_stdin()
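The readers in this commit all follow the same data-generator protocol: subclass MultiSlotDataGenerator, implement generate_sample as a closure over one raw line, and pump stdin through run_from_stdin (which is what dataset.set_pipe_command invokes). A minimal self-contained sketch, with a made-up line format:

```python
import paddle.fluid.incubate.data_generator as dg


class MinimalReader(dg.MultiSlotDataGenerator):
    def generate_sample(self, line):
        def data_iter():
            # Hypothetical format: whitespace-separated ints, label last.
            tokens = [int(t) for t in line.strip().split()]
            yield [("ids", tokens[:-1]), ("label", [tokens[-1]])]

        return data_iter


if __name__ == "__main__":
    MinimalReader().run_from_stdin()  # reads raw lines, emits slot format
```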
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 from __future__ import print_function, absolute_import, division

 import os
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 import os
 import shutil
 import sys
......
@@ -19,8 +19,9 @@ try:
     import cPickle as pickle
 except ImportError:
     import pickle

-class TrainReader(dg.MultiSlotDataGenerator):
+
+class TrainReader(dg.MultiSlotDataGenerator):
     def __init__(self, config):
         dg.MultiSlotDataGenerator.__init__(self)
@@ -77,15 +78,18 @@ class TrainReader(dg.MultiSlotDataGenerator):
         def data_iter():
             feat_idx, feat_value, label = self._process_line(line)
             s = ""
-            for i in [('feat_idx', feat_idx), ('feat_value', feat_value), ('label', label)]:
+            for i in [('feat_idx', feat_idx), ('feat_value', feat_value),
+                      ('label', label)]:
                 k = i[0]
                 v = i[1]
                 for j in v:
                     s += " " + k + ":" + str(j)
             print s.strip()
             yield None

         return data_iter

+
 reader = TrainReader("../config.yaml")
 reader.init()
 reader.run_from_stdin()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 import os
 import numpy
 from collections import Counter
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 from __future__ import print_function
 import random
 import pickle
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 from __future__ import print_function
 import pickle
 import pandas as pd
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 from __future__ import print_function
 import random
 import pickle
......
@@ -32,6 +32,7 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
         """
         Read the data line by line and process it as a dictionary
         """

+
         def reader():
             """
             This function needs to be implemented by the user, based on data format
@@ -59,9 +60,10 @@ class CriteoDataset(dg.MultiSlotDataGenerator):
             for i in dense_feature:
                 s += " dense_feature:" + str(i)
             for i in range(1, 1 + len(categorical_range_)):
-                s += " " + str(i) + ":" + str(sparse_feature[i-1][0])
+                s += " " + str(i) + ":" + str(sparse_feature[i - 1][0])
             print s.strip()
             yield None

         return reader
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 import os
 import io
 import args
 import pandas as pd
 from sklearn import preprocessing

-def _clean_file(source_path,target_path):
+
+def _clean_file(source_path, target_path):
     """makes changes to match the CSV format."""
     with io.open(source_path, 'r') as temp_eval_file:
         with io.open(target_path, 'w') as eval_file:
@@ -18,6 +33,7 @@ def _clean_file(source_path,target_path):
                 line += '\n'
                 eval_file.write(line)

+
 def build_model_columns(train_data_path, test_data_path):
     # The column names are from
     # https://www2.1010data.com/documentationcenter/prod/Tutorials/MachineLearningExamples/CensusIncomeDataSet.html
@@ -44,61 +60,92 @@ def build_model_columns(train_data_path, test_data_path):
     # First group of tasks according to the paper
     #label_columns = ['income_50k', 'marital_stat']
-    categorical_columns = ['education','marital_status','relationship','workclass','occupation']
+    categorical_columns = [
+        'education', 'marital_status', 'relationship', 'workclass',
+        'occupation'
+    ]
     for col in categorical_columns:
         label_train = preprocessing.LabelEncoder()
-        train_df[col]= label_train.fit_transform(train_df[col])
+        train_df[col] = label_train.fit_transform(train_df[col])
         label_test = preprocessing.LabelEncoder()
-        test_df[col]= label_test.fit_transform(test_df[col])
+        test_df[col] = label_test.fit_transform(test_df[col])

     bins = [18, 25, 30, 35, 40, 45, 50, 55, 60, 65]
-    train_df['age_buckets'] = pd.cut(train_df['age'].values.tolist(), bins,labels=False)
-    test_df['age_buckets'] = pd.cut(test_df['age'].values.tolist(), bins,labels=False)
+    train_df['age_buckets'] = pd.cut(train_df['age'].values.tolist(),
+                                     bins,
+                                     labels=False)
+    test_df['age_buckets'] = pd.cut(test_df['age'].values.tolist(),
+                                    bins,
+                                    labels=False)

-    base_columns = ['education', 'marital_status', 'relationship', 'workclass', 'occupation', 'age_buckets']
+    base_columns = [
+        'education', 'marital_status', 'relationship', 'workclass',
+        'occupation', 'age_buckets'
+    ]

-    train_df['education_occupation'] = train_df['education'].astype(str) + '_' + train_df['occupation'].astype(str)
-    test_df['education_occupation'] = test_df['education'].astype(str) + '_' + test_df['occupation'].astype(str)
-    train_df['age_buckets_education_occupation'] = train_df['age_buckets'].astype(str) + '_' + train_df['education'].astype(str) + '_' + train_df['occupation'].astype(str)
-    test_df['age_buckets_education_occupation'] = test_df['age_buckets'].astype(str) + '_' + test_df['education'].astype(str) + '_' + test_df['occupation'].astype(str)
-    crossed_columns = ['education_occupation','age_buckets_education_occupation']
+    train_df['education_occupation'] = train_df['education'].astype(
+        str) + '_' + train_df['occupation'].astype(str)
+    test_df['education_occupation'] = test_df['education'].astype(
+        str) + '_' + test_df['occupation'].astype(str)
+    train_df['age_buckets_education_occupation'] = train_df[
+        'age_buckets'].astype(str) + '_' + train_df['education'].astype(
+            str) + '_' + train_df['occupation'].astype(str)
+    test_df['age_buckets_education_occupation'] = test_df[
+        'age_buckets'].astype(str) + '_' + test_df['education'].astype(
+            str) + '_' + test_df['occupation'].astype(str)
+    crossed_columns = [
+        'education_occupation', 'age_buckets_education_occupation'
+    ]

     for col in crossed_columns:
         label_train = preprocessing.LabelEncoder()
-        train_df[col]= label_train.fit_transform(train_df[col])
+        train_df[col] = label_train.fit_transform(train_df[col])
         label_test = preprocessing.LabelEncoder()
-        test_df[col]= label_test.fit_transform(test_df[col])
+        test_df[col] = label_test.fit_transform(test_df[col])

     wide_columns = base_columns + crossed_columns

-    train_df_temp = pd.get_dummies(train_df[categorical_columns],columns=categorical_columns)
-    test_df_temp = pd.get_dummies(test_df[categorical_columns], columns=categorical_columns)
+    train_df_temp = pd.get_dummies(
+        train_df[categorical_columns], columns=categorical_columns)
+    test_df_temp = pd.get_dummies(
+        test_df[categorical_columns], columns=categorical_columns)
     train_df = train_df.join(train_df_temp)
     test_df = test_df.join(test_df_temp)

-    deep_columns = list(train_df_temp.columns)+ ['age','education_num','capital_gain','capital_loss','hours_per_week']
+    deep_columns = list(train_df_temp.columns) + [
+        'age', 'education_num', 'capital_gain', 'capital_loss',
+        'hours_per_week'
+    ]

-    train_df['label'] = train_df['income_bracket'].apply(lambda x : 1 if x == '>50K' else 0)
-    test_df['label'] = test_df['income_bracket'].apply(lambda x : 1 if x == '>50K' else 0)
+    train_df['label'] = train_df['income_bracket'].apply(
+        lambda x: 1 if x == '>50K' else 0)
+    test_df['label'] = test_df['income_bracket'].apply(
+        lambda x: 1 if x == '>50K' else 0)

-    with io.open('train_data/columns.txt','w') as f:
-        write_str = str(len(wide_columns)) + '\n' + str(len(deep_columns)) + '\n'
+    with io.open('train_data/columns.txt', 'w') as f:
+        write_str = str(len(wide_columns)) + '\n' + str(len(
+            deep_columns)) + '\n'
         f.write(write_str)
         f.close()
-    with io.open('test_data/columns.txt','w') as f:
-        write_str = str(len(wide_columns)) + '\n' + str(len(deep_columns)) + '\n'
+    with io.open('test_data/columns.txt', 'w') as f:
+        write_str = str(len(wide_columns)) + '\n' + str(len(
+            deep_columns)) + '\n'
         f.write(write_str)
         f.close()

-    train_df[wide_columns + deep_columns + ['label']].fillna(0).to_csv(train_data_path,index=False)
-    test_df[wide_columns + deep_columns + ['label']].fillna(0).to_csv(test_data_path,index=False)
+    train_df[wide_columns + deep_columns + ['label']].fillna(0).to_csv(
+        train_data_path, index=False)
+    test_df[wide_columns + deep_columns + ['label']].fillna(0).to_csv(
+        test_data_path, index=False)

+
 def clean_file(train_path, test_path, train_data_path, test_data_path):
     _clean_file(train_path, train_data_path)
     _clean_file(test_path, test_data_path)

+
 if __name__ == '__main__':
     args = args.parse_args()
-    clean_file(args.train_path, args.test_path, args.train_data_path, args.test_data_path)
+    clean_file(args.train_path, args.test_path, args.train_data_path,
+               args.test_data_path)
     build_model_columns(args.train_data_path, args.test_data_path)
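The preprocessing rewrapped above leans on three standard calls: LabelEncoder to turn categories into integer ids, pd.cut to bucket ages into bin indices, and pd.get_dummies to one-hot the deep-side features. A toy illustration of the same calls:

```python
import pandas as pd
from sklearn import preprocessing

df = pd.DataFrame({"education": ["HS", "BS", "HS"], "age": [22, 41, 58]})

# Integer ids for a categorical column.
df["education"] = preprocessing.LabelEncoder().fit_transform(df["education"])
# Bucket a numeric column into integer bin indices.
df["age_buckets"] = pd.cut(df["age"].values.tolist(), [18, 30, 50, 65],
                           labels=False)
# One-hot columns, as used for the deep part of the wide&deep model.
print(pd.get_dummies(df[["education"]], columns=["education"]))
```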
@@ -20,6 +20,7 @@ except ImportError:
     import pickle
 import paddle.fluid.incubate.data_generator as dg

+
 class TrainReader(dg.MultiSlotDataGenerator):
     def __init__(self, config):
         dg.MultiSlotDataGenerator.__init__(self)
@@ -50,7 +51,8 @@ class TrainReader(dg.MultiSlotDataGenerator):
             wide_feat, deep_deat, label = self._process_line(line)

             s = ""
-            for i in [('wide_input', wide_feat), ('deep_input', deep_deat), ('label', label)]:
+            for i in [('wide_input', wide_feat), ('deep_input', deep_deat),
+                      ('label', label)]:
                 k = i[0]
                 v = i[1]
                 for j in v:
@@ -60,6 +62,7 @@ class TrainReader(dg.MultiSlotDataGenerator):
         return data_iter

+
 reader = TrainReader("../config.yaml")
 reader.init()
 reader.run_from_stdin()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 import os
 import shutil
 import sys
......
@@ -21,6 +21,7 @@ except ImportError:
     import pickle
 import paddle.fluid.incubate.data_generator as dg

+
 class TrainReader(dg.MultiSlotDataGenerator):
     def __init__(self, config):
         dg.MultiSlotDataGenerator.__init__(self)
@@ -48,7 +49,8 @@ class TrainReader(dg.MultiSlotDataGenerator):
             feat_idx, feat_value, label = self._process_line(line)
             s = ""
-            for i in [('feat_idx', feat_idx), ('feat_value', feat_value), ('label', label)]:
+            for i in [('feat_idx', feat_idx), ('feat_value', feat_value),
+                      ('label', label)]:
                 k = i[0]
                 v = i[1]
                 for j in v:
@@ -58,6 +60,7 @@ class TrainReader(dg.MultiSlotDataGenerator):
         return data_iter

+
 reader = TrainReader("../config.yaml")
 reader.init()
 reader.run_from_stdin()
@@ -31,5 +31,3 @@ mv diginetica/train.txt train_data
 mkdir test_data
 mv diginetica/test.txt test_data
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 import argparse
 import time
 import pickle
@@ -10,6 +24,7 @@ parser.add_argument(
     help='dataset dir: diginetica/yoochoose1_4/yoochoose1_64/sample')
 opt = parser.parse_args()

+
 def process_data(file_type):
     path = os.path.join(opt.data_dir, file_type)
     output_path = os.path.splitext(path)[0] + ".txt"
@@ -23,6 +38,7 @@ def process_data(file_type):
         fout.write(str(data[i][1]))
         fout.write("\n")

+
 process_data("train")
 process_data("test")
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 import requests
 import sys
 import time
......
@@ -78,4 +78,3 @@ python -m paddlerec.run -m paddlerec.models.recall.youtube_dnn # youtube_dnn
 | MOVIELENS | NCF | 0.688 | -- |
 | -- | Youtube | -- | -- |
 | 1 Billion Word Language Model Benchmark | Word2Vec | -- | 0.54 |
@@ -35,6 +35,3 @@ wget --no-check-certificate https://paddlerec.bj.bcebos.com/word2vec/test_dir.ta
 tar xzvf test_dir.tar -C raw_data
 mv raw_data/data/test_dir test_data/
 rm -rf raw_data
@@ -26,8 +26,10 @@ from paddlerec.core.utils import util
engines = {}
device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
- engine_choices = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER",
-                   "TDM_SINGLE", "TDM_LOCAL_CLUSTER", "TDM_CLUSTER"]
+ engine_choices = [
+     "SINGLE", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", "TDM_LOCAL_CLUSTER",
+     "TDM_CLUSTER"
+ ]
custom_model = ['TDM']
model_name = ""
@@ -66,7 +68,8 @@ def get_engine(args):
    engine = engine.upper()
    if engine not in engine_choices:
-       raise ValueError("train.engin can not be chosen in {}".format(engine_choices))
+       raise ValueError("train.engin can not be chosen in {}".format(
+           engine_choices))
    print("engines: \n{}".format(engines))
@@ -77,8 +80,10 @@ def get_engine(args):
def get_transpiler():
    FNULL = open(os.devnull, 'w')
-   cmd = ["python", "-c",
-          "import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); [fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"]
+   cmd = [
+       "python", "-c",
+       "import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); [fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"
+   ]
    proc = subprocess.Popen(cmd, stdout=FNULL, stderr=FNULL, cwd=os.getcwd())
    ret = proc.wait()
    if ret == -11:
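The ret == -11 check works because on POSIX, Popen.wait() returns -N when the child process is killed by signal N; -11 is SIGSEGV, so a segfaulting probe means this Paddle build cannot run the fleet snippet. A standalone sketch of the same probe (the function name and the boolean meaning are assumptions; the command is copied from the hunk):

import os
import subprocess

def fleet_probe_ok():
    # Run the fleet snippet in a child interpreter and watch the return code.
    cmd = [
        "python", "-c",
        "import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); "
        "[fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"
    ]
    with open(os.devnull, 'w') as fnull:
        ret = subprocess.Popen(cmd, stdout=fnull, stderr=fnull).wait()
    # -11 means the child died from SIGSEGV, i.e. the probe crashed.
    return ret != -11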
@@ -152,7 +157,8 @@ def cluster_engine(args):
        update_workspace(flattens)
        envs.set_runtime_environs(flattens)
-       print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value")))
+       print(envs.pretty_print_envs(flattens, ("Submit Runtime Envs", "Value"
+                                               )))
        launch = ClusterEngine(None, args.model)
        return launch
@@ -163,7 +169,8 @@ def cluster_engine(args):
    cluster_envs = {}
    cluster_envs["train.trainer.trainer"] = trainer
    cluster_envs["train.trainer.engine"] = "cluster"
-   cluster_envs["train.trainer.threads"] = envs.get_runtime_environ("CPU_NUM")
+   cluster_envs["train.trainer.threads"] = envs.get_runtime_environ(
+       "CPU_NUM")
    cluster_envs["train.trainer.platform"] = envs.get_platform()
    print("launch {} engine with cluster to with model: {}".format(
        trainer, args.model))
@@ -181,7 +188,8 @@ def cluster_engine(args):
def cluster_mpi_engine(args):
-   print("launch cluster engine with cluster to run model: {}".format(args.model))
+   print("launch cluster engine with cluster to run model: {}".format(
+       args.model))
    cluster_envs = {}
    cluster_envs["train.trainer.trainer"] = "CtrCodingTrainer"
@@ -209,7 +217,8 @@ def local_cluster_engine(args):
    cluster_envs["train.trainer.platform"] = envs.get_platform()
    cluster_envs["CPU_NUM"] = "2"
-   print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
+   print("launch {} engine with cluster to run model: {}".format(trainer,
+                                                                  args.model))
    set_runtime_envs(cluster_envs, args.model)
    launch = LocalClusterEngine(cluster_envs, args.model)
@@ -217,10 +226,12 @@ def local_cluster_engine(args):
def local_mpi_engine(args):
-   print("launch cluster engine with cluster to run model: {}".format(args.model))
+   print("launch cluster engine with cluster to run model: {}".format(
+       args.model))
    from paddlerec.core.engine.local_mpi import LocalMPIEngine
-   print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model))
+   print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(
+       args.model))
    mpi = util.run_which("mpirun")
    if not mpi:
...
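All of these *_engine helpers follow one pattern: collect flat "train.trainer.*" settings in a dict, publish them as runtime environs, then hand the dict to an engine object. A hedged sketch of a minimal local-cluster launch (the import path, engine value, model path, and run() entry point are all assumptions; only the LocalClusterEngine name and the dict keys come from the hunks):

from paddlerec.core.engine.local_cluster import LocalClusterEngine  # assumed module path

cluster_envs = {}
cluster_envs["train.trainer.engine"] = "local_cluster"  # assumed value
cluster_envs["train.trainer.threads"] = "2"
cluster_envs["CPU_NUM"] = "2"

launch = LocalClusterEngine(cluster_envs, "models/rank/dnn/config.yaml")  # hypothetical model path
launch.run()  # assumed entry point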
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
setup for paddle-rec.
"""
@@ -22,11 +21,7 @@ from setuptools import setup, find_packages
import shutil
import tempfile
- requires = [
-     "paddlepaddle == 1.7.2",
-     "pyyaml >= 5.1.1"
- ]
+ requires = ["paddlepaddle == 1.7.2", "pyyaml >= 5.1.1"]
about = {}
about["__title__"] = "paddle-rec"
@@ -48,18 +43,27 @@ def build(dirname):
    package_dir = os.path.dirname(os.path.abspath(__file__))
    run_cmd("cp -r {}/* {}".format(package_dir, dirname))
    run_cmd("mkdir {}".format(os.path.join(dirname, "paddlerec")))
-   run_cmd("mv {} {}".format(os.path.join(dirname, "core"), os.path.join(dirname, "paddlerec")))
-   run_cmd("mv {} {}".format(os.path.join(dirname, "doc"), os.path.join(dirname, "paddlerec")))
-   run_cmd("mv {} {}".format(os.path.join(dirname, "models"), os.path.join(dirname, "paddlerec")))
-   run_cmd("mv {} {}".format(os.path.join(dirname, "tests"), os.path.join(dirname, "paddlerec")))
-   run_cmd("mv {} {}".format(os.path.join(dirname, "tools"), os.path.join(dirname, "paddlerec")))
-   run_cmd("mv {} {}".format(os.path.join(dirname, "*.py"), os.path.join(dirname, "paddlerec")))
+   run_cmd("mv {} {}".format(
+       os.path.join(dirname, "core"), os.path.join(dirname, "paddlerec")))
+   run_cmd("mv {} {}".format(
+       os.path.join(dirname, "doc"), os.path.join(dirname, "paddlerec")))
+   run_cmd("mv {} {}".format(
+       os.path.join(dirname, "models"), os.path.join(dirname, "paddlerec")))
+   run_cmd("mv {} {}".format(
+       os.path.join(dirname, "tests"), os.path.join(dirname, "paddlerec")))
+   run_cmd("mv {} {}".format(
+       os.path.join(dirname, "tools"), os.path.join(dirname, "paddlerec")))
+   run_cmd("mv {} {}".format(
+       os.path.join(dirname, "*.py"), os.path.join(dirname, "paddlerec")))
    packages = find_packages(dirname, include=('paddlerec.*'))
    package_dir = {'': dirname}
    package_data = {}
-   models_copy = ['data/*.txt', 'data/*/*.txt', '*.yaml', '*.sh', 'tree/*.npy', 'tree/*.txt']
+   models_copy = [
+       'data/*.txt', 'data/*/*.txt', '*.yaml', '*.sh', 'tree/*.npy',
+       'tree/*.txt'
+   ]
    engine_copy = ['*/*.sh']
    for package in packages:
        if package.startswith("paddlerec.models."):
@@ -80,8 +84,7 @@ def build(dirname):
        package_data=package_data,
        python_requires=">=2.7",
        install_requires=requires,
-       zip_safe=False
-   )
+       zip_safe=False)
dirname = tempfile.mkdtemp()
...
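Taken together, the build() hunks stage the repo into a temporary directory shaped like a single paddlerec package and then attach data files per sub-package. A hedged sketch of the package_data wiring (the engine_copy branch and its prefix are assumptions, since the diff truncates the loop body; the glob patterns are copied from the hunk):

from setuptools import find_packages

models_copy = [
    'data/*.txt', 'data/*/*.txt', '*.yaml', '*.sh', 'tree/*.npy',
    'tree/*.txt'
]
engine_copy = ['*/*.sh']

def collect_package_data(dirname):
    packages = find_packages(dirname, include=('paddlerec.*'))
    package_data = {}
    for package in packages:
        if package.startswith("paddlerec.models."):
            package_data[package] = models_copy
        elif package.startswith("paddlerec.core.engine"):  # assumed prefix
            package_data[package] = engine_copy
    return packages, package_data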