Unverified commit 9adaacbb authored by W wuzhihua, committed by GitHub

Merge pull request #72 from MrChengmo/fix_ci

fix ci & support multi runner
......@@ -14,7 +14,6 @@
import os
import sys
import yaml
from paddlerec.core.utils import envs
trainer_abs = os.path.join(
......@@ -61,7 +60,6 @@ class TrainerFactory(object):
def create(config):
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
trainer = TrainerFactory._build_trainer(config)
return trainer
......
......@@ -35,7 +35,6 @@ class ModelBase(object):
self._data_loader = None
self._infer_data_loader = None
self._fetch_interval = 20
self._namespace = "train.model"
self._platform = envs.get_platform()
self._init_hyper_parameters()
self._env = config
......@@ -50,7 +49,7 @@ class ModelBase(object):
self._slot_inited = True
dataset = {}
model_dict = {}
for i in self._env["executor"]:
for i in self._env["phase"]:
if i["name"] == kargs["name"]:
model_dict = i
break
......@@ -89,7 +88,7 @@ class ModelBase(object):
self._data_var.append(l)
self._sparse_data_var.append(l)
dataset_class = dataset["type"]
dataset_class = envs.get_global_env(name + "type")
if dataset_class == "DataLoader":
self._init_dataloader()
......@@ -139,10 +138,7 @@ class ModelBase(object):
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
if name == "SGD":
reg = envs.get_global_env("hyper_parameters.reg", 0.0001,
self._namespace)
optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
optimizer_i = fluid.optimizer.SGD(lr)
elif name == "ADAM":
optimizer_i = fluid.optimizer.Adam(lr, lazy_mode=True)
elif name == "ADAGRAD":
......@@ -206,31 +202,8 @@ class ModelBase(object):
def net(self, is_infer=False):
return None
def _construct_reader(self, is_infer=False):
if is_infer:
self._infer_data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._infer_data_var,
capacity=64,
use_double_buffer=False,
iterable=False)
else:
dataset_class = envs.get_global_env("dataset_class", None,
"train.reader")
if dataset_class == "DataLoader":
self._data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._data_var,
capacity=64,
use_double_buffer=False,
iterable=False)
def train_net(self):
input_data = self.input_data(is_infer=False)
self._data_var = input_data
self._construct_reader(is_infer=False)
self.net(input_data, is_infer=False)
pass
def infer_net(self):
input_data = self.input_data(is_infer=True)
self._infer_data_var = input_data
self._construct_reader(is_infer=True)
self.net(input_data, is_infer=True)
pass
......@@ -17,7 +17,6 @@ import abc
import os
from functools import reduce
import paddle.fluid.incubate.data_generator as dg
import yaml
from paddlerec.core.utils import envs
......@@ -28,7 +27,6 @@ class ReaderBase(dg.MultiSlotDataGenerator):
dg.MultiSlotDataGenerator.__init__(self)
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
@abc.abstractmethod
def init(self):
......@@ -47,7 +45,6 @@ class SlotReader(dg.MultiSlotDataGenerator):
dg.MultiSlotDataGenerator.__init__(self)
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
def init(self, sparse_slots, dense_slots, padding=0):
from operator import mul
......
......@@ -16,7 +16,6 @@ import abc
import os
import time
import sys
import yaml
import traceback
from paddle import fluid
......@@ -64,19 +63,31 @@ class Trainer(object):
self.increment_models = []
self._exector_context = {}
self._context = {'status': 'uninit', 'is_exit': False}
self._config_yaml = config
self._context["config_yaml"] = self._config_yaml
self._context["config_yaml"] = config
self._config = envs.load_yaml(config)
self._context["env"] = self._config
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_global_env("mode")
self._runner_name = envs.get_runtime_environ("mode")
self._context["runner_name"] = self._runner_name
phase_names = envs.get_global_env(
"runner." + self._runner_name + ".phases", None)
_config = envs.load_yaml(config)
self._context["env"] = _config
self._context["dataset"] = _config.get("dataset")
phases = []
if phase_names is None:
phases = _config.get("phase")
else:
for phase in _config.get("phase"):
if phase["name"] in phase_names:
phases.append(phase)
self._context["phases"] = phases
print("PaddleRec: Runner {} Begin".format(self._runner_name))
self.which_engine()
self.which_device()
......@@ -89,19 +100,21 @@ class Trainer(object):
"""
device = envs.get_global_env(
"runner." + self._runner_name + ".device", default_value="CPU")
if device.upper() == 'GPU':
device = device.upper()
if device == 'GPU':
self.check_gpu()
self.device = Device.GPU
gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
self._place = fluid.CUDAPlace(gpu_id)
self._exe = fluid.Executor(self._place)
elif device.upper() == "CPU":
elif device == "CPU":
self.device = Device.CPU
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
else:
raise ValueError("Not Support device {}".format(device))
self._context["device"] = device.upper()
self._context["device"] = device
self._context["exe"] = self._exe
self._context["place"] = self._place
......@@ -119,7 +132,6 @@ class Trainer(object):
try:
if not fluid.is_compiled_with_cuda():
raise RuntimeError(err)
sys.exit(1)
except Exception as e:
pass
......@@ -191,17 +203,10 @@ class Trainer(object):
None : run a processor for this status
"""
status = context['status']
try:
if status in self._status_processor:
self._status_processor[context['status']](context)
else:
self.other_status_processor(context)
except Exception as err:
traceback.print_exc()
print('Catch Exception:%s' % str(err))
sys.stdout.flush()
self._context['is_exit'] = self.handle_processor_exception(
status, context, err)
if status in self._status_processor:
self._status_processor[context['status']](context)
else:
self.other_status_processor(context)
def other_status_processor(self, context):
"""
......@@ -212,14 +217,17 @@ class Trainer(object):
print('unknow context_status:%s, do nothing' % context['status'])
time.sleep(60)
def handle_processor_exception(self, status, context, exception):
def handle_processor_exception(self, context, exception):
"""
when exception throwed from processor, will call this func to handle it
Return:
bool exit_app or not
"""
print('Exit app. catch exception in precoss status:%s, except:%s' %
(context['status'], str(exception)))
print("\n--------------------------------\nPaddleRec Error Message "
"Summary:\n--------------------------------\n")
print(
'Exit PaddleRec. catch exception in precoss status: [%s], except: %s'
% (context['status'], str(exception)))
return True
def reload_train_context(self):
......@@ -233,19 +241,14 @@ class Trainer(object):
keep running by statu context.
"""
while True:
self.reload_train_context()
self.context_process(self._context)
if self._context['is_exit']:
break
def user_define_engine(engine_yaml):
_config = envs.load_yaml(engine_yaml)
envs.set_runtime_environs(_config)
train_location = envs.get_global_env("engine.file")
train_dirname = os.path.dirname(train_location)
base_name = os.path.splitext(os.path.basename(train_location))[0]
sys.path.append(train_dirname)
trainer_class = envs.lazy_instance_by_fliename(base_name,
"UserDefineTraining")
return trainer_class
try:
self.reload_train_context()
self.context_process(self._context)
if self._context['is_exit']:
break
except Exception as err:
traceback.print_exc()
print('Catch Exception:%s' % str(err))
sys.stdout.flush()
self.handle_processor_exception(self._context, err)
sys.exit(type(err).__name__)
......@@ -126,7 +126,7 @@ class QueueDataset(DatasetBase):
file_list = context["fleet"].split_files(file_list)
dataset.set_filelist(file_list)
for model_dict in context["env"]["phase"]:
for model_dict in context["phases"]:
if model_dict["dataset_name"] == dataset_name:
model = context["model"][model_dict["name"]]["model"]
thread_num = int(model_dict["thread_num"])
......
......@@ -48,7 +48,7 @@ class SingleNetwork(NetworkBase):
def build_network(self, context):
context["model"] = {}
for model_dict in context["env"]["phase"]:
for model_dict in context["phases"]:
context["model"][model_dict["name"]] = {}
train_program = fluid.Program()
startup_program = fluid.Program()
......@@ -58,9 +58,8 @@ class SingleNetwork(NetworkBase):
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model_path = envs.os_path_adapter(
envs.workspace_adapter(model_dict["model"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(context["env"])
......@@ -98,7 +97,8 @@ class SingleNetwork(NetworkBase):
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
type = envs.get_global_env("dataset." + dataset["name"] + ".type")
if type != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
......@@ -123,8 +123,8 @@ class PSNetwork(NetworkBase):
context["model"][model_dict["name"]] = {}
dataset_name = model_dict["dataset_name"]
model_path = model_dict["model"].replace(
"{workspace}", envs.path_adapter(context["env"]["workspace"]))
model_path = envs.os_path_adapter(
envs.workspace_adapter(model_dict["model"]))
model = envs.lazy_instance_by_fliename(model_path,
"Model")(context["env"])
model._data_var = model.input_data(
......@@ -156,7 +156,9 @@ class PSNetwork(NetworkBase):
context["fleet"].init_worker()
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
type = envs.get_global_env("dataset." + dataset["name"] +
".type")
if type != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
......@@ -216,10 +218,8 @@ class PslibNetwork(NetworkBase):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
context["model"][model_dict["name"]] = {}
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model_path = envs.os_path_adapter(
envs.workspace_adapter(model_dict["model"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(context["env"])
model._data_var = model.input_data(
......@@ -251,7 +251,9 @@ class PslibNetwork(NetworkBase):
else:
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
type = envs.get_global_env("dataset." + dataset["name"] +
".type")
if type != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
......@@ -284,9 +286,9 @@ class CollectiveNetwork(NetworkBase):
scope = fluid.Scope()
with fluid.program_guard(train_program, startup_program):
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model_path = envs.os_path_adapter(
envs.workspace_adapter(model_dict["model"]))
model = envs.lazy_instance_by_fliename(model_path,
"Model")(context["env"])
model._data_var = model.input_data(
......@@ -315,7 +317,8 @@ class CollectiveNetwork(NetworkBase):
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
type = envs.get_global_env("dataset." + dataset["name"] + ".type")
if type != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
......
......@@ -40,6 +40,7 @@ class RunnerBase(object):
def _run(self, context, model_dict):
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict, context)
else:
......@@ -156,7 +157,7 @@ class RunnerBase(object):
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
else:
raise ValueError(
"Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]."
"Unsupported config. gradient_scale_strategy must be one of [0, 1, 2]."
)
_build_strategy.gradient_scale_strategy = gradient_scale_strategy
......@@ -285,7 +286,7 @@ class SingleRunner(RunnerBase):
envs.get_global_env("runner." + context["runner_name"] +
".epochs"))
for epoch in range(epochs):
for model_dict in context["env"]["phase"]:
for model_dict in context["phases"]:
begin_time = time.time()
self._run(context, model_dict)
end_time = time.time()
......@@ -384,7 +385,7 @@ class PslibRunner(RunnerBase):
day = begin_day + datetime.timedelta(days=day, hours=hour)
day_s = day.strftime('%Y%m%d/%H')
for dataset in context["env"]["dataset"]:
for dataset in envs.get_global_env("dataset"):
if dataset["type"] != "DataLoader":
name = dataset["name"]
train_data_path = envs.get_global_env(name +
......
......@@ -54,7 +54,7 @@ class SingleStartup(StartupBase):
pass
def startup(self, context):
for model_dict in context["env"]["phase"]:
for model_dict in context["phases"]:
with fluid.scope_guard(context["model"][model_dict["name"]][
"scope"]):
train_prog = context["model"][model_dict["name"]][
......
......@@ -17,10 +17,7 @@ General Trainer, applicable to many situations: Single/Cluster/Local_Cluster + P
from __future__ import print_function
import os
import time
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.trainer import Trainer, EngineMode, FleetMode, Device
from paddlerec.core.trainers.framework.dataset import *
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import json
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import dataloader_instance
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleInfer(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config
device = envs.get_global_env("device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self.processor_register()
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_global_env("mode")
device = envs.get_global_env("runner." + self._runner_name + ".device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.executor_train)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
context['status'] = 'init_pass'
def _get_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
"TRAIN", self._config_yaml)
else:
if sparse_slots == "":
sparse_slots = "?"
if dense_slots == "":
dense_slots = "?"
padding = envs.get_global_env(name + "padding", 0)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding))
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["phase"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model._infer_data_var
dataset.set_use_var(inputs)
break
return dataset
def _get_dataloader(self, dataset_name, dataloader):
name = "dataset." + dataset_name + "."
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
reader = dataloader_instance.dataloader_by_name(
reader_class, dataset_name, self._config_yaml)
reader_class = envs.lazy_instance_by_fliename(reader_class,
"TrainReader")
reader_ins = reader_class(self._config_yaml)
else:
reader = dataloader_instance.slotdataloader_by_name(
"", dataset_name, self._config_yaml)
reader_ins = SlotReader(self._config_yaml)
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
return dataloader
def _create_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots")
dense_slots = envs.get_global_env(name + "dense_slots")
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
type_name = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(),
" change reader to DataLoader")
type_name = "DataLoader"
padding = 0
if type_name == "DataLoader":
return None
else:
return self._get_dataset(dataset_name)
def init(self, context):
for model_dict in self._env["phase"]:
self._model[model_dict["name"]] = [None] * 5
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = envs.get_global_env(
"hyper_parameters.optimizer.learning_rate")
opt_strategy = envs.get_global_env(
"hyper_parameters.optimizer.strategy")
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(self._env)
model._infer_data_var = model.input_data(
is_infer=True,
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=True)
self._get_dataloader(dataset_name,
model._data_loader)
model.net(model._infer_data_var, True)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
self._model[model_dict["name"]][4] = train_program.clone()
for dataset in self._env["dataset"]:
if dataset["type"] != "DataLoader":
self._dataset[dataset["name"]] = self._create_dataset(dataset[
"name"])
context['status'] = 'startup_pass'
def startup(self, context):
for model_dict in self._env["phase"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass'
def executor_train(self, context):
epochs = int(
envs.get_global_env("runner." + self._runner_name + ".epochs", 1))
for j in range(epochs):
for model_dict in self._env["phase"]:
if j == 0:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][0]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.load()
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][4]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.save(j)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics = model_class.get_infer_results()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][0]
reader = self._dataset[reader_name]
with fluid.scope_guard(scope):
self._exe.infer_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
program = self._model[model_name][0].clone()
fetch_vars = []
fetch_alias = []
metrics = model_class.get_infer_results()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics_format.append("{}: {{}}".format("batch"))
metrics_indexes = dict()
for name, var in metrics.items():
metrics_varnames.append(var.name)
metrics_indexes[var.name] = len(metrics_varnames) - 1
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = self._model[model_name][3]._data_loader
reader.start()
batch_id = 0
scope = self._model[model_name][2]
infer_results = []
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames,
return_numpy=False)
metrics = [batch_id]
metrics.extend(metrics_rets)
batch_infer_result = {}
for k, v in metrics_indexes.items():
batch_infer_result[k] = np.array(metrics_rets[
v]).tolist()
infer_results.append(batch_infer_result)
if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def terminal(self, context):
context['is_exit'] = True
def load(self, is_fleet=False):
name = "runner." + self._runner_name + "."
dirname = envs.get_global_env(name + "init_model_path", None)
if dirname is None or dirname == "":
return
print("single_infer going to load ", dirname)
if is_fleet:
fleet.load_persistables(self._exe, dirname)
else:
fluid.io.load_persistables(self._exe, dirname)
def save(self, epoch_id, is_fleet=False):
def need_save(epoch_id, epoch_interval, is_last=False):
if is_last:
return True
if epoch_id == -1:
return False
return epoch_id % epoch_interval == 0
def save_inference_model():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_inference_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env(
name + "save_inference_feed_varnames", None)
fetch_varnames = envs.get_global_env(
name + "save_inference_fetch_varnames", None)
if feed_varnames is None or fetch_varnames is None or feed_varnames == "":
return
fetch_vars = [
fluid.default_main_program().global_block().vars[varname]
for varname in fetch_varnames
]
dirname = envs.get_global_env(name + "save_inference_path", None)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_inference_model(self._exe, dirname, feed_varnames,
fetch_vars)
else:
fluid.io.save_inference_model(dirname, feed_varnames,
fetch_vars, self._exe)
def save_persistables():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_checkpoint_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env(name + "save_checkpoint_path", None)
if dirname is None or dirname == "":
return
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_persistables(self._exe, dirname)
else:
fluid.io.save_persistables(self._exe, dirname)
save_persistables()
save_inference_model()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import dataloader_instance
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleTrainer(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config
self.processor_register()
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_global_env("mode")
device = envs.get_global_env("runner." + self._runner_name + ".device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.executor_train)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
context['status'] = 'init_pass'
def _get_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
"TRAIN", self._config_yaml)
else:
if sparse_slots == "":
sparse_slots = "?"
if dense_slots == "":
dense_slots = "?"
padding = envs.get_global_env(name + "padding", 0)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding))
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["phase"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model._data_var
dataset.set_use_var(inputs)
break
return dataset
def _get_dataloader(self, dataset_name, dataloader):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
if sparse_slots == "" and dense_slots == "":
reader = dataloader_instance.dataloader_by_name(
reader_class, dataset_name, self._config_yaml)
reader_class = envs.lazy_instance_by_fliename(reader_class,
"TrainReader")
reader_ins = reader_class(self._config_yaml)
else:
reader = dataloader_instance.slotdataloader_by_name(
"", dataset_name, self._config_yaml)
reader_ins = SlotReader(self._config_yaml)
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
return dataloader
def _create_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
type_name = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(),
" change reader to DataLoader")
type_name = "DataLoader"
padding = 0
if type_name == "DataLoader":
return None
else:
return self._get_dataset(dataset_name)
def init(self, context):
for model_dict in self._env["phase"]:
self._model[model_dict["name"]] = [None] * 5
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(self._env)
model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=False)
self._get_dataloader(dataset_name,
model._data_loader)
model.net(model._data_var, False)
optimizer = model.optimizer()
optimizer.minimize(model._cost)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
self._model[model_dict["name"]][4] = train_program.clone()
for dataset in self._env["dataset"]:
if dataset["type"] != "DataLoader":
self._dataset[dataset["name"]] = self._create_dataset(dataset[
"name"])
context['status'] = 'startup_pass'
def startup(self, context):
for model_dict in self._env["phase"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass'
def executor_train(self, context):
epochs = int(
envs.get_global_env("runner." + self._runner_name + ".epochs"))
for j in range(epochs):
for model_dict in self._env["phase"]:
if j == 0:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][0]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.load()
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][4]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.save(j)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][0]
reader = self._dataset[reader_name]
threads = model_dict.get("thread_num", 1)
with fluid.scope_guard(scope):
self._exe.train_from_dataset(
program=program,
dataset=reader,
thread=threads,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
program = self._model[model_name][0].clone()
_build_strategy = fluid.BuildStrategy()
_exe_strategy = fluid.ExecutionStrategy()
# 0: kCoeffNumDevice; 1: One; 2: Customized
_gradient_scale_strategy = model_dict.get("gradient_scale_strategy", 0)
if _gradient_scale_strategy == 0:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.CoeffNumDevice
elif _gradient_scale_strategy == 1:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.One
elif _gradient_scale_strategy == 2:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
else:
raise ValueError(
"Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]."
)
_build_strategy.gradient_scale_strategy = gradient_scale_strategy
if "thread_num" in model_dict and model_dict["thread_num"] > 1:
_build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
_exe_strategy.num_threads = model_dict["thread_num"]
os.environ['CPU_NUM'] = str(_exe_strategy.num_threads)
program = fluid.compiler.CompiledProgram(program).with_data_parallel(
loss_name=model_class.get_avg_cost().name,
build_strategy=_build_strategy,
exec_strategy=_exe_strategy)
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("batch"))
for name, var in metrics.items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = self._model[model_name][3]._data_loader
reader.start()
batch_id = 0
scope = self._model[model_name][2]
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames)
metrics = [batch_id]
metrics.extend(metrics_rets)
if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def terminal(self, context):
context['is_exit'] = True
def load(self, is_fleet=False):
dirname = envs.get_global_env(
"runner." + self._runner_name + ".init_model_path", None)
load_vars = envs.get_global_env(
"runner." + self._runner_name + ".load_vars", None)
def name_has_embedding(var):
res = False
for var_name in load_vars:
if var_name == var.name:
return True
return res
if dirname is None or dirname == "":
return
print("going to load ", dirname)
if is_fleet:
fleet.load_persistables(self._exe, dirname)
else:
if load_vars is None or len(load_vars) == 0:
fluid.io.load_persistables(self._exe, dirname)
else:
fluid.io.load_vars(
self._exe, dirname, predicate=name_has_embedding)
def save(self, epoch_id, is_fleet=False):
def need_save(epoch_id, epoch_interval, is_last=False):
if is_last:
return True
if epoch_id == -1:
return False
return epoch_id % epoch_interval == 0
def save_inference_model():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_inference_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env(
name + "save_inference_feed_varnames", [])
fetch_varnames = envs.get_global_env(
name + "save_inference_fetch_varnames", [])
if feed_varnames is None or fetch_varnames is None or feed_varnames == "" or fetch_varnames == "" or \
len(feed_varnames) == 0 or len(fetch_varnames) == 0:
return
fetch_vars = [
fluid.default_main_program().global_block().vars[varname]
for varname in fetch_varnames
]
dirname = envs.get_global_env(name + "save_inference_path", None)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_inference_model(self._exe, dirname, feed_varnames,
fetch_vars)
else:
fluid.io.save_inference_model(dirname, feed_varnames,
fetch_vars, self._exe)
def save_persistables():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_checkpoint_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env(name + "save_checkpoint_path", None)
if dirname is None or dirname == "":
return
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_persistables(self._exe, dirname)
else:
fluid.io.save_persistables(self._exe, dirname)
save_persistables()
save_inference_model()
......@@ -20,9 +20,8 @@ import socket
import sys
import traceback
import yaml
global_envs = {}
global_envs_flatten = {}
def flatten_environs(envs, separator="."):
......@@ -92,6 +91,16 @@ def set_global_envs(envs):
fatten_env_namespace([], envs)
for name, value in global_envs.items():
if isinstance(value, str):
value = os_path_adapter(workspace_adapter(value))
global_envs[name] = value
if get_platform() != "LINUX":
for dataset in envs["dataset"]:
name = ".".join(["dataset", dataset["name"], "type"])
global_envs[name] = "DataLoader"
def get_global_env(env_name, default_value=None, namespace=None):
"""
......@@ -106,7 +115,7 @@ def get_global_envs():
return global_envs
def path_adapter(path):
def paddlerec_adapter(path):
if path.startswith("paddlerec."):
package = get_runtime_environ("PACKAGE_BASE")
l_p = path.split("paddlerec.")[1].replace(".", "/")
......@@ -115,24 +124,28 @@ def path_adapter(path):
return path
def windows_path_converter(path):
def os_path_adapter(value):
if get_platform() == "WINDOWS":
return path.replace("/", "\\")
value = value.replace("/", "\\")
else:
return path.replace("\\", "/")
value = value.replace("\\", "/")
return value
def update_workspace():
def workspace_adapter(value):
workspace = global_envs.get("workspace")
if not workspace:
workspace = paddlerec_adapter(workspace)
value = value.replace("{workspace}", workspace)
return value
def reader_adapter():
if get_platform() != "WINDOWS":
return
workspace = path_adapter(workspace)
for name, value in global_envs.items():
if isinstance(value, str):
value = value.replace("{workspace}", workspace)
value = windows_path_converter(value)
global_envs[name] = value
datasets = global_envs.get("dataset")
for dataset in datasets:
dataset["type"] = "DataLoader"
def pretty_print_envs(envs, header=None):
......
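For orientation, here is a minimal, self-contained sketch (the workspace path and dataset entry are assumed for illustration and are not part of the commit) of how the new `workspace_adapter` / `os_path_adapter` pair replaces the old `update_workspace` pass inside `set_global_envs`:

```python
# Hedged illustration: set_global_envs() now runs every string value through
# workspace_adapter (expands "{workspace}") and os_path_adapter (normalizes
# path separators for the current platform) before storing it.
from paddlerec.core.utils import envs

config = {
    "workspace": "/home/user/ctr_dnn",          # assumed local workspace
    "dataset": [{
        "name": "dataloader_train",
        "type": "DataLoader",
        "data_path": "{workspace}/data/train",  # placeholder expanded on load
    }],
}
envs.set_global_envs(config)

# The stored value is already workspace-expanded and OS-normalized, so callers
# such as the trainers can read it directly:
print(envs.get_global_env("dataset.dataloader_train.data_path"))
# e.g. /home/user/ctr_dnn/data/train on Linux
```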
......@@ -208,7 +208,7 @@ The CTR-DNN training and test datasets use the [Display Advertising Challenge](https://www
Definition of the sparse inputs:
```python
def sparse_inputs():
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self._namespace)
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None)
sparse_input_ids = [
fluid.layers.data(name="S" + str(i),
......@@ -222,7 +222,7 @@ def sparse_inputs():
Definition of the dense input:
```python
def dense_input():
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self._namespace)
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None)
dense_input_var = fluid.layers.data(name="D",
shape=[dim],
......
......@@ -111,7 +111,7 @@ For a custom Engine implementation, see [local_cluster.py](../core/engine/local_c
Taking GeneralTrainer as an example, here is an overview of the Trainer's behavior:
```python
class SingleTrainer(TranspileTrainer):
class GeneralTrainer(Trainer):
def processor_register(self):
print("processor_register begin")
self.regist_context_processor('uninit', self.instance)
......@@ -151,7 +151,6 @@ class Model(object):
self._data_loader = None
self._infer_data_loader = None
self._fetch_interval = 20
self._namespace = "train.model"
self._platform = envs.get_platform()
def get_inputs(self):
......@@ -211,7 +210,6 @@ class Reader(dg.MultiSlotDataGenerator):
dg.MultiSlotDataGenerator.__init__(self)
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
@abc.abstractmethod
def init(self):
......@@ -231,9 +229,6 @@ class Reader(dg.MultiSlotDataGenerator):
This completes the construction of the reader.
For the Reader's data-processing logic, see [criteo_reader.py](../../models/rank/../../paddlerec/models/rank/criteo_reader.py)
## Metric
......
......@@ -24,8 +24,7 @@ hyper_parameters:
```python
if name == "SGD":
reg = envs.get_global_env("hyper_parameters.reg", 0.0001,
self._namespace)
reg = envs.get_global_env("hyper_parameters.reg", 0.0001)
optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM":
......
......@@ -13,6 +13,7 @@ runner:
device: cpu # run on CPU
init_model_path: "init_model" # path of the model used for initialization
print_interval: 10 # print interval for inference info, in batches
phases: phase_infer
```
Then define the concrete content to execute:
......
......@@ -65,7 +65,7 @@ python -m paddlerec.run -m paddlerec.models.recall.word2vec
- **`runner`** : a runner is the training engine, also called an executor. A runner defines the execution device (cpu, gpu), the execution mode (train, infer, single-machine, distributed, etc.), and runtime hyperparameters such as the number of training epochs and the model save path.
- **`phase`** : a phase is a stage of training, i.e. the concrete work the engine executes: which model file to run and which reader to use.
Each time PaddleRec runs, it executes one runner; `mode` specifies the `runner` name. Each runner can execute multiple `phase`s, so PaddleRec supports launching multi-stage training with a single command.
Each time PaddleRec runs, it executes one or more runners; `mode` specifies the `runner` name(s). Each runner can execute one or more `phase`s, so PaddleRec supports launching multi-stage training with a single command, as sketched below.
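A minimal sketch of such a multi-runner configuration (the runner and phase names follow the demo config touched by this change; paths are illustrative):

```yaml
# Two runners are listed in `mode`; each one restricts itself to a single
# phase through its `phases` list.
mode: [single_cpu_train, single_cpu_infer]

runner:
- name: single_cpu_train
  class: train
  device: cpu
  epochs: 4
  phases: [phase1]              # only the training phase
- name: single_cpu_infer
  class: infer
  device: cpu
  init_model_path: "increment/0"
  phases: [phase2]              # only the inference phase

phase:
- name: phase1
  model: "{workspace}/model.py"
  dataset_name: dataloader_train
  thread_num: 1
- name: phase2
  model: "{workspace}/model.py"
  dataset_name: dataset_infer
  thread_num: 1
```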
### Single-machine CPU training
......@@ -73,6 +73,7 @@ Each time PaddleRec runs, it executes a runner; `mode` specifies the `runner`
```yaml
mode: single_cpu_train # run the runner named single_cpu_train
# mode also supports multiple runners, e.g. mode: [single_cpu_train, single_cpu_infer]
runner:
- name: single_cpu_train # define a runner named single_cpu_train
......@@ -88,6 +89,8 @@ runner:
save_inference_fetch_varnames: [] # names of the fetch vars of the inference model
init_model_path: "" # for a warm start from a saved model, set the init model path here
print_interval: 10 # print interval for training info, in batches
phases: [phase_train] # if phases is not specified, all phases run by default
# multiple phases can also be specified here, e.g. phases: [phase_train, phase_infer]
```
Then define the concrete content to execute:
......
......@@ -12,10 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import os
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
import paddle.fluid.incubate.data_generator as dg
try:
......@@ -27,12 +23,7 @@ except ImportError:
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
_config = envs.load_yaml(config)
def init(self):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
......
......@@ -51,7 +51,7 @@ hyper_parameters:
fc_sizes: [512, 256, 128, 32]
# select runner by name
mode: single_cpu_train
mode: [single_cpu_train, single_cpu_infer]
# config of each runner.
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
......@@ -69,21 +69,8 @@ runner:
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
- name: single_gpu_train
class: train
# num of epochs
epochs: 4
# device to run training or infer
device: gpu
selected_gpus: "2"
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
save_checkpoint_path: "increment" # save checkpoint path
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
phases: [phase1]
- name: single_cpu_infer
class: infer
# num of epochs
......@@ -91,31 +78,7 @@ runner:
# device to run training or infer
device: cpu
init_model_path: "increment/0" # load model path
- name: local_cluster_cpu_ps_train
class: local_cluster
epochs: 4
device: cpu
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
save_checkpoint_path: "increment" # save checkpoint path
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 1
- name: multi_gpu_train
class: train
epochs: 4
device: gpu
selected_gpus: "2,3"
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
save_checkpoint_path: "increment" # save checkpoint path
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
phases: [phase2]
# runner will run all the phase in each epoch
phase:
......@@ -123,7 +86,8 @@ phase:
model: "{workspace}/model.py" # user-defined model
dataset_name: dataloader_train # select dataset by name
thread_num: 1
#- name: phase2
# model: "{workspace}/model.py" # user-defined model
# dataset_name: dataset_infer # select dataset by name
# thread_num: 1
- name: phase2
model: "{workspace}/model.py" # user-defined model
dataset_name: dataset_infer # select dataset by name
thread_num: 1
......@@ -12,10 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import os
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
import paddle.fluid.incubate.data_generator as dg
......@@ -28,12 +24,7 @@ except ImportError:
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
_config = envs.load_yaml(config)
def init(self):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
......
......@@ -12,10 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import os
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
import paddle.fluid.incubate.data_generator as dg
try:
......@@ -27,12 +23,7 @@ except ImportError:
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
_config = envs.load_yaml(config)
def init(self):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
......
......@@ -18,11 +18,9 @@ import sys
import argparse
import tempfile
import yaml
import copy
from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs
from paddlerec.core.utils import validation
from paddlerec.core.utils import util
from paddlerec.core.utils import validation
......@@ -96,35 +94,61 @@ def get_all_inters_from_yaml(file, filters):
return ret
def get_engine(args):
def get_modes(running_config):
if not isinstance(running_config, dict):
raise ValueError("get_modes arguments must be [dict]")
modes = running_config.get("mode")
if not modes:
raise ValueError("yaml mast have config: mode")
if isinstance(modes, str):
modes = [modes]
return modes
def get_engine(args, running_config, mode):
transpiler = get_transpiler()
with open(args.model, 'r') as rb:
_envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
engine = run_extras.get("train.engine", None)
if engine is None:
engine = run_extras.get("runner." + _envs["mode"] + ".class", None)
engine_class = ".".join(["runner", mode, "class"])
engine_device = ".".join(["runner", mode, "device"])
device_gpu_choices = ".".join(["runner", mode, "device", "selected_gpus"])
engine = running_config.get(engine_class, None)
if engine is None:
engine = "train"
raise ValueError("not find {} in yaml, please check".format(
mode, engine_class))
device = running_config.get(engine_device, None)
engine = engine.upper()
device = device.upper()
if device is None:
print("not find device be specified in yaml, set CPU as default")
device = "CPU"
if device == "GPU":
selected_gpus = running_config.get(device_gpu_choices, None)
if selected_gpus is None:
print(
"not find selected_gpus be specified in yaml, set `0` as default"
)
selected_gpus = ["0"]
else:
print("selected_gpus {} will be specified for running".format(
selected_gpus))
device = run_extras.get("runner." + _envs["mode"] + ".device", "CPU")
if device.upper() == "GPU":
selected_gpus = run_extras.get(
"runner." + _envs["mode"] + ".selected_gpus", "0")
selected_gpus_num = len(selected_gpus.split(","))
if selected_gpus_num > 1:
engine = "LOCAL_CLUSTER"
engine = engine.upper()
if engine not in engine_choices:
raise ValueError("runner.class can not be chosen in {}".format(
engine_choices))
print("engines: \n{}".format(engines))
raise ValueError("{} can not be chosen in {}".format(engine_class,
engine_choices))
run_engine = engines[transpiler].get(engine, None)
return run_engine
......@@ -146,12 +170,7 @@ def set_runtime_envs(cluster_envs, engine_yaml):
if cluster_envs is None:
cluster_envs = {}
engine_extras = get_inters_from_yaml(engine_yaml, "train.trainer.")
if "train.trainer.threads" in engine_extras and "CPU_NUM" in cluster_envs:
cluster_envs["CPU_NUM"] = engine_extras["train.trainer.threads"]
envs.set_runtime_environs(cluster_envs)
envs.set_runtime_environs(engine_extras)
need_print = {}
for k, v in os.environ.items():
......@@ -162,29 +181,31 @@ def set_runtime_envs(cluster_envs, engine_yaml):
def single_train_engine(args):
_envs = envs.load_yaml(args.model)
run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
trainer_class = run_extras.get(
"runner." + _envs["mode"] + ".trainer_class", None)
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
mode = envs.get_runtime_environ("mode")
trainer_class = ".".join(["runner", mode, "trainer_class"])
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])
trainer = run_extras.get(trainer_class, "GeneralTrainer")
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
selected_gpus = run_extras.get(selected_gpus_class, "0")
executor_mode = "train"
if trainer_class:
trainer = trainer_class
else:
trainer = "GeneralTrainer"
single_envs = {}
executor_mode = "train"
fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
"ps")
device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu")
selected_gpus = run_extras.get(
"runner." + _envs["mode"] + ".selected_gpus", "0")
selected_gpus_num = len(selected_gpus.split(","))
if device.upper() == "GPU":
assert selected_gpus_num == 1, "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
selected_gpus_num = len(selected_gpus.split(","))
if selected_gpus_num != 1:
raise ValueError(
"Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
)
single_envs["selsected_gpus"] = selected_gpus
single_envs["FLAGS_selected_gpus"] = selected_gpus
single_envs = {}
single_envs["selsected_gpus"] = selected_gpus
single_envs["FLAGS_selected_gpus"] = selected_gpus
single_envs["train.trainer.trainer"] = trainer
single_envs["fleet_mode"] = fleet_mode
single_envs["train.trainer.executor_mode"] = executor_mode
......@@ -199,29 +220,32 @@ def single_train_engine(args):
def single_infer_engine(args):
_envs = envs.load_yaml(args.model)
run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
trainer_class = run_extras.get(
"runner." + _envs["mode"] + ".trainer_class", None)
if trainer_class:
trainer = trainer_class
else:
trainer = "GeneralTrainer"
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
mode = envs.get_runtime_environ("mode")
trainer_class = ".".join(["runner", mode, "trainer_class"])
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])
trainer = run_extras.get(trainer_class, "GeneralTrainer")
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
selected_gpus = run_extras.get(selected_gpus_class, "0")
executor_mode = "infer"
fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
"ps")
device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu")
selected_gpus = run_extras.get(
"runner." + _envs["mode"] + ".selected_gpus", "0")
selected_gpus_num = len(selected_gpus.split(","))
single_envs = {}
if device.upper() == "GPU":
assert selected_gpus_num == 1, "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
selected_gpus_num = len(selected_gpus.split(","))
if selected_gpus_num != 1:
raise ValueError(
"Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
)
single_envs["selsected_gpus"] = selected_gpus
single_envs["FLAGS_selected_gpus"] = selected_gpus
single_envs = {}
single_envs["selected_gpus"] = selected_gpus
single_envs["FLAGS_selected_gpus"] = selected_gpus
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.executor_mode"] = executor_mode
single_envs["fleet_mode"] = fleet_mode
......@@ -235,18 +259,6 @@ def single_infer_engine(args):
def cluster_engine(args):
def update_workspace(cluster_envs):
workspace = cluster_envs.get("engine_workspace", None)
if not workspace:
return
path = envs.path_adapter(workspace)
for name, value in cluster_envs.items():
if isinstance(value, str):
value = value.replace("{workspace}", path)
value = envs.windows_path_converter(value)
cluster_envs[name] = value
def master():
role = "MASTER"
from paddlerec.core.engine.cluster.cluster import ClusterEngine
......@@ -255,8 +267,6 @@ def cluster_engine(args):
flattens["engine_role"] = role
flattens["engine_run_config"] = args.model
flattens["engine_temp_path"] = tempfile.mkdtemp()
update_workspace(flattens)
envs.set_runtime_environs(flattens)
print(envs.pretty_print_envs(flattens, ("Submit Envs", "Value")))
......@@ -424,7 +434,7 @@ def local_mpi_engine(args):
def get_abs_model(model):
if model.startswith("paddlerec."):
dir = envs.path_adapter(model)
dir = envs.paddlerec_adapter(model)
path = os.path.join(dir, "config.yaml")
else:
if not os.path.isfile(model):
......@@ -442,11 +452,17 @@ if __name__ == "__main__":
envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})
args = parser.parse_args()
model_name = args.model.split('.')[-1]
args.model = get_abs_model(args.model)
if not validation.yaml_validation(args.model):
sys.exit(-1)
engine_registry()
which_engine = get_engine(args)
engine = which_engine(args)
engine.run()
running_config = get_all_inters_from_yaml(args.model, ["mode", "runner."])
modes = get_modes(running_config)
for mode in modes:
envs.set_runtime_environs({"mode": mode})
which_engine = get_engine(args, running_config, mode)
engine = which_engine(args)
engine.run()
......@@ -34,27 +34,28 @@ about["__url__"] = "https://github.com/PaddlePaddle/PaddleRec"
readme = ""
def run_cmd(command):
assert command is not None and isinstance(command, str)
return os.popen(command).read().strip()
def build(dirname):
package_dir = os.path.dirname(os.path.abspath(__file__))
run_cmd("cp -r {}/* {}".format(package_dir, dirname))
run_cmd("mkdir {}".format(os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "core"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "doc"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "models"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "tests"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "tools"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "*.py"), os.path.join(dirname, "paddlerec")))
shutil.copytree(
package_dir, dirname, ignore=shutil.ignore_patterns(".git"))
os.mkdir(os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "core"), os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "doc"), os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "models"), os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "tests"), os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "tools"), os.path.join(dirname, "paddlerec"))
for f in os.listdir(dirname):
if os.path.isdir(f):
continue
if os.path.splitext(f)[1] == ".py":
shutil.move(
os.path.join(dirname, f), os.path.join(dirname, "paddlerec"))
packages = find_packages(dirname, include=('paddlerec.*'))
package_dir = {'': dirname}
......@@ -90,7 +91,7 @@ def build(dirname):
zip_safe=False)
dirname = tempfile.mkdtemp()
dirname = tempfile.mktemp()
build(dirname)
shutil.rmtree(dirname)
......