Unverified commit 25fd8885 authored by Qiao Longfei and committed by GitHub

Merge pull request #1479 from ccmeteorljh/develop

add paddle_cloud_train for ctr
...@@ -11,7 +11,7 @@ import argparse
from reader import CityscapeDataset
import reader
import models
+import time

def add_argument(name, type, default, help):
    parser.add_argument('--' + name, default=default, type=type, help=help)
...@@ -141,6 +141,7 @@ if args.parallel:
batches = dataset.get_batch_generator(batch_size, total_step)

for i, imgs, labels, names in batches:
+    prev_start_time = time.time()
    if args.parallel:
        retv = exe_p.run(fetch_list=[pred.name, loss_mean.name],
                         feed={'img': imgs,
...@@ -150,10 +151,12 @@ for i, imgs, labels, names in batches:
                       feed={'img': imgs,
                             'label': labels},
                       fetch_list=[pred, loss_mean])
+    end_time = time.time()
    if i % 100 == 0:
        print("Model is saved to", args.save_weights_path)
        save_model()
-    print("step %s, loss: %s" % (i, np.mean(retv[1])))
+    print("step {:d}, loss: {:.6f}, step_time_cost: {:.3f}".format(
+        i, np.mean(retv[1]), end_time - prev_start_time))

print("Training done. Model is saved to", args.save_weights_path)
save_model()
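The timing change above follows a simple pattern: record a timestamp before the executor call, another one after it, and report the difference alongside the loss. A minimal standalone sketch of that pattern (here `run_step` is a hypothetical stand-in for the real `exe.run` / `exe_p.run` call):

```python
import time

def run_step(step):
    # hypothetical stand-in for exe.run(...) in the training loop
    return float(step) * 0.5

for i in range(3):
    prev_start_time = time.time()   # start of this step
    loss = run_step(i)              # the actual work
    end_time = time.time()          # end of this step
    print("step {:d}, loss: {:.6f}, step_time_cost: {:.3f}".format(
        i, loss, end_time - prev_start_time))
```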
...@@ -73,8 +73,8 @@ def train(train_reader,
        avg_cost = total_cost / data_count
        avg_acc = total_acc / data_count
-        print("pass_id: %d, avg_acc: %f, avg_cost: %f" %
-              (pass_id, avg_acc, avg_cost))
+        print("pass_id: %d, avg_acc: %f, avg_cost: %f, pass_time_cost: %f" %
+              (pass_id, avg_acc, avg_cost, time.time() - pass_start))
        epoch_model = save_dirname + "/" + "epoch" + str(pass_id)
        fluid.io.save_inference_model(epoch_model, ["words", "label"], acc, exe)
...
...@@ -171,6 +171,7 @@ def train_and_evaluate(train_reader,
    for epoch_id in range(global_config.epoch_num):
        data_size, data_count, total_acc, total_cost = 0, 0, 0.0, 0.0
        batch_id = 0
+        epoch_begin_time = time.time()
        for data in train_reader():
            avg_cost_np, avg_acc_np = exe.run(fluid.default_main_program(),
                                              feed=feeder.feed(data),
...@@ -192,8 +193,10 @@ def train_and_evaluate(train_reader,
        avg_acc = total_acc / data_count
        print("")
-        print("[%s] epoch_id: %d, train_avg_cost: %f, train_avg_acc: %f" % (
-            time.asctime(time.localtime(time.time())), epoch_id, avg_cost, avg_acc))
+        print("[%s] epoch_id: %d, train_avg_cost: %f, train_avg_acc: %f, epoch_time_cost: %f" % (
+            time.asctime(time.localtime(time.time())),
+            epoch_id, avg_cost, avg_acc,
+            time.time() - epoch_begin_time))
        epoch_model = global_config.save_dirname + "/" + "epoch" + str(epoch_id)
        fluid.io.save_inference_model(epoch_model, ["question1", "question2", "label"], acc, exe)
...
...@@ -3,6 +3,7 @@ from __future__ import print_function
import argparse
import logging
import os
+import time

# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""
...@@ -56,12 +57,26 @@ def parse_args():
        type=int,
        default=1000001,
        help='sparse feature hashing space for index processing')
    parser.add_argument(
        '--is_local',
        type=int,
        default=1,
        help='Local train or distributed train (default: 1)')
+    parser.add_argument(
+        '--cloud_train',
+        type=int,
+        default=0,
+        help='Local train or distributed train on paddlecloud (default: 0)')
+    parser.add_argument(
+        '--async_mode',
+        action='store_true',
+        default=False,
+        help='Whether start pserver in async mode to support ASGD')
+    parser.add_argument(
+        '--no_split_var',
+        action='store_true',
+        default=False,
+        help='Whether split variables into blocks when update_method is pserver')
    # the following arguments is used for distributed train, if is_local == false, then you should set them
    parser.add_argument(
        '--role',
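For context, the three new flags behave like any other argparse options; a minimal sketch of how they parse (a standalone parser carrying only the added flags, not the repo's full `parse_args()`):

```python
import argparse

# Hypothetical standalone parser with only the newly added flags.
parser = argparse.ArgumentParser()
parser.add_argument('--cloud_train', type=int, default=0,
                    help='Local train or distributed train on paddlecloud (default: 0)')
parser.add_argument('--async_mode', action='store_true', default=False,
                    help='Whether start pserver in async mode to support ASGD')
parser.add_argument('--no_split_var', action='store_true', default=False,
                    help='Whether split variables into blocks when update_method is pserver')

# e.g. `python train.py --cloud_train 1 --async_mode`
args = parser.parse_args(['--cloud_train', '1', '--async_mode'])
print(args.cloud_train, args.async_mode, args.no_split_var)  # 1 True False
```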
...@@ -108,6 +123,7 @@ def train_loop(args, train_program, data_list, loss, auc_var, batch_auc_var,
    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())
    for pass_id in range(args.num_passes):
+        pass_start = time.time()
        for batch_id, data in enumerate(train_reader()):
            loss_val, auc_val, batch_auc_val = exe.run(
                train_program,
...@@ -120,6 +136,7 @@ def train_loop(args, train_program, data_list, loss, auc_var, batch_auc_var,
                model_dir = args.model_output_dir + '/batch-' + str(batch_id)
                if args.trainer_id == 0:
                    fluid.io.save_inference_model(model_dir, data_name_list, [loss, auc_var], exe)
+        print("pass_id: %d, pass_time_cost: %f" % (pass_id, time.time() - pass_start))
        model_dir = args.model_output_dir + '/pass-' + str(pass_id)
        if args.trainer_id == 0:
            fluid.io.save_inference_model(model_dir, data_name_list, [loss, auc_var], exe)
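The per-batch and per-pass checkpoints written by `fluid.io.save_inference_model` can later be read back with its counterpart API. A minimal sketch, assuming the old-style `paddle.fluid` package used in this repo is installed and that `./models/pass-0` is one of the directories this script produced (both names are illustrative):

```python
import paddle.fluid as fluid

model_dir = './models/pass-0'  # hypothetical result of args.model_output_dir + '/pass-0'
exe = fluid.Executor(fluid.CPUPlace())

# Returns the inference program, the feed variable names (data_name_list at save
# time) and the fetch targets ([loss, auc_var] here).
infer_program, feed_names, fetch_targets = fluid.io.load_inference_model(model_dir, exe)
print(feed_names)
```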
...@@ -134,6 +151,21 @@ def train():
    loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
    optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
    optimizer.minimize(loss)
+    if args.cloud_train:
+        # the port of all pservers, needed by both trainer and pserver
+        port = os.getenv("PADDLE_PORT", "6174")
+        # comma separated ips of all pservers, needed by trainer and
+        pserver_ips = os.getenv("PADDLE_PSERVERS", "")
+        eplist = []
+        for ip in pserver_ips.split(","):
+            eplist.append(':'.join([ip, port]))
+        args.endpoints = ",".join(eplist)
+        args.trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
+        args.current_endpoint = os.getenv("POD_IP", "localhost") + ":" + port
+        args.role = os.getenv("TRAINING_ROLE", "TRAINER")
+        args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
+        args.is_local = bool(int(os.getenv("PADDLE_IS_LOCAL", 0)))
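The cloud branch only does string handling on a handful of PaddleCloud-style environment variables; a minimal sketch of the endpoint construction with hypothetical values filled in:

```python
import os

# Hypothetical PaddleCloud-style environment, set here only for illustration.
os.environ.setdefault("PADDLE_PORT", "6174")
os.environ.setdefault("PADDLE_PSERVERS", "192.168.0.1,192.168.0.2")

port = os.getenv("PADDLE_PORT", "6174")
pserver_ips = os.getenv("PADDLE_PSERVERS", "")
endpoints = ",".join(":".join([ip, port]) for ip in pserver_ips.split(","))
print(endpoints)  # 192.168.0.1:6174,192.168.0.2:6174
```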
    if args.is_local:
        logger.info("run local training")
...@@ -143,18 +175,22 @@ def train():
        logger.info("run dist training")
        t = fluid.DistributeTranspiler()
        t.transpile(args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
-        if args.role == "pserver":
+        if args.role == "pserver" or args.role == "PSERVER":
            logger.info("run pserver")
            prog = t.get_pserver_program(args.current_endpoint)
            startup = t.get_startup_program(args.current_endpoint, pserver_program=prog)
            exe = fluid.Executor(fluid.CPUPlace())
            exe.run(startup)
            exe.run(prog)
-        elif args.role == "trainer":
+        elif args.role == "trainer" or args.role == "TRAINER":
            logger.info("run trainer")
            train_prog = t.get_trainer_program()
            train_loop(args, train_prog, data_list, loss, auc_var, batch_auc_var,
                       args.trainers, args.trainer_id)
+        else:
+            raise ValueError(
+                'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
+            )
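Accepting both spellings covers the fact that the cloud branch reads the role from `TRAINING_ROLE`, whose default here is upper case, while the `--role` flag has used lower case. An equivalent, slightly more compact check (a sketch, not the committed code) would normalize the role once:

```python
import os

role = os.getenv("TRAINING_ROLE", "TRAINER").upper()  # normalize once
if role == "PSERVER":
    pass  # build and run the pserver program
elif role == "TRAINER":
    pass  # run the trainer loop
else:
    raise ValueError(
        "TRAINING_ROLE environment variable must be either TRAINER or PSERVER")
```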
if __name__ == '__main__':
...