Commit f9ef376e, authored by: tangwei

fix code style

Parent: bfce2242
@@ -31,6 +31,7 @@ def create(config):
        Model Instance
    """
    model = None

    if config['mode'] == 'fluid':
        model = YamlModel(config)
        model.train_net()
@@ -50,7 +51,12 @@ class YamlModel(Model):
        f = open(config['layer_file'], 'r')
        self._build_nodes = yaml.safe_load(f.read())
        self._build_phase = ['input', 'param', 'summary', 'layer']
        self._build_param = {
            'layer': {},
            'inner_layer': {},
            'layer_extend': {},
            'model': {}
        }
        self._inference_meta = {'dependency': {}, 'params': {}}

    def train_net(self):
@@ -76,10 +82,12 @@ class YamlModel(Model):
            if self._build_nodes[phase] is None:
                continue
            for node in self._build_nodes[phase]:
                exec ("""layer=layer.{}(node)""".format(node['class']))
                layer_output, extend_output = layer.generate(
                    self._config['mode'], self._build_param)
                self._build_param['layer'][node['name']] = layer_output
                self._build_param['layer_extend'][node[
                    'name']] = extend_output
                if extend_output is None:
                    continue
                if 'loss' in extend_output:
@@ -89,17 +97,24 @@ class YamlModel(Model):
                        self._cost += extend_output['loss']
                if 'data_var' in extend_output:
                    self._data_var += extend_output['data_var']
                if 'metric_label' in extend_output and extend_output[
                        'metric_label'] is not None:
                    self._metrics[extend_output[
                        'metric_label']] = extend_output['metric_dict']

                if 'inference_param' in extend_output:
                    inference_param = extend_output['inference_param']
                    param_name = inference_param['name']
                    if param_name not in self._build_param['table']:
                        self._build_param['table'][param_name] = {
                            'params': []
                        }
                        table_meta = table.TableMeta.alloc_new_table(
                            inference_param['table_id'])
                        self._build_param['table'][param_name][
                            '_meta'] = table_meta
                    self._build_param['table'][param_name][
                        'params'] += inference_param['params']
        pass

    @classmethod
@@ -114,20 +129,25 @@ class YamlModel(Model):
            metrics = params['metrics']
            for name in metrics:
                model_metrics = metrics[name]
                stat_var_names += [
                    model_metrics[metric]['var'].name
                    for metric in model_metrics
                ]
            strategy['stat_var_names'] = list(set(stat_var_names))
        optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + \
                              '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')'
        exec (optimizer_generator)
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
        return optimizer

    def dump_model_program(self, path):
        """R
        """
        with open(path + '/' + self._name + '_main_program.pbtxt',
                  "w") as fout:
            print >> fout, self._build_param['model']['train_program']
        with open(path + '/' + self._name + '_startup_program.pbtxt',
                  "w") as fout:
            print >> fout, self._build_param['model']['startup_program']
        pass
@@ -137,7 +157,8 @@ class YamlModel(Model):
        scope = params['scope']
        decay = params['decay']
        for param_table in self._build_param['table']:
            table_id = self._build_param['table'][param_table][
                '_meta']._table_id
            fleet.shrink_dense_table(decay, scope=scope, table_id=table_id)

    def dump_inference_program(self, inference_layer, path):
@@ -152,17 +173,25 @@ class YamlModel(Model):
        executor = params['executor']
        program = self._build_param['model']['train_program']
        for table_name, table in self._build_param['table'].items():
            fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id,
                                        table['params'])
        for infernce_item in params['inference_list']:
            params_name_list = self.inference_params(infernce_item[
                'layer_name'])
            params_var_list = [
                program.global_block().var(i) for i in params_name_list
            ]
            params_file_name = infernce_item['save_file_name']
            with fluid.scope_guard(scope):
                if params['save_combine']:
                    fluid.io.save_vars(executor, "./", \
                        program, vars=params_var_list, filename=params_file_name)
                else:
                    fluid.io.save_vars(
                        executor,
                        params_file_name,
                        program,
                        vars=params_var_list)

    def inference_params(self, inference_layer):
        """
@@ -177,11 +206,13 @@ class YamlModel(Model):
            return self._inference_meta['params'][layer]

        self._inference_meta['params'][layer] = []
        self._inference_meta['dependency'][layer] = self.get_dependency(
            self._build_param['inner_layer'], layer)
        for node in self._build_nodes['layer']:
            if node['name'] not in self._inference_meta['dependency'][layer]:
                continue
            if 'inference_param' in self._build_param['layer_extend'][node[
                    'name']]:
                self._inference_meta['params'][layer] += \
                    self._build_param['layer_extend'][node['name']]['inference_param']['params']
        return self._inference_meta['params'][layer]
@@ -199,5 +230,6 @@ class YamlModel(Model):
        dependencys = copy.deepcopy(layer_graph[dest_layer]['input'])
        dependency_list = copy.deepcopy(dependencys)
        for dependency in dependencys:
            dependency_list = dependency_list + self.get_dependency(
                layer_graph, dependency)
        return list(set(dependency_list))
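For reference, a minimal standalone sketch (not part of the commit) of how the recursive dependency walk in get_dependency() above behaves; the layer graph and layer names here are invented for illustration, and copy.deepcopy is replaced with list() since the toy inputs are plain strings:

    # Toy layer graph in the shape get_dependency() expects:
    # each node lists the names of the layers it consumes.
    layer_graph = {
        'emb': {'input': []},
        'fc1': {'input': ['emb']},
        'loss': {'input': ['fc1']},
    }

    def get_dependency(layer_graph, dest_layer):
        # Mirrors the method above: collect direct inputs, recurse into each,
        # and de-duplicate with set() before returning.
        dependencys = list(layer_graph[dest_layer]['input'])
        dependency_list = list(dependencys)
        for dependency in dependencys:
            dependency_list = dependency_list + get_dependency(layer_graph, dependency)
        return list(set(dependency_list))

    print(get_dependency(layer_graph, 'loss'))  # ['emb', 'fc1'] in some order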
@@ -18,7 +18,7 @@ from paddlerec.core.layer import Layer

class EmbeddingFuseLayer(Layer):
    """embedding + sequence + concat
    """

    def __init__(self, config):
@@ -40,7 +40,8 @@ class EmbeddingFuseLayer(Layer):
        show_clk.stop_gradient = True
        data_var = []
        for slot in self._slots:
            l = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1)
            data_var.append(l)
            emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], \
                                         is_sparse=True, is_distributed=True,
@@ -48,7 +49,8 @@ class EmbeddingFuseLayer(Layer):
            emb = fluid.layers.sequence_pool(input=emb, pool_type='sum')
            emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm)
            self._emb_layers.append(emb)
        output = fluid.layers.concat(
            input=self._emb_layers, axis=1, name=self._name)
        return output, {'data_var': data_var}
@@ -111,7 +113,13 @@ class ParamLayer(Layer):
    def generate(self, param):
        """R
        """
        return self._config, {
            'inference_param': {
                'name': 'param',
                'params': [],
                'table_id': self._table_id
            }
        }


class SummaryLayer(Layer):
@@ -129,7 +137,13 @@ class SummaryLayer(Layer):
    def generate(self, param):
        """R
        """
        return self._config, {
            'inference_param': {
                'name': 'summary',
                'params': [],
                'table_id': self._table_id
            }
        }


class NormalizationLayer(Layer):
@@ -152,9 +166,19 @@ class NormalizationLayer(Layer):
        if len(self._input) > 0:
            input_list = [param['layer'][i] for i in self._input]
            input_layer = fluid.layers.concat(input=input_list, axis=1)
        bn = fluid.layers.data_norm(
            input=input_layer,
            name=self._name,
            epsilon=1e-4,
            param_attr={
                "batch_size": 1e4,
                "batch_sum_default": 0.0,
                "batch_square": 1e4
            })
        inference_param = [
            self._name + '.batch_size', self._name + '.batch_sum',
            self._name + '.batch_square_sum'
        ]
        return bn, {'inference_param': {'name': 'summary', \
            'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}}
@@ -181,11 +205,13 @@ class FCLayer(Layer):
        input_list = [param['layer'][i] for i in self._input]
        input_layer = fluid.layers.concat(input=input_list, axis=1)
        input_coln = input_layer.shape[1]
        scale = param_layer['init_range'] / (input_coln**0.5)
        bias = None
        if self._bias:
            bias = fluid.ParamAttr(
                learning_rate=1.0,
                initializer=fluid.initializer.NormalInitializer(
                    loc=0.0, scale=scale))
        fc = fluid.layers.fc(
            name=self._name,
            input=input_layer,
@@ -216,18 +242,46 @@ class LogLossLayer(Layer):
        self._extend_output = {
            'metric_label': self._metric_label,
            'metric_dict': {
                'auc': {
                    'var': None
                },
                'batch_auc': {
                    'var': None
                },
                'stat_pos': {
                    'var': None,
                    'data_type': 'int64'
                },
                'stat_neg': {
                    'var': None,
                    'data_type': 'int64'
                },
                'batch_stat_pos': {
                    'var': None,
                    'data_type': 'int64'
                },
                'batch_stat_neg': {
                    'var': None,
                    'data_type': 'int64'
                },
                'pos_ins_num': {
                    'var': None
                },
                'abserr': {
                    'var': None
                },
                'sqrerr': {
                    'var': None
                },
                'prob': {
                    'var': None
                },
                'total_ins_num': {
                    'var': None
                },
                'q': {
                    'var': None
                }
            }
        }
@@ -236,9 +290,12 @@ class LogLossLayer(Layer):
        """
        input_layer = param['layer'][self._input[0]]
        label_layer = param['layer'][self._label]
        output = fluid.layers.clip(
            input_layer, self._bound[0], self._bound[1], name=self._name)
        norm = fluid.layers.sigmoid(output, name=self._name)
        output = fluid.layers.log_loss(
            norm, fluid.layers.cast(
                x=label_layer, dtype='float32'))
        if self._weight:
            weight_layer = param['layer'][self._weight]
            output = fluid.layers.elementwise_mul(output, weight_layer)
@@ -248,7 +305,11 @@ class LogLossLayer(Layer):
        # For AUC Metric
        metric = self._extend_output['metric_dict']
        binary_predict = fluid.layers.concat(
            input=[
                fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm),
                norm
            ],
            axis=1)
        metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \
            metric['batch_stat_neg']['var'], metric['stat_pos']['var'],
            metric['stat_neg']['var']] = \
...
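A side note on the AUC input built in the hunk above: since norm = sigmoid(output) lies in (0, 1), fluid.layers.ceil(norm) is 1, so the concatenated tensor is simply [1 - p, p] per row. A quick NumPy sketch (illustrative only, not from the commit) of the same construction:

    import numpy as np

    norm = np.array([[0.2], [0.9]])            # sigmoid outputs p in (0, 1)
    binary_predict = np.concatenate(
        [np.ceil(norm) - norm, norm], axis=1)  # columns: [1 - p, p]
    print(binary_predict)                      # approximately [[0.8, 0.2], [0.1, 0.9]]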
@@ -30,8 +30,10 @@ class Trainer(object):
    def __init__(self, config=None):
        self._status_processor = {}
        self._place = fluid.CPUPlace()

        self._exe = fluid.Executor(self._place)
        self._exector_context = {}

        self._context = {'status': 'uninit', 'is_exit': False}
        self._config_yaml = config
@@ -95,6 +97,6 @@ def user_define_engine(engine_yaml):
    train_dirname = os.path.dirname(train_location)
    base_name = os.path.splitext(os.path.basename(train_location))[0]
    sys.path.append(train_dirname)
    trainer_class = envs.lazy_instance_by_fliename(base_name,
                                                   "UserDefineTraining")
    return trainer_class
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
@@ -43,11 +42,14 @@ class ClusterTrainer(TranspileTrainer):
        self.regist_context_processor('uninit', self.instance)
        self.regist_context_processor('init_pass', self.init)
        self.regist_context_processor('startup_pass', self.startup)

        if envs.get_platform() == "LINUX" and envs.get_global_env(
                "dataset_class", None, "train.reader") != "DataLoader":
            self.regist_context_processor('train_pass', self.dataset_train)
        else:
            self.regist_context_processor('train_pass',
                                           self.dataloader_train)

        self.regist_context_processor('infer_pass', self.infer)
        self.regist_context_processor('terminal_pass', self.terminal)
@@ -75,8 +77,8 @@ class ClusterTrainer(TranspileTrainer):
    def init(self, context):
        self.model.train_net()
        optimizer = self.model.optimizer()
        optimizer_name = envs.get_global_env("hyper_parameters.optimizer",
                                             None, "train.model")
        if optimizer_name not in ["", "sgd", "SGD", "Sgd"]:
            os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
@@ -114,9 +116,9 @@ class ClusterTrainer(TranspileTrainer):
        program = fluid.compiler.CompiledProgram(
            fleet.main_program).with_data_parallel(
                loss_name=self.model.get_avg_cost().name,
                build_strategy=self.strategy.get_build_strategy(),
                exec_strategy=self.strategy.get_execute_strategy())

        metrics_varnames = []
        metrics_format = []
@@ -135,9 +137,8 @@ class ClusterTrainer(TranspileTrainer):
        batch_id = 0
        try:
            while True:
                metrics_rets = self._exe.run(program=program,
                                             fetch_list=metrics_varnames)
                metrics = [epoch, batch_id]
                metrics.extend(metrics_rets)
@@ -162,14 +163,16 @@ class ClusterTrainer(TranspileTrainer):
        for i in range(epochs):
            begin_time = time.time()
            self._exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_list=self.fetch_vars,
                fetch_info=self.fetch_alias,
                print_period=self.fetch_period)
            end_time = time.time()
            times = end_time - begin_time
            print("epoch {} using time {}, speed {:.2f} lines/s".format(
                i, times, ins / times))

            self.save(i, "train", is_fleet=True)
        fleet.stop_worker()
...
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
@@ -44,11 +43,14 @@ class OnlineLearningTrainer(TranspileTrainer):
        self.regist_context_processor('uninit', self.instance)
        self.regist_context_processor('init_pass', self.init)
        self.regist_context_processor('startup_pass', self.startup)

        if envs.get_platform() == "LINUX" and envs.get_global_env(
                "dataset_class", None, "train.reader") != "DataLoader":
            self.regist_context_processor('train_pass', self.dataset_train)
        else:
            self.regist_context_processor('train_pass',
                                           self.dataloader_train)

        self.regist_context_processor('infer_pass', self.infer)
        self.regist_context_processor('terminal_pass', self.terminal)
@@ -110,27 +112,27 @@ class OnlineLearningTrainer(TranspileTrainer):
        if state == "TRAIN":
            inputs = self.model.get_inputs()
            namespace = "train.reader"
            train_data_path = envs.get_global_env("train_data_path", None,
                                                  namespace)
        else:
            inputs = self.model.get_infer_inputs()
            namespace = "evaluate.reader"
            train_data_path = envs.get_global_env("test_data_path", None,
                                                  namespace)

        threads = int(envs.get_runtime_environ("train.trainer.threads"))
        batch_size = envs.get_global_env("batch_size", None, namespace)
        reader_class = envs.get_global_env("class", None, namespace)
        abs_dir = os.path.dirname(os.path.abspath(__file__))
        reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
        pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state,
                                               self._config_yaml)

        if train_data_path.startswith("paddlerec::"):
            package_base = envs.get_runtime_environ("PACKAGE_BASE")
            assert package_base is not None
            train_data_path = os.path.join(package_base,
                                           train_data_path.split("::")[1])

        dataset = fluid.DatasetFactory().create_dataset()
        dataset.set_use_var(inputs)
@@ -166,14 +168,16 @@ class OnlineLearningTrainer(TranspileTrainer):
            ins = self._get_dataset_ins()

            begin_time = time.time()
            self._exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_list=self.fetch_vars,
                fetch_info=self.fetch_alias,
                print_period=self.fetch_period)
            end_time = time.time()
            times = end_time - begin_time
            print("epoch {} using time {}, speed {:.2f} lines/s".format(
                i, times, ins / times))

            self.save(i, "train", is_fleet=True)
        fleet.stop_worker()
...
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
@@ -36,8 +35,9 @@ class SingleTrainer(TranspileTrainer):
        self.regist_context_processor('uninit', self.instance)
        self.regist_context_processor('init_pass', self.init)
        self.regist_context_processor('startup_pass', self.startup)

        if envs.get_platform() == "LINUX" and envs.get_global_env(
                "dataset_class", None, "train.reader") != "DataLoader":
            self.regist_context_processor('train_pass', self.dataset_train)
        else:
            self.regist_context_processor('train_pass', self.dataloader_train)
@@ -73,9 +73,8 @@ class SingleTrainer(TranspileTrainer):
        reader = self._get_dataloader("TRAIN")
        epochs = envs.get_global_env("train.epochs")

        program = fluid.compiler.CompiledProgram(fluid.default_main_program(
        )).with_data_parallel(loss_name=self.model.get_avg_cost().name)

        metrics_varnames = []
        metrics_format = []
@@ -94,9 +93,8 @@ class SingleTrainer(TranspileTrainer):
        batch_id = 0
        try:
            while True:
                metrics_rets = self._exe.run(program=program,
                                             fetch_list=metrics_varnames)
                metrics = [epoch, batch_id]
                metrics.extend(metrics_rets)
@@ -117,14 +115,16 @@ class SingleTrainer(TranspileTrainer):
        epochs = envs.get_global_env("train.epochs")
        for i in range(epochs):
            begin_time = time.time()
            self._exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_list=self.fetch_vars,
                fetch_info=self.fetch_alias,
                print_period=self.fetch_period)
            end_time = time.time()
            times = end_time - begin_time
            print("epoch {} using time {}, speed {:.2f} lines/s".format(
                i, times, ins / times))
            self.save(i, "train", is_fleet=False)

        context['status'] = 'infer_pass'
...
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
@@ -36,8 +35,8 @@ special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info"]

class TDMClusterTrainer(ClusterTrainer):
    def server(self, context):
        namespace = "train.startup"
        init_model_path = envs.get_global_env("cluster.init_model_path", "",
                                              namespace)
        assert init_model_path != "", "Cluster train must has init_model for TDM"
        fleet.init_server(init_model_path)
        logger.info("TDM: load model from {}".format(init_model_path))
@@ -48,24 +47,27 @@ class TDMClusterTrainer(ClusterTrainer):
        self._exe.run(fleet.startup_program)

        namespace = "train.startup"
        load_tree = envs.get_global_env("tree.load_tree", True, namespace)

        self.tree_layer_path = envs.get_global_env("tree.tree_layer_path", "",
                                                   namespace)
        self.tree_travel_path = envs.get_global_env("tree.tree_travel_path",
                                                    "", namespace)
        self.tree_info_path = envs.get_global_env("tree.tree_info_path", "",
                                                  namespace)

        save_init_model = envs.get_global_env("cluster.save_init_model", False,
                                              namespace)
        init_model_path = envs.get_global_env("cluster.init_model_path", "",
                                              namespace)

        if load_tree:
            # covert tree to tensor, set it into Fluid's variable.
            for param_name in special_param:
                param_t = fluid.global_scope().find_var(param_name).get_tensor(
                )
                param_array = self._tdm_prepare(param_name)
                param_t.set(param_array.astype('int32'), self._place)
@@ -93,8 +95,8 @@ class TDMClusterTrainer(ClusterTrainer):
    def _tdm_travel_prepare(self):
        """load tdm tree param from npy/list file"""
        travel_array = np.load(self.tree_travel_path)
        logger.info("TDM Tree leaf node nums: {}".format(travel_array.shape[
            0]))
        return travel_array

    def _tdm_layer_prepare(self):
...
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
@@ -27,33 +26,38 @@ from paddlerec.core.utils import envs
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
special_param = [
    "TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info", "TDM_Tree_Emb"
]


class TDMSingleTrainer(SingleTrainer):
    def startup(self, context):
        namespace = "train.startup"
        load_persistables = envs.get_global_env("single.load_persistables",
                                                False, namespace)
        persistables_model_path = envs.get_global_env(
            "single.persistables_model_path", "", namespace)

        load_tree = envs.get_global_env("tree.load_tree", False, namespace)

        self.tree_layer_path = envs.get_global_env("tree.tree_layer_path", "",
                                                   namespace)
        self.tree_travel_path = envs.get_global_env("tree.tree_travel_path",
                                                    "", namespace)
        self.tree_info_path = envs.get_global_env("tree.tree_info_path", "",
                                                  namespace)
        self.tree_emb_path = envs.get_global_env("tree.tree_emb_path", "",
                                                 namespace)

        save_init_model = envs.get_global_env("single.save_init_model", False,
                                              namespace)
        init_model_path = envs.get_global_env("single.init_model_path", "",
                                              namespace)

        self._exe.run(fluid.default_startup_program())

        if load_persistables:
@@ -68,7 +72,8 @@ class TDMSingleTrainer(SingleTrainer):
        if load_tree:
            # covert tree to tensor, set it into Fluid's variable.
            for param_name in special_param:
                param_t = fluid.global_scope().find_var(param_name).get_tensor(
                )
                param_array = self._tdm_prepare(param_name)
                if param_name == 'TDM_Tree_Emb':
                    param_t.set(param_array.astype('float32'), self._place)
@@ -102,15 +107,15 @@ class TDMSingleTrainer(SingleTrainer):
    def _tdm_travel_prepare(self):
        """load tdm tree param from npy/list file"""
        travel_array = np.load(self.tree_travel_path)
        logger.info("TDM Tree leaf node nums: {}".format(travel_array.shape[
            0]))
        return travel_array

    def _tdm_emb_prepare(self):
        """load tdm tree param from npy/list file"""
        emb_array = np.load(self.tree_emb_path)
        logger.info("TDM Tree node nums from emb: {}".format(emb_array.shape[
            0]))
        return emb_array

    def _tdm_layer_prepare(self):
...
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with DistributeTranspiler
"""
@@ -39,9 +38,12 @@ class TranspileTrainer(Trainer):
        self.increment_models = []

    def processor_register(self):
        print(
            "Need implement by trainer, `self.regist_context_processor('uninit', self.instance)` must be the first"
        )

    def _get_dataloader(self, state="TRAIN"):
        if state == "TRAIN":
            dataloader = self.model._data_loader
            namespace = "train.reader"
@@ -59,12 +61,14 @@ class TranspileTrainer(Trainer):
        if sparse_slots is None and dense_slots is None:
            reader_class = envs.get_global_env("class", None, namespace)
            reader = dataloader_instance.dataloader(reader_class, state,
                                                    self._config_yaml)
            reader_class = envs.lazy_instance_by_fliename(reader_class,
                                                          class_name)
            reader_ins = reader_class(self._config_yaml)
        else:
            reader = dataloader_instance.slotdataloader("", state,
                                                        self._config_yaml)
            reader_ins = SlotReader(self._config_yaml)

        if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
@@ -94,13 +98,13 @@ class TranspileTrainer(Trainer):
        if state == "TRAIN":
            inputs = self.model.get_inputs()
            namespace = "train.reader"
            train_data_path = envs.get_global_env("train_data_path", None,
                                                  namespace)
        else:
            inputs = self.model.get_infer_inputs()
            namespace = "evaluate.reader"
            train_data_path = envs.get_global_env("test_data_path", None,
                                                  namespace)

        sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
        dense_slots = envs.get_global_env("dense_slots", None, namespace)
@@ -112,8 +116,8 @@ class TranspileTrainer(Trainer):
        reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')

        if sparse_slots is None and dense_slots is None:
            pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state,
                                                   self._config_yaml)
        else:
            padding = envs.get_global_env("padding", 0, namespace)
            pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
@@ -123,8 +127,8 @@ class TranspileTrainer(Trainer):
        if train_data_path.startswith("paddlerec::"):
            package_base = envs.get_runtime_environ("PACKAGE_BASE")
            assert package_base is not None
            train_data_path = os.path.join(package_base,
                                           train_data_path.split("::")[1])

        dataset = fluid.DatasetFactory().create_dataset()
        dataset.set_use_var(inputs)
@@ -140,11 +144,11 @@ class TranspileTrainer(Trainer):
        debug_mode = envs.get_global_env("reader_debug_mode", False, namespace)
        if debug_mode:
            print("--- Dataset Debug Mode Begin , show pre 10 data of {}---".
                  format(file_list[0]))
            os.system("cat {} | {} | head -10".format(file_list[0], pipe_cmd))
            print("--- Dataset Debug Mode End , show pre 10 data of {}---".
                  format(file_list[0]))
            exit(0)
        return dataset
@@ -166,27 +170,29 @@ class TranspileTrainer(Trainer):
            if not need_save(epoch_id, save_interval, False):
                return

            feed_varnames = envs.get_global_env("save.inference.feed_varnames",
                                                None, namespace)
            fetch_varnames = envs.get_global_env(
                "save.inference.fetch_varnames", None, namespace)
            if feed_varnames is None or fetch_varnames is None:
                return

            fetch_vars = [
                fluid.default_main_program().global_block().vars[varname]
                for varname in fetch_varnames
            ]
            dirname = envs.get_global_env("save.inference.dirname", None,
                                          namespace)

            assert dirname is not None
            dirname = os.path.join(dirname, str(epoch_id))

            if is_fleet:
                fleet.save_inference_model(self._exe, dirname, feed_varnames,
                                           fetch_vars)
            else:
                fluid.io.save_inference_model(dirname, feed_varnames,
                                              fetch_vars, self._exe)
            self.inference_models.append((epoch_id, dirname))

        def save_persistables():
@@ -196,8 +202,8 @@ class TranspileTrainer(Trainer):
            if not need_save(epoch_id, save_interval, False):
                return

            dirname = envs.get_global_env("save.increment.dirname", None,
                                          namespace)

            assert dirname is not None
            dirname = os.path.join(dirname, str(epoch_id))
@@ -275,10 +281,9 @@ class TranspileTrainer(Trainer):
        batch_id = 0
        try:
            while True:
                metrics_rets = self._exe.run(program=program,
                                             fetch_list=metrics_varnames,
                                             return_numpy=is_return_numpy)
                metrics = [epoch, batch_id]
                metrics.extend(metrics_rets)
...
@@ -24,7 +24,7 @@ from paddlerec.core.utils import util as util

class DatasetHolder(object):
    """
    Dataset Holder
    """
    __metaclass__ = abc.ABCMeta
@@ -74,11 +74,17 @@ class TimeSplitDatasetHolder(DatasetHolder):
        Dataset.__init__(self, config)
        if 'data_donefile' not in config or config['data_donefile'] is None:
            config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
        self._path_generator = util.PathGenerator({
            'templates': [{
                'name': 'data_path',
                'template': config['data_path']
            }, {
                'name': 'donefile_path',
                'template': config['data_donefile']
            }]
        })
        self._split_interval = config[
            'split_interval']  # data split N mins per dir
        self._data_file_handler = fs.FileHandler(config)

    def _format_data_time(self, daytime_str, time_window_mins):
@@ -91,7 +97,8 @@ class TimeSplitDatasetHolder(DatasetHolder):
                return None, 0
            if mins_of_day % self._split_interval != 0:
                skip_mins = self._split_interval - (mins_of_day %
                                                    self._split_interval)
                data_time = data_time + datetime.timedelta(minutes=skip_mins)
                time_window_mins = time_window_mins - skip_mins
        return data_time, time_window_mins
@@ -106,17 +113,24 @@ class TimeSplitDatasetHolder(DatasetHolder):
            True/False
        """
        is_ready = True
        data_time, windows_mins = self._format_data_time(daytime_str,
                                                         time_window_mins)
        while time_window_mins > 0:
            file_path = self._path_generator.generate_path(
                'donefile_path', {'time_format': data_time})
            if not self._data_file_handler.is_exist(file_path):
                is_ready = False
                break
            time_window_mins = time_window_mins - self._split_interval
            data_time = data_time + datetime.timedelta(
                minutes=self._split_interval)
        return is_ready

    def get_file_list(self,
                      daytime_str,
                      time_window_mins,
                      node_num=1,
                      node_idx=0):
        """
        data in [daytime_str, daytime_str + time_window_mins], random shard to node_num, return shard[node_idx]
        Args:
@@ -128,26 +142,32 @@ class TimeSplitDatasetHolder(DatasetHolder):
            list, data_shard[node_idx]
        """
        data_file_list = []
        data_time, windows_mins = self._format_data_time(daytime_str,
                                                         time_window_mins)
        while time_window_mins > 0:
            file_path = self._path_generator.generate_path(
                'data_path', {'time_format': data_time})
            sub_file_list = self._data_file_handler.ls(file_path)
            for sub_file in sub_file_list:
                sub_file_name = self._data_file_handler.get_file_name(sub_file)
                if not sub_file_name.startswith(self._config[
                        'filename_prefix']):
                    continue
                if hash(sub_file_name) % node_num == node_idx:
                    data_file_list.append(sub_file)
            time_window_mins = time_window_mins - self._split_interval
            data_time = data_time + datetime.timedelta(
                minutes=self._split_interval)
        return data_file_list

    def _alloc_dataset(self, file_list):
        """ """
        dataset = fluid.DatasetFactory().create_dataset(self._config[
            'dataset_type'])
        dataset.set_batch_size(self._config['batch_size'])
        dataset.set_thread(self._config['load_thread'])
        dataset.set_hdfs_config(self._config['fs_name'],
                                self._config['fs_ugi'])
        dataset.set_pipe_command(self._config['data_converter'])
        dataset.set_filelist(file_list)
        dataset.set_use_var(self._config['data_vars'])
@@ -163,7 +183,9 @@ class TimeSplitDatasetHolder(DatasetHolder):
            while self.check_ready(begin_time, windown_min) == False:
                print("dataset not ready, time:" + begin_time)
                time.sleep(30)
            file_list = self.get_file_list(begin_time, windown_min,
                                           params['node_num'],
                                           params['node_idx'])
            self._datasets[begin_time] = self._alloc_dataset(file_list)
            self._datasets[begin_time].load_into_memory()
        else:
@@ -176,9 +198,12 @@ class TimeSplitDatasetHolder(DatasetHolder):
        windown_min = params['time_window_min']
        if begin_time not in self._datasets:
            if self.check_ready(begin_time, windown_min):
                file_list = self.get_file_list(begin_time, windown_min,
                                               params['node_num'],
                                               params['node_idx'])
                self._datasets[begin_time] = self._alloc_dataset(file_list)
                self._datasets[begin_time].preload_into_memory(self._config[
                    'preload_thread'])
                return True
        return False
...
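The node sharding in get_file_list() above relies only on a hash of the file name; a standalone sketch of that idea follows (the file names are invented for illustration, and note that on Python 3 the built-in hash() of a string is salted per process, so a fixed hash such as zlib.crc32 would be needed for shards to agree across nodes):

    files = ["part-00000", "part-00001", "part-00002", "part-00003"]
    node_num, node_idx = 2, 0

    # Same rule as get_file_list(): a node keeps the file when
    # hash(file_name) % node_num equals its own index.
    shard = [f for f in files if hash(f) % node_num == node_idx]
    print(shard)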
@@ -17,10 +17,11 @@ import sys

from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import envs

if len(sys.argv) < 4:
    raise ValueError(
        "reader only accept 3 argument: 1. reader_class 2.train/evaluate/slotreader 3.yaml_abs_path"
    )

reader_package = sys.argv[1]
...
@@ -95,7 +95,7 @@ def path_adapter(path):
        l_p = path.split("paddlerec.")[1].replace(".", "/")
        return os.path.join(package, l_p)
    else:
        return path

def windows_path_converter(path):
@@ -159,8 +159,8 @@ def pretty_print_envs(envs, header=None):
def lazy_instance_by_package(package, class_name):
    models = get_global_env("train.model.models")
-    model_package = __import__(
-        package, globals(), locals(), package.split("."))
+    model_package = __import__(package,
+                               globals(), locals(), package.split("."))
    instance = getattr(model_package, class_name)
    return instance
@@ -170,8 +170,8 @@ def lazy_instance_by_fliename(abs, class_name):
    sys.path.append(dirname)
    package = os.path.splitext(os.path.basename(abs))[0]
-    model_package = __import__(
-        package, globals(), locals(), package.split("."))
+    model_package = __import__(package,
+                               globals(), locals(), package.split("."))
    instance = getattr(model_package, class_name)
    return instance
@@ -189,8 +189,7 @@ def get_platform():
def find_free_port():
    def __free_port():
-        with closing(socket.socket(socket.AF_INET,
-                     socket.SOCK_STREAM)) as s:
+        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            s.bind(('', 0))
            return s.getsockname()[1]
...
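The two __import__ hunks above only change line wrapping. As a reminder of what lazy_instance_by_fliename does, here is a self-contained sketch of loading a class from a Python file by absolute path; the path and class name in the usage comment are hypothetical.

import os
import sys

def load_class_from_file(abs_path, class_name):
    # Make the file's directory importable, import it as a module named
    # after the file (without extension), and fetch the requested class.
    dirname = os.path.dirname(abs_path)
    sys.path.append(dirname)
    package = os.path.splitext(os.path.basename(abs_path))[0]
    module = __import__(package, globals(), locals(), package.split("."))
    return getattr(module, class_name)

# Hypothetical usage:
# TrainReader = load_class_from_file("/path/to/criteo_reader.py", "TrainReader")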
@@ -22,6 +22,7 @@ from paddlerec.core.utils import fs as fs

def save_program_proto(path, program=None):
    if program is None:
        _program = fluid.default_main_program()
    else:
@@ -175,7 +176,8 @@ class PathGenerator(object):
        """
        if template_name in self._templates:
            if 'time_format' in param:
-                str = param['time_format'].strftime(self._templates[template_name])
+                str = param['time_format'].strftime(self._templates[
+                    template_name])
                return str.format(**param)
            return self._templates[template_name].format(**param)
        else:
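The PathGenerator hunk re-wraps a two-stage template expansion: strftime fills the date placeholders first, then str.format fills the named ones. A small illustration with a made-up template string (not one of the project's real templates):

import datetime

template = "{output_path}/%Y%m%d/delta-{pass_id}"
param = {
    "output_path": "afs:/user/paddle/output",
    "pass_id": 3,
    "time_format": datetime.datetime(2019, 10, 1),
}

path = param["time_format"].strftime(template)  # fills %Y%m%d
path = path.format(**param)                     # fills {output_path} and {pass_id}
print(path)  # afs:/user/paddle/output/20191001/delta-3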
@@ -198,31 +200,39 @@ class TimeTrainPass(object):
            self._begin_day = make_datetime(day_fields[0].strip())
            if len(day_fields) == 1 or len(day_fields[1]) == 0:
                # 100 years, meaning to continuous running
-                self._end_day = self._begin_day + datetime.timedelta(days=36500)
+                self._end_day = self._begin_day + datetime.timedelta(
+                    days=36500)
            else:
                # example: 2020212+10
                run_day = int(day_fields[1].strip())
-                self._end_day = self._begin_day + datetime.timedelta(days=run_day)
+                self._end_day = self._begin_day + datetime.timedelta(
+                    days=run_day)
        else:
            # example: {20191001..20191031}
-            days = os.popen("echo -n " + self._config['days']).read().split(" ")
+            days = os.popen("echo -n " + self._config['days']).read().split(
+                " ")
            self._begin_day = make_datetime(days[0])
            self._end_day = make_datetime(days[len(days) - 1])
        self._checkpoint_interval = self._config['checkpoint_interval']
        self._dump_inference_interval = self._config['dump_inference_interval']
-        self._interval_per_pass = self._config['train_time_interval']  # train N min data per pass
+        self._interval_per_pass = self._config[
+            'train_time_interval']  # train N min data per pass
        self._pass_id = 0
        self._inference_pass_id = 0
        self._pass_donefile_handler = None
        if 'pass_donefile_name' in self._config:
-            self._train_pass_donefile = global_config['output_path'] + '/' + self._config['pass_donefile_name']
+            self._train_pass_donefile = global_config[
+                'output_path'] + '/' + self._config['pass_donefile_name']
            if fs.is_afs_path(self._train_pass_donefile):
-                self._pass_donefile_handler = fs.FileHandler(global_config['io']['afs'])
+                self._pass_donefile_handler = fs.FileHandler(global_config[
+                    'io']['afs'])
            else:
-                self._pass_donefile_handler = fs.FileHandler(global_config['io']['local_fs'])
+                self._pass_donefile_handler = fs.FileHandler(global_config[
+                    'io']['local_fs'])
-            last_done = self._pass_donefile_handler.cat(self._train_pass_donefile).strip().split('\n')[-1]
+            last_done = self._pass_donefile_handler.cat(
+                self._train_pass_donefile).strip().split('\n')[-1]
            done_fileds = last_done.split('\t')
            if len(done_fileds) > 4:
                self._base_key = done_fileds[1]
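The hunk above parses the training day range; a spec like "{20191001..20191031}" is expanded by shelling out to echo. Purely as an illustration of the same range semantics (this is not what the code does), a datetime-only equivalent:

import datetime

def expand_day_range(spec):
    # "{20191001..20191003}" -> ['20191001', '20191002', '20191003']
    begin_s, end_s = spec.strip("{}").split("..")
    begin = datetime.datetime.strptime(begin_s, "%Y%m%d")
    end = datetime.datetime.strptime(end_s, "%Y%m%d")
    days = []
    while begin <= end:
        days.append(begin.strftime("%Y%m%d"))
        begin += datetime.timedelta(days=1)
    return days

print(expand_day_range("{20191001..20191003}"))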
@@ -236,15 +246,18 @@ class TimeTrainPass(object):
        """
        return 24 * 60 / self._interval_per_pass

-    def save_train_progress(self, day, pass_id, base_key, model_path, is_checkpoint):
+    def save_train_progress(self, day, pass_id, base_key, model_path,
+                            is_checkpoint):
        """R
        """
        if is_checkpoint:
            self._checkpoint_pass_id = pass_id
            self._checkpoint_model_path = model_path
-        done_content = "%s\t%s\t%s\t%s\t%d\n" % (day, base_key,
-            self._checkpoint_model_path, self._checkpoint_pass_id, pass_id)
+        done_content = "%s\t%s\t%s\t%s\t%d\n" % (
+            day, base_key, self._checkpoint_model_path,
+            self._checkpoint_pass_id, pass_id)
-        self._pass_donefile_handler.write(done_content, self._train_pass_donefile, 'a')
+        self._pass_donefile_handler.write(done_content,
+                                          self._train_pass_donefile, 'a')
        pass

    def init_pass_by_id(self, date_str, pass_id):
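The reformatted done_content line above writes one tab-separated record per finished pass. A minimal sketch of that record with made-up values, following the field order in the diff's format string (day, base_key, checkpoint model path, checkpoint pass id, pass id):

day = "20191001"
base_key = "1569888000"
checkpoint_model_path = "afs:/user/paddle/output/20191001/checkpoint-10"
checkpoint_pass_id = 10
pass_id = 12

done_content = "%s\t%s\t%s\t%s\t%d\n" % (day, base_key, checkpoint_model_path,
                                         checkpoint_pass_id, pass_id)
# Appending records like this lets a restarted job recover the last finished
# pass by reading the final line of the donefile and splitting on '\t'.
fields = done_content.strip().split("\t")
assert len(fields) == 5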
@@ -286,12 +299,14 @@ class TimeTrainPass(object):
        if self._pass_id < 1:
            self.init_pass_by_time(self._begin_day.strftime("%Y%m%d%H%M"))
        else:
-            next_time = self._current_train_time + datetime.timedelta(minutes=self._interval_per_pass)
+            next_time = self._current_train_time + datetime.timedelta(
+                minutes=self._interval_per_pass)
            if (next_time - self._end_day).total_seconds() > 0:
                has_next = False
            else:
                self.init_pass_by_time(next_time.strftime("%Y%m%d%H%M"))
-        if has_next and (self._inference_pass_id < self._pass_id or self._pass_id < old_pass_id):
+        if has_next and (self._inference_pass_id < self._pass_id or
+                         self._pass_id < old_pass_id):
            self._inference_pass_id = self._pass_id - 1
        return has_next
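The re-wrapped condition above decides whether another training pass fits before end_day. A tiny illustration of that check with made-up times:

import datetime

current = datetime.datetime(2019, 10, 31, 23, 40)
end_day = datetime.datetime(2019, 11, 1)
interval_per_pass = 30  # minutes of data per pass

next_time = current + datetime.timedelta(minutes=interval_per_pass)
# Following the diff: no next pass once next_time is past end_day.
has_next = (next_time - end_day).total_seconds() <= 0
print(next_time.strftime("%Y%m%d%H%M"), has_next)  # 201911010010 False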
@@ -319,9 +334,11 @@ class TimeTrainPass(object):
        Return:
            date(current_train_time + delta_day)
        """
-        return (self._current_train_time + datetime.timedelta(days=delta_day)).strftime("%Y%m%d")
+        return (self._current_train_time + datetime.timedelta(days=delta_day)
+                ).strftime("%Y%m%d")

    def timestamp(self, delta_day=0):
        """R
        """
-        return (self._current_train_time + datetime.timedelta(days=delta_day)).timestamp()
+        return (self._current_train_time + datetime.timedelta(days=delta_day)
+                ).timestamp()