提交 cc3d47cf 编写于 作者: T tangwei

windows supported

上级 9d54734c
...@@ -35,7 +35,6 @@ class ModelBase(object): ...@@ -35,7 +35,6 @@ class ModelBase(object):
self._data_loader = None self._data_loader = None
self._infer_data_loader = None self._infer_data_loader = None
self._fetch_interval = 20 self._fetch_interval = 20
self._namespace = "train.model"
self._platform = envs.get_platform() self._platform = envs.get_platform()
self._init_hyper_parameters() self._init_hyper_parameters()
self._env = config self._env = config
...@@ -50,11 +49,11 @@ class ModelBase(object): ...@@ -50,11 +49,11 @@ class ModelBase(object):
self._slot_inited = True self._slot_inited = True
dataset = {} dataset = {}
model_dict = {} model_dict = {}
for i in self._env["executor"]: for i in envs.get_global_env("phase"):
if i["name"] == kargs["name"]: if i["name"] == kargs["name"]:
model_dict = i model_dict = i
break break
for i in self._env["dataset"]: for i in envs.get_global_env("dataset"):
if i["name"] == model_dict["dataset_name"]: if i["name"] == model_dict["dataset_name"]:
dataset = i dataset = i
break break
...@@ -139,8 +138,7 @@ class ModelBase(object): ...@@ -139,8 +138,7 @@ class ModelBase(object):
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0' os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
if name == "SGD": if name == "SGD":
reg = envs.get_global_env("hyper_parameters.reg", 0.0001, reg = envs.get_global_env("hyper_parameters.reg", 0.0001)
self._namespace)
optimizer_i = fluid.optimizer.SGD( optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg)) lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM": elif name == "ADAM":
......
...@@ -64,26 +64,21 @@ class Trainer(object): ...@@ -64,26 +64,21 @@ class Trainer(object):
self.increment_models = [] self.increment_models = []
self._exector_context = {} self._exector_context = {}
self._context = {'status': 'uninit', 'is_exit': False} self._context = {'status': 'uninit', 'is_exit': False}
self._config_yaml = config self._context["config_yaml"] = config
self._context["config_yaml"] = self._config_yaml
self._config = envs.load_yaml(config)
self._context["env"] = self._config
self._model = {} self._model = {}
self._dataset = {} self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_runtime_environ("mode") self._runner_name = envs.get_runtime_environ("mode")
self._context["runner_name"] = self._runner_name self._context["runner_name"] = self._runner_name
phase_names = self._config.get( phase_names = envs.get_global_env(
"runner." + self._runner_name + ".phases", None) "runner." + self._runner_name + ".phases", None)
phases = [] phases = []
if phase_names is None: if phase_names is None:
phases = self._config.get("phase") phases = envs.get_global_env("phase")
else: else:
for phase in self._config.get("phase"): for phase in envs.get_global_env("phase"):
if phase["name"] in phase_names: if phase["name"] in phase_names:
phases.append(phase) phases.append(phase)
...@@ -100,19 +95,21 @@ class Trainer(object): ...@@ -100,19 +95,21 @@ class Trainer(object):
""" """
device = envs.get_global_env( device = envs.get_global_env(
"runner." + self._runner_name + ".device", default_value="CPU") "runner." + self._runner_name + ".device", default_value="CPU")
if device.upper() == 'GPU': device = device.upper()
if device == 'GPU':
self.check_gpu() self.check_gpu()
self.device = Device.GPU self.device = Device.GPU
gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0)) gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
self._place = fluid.CUDAPlace(gpu_id) self._place = fluid.CUDAPlace(gpu_id)
self._exe = fluid.Executor(self._place) self._exe = fluid.Executor(self._place)
elif device.upper() == "CPU": elif device == "CPU":
self.device = Device.CPU self.device = Device.CPU
self._place = fluid.CPUPlace() self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place) self._exe = fluid.Executor(self._place)
else: else:
raise ValueError("Not Support device {}".format(device)) raise ValueError("Not Support device {}".format(device))
self._context["device"] = device.upper() self._context["device"] = device
self._context["exe"] = self._exe self._context["exe"] = self._exe
self._context["place"] = self._place self._context["place"] = self._place
...@@ -130,7 +127,6 @@ class Trainer(object): ...@@ -130,7 +127,6 @@ class Trainer(object):
try: try:
if not fluid.is_compiled_with_cuda(): if not fluid.is_compiled_with_cuda():
raise RuntimeError(err) raise RuntimeError(err)
sys.exit(1)
except Exception as e: except Exception as e:
pass pass
......
...@@ -58,11 +58,9 @@ class SingleNetwork(NetworkBase): ...@@ -58,11 +58,9 @@ class SingleNetwork(NetworkBase):
with fluid.program_guard(train_program, startup_program): with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard(): with fluid.unique_name.guard():
with fluid.scope_guard(scope): with fluid.scope_guard(scope):
model_path = model_dict["model"].replace( model_path = model_dict["model"]
"{workspace}", model = envs.lazy_instance_by_fliename(model_path,
envs.path_adapter(context["env"]["workspace"])) "Model")(None)
model = envs.lazy_instance_by_fliename(
model_path, "Model")(context["env"])
if context["is_infer"]: if context["is_infer"]:
model._infer_data_var = model.input_data( model._infer_data_var = model.input_data(
...@@ -97,7 +95,7 @@ class SingleNetwork(NetworkBase): ...@@ -97,7 +95,7 @@ class SingleNetwork(NetworkBase):
"default_main_program"] = train_program.clone() "default_main_program"] = train_program.clone()
context["dataset"] = {} context["dataset"] = {}
for dataset in context["env"]["dataset"]: for dataset in envs.get_global_env("dataset"):
if dataset["type"] != "DataLoader": if dataset["type"] != "DataLoader":
dataset_class = QueueDataset(context) dataset_class = QueueDataset(context)
context["dataset"][dataset[ context["dataset"][dataset[
...@@ -114,19 +112,17 @@ class PSNetwork(NetworkBase): ...@@ -114,19 +112,17 @@ class PSNetwork(NetworkBase):
def build_network(self, context): def build_network(self, context):
context["model"] = {} context["model"] = {}
if len(context["env"]["phase"]) > 1: if len(envs.get_global_env("phase")) > 1:
warnings.warn( warnings.warn(
"Cluster Train Only Support One Phase.", "Cluster Train Only Support One Phase.",
category=UserWarning, category=UserWarning,
stacklevel=2) stacklevel=2)
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
context["model"][model_dict["name"]] = {} context["model"][model_dict["name"]] = {}
dataset_name = model_dict["dataset_name"] dataset_name = model_dict["dataset_name"]
model_path = model_dict["model"].replace( model_path = model_dict["model"]
"{workspace}", envs.path_adapter(context["env"]["workspace"])) model = envs.lazy_instance_by_fliename(model_path, "Model")(None)
model = envs.lazy_instance_by_fliename(model_path,
"Model")(context["env"])
model._data_var = model.input_data( model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"]) dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name + if envs.get_global_env("dataset." + dataset_name +
...@@ -155,7 +151,7 @@ class PSNetwork(NetworkBase): ...@@ -155,7 +151,7 @@ class PSNetwork(NetworkBase):
else: else:
context["fleet"].init_worker() context["fleet"].init_worker()
context["dataset"] = {} context["dataset"] = {}
for dataset in context["env"]["dataset"]: for dataset in envs.get_global_env("dataset"):
if dataset["type"] != "DataLoader": if dataset["type"] != "DataLoader":
dataset_class = QueueDataset(context) dataset_class = QueueDataset(context)
context["dataset"][dataset[ context["dataset"][dataset[
...@@ -201,12 +197,12 @@ class PslibNetwork(NetworkBase): ...@@ -201,12 +197,12 @@ class PslibNetwork(NetworkBase):
def build_network(self, context): def build_network(self, context):
context["model"] = {} context["model"] = {}
if len(context["env"]["phase"]) > 1: if len(envs.get_global_env("phase")) > 1:
warnings.warn( warnings.warn(
"Cluster Train Only Support One Phase.", "Cluster Train Only Support One Phase.",
category=UserWarning, category=UserWarning,
stacklevel=2) stacklevel=2)
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
train_program = fluid.Program() train_program = fluid.Program()
startup_program = fluid.Program() startup_program = fluid.Program()
scope = fluid.Scope() scope = fluid.Scope()
...@@ -216,12 +212,9 @@ class PslibNetwork(NetworkBase): ...@@ -216,12 +212,9 @@ class PslibNetwork(NetworkBase):
with fluid.unique_name.guard(): with fluid.unique_name.guard():
with fluid.scope_guard(scope): with fluid.scope_guard(scope):
context["model"][model_dict["name"]] = {} context["model"][model_dict["name"]] = {}
model_path = model_dict["model"]
model_path = model_dict["model"].replace( model = envs.lazy_instance_by_fliename(model_path,
"{workspace}", "Model")(None)
envs.path_adapter(context["env"]["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(context["env"])
model._data_var = model.input_data( model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"]) dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name + if envs.get_global_env("dataset." + dataset_name +
...@@ -250,7 +243,7 @@ class PslibNetwork(NetworkBase): ...@@ -250,7 +243,7 @@ class PslibNetwork(NetworkBase):
self._server(context) self._server(context)
else: else:
context["dataset"] = {} context["dataset"] = {}
for dataset in context["env"]["dataset"]: for dataset in envs.get_global_env("dataset"):
if dataset["type"] != "DataLoader": if dataset["type"] != "DataLoader":
dataset_class = QueueDataset(context) dataset_class = QueueDataset(context)
context["dataset"][dataset[ context["dataset"][dataset[
...@@ -270,12 +263,12 @@ class CollectiveNetwork(NetworkBase): ...@@ -270,12 +263,12 @@ class CollectiveNetwork(NetworkBase):
def build_network(self, context): def build_network(self, context):
context["model"] = {} context["model"] = {}
if len(context["env"]["phase"]) > 1: if len(envs.get_global_env("phase")) > 1:
warnings.warn( warnings.warn(
"Cluster Train Only Support One Phase.", "Cluster Train Only Support One Phase.",
category=UserWarning, category=UserWarning,
stacklevel=2) stacklevel=2)
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
context["model"][model_dict["name"]] = {} context["model"][model_dict["name"]] = {}
dataset_name = model_dict["dataset_name"] dataset_name = model_dict["dataset_name"]
...@@ -284,11 +277,9 @@ class CollectiveNetwork(NetworkBase): ...@@ -284,11 +277,9 @@ class CollectiveNetwork(NetworkBase):
scope = fluid.Scope() scope = fluid.Scope()
with fluid.program_guard(train_program, startup_program): with fluid.program_guard(train_program, startup_program):
with fluid.scope_guard(scope): with fluid.scope_guard(scope):
model_path = model_dict["model"].replace( model_path = model_dict["model"]
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model = envs.lazy_instance_by_fliename(model_path, model = envs.lazy_instance_by_fliename(model_path,
"Model")(context["env"]) "Model")(None)
model._data_var = model.input_data( model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"]) dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name + if envs.get_global_env("dataset." + dataset_name +
...@@ -314,7 +305,7 @@ class CollectiveNetwork(NetworkBase): ...@@ -314,7 +305,7 @@ class CollectiveNetwork(NetworkBase):
"default_main_program"] = train_program "default_main_program"] = train_program
context["dataset"] = {} context["dataset"] = {}
for dataset in context["env"]["dataset"]: for dataset in envs.get_global_env("dataset"):
if dataset["type"] != "DataLoader": if dataset["type"] != "DataLoader":
dataset_class = QueueDataset(context) dataset_class = QueueDataset(context)
context["dataset"][dataset[ context["dataset"][dataset[
......
...@@ -40,6 +40,7 @@ class RunnerBase(object): ...@@ -40,6 +40,7 @@ class RunnerBase(object):
def _run(self, context, model_dict): def _run(self, context, model_dict):
reader_name = model_dict["dataset_name"] reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "." name = "dataset." + reader_name + "."
if envs.get_global_env(name + "type") == "DataLoader": if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict, context) self._executor_dataloader_train(model_dict, context)
else: else:
...@@ -309,7 +310,7 @@ class PSRunner(RunnerBase): ...@@ -309,7 +310,7 @@ class PSRunner(RunnerBase):
epochs = int( epochs = int(
envs.get_global_env("runner." + context["runner_name"] + envs.get_global_env("runner." + context["runner_name"] +
".epochs")) ".epochs"))
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
for epoch in range(epochs): for epoch in range(epochs):
begin_time = time.time() begin_time = time.time()
self._run(context, model_dict) self._run(context, model_dict)
...@@ -336,7 +337,7 @@ class CollectiveRunner(RunnerBase): ...@@ -336,7 +337,7 @@ class CollectiveRunner(RunnerBase):
epochs = int( epochs = int(
envs.get_global_env("runner." + context["runner_name"] + envs.get_global_env("runner." + context["runner_name"] +
".epochs")) ".epochs"))
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
for epoch in range(epochs): for epoch in range(epochs):
begin_time = time.time() begin_time = time.time()
self._run(context, model_dict) self._run(context, model_dict)
...@@ -361,7 +362,7 @@ class PslibRunner(RunnerBase): ...@@ -361,7 +362,7 @@ class PslibRunner(RunnerBase):
def run(self, context): def run(self, context):
context["fleet"].init_worker() context["fleet"].init_worker()
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
epochs = int( epochs = int(
envs.get_global_env("runner." + context["runner_name"] + envs.get_global_env("runner." + context["runner_name"] +
".epochs")) ".epochs"))
...@@ -382,7 +383,7 @@ class PslibRunner(RunnerBase): ...@@ -382,7 +383,7 @@ class PslibRunner(RunnerBase):
day = begin_day + datetime.timedelta(days=day, hours=hour) day = begin_day + datetime.timedelta(days=day, hours=hour)
day_s = day.strftime('%Y%m%d/%H') day_s = day.strftime('%Y%m%d/%H')
for dataset in context["env"]["dataset"]: for dataset in envs.get_global_env("dataset"):
if dataset["type"] != "DataLoader": if dataset["type"] != "DataLoader":
name = dataset["name"] name = dataset["name"]
train_data_path = envs.get_global_env(name + train_data_path = envs.get_global_env(name +
......
...@@ -73,7 +73,7 @@ class PSStartup(StartupBase): ...@@ -73,7 +73,7 @@ class PSStartup(StartupBase):
pass pass
def startup(self, context): def startup(self, context):
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]): with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
train_prog = context["model"][model_dict["name"]]["main_program"] train_prog = context["model"][model_dict["name"]]["main_program"]
...@@ -91,7 +91,7 @@ class CollectiveStartup(StartupBase): ...@@ -91,7 +91,7 @@ class CollectiveStartup(StartupBase):
pass pass
def startup(self, context): def startup(self, context):
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]): with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
train_prog = context["model"][model_dict["name"]][ train_prog = context["model"][model_dict["name"]][
"default_main_program"] "default_main_program"]
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import json
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import dataloader_instance
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleInfer(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config
device = envs.get_global_env("device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self.processor_register()
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_global_env("mode")
device = envs.get_global_env("runner." + self._runner_name + ".device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.executor_train)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
context['status'] = 'init_pass'
def _get_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
"TRAIN", self._config_yaml)
else:
if sparse_slots == "":
sparse_slots = "?"
if dense_slots == "":
dense_slots = "?"
padding = envs.get_global_env(name + "padding", 0)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding))
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["phase"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model._infer_data_var
dataset.set_use_var(inputs)
break
return dataset
def _get_dataloader(self, dataset_name, dataloader):
name = "dataset." + dataset_name + "."
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
reader = dataloader_instance.dataloader_by_name(
reader_class, dataset_name, self._config_yaml)
reader_class = envs.lazy_instance_by_fliename(reader_class,
"TrainReader")
reader_ins = reader_class(self._config_yaml)
else:
reader = dataloader_instance.slotdataloader_by_name(
"", dataset_name, self._config_yaml)
reader_ins = SlotReader(self._config_yaml)
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
return dataloader
def _create_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots")
dense_slots = envs.get_global_env(name + "dense_slots")
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
type_name = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(),
" change reader to DataLoader")
type_name = "DataLoader"
padding = 0
if type_name == "DataLoader":
return None
else:
return self._get_dataset(dataset_name)
def init(self, context):
for model_dict in self._env["phase"]:
self._model[model_dict["name"]] = [None] * 5
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
opt_name = envs.get_global_env("hyper_parameters.optimizer.class")
opt_lr = envs.get_global_env(
"hyper_parameters.optimizer.learning_rate")
opt_strategy = envs.get_global_env(
"hyper_parameters.optimizer.strategy")
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(self._env)
model._infer_data_var = model.input_data(
is_infer=True,
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=True)
self._get_dataloader(dataset_name,
model._data_loader)
model.net(model._infer_data_var, True)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
self._model[model_dict["name"]][4] = train_program.clone()
for dataset in self._env["dataset"]:
if dataset["type"] != "DataLoader":
self._dataset[dataset["name"]] = self._create_dataset(dataset[
"name"])
context['status'] = 'startup_pass'
def startup(self, context):
for model_dict in self._env["phase"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass'
def executor_train(self, context):
epochs = int(
envs.get_global_env("runner." + self._runner_name + ".epochs"))
for j in range(epochs):
for model_dict in self._env["phase"]:
if j == 0:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][0]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.load()
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][4]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.save(j)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics = model_class.get_infer_results()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][0]
reader = self._dataset[reader_name]
with fluid.scope_guard(scope):
self._exe.infer_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
program = self._model[model_name][0].clone()
fetch_vars = []
fetch_alias = []
metrics = model_class.get_infer_results()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics_format.append("{}: {{}}".format("batch"))
metrics_indexes = dict()
for name, var in metrics.items():
metrics_varnames.append(var.name)
metrics_indexes[var.name] = len(metrics_varnames) - 1
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = self._model[model_name][3]._data_loader
reader.start()
batch_id = 0
scope = self._model[model_name][2]
infer_results = []
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames,
return_numpy=False)
metrics = [batch_id]
metrics.extend(metrics_rets)
batch_infer_result = {}
for k, v in metrics_indexes.items():
batch_infer_result[k] = np.array(metrics_rets[
v]).tolist()
infer_results.append(batch_infer_result)
if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
with open(model_dict['save_path'], 'w') as fout:
json.dump(infer_results, fout)
def terminal(self, context):
context['is_exit'] = True
def load(self, is_fleet=False):
name = "runner." + self._runner_name + "."
dirname = envs.get_global_env(name + "init_model_path", None)
if dirname is None or dirname == "":
return
print("single_infer going to load ", dirname)
if is_fleet:
fleet.load_persistables(self._exe, dirname)
else:
fluid.io.load_persistables(self._exe, dirname)
def save(self, epoch_id, is_fleet=False):
def need_save(epoch_id, epoch_interval, is_last=False):
if is_last:
return True
if epoch_id == -1:
return False
return epoch_id % epoch_interval == 0
def save_inference_model():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_inference_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env(
name + "save_inference_feed_varnames", None)
fetch_varnames = envs.get_global_env(
name + "save_inference_fetch_varnames", None)
if feed_varnames is None or fetch_varnames is None or feed_varnames == "":
return
fetch_vars = [
fluid.default_main_program().global_block().vars[varname]
for varname in fetch_varnames
]
dirname = envs.get_global_env(name + "save_inference_path", None)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_inference_model(self._exe, dirname, feed_varnames,
fetch_vars)
else:
fluid.io.save_inference_model(dirname, feed_varnames,
fetch_vars, self._exe)
def save_persistables():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_checkpoint_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env(name + "save_checkpoint_path", None)
if dirname is None or dirname == "":
return
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_persistables(self._exe, dirname)
else:
fluid.io.save_persistables(self._exe, dirname)
save_persistables()
save_inference_model()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import time
import logging
import os
import paddle.fluid as fluid
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
from paddlerec.core.utils import envs
from paddlerec.core.reader import SlotReader
from paddlerec.core.utils import dataloader_instance
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
class SingleTrainer(TranspileTrainer):
def __init__(self, config=None):
super(TranspileTrainer, self).__init__(config)
self._env = self._config
self.processor_register()
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_global_env("mode")
device = envs.get_global_env("runner." + self._runner_name + ".device")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
elif device == 'cpu':
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.executor_train)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
context['status'] = 'init_pass'
def _get_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
"TRAIN", self._config_yaml)
else:
if sparse_slots == "":
sparse_slots = "?"
if dense_slots == "":
dense_slots = "?"
padding = envs.get_global_env(name + "padding", 0)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, "fake", \
sparse_slots.replace(" ", "?"), dense_slots.replace(" ", "?"), str(padding))
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(envs.get_global_env(name + "batch_size"))
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
for model_dict in self._env["phase"]:
if model_dict["dataset_name"] == dataset_name:
model = self._model[model_dict["name"]][3]
inputs = model._data_var
dataset.set_use_var(inputs)
break
return dataset
def _get_dataloader(self, dataset_name, dataloader):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
abs_dir = os.path.dirname(os.path.abspath(__file__))
if sparse_slots == "" and dense_slots == "":
reader = dataloader_instance.dataloader_by_name(
reader_class, dataset_name, self._config_yaml)
reader_class = envs.lazy_instance_by_fliename(reader_class,
"TrainReader")
reader_ins = reader_class(self._config_yaml)
else:
reader = dataloader_instance.slotdataloader_by_name(
"", dataset_name, self._config_yaml)
reader_ins = SlotReader(self._config_yaml)
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
return dataloader
def _create_dataset(self, dataset_name):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
thread_num = envs.get_global_env(name + "thread_num")
batch_size = envs.get_global_env(name + "batch_size")
type_name = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(),
" change reader to DataLoader")
type_name = "DataLoader"
padding = 0
if type_name == "DataLoader":
return None
else:
return self._get_dataset(dataset_name)
def init(self, context):
for model_dict in self._env["phase"]:
self._model[model_dict["name"]] = [None] * 5
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(self._env["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(self._env)
model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=False)
self._get_dataloader(dataset_name,
model._data_loader)
model.net(model._data_var, False)
optimizer = model.optimizer()
optimizer.minimize(model._cost)
self._model[model_dict["name"]][0] = train_program
self._model[model_dict["name"]][1] = startup_program
self._model[model_dict["name"]][2] = scope
self._model[model_dict["name"]][3] = model
self._model[model_dict["name"]][4] = train_program.clone()
for dataset in self._env["dataset"]:
if dataset["type"] != "DataLoader":
self._dataset[dataset["name"]] = self._create_dataset(dataset[
"name"])
context['status'] = 'startup_pass'
def startup(self, context):
for model_dict in self._env["phase"]:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
self._exe.run(self._model[model_dict["name"]][1])
context['status'] = 'train_pass'
def executor_train(self, context):
epochs = int(
envs.get_global_env("runner." + self._runner_name + ".epochs"))
for j in range(epochs):
for model_dict in self._env["phase"]:
if j == 0:
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][0]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.load()
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
begin_time = time.time()
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict)
else:
self._executor_dataset_train(model_dict)
with fluid.scope_guard(self._model[model_dict["name"]][2]):
train_prog = self._model[model_dict["name"]][4]
startup_prog = self._model[model_dict["name"]][1]
with fluid.program_guard(train_prog, startup_prog):
self.save(j)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, time elasped: {}".format(j, seconds))
context['status'] = "terminal_pass"
def _executor_dataset_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
scope = self._model[model_name][2]
program = self._model[model_name][0]
reader = self._dataset[reader_name]
threads = model_dict.get("thread_num", 1)
with fluid.scope_guard(scope):
self._exe.train_from_dataset(
program=program,
dataset=reader,
thread=threads,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = self._model[model_name][3]
program = self._model[model_name][0].clone()
_build_strategy = fluid.BuildStrategy()
_exe_strategy = fluid.ExecutionStrategy()
# 0: kCoeffNumDevice; 1: One; 2: Customized
_gradient_scale_strategy = model_dict.get("gradient_scale_strategy", 0)
if _gradient_scale_strategy == 0:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.CoeffNumDevice
elif _gradient_scale_strategy == 1:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.One
elif _gradient_scale_strategy == 2:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
else:
raise ValueError(
"Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]."
)
_build_strategy.gradient_scale_strategy = gradient_scale_strategy
if "thread_num" in model_dict and model_dict["thread_num"] > 1:
_build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
_exe_strategy.num_threads = model_dict["thread_num"]
os.environ['CPU_NUM'] = str(_exe_strategy.num_threads)
program = fluid.compiler.CompiledProgram(program).with_data_parallel(
loss_name=model_class.get_avg_cost().name,
build_strategy=_build_strategy,
exec_strategy=_exe_strategy)
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + self._runner_name +
".print_interval", 20))
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("batch"))
for name, var in metrics.items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = self._model[model_name][3]._data_loader
reader.start()
batch_id = 0
scope = self._model[model_name][2]
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames)
metrics = [batch_id]
metrics.extend(metrics_rets)
if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def terminal(self, context):
context['is_exit'] = True
def load(self, is_fleet=False):
dirname = envs.get_global_env(
"runner." + self._runner_name + ".init_model_path", None)
load_vars = envs.get_global_env(
"runner." + self._runner_name + ".load_vars", None)
def name_has_embedding(var):
res = False
for var_name in load_vars:
if var_name == var.name:
return True
return res
if dirname is None or dirname == "":
return
print("going to load ", dirname)
if is_fleet:
fleet.load_persistables(self._exe, dirname)
else:
if load_vars is None or len(load_vars) == 0:
fluid.io.load_persistables(self._exe, dirname)
else:
fluid.io.load_vars(
self._exe, dirname, predicate=name_has_embedding)
def save(self, epoch_id, is_fleet=False):
def need_save(epoch_id, epoch_interval, is_last=False):
if is_last:
return True
if epoch_id == -1:
return False
return epoch_id % epoch_interval == 0
def save_inference_model():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_inference_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env(
name + "save_inference_feed_varnames", [])
fetch_varnames = envs.get_global_env(
name + "save_inference_fetch_varnames", [])
if feed_varnames is None or fetch_varnames is None or feed_varnames == "" or fetch_varnames == "" or \
len(feed_varnames) == 0 or len(fetch_varnames) == 0:
return
fetch_vars = [
fluid.default_main_program().global_block().vars[varname]
for varname in fetch_varnames
]
dirname = envs.get_global_env(name + "save_inference_path", None)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_inference_model(self._exe, dirname, feed_varnames,
fetch_vars)
else:
fluid.io.save_inference_model(dirname, feed_varnames,
fetch_vars, self._exe)
def save_persistables():
name = "runner." + self._runner_name + "."
save_interval = int(
envs.get_global_env(name + "save_checkpoint_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env(name + "save_checkpoint_path", None)
if dirname is None or dirname == "":
return
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_persistables(self._exe, dirname)
else:
fluid.io.save_persistables(self._exe, dirname)
save_persistables()
save_inference_model()
...@@ -208,7 +208,7 @@ CTR-DNN训练及测试数据集选用[Display Advertising Challenge](https://www ...@@ -208,7 +208,7 @@ CTR-DNN训练及测试数据集选用[Display Advertising Challenge](https://www
稀疏参数输入的定义: 稀疏参数输入的定义:
```python ```python
def sparse_inputs(): def sparse_inputs():
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self._namespace) ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None)
sparse_input_ids = [ sparse_input_ids = [
fluid.layers.data(name="S" + str(i), fluid.layers.data(name="S" + str(i),
...@@ -222,7 +222,7 @@ def sparse_inputs(): ...@@ -222,7 +222,7 @@ def sparse_inputs():
稠密参数输入的定义: 稠密参数输入的定义:
```python ```python
def dense_input(): def dense_input():
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self._namespace) dim = envs.get_global_env("hyper_parameters.dense_input_dim", None)
dense_input_var = fluid.layers.data(name="D", dense_input_var = fluid.layers.data(name="D",
shape=[dim], shape=[dim],
......
...@@ -139,7 +139,6 @@ class Model(object): ...@@ -139,7 +139,6 @@ class Model(object):
self._data_loader = None self._data_loader = None
self._infer_data_loader = None self._infer_data_loader = None
self._fetch_interval = 20 self._fetch_interval = 20
self._namespace = "train.model"
self._platform = envs.get_platform() self._platform = envs.get_platform()
def get_inputs(self): def get_inputs(self):
......
...@@ -24,8 +24,7 @@ hyper_parameters: ...@@ -24,8 +24,7 @@ hyper_parameters:
```python ```python
if name == "SGD": if name == "SGD":
reg = envs.get_global_env("hyper_parameters.reg", 0.0001, reg = envs.get_global_env("hyper_parameters.reg", 0.0001)
self._namespace)
optimizer_i = fluid.optimizer.SGD( optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg)) lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM": elif name == "ADAM":
......
...@@ -47,7 +47,7 @@ class Startup(StartupBase): ...@@ -47,7 +47,7 @@ class Startup(StartupBase):
def _single_startup(self, context): def _single_startup(self, context):
load_tree_from_numpy = envs.get_global_env( load_tree_from_numpy = envs.get_global_env(
"hyper_parameters.tree.load_tree_from_numpy", False) "hyper_parameters.tree.load_tree_from_numpy", False)
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]): with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
context["exe"].run(context["model"][model_dict["name"]][ context["exe"].run(context["model"][model_dict["name"]][
"startup_program"]) "startup_program"])
...@@ -106,7 +106,7 @@ class Startup(StartupBase): ...@@ -106,7 +106,7 @@ class Startup(StartupBase):
warmup_model_path = envs.get_global_env( warmup_model_path = envs.get_global_env(
"runner." + context["runner_name"] + ".init_model_path", None) "runner." + context["runner_name"] + ".init_model_path", None)
assert warmup_model_path != None, "set runner.init_model_path for loading model" assert warmup_model_path != None, "set runner.init_model_path for loading model"
model_dict = context["env"]["phase"][0] model_dict = envs.get_global_env("phase")[0]
with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]): with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
context["exe"].run(context["model"][model_dict["name"]][ context["exe"].run(context["model"][model_dict["name"]][
"startup_program"]) "startup_program"])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册