Unverified commit 8d1afdeb, authored by Chengmo, committed by GitHub

Refactor Trainer - for merge (#54)

* fix yaml import

* add value checker

* add development.md & update yaml.md

* fix tdm

* update general trainer

* update design

* rename model & reader

* update startup

* add gpu support

* after self-test

* code clean

* fix

* change dnn yaml

* add mpi support

* fix pslib

* fix pslib

* fix pslib network

* fix pslib program

* fix pslib runner

* fix fleet mode

* fix is_distribute

* fix deepfm

* change pslib code

* fix din

* update & fix

* fix reader base

* add ci & fix

* fix fnn

* fix ci sudo
Co-authored-by: tangwei <tangwei12@baidu.com>
Parent cac23dc0
@@ -10,10 +10,20 @@ os:
env:
- JOB=check_style
- JOB=model_test
before_install:
  # For pylint docstring checker
- sudo apt-get update
- sudo apt-get install -y python-pip libpython-dev
- sudo pip install -U pip
- sudo pip install six --upgrade --ignore-installed six
- sudo pip install pillow
- sudo pip install PyYAML
- sudo pip install pylint pytest astroid isort pre-commit
- sudo pip install kiwisolver
- sudo pip install paddlepaddle==1.7.2 --ignore-installed urllib3
- sudo python setup.py install
- |
function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; }
@@ -26,11 +26,12 @@ from paddlerec.core.utils import envs
class LocalClusterEngine(Engine):
def start_procs(self):
fleet_mode = self.envs["fleet_mode"]
worker_num = self.envs["worker_num"]
server_num = self.envs["server_num"]
ports = [self.envs["start_port"]]
logs_dir = self.envs["log_dir"]
selected_gpus = self.envs["selected_gpus"].split(",")
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env["CLUSTER_INSTANCE"] = "1"
@@ -39,71 +40,115 @@ class LocalClusterEngine(Engine):
procs = []
log_fns = []
for i in range(server_num - 1):
while True:
new_port = envs.find_free_port()
if new_port not in ports:
ports.append(new_port)
break
user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
user_endpoints_ips = [
x.split(":")[0] for x in user_endpoints.split(",")
]
user_endpoints_port = [
x.split(":")[1] for x in user_endpoints.split(",")
]
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
for i in range(server_num):
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_PORT": user_endpoints_port[i],
"TRAINING_ROLE": "PSERVER",
"PADDLE_TRAINERS_NUM": str(worker_num),
"POD_IP": user_endpoints_ips[i]
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/server.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
procs.append(proc)
for i in range(worker_num):
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(i)
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/worker.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
procs.append(proc)
        # only wait for workers to finish here
for i, proc in enumerate(procs):
if i < server_num:
continue
procs[i].wait()
if len(log_fns) > 0:
log_fns[i].close()
for i in range(server_num):
if len(log_fns) > 0:
log_fns[i].close()
procs[i].terminate()
print(
"all workers already completed, you can view logs under the `{}` directory".
format(logs_dir),
file=sys.stderr)
if fleet_mode.upper() == "PS":
for i in range(server_num - 1):
while True:
new_port = envs.find_free_port()
if new_port not in ports:
ports.append(new_port)
break
user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
user_endpoints_ips = [
x.split(":")[0] for x in user_endpoints.split(",")
]
user_endpoints_port = [
x.split(":")[1] for x in user_endpoints.split(",")
]
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
for i in range(server_num):
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_PORT": user_endpoints_port[i],
"TRAINING_ROLE": "PSERVER",
"PADDLE_TRAINERS_NUM": str(worker_num),
"POD_IP": user_endpoints_ips[i]
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/server.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd,
env=current_env,
stdout=fn,
stderr=fn,
cwd=os.getcwd())
procs.append(proc)
for i in range(worker_num):
current_env.update({
"PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
"PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(i)
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/worker.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd,
env=current_env,
stdout=fn,
stderr=fn,
cwd=os.getcwd())
procs.append(proc)
            # only wait for workers to finish here
for i, proc in enumerate(procs):
if i < server_num:
continue
procs[i].wait()
if len(log_fns) > 0:
log_fns[i].close()
for i in range(server_num):
if len(log_fns) > 0:
log_fns[i].close()
procs[i].terminate()
print(
"all workers already completed, you can view logs under the `{}` directory".
format(logs_dir),
file=sys.stderr)
elif fleet_mode.upper() == "COLLECTIVE":
selected_gpus_num = len(selected_gpus)
for i in range(selected_gpus_num - 1):
while True:
new_port = envs.find_free_port()
if new_port not in ports:
ports.append(new_port)
break
user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
for i in range(selected_gpus_num):
current_env.update({
"PADDLE_TRAINER_ENDPOINTS": user_endpoints,
"PADDLE_CURRENT_ENDPOINTS": user_endpoints[i],
"PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(i),
"FLAGS_selected_gpus": str(selected_gpus[i])
})
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/worker.%d" % (logs_dir, i), "w")
log_fns.append(fn)
proc = subprocess.Popen(
cmd,
env=current_env,
stdout=fn,
stderr=fn,
cwd=os.getcwd())
procs.append(proc)
def run(self):
self.start_procs()
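
For orientation, here is a minimal, self-contained sketch of the environment-variable layout the PS branch above produces for a two-server, two-worker local cluster. The port numbers are invented for illustration; the real engine picks free ports at runtime.

# Sketch of the env vars assigned by the PS branch for server_num=2,
# worker_num=2. Ports are illustrative only.
endpoints = "127.0.0.1:36001,127.0.0.1:36002"

server_envs = [{
    "PADDLE_PSERVERS_IP_PORT_LIST": endpoints,
    "PADDLE_PORT": port,
    "TRAINING_ROLE": "PSERVER",
    "PADDLE_TRAINERS_NUM": "2",
    "POD_IP": "127.0.0.1",
} for port in ("36001", "36002")]

worker_envs = [{
    "PADDLE_PSERVERS_IP_PORT_LIST": endpoints,
    "PADDLE_TRAINERS_NUM": "2",
    "TRAINING_ROLE": "TRAINER",
    "PADDLE_TRAINER_ID": str(i),
} for i in range(2)]

for env in server_envs + worker_envs:
    print(env)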
@@ -23,18 +23,13 @@ trainers = {}
def trainer_registry():
trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py")
trainers["SingleInfer"] = os.path.join(trainer_abs, "single_infer.py")
trainers["ClusterTrainer"] = os.path.join(trainer_abs,
"cluster_trainer.py")
# Definition of procedure execution process
trainers["CtrCodingTrainer"] = os.path.join(trainer_abs,
"ctr_coding_trainer.py")
trainers["CtrModulTrainer"] = os.path.join(trainer_abs,
"ctr_modul_trainer.py")
trainers["TDMSingleTrainer"] = os.path.join(trainer_abs,
"tdm_single_trainer.py")
trainers["TDMClusterTrainer"] = os.path.join(trainer_abs,
"tdm_cluster_trainer.py")
trainers["GeneralTrainer"] = os.path.join(trainer_abs,
"general_trainer.py")
trainer_registry()
@@ -13,13 +13,13 @@
# limitations under the License.
import abc
import os
import paddle.fluid as fluid
from paddlerec.core.utils import envs
class Model(object):
class ModelBase(object):
"""Base Model
"""
__metaclass__ = abc.ABCMeta
@@ -133,6 +133,11 @@ class Model(object):
raise ValueError(
"configured optimizer can only supported SGD/Adam/Adagrad")
if name == "SGD":
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '1'
else:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
if name == "SGD":
reg = envs.get_global_env("hyper_parameters.reg", 0.0001,
self._namespace)
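
The new lines above toggle FLAGS_communicator_is_sgd_optimizer according to the configured optimizer. A standalone restatement of that toggle, for clarity (what the flag changes inside the distributed communicator is an assumption inferred from its name):

import os

def set_sgd_communicator_flag(optimizer_name):
    # '1' only when the optimizer is plain SGD, mirroring the code above
    is_sgd = optimizer_name.strip().upper() == "SGD"
    os.environ["FLAGS_communicator_is_sgd_optimizer"] = '1' if is_sgd else '0'

set_sgd_communicator_flag("Adam")
print(os.environ["FLAGS_communicator_is_sgd_optimizer"])  # prints 0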
@@ -21,7 +21,7 @@ import yaml
from paddlerec.core.utils import envs
class Reader(dg.MultiSlotDataGenerator):
class ReaderBase(dg.MultiSlotDataGenerator):
__metaclass__ = abc.ABCMeta
def __init__(self, config):
@@ -32,7 +32,7 @@ class Reader(dg.MultiSlotDataGenerator):
@abc.abstractmethod
def init(self):
"""init"""
"""init """
pass
@abc.abstractmethod
@@ -24,20 +24,158 @@ from paddle import fluid
from paddlerec.core.utils import envs
class EngineMode:
"""
    There are various engines designed for different running environments.
"""
SINGLE = 1
CLUSTER = 2
LOCAL_CLUSTER = 3
class FleetMode:
"""
    Paddle distributed training supports: ParameterServer / Collective / PSLib
"""
PS = 1
COLLECTIVE = 2
PSLIB = 3
class Device:
"""
    PaddleRec supports CPU/GPU; XPU support is coming soon
"""
CPU = 1
GPU = 2
# XPU =3
class Trainer(object):
"""R
"""
Trainer Base
"""
__metaclass__ = abc.ABCMeta
def __init__(self, config=None):
self._status_processor = {}
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self.model = None
self.inference_models = []
self.increment_models = []
self._exector_context = {}
self._context = {'status': 'uninit', 'is_exit': False}
self._config_yaml = config
self._context["config_yaml"] = self._config_yaml
self._config = envs.load_yaml(config)
self._context["env"] = self._config
self._model = {}
self._dataset = {}
envs.set_global_envs(self._config)
envs.update_workspace()
self._runner_name = envs.get_global_env("mode")
self._context["runner_name"] = self._runner_name
print("PaddleRec: Runner {} Begin".format(self._runner_name))
self.which_engine()
self.which_device()
self.which_fleet_mode()
self.which_executor_mode()
self.legality_check()
def which_device(self):
"""R
"""
device = envs.get_global_env(
"runner." + self._runner_name + ".device", default_value="CPU")
if device.upper() == 'GPU':
self.check_gpu()
self.device = Device.GPU
gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
self._place = fluid.CUDAPlace(gpu_id)
self._exe = fluid.Executor(self._place)
elif device.upper() == "CPU":
self.device = Device.CPU
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
else:
raise ValueError("Not Support device {}".format(device))
self._context["device"] = device.upper()
self._context["exe"] = self._exe
self._context["place"] = self._place
def check_gpu(self):
"""
Log error and exit when set use_gpu=true in paddlepaddle
cpu version.
"""
err = "GPU cannot be set as true while you are " \
"using paddlepaddle cpu version ! \nPlease try: \n" \
"\t1. Install paddlepaddle-gpu to run model on GPU \n" \
"\t2. Set device as cpu in config file to run " \
"model on CPU"
        try:
            if not fluid.is_compiled_with_cuda():
                print(err)
                sys.exit(1)
        except Exception as e:
            pass
def which_engine(self):
engine = envs.get_runtime_environ("train.trainer.engine")
if engine.upper() == "SINGLE":
self.engine = EngineMode.SINGLE
self.is_fleet = False
elif engine.upper() == "LOCAL_CLUSTER":
self.engine = EngineMode.LOCAL_CLUSTER
self.is_fleet = True
elif engine.upper() == "CLUSTER":
self.engine = EngineMode.CLUSTER
self.is_fleet = True
else:
raise ValueError("Not Support Engine {}".format(engine))
self._context["is_fleet"] = self.is_fleet
self._context["engine"] = self.engine
def which_fleet_mode(self):
fleet_mode = envs.get_runtime_environ("fleet_mode")
if fleet_mode.upper() == "PS":
self.fleet_mode = FleetMode.PS
elif fleet_mode.upper() == "COLLECTIVE":
self.fleet_mode = FleetMode.COLLECTIVE
elif fleet_mode.upper() == "PSLIB":
self.fleet_mode = FleetMode.PSLIB
else:
raise ValueError("Not Support Fleet Mode {}".format(fleet_mode))
self._context["is_pslib"] = (fleet_mode.upper() == "PSLIB")
self._context["fleet_mode"] = fleet_mode
def which_executor_mode(self):
executor_mode = envs.get_runtime_environ("train.trainer.executor_mode")
if executor_mode.upper() not in ["TRAIN", "INFER"]:
raise ValueError("Not Support Executor Mode {}".format(
executor_mode))
if executor_mode.upper() == "TRAIN":
self.is_infer = False
else:
self.is_infer = True
print("Executor Mode: {}".format(executor_mode))
self._context["is_infer"] = self.is_infer
def legality_check(self):
        if self.device == Device.CPU:
            assert self.fleet_mode != FleetMode.COLLECTIVE, \
                "Collective mode is not supported on CPU"
        if self.is_infer:
            assert self.engine == EngineMode.SINGLE, \
                "Distributed inference is not supported"
@abc.abstractmethod
def processor_register(self):
pass
def regist_context_processor(self, status_name, processor):
"""
        register a processor for a specific status
@@ -80,8 +218,8 @@ class Trainer(object):
Return:
bool exit_app or not
"""
        print('Exit app. catch exception in process status:%s, except:%s' %
              (context['status'], str(exception)))
return True
def reload_train_context(self):
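
Trainer drives execution as a small state machine: each status string maps to a processor registered via regist_context_processor, and processors advance context['status'] until one of them sets is_exit. The driver loop itself is elided from this diff; the following is a minimal sketch of its assumed shape, not the actual implementation:

class TinyTrainer(object):
    # Minimal state-machine driver mirroring regist_context_processor above.
    def __init__(self):
        self._status_processor = {}
        self._context = {'status': 'uninit', 'is_exit': False}

    def regist_context_processor(self, status_name, processor):
        self._status_processor[status_name] = processor

    def run(self):
        while not self._context['is_exit']:
            self._status_processor[self._context['status']](self._context)

t = TinyTrainer()
t.regist_context_processor('uninit', lambda c: c.update(status='train_pass'))
t.regist_context_processor('train_pass', lambda c: c.update(is_exit=True))
t.run()  # uninit -> train_pass -> exit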
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Cluster training with fluid in parameter-server mode.
"""
from __future__ import print_function
import os
import time
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from paddlerec.core.utils import envs
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
class ClusterTrainer(TranspileTrainer):
def processor_register(self):
role = PaddleCloudRoleMaker()
fleet.init(role)
if fleet.is_server():
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX" and envs.get_global_env(
"dataset_class", None, "train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor('train_pass',
self.dataloader_train)
self.regist_context_processor('infer_pass', self.infer)
self.regist_context_processor('terminal_pass', self.terminal)
def build_strategy(self):
mode = envs.get_runtime_environ("train.trainer.strategy")
assert mode in ["async", "geo", "sync", "half_async"]
strategy = None
if mode == "async":
strategy = StrategyFactory.create_async_strategy()
elif mode == "geo":
push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
strategy = StrategyFactory.create_geo_strategy(push_num)
elif mode == "sync":
strategy = StrategyFactory.create_sync_strategy()
elif mode == "half_async":
strategy = StrategyFactory.create_half_async_strategy()
assert strategy is not None
self.strategy = strategy
return strategy
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer_name = envs.get_global_env("hyper_parameters.optimizer",
None, "train.model")
if optimizer_name not in ["", "sgd", "SGD", "Sgd"]:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.model.get_avg_cost())
if fleet.is_server():
context['status'] = 'server_pass'
else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'startup_pass'
def server(self, context):
fleet.init_server()
fleet.run_server()
context['is_exit'] = True
def startup(self, context):
self._exe.run(fleet.startup_program)
context['status'] = 'train_pass'
def dataloader_train(self, context):
fleet.init_worker()
reader = self._get_dataloader()
epochs = envs.get_global_env("train.epochs")
program = fluid.compiler.CompiledProgram(
fleet.main_program).with_data_parallel(
loss_name=self.model.get_avg_cost().name,
build_strategy=self.strategy.get_build_strategy(),
exec_strategy=self.strategy.get_execute_strategy())
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in self.model.get_metrics().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
for epoch in range(epochs):
reader.start()
batch_id = 0
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % self.fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
self.save(epoch, "train", is_fleet=True)
fleet.stop_worker()
context['status'] = 'infer_pass'
def dataset_train(self, context):
fleet.init_worker()
dataset = self._get_dataset()
ins = self._get_dataset_ins()
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
begin_time = time.time()
self._exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
end_time = time.time()
times = end_time - begin_time
print("epoch {} using time {}, speed {:.2f} lines/s".format(
i, times, ins / times))
self.save(i, "train", is_fleet=True)
fleet.stop_worker()
context['status'] = 'infer_pass'
def terminal(self, context):
for model in self.increment_models:
print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True
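
The metrics_format string assembled in dataloader_train above yields one log line every fetch_period batches. A short illustration of that formatting with invented metric names and values:

# Invented metrics; the real names/values come from model.get_metrics().
metrics = {"auc": 0.75, "cost": 0.31}

metrics_format = ["{}: {{}}".format("epoch"), "{}: {{}}".format("batch")]
for name in metrics:
    metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)

row = [3, 200] + list(metrics.values())
print(metrics_format.format(*row))
# epoch: 3, batch: 200, auc: 0.75, cost: 0.31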
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.utils import dataloader_instance
from paddlerec.core.reader import SlotReader
from paddlerec.core.trainer import EngineMode
__all__ = ["DatasetBase", "DataLoader", "QueueDataset"]
class DatasetBase(object):
"""R
"""
def __init__(self, context):
pass
def get_dataset(self, context):
pass
class DataLoader(DatasetBase):
def __init__(self, context):
pass
def get_dataloader(self, context, dataset_name, dataloader):
name = "dataset." + dataset_name + "."
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
batch_size = envs.get_global_env(name + "batch_size")
reader_class = envs.get_global_env(name + "data_converter")
reader_class_name = envs.get_global_env(name + "reader_class_name",
"Reader")
if sparse_slots == "" and dense_slots == "":
reader = dataloader_instance.dataloader_by_name(
reader_class,
dataset_name,
context["config_yaml"],
context,
reader_class_name=reader_class_name)
reader_class = envs.lazy_instance_by_fliename(reader_class,
reader_class_name)
reader_ins = reader_class(context["config_yaml"])
else:
reader = dataloader_instance.slotdataloader_by_name(
"", dataset_name, context["config_yaml"], context)
reader_ins = SlotReader(context["config_yaml"])
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
return dataloader
class QueueDataset(DatasetBase):
def __init__(self, context):
pass
def create_dataset(self, dataset_name, context):
name = "dataset." + dataset_name + "."
type_name = envs.get_global_env(name + "type")
if envs.get_platform() != "LINUX":
print("platform ", envs.get_platform(), "Reader To Dataloader")
type_name = "DataLoader"
if type_name == "DataLoader":
return None
else:
return self._get_dataset(dataset_name, context)
def _get_dataset(self, dataset_name, context):
name = "dataset." + dataset_name + "."
reader_class = envs.get_global_env(name + "data_converter")
reader_class_name = envs.get_global_env(name + "reader_class_name",
"Reader")
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../../utils', 'dataset_instance.py')
sparse_slots = envs.get_global_env(name + "sparse_slots", "").strip()
dense_slots = envs.get_global_env(name + "dense_slots", "").strip()
if sparse_slots == "" and dense_slots == "":
pipe_cmd = "python {} {} {} {}".format(reader, reader_class,
reader_class_name,
context["config_yaml"])
else:
if sparse_slots == "":
sparse_slots = "?"
if dense_slots == "":
dense_slots = "?"
padding = envs.get_global_env(name + "padding", 0)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", context["config_yaml"], "fake",
sparse_slots.replace(" ", "?"),
dense_slots.replace(" ", "?"), str(padding))
batch_size = envs.get_global_env(name + "batch_size")
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_batch_size(batch_size)
dataset.set_pipe_command(pipe_cmd)
train_data_path = envs.get_global_env(name + "data_path")
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
if context["engine"] == EngineMode.LOCAL_CLUSTER:
file_list = context["fleet"].split_files(file_list)
dataset.set_filelist(file_list)
for model_dict in context["env"]["phase"]:
if model_dict["dataset_name"] == dataset_name:
model = context["model"][model_dict["name"]]["model"]
thread_num = int(model_dict["thread_num"])
dataset.set_thread(thread_num)
if context["is_infer"]:
inputs = model._infer_data_var
else:
inputs = model._data_var
dataset.set_use_var(inputs)
break
return dataset
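
Both paths above ultimately feed lines through a user reader: DataLoader wraps it as a sample generator, while QueueDataset pipes raw lines through it. A minimal custom reader they could consume, with invented slot names and line format (the generate_sample contract comes from Paddle's MultiSlotDataGenerator):

from paddlerec.core.reader import ReaderBase


class Reader(ReaderBase):
    def init(self):
        pass

    def generate_sample(self, line):
        def reader():
            # Assumed line format: "label feat_id feat_id ...".
            values = line.rstrip().split()
            label = [int(values[0])]
            feat_ids = [int(v) for v in values[1:]]
            yield [("label", label), ("feat_ids", feat_ids)]

        return reader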
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
__all__ = [
"InstanceBase", "SingleInstance", "PSInstance", "PslibInstance",
"CollectiveInstance"
]
class InstanceBase(object):
"""R
"""
def __init__(self, context):
pass
def instance(self, context):
pass
class SingleInstance(InstanceBase):
def __init__(self, context):
print("Running SingleInstance.")
pass
def instance(self, context):
context['status'] = 'network_pass'
class PSInstance(InstanceBase):
def __init__(self, context):
print("Running PSInstance.")
pass
def instance(self, context):
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
role = PaddleCloudRoleMaker()
fleet.init(role)
context['fleet'] = fleet
context['status'] = 'network_pass'
class PslibInstance(InstanceBase):
def __init__(self, context):
print("Running PslibInstance.")
pass
def instance(self, context):
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
fleet.init()
context['fleet'] = fleet
context['status'] = 'network_pass'
class CollectiveInstance(InstanceBase):
def __init__(self, context):
print("Running CollectiveInstance.")
pass
def instance(self, context):
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
role = PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
context['fleet'] = fleet
context['status'] = 'network_pass'
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.trainers.framework.dataset import DataLoader, QueueDataset
__all__ = [
"NetworkBase", "SingleNetwork", "PSNetwork", "PslibNetwork",
"CollectiveNetwork"
]
class NetworkBase(object):
"""R
"""
def __init__(self, context):
pass
def build_network(self, context):
pass
class SingleNetwork(NetworkBase):
"""R
"""
def __init__(self, context):
print("Running SingleNetwork.")
pass
def build_network(self, context):
context["model"] = {}
for model_dict in context["env"]["phase"]:
context["model"][model_dict["name"]] = {}
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(context["env"])
if context["is_infer"]:
model._infer_data_var = model.input_data(
is_infer=context["is_infer"],
dataset_name=model_dict["dataset_name"])
else:
model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(
is_infer=context["is_infer"])
data_loader = DataLoader(context)
data_loader.get_dataloader(context, dataset_name,
model._data_loader)
if context["is_infer"]:
model.net(model._infer_data_var,
context["is_infer"])
else:
model.net(model._data_var, context["is_infer"])
optimizer = model.optimizer()
optimizer.minimize(model._cost)
context["model"][model_dict["name"]][
"main_program"] = train_program
context["model"][model_dict["name"]][
"startup_program"] = startup_program
context["model"][model_dict["name"]]["scope"] = scope
context["model"][model_dict["name"]]["model"] = model
context["model"][model_dict["name"]][
"default_main_program"] = train_program.clone()
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
context)
context["status"] = "startup_pass"
class PSNetwork(NetworkBase):
def __init__(self, context):
print("Running PSNetwork.")
pass
def build_network(self, context):
context["model"] = {}
if len(context["env"]["phase"]) > 1:
warnings.warn(
"Cluster Train Only Support One Phase.",
category=UserWarning,
stacklevel=2)
model_dict = context["env"]["phase"][0]
context["model"][model_dict["name"]] = {}
dataset_name = model_dict["dataset_name"]
model_path = model_dict["model"].replace(
"{workspace}", envs.path_adapter(context["env"]["workspace"]))
model = envs.lazy_instance_by_fliename(model_path,
"Model")(context["env"])
model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=False)
data_loader = DataLoader(context)
data_loader.get_dataloader(context, dataset_name,
model._data_loader)
model.net(model._data_var, False)
optimizer = model.optimizer()
strategy = self._build_strategy(context)
optimizer = context["fleet"].distributed_optimizer(optimizer, strategy)
optimizer.minimize(model._cost)
context["model"][model_dict["name"]]["main_program"] = context[
"fleet"].main_program
context["model"][model_dict["name"]]["startup_program"] = context[
"fleet"].startup_program
context["model"][model_dict["name"]]["scope"] = fluid.global_scope()
context["model"][model_dict["name"]]["model"] = model
context["model"][model_dict["name"]]["default_main_program"] = context[
"fleet"].main_program.clone()
if context["fleet"].is_server():
self._server(context)
else:
context["fleet"].init_worker()
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
dataset["name"], context)
context["status"] = "startup_pass"
def _build_strategy(self, context):
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
mode = envs.get_runtime_environ("train.trainer.strategy")
assert mode in ["async", "geo", "sync", "half_async"]
strategy = None
if mode == "async":
strategy = StrategyFactory.create_async_strategy()
elif mode == "geo":
push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
strategy = StrategyFactory.create_geo_strategy(push_num)
elif mode == "sync":
strategy = StrategyFactory.create_sync_strategy()
elif mode == "half_async":
strategy = StrategyFactory.create_half_async_strategy()
assert strategy is not None
context["strategy"] = strategy
return strategy
def _server(self, context):
init_model_path = envs.get_global_env(
"runner." + context["runner_name"] + ".init_model_path",
default_value="")
context["fleet"].init_server(init_model_path)
context["fleet"].run_server()
context['status'] = "terminal_pass"
class PslibNetwork(NetworkBase):
def __init__(self, context):
print("Running PslibNetwork.")
pass
def build_network(self, context):
context["model"] = {}
if len(context["env"]["phase"]) > 1:
warnings.warn(
"Cluster Train Only Support One Phase.",
category=UserWarning,
stacklevel=2)
model_dict = context["env"]["phase"][0]
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
dataset_name = model_dict["dataset_name"]
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
with fluid.scope_guard(scope):
context["model"][model_dict["name"]] = {}
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model = envs.lazy_instance_by_fliename(
model_path, "Model")(context["env"])
model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=False)
data_loader = DataLoader(context)
data_loader.get_dataloader(context, dataset_name,
model._data_loader)
model.net(model._data_var, False)
optimizer = model.optimizer()
optimizer = context["fleet"].distributed_optimizer(
optimizer)
optimizer.minimize([model._cost], [fluid.global_scope()])
context["model"][model_dict["name"]][
"main_program"] = train_program
context["model"][model_dict["name"]][
"startup_program"] = startup_program
context["model"][model_dict["name"]]["scope"] = scope
context["model"][model_dict["name"]]["model"] = model
context["model"][model_dict["name"]][
"default_main_program"] = train_program.clone()
if context["fleet"].is_server():
self._server(context)
else:
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(
dataset["name"], context)
context["status"] = "startup_pass"
def _server(self, context):
context["fleet"].run_server()
context['status'] = "terminal_pass"
class CollectiveNetwork(NetworkBase):
def __init__(self, context):
print("Running CollectiveNetwork.")
pass
def build_network(self, context):
context["model"] = {}
if len(context["env"]["phase"]) > 1:
warnings.warn(
"Cluster Train Only Support One Phase.",
category=UserWarning,
stacklevel=2)
model_dict = context["env"]["phase"][0]
context["model"][model_dict["name"]] = {}
dataset_name = model_dict["dataset_name"]
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
with fluid.program_guard(train_program, startup_program):
with fluid.scope_guard(scope):
model_path = model_dict["model"].replace(
"{workspace}",
envs.path_adapter(context["env"]["workspace"]))
model = envs.lazy_instance_by_fliename(model_path,
"Model")(context["env"])
model._data_var = model.input_data(
dataset_name=model_dict["dataset_name"])
if envs.get_global_env("dataset." + dataset_name +
".type") == "DataLoader":
model._init_dataloader(is_infer=False)
data_loader = DataLoader(context)
data_loader.get_dataloader(context, dataset_name,
model._data_loader)
model.net(model._data_var, False)
optimizer = model.optimizer()
strategy = self._build_strategy(context)
optimizer = context["fleet"].distributed_optimizer(optimizer,
strategy)
optimizer.minimize(model._cost)
context["model"][model_dict["name"]]["main_program"] = context[
"fleet"].main_program
context["model"][model_dict["name"]][
"startup_program"] = startup_program
context["model"][model_dict["name"]]["scope"] = scope
context["model"][model_dict["name"]]["model"] = model
context["model"][model_dict["name"]][
"default_main_program"] = train_program
context["dataset"] = {}
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
dataset_class = QueueDataset(context)
context["dataset"][dataset[
"name"]] = dataset_class.create_dataset(dataset["name"],
context)
context["status"] = "startup_pass"
def _build_strategy(self, context):
from paddle.fluid.incubate.fleet.collective import DistributedStrategy
exec_strategy = fluid.ExecutionStrategy()
strategy = DistributedStrategy()
strategy.exec_strategy = exec_strategy
context["strategy"] = strategy
return strategy
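
All four Network variants above leave the same per-phase bookkeeping in the context, and the Startup and Runner stages read exactly these keys. The resulting shape, summarized:

# Shape of context["model"][phase_name] populated by every Network class.
context_model_entry = {
    "main_program": None,          # fluid.Program the runner executes
    "startup_program": None,       # fluid.Program run once at startup
    "scope": None,                 # fluid.Scope isolating the phase's vars
    "model": None,                 # the user's ModelBase instance
    "default_main_program": None,  # uncompiled clone kept for save/compile
}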
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import time
import warnings
import datetime
import paddle.fluid as fluid
from paddlerec.core.utils import envs
__all__ = [
"RunnerBase", "SingleRunner", "PSRunner", "CollectiveRunner", "PslibRunner"
]
class RunnerBase(object):
"""R
"""
def __init__(self, context):
pass
    def executor(self, context):
pass
def _run(self, context, model_dict):
reader_name = model_dict["dataset_name"]
name = "dataset." + reader_name + "."
if envs.get_global_env(name + "type") == "DataLoader":
self._executor_dataloader_train(model_dict, context)
else:
self._executor_dataset_train(model_dict, context)
def _executor_dataset_train(self, model_dict, context):
reader_name = model_dict["dataset_name"]
model_name = model_dict["name"]
model_class = context["model"][model_dict["name"]]["model"]
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + context["runner_name"] +
".print_interval", 20))
scope = context["model"][model_name]["scope"]
program = context["model"][model_name]["main_program"]
reader = context["dataset"][reader_name]
with fluid.scope_guard(scope):
if context["is_infer"]:
metrics = model_class.get_infer_results()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
context["exe"].infer_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
else:
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
with fluid.scope_guard(scope):
context["exe"].train_from_dataset(
program=program,
dataset=reader,
fetch_list=fetch_vars,
fetch_info=fetch_alias,
print_period=fetch_period)
def _executor_dataloader_train(self, model_dict, context):
model_name = model_dict["name"]
model_class = context["model"][model_dict["name"]]["model"]
if context["is_infer"]:
program = context["model"][model_name]["main_program"]
elif context["is_fleet"]:
if context["fleet_mode"].upper() == "PS":
program = self._get_ps_program(model_dict, context)
elif context["fleet_mode"].upper() == "COLLECTIVE":
program = context["model"][model_name]["main_program"]
elif not context["is_fleet"]:
if context["device"].upper() == "CPU":
program = self._get_single_cpu_program(model_dict, context)
elif context["device"].upper() == "GPU":
program = self._get_single_gpu_program(model_dict, context)
reader_name = model_dict["dataset_name"]
fetch_vars = []
fetch_alias = []
fetch_period = int(
envs.get_global_env("runner." + context["runner_name"] +
".print_interval", 20))
if context["is_infer"]:
metrics = model_class.get_infer_results()
else:
metrics = model_class.get_metrics()
if metrics:
fetch_vars = metrics.values()
fetch_alias = metrics.keys()
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("batch"))
for name, var in metrics.items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
reader = context["model"][model_dict["name"]]["model"]._data_loader
reader.start()
batch_id = 0
scope = context["model"][model_name]["scope"]
with fluid.scope_guard(scope):
try:
while True:
metrics_rets = context["exe"].run(
program=program, fetch_list=metrics_varnames)
metrics = [batch_id]
metrics.extend(metrics_rets)
if batch_id % fetch_period == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
def _get_strategy(self, model_dict, context):
_build_strategy = fluid.BuildStrategy()
_exe_strategy = fluid.ExecutionStrategy()
# 0: kCoeffNumDevice; 1: One; 2: Customized
_gradient_scale_strategy = model_dict.get("gradient_scale_strategy", 0)
if _gradient_scale_strategy == 0:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.CoeffNumDevice
elif _gradient_scale_strategy == 1:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.One
elif _gradient_scale_strategy == 2:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
else:
raise ValueError(
"Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]."
)
_build_strategy.gradient_scale_strategy = gradient_scale_strategy
if "thread_num" in model_dict and model_dict["thread_num"] > 1:
_build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
_exe_strategy.num_threads = model_dict["thread_num"]
os.environ['CPU_NUM'] = str(_exe_strategy.num_threads)
return _exe_strategy, _build_strategy
def _get_single_gpu_program(self, model_dict, context):
model_name = model_dict["name"]
return context["model"][model_name]["main_program"].clone()
def _get_single_cpu_program(self, model_dict, context):
model_name = model_dict["name"]
model_class = context["model"][model_dict["name"]]["model"]
program = context["model"][model_name]["main_program"].clone()
_exe_strategy, _build_strategy = self._get_strategy(model_dict,
context)
program = fluid.compiler.CompiledProgram(program).with_data_parallel(
loss_name=model_class.get_avg_cost().name,
build_strategy=_build_strategy,
exec_strategy=_exe_strategy)
return program
def _get_ps_program(self, model_dict, context):
model_name = model_dict["name"]
model_class = context["model"][model_dict["name"]]["model"]
program = context["model"][model_name]["main_program"].clone()
_build_strategy = context["strategy"].get_build_strategy()
_exe_strategy = context["strategy"].get_execute_strategy()
if "thread_num" in model_dict and model_dict["thread_num"] > 1:
_build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
_exe_strategy.num_threads = model_dict["thread_num"]
os.environ['CPU_NUM'] = str(_exe_strategy.num_threads)
_gradient_scale_strategy = model_dict.get("gradient_scale_strategy", 0)
if _gradient_scale_strategy == 0:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.CoeffNumDevice
elif _gradient_scale_strategy == 1:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.One
elif _gradient_scale_strategy == 2:
gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
else:
raise ValueError(
"Unsurpported config. gradient_scale_strategy must be one of [0, 1, 2]."
)
_build_strategy.gradient_scale_strategy = gradient_scale_strategy
program = fluid.compiler.CompiledProgram(program).with_data_parallel(
loss_name=model_class.get_avg_cost().name,
build_strategy=_build_strategy,
exec_strategy=_exe_strategy)
return program
def save(self, epoch_id, context, is_fleet=False):
def need_save(epoch_id, epoch_interval, is_last=False):
if is_last:
return True
if epoch_id == -1:
return False
return epoch_id % epoch_interval == 0
def save_inference_model():
name = "runner." + context["runner_name"] + "."
save_interval = int(
envs.get_global_env(name + "save_inference_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env(
name + "save_inference_feed_varnames", [])
fetch_varnames = envs.get_global_env(
name + "save_inference_fetch_varnames", [])
if feed_varnames is None or fetch_varnames is None or feed_varnames == "" or fetch_varnames == "" or \
len(feed_varnames) == 0 or len(fetch_varnames) == 0:
return
fetch_vars = [
fluid.default_main_program().global_block().vars[varname]
for varname in fetch_varnames
]
dirname = envs.get_global_env(name + "save_inference_path", None)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
context["fleet"].save_inference_model(
context["exe"], dirname, feed_varnames, fetch_vars)
else:
fluid.io.save_inference_model(dirname, feed_varnames,
fetch_vars, context["exe"])
def save_persistables():
name = "runner." + context["runner_name"] + "."
save_interval = int(
envs.get_global_env(name + "save_checkpoint_interval", -1))
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env(name + "save_checkpoint_path", None)
if dirname is None or dirname == "":
return
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
context["fleet"].save_persistables(context["exe"], dirname)
else:
fluid.io.save_persistables(context["exe"], dirname)
save_persistables()
save_inference_model()
class SingleRunner(RunnerBase):
"""R
"""
def __init__(self, context):
print("Running SingleRunner.")
pass
def run(self, context):
epochs = int(
envs.get_global_env("runner." + context["runner_name"] +
".epochs"))
for epoch in range(epochs):
for model_dict in context["env"]["phase"]:
begin_time = time.time()
self._run(context, model_dict)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, use time: {}".format(epoch, seconds))
with fluid.scope_guard(context["model"][model_dict["name"]][
"scope"]):
train_prog = context["model"][model_dict["name"]][
"default_main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
self.save(epoch, context)
context["status"] = "terminal_pass"
class PSRunner(RunnerBase):
def __init__(self, context):
print("Running PSRunner.")
pass
def run(self, context):
epochs = int(
envs.get_global_env("runner." + context["runner_name"] +
".epochs"))
model_dict = context["env"]["phase"][0]
for epoch in range(epochs):
begin_time = time.time()
self._run(context, model_dict)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, use time: {}".format(epoch, seconds))
with fluid.scope_guard(context["model"][model_dict["name"]][
"scope"]):
train_prog = context["model"][model_dict["name"]][
"main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
self.save(epoch, context, True)
context["status"] = "terminal_pass"
class CollectiveRunner(RunnerBase):
def __init__(self, context):
print("Running CollectiveRunner.")
pass
def run(self, context):
epochs = int(
envs.get_global_env("runner." + context["runner_name"] +
".epochs"))
model_dict = context["env"]["phase"][0]
for epoch in range(epochs):
begin_time = time.time()
self._run(context, model_dict)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, use time: {}".format(epoch, seconds))
with fluid.scope_guard(context["model"][model_dict["name"]][
"scope"]):
train_prog = context["model"][model_dict["name"]][
"default_main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
self.save(epoch, context, True)
context["status"] = "terminal_pass"
class PslibRunner(RunnerBase):
def __init__(self, context):
print("Running PSRunner.")
pass
def run(self, context):
context["fleet"].init_worker()
model_dict = context["env"]["phase"][0]
epochs = int(
envs.get_global_env("runner." + context["runner_name"] +
".epochs"))
for epoch in range(epochs):
begin_time = time.time()
self._run(context, model_dict)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, use time: {}".format(epoch, seconds))
"""
        # Online training can do more, as shown below:
begin_day = datetime.datetime.strptime("begin_day_d", '%Y%m%d')
days = int(
envs.get_global_env("runner." + context["runner_name"] + ".days"))
for day in range(days):
for hour in range(24):
day = begin_day + datetime.timedelta(days=day, hours=hour)
day_s = day.strftime('%Y%m%d/%H')
for dataset in context["env"]["dataset"]:
if dataset["type"] != "DataLoader":
name = dataset["name"]
train_data_path = envs.get_global_env(name +
"data_path")
train_data_path = os.path.join(train_data_path, day_s)
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
context["dataset"][name].set_filelist(file_list)
for epoch in range(epochs):
begin_time = time.time()
self._run(context, model_dict)
end_time = time.time()
seconds = end_time - begin_time
print("epoch {} done, use time: {}".format(epoch, seconds))
with fluid.scope_guard(context["model"][model_dict["name"]]
["scope"]):
train_prog = context["model"][model_dict["name"]][
"default_main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
self.save(epoch, context, True)
"""
context["status"] = "terminal_pass"
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
__all__ = ["StartupBase", "SingleStartup", "PSStartup", "CollectiveStartup"]
class StartupBase(object):
"""R
"""
def __init__(self, context):
pass
def startup(self, context):
pass
def load(self, context, is_fleet=False, main_program=None):
dirname = envs.get_global_env(
"runner." + context["runner_name"] + ".init_model_path", None)
if dirname is None or dirname == "":
return
print("going to load ", dirname)
if is_fleet:
context["fleet"].load_persistables(context["exe"], dirname)
else:
fluid.io.load_persistables(
context["exe"], dirname, main_program=main_program)
class SingleStartup(StartupBase):
"""R
"""
def __init__(self, context):
print("Running SingleStartup.")
pass
def startup(self, context):
for model_dict in context["env"]["phase"]:
with fluid.scope_guard(context["model"][model_dict["name"]][
"scope"]):
train_prog = context["model"][model_dict["name"]][
"main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
context["exe"].run(startup_prog)
self.load(context, main_program=train_prog)
context["status"] = "train_pass"
class PSStartup(StartupBase):
def __init__(self, context):
print("Running PSStartup.")
pass
def startup(self, context):
model_dict = context["env"]["phase"][0]
with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
train_prog = context["model"][model_dict["name"]]["main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
context["exe"].run(startup_prog)
self.load(context, True)
context["status"] = "train_pass"
class CollectiveStartup(StartupBase):
def __init__(self, context):
print("Running CollectiveStartup.")
pass
def startup(self, context):
model_dict = context["env"]["phase"][0]
with fluid.scope_guard(context["model"][model_dict["name"]]["scope"]):
train_prog = context["model"][model_dict["name"]][
"default_main_program"]
startup_prog = context["model"][model_dict["name"]][
"startup_program"]
with fluid.program_guard(train_prog, startup_prog):
context["exe"].run(startup_prog)
self.load(context, True)
context["status"] = "train_pass"
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
__all__ = ["TerminalBase", "PSTerminalBase"]
class TerminalBase(object):
"""R
"""
def __init__(self, context):
pass
def terminal(self, context):
print("PaddleRec Finish")
class PSTerminal(TerminalBase):
"""R
"""
def __init__(self, context):
pass
def terminal(self, context):
context["fleet"].stop_worker()
print("PaddleRec Finish")
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
General Trainer, applicable to many situations: Single/Cluster/Local_Cluster + PS/COLLECTIVE
"""
from __future__ import print_function
import os
import time
import warnings
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.trainer import Trainer, EngineMode, FleetMode, Device
from paddlerec.core.trainers.framework.dataset import *
from paddlerec.core.trainers.framework.runner import *
from paddlerec.core.trainers.framework.instance import *
from paddlerec.core.trainers.framework.network import *
from paddlerec.core.trainers.framework.startup import *
class GeneralTrainer(Trainer):
"""
Trainer for various situations.
"""
def __init__(self, config=None):
Trainer.__init__(self, config)
self.processor_register()
self.abs_dir = os.path.dirname(os.path.abspath(__file__))
self.runner_env_name = "runner." + self._context["runner_name"]
def processor_register(self):
print("processor_register begin")
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('network_pass', self.network)
self.regist_context_processor('startup_pass', self.startup)
self.regist_context_processor('train_pass', self.runner)
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
instance_class_path = envs.get_global_env(
self.runner_env_name + ".instance_class_path", default_value=None)
if instance_class_path:
instance_class = envs.lazy_instance_by_fliename(
instance_class_path, "Instance")(context)
else:
if self.engine == EngineMode.SINGLE:
instance_class_name = "SingleInstance"
elif self.fleet_mode == FleetMode.PSLIB:
instance_class_name = "PslibInstance"
elif self.fleet_mode == FleetMode.PS:
instance_class_name = "PSInstance"
elif self.fleet_mode == FleetMode.COLLECTIVE:
instance_class_name = "CollectiveInstance"
else:
raise ValueError("Instance Init Error")
instance_path = os.path.join(self.abs_dir, "framework",
"instance.py")
instance_class = envs.lazy_instance_by_fliename(
instance_path, instance_class_name)(context)
instance_class.instance(context)
def network(self, context):
network_class_path = envs.get_global_env(
self.runner_env_name + ".network_class_path", default_value=None)
if network_class_path:
network_class = envs.lazy_instance_by_fliename(network_class_path,
"Network")(context)
else:
if self.engine == EngineMode.SINGLE:
network_class_name = "SingleNetwork"
elif self.fleet_mode == FleetMode.PSLIB:
network_class_name = "PslibNetwork"
elif self.fleet_mode == FleetMode.PS:
network_class_name = "PSNetwork"
elif self.fleet_mode == FleetMode.COLLECTIVE:
network_class_name = "CollectiveNetwork"
else:
raise ValueError("NetWork Init Error")
network_path = os.path.join(self.abs_dir, "framework",
"network.py")
network_class = envs.lazy_instance_by_fliename(
network_path, network_class_name)(context)
network_class.build_network(context)
def startup(self, context):
startup_class_path = envs.get_global_env(
self.runner_env_name + ".startup_class_path", default_value=None)
if startup_class_path:
startup_class = envs.lazy_instance_by_fliename(startup_class_path,
"Startup")(context)
else:
if self.engine == EngineMode.SINGLE:
startup_class_name = "SingleStartup"
elif self.fleet_mode == FleetMode.PS or self.fleet_mode == FleetMode.PSLIB:
startup_class_name = "PSStartup"
elif self.fleet_mode == FleetMode.COLLECTIVE:
startup_class_name = "CollectiveStartup"
else:
raise ValueError("Startup Init Error")
startup_path = os.path.join(self.abs_dir, "framework",
"startup.py")
startup_class = envs.lazy_instance_by_fliename(
startup_path, startup_class_name)(context)
startup_class.startup(context)
def runner(self, context):
runner_class_path = envs.get_global_env(
self.runner_env_name + ".runner_class_paht", default_value=None)
if runner_class_path:
runner_class = envs.lazy_instance_by_fliename(runner_class_path,
"Runner")(context)
else:
if self.engine == EngineMode.SINGLE:
runner_class_name = "SingleRunner"
elif self.fleet_mode == FleetMode.PSLIB:
runner_class_name = "PslibRunner"
elif self.fleet_mode == FleetMode.PS:
runner_class_name = "PSRunner"
elif self.fleet_mode == FleetMode.COLLECTIVE:
runner_class_name = "CollectiveRunner"
else:
raise ValueError("Runner Init Error")
runner_path = os.path.join(self.abs_dir, "framework", "runner.py")
runner_class = envs.lazy_instance_by_fliename(
runner_path, runner_class_name)(context)
runner_class.run(context)
def terminal(self, context):
terminal_class_path = envs.get_global_env(
self.runner_env_name + ".terminal_class_path", default_value=None)
if terminal_class_path:
terminal_class = envs.lazy_instance_by_fliename(
terminal_class_path, "Terminal")(context)
terminal_class.terminal(context)
else:
terminal_class_name = "TerminalBase"
if self.engine != EngineMode.SINGLE and self.fleet_mode != FleetMode.COLLECTIVE:
terminal_class_name = "PSTerminal"
terminal_path = os.path.join(self.abs_dir, "framework",
"terminal.py")
terminal_class = envs.lazy_instance_by_fliename(
terminal_path, terminal_class_name)(context)
terminal_class.terminal(context)
context['is_exit'] = True
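# A minimal sketch of how the context processors registered above are driven.
# This illustrates the pattern only, assuming the base Trainer loops on
# context['status'] until a processor flips context['is_exit'] (the shipped
# loop lives in the Trainer base class, not in this file):
#
#     context = {'status': 'uninit', 'is_exit': False}
#     while not context['is_exit']:
#         processors[context['status']](context)
#
# Each pass hands off by rewriting context['status'] (e.g. runner() ->
# 'terminal_pass'), and terminal() ends the loop via context['is_exit'].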
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Online-learning training using Fluid with fleet in parameter-server mode.
"""
from __future__ import print_function
import datetime
import os
import time
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from paddlerec.core.utils import envs
from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
class OnlineLearningTrainer(TranspileTrainer):
def processor_register(self):
role = PaddleCloudRoleMaker()
fleet.init(role)
if fleet.is_server():
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX" and envs.get_global_env(
"dataset_class", None, "train.reader") != "DataLoader":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor('train_pass',
self.dataloader_train)
self.regist_context_processor('infer_pass', self.infer)
self.regist_context_processor('terminal_pass', self.terminal)
def build_strategy(self):
mode = envs.get_runtime_environ("train.trainer.strategy")
assert mode in ["async", "geo", "sync", "half_async"]
strategy = None
if mode == "async":
strategy = StrategyFactory.create_async_strategy()
elif mode == "geo":
push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
strategy = StrategyFactory.create_geo_strategy(push_num)
elif mode == "sync":
strategy = StrategyFactory.create_sync_strategy()
elif mode == "half_async":
strategy = StrategyFactory.create_half_async_strategy()
assert strategy is not None
self.strategy = strategy
return strategy
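    # For reference, a hypothetical YAML snippet that the lookups above would
    # resolve; the nested key paths are inferred from the get_runtime_environ /
    # get_global_env calls, not copied from a shipped config:
    #
    #   train:
    #     trainer:
    #       strategy: geo
    #     strategy:
    #       mode:
    #         push_num: 100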
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.model.get_avg_cost())
if fleet.is_server():
context['status'] = 'server_pass'
else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'startup_pass'
def server(self, context):
fleet.init_server()
fleet.run_server()
context['is_exit'] = True
def startup(self, context):
self._exe.run(fleet.startup_program)
context['status'] = 'train_pass'
def dataloader_train(self, context):
print("online learning can only support LINUX only")
context['status'] = 'terminal_pass'
def _get_dataset(self, state="TRAIN", hour=None):
if state == "TRAIN":
inputs = self.model.get_inputs()
namespace = "train.reader"
train_data_path = envs.get_global_env("train_data_path", None,
namespace)
else:
inputs = self.model.get_infer_inputs()
namespace = "evaluate.reader"
train_data_path = envs.get_global_env("test_data_path", None,
namespace)
threads = int(envs.get_runtime_environ("train.trainer.threads"))
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state,
self._config_yaml)
if train_data_path.startswith("paddlerec::"):
package_base = envs.get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
train_data_path = os.path.join(package_base,
train_data_path.split("::")[1])
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_cmd)
dataset.set_batch_size(batch_size)
dataset.set_thread(threads)
if hour is not None:
train_data_path = os.path.join(train_data_path, hour)
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
self.files = file_list
dataset.set_filelist(self.files)
return dataset
def dataset_train(self, context):
fleet.init_worker()
days = envs.get_global_env("train.days")
begin_day = datetime.datetime.strptime("begin_day_d", '%Y%m%d')
for day in range(days):
for hour in range(24):
day = begin_day + datetime.timedelta(days=day, hours=hour)
day_s = day.strftime('%Y%m%d/%H')
i = day.strftime('%Y%m%d_%H')
dataset = self._get_dataset(hour=day_s)
ins = self._get_dataset_ins()
begin_time = time.time()
self._exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
end_time = time.time()
times = end_time - begin_time
print("epoch {} using time {}, speed {:.2f} lines/s".format(
i, times, ins / times))
self.save(i, "train", is_fleet=True)
fleet.stop_worker()
context['status'] = 'infer_pass'
def terminal(self, context):
for model in self.increment_models:
print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Single-node TDM training using Fluid.
"""
from __future__ import print_function
import logging
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.trainers.single_trainer import SingleTrainer
from paddlerec.core.utils import envs
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
special_param = [
"TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info", "TDM_Tree_Emb"
]
class TDMSingleTrainer(SingleTrainer):
def startup(self, context):
namespace = "train.startup"
load_persistables = envs.get_global_env("single.load_persistables",
False, namespace)
persistables_model_path = envs.get_global_env(
"single.persistables_model_path", "", namespace)
load_tree = envs.get_global_env("tree.load_tree", False, namespace)
self.tree_layer_path = envs.get_global_env("tree.tree_layer_path", "",
namespace)
self.tree_travel_path = envs.get_global_env("tree.tree_travel_path",
"", namespace)
self.tree_info_path = envs.get_global_env("tree.tree_info_path", "",
namespace)
self.tree_emb_path = envs.get_global_env("tree.tree_emb_path", "",
namespace)
save_init_model = envs.get_global_env("single.save_init_model", False,
namespace)
init_model_path = envs.get_global_env("single.init_model_path", "",
namespace)
self._exe.run(fluid.default_startup_program())
if load_persistables:
            # load parameters from a Paddle binary (persistables) model
fluid.io.load_persistables(
executor=self._exe,
dirname=persistables_model_path,
main_program=fluid.default_main_program())
logger.info("Load persistables from \"{}\"".format(
persistables_model_path))
if load_tree:
            # convert the tree to tensors and set them into Fluid's variables.
for param_name in special_param:
param_t = fluid.global_scope().find_var(param_name).get_tensor(
)
param_array = self._tdm_prepare(param_name)
if param_name == 'TDM_Tree_Emb':
param_t.set(param_array.astype('float32'), self._place)
else:
param_t.set(param_array.astype('int32'), self._place)
if save_init_model:
logger.info("Begin Save Init model.")
fluid.io.save_persistables(
executor=self._exe, dirname=init_model_path)
logger.info("End Save Init model.")
context['status'] = 'train_pass'
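    # A hypothetical "train.startup" config section exercising the lookups
    # above (key names follow the envs calls; paths and values are
    # illustrative only):
    #
    #   train:
    #     startup:
    #       single:
    #         load_persistables: True
    #         persistables_model_path: "init_model"
    #         save_init_model: True
    #         init_model_path: "init_model_saved"
    #       tree:
    #         load_tree: True
    #         tree_layer_path: "tree/layer_list.txt"
    #         tree_travel_path: "tree/travel_list.npy"
    #         tree_info_path: "tree/tree_info.npy"
    #         tree_emb_path: "tree/tree_emb.npy"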
def _tdm_prepare(self, param_name):
if param_name == "TDM_Tree_Travel":
travel_array = self._tdm_travel_prepare()
return travel_array
elif param_name == "TDM_Tree_Layer":
layer_array, _ = self._tdm_layer_prepare()
return layer_array
elif param_name == "TDM_Tree_Info":
info_array = self._tdm_info_prepare()
return info_array
elif param_name == "TDM_Tree_Emb":
emb_array = self._tdm_emb_prepare()
return emb_array
else:
raise " {} is not a special tdm param name".format(param_name)
def _tdm_travel_prepare(self):
"""load tdm tree param from npy/list file"""
travel_array = np.load(self.tree_travel_path)
logger.info("TDM Tree leaf node nums: {}".format(travel_array.shape[
0]))
return travel_array
def _tdm_emb_prepare(self):
"""load tdm tree param from npy/list file"""
emb_array = np.load(self.tree_emb_path)
logger.info("TDM Tree node nums from emb: {}".format(emb_array.shape[
0]))
return emb_array
def _tdm_layer_prepare(self):
"""load tdm tree param from npy/list file"""
layer_list = []
layer_list_flat = []
        with open(self.tree_layer_path, 'r') as fin:
            for line in fin.readlines():
                nodes = []
                layer = line.rstrip('\n').split(',')
                for node in layer:
                    if node:
                        layer_list_flat.append(node)
                        nodes.append(node)
                layer_list.append(nodes)
layer_array = np.array(layer_list_flat)
layer_array = layer_array.reshape([-1, 1])
logger.info("TDM Tree max layer: {}".format(len(layer_list)))
logger.info("TDM Tree layer_node_num_list: {}".format(
[len(i) for i in layer_list]))
return layer_array, layer_list
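    # A hypothetical tree_layer file for a 3-level binary tree, matching the
    # comma-separated-per-line parsing above (one line per tree level):
    #
    #   0
    #   1,2
    #   3,4,5,6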
def _tdm_info_prepare(self):
"""load tdm tree param from list file"""
info_array = np.load(self.tree_info_path)
return info_array
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training using Fluid with the DistributeTranspiler.
"""
import os
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddlerec.core.trainer import Trainer
from paddlerec.core.utils import envs
from paddlerec.core.utils import dataloader_instance
from paddlerec.core.reader import SlotReader
class TranspileTrainer(Trainer):
def __init__(self, config=None):
Trainer.__init__(self, config)
device = envs.get_global_env("train.device", "cpu")
if device == 'gpu':
self._place = fluid.CUDAPlace(0)
self._exe = fluid.Executor(self._place)
self.processor_register()
self.model = None
self.inference_models = []
self.increment_models = []
def processor_register(self):
        print(
            "Must be implemented by the trainer; `self.regist_context_processor('uninit', self.instance)` must come first"
        )
def _get_dataloader(self, state="TRAIN"):
if state == "TRAIN":
dataloader = self.model._data_loader
namespace = "train.reader"
class_name = "TrainReader"
else:
dataloader = self.model._infer_data_loader
namespace = "evaluate.reader"
class_name = "EvaluateReader"
sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
dense_slots = envs.get_global_env("dense_slots", None, namespace)
batch_size = envs.get_global_env("batch_size", None, namespace)
print("batch_size: {}".format(batch_size))
if sparse_slots is None and dense_slots is None:
reader_class = envs.get_global_env("class", None, namespace)
reader = dataloader_instance.dataloader(reader_class, state,
self._config_yaml)
reader_class = envs.lazy_instance_by_fliename(reader_class,
class_name)
reader_ins = reader_class(self._config_yaml)
else:
reader = dataloader_instance.slotdataloader("", state,
self._config_yaml)
reader_ins = SlotReader(self._config_yaml)
if hasattr(reader_ins, 'generate_batch_from_trainfiles'):
dataloader.set_sample_list_generator(reader)
else:
dataloader.set_sample_generator(reader, batch_size)
debug_mode = envs.get_global_env("reader_debug_mode", False, namespace)
if debug_mode:
print("--- DataLoader Debug Mode Begin , show pre 10 data ---")
for idx, line in enumerate(reader()):
print(line)
if idx >= 9:
break
print("--- DataLoader Debug Mode End , show pre 10 data ---")
exit(0)
return dataloader
def _get_dataset_ins(self):
count = 0
for f in self.files:
for _, _ in enumerate(open(f, 'r')):
count += 1
return count
def _get_dataset(self, state="TRAIN"):
if state == "TRAIN":
inputs = self.model.get_inputs()
namespace = "train.reader"
train_data_path = envs.get_global_env("train_data_path", None,
namespace)
else:
inputs = self.model.get_infer_inputs()
namespace = "evaluate.reader"
train_data_path = envs.get_global_env("test_data_path", None,
namespace)
sparse_slots = envs.get_global_env("sparse_slots", None, namespace)
dense_slots = envs.get_global_env("dense_slots", None, namespace)
threads = int(envs.get_runtime_environ("train.trainer.threads"))
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
if sparse_slots is None and dense_slots is None:
pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state,
self._config_yaml)
else:
if sparse_slots is None:
sparse_slots = "#"
if dense_slots is None:
dense_slots = "#"
padding = envs.get_global_env("padding", 0, namespace)
pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
reader, "slot", "slot", self._config_yaml, namespace, \
sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
if train_data_path.startswith("paddlerec::"):
package_base = envs.get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
train_data_path = os.path.join(package_base,
train_data_path.split("::")[1])
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_cmd)
dataset.set_batch_size(batch_size)
dataset.set_thread(threads)
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
self.files = file_list
dataset.set_filelist(self.files)
debug_mode = envs.get_global_env("reader_debug_mode", False, namespace)
if debug_mode:
print("--- Dataset Debug Mode Begin , show pre 10 data of {}---".
format(file_list[0]))
os.system("cat {} | {} | head -10".format(file_list[0], pipe_cmd))
print("--- Dataset Debug Mode End , show pre 10 data of {}---".
format(file_list[0]))
exit(0)
return dataset
def save(self, epoch_id, namespace, is_fleet=False):
def need_save(epoch_id, epoch_interval, is_last=False):
if is_last:
return True
if epoch_id == -1:
return False
return epoch_id % epoch_interval == 0
def save_inference_model():
save_interval = envs.get_global_env(
"save.inference.epoch_interval", -1, namespace)
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env("save.inference.feed_varnames",
None, namespace)
fetch_varnames = envs.get_global_env(
"save.inference.fetch_varnames", None, namespace)
if feed_varnames is None or fetch_varnames is None:
return
fetch_vars = [
fluid.default_main_program().global_block().vars[varname]
for varname in fetch_varnames
]
dirname = envs.get_global_env("save.inference.dirname", None,
namespace)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_inference_model(self._exe, dirname, feed_varnames,
fetch_vars)
else:
fluid.io.save_inference_model(dirname, feed_varnames,
fetch_vars, self._exe)
self.inference_models.append((epoch_id, dirname))
def save_persistables():
save_interval = envs.get_global_env(
"save.increment.epoch_interval", -1, namespace)
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env("save.increment.dirname", None,
namespace)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
if is_fleet:
fleet.save_persistables(self._exe, dirname)
else:
fluid.io.save_persistables(self._exe, dirname)
self.increment_models.append((epoch_id, dirname))
save_persistables()
save_inference_model()
def instance(self, context):
models = envs.get_global_env("train.model.models")
model_class = envs.lazy_instance_by_fliename(models, "Model")
self.model = model_class(None)
context['status'] = 'init_pass'
def init(self, context):
print("Need to be implement")
context['is_exit'] = True
def dataloader_train(self, context):
print("Need to be implement")
context['is_exit'] = True
def dataset_train(self, context):
print("Need to be implement")
context['is_exit'] = True
def infer(self, context):
infer_program = fluid.Program()
startup_program = fluid.Program()
with fluid.unique_name.guard():
with fluid.program_guard(infer_program, startup_program):
self.model.infer_net()
if self.model._infer_data_loader is None:
context['status'] = 'terminal_pass'
return
reader = self._get_dataloader("Evaluate")
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in self.model.get_infer_results().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
self._exe.run(startup_program)
model_list = self.increment_models
evaluate_only = envs.get_global_env(
'evaluate_only', False, namespace='evaluate')
if evaluate_only:
model_list = [(0, envs.get_global_env(
'evaluate_model_path', "", namespace='evaluate'))]
is_return_numpy = envs.get_global_env(
'is_return_numpy', True, namespace='evaluate')
for (epoch, model_dir) in model_list:
print("Begin to infer No.{} model, model_dir: {}".format(
epoch, model_dir))
program = infer_program.clone()
fluid.io.load_persistables(self._exe, model_dir, program)
reader.start()
batch_id = 0
try:
while True:
metrics_rets = self._exe.run(program=program,
fetch_list=metrics_varnames,
return_numpy=is_return_numpy)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % 2 == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
context['status'] = 'terminal_pass'
def terminal(self, context):
print("clean up and exit")
context['is_exit'] = True
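# For orientation, the pass flow that concrete TranspileTrainer subclasses
# wire up via processor_register (taken from the registrations in
# OnlineLearningTrainer above):
#
#   'uninit' -> instance() -> 'init_pass' -> init() -> 'startup_pass'
#   -> startup() -> 'train_pass' -> dataset_train() / dataloader_train()
#   -> 'infer_pass' -> infer() -> 'terminal_pass' -> terminal() -> exit
#
# Server processes short-circuit: init() routes them to 'server_pass', where
# server() blocks in fleet.run_server().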
......@@ -18,10 +18,17 @@ from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.utils.envs import get_global_env
from paddlerec.core.utils.envs import get_runtime_environ
from paddlerec.core.reader import SlotReader
from paddlerec.core.trainer import EngineMode
def dataloader_by_name(readerclass, dataset_name, yaml_file):
reader_class = lazy_instance_by_fliename(readerclass, "TrainReader")
def dataloader_by_name(readerclass,
dataset_name,
yaml_file,
context,
reader_class_name="Reader"):
reader_class = lazy_instance_by_fliename(readerclass, reader_class_name)
name = "dataset." + dataset_name + "."
data_path = get_global_env(name + "data_path")
......@@ -31,6 +38,10 @@ def dataloader_by_name(readerclass, dataset_name, yaml_file):
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
if context["engine"] == EngineMode.LOCAL_CLUSTER:
files = context["fleet"].split_files(files)
print("file_list : {}".format(files))
reader = reader_class(yaml_file)
reader.init()
......@@ -57,7 +68,7 @@ def dataloader_by_name(readerclass, dataset_name, yaml_file):
return gen_reader
def slotdataloader_by_name(readerclass, dataset_name, yaml_file):
def slotdataloader_by_name(readerclass, dataset_name, yaml_file, context):
name = "dataset." + dataset_name + "."
reader_name = "SlotReader"
data_path = get_global_env(name + "data_path")
......@@ -68,6 +79,10 @@ def slotdataloader_by_name(readerclass, dataset_name, yaml_file):
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
if context["engine"] == EngineMode.LOCAL_CLUSTER:
files = context["fleet"].split_files(files)
print("file_list: {}".format(files))
sparse = get_global_env(name + "sparse_slots", "#")
if sparse == "":
sparse = "#"
......@@ -101,51 +116,7 @@ def slotdataloader_by_name(readerclass, dataset_name, yaml_file):
return gen_reader
def dataloader(readerclass, train, yaml_file):
if train == "TRAIN":
reader_name = "TrainReader"
namespace = "train.reader"
data_path = get_global_env("train_data_path", None, namespace)
else:
reader_name = "EvaluateReader"
namespace = "evaluate.reader"
data_path = get_global_env("test_data_path", None, namespace)
if data_path.startswith("paddlerec::"):
package_base = get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
reader_class = lazy_instance_by_fliename(readerclass, reader_name)
reader = reader_class(yaml_file)
reader.init()
def gen_reader():
for file in files:
with open(file, 'r') as f:
for line in f:
line = line.rstrip('\n')
iter = reader.generate_sample(line)
for parsed_line in iter():
if parsed_line is None:
continue
else:
values = []
for pased in parsed_line:
values.append(pased[1])
yield values
def gen_batch_reader():
return reader.generate_batch_from_trainfiles(files)
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
def slotdataloader(readerclass, train, yaml_file):
def slotdataloader(readerclass, train, yaml_file, context):
if train == "TRAIN":
reader_name = "SlotReader"
namespace = "train.reader"
......@@ -161,6 +132,9 @@ def slotdataloader(readerclass, train, yaml_file):
data_path = os.path.join(package_base, data_path.split("::")[1])
files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)]
if context["engine"] == EngineMode.LOCAL_CLUSTER:
files = context["fleet"].split_files(files)
print("file_list: {}".format(files))
sparse = get_global_env("sparse_slots", "#", namespace)
if sparse == "":
......
......@@ -25,16 +25,14 @@ if len(sys.argv) < 4:
reader_package = sys.argv[1]
if sys.argv[2].upper() == "TRAIN":
reader_name = "TrainReader"
elif sys.argv[2].upper() == "EVALUATE":
reader_name = "EvaluateReader"
else:
if sys.argv[1].upper() == "SLOT":
reader_name = "SlotReader"
namespace = sys.argv[4]
sparse_slots = sys.argv[5].replace("?", " ")
dense_slots = sys.argv[6].replace("?", " ")
padding = int(sys.argv[7])
else:
reader_name = sys.argv[2]
yaml_abs_path = sys.argv[3]
......
......@@ -20,6 +20,8 @@ import socket
import sys
import traceback
import yaml
global_envs = {}
......@@ -61,6 +63,11 @@ def get_trainer():
return train_mode
def get_fleet_mode():
fleet_mode = get_runtime_environ("fleet_mode")
return fleet_mode
def set_global_envs(envs):
assert isinstance(envs, dict)
......
File mode changed from 100644 to 100755
......@@ -73,7 +73,7 @@ The Reader logic needs to be described in a standalone python file. Let us try writing one
1. First, we need to import the Reader base class
```python
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
```
2. Create a subclass that inherits from the Reader base class; the Reader used for training is named `TrainerReader`
```python
......@@ -257,12 +257,12 @@ self._data_var.append(self.label_input)
```python
# Import PaddleRec's Reader base class
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
# Import PaddleRec's helper for reading the yaml config file
from paddlerec.core.utils import envs
# Define TrainReader, which must inherit paddlerec.core.reader.Reader
class TrainReader(Reader):
class Reader(ReaderBase):
# Data preprocessing logic, inherited from the base class
# If no processing is needed, use pass to skip this function
......
......@@ -13,8 +13,8 @@
# limitations under the License.
import paddle.fluid as fluid
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.utils import envs
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......
......@@ -14,10 +14,10 @@
import sys
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
class TrainReader(Reader):
class Reader(ReaderBase):
def init(self):
pass
......
......@@ -17,7 +17,7 @@ import paddle.fluid.layers.nn as nn
import paddle.fluid.layers.tensor as tensor
import paddle.fluid.layers.control_flow as cf
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
from paddlerec.core.utils import envs
......@@ -83,9 +83,9 @@ class Model(ModelBase):
mul_cos_neg = nn.cos_sim(neg_tag_emb, mul_text_hid)
cos_neg_all = fluid.layers.sequence_reshape(
input=mul_cos_neg, new_dim=self.neg_size)
#choose max negtive cosine
        # choose max negative cosine
cos_neg = nn.reduce_max(cos_neg_all, dim=1, keep_dim=True)
#calculate hinge loss
# calculate hinge loss
loss_part1 = nn.elementwise_sub(
tensor.fill_constant_batch_size_like(
input=cos_pos,
......
......@@ -16,10 +16,10 @@ import sys
import numpy as np
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
class TrainReader(Reader):
class Reader(ReaderBase):
def init(self):
pass
......
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
......@@ -17,7 +17,7 @@ import math
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -25,8 +25,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number")
self.sparse_feature_dim = envs.get_global_env(
......@@ -86,7 +86,7 @@ class Model(ModelBase):
predict = fluid.layers.scale(sim, scale=5)
self.predict = predict
#auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
# auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
# label=self.label_input,
# num_thresholds=10000,
# slide_steps=20)
......@@ -102,7 +102,7 @@ class Model(ModelBase):
#self._metrics["AUC"] = auc
#self._metrics["BATCH_AUC"] = batch_auc
#cost = fluid.layers.cross_entropy(
# cost = fluid.layers.cross_entropy(
# input=self.predict, label=self.label_input)
cost = fluid.layers.square_error_cost(
self.predict,
......
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
......@@ -17,7 +17,7 @@ import math
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -25,8 +25,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number")
self.sparse_feature_dim = envs.get_global_env(
......
......@@ -54,14 +54,14 @@ runner:
save_inference_feed_varnames: ["query", "doc_pos"] # feed vars of save inference
save_inference_fetch_varnames: ["cos_sim_0.tmp_0"] # fetch vars of save inference
init_model_path: "" # load model path
fetch_period: 2
print_interval: 2
- name: infer_runner
class: single_infer
# num of epochs
epochs: 1
# device to run training or infer
device: cpu
fetch_period: 1
print_interval: 1
init_model_path: "increment/2" # load model path
# runner will run all the phase in each epoch
......
......@@ -15,7 +15,7 @@
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......
......@@ -13,10 +13,10 @@
# limitations under the License.
from __future__ import print_function
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
class TrainReader(Reader):
class Reader(ReaderBase):
def init(self):
pass
......
......@@ -14,10 +14,10 @@
from __future__ import print_function
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
class TrainReader(Reader):
class Reader(ReaderBase):
def init(self):
pass
......
......@@ -61,14 +61,14 @@ runner:
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
fetch_period: 1
print_interval: 1
- name: infer_runner
class: single_infer
# num of epochs
epochs: 1
# device to run training or infer
device: cpu
fetch_period: 1
print_interval: 1
init_model_path: "increment/0" # load model path
# runner will run all the phase in each epoch
......
......@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
class EvaluateReader(Reader):
class Reader(ReaderBase):
def init(self):
self.query_slots = envs.get_global_env("hyper_parameters.query_slots",
None, "train.model")
......
......@@ -17,7 +17,7 @@ import paddle.fluid.layers.tensor as tensor
import paddle.fluid.layers.control_flow as cf
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class BowEncoder(object):
......
......@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
class TrainReader(Reader):
class Reader(ReaderBase):
def init(self):
self.query_slots = envs.get_global_env("hyper_parameters.query_slots",
None, "train.model")
......
......@@ -56,11 +56,11 @@ runner:
epochs: 3
phase:
- name: train
# - name: train
# model: "{workspace}/model.py"
# dataset_name: dataset_train
# thread_num: 1
- name: infer
model: "{workspace}/model.py"
dataset_name: dataset_train
dataset_name: dataset_infer
thread_num: 1
#- name: infer
# model: "{workspace}/model.py"
# dataset_name: dataset_infer
# thread_num: 1
......@@ -16,10 +16,10 @@ from __future__ import print_function
from collections import defaultdict
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
class TrainReader(Reader):
class Reader(ReaderBase):
def init(self):
all_field_id = [
'101', '109_14', '110_14', '127_14', '150_14', '121', '122', '124',
......
......@@ -16,7 +16,7 @@ import numpy as np
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......
......@@ -14,10 +14,10 @@
from __future__ import print_function
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
class TrainReader(Reader):
class Reader(ReaderBase):
def init(self):
pass
......
......@@ -15,7 +15,7 @@
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......
......@@ -14,10 +14,10 @@
from __future__ import print_function
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
class TrainReader(Reader):
class Reader(ReaderBase):
def init(self):
pass
......
......@@ -15,7 +15,7 @@
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......
File mode changed from 100755 to 100644
......@@ -49,8 +49,8 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
epochs: 1
class: single_train
epochs: 2
device: cpu
init_model_path: ""
save_checkpoint_interval: 1
......@@ -59,7 +59,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -18,7 +18,7 @@ from collections import OrderedDict
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -26,8 +26,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
......@@ -93,7 +93,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # batch_size * num_field * embedding_size
# batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
element_wise_product_list = []
for i in range(self.num_field):
......
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
......@@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml, os
import yaml
import os
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
import paddle.fluid.incubate.data_generator as dg
try:
......@@ -23,7 +24,7 @@ except ImportError:
import pickle
class TrainReader(dg.MultiSlotDataGenerator):
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
......@@ -45,7 +46,7 @@ class TrainReader(dg.MultiSlotDataGenerator):
]
self.continuous_range_ = range(1, 14)
self.categorical_range_ = range(14, 40)
# load preprocessed feature dict
# load preprocessed feature dict
self.feat_dict_name = "sample_data/feat_dict_10.pkl2"
self.feat_dict_ = pickle.load(open(self.feat_dict_name, 'rb'))
......@@ -92,6 +93,6 @@ class TrainReader(dg.MultiSlotDataGenerator):
return data_iter
reader = TrainReader("../config.yaml")
reader = Reader("../config.yaml")
reader.init()
reader.run_from_stdin()
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
......@@ -50,7 +50,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -60,7 +60,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -11,16 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.incubate.data_generator as dg
import math
import os
try:
import cPickle as pickle
except ImportError:
import pickle
import paddle.fluid.incubate.data_generator as dg
class TrainReader(dg.MultiSlotDataGenerator):
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
......@@ -97,6 +98,6 @@ class TrainReader(dg.MultiSlotDataGenerator):
return data_iter
reader = TrainReader("../config.yaml")
reader = Reader("../config.yaml")
reader.init()
reader.run_from_stdin()
......@@ -17,7 +17,7 @@ from collections import OrderedDict
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......
......@@ -48,7 +48,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -58,7 +58,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -18,7 +18,7 @@ from collections import OrderedDict
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -26,8 +26,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
......@@ -106,7 +106,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # batch_size * num_field * embedding_size
# batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
concated = fluid.layers.concat(
[feat_embeddings, first_weights], axis=2)
......
......@@ -48,7 +48,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 2
device: cpu
init_model_path: ""
......@@ -58,7 +58,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -19,8 +19,10 @@ try:
except ImportError:
import pickle
import paddle.fluid.incubate.data_generator as dg
class TrainReader(dg.MultiSlotDataGenerator):
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
......@@ -36,7 +38,7 @@ class TrainReader(dg.MultiSlotDataGenerator):
]
self.continuous_range_ = range(1, 14)
self.categorical_range_ = range(14, 40)
# load preprocessed feature dict
# load preprocessed feature dict
self.feat_dict_name = "sample_data/feat_dict_10.pkl2"
self.feat_dict_ = pickle.load(open(self.feat_dict_name, 'rb'))
......@@ -83,6 +85,6 @@ class TrainReader(dg.MultiSlotDataGenerator):
return data_iter
reader = TrainReader("../config.yaml")
reader = Reader("../config.yaml")
reader.init()
reader.run_from_stdin()
......@@ -17,7 +17,7 @@ import math
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -85,7 +85,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # None * num_field * embedding_size
# None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
# sum_square part
summed_features_emb = fluid.layers.reduce_sum(
......
......@@ -46,7 +46,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -56,10 +56,12 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
print_interval: 1
phase:
- name: phase1
model: "{workspace}/model.py"
......
......@@ -15,7 +15,7 @@
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -30,7 +30,7 @@ class Model(ModelBase):
self.act = envs.get_global_env("hyper_parameters.act", "sigmoid")
self.is_sparse = envs.get_global_env("hyper_parameters.is_sparse",
False)
#significant for speeding up the training process
# significant for speeding up the training process
self.use_DataLoader = envs.get_global_env(
"hyper_parameters.use_DataLoader", False)
self.item_count = envs.get_global_env("hyper_parameters.item_count",
......
......@@ -23,11 +23,11 @@ except ImportError:
import numpy as np
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
class TrainReader(Reader):
class Reader(ReaderBase):
def init(self):
self.train_data_path = envs.get_global_env(
"dataset.sample_1.data_path", None)
......
......@@ -17,12 +17,18 @@ workspace: "paddlerec.models.rank.dnn"
# list of dataset
dataset:
- name: dataset_train # name of dataset to distinguish different datasets
- name: dataloader_train # name of dataset to distinguish different datasets
batch_size: 2
type: DataLoader # or QueueDataset
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
dense_slots: "dense_var:13"
- name: dataset_train # name of dataset to distinguish different datasets
batch_size: 2
type: QueueDataset # or DataLoader
data_path: "{workspace}/data/sample_data/train"
sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
dense_slots: "dense_var:13"
- name: dataset_infer # name
batch_size: 2
type: DataLoader # or QueueDataset
......@@ -45,14 +51,14 @@ hyper_parameters:
fc_sizes: [512, 256, 128, 32]
# select runner by name
mode: runner1
mode: single_cpu_train
# config of each runner.
# runner is a kind of paddle training class, which wraps the train/infer process.
runner:
- name: runner1
class: single_train
- name: single_cpu_train
class: train
# num of epochs
epochs: 10
epochs: 4
# device to run training or infer
device: cpu
save_checkpoint_interval: 2 # save model interval of epochs
......@@ -63,19 +69,59 @@ runner:
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
- name: runner2
class: single_infer
- name: single_gpu_train
class: train
# num of epochs
epochs: 4
# device to run training or infer
device: gpu
selected_gpus: "2"
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
save_checkpoint_path: "increment" # save checkpoint path
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
- name: single_cpu_infer
class: infer
# num of epochs
epochs: 10
epochs: 1
# device to run training or infer
device: cpu
init_model_path: "increment/0" # load model path
- name: local_cluster_cpu_ps_train
class: local_cluster
epochs: 4
device: cpu
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
save_checkpoint_path: "increment" # save checkpoint path
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 1
- name: multi_gpu_train
class: train
epochs: 4
device: gpu
selected_gpus: "2,3"
save_checkpoint_interval: 2 # save model interval of epochs
save_inference_interval: 4 # save inference
save_checkpoint_path: "increment" # save checkpoint path
save_inference_path: "inference" # save inference path
save_inference_feed_varnames: [] # feed vars of save inference
save_inference_fetch_varnames: [] # fetch vars of save inference
init_model_path: "" # load model path
print_interval: 10
# runner will run all the phase in each epoch
phase:
- name: phase1
model: "{workspace}/model.py" # user-defined model
dataset_name: dataset_train # select dataset by name
dataset_name: dataloader_train # select dataset by name
thread_num: 1
#- name: phase2
# model: "{workspace}/model.py" # user-defined model
......
......@@ -17,7 +17,7 @@ import math
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -25,8 +25,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number")
self.sparse_feature_dim = envs.get_global_env(
......
File mode changed from 100755 to 100644
......@@ -47,7 +47,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -57,7 +57,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -18,7 +18,7 @@ from collections import OrderedDict
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -26,8 +26,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
......@@ -94,7 +94,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, embedding_size_for_all_field
]) # batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # batch_size * num_field * (embedding_size * num_field)
# batch_size * num_field * (embedding_size * num_field)
feat_embeddings = feat_embeddings * feat_value
field_aware_feat_embedding = fluid.layers.reshape(
feat_embeddings,
......
......@@ -52,7 +52,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -62,7 +62,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -18,7 +18,7 @@ from collections import OrderedDict
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -26,8 +26,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
......@@ -80,7 +80,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # batch_size * num_field * embedding_size
# batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
featuer_generation_input = fluid.layers.reshape(
feat_embeddings,
shape=[0, 1, self.num_field, self.sparse_feature_dim])
......
File mode changed from 100755 to 100644
......@@ -47,7 +47,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -57,7 +57,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -18,7 +18,7 @@ from collections import OrderedDict
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -26,8 +26,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
......@@ -93,7 +93,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # batch_size * num_field * embedding_size
# batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
# sum_square part
summed_features_emb = fluid.layers.reduce_sum(
......
......@@ -47,7 +47,7 @@ mode: train_FM_runner #for FM phase: train_FM_runner for dnn phase: train_DNN_ru
runner:
- name: train_FM_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -57,7 +57,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: train_DNN_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -18,7 +18,7 @@ from collections import OrderedDict
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -26,8 +26,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
......@@ -91,7 +91,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # batch_size * num_field * embedding_size
# batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
# sum_square part
summed_features_emb = fluid.layers.reduce_sum(
......
......@@ -18,7 +18,7 @@ from collections import OrderedDict
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -26,8 +26,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
......@@ -85,7 +85,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # batch_size * num_field * embedding_size
# batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
concated = fluid.layers.concat(
[feat_embeddings, first_weights], axis=2)
......
File mode changed from 100755 to 100644
......@@ -46,7 +46,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 2
device: cpu
init_model_path: ""
......@@ -56,7 +56,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
File mode changed from 100755 to 100644
......@@ -15,7 +15,7 @@
import yaml
import os
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
import paddle.fluid.incubate.data_generator as dg
......@@ -25,7 +25,7 @@ except ImportError:
import pickle
class TrainReader(dg.MultiSlotDataGenerator):
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
......@@ -47,7 +47,7 @@ class TrainReader(dg.MultiSlotDataGenerator):
]
self.continuous_range_ = range(1, 14)
self.categorical_range_ = range(14, 40)
# load preprocessed feature dict
# load preprocessed feature dict
self.feat_dict_name = "sample_data/feat_dict_10.pkl2"
self.feat_dict_ = pickle.load(open(self.feat_dict_name, 'rb'))
......@@ -94,7 +94,7 @@ class TrainReader(dg.MultiSlotDataGenerator):
return data_iter
reader = TrainReader(
reader = Reader(
"../config.yaml") # run this file in original folder to find config.yaml
reader.init()
reader.run_from_stdin()
File mode changed from 100755 to 100644
......@@ -17,7 +17,7 @@ import math
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......
File mode changed from 100755 to 100644
......@@ -53,7 +53,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -63,7 +63,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
......@@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml, os
import yaml
import os
from paddlerec.core.reader import Reader
from paddlerec.core.reader import ReaderBase
from paddlerec.core.utils import envs
import paddle.fluid.incubate.data_generator as dg
try:
......@@ -23,7 +24,7 @@ except ImportError:
import pickle
class TrainReader(dg.MultiSlotDataGenerator):
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
......@@ -45,7 +46,7 @@ class TrainReader(dg.MultiSlotDataGenerator):
]
self.continuous_range_ = range(1, 14)
self.categorical_range_ = range(14, 40)
# load preprocessed feature dict
# load preprocessed feature dict
self.feat_dict_name = "sample_data/feat_dict_10.pkl2"
self.feat_dict_ = pickle.load(open(self.feat_dict_name, 'rb'))
......@@ -92,6 +93,6 @@ class TrainReader(dg.MultiSlotDataGenerator):
return data_iter
reader = TrainReader("../config.yaml")
reader = Reader("../config.yaml")
reader.init()
reader.run_from_stdin()
File mode changed from 100755 to 100644
......@@ -18,7 +18,7 @@ from collections import OrderedDict
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -26,8 +26,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
......@@ -99,7 +99,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # batch_size * num_field * embedding_size
# batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
# sum_square part
summed_features_emb = fluid.layers.reduce_sum(
......
File mode changed from 100755 to 100644
......@@ -50,7 +50,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -60,7 +60,7 @@ runner:
save_inference_path: "inference"
print_interval: 1
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -32,7 +32,7 @@ import paddle
import paddle.fluid as fluid
from paddlerec.core.utils import envs
from paddlerec.core.model import Model as ModelBase
from paddlerec.core.model import ModelBase
class Model(ModelBase):
......@@ -40,8 +40,8 @@ class Model(ModelBase):
ModelBase.__init__(self, config)
def _init_hyper_parameters(self):
self.is_distributed = True if envs.get_trainer(
) == "CtrTrainer" else False
self.is_distributed = True if envs.get_fleet_mode().upper(
) == "PSLIB" else False
self.sparse_feature_number = envs.get_global_env(
"hyper_parameters.sparse_feature_number", None)
self.sparse_feature_dim = envs.get_global_env(
......@@ -107,7 +107,8 @@ class Model(ModelBase):
feat_embeddings_re,
shape=[-1, self.num_field, self.sparse_feature_dim
]) # batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # batch_size * num_field * embedding_size
# batch_size * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value
# ------------------------- Linear Signal --------------------------
......
......@@ -45,7 +45,7 @@ mode: train_runner
runner:
- name: train_runner
trainer_class: single_train
class: single_train
epochs: 1
device: cpu
init_model_path: ""
......@@ -54,7 +54,7 @@ runner:
save_checkpoint_path: "increment"
save_inference_path: "inference"
- name: infer_runner
trainer_class: single_infer
class: single_infer
epochs: 1
device: cpu
init_model_path: "increment/0"
......
......@@ -16,10 +16,11 @@ try:
import cPickle as pickle
except ImportError:
import pickle
import paddle.fluid.incubate.data_generator as dg
class TrainReader(dg.MultiSlotDataGenerator):
class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
......@@ -49,12 +50,11 @@ class TrainReader(dg.MultiSlotDataGenerator):
v = i[1]
for j in v:
s += " " + k + ":" + str(j)
print s.strip()
yield None
return data_iter
reader = TrainReader("../config.yaml")
reader = Reader("../config.yaml")
reader.init()
reader.run_from_stdin()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
File mode changed from 100755 to 100644
This diff is collapsed.
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
This diff is collapsed.
This diff is collapsed.
File mode changed from 100755 to 100644
This diff is collapsed.
This diff is collapsed.
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
File mode changed from 100755 to 100644
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.