提交 be9bb50e 编写于 作者: C chengmo

merge multi phase

......@@ -14,7 +14,6 @@
import os
import sys
import yaml
from paddlerec.core.utils import envs
trainer_abs = os.path.join(
......@@ -61,7 +60,6 @@ class TrainerFactory(object):
def create(config):
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
trainer = TrainerFactory._build_trainer(config)
return trainer
......
......@@ -35,7 +35,6 @@ class ModelBase(object):
self._data_loader = None
self._infer_data_loader = None
self._fetch_interval = 20
self._namespace = "train.model"
self._platform = envs.get_platform()
self._init_hyper_parameters()
self._env = config
......@@ -50,7 +49,7 @@ class ModelBase(object):
self._slot_inited = True
dataset = {}
model_dict = {}
for i in self._env["executor"]:
for i in self._env["phase"]:
if i["name"] == kargs["name"]:
model_dict = i
break
......@@ -89,7 +88,7 @@ class ModelBase(object):
self._data_var.append(l)
self._sparse_data_var.append(l)
dataset_class = dataset["type"]
dataset_class = envs.get_global_env(name + "type")
if dataset_class == "DataLoader":
self._init_dataloader()
......@@ -139,8 +138,7 @@ class ModelBase(object):
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
if name == "SGD":
reg = envs.get_global_env("hyper_parameters.reg", 0.0001,
self._namespace)
reg = envs.get_global_env("hyper_parameters.reg", 0.0001)
optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM":
......@@ -206,31 +204,8 @@ class ModelBase(object):
def net(self, is_infer=False):
    """Placeholder network builder.

    The base implementation builds nothing and yields ``None``.

    Args:
        is_infer: Unused here; accepted so the signature is uniform.
    """
    return
def _construct_reader(self, is_infer=False):
    """Create the non-iterable DataLoader that feeds the network inputs.

    Args:
        is_infer: When true, a loader over ``self._infer_data_var`` is stored
            in ``self._infer_data_loader``. Otherwise a loader over
            ``self._data_var`` is stored in ``self._data_loader``, but only if
            the ``dataset_class`` setting under ``train.reader`` equals
            ``"DataLoader"``.
    """
    if not is_infer:
        reader_kind = envs.get_global_env("dataset_class", None,
                                          "train.reader")
        if reader_kind != "DataLoader":
            # Any other reader kind leaves the training loader untouched.
            return
        self._data_loader = fluid.io.DataLoader.from_generator(
            feed_list=self._data_var,
            capacity=64,
            use_double_buffer=False,
            iterable=False)
        return

    self._infer_data_loader = fluid.io.DataLoader.from_generator(
        feed_list=self._infer_data_var,
        capacity=64,
        use_double_buffer=False,
        iterable=False)
def train_net(self):
    """Declare the training inputs, build their reader, then build the net.

    The inputs returned by ``self.input_data(is_infer=False)`` are stored in
    ``self._data_var`` before the reader and the network are constructed.
    """
    train_inputs = self.input_data(is_infer=False)
    self._data_var = train_inputs
    self._construct_reader(is_infer=False)
    self.net(train_inputs, is_infer=False)
def infer_net(self):
    """Declare the inference inputs, build their reader, then build the net.

    The inputs returned by ``self.input_data(is_infer=True)`` are stored in
    ``self._infer_data_var`` before the reader and the network are
    constructed.
    """
    infer_inputs = self.input_data(is_infer=True)
    self._infer_data_var = infer_inputs
    self._construct_reader(is_infer=True)
    self.net(infer_inputs, is_infer=True)
......@@ -17,7 +17,6 @@ import abc
import os
from functools import reduce
import paddle.fluid.incubate.data_generator as dg
import yaml
from paddlerec.core.utils import envs
......@@ -28,7 +27,6 @@ class ReaderBase(dg.MultiSlotDataGenerator):
dg.MultiSlotDataGenerator.__init__(self)
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
@abc.abstractmethod
def init(self):
......@@ -47,7 +45,6 @@ class SlotReader(dg.MultiSlotDataGenerator):
dg.MultiSlotDataGenerator.__init__(self)
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
def init(self, sparse_slots, dense_slots, padding=0):
from operator import mul
......
......@@ -16,7 +16,6 @@ import abc
import os
import time
import sys
import yaml
import traceback
from paddle import fluid
......@@ -64,19 +63,31 @@ class Trainer(object):
self.increment_models = []
self._exector_context = {}
self._context = {'status': 'uninit', 'is_exit': False}
self._config_yaml = config
self._context["config_yaml"] = self._config_yaml
self._context["config_yaml"] = config
self._config = envs.load_yaml(config)
self._context["env"] = self._config
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_global_env("mode")
self._runner_name = envs.get_runtime_environ("mode")
self._context["runner_name"] = self._runner_name
phase_names = envs.get_global_env(
"runner." + self._runner_name + ".phases", None)
_config = envs.load_yaml(config)
self._context["env"] = _config
self._context["dataset"] = _config.get("dataset")
phases = []
if phase_names is None:
phases = _config.get("phase")
else:
for phase in _config.get("phase"):
if phase["name"] in phase_names:
phases.append(phase)
self._context["phases"] = phases
print("PaddleRec: Runner {} Begin".format(self._runner_name))
self.which_engine()
self.which_device()
......@@ -89,19 +100,21 @@ class Trainer(object):
"""
device = envs.get_global_env(
"runner." + self._runner_name + ".device", default_value="CPU")
if device.upper() == 'GPU':
device = device.upper()
if device == 'GPU':
self.check_gpu()
self.device = Device.GPU
gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
self._place = fluid.CUDAPlace(gpu_id)
self._exe = fluid.Executor(self._place)
elif device.upper() == "CPU":
elif device == "CPU":
self.device = Device.CPU
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
else:
raise ValueError("Not Support device {}".format(device))
self._context["device"] = device.upper()
self._context["device"] = device
self._context["exe"] = self._exe
self._context["place"] = self._place
......@@ -119,7 +132,6 @@ class Trainer(object):
try:
if not fluid.is_compiled_with_cuda():
raise RuntimeError(err)
sys.exit(1)
except Exception as e:
pass
......@@ -240,15 +252,3 @@ class Trainer(object):
sys.stdout.flush()
self.handle_processor_exception(self._context, err)
sys.exit(type(err).__name__)
def user_define_engine(engine_yaml):
    """Resolve a user-supplied trainer class described by an engine YAML.

    The YAML is loaded into the runtime environs, the ``engine.file`` setting
    is read, the directory of that file is appended to ``sys.path``, and the
    ``UserDefineTraining`` object is looked up in the module named after the
    file (extension removed).

    Args:
        engine_yaml: Path of the engine YAML file.

    Returns:
        Whatever ``envs.lazy_instance_by_fliename`` resolves for the module.
    """
    envs.set_runtime_environs(envs.load_yaml(engine_yaml))

    engine_file = envs.get_global_env("engine.file")
    module_dir, module_file = os.path.split(engine_file)
    module_name = os.path.splitext(module_file)[0]

    # Make the user's module importable by its bare name.
    sys.path.append(module_dir)
    return envs.lazy_instance_by_fliename(module_name, "UserDefineTraining")
......@@ -126,7 +126,7 @@ class QueueDataset(DatasetBase):
file_list = context["fleet"].split_files(file_list)
dataset.set_filelist(file_list)
for model_dict in context["env"]["phase"]:
for model_dict in context["phases"]:
if model_dict["dataset_name"] == dataset_name:
model = context["model"][model_dict["name"]]["model"]
thread_num = int(model_dict["thread_num"])
......
......@@ -48,7 +48,7 @@ class SingleNetwork(NetworkBase):
def build_network(self, context):
context["model"] = {}
for model_dict in context["env"]["phase"]:
for model_dict in context["phases"]:
context["model"][model_dict["name"]] = {}
train_program = fluid.Program()
startup_program = fluid.Program()
......@@ -58,9 +58,8 @@ class SingleNetwork(NetworkBase):
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model_path = envs.os_path_adapter(
envs.workspace_adapter(model_dict["model"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(context["env"])
......@@ -98,7 +97,8 @@ class SingleNetwork(NetworkBase):
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
type = envs.get_global_env("dataset." + dataset["name"] + ".type")
if type != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
......@@ -123,8 +123,8 @@ class PSNetwork(NetworkBase):
context["model"][model_dict["name"]] = {}
dataset_name = model_dict["dataset_name"]
model_path = model_dict["model"].replace(
"{workspace}", envs.path_adapter(context["env"]["workspace"]))
model_path = envs.os_path_adapter(
envs.workspace_adapter(model_dict["model"]))
model = envs.lazy_instance_by_fliename(model_path,
"Model")(context["env"])
model._data_var = model.input_data(
......@@ -156,7 +156,9 @@ class PSNetwork(NetworkBase):
context["fleet"].init_worker()
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
type = envs.get_global_env("dataset." + dataset["name"] +
".type")
if type != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
......@@ -216,10 +218,8 @@ class PslibNetwork(NetworkBase):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
context["model"][model_dict["name"]] = {}
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model_path = envs.os_path_adapter(
envs.workspace_adapter(model_dict["model"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(context["env"])
model._data_var = model.input_data(
......@@ -251,7 +251,9 @@ class PslibNetwork(NetworkBase):
else:
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
type = envs.get_global_env("dataset." + dataset["name"] +
".type")
if type != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
......@@ -284,9 +286,9 @@ class CollectiveNetwork(NetworkBase):
scope = fluid.Scope()
with fluid.program_guard(train_program, startup_program):
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model_path = envs.os_path_adapter(
envs.workspace_adapter(model_dict["model"]))
model = envs.lazy_instance_by_fliename(model_path,
"Model")(context["env"])
model._data_var = model.input_data(
......@@ -315,7 +317,8 @@ class CollectiveNetwork(NetworkBase):
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
type = envs.get_global_env("dataset." + dataset["name"] + ".type")
if type != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
......
......@@ -40,6 +40,7 @@ class RunnerBase(object):
def _run(self, context, model_dict):
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict, context)
else:
......@@ -154,7 +155,7 @@ class RunnerBase(object):
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
else:
raise ValueError(
"Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]."
"Unsupported config. gradient_scale_strategy must be one of [0, 1, 2]."
)
_build_strategy.gradient_scale_strategy = gradient_scale_strategy
......@@ -283,7 +284,7 @@ class SingleRunner(RunnerBase):
envs.get_global_env("runner." + context["runner_name"] +
".epochs"))
for epoch in range(epochs):
for model_dict in context["env"]["phase"]:
for model_dict in context["phases"]:
begin_time = time.time()
self._run(context, model_dict)
end_time = time.time()
......@@ -382,7 +383,7 @@ class PslibRunner(RunnerBase):
day = begin_day + datetime.timedelta(days=day, hours=hour)
day_s = day.strftime('%Y%m%d/%H')
for dataset in context["env"]["dataset"]:
for dataset in envs.get_global_env("dataset"):
if dataset["type"] != "DataLoader":
name = dataset["name"]
train_data_path = envs.get_global_env(name +
......
......@@ -54,7 +54,7 @@ class SingleStartup(StartupBase):
pass
def startup(self, context):
for model_dict in context["env"]["phase"]:
for model_dict in context["phases"]:
with fluid.scope_guard(context["model"][model_dict["name"]][
"scope"]):
train_prog = context["model"][model_dict["name"]][
......
......@@ -17,10 +17,7 @@ General Trainer, applicable to many situations: Single/Cluster/Local_Cluster + P
from __future__ import print_function
import os
import time
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.trainer import Trainer, EngineMode, FleetMode, Device
from paddlerec.core.trainers.framework.dataset import *
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import json
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import dataloader_instance
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleInfer(TranspileTrainer):
def __init__(self, config=None):
    """Prepare the single-node inference trainer.

    Args:
        config: Forwarded unchanged to the parent initializer.
    """
    # NOTE(review): the super() lookup starts *after* TranspileTrainer, so
    # TranspileTrainer.__init__ itself is not run -- confirm this is intended.
    super(TranspileTrainer, self).__init__(config)
    self._env = self._config

    # First pass: pick a place from the top-level "device" setting.
    top_level_device = envs.get_global_env("device")
    if top_level_device == 'gpu':
        self._place = fluid.CUDAPlace(0)
    elif top_level_device == 'cpu':
        self._place = fluid.CPUPlace()
    self._exe = fluid.Executor(self._place)

    self.processor_register()
    self._model = {}
    self._dataset = {}

    envs.set_global_envs(self._config)
    envs.update_workspace()
    self._runner_name = envs.get_global_env("mode")

    # Second pass: the runner-specific device setting takes over when it is
    # exactly 'gpu' or 'cpu'; any other value keeps the place chosen above.
    runner_device = envs.get_global_env("runner." + self._runner_name +
                                        ".device")
    if runner_device == 'gpu':
        self._place = fluid.CUDAPlace(0)
    elif runner_device == 'cpu':
        self._place = fluid.CPUPlace()
    self._exe = fluid.Executor(self._place)
def processor_register(self):
    """Register the handler for every status of the trainer state machine."""
    handlers = (
        ('uninit', self.instance),
        ('init_pass', self.init),
        ('startup_pass', self.startup),
        ('train_pass', self.executor_train),
        ('terminal_pass', self.terminal),
    )
    for status, handler in handlers:
        self.regist_context_processor(status, handler)
def instance(self, context):
    """Handle the 'uninit' status by moving the context on to 'init_pass'."""
    next_status = 'init_pass'
    context['status'] = next_status
def _get_dataset(self, dataset_name):
    """Create a fluid Dataset for ``dataset_name`` fed by a reader subprocess.

    Settings are read from the ``dataset.<dataset_name>.*`` global envs. When
    neither ``sparse_slots`` nor ``dense_slots`` is configured, the pipe
    command runs the configured ``data_converter``; otherwise it runs the
    generic slot reader with the slot lists and padding as arguments.

    Args:
        dataset_name: Name of the dataset entry in the configuration.

    Returns:
        The configured dataset. Its use-vars are the ``_infer_data_var`` of
        the first phase whose ``dataset_name`` matches, if any.
    """
    name = "dataset." + dataset_name + "."
    # Looked up once and reused below.
    batch_size = envs.get_global_env(name + "batch_size")
    reader_class = envs.get_global_env(name + "data_converter")
    abs_dir = os.path.dirname(os.path.abspath(__file__))
    reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
    sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
    dense_slots = envs.get_global_env(name + "dense_slots", "").strip()

    if sparse_slots == "" and dense_slots == "":
        pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "TRAIN",
                                               self._config_yaml)
    else:
        # "?" stands in for an empty slot list, and for the spaces inside a
        # list, so each list is passed as one command-line argument.
        if sparse_slots == "":
            sparse_slots = "?"
        if dense_slots == "":
            dense_slots = "?"
        padding = envs.get_global_env(name + "padding", 0)
        pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
            reader, "slot", "slot", self._config_yaml, "fake",
            sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"),
            str(padding))

    dataset = fluid.DatasetFactory().create_dataset()
    dataset.set_batch_size(batch_size)
    dataset.set_pipe_command(pipe_cmd)

    train_data_path = envs.get_global_env(name + "data_path")
    file_list = [
        os.path.join(train_data_path, x) for x in os.listdir(train_data_path)
    ]
    dataset.set_filelist(file_list)

    for model_dict in self._env["phase"]:
        if model_dict["dataset_name"] == dataset_name:
            model = self._model[model_dict["name"]][3]
            dataset.set_use_var(model._infer_data_var)
            break
    return dataset
def _get_dataloader(self, dataset_name, dataloader):
    """Attach the sample generator for ``dataset_name`` to ``dataloader``.

    With no ``sparse_slots``/``dense_slots`` configured, the generator comes
    from the configured ``data_converter`` and a ``TrainReader`` instance is
    created from it; otherwise the slot data loader and a ``SlotReader`` are
    used. The reader instance is only inspected to choose the attach method.

    Args:
        dataset_name: Name of the dataset entry in the configuration.
        dataloader: Loader object that receives the generator.

    Returns:
        The same ``dataloader`` object.
    """
    name = "dataset." + dataset_name + "."
    batch_size = envs.get_global_env(name + "batch_size")
    reader_class = envs.get_global_env(name + "data_converter")
    sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
    dense_slots = envs.get_global_env(name + "dense_slots", "").strip()

    if sparse_slots == "" and dense_slots == "":
        reader = dataloader_instance.dataloader_by_name(
            reader_class, dataset_name, self._config_yaml)
        # Kept under its own name so the configured path string is not
        # shadowed by the resolved class.
        reader_type = envs.lazy_instance_by_fliename(reader_class,
                                                     "TrainReader")
        reader_ins = reader_type(self._config_yaml)
    else:
        reader = dataloader_instance.slotdataloader_by_name(
            "", dataset_name, self._config_yaml)
        reader_ins = SlotReader(self._config_yaml)

    # Readers that batch by themselves yield whole sample lists.
    if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
        dataloader.set_sample_list_generator(reader)
    else:
        dataloader.set_sample_generator(reader, batch_size)
    return dataloader
def _create_dataset(self, dataset_name):
    """Return the Dataset for ``dataset_name``, or None for DataLoader input.

    The reader type comes from ``dataset.<dataset_name>.type``. On any
    platform other than ``"LINUX"`` the type is forced to ``"DataLoader"``
    and a notice is printed.

    Args:
        dataset_name: Name of the dataset entry in the configuration.

    Returns:
        ``None`` when the effective type is ``"DataLoader"``, otherwise the
        result of ``self._get_dataset(dataset_name)``.
    """
    type_name = envs.get_global_env("dataset." + dataset_name + ".type")
    if envs.get_platform() != "LINUX":
        print("platform ", envs.get_platform(),
              " change reader to DataLoader")
        type_name = "DataLoader"

    if type_name == "DataLoader":
        return None
    return self._get_dataset(dataset_name)
def init(self, context):
# Handler for the 'init_pass' status: builds one inference program per
# configured phase, creates the datasets that are not DataLoader-typed, and
# advances the context to 'startup_pass'.
# NOTE(review): indentation is missing in this listing, so the nesting of the
# statements below is inferred from their order -- confirm against upstream.
for model_dict in self._env["phase"]:
# Per-phase slots: [0] main program, [1] startup program, [2] scope,
# [3] model instance, [4] clone of the main program.
self._model[model_dict["name"]] = [None] * 5
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
# The three optimizer settings are read here but not referenced again in
# this method.
opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = envs.get_global_env(
"hyper_parameters.optimizer.learning_rate")
opt_strategy = envs.get_global_env(
"hyper_parameters.optimizer.strategy")
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
# The literal "{workspace}" in the configured model path is replaced by the
# adapted workspace path before the "Model" class is loaded from it.
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(self._env)
model._infer_data_var = model.input_data(
is_infer=True,
dataset_name=model_dict["dataset_name"])
# DataLoader-typed datasets get their loader created and wired up here.
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=True)
self._get_dataloader(dataset_name,
model._data_loader)
model.net(model._infer_data_var, True)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
self._model[model_dict["name"]][4] = train_program.clone()
# Datasets of any other type are created through _create_dataset and stored
# by name.
for dataset in self._env["dataset"]:
if dataset["type"] != "DataLoader":
self._dataset[dataset["name"]] = self._create_dataset(dataset[
"name"])
context['status'] = 'startup_pass'
def startup(self, context):
    """Run every phase's startup program, then move on to 'train_pass'."""
    for phase in self._env["phase"]:
        slots = self._model[phase["name"]]
        # slots[2] is the phase scope, slots[1] its startup program.
        with fluid.scope_guard(slots[2]):
            self._exe.run(slots[1])
    context['status'] = 'train_pass'
def executor_train(self, context):
# Handler for the 'train_pass' status: runs every phase for the configured
# number of epochs ("runner.<runner>.epochs", default 1), then sets the
# context status to "terminal_pass".
# NOTE(review): indentation is missing in this listing; in particular the
# nesting level of the timing/print lines is inferred -- confirm upstream.
epochs = int(
envs.get_global_env("runner." + self._runner_name + ".epochs", 1))
for j in range(epochs):
for model_dict in self._env["phase"]:
# On the first epoch only, self.load() is called under the phase's scope and
# its main/startup programs.
if j == 0:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][0]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.load()
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
# The dataset type selects the DataLoader loop or the Dataset-based call.
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
# self.save(j) runs with slot 4 (the program clone taken in init) as the
# guarded main program.
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][4]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.save(j)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
    """Run one inference pass of a phase over its Dataset-backed reader.

    Args:
        model_dict: Phase configuration; ``"name"`` selects the model slots
            and ``"dataset_name"`` selects the dataset.
    """
    reader_name = model_dict["dataset_name"]
    model_name = model_dict["name"]
    model_class = self._model[model_name][3]

    fetch_period = int(
        envs.get_global_env("runner." + self._runner_name +
                            ".print_interval", 20))

    # Materialise real lists: on Python 3 dict.values()/keys() are views,
    # which cannot be indexed.
    metrics = model_class.get_infer_results()
    fetch_vars = list(metrics.values()) if metrics else []
    fetch_alias = list(metrics.keys()) if metrics else []

    scope = self._model[model_name][2]
    program = self._model[model_name][0]
    reader = self._dataset[reader_name]
    with fluid.scope_guard(scope):
        self._exe.infer_from_dataset(
            program=program,
            dataset=reader,
            fetch_list=fetch_vars,
            fetch_info=fetch_alias,
            print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
    """Run one inference pass of a phase by draining its DataLoader.

    The phase's main program is cloned and executed batch by batch until the
    loader raises ``fluid.core.EOFException``, at which point the loader is
    reset. Every ``print_interval`` batches (batch 0 excluded) the fetched
    metrics are printed as ``"batch: <id>, <name>: <value>, ..."``.

    Args:
        model_dict: Phase configuration; ``"name"`` selects the model slots.
    """
    model_name = model_dict["name"]
    model_class = self._model[model_name][3]
    program = self._model[model_name][0].clone()
    scope = self._model[model_name][2]

    # A model that reports no infer results is treated as "nothing to fetch"
    # instead of failing on None.
    metrics = model_class.get_infer_results() or {}
    fetch_period = int(
        envs.get_global_env("runner." + self._runner_name +
                            ".print_interval", 20))

    metrics_varnames = [var.name for var in metrics.values()]
    metrics_format = ", ".join(
        ["{}: {{}}".format("batch")] +
        ["{}: {{}}".format(name) for name in metrics])

    reader = model_class._data_loader
    reader.start()
    batch_id = 0
    with fluid.scope_guard(scope):
        try:
            while True:
                metrics_rets = self._exe.run(program=program,
                                             fetch_list=metrics_varnames,
                                             return_numpy=False)
                # Per-batch results are not retained: nothing consumes them
                # and keeping them would grow without bound.
                # A zero interval disables printing rather than dividing by 0.
                if (batch_id != 0 and fetch_period != 0 and
                        batch_id % fetch_period == 0):
                    print(metrics_format.format(batch_id, *metrics_rets))
                batch_id += 1
        except fluid.core.EOFException:
            reader.reset()
def terminal(self, context):
    """Handle the 'terminal_pass' status by flagging the context for exit."""
    should_exit = True
    context['is_exit'] = should_exit
def load(self, is_fleet=False):
    """Load persistable variables from the runner's ``init_model_path``.

    Nothing happens when the setting is missing or empty.

    Args:
        is_fleet: Use ``fleet.load_persistables`` instead of
            ``fluid.io.load_persistables``.
    """
    init_dir = envs.get_global_env(
        "runner." + self._runner_name + "." + "init_model_path", None)
    if init_dir is None or init_dir == "":
        return

    print("single_infer going to load ", init_dir)
    # NOTE(review): no import of `fleet` is visible in this file, so the
    # is_fleet=True path presumably raises NameError -- confirm.
    loader = fleet.load_persistables if is_fleet else fluid.io.load_persistables
    loader(self._exe, init_dir)
def save(self, epoch_id, is_fleet=False):
    """Write a checkpoint and then an inference model for ``epoch_id``.

    Each artefact is written only when its interval setting divides
    ``epoch_id`` (``epoch_id == -1`` never saves) and its path/varname
    settings are present. Output goes to ``<configured path>/<epoch_id>``.

    Args:
        epoch_id: Index of the epoch that just finished.
        is_fleet: Use the ``fleet`` save functions instead of ``fluid.io``.
    """
    prefix = "runner." + self._runner_name + "."

    def is_due(interval):
        # -1 is checked first so it never reaches the modulo.
        if epoch_id == -1:
            return False
        return epoch_id % interval == 0

    def write_checkpoint():
        interval = int(
            envs.get_global_env(prefix + "save_checkpoint_interval", -1))
        if not is_due(interval):
            return
        target = envs.get_global_env(prefix + "save_checkpoint_path", None)
        if target is None or target == "":
            return
        target = os.path.join(target, str(epoch_id))
        if is_fleet:
            fleet.save_persistables(self._exe, target)
        else:
            fluid.io.save_persistables(self._exe, target)

    def write_inference_model():
        interval = int(
            envs.get_global_env(prefix + "save_inference_interval", -1))
        if not is_due(interval):
            return
        feed_varnames = envs.get_global_env(
            prefix + "save_inference_feed_varnames", None)
        fetch_varnames = envs.get_global_env(
            prefix + "save_inference_fetch_varnames", None)
        if feed_varnames is None or fetch_varnames is None or feed_varnames == "":
            return

        # Resolve each fetch name in the current default main program.
        fetch_vars = []
        for varname in fetch_varnames:
            fetch_vars.append(
                fluid.default_main_program().global_block().vars[varname])

        target = envs.get_global_env(prefix + "save_inference_path", None)
        assert target is not None
        target = os.path.join(target, str(epoch_id))
        if is_fleet:
            fleet.save_inference_model(self._exe, target, feed_varnames,
                                       fetch_vars)
        else:
            fluid.io.save_inference_model(target, feed_varnames, fetch_vars,
                                          self._exe)

    write_checkpoint()
    write_inference_model()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import dataloader_instance
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleTrainer(TranspileTrainer):
def __init__(self, config=None):
    """Prepare the single-node trainer.

    Args:
        config: Forwarded unchanged to the parent initializer.
    """
    # NOTE(review): the super() lookup starts *after* TranspileTrainer, so
    # TranspileTrainer.__init__ itself is not run -- confirm this is intended.
    super(TranspileTrainer, self).__init__(config)
    self._env = self._config
    self.processor_register()
    self._model = {}
    self._dataset = {}

    envs.set_global_envs(self._config)
    envs.update_workspace()
    self._runner_name = envs.get_global_env("mode")

    # Only the exact values 'gpu' and 'cpu' assign a place here; any other
    # value leaves self._place as it was before this block.
    runner_device = envs.get_global_env("runner." + self._runner_name +
                                        ".device")
    if runner_device == 'gpu':
        self._place = fluid.CUDAPlace(0)
    elif runner_device == 'cpu':
        self._place = fluid.CPUPlace()
    self._exe = fluid.Executor(self._place)
def processor_register(self):
    """Register the handler for every status of the trainer state machine."""
    handlers = (
        ('uninit', self.instance),
        ('init_pass', self.init),
        ('startup_pass', self.startup),
        ('train_pass', self.executor_train),
        ('terminal_pass', self.terminal),
    )
    for status, handler in handlers:
        self.regist_context_processor(status, handler)
def instance(self, context):
    """Handle the 'uninit' status by moving the context on to 'init_pass'."""
    next_status = 'init_pass'
    context['status'] = next_status
def _get_dataset(self, dataset_name):
    """Create a fluid Dataset for ``dataset_name`` fed by a reader subprocess.

    Settings are read from the ``dataset.<dataset_name>.*`` global envs. When
    neither ``sparse_slots`` nor ``dense_slots`` is configured, the pipe
    command runs the configured ``data_converter``; otherwise it runs the
    generic slot reader with the slot lists and padding as arguments.

    Args:
        dataset_name: Name of the dataset entry in the configuration.

    Returns:
        The configured dataset. Its use-vars are the ``_data_var`` of the
        first phase whose ``dataset_name`` matches, if any.
    """
    name = "dataset." + dataset_name + "."
    # Looked up once and reused below.
    batch_size = envs.get_global_env(name + "batch_size")
    reader_class = envs.get_global_env(name + "data_converter")
    abs_dir = os.path.dirname(os.path.abspath(__file__))
    reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
    sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
    dense_slots = envs.get_global_env(name + "dense_slots", "").strip()

    if sparse_slots == "" and dense_slots == "":
        pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "TRAIN",
                                               self._config_yaml)
    else:
        # "?" stands in for an empty slot list, and for the spaces inside a
        # list, so each list is passed as one command-line argument.
        if sparse_slots == "":
            sparse_slots = "?"
        if dense_slots == "":
            dense_slots = "?"
        padding = envs.get_global_env(name + "padding", 0)
        pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
            reader, "slot", "slot", self._config_yaml, "fake",
            sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"),
            str(padding))

    dataset = fluid.DatasetFactory().create_dataset()
    dataset.set_batch_size(batch_size)
    dataset.set_pipe_command(pipe_cmd)

    train_data_path = envs.get_global_env(name + "data_path")
    file_list = [
        os.path.join(train_data_path, x) for x in os.listdir(train_data_path)
    ]
    dataset.set_filelist(file_list)

    for model_dict in self._env["phase"]:
        if model_dict["dataset_name"] == dataset_name:
            model = self._model[model_dict["name"]][3]
            dataset.set_use_var(model._data_var)
            break
    return dataset
def _get_dataloader(self, dataset_name, dataloader):
    """Attach the sample generator for ``dataset_name`` to ``dataloader``.

    With no ``sparse_slots``/``dense_slots`` configured, the generator comes
    from the configured ``data_converter`` and a ``TrainReader`` instance is
    created from it; otherwise the slot data loader and a ``SlotReader`` are
    used. The reader instance is only inspected to choose the attach method.

    Args:
        dataset_name: Name of the dataset entry in the configuration.
        dataloader: Loader object that receives the generator.

    Returns:
        The same ``dataloader`` object.
    """
    name = "dataset." + dataset_name + "."
    sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
    dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
    batch_size = envs.get_global_env(name + "batch_size")
    reader_class = envs.get_global_env(name + "data_converter")

    if sparse_slots == "" and dense_slots == "":
        reader = dataloader_instance.dataloader_by_name(
            reader_class, dataset_name, self._config_yaml)
        # Kept under its own name so the configured path string is not
        # shadowed by the resolved class.
        reader_type = envs.lazy_instance_by_fliename(reader_class,
                                                     "TrainReader")
        reader_ins = reader_type(self._config_yaml)
    else:
        reader = dataloader_instance.slotdataloader_by_name(
            "", dataset_name, self._config_yaml)
        reader_ins = SlotReader(self._config_yaml)

    # Readers that batch by themselves yield whole sample lists.
    if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
        dataloader.set_sample_list_generator(reader)
    else:
        dataloader.set_sample_generator(reader, batch_size)
    return dataloader
def _create_dataset(self, dataset_name):
    """Return the Dataset for ``dataset_name``, or None for DataLoader input.

    The reader type comes from ``dataset.<dataset_name>.type``. On any
    platform other than ``"LINUX"`` the type is forced to ``"DataLoader"``
    and a notice is printed.

    Args:
        dataset_name: Name of the dataset entry in the configuration.

    Returns:
        ``None`` when the effective type is ``"DataLoader"``, otherwise the
        result of ``self._get_dataset(dataset_name)``.
    """
    type_name = envs.get_global_env("dataset." + dataset_name + ".type")
    if envs.get_platform() != "LINUX":
        print("platform ", envs.get_platform(),
              " change reader to DataLoader")
        type_name = "DataLoader"

    if type_name == "DataLoader":
        return None
    return self._get_dataset(dataset_name)
def init(self, context):
# Handler for the 'init_pass' status: builds one training program per
# configured phase, creates the datasets that are not DataLoader-typed, and
# advances the context to 'startup_pass'.
# NOTE(review): indentation is missing in this listing, so the nesting of the
# statements below is inferred from their order -- confirm against upstream.
for model_dict in self._env["phase"]:
# Per-phase slots: [0] main program, [1] startup program, [2] scope,
# [3] model instance, [4] clone of the main program.
self._model[model_dict["name"]] = [None] * 5
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
# The literal "{workspace}" in the configured model path is replaced by the
# adapted workspace path before the "Model" class is loaded from it.
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(self._env)
model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
# DataLoader-typed datasets get their loader created and wired up here.
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=False)
self._get_dataloader(dataset_name,
model._data_loader)
model.net(model._data_var, False)
# The model's optimizer minimizes model._cost.
optimizer = model.optimizer()
optimizer.minimize(model._cost)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
self._model[model_dict["name"]][4] = train_program.clone()
# Datasets of any other type are created through _create_dataset and stored
# by name.
for dataset in self._env["dataset"]:
if dataset["type"] != "DataLoader":
self._dataset[dataset["name"]] = self._create_dataset(dataset[
"name"])
context['status'] = 'startup_pass'
def startup(self, context):
    """Run every phase's startup program, then move on to 'train_pass'."""
    for phase in self._env["phase"]:
        slots = self._model[phase["name"]]
        # slots[2] is the phase scope, slots[1] its startup program.
        with fluid.scope_guard(slots[2]):
            self._exe.run(slots[1])
    context['status'] = 'train_pass'
def executor_train(self, context):
# Handler for the 'train_pass' status: runs every phase for the configured
# number of epochs ("runner.<runner>.epochs", no default supplied here), then
# sets the context status to "terminal_pass".
# NOTE(review): indentation is missing in this listing; in particular the
# nesting level of the timing/print lines is inferred -- confirm upstream.
epochs = int(
envs.get_global_env("runner." + self._runner_name + ".epochs"))
for j in range(epochs):
for model_dict in self._env["phase"]:
# On the first epoch only, self.load() is called under the phase's scope and
# its main/startup programs.
if j == 0:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][0]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.load()
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
# The dataset type selects the DataLoader loop or the Dataset-based call.
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
# self.save(j) runs with slot 4 (the program clone taken in init) as the
# guarded main program.
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][4]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.save(j)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
    """Run one training pass of a phase over its Dataset-backed reader.

    Args:
        model_dict: Phase configuration; ``"name"`` selects the model slots,
            ``"dataset_name"`` selects the dataset, and the optional
            ``"thread_num"`` (default 1) is passed as the thread count.
    """
    reader_name = model_dict["dataset_name"]
    model_name = model_dict["name"]
    model_class = self._model[model_name][3]

    fetch_period = int(
        envs.get_global_env("runner." + self._runner_name +
                            ".print_interval", 20))

    # Materialise real lists: on Python 3 dict.values()/keys() are views,
    # which cannot be indexed.
    metrics = model_class.get_metrics()
    fetch_vars = list(metrics.values()) if metrics else []
    fetch_alias = list(metrics.keys()) if metrics else []

    scope = self._model[model_name][2]
    program = self._model[model_name][0]
    reader = self._dataset[reader_name]
    threads = model_dict.get("thread_num", 1)
    with fluid.scope_guard(scope):
        self._exe.train_from_dataset(
            program=program,
            dataset=reader,
            thread=threads,
            fetch_list=fetch_vars,
            fetch_info=fetch_alias,
            print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
    """Train one phase by pulling batches from the model's data loader.

    A clone of the phase's main program is compiled with data parallelism
    and executed until the loader raises ``fluid.core.EOFException``; the
    loader is then reset.  Metric values are printed every
    ``runner.<runner_name>.print_interval`` batches (20 if unset).

    Args:
        model_dict: phase configuration.  ``"name"`` selects the record in
            ``self._model``; the optional ``"gradient_scale_strategy"``
            (0, 1 or 2, default 0) and ``"thread_num"`` tune the build and
            execution strategies.

    Raises:
        ValueError: if ``gradient_scale_strategy`` is not 0, 1 or 2.
    """
    model_name = model_dict["name"]
    model_entry = self._model[model_name]
    model_class = model_entry[3]
    program = model_entry[0].clone()

    _build_strategy = fluid.BuildStrategy()
    _exe_strategy = fluid.ExecutionStrategy()

    # 0: kCoeffNumDevice; 1: One; 2: Customized
    scale_enum = fluid.BuildStrategy.GradientScaleStrategy
    scale_choices = {
        0: scale_enum.CoeffNumDevice,
        1: scale_enum.One,
        2: scale_enum.Customized,
    }
    try:
        gradient_scale_strategy = scale_choices[model_dict.get(
            "gradient_scale_strategy", 0)]
    except (KeyError, TypeError):
        # TypeError covers unhashable config values such as lists.
        raise ValueError(
            "Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]."
        )
    _build_strategy.gradient_scale_strategy = gradient_scale_strategy

    if "thread_num" in model_dict and model_dict["thread_num"] > 1:
        _build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        _exe_strategy.num_threads = model_dict["thread_num"]
        os.environ['CPU_NUM'] = str(_exe_strategy.num_threads)

    program = fluid.compiler.CompiledProgram(program).with_data_parallel(
        loss_name=model_class.get_avg_cost().name,
        build_strategy=_build_strategy,
        exec_strategy=_exe_strategy)

    fetch_period = int(
        envs.get_global_env("runner." + self._runner_name +
                            ".print_interval", 20))

    # A model without metrics is treated as having an empty metric table
    # instead of failing on ``.items()``.
    metrics = model_class.get_metrics() or {}
    metrics_varnames = [var.name for var in metrics.values()]
    # One "<label>: {}" slot for the batch id followed by one per metric.
    metrics_format = ", ".join(
        ["{}: {{}}".format("batch")] +
        ["{}: {{}}".format(name) for name in metrics])

    reader = model_class._data_loader
    reader.start()
    batch_id = 0
    scope = model_entry[2]
    with fluid.scope_guard(scope):
        try:
            while True:
                metrics_rets = self._exe.run(program=program,
                                             fetch_list=metrics_varnames)
                row = [batch_id]
                row.extend(metrics_rets)
                # A print interval of 0 disables printing rather than
                # dividing by zero; batch 0 is never printed.
                if fetch_period and batch_id != 0 and \
                        batch_id % fetch_period == 0:
                    print(metrics_format.format(*row))
                batch_id += 1
        except fluid.core.EOFException:
            # The loader signals the end of the data with EOFException.
            reader.reset()
def terminal(self, context):
    """Flag the run as finished by setting ``context['is_exit']`` to True.

    Args:
        context: mutable mapping shared between passes.
    """
    context["is_exit"] = True
def load(self, is_fleet=False):
    """Load initial parameters from ``runner.<runner_name>.init_model_path``.

    Nothing happens when the path is unset or empty.  With ``is_fleet``
    the persistables are loaded through ``fleet``; otherwise through
    ``fluid.io``, restricted to the variables named in
    ``runner.<runner_name>.load_vars`` when that list is non-empty.

    Args:
        is_fleet: load through ``fleet`` instead of ``fluid.io``.
    """
    runner_prefix = "runner." + self._runner_name
    dirname = envs.get_global_env(runner_prefix + ".init_model_path", None)
    load_vars = envs.get_global_env(runner_prefix + ".load_vars", None)

    if dirname is None or dirname == "":
        return
    print("going to load ", dirname)

    if is_fleet:
        fleet.load_persistables(self._exe, dirname)
        return

    if load_vars is None or len(load_vars) == 0:
        fluid.io.load_persistables(self._exe, dirname)
        return

    def is_requested(var):
        # Keep only variables whose name is listed in load_vars.
        return any(wanted == var.name for wanted in load_vars)

    fluid.io.load_vars(self._exe, dirname, predicate=is_requested)
def save(self, epoch_id, is_fleet=False):
    """Save a checkpoint and an inference model for ``epoch_id`` if due.

    All settings are read from the ``runner.<runner_name>.`` section of
    the global envs: ``save_checkpoint_interval`` / ``save_checkpoint_path``
    for persistables, and ``save_inference_interval`` /
    ``save_inference_path`` / ``save_inference_feed_varnames`` /
    ``save_inference_fetch_varnames`` for the inference model.  Output is
    written to ``<path>/<epoch_id>``.

    Args:
        epoch_id: index of the epoch that just finished.
        is_fleet: save through ``fleet`` instead of ``fluid.io``.
    """
    runner_prefix = "runner." + self._runner_name + "."

    def need_save(epoch_id, epoch_interval, is_last=False):
        """Return True when ``epoch_id`` falls on the save interval."""
        if is_last:
            return True
        if epoch_id == -1:
            return False
        if epoch_interval == 0:
            # An interval of 0 is treated as "never save"; without this
            # guard the modulo below raises ZeroDivisionError.
            return False
        # NOTE: with the default interval of -1 this is true for every
        # epoch (x % -1 == 0); saving is then gated by the path and
        # variable-name settings checked by the callers.
        return epoch_id % epoch_interval == 0

    def save_inference_model():
        """Export the inference model when interval, vars and path allow."""
        save_interval = int(
            envs.get_global_env(runner_prefix + "save_inference_interval",
                                -1))
        if not need_save(epoch_id, save_interval, False):
            return
        feed_varnames = envs.get_global_env(
            runner_prefix + "save_inference_feed_varnames", [])
        fetch_varnames = envs.get_global_env(
            runner_prefix + "save_inference_fetch_varnames", [])
        # Both name lists are required; None, "" and [] all mean "unset".
        if not feed_varnames or not fetch_varnames:
            return
        block_vars = fluid.default_main_program().global_block().vars
        fetch_vars = [block_vars[varname] for varname in fetch_varnames]
        dirname = envs.get_global_env(
            runner_prefix + "save_inference_path", None)
        assert dirname is not None, \
            "save_inference_path must be set to save the inference model"
        dirname = os.path.join(dirname, str(epoch_id))

        if is_fleet:
            fleet.save_inference_model(self._exe, dirname, feed_varnames,
                                       fetch_vars)
        else:
            fluid.io.save_inference_model(dirname, feed_varnames,
                                          fetch_vars, self._exe)

    def save_persistables():
        """Write a checkpoint when the interval and path allow."""
        save_interval = int(
            envs.get_global_env(runner_prefix + "save_checkpoint_interval",
                                -1))
        if not need_save(epoch_id, save_interval, False):
            return
        dirname = envs.get_global_env(
            runner_prefix + "save_checkpoint_path", None)
        if dirname is None or dirname == "":
            return
        dirname = os.path.join(dirname, str(epoch_id))
        if is_fleet:
            fleet.save_persistables(self._exe, dirname)
        else:
            fluid.io.save_persistables(self._exe, dirname)

    save_persistables()
    save_inference_model()
......@@ -20,9 +20,8 @@ import socket
import sys
import traceback
import yaml
global_envs = {}
global_envs_flatten = {}
def flatten_environs(envs, separator="."):
......@@ -92,6 +91,16 @@ def set_global_envs(envs):
fatten_env_namespace([], envs)
for name, value in global_envs.items():
if isinstance(value, str):
value = os_path_adapter(workspace_adapter(value))
global_envs[name] = value
if get_platform() != "LINUX":
for dataset in envs["dataset"]:
name = ".".join(["dataset", dataset["name"], "type"])
global_envs[name] = "DataLoader"
def get_global_env(env_name, default_value=None, namespace=None):
"""
......@@ -106,7 +115,7 @@ def get_global_envs():
return global_envs
def path_adapter(path):
def paddlerec_adapter(path):
if path.startswith("paddlerec."):
package = get_runtime_environ("PACKAGE_BASE")
l_p = path.split("paddlerec.")[1].replace(".", "/")
......@@ -115,24 +124,28 @@ def path_adapter(path):
return path
def windows_path_converter(path):
def os_path_adapter(value):
if get_platform() == "WINDOWS":
return path.replace("/", "\\")
value = value.replace("/", "\\")
else:
return path.replace("\\", "/")
value = value.replace("\\", "/")
return value
def update_workspace():
def workspace_adapter(value):
workspace = global_envs.get("workspace")
if not workspace:
workspace = paddlerec_adapter(workspace)
value = value.replace("{workspace}", workspace)
return value
def reader_adapter():
if get_platform() != "WINDOWS":
return
workspace = path_adapter(workspace)
for name, value in global_envs.items():
if isinstance(value, str):
value = value.replace("{workspace}", workspace)
value = windows_path_converter(value)
global_envs[name] = value
datasets = global_envs.get("dataset")
for dataset in datasets:
dataset["type"] = "DataLoader"
def pretty_print_envs(envs, header=None):
......
......@@ -208,7 +208,7 @@ CTR-DNN训练及测试数据集选用[Display Advertising Challenge](https://www
稀疏参数输入的定义:
```python
def sparse_inputs():
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self._namespace)
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None)
sparse_input_ids = [
fluid.layers.data(name="S" + str(i),
......@@ -222,7 +222,7 @@ def sparse_inputs():
稠密参数输入的定义:
```python
def dense_input():
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self._namespace)
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None)
dense_input_var = fluid.layers.data(name="D",
shape=[dim],
......
......@@ -151,7 +151,6 @@ class Model(object):
self._data_loader = None
self._infer_data_loader = None
self._fetch_interval = 20
self._namespace = "train.model"
self._platform = envs.get_platform()
def get_inputs(self):
......@@ -211,7 +210,6 @@ class Reader(dg.MultiSlotDataGenerator):
dg.MultiSlotDataGenerator.__init__(self)
_config = envs.load_yaml(config)
envs.set_global_envs(_config)
envs.update_workspace()
@abc.abstractmethod
def init(self):
......
......@@ -24,8 +24,7 @@ hyper_parameters:
```python
if name == "SGD":
reg = envs.get_global_env("hyper_parameters.reg", 0.0001,
self._namespace)
reg = envs.get_global_env("hyper_parameters.reg", 0.0001)
optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM":
......
......@@ -12,10 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import os
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
import paddle.fluid.incubate.data_generator as dg
try:
......@@ -27,12 +23,7 @@ except ImportError:
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
_config = envs.load_yaml(config)
def init(self):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
......@@ -87,7 +78,6 @@ class Reader(dg.MultiSlotDataGenerator):
v = i[1]
for j in v:
s += " " + k + ":" + str(j)
print s.strip()
yield None
return data_iter
......
......@@ -51,7 +51,7 @@ hyper_parameters:
fc_sizes: [512, 256, 128, 32]
# select runner by name
mode: single_cpu_train
mode: [single_cpu_train, single_cpu_infer]
# config of each runner.
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
......@@ -69,6 +69,8 @@ runner:
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
phases: [phase1]
- name: single_gpu_train
class: train
# num of epochs
......@@ -84,6 +86,7 @@ runner:
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
- name: single_cpu_infer
class: infer
# num of epochs
......@@ -91,6 +94,8 @@ runner:
# device to run training or infer
device: cpu
init_model_path: "increment/0" # load model path
phases: [phase2]
- name: local_cluster_cpu_ps_train
class: local_cluster
epochs: 4
......@@ -103,6 +108,7 @@ runner:
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 1
- name: multi_gpu_train
class: train
epochs: 4
......@@ -123,7 +129,8 @@ phase:
model: "{workspace}/model.py" # user-defined model
dataset_name: dataloader_train # select dataset by name
thread_num: 1
#- name: phase2
# model: "{workspace}/model.py" # user-defined model
# dataset_name: dataset_infer # select dataset by name
# thread_num: 1
- name: phase2
model: "{workspace}/model.py" # user-defined model
dataset_name: dataset_infer # select dataset by name
thread_num: 1
......@@ -12,10 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import os
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
import paddle.fluid.incubate.data_generator as dg
......@@ -28,12 +24,7 @@ except ImportError:
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
_config = envs.load_yaml(config)
def init(self):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
......@@ -88,7 +79,6 @@ class Reader(dg.MultiSlotDataGenerator):
v = i[1]
for j in v:
s += " " + k + ":" + str(j)
print s.strip()
yield None
return data_iter
......
......@@ -12,10 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import os
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
import paddle.fluid.incubate.data_generator as dg
try:
......@@ -27,12 +23,7 @@ except ImportError:
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
raise ValueError("reader config only support yaml")
_config = envs.load_yaml(config)
def init(self):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
......@@ -87,7 +78,6 @@ class Reader(dg.MultiSlotDataGenerator):
v = i[1]
for j in v:
s += " " + k + ":" + str(j)
print s.strip()
yield None
return data_iter
......
......@@ -18,11 +18,9 @@ import sys
import argparse
import tempfile
import yaml
import copy
from paddlerec.core.factory import TrainerFactory
from paddlerec.core.utils import envs
from paddlerec.core.utils import validation
from paddlerec.core.utils import util
from paddlerec.core.utils import validation
......@@ -96,35 +94,61 @@ def get_all_inters_from_yaml(file, filters):
return ret
def get_engine(args):
def get_modes(running_config):
    """Return the list of runner modes selected by ``running_config``.

    Args:
        running_config: dict whose ``"mode"`` entry is either a single
            mode name or a list of mode names.

    Returns:
        A list of mode names; a single string is wrapped into a
        one-element list, a list (or tuple) is returned unchanged.

    Raises:
        ValueError: if ``running_config`` is not a dict, if ``"mode"`` is
            missing or empty, or if it is neither a string nor a list.
    """
    if not isinstance(running_config, dict):
        raise ValueError("get_modes arguments must be [dict]")

    modes = running_config.get("mode")
    if not modes:
        raise ValueError("yaml must have config: mode")

    if isinstance(modes, str):
        return [modes]
    if not isinstance(modes, (list, tuple)):
        # Fail here with a clear message instead of later, when the
        # caller iterates over the result.
        raise ValueError(
            "config mode must be a string or a list of strings, got {}".
            format(type(modes).__name__))
    return modes
def get_engine(args, running_config, mode):
transpiler = get_transpiler()
with open(args.model, 'r') as rb:
_envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
engine = run_extras.get("train.engine", None)
if engine is None:
engine = run_extras.get("runner." + _envs["mode"] + ".class", None)
engine_class = ".".join(["runner", mode, "class"])
engine_device = ".".join(["runner", mode, "device"])
device_gpu_choices = ".".join(["runner", mode, "device", "selected_gpus"])
engine = running_config.get(engine_class, None)
if engine is None:
engine = "train"
raise ValueError("not find {} in yaml, please check".format(
mode, engine_class))
device = running_config.get(engine_device, None)
engine = engine.upper()
device = device.upper()
if device is None:
print("not find device be specified in yaml, set CPU as default")
device = "CPU"
if device == "GPU":
selected_gpus = running_config.get(device_gpu_choices, None)
if selected_gpus is None:
print(
"not find selected_gpus be specified in yaml, set `0` as default"
)
selected_gpus = ["0"]
else:
print("selected_gpus {} will be specified for running".format(
selected_gpus))
device = run_extras.get("runner." + _envs["mode"] + ".device", "CPU")
if device.upper() == "GPU":
selected_gpus = run_extras.get(
"runner." + _envs["mode"] + ".selected_gpus", "0")
selected_gpus_num = len(selected_gpus.split(","))
if selected_gpus_num > 1:
engine = "LOCAL_CLUSTER"
engine = engine.upper()
if engine not in engine_choices:
raise ValueError("runner.class can not be chosen in {}".format(
engine_choices))
print("engines: \n{}".format(engines))
raise ValueError("{} can not be chosen in {}".format(engine_class,
engine_choices))
run_engine = engines[transpiler].get(engine, None)
return run_engine
......@@ -146,12 +170,7 @@ def set_runtime_envs(cluster_envs, engine_yaml):
if cluster_envs is None:
cluster_envs = {}
engine_extras = get_inters_from_yaml(engine_yaml, "train.trainer.")
if "train.trainer.threads" in engine_extras and "CPU_NUM" in cluster_envs:
cluster_envs["CPU_NUM"] = engine_extras["train.trainer.threads"]
envs.set_runtime_environs(cluster_envs)
envs.set_runtime_environs(engine_extras)
need_print = {}
for k, v in os.environ.items():
......@@ -162,29 +181,31 @@ def set_runtime_envs(cluster_envs, engine_yaml):
def single_train_engine(args):
_envs = envs.load_yaml(args.model)
run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
trainer_class = run_extras.get(
"runner." + _envs["mode"] + ".trainer_class", None)
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
mode = envs.get_runtime_environ("mode")
trainer_class = ".".join(["runner", mode, "trainer_class"])
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])
trainer = run_extras.get(trainer_class, "GeneralTrainer")
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
selected_gpus = run_extras.get(selected_gpus_class, "0")
executor_mode = "train"
if trainer_class:
trainer = trainer_class
else:
trainer = "GeneralTrainer"
single_envs = {}
executor_mode = "train"
fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
"ps")
device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu")
selected_gpus = run_extras.get(
"runner." + _envs["mode"] + ".selected_gpus", "0")
selected_gpus_num = len(selected_gpus.split(","))
if device.upper() == "GPU":
assert selected_gpus_num == 1, "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
selected_gpus_num = len(selected_gpus.split(","))
if selected_gpus_num != 1:
raise ValueError(
"Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
)
single_envs["selsected_gpus"] = selected_gpus
single_envs["FLAGS_selected_gpus"] = selected_gpus
single_envs = {}
single_envs["selsected_gpus"] = selected_gpus
single_envs["FLAGS_selected_gpus"] = selected_gpus
single_envs["train.trainer.trainer"] = trainer
single_envs["fleet_mode"] = fleet_mode
single_envs["train.trainer.executor_mode"] = executor_mode
......@@ -199,29 +220,32 @@ def single_train_engine(args):
def single_infer_engine(args):
_envs = envs.load_yaml(args.model)
run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
trainer_class = run_extras.get(
"runner." + _envs["mode"] + ".trainer_class", None)
if trainer_class:
trainer = trainer_class
else:
trainer = "GeneralTrainer"
run_extras = get_all_inters_from_yaml(args.model, ["runner."])
mode = envs.get_runtime_environ("mode")
trainer_class = ".".join(["runner", mode, "trainer_class"])
fleet_class = ".".join(["runner", mode, "fleet_mode"])
device_class = ".".join(["runner", mode, "device"])
selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])
trainer = run_extras.get(trainer_class, "GeneralTrainer")
fleet_mode = run_extras.get(fleet_class, "ps")
device = run_extras.get(device_class, "cpu")
selected_gpus = run_extras.get(selected_gpus_class, "0")
executor_mode = "infer"
fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
"ps")
device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu")
selected_gpus = run_extras.get(
"runner." + _envs["mode"] + ".selected_gpus", "0")
selected_gpus_num = len(selected_gpus.split(","))
single_envs = {}
if device.upper() == "GPU":
assert selected_gpus_num == 1, "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
selected_gpus_num = len(selected_gpus.split(","))
if selected_gpus_num != 1:
raise ValueError(
"Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
)
single_envs["selsected_gpus"] = selected_gpus
single_envs["FLAGS_selected_gpus"] = selected_gpus
single_envs = {}
single_envs["selected_gpus"] = selected_gpus
single_envs["FLAGS_selected_gpus"] = selected_gpus
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.executor_mode"] = executor_mode
single_envs["fleet_mode"] = fleet_mode
......@@ -235,18 +259,6 @@ def single_infer_engine(args):
def cluster_engine(args):
def update_workspace(cluster_envs):
workspace = cluster_envs.get("engine_workspace", None)
if not workspace:
return
path = envs.path_adapter(workspace)
for name, value in cluster_envs.items():
if isinstance(value, str):
value = value.replace("{workspace}", path)
value = envs.windows_path_converter(value)
cluster_envs[name] = value
def master():
role = "MASTER"
from paddlerec.core.engine.cluster.cluster import ClusterEngine
......@@ -255,8 +267,6 @@ def cluster_engine(args):
flattens["engine_role"] = role
flattens["engine_run_config"] = args.model
flattens["engine_temp_path"] = tempfile.mkdtemp()
update_workspace(flattens)
envs.set_runtime_environs(flattens)
print(envs.pretty_print_envs(flattens, ("Submit Envs", "Value")))
......@@ -424,7 +434,7 @@ def local_mpi_engine(args):
def get_abs_model(model):
if model.startswith("paddlerec."):
dir = envs.path_adapter(model)
dir = envs.paddlerec_adapter(model)
path = os.path.join(dir, "config.yaml")
else:
if not os.path.isfile(model):
......@@ -442,11 +452,17 @@ if __name__ == "__main__":
envs.set_runtime_environs({"PACKAGE_BASE": abs_dir})
args = parser.parse_args()
model_name = args.model.split('.')[-1]
args.model = get_abs_model(args.model)
if not validation.yaml_validation(args.model):
sys.exit(-1)
engine_registry()
which_engine = get_engine(args)
engine = which_engine(args)
engine.run()
running_config = get_all_inters_from_yaml(args.model, ["mode", "runner."])
modes = get_modes(running_config)
for mode in modes:
envs.set_runtime_environs({"mode": mode})
which_engine = get_engine(args, running_config, mode)
engine = which_engine(args)
engine.run()
......@@ -34,27 +34,28 @@ about["__url__"] = "https://github.com/PaddlePaddle/PaddleRec"
readme = ""
def run_cmd(command):
    """Run ``command`` through ``os.popen`` and return its stripped output.

    Args:
        command: the command line to execute; must be a ``str``.

    Returns:
        Everything the command wrote to stdout, with leading and trailing
        whitespace removed.
    """
    assert command is not None and isinstance(command, str)
    pipe = os.popen(command)
    output = pipe.read()
    return output.strip()
def build(dirname):
package_dir = os.path.dirname(os.path.abspath(__file__))
run_cmd("cp -r {}/* {}".format(package_dir, dirname))
run_cmd("mkdir {}".format(os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "core"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "doc"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "models"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "tests"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "tools"), os.path.join(dirname, "paddlerec")))
run_cmd("mv {} {}".format(
os.path.join(dirname, "*.py"), os.path.join(dirname, "paddlerec")))
shutil.copytree(
package_dir, dirname, ignore=shutil.ignore_patterns(".git"))
os.mkdir(os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "core"), os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "doc"), os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "models"), os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "tests"), os.path.join(dirname, "paddlerec"))
shutil.move(
os.path.join(dirname, "tools"), os.path.join(dirname, "paddlerec"))
for f in os.listdir(dirname):
if os.path.isdir(f):
continue
if os.path.splitext(f)[1] == ".py":
shutil.move(
os.path.join(dirname, f), os.path.join(dirname, "paddlerec"))
packages = find_packages(dirname, include=('paddlerec.*'))
package_dir = {'': dirname}
......@@ -90,7 +91,7 @@ def build(dirname):
zip_safe=False)
dirname = tempfile.mkdtemp()
dirname = tempfile.mktemp()
build(dirname)
shutil.rmtree(dirname)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册