提交 4ece6cc5 编写于 作者: T tangwei

add cluster training

上级 1e953617
...@@ -29,6 +29,9 @@ train: ...@@ -29,6 +29,9 @@ train:
threads: 12 threads: 12
epochs: 10 epochs: 10
trainer: "SingleTraining" trainer: "SingleTraining"
role_maler: "PaddleCloudRoleMaker"
strategy:
mode: "async"
reader: reader:
mode: "dataset" mode: "dataset"
......
...@@ -125,10 +125,6 @@ class Train(object): ...@@ -125,10 +125,6 @@ class Train(object):
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True) optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer return optimizer
def optimize(self):
optimizer = self.optimizer()
optimizer.minimize(self.loss)
class Evaluate(object): class Evaluate(object):
def input(self): def input(self):
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Training use fluid with one node only.
"""
from __future__ import print_function
import os
import time
import numpy as np
import logging
import paddle.fluid as fluid
from .trainer import Trainer
from ..utils import envs
from ..reader import dataset
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def need_save(epoch_id, epoch_interval, is_last=False):
if is_last:
return True
return epoch_id % epoch_interval == 0
class ClusterTrainer(Trainer):
def __init__(self, config=None, yaml_file=None):
Trainer.__init__(self, config, yaml_file)
self.exe = fluid.Executor(fluid.CPUPlace())
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
self.regist_context_processor('train_pass', self.train)
self.regist_context_processor('terminal_pass', self.terminal)
def build_role_maker(self):
role_maker = envs.get_global_env("train.role_maker")
if role_maker == "PaddleCloudRoleMaker":
role = PaddleCloudRoleMaker()
return role
else:
raise ValueError("only support PaddleCloudRoleMaker now")
def build_strategy(self):
mode = envs.get_global_env("train.strategy.mode")
strategy = None
if mode == "async":
strategy = StrategyFactory.create_async_strategy()
elif mode == "geo":
push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
strategy = StrategyFactory.create_geo_strategy(push_num)
elif mode == "sync":
strategy = StrategyFactory.create_sync_strategy()
elif mode == "half_async":
strategy = StrategyFactory.create_half_async_strategy()
return strategy
def instance(self, context):
model_package = __import__(envs.get_global_env("train.model.models"))
train_model = getattr(model_package, 'Train')
self.model = train_model()
context['status'] = 'init_pass'
def init(self, context):
fleet.init(self.build_role_maker())
self.model.input()
self.model.net()
self.model.loss()
self.metrics = self.model.metrics()
self.loss = self.model.avg_loss()
optimizer = self.model.get_optimizer()
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.loss)
if fleet.is_server():
context['status'] = 'server_pass'
else:
context['status'] = 'train_pass'
def server(self, context):
fleet.init_server()
fleet.run_server()
context['status'] = 'wait'
def terminal(self, context):
fleet.stop_worker()
context['is_exit'] = True
def train(self, context):
print("Need to be implement")
context['is_exit'] = True
class ClusterTrainerWithDataloader(ClusterTrainer):
pass
class ClusterTrainerWithDataset(ClusterTrainer):
def _get_dataset(self, inputs, threads, batch_size, pipe_command, train_files_path):
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_command)
dataset.set_batch_size(batch_size)
dataset.set_thread(threads)
file_list = [
os.path.join(train_files_path, x)
for x in os.listdir(train_files_path)
]
dataset.set_filelist(file_list)
return dataset
def save(self, epoch_id):
def save_inference_model():
is_save_inference = envs.get_global_env("save.inference", False)
if not is_save_inference:
return
save_interval = envs.get_global_env("save.inference.epoch_interval", 1)
if not need_save(epoch_id, save_interval, False):
return
feed_varnames = envs.get_global_env("save.inference.feed_varnames", None)
fetch_varnames = envs.get_global_env("save.inference.fetch_varnames", None)
fetch_vars = [fluid.global_scope().vars[varname] for varname in fetch_varnames]
dirname = envs.get_global_env("save.inference.dirname", None)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
fluid.io.save_inference_model(dirname, feed_varnames, fetch_vars, self.exe)
def save_persistables():
is_save_increment = envs.get_global_env("save.increment", False)
if not is_save_increment:
return
save_interval = envs.get_global_env("save.increment.epoch_interval", 1)
if not need_save(epoch_id, save_interval, False):
return
dirname = envs.get_global_env("save.inference.dirname", None)
assert dirname is not None
dirname = os.path.join(dirname, str(epoch_id))
fluid.io.save_persistables(self.exe, dirname)
is_save = envs.get_global_env("save", False)
if not is_save:
return
save_persistables()
save_inference_model()
def train(self, context):
inputs = self.model.input_vars()
threads = envs.get_global_env("threads")
batch_size = envs.get_global_env("batch_size")
pipe_command = envs.get_global_env("pipe_command")
train_data_path = envs.get_global_env("train_data_path")
dataset = self._get_dataset(inputs, threads, batch_size, pipe_command, train_data_path)
fleet.init_worker()
self.exe.run(fleet.startup_program)
epochs = envs.get_global_env("epochs")
for i in range(epochs):
self.exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=[self.metrics],
fetch_info=["epoch {} auc ".format(i)],
print_period=100)
self.save(i)
context['status'] = 'infer_pass'
def infer(self, context):
context['status'] = 'terminal_pass'
...@@ -62,9 +62,11 @@ class SingleTrainer(Trainer): ...@@ -62,9 +62,11 @@ class SingleTrainer(Trainer):
def init(self, context): def init(self, context):
self.model.input() self.model.input()
self.model.net() self.model.net()
self.model.loss()
self.metrics = self.model.metrics() self.metrics = self.model.metrics()
self.model.optimize() loss = self.model.avg_loss()
optimizer = self.model.get_optimizer()
optimizer.minimize(loss)
# run startup program at once # run startup program at once
self.exe.run(fluid.default_startup_program()) self.exe.run(fluid.default_startup_program())
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册