Commit 0b2e925a authored by tangwei

add multi phase and multi runner

Parent 38a6c047
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
import os import os
import sys import sys
import yaml
from paddlerec.core.utils import envs from paddlerec.core.utils import envs
trainer_abs = os.path.join( trainer_abs = os.path.join(
......
@@ -74,9 +74,20 @@ class Trainer(object):
         self._dataset = {}
         envs.set_global_envs(self._config)
         envs.update_workspace()
-        self._runner_name = envs.get_global_env("mode")
+        self._runner_name = envs.get_runtime_environ("mode")
         self._context["runner_name"] = self._runner_name

+        phase_names = self._config.get(
+            "runner." + self._runner_name + ".phases", None)
+        phases = []
+        if phase_names is None:
+            phases = self._config.get("phase")
+        else:
+            for phase in self._config.get("phase"):
+                if phase["name"] in phase_names:
+                    phases.append(phase)
+        self._context["phases"] = phases
+
         print("PaddleRec: Runner {} Begin".format(self._runner_name))
         self.which_engine()
         self.which_device()
...
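The new block in `Trainer.__init__` binds each runner to a subset of the global `phase` list: a runner that names `phases` runs only those entries, and a runner that omits the key keeps every phase, which preserves the old behaviour. A minimal stand-alone sketch of that filtering (the config dict and names below are illustrative, not taken from the repository):

```python
# Minimal sketch of the phase filtering added to Trainer.__init__.
# The flattened config dict is made up for illustration only.
config = {
    "phase": [
        {"name": "phase1", "dataset_name": "dataloader_train", "thread_num": 1},
        {"name": "phase2", "dataset_name": "dataset_infer", "thread_num": 1},
    ],
    "runner.single_cpu_infer.phases": ["phase2"],
}


def select_phases(config, runner_name):
    """Return the phases a runner should execute: the ones it names, or all of them."""
    phase_names = config.get("runner." + runner_name + ".phases", None)
    if phase_names is None:
        return config.get("phase")
    return [p for p in config.get("phase") if p["name"] in phase_names]


print([p["name"] for p in select_phases(config, "single_cpu_infer")])  # ['phase2']
print([p["name"] for p in select_phases(config, "single_cpu_train")])  # ['phase1', 'phase2']
```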
@@ -126,7 +126,7 @@ class QueueDataset(DatasetBase):
             file_list = context["fleet"].split_files(file_list)
         dataset.set_filelist(file_list)
-        for model_dict in context["env"]["phase"]:
+        for model_dict in context["phases"]:
             if model_dict["dataset_name"] == dataset_name:
                 model = context["model"][model_dict["name"]]["model"]
                 thread_num = int(model_dict["thread_num"])
...
@@ -48,7 +48,7 @@ class SingleNetwork(NetworkBase):
     def build_network(self, context):
         context["model"] = {}
-        for model_dict in context["env"]["phase"]:
+        for model_dict in context["phases"]:
             context["model"][model_dict["name"]] = {}
             train_program = fluid.Program()
             startup_program = fluid.Program()
...
@@ -283,7 +283,7 @@ class SingleRunner(RunnerBase):
             envs.get_global_env("runner." + context["runner_name"] +
                                 ".epochs"))
         for epoch in range(epochs):
-            for model_dict in context["env"]["phase"]:
+            for model_dict in context["phases"]:
                 begin_time = time.time()
                 self._run(context, model_dict)
                 end_time = time.time()
...
@@ -54,7 +54,7 @@ class SingleStartup(StartupBase):
         pass

     def startup(self, context):
-        for model_dict in context["env"]["phase"]:
+        for model_dict in context["phases"]:
             with fluid.scope_guard(context["model"][model_dict["name"]][
                     "scope"]):
                 train_prog = context["model"][model_dict["name"]][
...
@@ -17,10 +17,7 @@ General Trainer, applicable to many situations: Single/Cluster/Local_Cluster + P
 from __future__ import print_function

 import os
-import time
-import warnings
-import paddle.fluid as fluid
 from paddlerec.core.utils import envs
 from paddlerec.core.trainer import Trainer, EngineMode, FleetMode, Device
 from paddlerec.core.trainers.framework.dataset import *
...
@@ -51,7 +51,7 @@ hyper_parameters:
   fc_sizes: [512, 256, 128, 32]

 # select runner by name
-mode: single_cpu_train
+mode: [single_cpu_train, single_cpu_infer]
 # config of each runner.
 # runner is a kind of paddle training class, which wraps the train/infer process.
 runner:
@@ -69,6 +69,8 @@ runner:
   save_inference_fetch_varnames: [] # fetch vars of save inference
   init_model_path: "" # load model path
   print_interval: 10
+  phases: [phase1]
+
 - name: single_gpu_train
   class: train
   # num of epochs
@@ -84,6 +86,7 @@ runner:
   save_inference_fetch_varnames: [] # fetch vars of save inference
   init_model_path: "" # load model path
   print_interval: 10
+
 - name: single_cpu_infer
   class: infer
   # num of epochs
@@ -91,6 +94,8 @@ runner:
   # device to run training or infer
   device: cpu
   init_model_path: "increment/0" # load model path
+  phases: [phase2]
+
 - name: local_cluster_cpu_ps_train
   class: local_cluster
   epochs: 4
@@ -103,6 +108,7 @@ runner:
   save_inference_fetch_varnames: [] # fetch vars of save inference
   init_model_path: "" # load model path
   print_interval: 1
+
 - name: multi_gpu_train
   class: train
   epochs: 4
@@ -123,7 +129,8 @@ phase:
   model: "{workspace}/model.py" # user-defined model
   dataset_name: dataloader_train # select dataset by name
   thread_num: 1
-#- name: phase2
-#  model: "{workspace}/model.py" # user-defined model
-#  dataset_name: dataset_infer # select dataset by name
-#  thread_num: 1
+
+- name: phase2
+  model: "{workspace}/model.py" # user-defined model
+  dataset_name: dataset_infer # select dataset by name
+  thread_num: 1
@@ -18,11 +18,9 @@
 import sys
 import argparse
 import tempfile
-import yaml
 import copy

 from paddlerec.core.factory import TrainerFactory
 from paddlerec.core.utils import envs
-from paddlerec.core.utils import validation
 from paddlerec.core.utils import util
 from paddlerec.core.utils import validation
@@ -96,35 +94,57 @@ def get_all_inters_from_yaml(file, filters):
     return ret

-def get_engine(args):
+def get_modes(running_config):
+    if not isinstance(running_config, dict):
+        raise ValueError("get_modes arguments must be [dict]")
+
+    modes = running_config.get("mode")
+    if not modes:
+        raise ValueError("yaml mast have config: mode")
+    return modes
+
+
+def get_engine(args, running_config, mode):
     transpiler = get_transpiler()
-    with open(args.model, 'r') as rb:
-        _envs = yaml.load(rb.read(), Loader=yaml.FullLoader)
-    run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
+    _envs = envs.load_yaml(args.model)

-    engine = run_extras.get("train.engine", None)
-    if engine is None:
-        engine = run_extras.get("runner." + _envs["mode"] + ".class", None)
+    engine_class = ".".join(["runner", mode, "class"])
+    engine_device = ".".join(["runner", mode, "device"])
+    device_gpu_choices = ".".join(["runner", mode, "device", "selected_gpus"])

+    engine = running_config.get(engine_class, None)
     if engine is None:
-        engine = "train"
+        raise ValueError("not find {} in yaml, please check".format(
+            mode, engine_class))

-    device = run_extras.get("runner." + _envs["mode"] + ".device", "CPU")
+    device = running_config.get(engine_device, None)
+    if device is None:
+        print("not find device be specified in yaml, set CPU as default")
+        device = "CPU"

     if device.upper() == "GPU":
-        selected_gpus = run_extras.get(
-            "runner." + _envs["mode"] + ".selected_gpus", "0")
+        selected_gpus = running_config.get(device_gpu_choices, None)
+
+        if selected_gpus is None:
+            print(
+                "not find selected_gpus be specified in yaml, set `0` as default"
+            )
+            selected_gpus = ["0"]
+        else:
+            print("selected_gpus {} will be specified for running".format(
+                selected_gpus))
+
         selected_gpus_num = len(selected_gpus.split(","))
         if selected_gpus_num > 1:
             engine = "LOCAL_CLUSTER"

     engine = engine.upper()
     if engine not in engine_choices:
-        raise ValueError("runner.class can not be chosen in {}".format(
-            engine_choices))
-    print("engines: \n{}".format(engines))
+        raise ValueError("{} can not be chosen in {}".format(engine_class,
                                                              engine_choices))

     run_engine = engines[transpiler].get(engine, None)

     return run_engine
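As a rough illustration of how the reworked helpers are driven, the loop below walks a hand-written flattened config of the kind `get_all_inters_from_yaml` returns (the dict literal is made up for the example) and resolves each runner's class and device with the same `runner.<mode>.<field>` keys that `get_engine` builds above:

```python
# Hypothetical flattened config, mimicking the output of get_all_inters_from_yaml;
# keys follow the "runner.<mode>.<field>" pattern that get_engine looks up.
running_config = {
    "mode": ["single_cpu_train", "single_cpu_infer"],
    "runner.single_cpu_train.class": "train",
    "runner.single_cpu_train.device": "cpu",
    "runner.single_cpu_infer.class": "infer",
    "runner.single_cpu_infer.device": "cpu",
}

for mode in running_config["mode"]:
    engine_class = ".".join(["runner", mode, "class"])
    engine_device = ".".join(["runner", mode, "device"])
    print(mode, "->", running_config.get(engine_class),
          "on", running_config.get(engine_device, "cpu"))
# single_cpu_train -> train on cpu
# single_cpu_infer -> infer on cpu
```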
@@ -146,12 +166,7 @@ def set_runtime_envs(cluster_envs, engine_yaml):
     if cluster_envs is None:
         cluster_envs = {}
-    engine_extras = get_inters_from_yaml(engine_yaml, "train.trainer.")
-    if "train.trainer.threads" in engine_extras and "CPU_NUM" in cluster_envs:
-        cluster_envs["CPU_NUM"] = engine_extras["train.trainer.threads"]

     envs.set_runtime_environs(cluster_envs)
-    envs.set_runtime_environs(engine_extras)

     need_print = {}
     for k, v in os.environ.items():
@@ -163,28 +178,32 @@ def set_runtime_envs(cluster_envs, engine_yaml):

 def single_train_engine(args):
     _envs = envs.load_yaml(args.model)
-    run_extras = get_all_inters_from_yaml(args.model, ["train.", "runner."])
-    trainer_class = run_extras.get(
-        "runner." + _envs["mode"] + ".trainer_class", None)
+    run_extras = get_all_inters_from_yaml(args.model, ["runner."])

-    if trainer_class:
-        trainer = trainer_class
-    else:
-        trainer = "GeneralTrainer"
-
-    executor_mode = "train"
-    fleet_mode = run_extras.get("runner." + _envs["mode"] + ".fleet_mode",
-                                "ps")
-    device = run_extras.get("runner." + _envs["mode"] + ".device", "cpu")
-    selected_gpus = run_extras.get(
-        "runner." + _envs["mode"] + ".selected_gpus", "0")
-    selected_gpus_num = len(selected_gpus.split(","))
+    mode = envs.get_runtime_environ("mode")
+    trainer_class = ".".join(["runner", mode, "trainer_class"])
+    fleet_class = ".".join(["runner", mode, "fleet_mode"])
+    device_class = ".".join(["runner", mode, "device"])
+    selected_gpus_class = ".".join(["runner", mode, "selected_gpus"])
+
+    trainer = run_extras.get(trainer_class, "GeneralTrainer")
+    fleet_mode = run_extras.get(fleet_class, "ps")
+    device = run_extras.get(device_class, "cpu")
+    selected_gpus = run_extras.get(selected_gpus_class, "0")
+    executor_mode = "train"
+
+    single_envs = {}
+
     if device.upper() == "GPU":
-        assert selected_gpus_num == 1, "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
+        selected_gpus_num = len(selected_gpus.split(","))
+        if selected_gpus_num != 1:
+            raise ValueError(
+                "Single Mode Only Support One GPU, Set Local Cluster Mode to use Multi-GPUS"
+            )

-    single_envs = {}
-    single_envs["selsected_gpus"] = selected_gpus
-    single_envs["FLAGS_selected_gpus"] = selected_gpus
+        single_envs["selsected_gpus"] = selected_gpus
+        single_envs["FLAGS_selected_gpus"] = selected_gpus
+
     single_envs["train.trainer.trainer"] = trainer
     single_envs["fleet_mode"] = fleet_mode
     single_envs["train.trainer.executor_mode"] = executor_mode
@@ -444,9 +463,16 @@ if __name__ == "__main__":
     args = parser.parse_args()
     model_name = args.model.split('.')[-1]
     args.model = get_abs_model(args.model)
+
     if not validation.yaml_validation(args.model):
         sys.exit(-1)
+
     engine_registry()
-    which_engine = get_engine(args)
-    engine = which_engine(args)
-    engine.run()
+    running_config = get_all_inters_from_yaml(args.model, ["mode", "runner."])
+    modes = get_modes(running_config)
+
+    for mode in modes:
+        envs.set_runtime_environs({"mode": mode})
+        which_engine = get_engine(args, running_config, mode)
+        engine = which_engine(args)
+        engine.run()
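The effect of the rewritten `__main__` block, in miniature: every entry of the yaml's `mode` list is exported as the runtime `mode` and launched in turn, so one invocation can train and then infer. The engine classes and dict below are toy stand-ins, not PaddleRec classes:

```python
# Toy stand-ins for the engines that get_engine() would resolve; this only
# shows the control flow of the new multi-mode driver loop.
class TrainEngine(object):
    def run(self):
        print("train runner: executing phase1")


class InferEngine(object):
    def run(self):
        print("infer runner: executing phase2")


engines = {"single_cpu_train": TrainEngine, "single_cpu_infer": InferEngine}
modes = ["single_cpu_train", "single_cpu_infer"]  # what get_modes() would return here

runtime_environ = {}  # stand-in for envs.set_runtime_environs / get_runtime_environ
for mode in modes:
    runtime_environ["mode"] = mode   # each Trainer later reads this as its runner name
    engine = engines[mode]()         # which_engine = get_engine(args, running_config, mode)
    engine.run()                     # runs only the phases bound to this runner
```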