get_pserver_program() is deprecated, call get_pserver_programs() to get pserver main and startup in a single call.
Created by: Sherlockxuhy
如果您没有查询到相似问题,为快速解决您的提问,建立issue时请提供如下细节信息:
- 标题:简洁、精准概括您的问题,例如“Insufficient Memory xxx”
- 版本、环境信息: 1)PaddlePaddle版本:fluid 1.4 /fluid 1.3 2)CPU:MPI集群 3)系统环境:paddleCloud
- 训练信息:job-0bb5d071a13a2f60
主函数代码如下,参考ctr例子写的分布式ernie的分类fine tune
# Standard library
import os
import time
import multiprocessing
from multiprocessing import cpu_count

# Third-party
import numpy as np
import paddle.fluid as fluid

# Project-local
import reader.task_reader as task_reader
from model.ernie import ErnieConfig
from finetune.classifier import create_model, evaluate
from optimization import optimization
from utils.args import print_arguments
from utils.init import init_pretraining_params, init_checkpoint
from finetune_args import parser
def train_loop(args, train_program, train_pyreader, loss, accuracy, graph_vars):
    """Run the trainer-side loop: feed data via the pyreader and step the
    ParallelExecutor until the data generator is exhausted (EOFException).

    Args:
        args: parsed command-line namespace (vocab_path, train_set,
            batch_size, epoch, skip_steps, ... are read below).
        train_program: the (transpiled) fluid Program to train.
        train_pyreader: pyreader feeding the model; decorated here with the
            batch generator.
        loss: loss Variable fetched every step.
        accuracy: accuracy Variable fetched every step.
        graph_vars: dict of graph Variables passed to evaluate() ("loss",
            "accuracy", ... — presumably built by create_model; confirm).
    """
    reader = task_reader.ClassifyReader(
        vocab_path=args.vocab_path,
        label_map_config=args.label_map_config,
        max_seq_len=args.max_seq_len,
        do_lower_case=args.do_lower_case,
        in_tokens=args.in_tokens,
        random_seed=args.random_seed)
    #exe.run(startup_prog)
    # NOTE(review): no startup program is ever run in this function (the
    # line above is commented out) — parameters may be uninitialized on the
    # trainer unless they are initialized elsewhere. TODO confirm; this is a
    # likely cause of the reported failure.
    train_data_generator = reader.data_generator(
        input_file=args.train_set,
        batch_size=args.batch_size,
        epoch=args.epoch,
        shuffle=True,
        phase="train")
    # Total number of training examples (used only for progress reporting).
    num_train_examples = reader.get_num_examples(args.train_set)
    print("Num train examples: %d" % num_train_examples)
    train_pyreader.decorate_tensor_provider(train_data_generator)
    print("feed data")
    # CPU-only execution; exe is used by evaluate(), pe does the train steps.
    place = fluid.CPUPlace()
    exe = fluid.Executor(place)
    exec_strategy = fluid.ExecutionStrategy()
    build_strategy = fluid.BuildStrategy()
    if os.getenv("NUM_THREADS", ""):
        exec_strategy.num_threads = int(os.getenv("NUM_THREADS"))
    # Reduce strategy: per-device Reduce when more than one trainer thread,
    # AllReduce otherwise. Falls back to local cpu_count() when
    # PADDLE_TRAINER_COUNT is unset.
    cpu_num = int(os.environ.get('PADDLE_TRAINER_COUNT', cpu_count()))
    build_strategy.reduce_strategy = \
        fluid.BuildStrategy.ReduceStrategy.Reduce if cpu_num > 1 \
        else fluid.BuildStrategy.ReduceStrategy.AllReduce
    pe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)
    #exe.run(fluid.default_startup_program())
    train_pyreader.start()
    steps = 0
    # if warmup_steps > 0:
    #     graph_vars["learning_rate"] = scheduled_lr
    time_begin = time.time()
    print("time_begin")
    while True:
        try:
            steps += 1
            if steps % args.skip_steps != 0:
                # Ordinary step: run one batch, fetch loss/accuracy and
                # average across devices.
                loss_val, auc_val = pe.run(fetch_list=[loss.name,accuracy.name])
                loss_val = np.mean(loss_val)
                auc_val = np.mean(auc_val)
            else:
                # Every skip_steps-th step: evaluate and log progress/speed.
                outputs = evaluate(exe, train_program, train_pyreader,
                    graph_vars, "train")
                current_example, current_epoch = reader.get_train_progress()
                time_end = time.time()
                used_time = time_end - time_begin
                print("epoch: %d, progress: %d/%d, step: %d, ave loss: %f, "
                    "ave acc: %f, speed: %f steps/s" %
                    (current_epoch, current_example, num_train_examples,
                    steps, outputs["loss"], outputs["accuracy"],
                    args.skip_steps / used_time))
                time_begin = time.time()
        except fluid.core.EOFException:
            # Data generator exhausted: reset the pyreader and stop.
            #save_path = os.path.join(args.checkpoints, "step_" + str(steps))
            #fluid.io.save_persistables(exe, save_path, train_program)
            train_pyreader.reset()
            break
    #### loop end
####loop end
def train():
    """Entry point: build the ERNIE classification model, transpile it for
    distributed (pserver/trainer) execution, and run the role-appropriate
    program based on TRAINING_ROLE and related environment variables.
    """
    args = parser.parse_args()
    # ernie config: load hyperparameters and build the model
    ernie_config = ErnieConfig(args.ernie_config_path)
    ernie_config.print_config()
    train_pyreader, graph_vars = create_model(
        args,
        pyreader_name='train_reader',
        ernie_config=ernie_config)
    print("========create model")
    auc_var = graph_vars["accuracy"]
    loss = graph_vars["loss"]
    optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate)
    print("========adam optimizer")
    optimizer.minimize(loss)
    # Assemble the pserver endpoint list from PADDLE_PSERVERS (comma-separated
    # IPs) and a single shared PADDLE_PORT.
    port = os.getenv("PADDLE_PORT", "6174")
    print("========PADDLE_PORT: " + port)
    #pserver_endpoints = os.getenv("PADDLE_PSERVERS_IP_PORT_LIST")
    #print("========PADDLE_PSERVERS_IP_PORT_LIST: " + pserver_endpoints)  # could the problem be here? or the fluid version?
    pserver_ips = os.getenv("PADDLE_PSERVERS", "")
    eplist = []
    for ip in pserver_ips.split(","):
        eplist.append(':'.join([ip, port]))
    args.endpoints = ",".join(eplist)
    args.trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
    #trainers = int(os.getenv("PADDLE_TRAINERS_NUM"))
    args.current_endpoint = os.getenv("POD_IP", "localhost") + ":" + port
    #current_endpoint = os.getenv("POD_IP") + ":" + port
    args.role = os.getenv("TRAINING_ROLE", "TRAINER")
    args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
    #trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
    #training_role = os.getenv("TRAINING_ROLE")
    # Split the single-machine program into pserver/trainer programs.
    t = fluid.DistributeTranspiler()
    t.transpile(args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
    print("=========t.transpile")
    if args.role == "PSERVER" or args.role == "pserver":
        # Parameter-server role: run its startup program, load pretrained
        # params, then block in the pserver main program.
        # NOTE(review): get_pserver_program() is deprecated in favor of
        # get_pserver_programs() per the warning at the top of this issue.
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        pserver_startup = t.get_startup_program(args.current_endpoint,
            pserver_program=pserver_prog)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(pserver_startup)
        print("pserver_startup")
        init_pretraining_params(
            exe,
            args.init_pretraining_params,
            main_program=pserver_prog,
            use_fp16=args.use_fp16)
        print("loaded pretrain model")
        exe.run(pserver_prog)
    elif args.role == "TRAINER" or args.role == "trainer":
        # Trainer role: get the trainer-side program and enter the loop.
        train_prog = t.get_trainer_program()
        train_loop(args, train_prog, train_pyreader, loss, auc_var, graph_vars)
# Script entry point. The pasted issue text read `if name == 'main':` —
# markdown stripped the double underscores; the dunder form is restored here.
if __name__ == '__main__':
    train()