diff --git a/python/paddle/fluid/tests/CMakeLists.txt b/python/paddle/fluid/tests/CMakeLists.txt index 5ff7b1b027e0e17d233f2a8a1c9775ccfbe1797e..d24417bbacb503d9ea70e68e7e0edb59e7dddbde 100644 --- a/python/paddle/fluid/tests/CMakeLists.txt +++ b/python/paddle/fluid/tests/CMakeLists.txt @@ -7,5 +7,4 @@ endforeach() add_subdirectory(unittests) add_subdirectory(book) -add_subdirectory(book_distribute) add_subdirectory(book_memory_optimization) diff --git a/python/paddle/fluid/tests/book/test_fit_a_line.py b/python/paddle/fluid/tests/book/test_fit_a_line.py index 8a45533e3bfbacffbef3fc226892062d8cc8e6c7..93ef66851b0efd65361122853dadeefe11992ed5 100644 --- a/python/paddle/fluid/tests/book/test_fit_a_line.py +++ b/python/paddle/fluid/tests/book/test_fit_a_line.py @@ -19,9 +19,10 @@ import numpy import unittest import math import sys +import os -def train(use_cuda, save_dirname): +def train(use_cuda, save_dirname, is_local): x = fluid.layers.data(name='x', shape=[13], dtype='float32') y_predict = fluid.layers.fc(input=x, size=1, act=None) @@ -32,7 +33,7 @@ def train(use_cuda, save_dirname): avg_cost = fluid.layers.mean(cost) sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) - sgd_optimizer.minimize(avg_cost) + optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) BATCH_SIZE = 20 @@ -42,27 +43,57 @@ def train(use_cuda, save_dirname): batch_size=BATCH_SIZE) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - - PASS_NUM = 100 - for pass_id in range(PASS_NUM): - for data in train_reader(): - avg_loss_value, = exe.run(fluid.default_main_program(), - feed=feeder.feed(data), - fetch_list=[avg_cost]) - print(avg_loss_value) - if avg_loss_value[0] < 10.0: - if save_dirname is not None: - fluid.io.save_inference_model(save_dirname, ['x'], - [y_predict], exe) - return - if math.isnan(float(avg_loss_value)): - sys.exit("got NaN loss, training failed.") - raise AssertionError("Fit a line cost is too large, {0:2.2}".format( - avg_loss_value[0])) + def train_loop(main_program): + feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) + exe.run(fluid.default_startup_program()) + + PASS_NUM = 100 + for pass_id in range(PASS_NUM): + for data in train_reader(): + avg_loss_value, = exe.run(main_program, + feed=feeder.feed(data), + fetch_list=[avg_cost]) + print(avg_loss_value) + if avg_loss_value[0] < 10.0: + if save_dirname is not None: + fluid.io.save_inference_model(save_dirname, ['x'], + [y_predict], exe) + return + if math.isnan(float(avg_loss_value)): + sys.exit("got NaN loss, training failed.") + raise AssertionError("Fit a line cost is too large, {0:2.2}".format( + avg_loss_value[0])) + + if is_local: + train_loop(fluid.default_main_program()) + else: + port = os.getenv("PADDLE_INIT_PORT", "6174") + pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) # ip:port,ip:port... 
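Note: the endpoint-list construction that this patch repeats in each converted test simply joins every parameter-server IP from `PADDLE_INIT_PSERVERS` with the shared `PADDLE_INIT_PORT`. A minimal sketch with illustrative values (the IPs below are assumptions, not from the patch):

```python
# Illustrative stand-ins for the launcher-provided environment variables.
pserver_ips = "192.168.0.2,192.168.0.3"  # PADDLE_INIT_PSERVERS, "ip,ip..."
port = "6174"                            # PADDLE_INIT_PORT (default)

eplist = [":".join([ip, port]) for ip in pserver_ips.split(",")]
pserver_endpoints = ",".join(eplist)     # "ip:port,ip:port..."
print(pserver_endpoints)                 # 192.168.0.2:6174,192.168.0.3:6174
```

Every parameter server is assumed to listen on the same port, which is why a single `port` value is joined to each IP.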
+ trainers = int(os.getenv("TRAINERS")) + current_endpoint = os.getenv("POD_IP") + ":" + port + trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t = fluid.DistributeTranspiler() + t.transpile( + optimize_ops, + params_grads, + trainer_id, + pservers=pserver_endpoints, + trainers=trainers) + if training_role == "PSERVER": + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, + pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + train_loop(t.get_trainer_program()) def infer(use_cuda, save_dirname=None): @@ -94,14 +125,14 @@ def infer(use_cuda, save_dirname=None): print("infer results: ", results[0]) -def main(use_cuda): +def main(use_cuda, is_local=True): if use_cuda and not fluid.core.is_compiled_with_cuda(): return # Directory for saving the trained model save_dirname = "fit_a_line.inference.model" - train(use_cuda, save_dirname) + train(use_cuda, save_dirname, is_local) infer(use_cuda, save_dirname) diff --git a/python/paddle/fluid/tests/book/test_image_classification.py b/python/paddle/fluid/tests/book/test_image_classification.py index 60c66bc22c69ec836949d40ce2e18f8ecf0e07b8..613f4a7bf1c41f9f320ba8d310545a182f95e316 100644 --- a/python/paddle/fluid/tests/book/test_image_classification.py +++ b/python/paddle/fluid/tests/book/test_image_classification.py @@ -21,6 +21,7 @@ import math import sys import numpy import unittest +import os def resnet_cifar10(input, depth=32): @@ -92,7 +93,7 @@ def vgg16_bn_drop(input): return fc2 -def train(net_type, use_cuda, save_dirname): +def train(net_type, use_cuda, save_dirname, is_local): classdim = 10 data_shape = [3, 32, 32] @@ -117,7 +118,7 @@ def train(net_type, use_cuda, save_dirname): test_program = fluid.default_main_program().clone() optimizer = fluid.optimizer.Adam(learning_rate=0.001) - optimizer.minimize(avg_cost) + optimize_ops, params_grads = optimizer.minimize(avg_cost) BATCH_SIZE = 128 PASS_NUM = 1 @@ -133,38 +134,68 @@ def train(net_type, use_cuda, save_dirname): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) feeder = fluid.DataFeeder(place=place, feed_list=[images, label]) - exe.run(fluid.default_startup_program()) - - loss = 0.0 - for pass_id in range(PASS_NUM): - for batch_id, data in enumerate(train_reader()): - exe.run(feed=feeder.feed(data)) - - if (batch_id % 10) == 0: - acc_list = [] - avg_loss_list = [] - for tid, test_data in enumerate(test_reader()): - loss_t, acc_t = exe.run(program=test_program, - feed=feeder.feed(test_data), - fetch_list=[avg_cost, acc]) - if math.isnan(float(loss_t)): - sys.exit("got NaN loss, training failed.") - acc_list.append(float(acc_t)) - avg_loss_list.append(float(loss_t)) - break # Use 1 segment for speeding up CI - - acc_value = numpy.array(acc_list).mean() - avg_loss_value = numpy.array(avg_loss_list).mean() - - print( - 'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'. 
- format(pass_id, batch_id + 1, - float(avg_loss_value), float(acc_value))) - - if acc_value > 0.01: # Low threshold for speeding up CI - fluid.io.save_inference_model(save_dirname, ["pixel"], - [predict], exe) - return + + def train_loop(main_program): + exe.run(fluid.default_startup_program()) + loss = 0.0 + for pass_id in range(PASS_NUM): + for batch_id, data in enumerate(train_reader()): + exe.run(main_program, feed=feeder.feed(data)) + + if (batch_id % 10) == 0: + acc_list = [] + avg_loss_list = [] + for tid, test_data in enumerate(test_reader()): + loss_t, acc_t = exe.run(program=test_program, + feed=feeder.feed(test_data), + fetch_list=[avg_cost, acc]) + if math.isnan(float(loss_t)): + sys.exit("got NaN loss, training failed.") + acc_list.append(float(acc_t)) + avg_loss_list.append(float(loss_t)) + break # Use 1 segment for speeding up CI + + acc_value = numpy.array(acc_list).mean() + avg_loss_value = numpy.array(avg_loss_list).mean() + + print( + 'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'. + format(pass_id, batch_id + 1, + float(avg_loss_value), float(acc_value))) + + if acc_value > 0.01: # Low threshold for speeding up CI + fluid.io.save_inference_model(save_dirname, ["pixel"], + [predict], exe) + return + + if is_local: + train_loop(fluid.default_main_program()) + else: + port = os.getenv("PADDLE_INIT_PORT", "6174") + pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) # ip:port,ip:port... + trainers = int(os.getenv("TRAINERS")) + current_endpoint = os.getenv("POD_IP") + ":" + port + trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t = fluid.DistributeTranspiler() + t.transpile( + optimize_ops, + params_grads, + trainer_id, + pservers=pserver_endpoints, + trainers=trainers) + if training_role == "PSERVER": + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, + pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + train_loop(t.get_trainer_program()) def infer(use_cuda, save_dirname=None): @@ -196,14 +227,14 @@ def infer(use_cuda, save_dirname=None): print("infer results: ", results[0]) -def main(net_type, use_cuda): +def main(net_type, use_cuda, is_local=True): if use_cuda and not fluid.core.is_compiled_with_cuda(): return # Directory for saving the trained model save_dirname = "image_classification_" + net_type + ".inference.model" - train(net_type, use_cuda, save_dirname) + train(net_type, use_cuda, save_dirname, is_local) infer(use_cuda, save_dirname) diff --git a/python/paddle/fluid/tests/book/test_label_semantic_roles.py b/python/paddle/fluid/tests/book/test_label_semantic_roles.py index cbb4d4b0401d160db7b97ad3d5e6489e2766d19c..13efe4efb1f2ba4ebd25868c7f3d94ca3c5fb1a1 100644 --- a/python/paddle/fluid/tests/book/test_label_semantic_roles.py +++ b/python/paddle/fluid/tests/book/test_label_semantic_roles.py @@ -22,6 +22,7 @@ from paddle.fluid.initializer import init_on_cpu import contextlib import time import unittest +import os word_dict, verb_dict, label_dict = conll05.get_dict() word_dict_len = len(word_dict) @@ -138,7 +139,7 @@ def create_random_lodtensor(lod, place, low, high): return res -def train(use_cuda, save_dirname=None): +def train(use_cuda, save_dirname=None, is_local=True): # define network topology word = fluid.layers.data( 
name='word_data', shape=[1], dtype='int64', lod_level=1) @@ -178,7 +179,7 @@ def train(use_cuda, save_dirname=None): decay_rate=0.5, staircase=True), global_step=global_step) - sgd_optimizer.minimize(avg_cost) + optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) # TODO(qiao) # add dependency track and move this config before optimizer @@ -204,45 +205,78 @@ def train(use_cuda, save_dirname=None): place=place) exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - - embedding_param = fluid.global_scope().find_var(embedding_name).get_tensor() - embedding_param.set( - load_parameter(conll05.get_embedding(), word_dict_len, word_dim), place) - - start_time = time.time() - batch_id = 0 - for pass_id in xrange(PASS_NUM): - chunk_evaluator.reset(exe) - for data in train_data(): - cost, precision, recall, f1_score = exe.run( - fluid.default_main_program(), - feed=feeder.feed(data), - fetch_list=[avg_cost] + chunk_evaluator.metrics) - pass_precision, pass_recall, pass_f1_score = chunk_evaluator.eval( - exe) - - if batch_id % 10 == 0: - print("avg_cost:" + str(cost) + " precision:" + str( - precision) + " recall:" + str(recall) + " f1_score:" + str( - f1_score) + " pass_precision:" + str( - pass_precision) + " pass_recall:" + str(pass_recall) - + " pass_f1_score:" + str(pass_f1_score)) - if batch_id != 0: - print("second per batch: " + str((time.time() - start_time) - / batch_id)) - # Set the threshold low to speed up the CI test - if float(pass_precision) > 0.05: - if save_dirname is not None: - # TODO(liuyiqun): Change the target to crf_decode - fluid.io.save_inference_model(save_dirname, [ - 'word_data', 'verb_data', 'ctx_n2_data', - 'ctx_n1_data', 'ctx_0_data', 'ctx_p1_data', - 'ctx_p2_data', 'mark_data' - ], [feature_out], exe) - return - - batch_id = batch_id + 1 + def train_loop(main_program): + exe.run(fluid.default_startup_program()) + + embedding_param = fluid.global_scope().find_var( + embedding_name).get_tensor() + embedding_param.set( + load_parameter(conll05.get_embedding(), word_dict_len, word_dim), + place) + + start_time = time.time() + batch_id = 0 + for pass_id in xrange(PASS_NUM): + chunk_evaluator.reset(exe) + for data in train_data(): + cost, precision, recall, f1_score = exe.run( + main_program, + feed=feeder.feed(data), + fetch_list=[avg_cost] + chunk_evaluator.metrics) + pass_precision, pass_recall, pass_f1_score = chunk_evaluator.eval( + exe) + + if batch_id % 10 == 0: + print("avg_cost:" + str(cost) + " precision:" + str( + precision) + " recall:" + str(recall) + " f1_score:" + + str(f1_score) + " pass_precision:" + str( + pass_precision) + " pass_recall:" + str( + pass_recall) + " pass_f1_score:" + str( + pass_f1_score)) + if batch_id != 0: + print("second per batch: " + str((time.time( + ) - start_time) / batch_id)) + # Set the threshold low to speed up the CI test + if float(pass_precision) > 0.05: + if save_dirname is not None: + # TODO(liuyiqun): Change the target to crf_decode + fluid.io.save_inference_model(save_dirname, [ + 'word_data', 'verb_data', 'ctx_n2_data', + 'ctx_n1_data', 'ctx_0_data', 'ctx_p1_data', + 'ctx_p2_data', 'mark_data' + ], [feature_out], exe) + return + + batch_id = batch_id + 1 + + if is_local: + train_loop(fluid.default_main_program()) + else: + port = os.getenv("PADDLE_INIT_PORT", "6174") + pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) # ip:port,ip:port... 
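Since the same environment parsing and role dispatch now appears verbatim in every converted test, it could in principle be factored into a shared helper. A sketch under that assumption (the helper and its name are hypothetical, not part of this patch; the calls mirror the transpiler API used above):

```python
import os

import paddle.fluid as fluid


def run_distributed(exe, optimize_ops, params_grads, train_loop):
    # Hypothetical helper wrapping the boilerplate repeated in each test:
    # read the launcher-provided environment, transpile the program, then
    # run as parameter server or trainer depending on TRAINING_ROLE.
    port = os.getenv("PADDLE_INIT_PORT", "6174")
    pserver_ips = os.getenv("PADDLE_INIT_PSERVERS")  # ip,ip...
    pserver_endpoints = ",".join(
        [":".join([ip, port]) for ip in pserver_ips.split(",")])
    trainers = int(os.getenv("TRAINERS"))
    current_endpoint = os.getenv("POD_IP") + ":" + port
    trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
    training_role = os.getenv("TRAINING_ROLE", "TRAINER")

    t = fluid.DistributeTranspiler()
    t.transpile(
        optimize_ops,
        params_grads,
        trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers)
    if training_role == "PSERVER":
        # A parameter server blocks inside exe.run(pserver_prog), serving
        # parameter updates until the job is torn down.
        pserver_prog = t.get_pserver_program(current_endpoint)
        pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
        exe.run(pserver_startup)
        exe.run(pserver_prog)
    elif training_role == "TRAINER":
        train_loop(t.get_trainer_program())
```

Each test's `else` branch would then reduce to a single `run_distributed(exe, optimize_ops, params_grads, train_loop)` call.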
+ trainers = int(os.getenv("TRAINERS")) + current_endpoint = os.getenv("POD_IP") + ":" + port + trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t = fluid.DistributeTranspiler() + t.transpile( + optimize_ops, + params_grads, + trainer_id, + pservers=pserver_endpoints, + trainers=trainers) + if training_role == "PSERVER": + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, + pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + train_loop(t.get_trainer_program()) def infer(use_cuda, save_dirname=None): @@ -308,14 +342,14 @@ def infer(use_cuda, save_dirname=None): print("Inference Shape: ", np_data.shape) -def main(use_cuda): +def main(use_cuda, is_local=True): if use_cuda and not fluid.core.is_compiled_with_cuda(): return # Directory for saving the trained model save_dirname = "label_semantic_roles.inference.model" - train(use_cuda, save_dirname) + train(use_cuda, save_dirname, is_local) infer(use_cuda, save_dirname) diff --git a/python/paddle/fluid/tests/book/test_machine_translation.py b/python/paddle/fluid/tests/book/test_machine_translation.py index bd768d5f08de57005f76ea3ea25b318a930a58c7..caa9596a100de4f9364467690db1e80ee227c3c1 100644 --- a/python/paddle/fluid/tests/book/test_machine_translation.py +++ b/python/paddle/fluid/tests/book/test_machine_translation.py @@ -20,6 +20,7 @@ import paddle.fluid.framework as framework import paddle.fluid.layers as pd from paddle.fluid.executor import Executor import unittest +import os dict_size = 30000 source_dict_dim = target_dict_dim = dict_size @@ -168,7 +169,7 @@ def to_lodtensor(data, place): return res -def train_main(use_cuda, is_sparse): +def train_main(use_cuda, is_sparse, is_local=True): if use_cuda and not fluid.core.is_compiled_with_cuda(): return place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() @@ -181,7 +182,7 @@ def train_main(use_cuda, is_sparse): avg_cost = pd.mean(cost) optimizer = fluid.optimizer.Adagrad(learning_rate=1e-4) - optimizer.minimize(avg_cost) + optimize_ops, params_grads = optimizer.minimize(avg_cost) train_data = paddle.batch( paddle.reader.shuffle( @@ -190,27 +191,57 @@ def train_main(use_cuda, is_sparse): exe = Executor(place) - exe.run(framework.default_startup_program()) - - batch_id = 0 - for pass_id in xrange(1): - for data in train_data(): - word_data = to_lodtensor(map(lambda x: x[0], data), place) - trg_word = to_lodtensor(map(lambda x: x[1], data), place) - trg_word_next = to_lodtensor(map(lambda x: x[2], data), place) - outs = exe.run(framework.default_main_program(), - feed={ - 'src_word_id': word_data, - 'target_language_word': trg_word, - 'target_language_next_word': trg_word_next - }, - fetch_list=[avg_cost]) - avg_cost_val = np.array(outs[0]) - print('pass_id=' + str(pass_id) + ' batch=' + str(batch_id) + - " avg_cost=" + str(avg_cost_val)) - if batch_id > 3: - break - batch_id += 1 + def train_loop(main_program): + exe.run(framework.default_startup_program()) + + batch_id = 0 + for pass_id in xrange(1): + for data in train_data(): + word_data = to_lodtensor(map(lambda x: x[0], data), place) + trg_word = to_lodtensor(map(lambda x: x[1], data), place) + trg_word_next = to_lodtensor(map(lambda x: x[2], data), place) + outs = exe.run(main_program, + feed={ + 'src_word_id': word_data, + 'target_language_word': trg_word, + 'target_language_next_word': trg_word_next + }, + fetch_list=[avg_cost]) + avg_cost_val = 
np.array(outs[0]) + print('pass_id=' + str(pass_id) + ' batch=' + str(batch_id) + + " avg_cost=" + str(avg_cost_val)) + if batch_id > 3: + break + batch_id += 1 + + if is_local: + train_loop(framework.default_main_program()) + else: + port = os.getenv("PADDLE_INIT_PORT", "6174") + pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) # ip:port,ip:port... + trainers = int(os.getenv("TRAINERS")) + current_endpoint = os.getenv("POD_IP") + ":" + port + trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t = fluid.DistributeTranspiler() + t.transpile( + optimize_ops, + params_grads, + trainer_id, + pservers=pserver_endpoints, + trainers=trainers) + if training_role == "PSERVER": + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, + pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + train_loop(t.get_trainer_program()) def decode_main(use_cuda, is_sparse): diff --git a/python/paddle/fluid/tests/book/test_recognize_digits.py b/python/paddle/fluid/tests/book/test_recognize_digits.py index 285e91420375f63d8b37138f1565e7b77defb0c7..b57fe08e1a367c33db31c89127b6c2bc08253655 100644 --- a/python/paddle/fluid/tests/book/test_recognize_digits.py +++ b/python/paddle/fluid/tests/book/test_recognize_digits.py @@ -20,27 +20,7 @@ import numpy import unittest import math import sys - - -def parse_arg(): - parser = argparse.ArgumentParser() - parser.add_argument( - "nn_type", - help="The neural network type, in ['mlp', 'conv']", - type=str, - choices=['mlp', 'conv']) - parser.add_argument( - "--parallel", - help='Run in parallel or not', - default=False, - action="store_true") - parser.add_argument( - "--use_cuda", - help="Run the program by using CUDA", - default=False, - action="store_true") - return parser.parse_args() - +import os BATCH_SIZE = 64 @@ -83,7 +63,8 @@ def train(nn_type, parallel, save_dirname=None, model_filename=None, - params_filename=None): + params_filename=None, + is_local=True): if use_cuda and not fluid.core.is_compiled_with_cuda(): return img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32') @@ -114,12 +95,11 @@ def train(nn_type, test_program = fluid.default_main_program().clone() optimizer = fluid.optimizer.Adam(learning_rate=0.001) - optimizer.minimize(avg_loss) + optimize_ops, params_grads = optimizer.minimize(avg_loss) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) train_reader = paddle.batch( paddle.reader.shuffle( @@ -129,39 +109,74 @@ def train(nn_type, paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) feeder = fluid.DataFeeder(feed_list=[img, label], place=place) - PASS_NUM = 100 - for pass_id in range(PASS_NUM): - for batch_id, data in enumerate(train_reader()): - # train a mini-batch, fetch nothing - exe.run(feed=feeder.feed(data)) - if (batch_id + 1) % 10 == 0: - acc_set = [] - avg_loss_set = [] - for test_data in test_reader(): - acc_np, avg_loss_np = exe.run(program=test_program, - feed=feeder.feed(test_data), - fetch_list=[acc, avg_loss]) - acc_set.append(float(acc_np)) - avg_loss_set.append(float(avg_loss_np)) - # get test acc and loss - acc_val = numpy.array(acc_set).mean() - avg_loss_val = numpy.array(avg_loss_set).mean() - if float(acc_val) > 0.2: # Smaller value 
to increase CI speed - if save_dirname is not None: - fluid.io.save_inference_model( - save_dirname, ["img"], [prediction], - exe, - model_filename=model_filename, - params_filename=params_filename) - return - else: - print( - 'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'. - format(pass_id, batch_id + 1, - float(avg_loss_val), float(acc_val))) - if math.isnan(float(avg_loss_val)): - sys.exit("got NaN loss, training failed.") - raise AssertionError("Loss of recognize digits is too large") + def train_loop(main_program): + exe.run(fluid.default_startup_program()) + + PASS_NUM = 100 + for pass_id in range(PASS_NUM): + for batch_id, data in enumerate(train_reader()): + # train a mini-batch, fetch nothing + exe.run(main_program, feed=feeder.feed(data)) + if (batch_id + 1) % 10 == 0: + acc_set = [] + avg_loss_set = [] + for test_data in test_reader(): + acc_np, avg_loss_np = exe.run( + program=test_program, + feed=feeder.feed(test_data), + fetch_list=[acc, avg_loss]) + acc_set.append(float(acc_np)) + avg_loss_set.append(float(avg_loss_np)) + # get test acc and loss + acc_val = numpy.array(acc_set).mean() + avg_loss_val = numpy.array(avg_loss_set).mean() + if float(acc_val + ) > 0.2: # Smaller value to increase CI speed + if save_dirname is not None: + fluid.io.save_inference_model( + save_dirname, ["img"], [prediction], + exe, + model_filename=model_filename, + params_filename=params_filename) + return + else: + print( + 'PassID {0:1}, BatchID {1:04}, Test Loss {2:2.2}, Acc {3:2.2}'. + format(pass_id, batch_id + 1, + float(avg_loss_val), float(acc_val))) + if math.isnan(float(avg_loss_val)): + sys.exit("got NaN loss, training failed.") + raise AssertionError("Loss of recognize digits is too large") + + if is_local: + train_loop(fluid.default_main_program()) + else: + port = os.getenv("PADDLE_INIT_PORT", "6174") + pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) # ip:port,ip:port... 
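One caveat with this env-driven pattern: `os.getenv` returns `None` for unset variables, so a missing `PADDLE_INIT_PSERVERS`, `TRAINERS`, `POD_IP`, or `PADDLE_INIT_TRAINER_ID` only fails later, as a `TypeError` from `int(None)` or an `AttributeError` from `None.split(",")`. A minimal guard, shown as an illustrative sketch rather than part of the patch:

```python
import os
import sys

REQUIRED_ENV = ("PADDLE_INIT_PSERVERS", "TRAINERS", "POD_IP",
                "PADDLE_INIT_TRAINER_ID")

# Fail fast with an actionable message before any of the values are parsed.
missing = [name for name in REQUIRED_ENV if os.getenv(name) is None]
if missing:
    sys.exit("distributed run requires env vars: " + ", ".join(missing))
```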
+ pserver_endpoints = os.getenv("PSERVERS") + trainers = int(os.getenv("TRAINERS")) + current_endpoint = os.getenv("POD_IP") + ":" + port + trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t = fluid.DistributeTranspiler() + t.transpile( + optimize_ops, + params_grads, + trainer_id, + pservers=pserver_endpoints, + trainers=trainers) + if training_role == "PSERVER": + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, + pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + train_loop(t.get_trainer_program()) def infer(use_cuda, @@ -208,6 +223,7 @@ def main(use_cuda, parallel, nn_type, combine): model_filename = "__model_combined__" params_filename = "__params_combined__" + # call train() with is_local argument to run distributed train train( nn_type=nn_type, use_cuda=use_cuda, diff --git a/python/paddle/fluid/tests/book/test_recommender_system.py b/python/paddle/fluid/tests/book/test_recommender_system.py index 7c58c3e7823a82b5ccc7bb55a5e833969242ad96..5e258a2c5170f63aa1fbaab5f38efdba04c8d391 100644 --- a/python/paddle/fluid/tests/book/test_recommender_system.py +++ b/python/paddle/fluid/tests/book/test_recommender_system.py @@ -14,6 +14,7 @@ import math import sys +import os import numpy as np import paddle.v2 as paddle import paddle.fluid as fluid @@ -152,19 +153,18 @@ def model(): return scale_infer, avg_cost -def train(use_cuda, save_dirname): +def train(use_cuda, save_dirname, is_local=True): scale_infer, avg_cost = model() # test program test_program = fluid.default_main_program().clone() sgd_optimizer = SGDOptimizer(learning_rate=0.2) - opts = sgd_optimizer.minimize(avg_cost) + optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = Executor(place) - exe.run(framework.default_startup_program()) train_reader = paddle.batch( paddle.reader.shuffle( @@ -212,36 +212,69 @@ def train(use_cuda, save_dirname): feed_tensors[key] = tensor return feed_tensors - PASS_NUM = 100 - for pass_id in range(PASS_NUM): - for batch_id, data in enumerate(train_reader()): - # train a mini-batch - outs = exe.run(program=fluid.default_main_program(), - feed=func_feed(feeding, data), - fetch_list=[avg_cost]) - out = np.array(outs[0]) - if (batch_id + 1) % 10 == 0: - avg_cost_set = [] - for test_data in test_reader(): - avg_cost_np = exe.run(program=test_program, - feed=func_feed(feeding, test_data), - fetch_list=[avg_cost]) - avg_cost_set.append(avg_cost_np[0]) - break # test only 1 segment for speeding up CI - - # get test avg_cost - test_avg_cost = np.array(avg_cost_set).mean() - if test_avg_cost < 6.0: - # if avg_cost less than 6.0, we think our code is good. 
- if save_dirname is not None: - fluid.io.save_inference_model(save_dirname, [ - "user_id", "gender_id", "age_id", "job_id", - "movie_id", "category_id", "movie_title" - ], [scale_infer], exe) - return - - if math.isnan(float(out[0])): - sys.exit("got NaN loss, training failed.") + def train_loop(main_program): + exe.run(framework.default_startup_program()) + + PASS_NUM = 100 + for pass_id in range(PASS_NUM): + for batch_id, data in enumerate(train_reader()): + # train a mini-batch + outs = exe.run(program=main_program, + feed=func_feed(feeding, data), + fetch_list=[avg_cost]) + out = np.array(outs[0]) + if (batch_id + 1) % 10 == 0: + avg_cost_set = [] + for test_data in test_reader(): + avg_cost_np = exe.run( + program=test_program, + feed=func_feed(feeding, test_data), + fetch_list=[avg_cost]) + avg_cost_set.append(avg_cost_np[0]) + break # test only 1 segment for speeding up CI + + # get test avg_cost + test_avg_cost = np.array(avg_cost_set).mean() + if test_avg_cost < 6.0: + # if avg_cost less than 6.0, we think our code is good. + if save_dirname is not None: + fluid.io.save_inference_model(save_dirname, [ + "user_id", "gender_id", "age_id", "job_id", + "movie_id", "category_id", "movie_title" + ], [scale_infer], exe) + return + + if math.isnan(float(out[0])): + sys.exit("got NaN loss, training failed.") + + if is_local: + train_loop(fluid.default_main_program()) + else: + port = os.getenv("PADDLE_INIT_PORT", "6174") + pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) # ip:port,ip:port... + trainers = int(os.getenv("TRAINERS")) + current_endpoint = os.getenv("POD_IP") + ":" + port + trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t = fluid.DistributeTranspiler() + t.transpile( + optimize_ops, + params_grads, + trainer_id, + pservers=pserver_endpoints, + trainers=trainers) + if training_role == "PSERVER": + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, + pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + train_loop(t.get_trainer_program()) def infer(use_cuda, save_dirname=None): diff --git a/python/paddle/fluid/tests/book/test_understand_sentiment.py b/python/paddle/fluid/tests/book/test_understand_sentiment.py index fae74c355710e472734b8b15176baf2cfdc5acc4..1b7e84ea05cab5750865032ee7440cd5f5aa519b 100644 --- a/python/paddle/fluid/tests/book/test_understand_sentiment.py +++ b/python/paddle/fluid/tests/book/test_understand_sentiment.py @@ -20,6 +20,7 @@ import contextlib import math import numpy as np import sys +import os def convolution_net(data, label, input_dim, class_dim=2, emb_dim=32, @@ -132,7 +133,12 @@ def create_random_lodtensor(lod, place, low, high): return res -def train(word_dict, net_method, use_cuda, parallel=False, save_dirname=None): +def train(word_dict, + net_method, + use_cuda, + parallel=False, + save_dirname=None, + is_local=True): BATCH_SIZE = 128 PASS_NUM = 5 dict_dim = len(word_dict) @@ -164,7 +170,7 @@ def train(word_dict, net_method, use_cuda, parallel=False, save_dirname=None): assert save_dirname is None adagrad = fluid.optimizer.Adagrad(learning_rate=0.002) - adagrad.minimize(cost) + optimize_ops, params_grads = adagrad.minimize(cost) train_data = paddle.batch( paddle.reader.shuffle( @@ -174,23 +180,53 @@ def train(word_dict, net_method, 
use_cuda, parallel=False, save_dirname=None): exe = fluid.Executor(place) feeder = fluid.DataFeeder(feed_list=[data, label], place=place) - exe.run(fluid.default_startup_program()) - - for pass_id in xrange(PASS_NUM): - for data in train_data(): - cost_val, acc_val = exe.run(fluid.default_main_program(), - feed=feeder.feed(data), - fetch_list=[cost, acc_out]) - print("cost=" + str(cost_val) + " acc=" + str(acc_val)) - if cost_val < 0.4 and acc_val > 0.8: - if save_dirname is not None: - fluid.io.save_inference_model(save_dirname, ["words"], - prediction, exe) - return - if math.isnan(float(cost_val)): - sys.exit("got NaN loss, training failed.") - raise AssertionError("Cost is too large for {0}".format( - net_method.__name__)) + def train_loop(main_program): + exe.run(fluid.default_startup_program()) + + for pass_id in xrange(PASS_NUM): + for data in train_data(): + cost_val, acc_val = exe.run(main_program, + feed=feeder.feed(data), + fetch_list=[cost, acc_out]) + print("cost=" + str(cost_val) + " acc=" + str(acc_val)) + if cost_val < 0.4 and acc_val > 0.8: + if save_dirname is not None: + fluid.io.save_inference_model(save_dirname, ["words"], + prediction, exe) + return + if math.isnan(float(cost_val)): + sys.exit("got NaN loss, training failed.") + raise AssertionError("Cost is too large for {0}".format( + net_method.__name__)) + + if is_local: + train_loop(fluid.default_main_program()) + else: + port = os.getenv("PADDLE_INIT_PORT", "6174") + pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) # ip:port,ip:port... + trainers = int(os.getenv("TRAINERS")) + current_endpoint = os.getenv("POD_IP") + ":" + port + trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t = fluid.DistributeTranspiler() + t.transpile( + optimize_ops, + params_grads, + trainer_id, + pservers=pserver_endpoints, + trainers=trainers) + if training_role == "PSERVER": + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, + pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + train_loop(t.get_trainer_program()) def infer(word_dict, use_cuda, save_dirname=None): diff --git a/python/paddle/fluid/tests/book/test_word2vec.py b/python/paddle/fluid/tests/book/test_word2vec.py index 696abd5499c826eda5c868ab1e7c9f4f839cdce3..26b97c3e254f54b83515436660e44d4908c98fbe 100644 --- a/python/paddle/fluid/tests/book/test_word2vec.py +++ b/python/paddle/fluid/tests/book/test_word2vec.py @@ -30,7 +30,7 @@ def create_random_lodtensor(lod, place, low, high): return res -def train(use_cuda, is_sparse, is_parallel, save_dirname): +def train(use_cuda, is_sparse, is_parallel, save_dirname, is_local=True): PASS_NUM = 100 EMBED_SIZE = 32 HIDDEN_SIZE = 256 @@ -101,7 +101,7 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname): avg_cost = fluid.layers.mean(pd()) sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) - sgd_optimizer.minimize(avg_cost) + optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) train_reader = paddle.batch( paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE) @@ -112,23 +112,53 @@ def train(use_cuda, is_sparse, is_parallel, save_dirname): feed_list=[first_word, second_word, third_word, forth_word, next_word], place=place) - exe.run(fluid.default_startup_program()) - - for pass_id in range(PASS_NUM): - 
for data in train_reader(): - avg_cost_np = exe.run(fluid.default_main_program(), - feed=feeder.feed(data), - fetch_list=[avg_cost]) - if avg_cost_np[0] < 5.0: - if save_dirname is not None: - fluid.io.save_inference_model(save_dirname, [ - 'firstw', 'secondw', 'thirdw', 'forthw' - ], [predict_word], exe) - return - if math.isnan(float(avg_cost_np[0])): - sys.exit("got NaN loss, training failed.") - - raise AssertionError("Cost is too large {0:2.2}".format(avg_cost_np[0])) + def train_loop(main_program): + exe.run(fluid.default_startup_program()) + + for pass_id in range(PASS_NUM): + for data in train_reader(): + avg_cost_np = exe.run(main_program, + feed=feeder.feed(data), + fetch_list=[avg_cost]) + if avg_cost_np[0] < 5.0: + if save_dirname is not None: + fluid.io.save_inference_model(save_dirname, [ + 'firstw', 'secondw', 'thirdw', 'forthw' + ], [predict_word], exe) + return + if math.isnan(float(avg_cost_np[0])): + sys.exit("got NaN loss, training failed.") + + raise AssertionError("Cost is too large {0:2.2}".format(avg_cost_np[0])) + + if is_local: + train_loop(fluid.default_main_program()) + else: + port = os.getenv("PADDLE_INIT_PORT", "6174") + pserver_ips = os.getenv("PADDLE_INIT_PSERVERS") # ip,ip... + eplist = [] + for ip in pserver_ips.split(","): + eplist.append(':'.join([ip, port])) + pserver_endpoints = ",".join(eplist) # ip:port,ip:port... + trainers = int(os.getenv("TRAINERS")) + current_endpoint = os.getenv("POD_IP") + ":" + port + trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID")) + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t = fluid.DistributeTranspiler() + t.transpile( + optimize_ops, + params_grads, + trainer_id, + pservers=pserver_endpoints, + trainers=trainers) + if training_role == "PSERVER": + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, + pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + train_loop(t.get_trainer_program()) def infer(use_cuda, save_dirname=None): diff --git a/python/paddle/fluid/tests/book_distribute/CMakeLists.txt b/python/paddle/fluid/tests/book_distribute/CMakeLists.txt deleted file mode 100644 index 4d7664469e481344cf9eea84688f068b4fb99dee..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/book_distribute/CMakeLists.txt +++ /dev/null @@ -1,5 +0,0 @@ -file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py") -string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}") -foreach(src ${TEST_OPS}) - py_test(${src} SRCS ${src}.py) -endforeach() diff --git a/python/paddle/fluid/tests/book_distribute/notest_dist_fit_a_line.py b/python/paddle/fluid/tests/book_distribute/notest_dist_fit_a_line.py deleted file mode 100644 index cff82a89482a290c67c5365401732d309e2441a4..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/book_distribute/notest_dist_fit_a_line.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -import numpy as np -import paddle.v2 as paddle -import paddle.fluid as fluid -import os - -x = fluid.layers.data(name='x', shape=[13], dtype='float32') - -y_predict = fluid.layers.fc(input=x, size=1, act=None) - -y = fluid.layers.data(name='y', shape=[1], dtype='float32') - -cost = fluid.layers.square_error_cost(input=y_predict, label=y) -avg_cost = fluid.layers.mean(cost) - -sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) -optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) - -BATCH_SIZE = 20 - -train_reader = paddle.batch( - paddle.reader.shuffle( - paddle.dataset.uci_housing.train(), buf_size=500), - batch_size=BATCH_SIZE) - -place = fluid.CPUPlace() -feeder = fluid.DataFeeder(place=place, feed_list=[x, y]) -exe = fluid.Executor(place) - -t = fluid.DistributeTranspiler() -# all parameter server endpoints list for spliting parameters -pserver_endpoints = os.getenv("PSERVERS") -# server endpoint for current node -current_endpoint = os.getenv("SERVER_ENDPOINT") -# run as trainer or parameter server -training_role = os.getenv("TRAINING_ROLE", - "TRAINER") # get the training role: trainer/pserver - -t.transpile(optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2) - -if training_role == "PSERVER": - if not current_endpoint: - print("need env SERVER_ENDPOINT") - exit(1) - pserver_prog = t.get_pserver_program(current_endpoint) - pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) - exe.run(pserver_startup) - exe.run(pserver_prog) -else: - trainer_prog = t.get_trainer_program() - - exe.run(fluid.default_startup_program()) - - PASS_NUM = 100 - for pass_id in range(PASS_NUM): - for data in train_reader(): - avg_loss_value = exe.run(trainer_prog, - feed=feeder.feed(data), - fetch_list=[avg_cost]) - print("loss:" + str(avg_loss_value)) - if avg_loss_value[0] < 10.0: - exit(0) -exit(1) diff --git a/python/paddle/fluid/tests/book_distribute/notest_dist_image_classification.py b/python/paddle/fluid/tests/book_distribute/notest_dist_image_classification.py deleted file mode 100644 index 46630db43e8803d5decb0dd06c57cd66033a695f..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/book_distribute/notest_dist_image_classification.py +++ /dev/null @@ -1,171 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from __future__ import print_function - -import paddle.v2 as paddle -import paddle.fluid as fluid -import os -import sys - -TRAINERS = 5 -BATCH_SIZE = 128 -PASS_NUM = 100 - - -def resnet_cifar10(input, depth=32): - def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'): - tmp = fluid.layers.conv2d( - input=input, - filter_size=filter_size, - num_filters=ch_out, - stride=stride, - padding=padding, - act=None, - bias_attr=False) - return fluid.layers.batch_norm(input=tmp, act=act) - - def shortcut(input, ch_in, ch_out, stride): - if ch_in != ch_out: - return conv_bn_layer(input, ch_out, 1, stride, 0, None) - else: - return input - - def basicblock(input, ch_in, ch_out, stride): - tmp = conv_bn_layer(input, ch_out, 3, stride, 1) - tmp = conv_bn_layer(tmp, ch_out, 3, 1, 1, act=None) - short = shortcut(input, ch_in, ch_out, stride) - return fluid.layers.elementwise_add(x=tmp, y=short, act='relu') - - def layer_warp(block_func, input, ch_in, ch_out, count, stride): - tmp = block_func(input, ch_in, ch_out, stride) - for i in range(1, count): - tmp = block_func(tmp, ch_out, ch_out, 1) - return tmp - - assert (depth - 2) % 6 == 0 - n = (depth - 2) / 6 - conv1 = conv_bn_layer( - input=input, ch_out=16, filter_size=3, stride=1, padding=1) - res1 = layer_warp(basicblock, conv1, 16, 16, n, 1) - res2 = layer_warp(basicblock, res1, 16, 32, n, 2) - res3 = layer_warp(basicblock, res2, 32, 64, n, 2) - pool = fluid.layers.pool2d( - input=res3, pool_size=8, pool_type='avg', pool_stride=1) - return pool - - -def vgg16_bn_drop(input): - def conv_block(input, num_filter, groups, dropouts): - return fluid.nets.img_conv_group( - input=input, - pool_size=2, - pool_stride=2, - conv_num_filter=[num_filter] * groups, - conv_filter_size=3, - conv_act='relu', - conv_with_batchnorm=True, - conv_batchnorm_drop_rate=dropouts, - pool_type='max') - - conv1 = conv_block(input, 64, 2, [0.3, 0]) - conv2 = conv_block(conv1, 128, 2, [0.4, 0]) - conv3 = conv_block(conv2, 256, 3, [0.4, 0.4, 0]) - conv4 = conv_block(conv3, 512, 3, [0.4, 0.4, 0]) - conv5 = conv_block(conv4, 512, 3, [0.4, 0.4, 0]) - - drop = fluid.layers.dropout(x=conv5, dropout_prob=0.5) - fc1 = fluid.layers.fc(input=drop, size=512, act=None) - bn = fluid.layers.batch_norm(input=fc1, act='relu') - drop2 = fluid.layers.dropout(x=bn, dropout_prob=0.5) - fc2 = fluid.layers.fc(input=drop2, size=512, act=None) - return fc2 - - -classdim = 10 -data_shape = [3, 32, 32] - -images = fluid.layers.data(name='pixel', shape=data_shape, dtype='float32') -label = fluid.layers.data(name='label', shape=[1], dtype='int64') - -net_type = "vgg" -if len(sys.argv) >= 2: - net_type = sys.argv[1] - -if net_type == "vgg": - print("training vgg net") - net = vgg16_bn_drop(images) -elif net_type == "resnet": - print("training resnet") - net = resnet_cifar10(images, 32) -else: - raise ValueError("%s network is not supported" % net_type) - -predict = fluid.layers.fc(input=net, size=classdim, act='softmax') -cost = fluid.layers.cross_entropy(input=predict, label=label) -avg_cost = fluid.layers.mean(cost) - -optimizer = fluid.optimizer.Adam(learning_rate=0.001) -optimize_ops, params_grads = optimizer.minimize(avg_cost) - -accuracy = fluid.evaluator.Accuracy(input=predict, label=label) - -train_reader = paddle.batch( - paddle.reader.shuffle( - paddle.dataset.cifar.train10(), buf_size=128 * 10), - batch_size=BATCH_SIZE) - -place = fluid.CPUPlace() -feeder = fluid.DataFeeder(place=place, feed_list=[images, label]) -exe = fluid.Executor(place) - -t = fluid.DistributeTranspiler() 
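Aside on the standalone scripts being deleted here: each one was run as a separate process per role, selected by the `TRAINING_ROLE`, `PSERVERS`, and `SERVER_ENDPOINT` environment variables read below. A local launch of one parameter server plus one trainer could be sketched as follows; the two-process setup and the address are illustrative assumptions, and this particular script actually expects `TRAINERS = 5` workers:

```python
import os
import subprocess

SCRIPT = "notest_dist_image_classification.py"
ENDPOINT = "127.0.0.1:6174"  # assumed local pserver address

base_env = dict(os.environ, PSERVERS=ENDPOINT)

# The parameter server needs its own endpoint; the trainer does not.
pserver = subprocess.Popen(
    ["python", SCRIPT],
    env=dict(base_env, TRAINING_ROLE="PSERVER", SERVER_ENDPOINT=ENDPOINT))
trainer = subprocess.Popen(
    ["python", SCRIPT], env=dict(base_env, TRAINING_ROLE="TRAINER"))

trainer.wait()       # the trainer exits when training finishes
pserver.terminate()  # the pserver loops forever, so stop it explicitly
```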
-# all parameter server endpoints list for spliting parameters -pserver_endpoints = os.getenv("PSERVERS") -# server endpoint for current node -current_endpoint = os.getenv("SERVER_ENDPOINT") -# run as trainer or parameter server -training_role = os.getenv("TRAINING_ROLE", - "TRAINER") # get the training role: trainer/pserver - -t.transpile( - optimize_ops, params_grads, pservers=pserver_endpoints, trainers=TRAINERS) - -if training_role == "PSERVER": - if not current_endpoint: - print("need env SERVER_ENDPOINT") - exit(1) - pserver_prog = t.get_pserver_program(current_endpoint) - pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) - exe.run(pserver_startup) - exe.run(pserver_prog) -elif training_role == "TRAINER": - trainer_prog = t.get_trainer_program() - exe.run(fluid.default_startup_program()) - - for pass_id in range(PASS_NUM): - accuracy.reset(exe) - for data in train_reader(): - loss, acc = exe.run(trainer_prog, - feed=feeder.feed(data), - fetch_list=[avg_cost] + accuracy.metrics) - pass_acc = accuracy.eval(exe) - print("pass_id:" + str(pass_id) + "loss:" + str(loss) + " pass_acc:" - + str(pass_acc)) - # this model is slow, so if we can train two mini batches, - # we think it works properly. - print("trainer run end") -else: - print("environment var TRAINER_ROLE should be TRAINER os PSERVER") -exit(1) diff --git a/python/paddle/fluid/tests/book_distribute/notest_dist_label_semantic_roles.py b/python/paddle/fluid/tests/book_distribute/notest_dist_label_semantic_roles.py deleted file mode 100644 index 3ec85517ab51554b4b8025822b4c5bf241552a57..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/book_distribute/notest_dist_label_semantic_roles.py +++ /dev/null @@ -1,241 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import math - -import numpy as np -import paddle.v2 as paddle -import paddle.v2.dataset.conll05 as conll05 -import paddle.fluid as fluid -import time -import os - -word_dict, verb_dict, label_dict = conll05.get_dict() -word_dict_len = len(word_dict) -label_dict_len = len(label_dict) -pred_len = len(verb_dict) - -mark_dict_len = 2 -word_dim = 32 -mark_dim = 5 -hidden_dim = 512 -depth = 8 -mix_hidden_lr = 1e-3 - -IS_SPARSE = True -PASS_NUM = 10 -BATCH_SIZE = 20 - -embedding_name = 'emb' - - -def load_parameter(file_name, h, w): - with open(file_name, 'rb') as f: - f.read(16) # skip header. 
- return np.fromfile(f, dtype=np.float32).reshape(h, w) - - -def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, - **ignored): - # 8 features - predicate_embedding = fluid.layers.embedding( - input=predicate, - size=[pred_len, word_dim], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='vemb') - - mark_embedding = fluid.layers.embedding( - input=mark, - size=[mark_dict_len, mark_dim], - dtype='float32', - is_sparse=IS_SPARSE) - - word_input = [word, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2] - emb_layers = [ - fluid.layers.embedding( - size=[word_dict_len, word_dim], - input=x, - param_attr=fluid.ParamAttr( - name=embedding_name, trainable=False)) for x in word_input - ] - emb_layers.append(predicate_embedding) - emb_layers.append(mark_embedding) - - hidden_0_layers = [ - fluid.layers.fc(input=emb, size=hidden_dim) for emb in emb_layers - ] - - hidden_0 = fluid.layers.sums(input=hidden_0_layers) - - lstm_0 = fluid.layers.dynamic_lstm( - input=hidden_0, - size=hidden_dim, - candidate_activation='relu', - gate_activation='sigmoid', - cell_activation='sigmoid') - - # stack L-LSTM and R-LSTM with direct edges - input_tmp = [hidden_0, lstm_0] - - for i in range(1, depth): - mix_hidden = fluid.layers.sums(input=[ - fluid.layers.fc(input=input_tmp[0], size=hidden_dim), - fluid.layers.fc(input=input_tmp[1], size=hidden_dim) - ]) - - lstm = fluid.layers.dynamic_lstm( - input=mix_hidden, - size=hidden_dim, - candidate_activation='relu', - gate_activation='sigmoid', - cell_activation='sigmoid', - is_reverse=((i % 2) == 1)) - - input_tmp = [mix_hidden, lstm] - - feature_out = fluid.layers.sums(input=[ - fluid.layers.fc(input=input_tmp[0], size=label_dict_len), - fluid.layers.fc(input=input_tmp[1], size=label_dict_len) - ]) - - return feature_out - - -def to_lodtensor(data, place): - seq_lens = [len(seq) for seq in data] - cur_len = 0 - lod = [cur_len] - for l in seq_lens: - cur_len += l - lod.append(cur_len) - flattened_data = np.concatenate(data, axis=0).astype("int64") - flattened_data = flattened_data.reshape([len(flattened_data), 1]) - res = fluid.LoDTensor() - res.set(flattened_data, place) - res.set_lod([lod]) - return res - - -def main(): - # define network topology - word = fluid.layers.data( - name='word_data', shape=[1], dtype='int64', lod_level=1) - predicate = fluid.layers.data( - name='verb_data', shape=[1], dtype='int64', lod_level=1) - ctx_n2 = fluid.layers.data( - name='ctx_n2_data', shape=[1], dtype='int64', lod_level=1) - ctx_n1 = fluid.layers.data( - name='ctx_n1_data', shape=[1], dtype='int64', lod_level=1) - ctx_0 = fluid.layers.data( - name='ctx_0_data', shape=[1], dtype='int64', lod_level=1) - ctx_p1 = fluid.layers.data( - name='ctx_p1_data', shape=[1], dtype='int64', lod_level=1) - ctx_p2 = fluid.layers.data( - name='ctx_p2_data', shape=[1], dtype='int64', lod_level=1) - mark = fluid.layers.data( - name='mark_data', shape=[1], dtype='int64', lod_level=1) - feature_out = db_lstm(**locals()) - target = fluid.layers.data( - name='target', shape=[1], dtype='int64', lod_level=1) - crf_cost = fluid.layers.linear_chain_crf( - input=feature_out, - label=target, - param_attr=fluid.ParamAttr( - name='crfw', learning_rate=mix_hidden_lr)) - avg_cost = fluid.layers.mean(crf_cost) - - # TODO(qiao) - # check other optimizers and check why out will be NAN - sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.0001) - optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) - - # TODO(qiao) - # add dependency track and move this config before optimizer - crf_decode = 
fluid.layers.crf_decoding( - input=feature_out, param_attr=fluid.ParamAttr(name='crfw')) - - chunk_evaluator = fluid.evaluator.ChunkEvaluator( - input=crf_decode, - label=target, - chunk_scheme="IOB", - num_chunk_types=int(math.ceil((label_dict_len - 1) / 2.0))) - - train_data = paddle.batch( - paddle.reader.shuffle( - paddle.dataset.conll05.test(), buf_size=8192), - batch_size=BATCH_SIZE) - place = fluid.CPUPlace() - feeder = fluid.DataFeeder( - feed_list=[ - word, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, predicate, mark, target - ], - place=place) - exe = fluid.Executor(place) - - t = fluid.DistributeTranspiler() - pserver_endpoints = os.getenv("PSERVERS") - # server endpoint for current node - current_endpoint = os.getenv("SERVER_ENDPOINT") - # run as trainer or parameter server - training_role = os.getenv( - "TRAINING_ROLE", "TRAINER") # get the training role: trainer/pserver - - t.transpile( - optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2) - - if training_role == "PSERVER": - if not current_endpoint: - print("need env SERVER_ENDPOINT") - exit(1) - pserver_prog = t.get_pserver_program(current_endpoint) - pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) - exe.run(pserver_startup) - exe.run(pserver_prog) - elif training_role == "TRAINER": - trainer_prog = t.get_trainer_program() - start_time = time.time() - batch_id = 0 - exe.run(fluid.default_startup_program()) - embedding_param = fluid.global_scope().find_var( - embedding_name).get_tensor() - embedding_param.set( - load_parameter(conll05.get_embedding(), word_dict_len, word_dim), - place) - for pass_id in xrange(PASS_NUM): - chunk_evaluator.reset(exe) - for data in train_data(): - cost, precision, recall, f1_score = exe.run( - trainer_prog, - feed=feeder.feed(data), - fetch_list=[avg_cost] + chunk_evaluator.metrics) - pass_precision, pass_recall, pass_f1_score = chunk_evaluator.eval( - exe) - - if batch_id % 10 == 0: - print("avg_cost:" + str(cost) + " precision:" + str( - precision) + " recall:" + str(recall) + " f1_score:" + - str(f1_score) + " pass_precision:" + str( - pass_precision) + " pass_recall:" + str( - pass_recall) + " pass_f1_score:" + str( - pass_f1_score)) - if batch_id != 0: - print("second per batch: " + str((time.time( - ) - start_time) / batch_id)) - - batch_id = batch_id + 1 - - -if __name__ == '__main__': - main() diff --git a/python/paddle/fluid/tests/book_distribute/notest_dist_word2vec.py b/python/paddle/fluid/tests/book_distribute/notest_dist_word2vec.py deleted file mode 100644 index 8164ba5428ee1316f8184ae68ef54742393561ed..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/book_distribute/notest_dist_word2vec.py +++ /dev/null @@ -1,113 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from __future__ import print_function -import numpy as np -import paddle.v2 as paddle -import paddle.fluid as fluid -import os - -PASS_NUM = 100 -EMBED_SIZE = 32 -HIDDEN_SIZE = 256 -N = 5 -BATCH_SIZE = 32 -IS_SPARSE = True -TRAINERS = 2 - -word_dict = paddle.dataset.imikolov.build_dict() -dict_size = len(word_dict) - -first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64') -second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64') -third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64') -forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64') -next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64') - -embed_first = fluid.layers.embedding( - input=first_word, - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w') -embed_second = fluid.layers.embedding( - input=second_word, - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w') -embed_third = fluid.layers.embedding( - input=third_word, - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w') -embed_forth = fluid.layers.embedding( - input=forth_word, - size=[dict_size, EMBED_SIZE], - dtype='float32', - is_sparse=IS_SPARSE, - param_attr='shared_w') - -concat_embed = fluid.layers.concat( - input=[embed_first, embed_second, embed_third, embed_forth], axis=1) -hidden1 = fluid.layers.fc(input=concat_embed, size=HIDDEN_SIZE, act='sigmoid') -predict_word = fluid.layers.fc(input=hidden1, size=dict_size, act='softmax') -cost = fluid.layers.cross_entropy(input=predict_word, label=next_word) -avg_cost = fluid.layers.mean(cost) -sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) -optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) -train_reader = paddle.batch( - paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE) - -place = fluid.CPUPlace() -exe = fluid.Executor(place) - -t = fluid.DistributeTranspiler() -# all parameter server endpoints list for spliting parameters -pserver_endpoints = os.getenv("PSERVERS") -# server endpoint for current node -current_endpoint = os.getenv("SERVER_ENDPOINT") -# run as trainer or parameter server -training_role = os.getenv("TRAINING_ROLE", - "TRAINER") # get the training role: trainer/pserver - -t.transpile( - optimize_ops, params_grads, pservers=pserver_endpoints, trainers=TRAINERS) -if training_role == "PSERVER": - if not current_endpoint: - print("need env SERVER_ENDPOINT") - exit(1) - pserver_prog = t.get_pserver_program(current_endpoint) - pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) - exe.run(pserver_startup) - exe.run(pserver_prog) -elif training_role == "TRAINER": - feeder = fluid.DataFeeder( - feed_list=[first_word, second_word, third_word, forth_word, next_word], - place=place) - exe.run(fluid.default_startup_program()) - trainer_prog = t.get_trainer_program() - for pass_id in range(PASS_NUM): - for data in train_reader(): - avg_cost_np = exe.run(trainer_prog, - feed=feeder.feed(data), - fetch_list=[avg_cost]) - print("avg_cost_np", avg_cost_np) - if avg_cost_np[0] < 5.0: - exit( - 0) # if avg cost less than 10.0, we think our code is good. 
-else:
-    print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
-exit(1)
diff --git a/python/paddle/fluid/tests/book_distribute/notest_machine_translation.py b/python/paddle/fluid/tests/book_distribute/notest_machine_translation.py
deleted file mode 100644
index fee8db249770bafdd9011a7fea176a966f23cfe3..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/book_distribute/notest_machine_translation.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import numpy as np
-import paddle.v2 as paddle
-import paddle.fluid as fluid
-import paddle.fluid.core as core
-import paddle.fluid.framework as framework
-import paddle.fluid.layers as layers
-from paddle.fluid.executor import Executor
-import os
-
-dict_size = 30000
-source_dict_dim = target_dict_dim = dict_size
-src_dict, trg_dict = paddle.dataset.wmt14.get_dict(dict_size)
-hidden_dim = 32
-word_dim = 16
-IS_SPARSE = True
-batch_size = 10
-max_length = 50
-topk_size = 50
-trg_dic_size = 10000
-
-decoder_size = hidden_dim
-
-
-def encoder_decoder():
-    # encoder
-    src_word_id = layers.data(
-        name="src_word_id", shape=[1], dtype='int64', lod_level=1)
-    src_embedding = layers.embedding(
-        input=src_word_id,
-        size=[dict_size, word_dim],
-        dtype='float32',
-        is_sparse=IS_SPARSE,
-        param_attr=fluid.ParamAttr(name='vemb'))
-
-    fc1 = fluid.layers.fc(input=src_embedding, size=hidden_dim * 4, act='tanh')
-    lstm_hidden0, lstm_0 = layers.dynamic_lstm(input=fc1, size=hidden_dim * 4)
-    encoder_out = layers.sequence_last_step(input=lstm_hidden0)
-
-    # decoder
-    trg_language_word = layers.data(
-        name="target_language_word", shape=[1], dtype='int64', lod_level=1)
-    trg_embedding = layers.embedding(
-        input=trg_language_word,
-        size=[dict_size, word_dim],
-        dtype='float32',
-        is_sparse=IS_SPARSE,
-        param_attr=fluid.ParamAttr(name='vemb'))
-
-    rnn = fluid.layers.DynamicRNN()
-    with rnn.block():
-        current_word = rnn.step_input(trg_embedding)
-        mem = rnn.memory(init=encoder_out)
-        fc1 = fluid.layers.fc(input=[current_word, mem],
-                              size=decoder_size,
-                              act='tanh')
-        out = fluid.layers.fc(input=fc1, size=target_dict_dim, act='softmax')
-        rnn.update_memory(mem, fc1)
-        rnn.output(out)
-
-    return rnn()
-
-
-def to_lodtensor(data, place):
-    seq_lens = [len(seq) for seq in data]
-    cur_len = 0
-    lod = [cur_len]
-    for l in seq_lens:
-        cur_len += l
-        lod.append(cur_len)
-    flattened_data = np.concatenate(data, axis=0).astype("int64")
-    flattened_data = flattened_data.reshape([len(flattened_data), 1])
-    res = core.LoDTensor()
-    res.set(flattened_data, place)
-    res.set_lod([lod])
-    return res
-
-
-def main():
-    rnn_out = encoder_decoder()
-    label = layers.data(
-        name="target_language_next_word", shape=[1], dtype='int64', lod_level=1)
-    cost = layers.cross_entropy(input=rnn_out, label=label)
-    avg_cost = fluid.layers.mean(cost)
-
-    optimizer = fluid.optimizer.Adagrad(learning_rate=1e-4)
-    optimize_ops, params_grads = optimizer.minimize(avg_cost)
-
-    train_data = paddle.batch(
-        paddle.reader.shuffle(
-            paddle.dataset.wmt14.train(dict_size), buf_size=1000),
-        batch_size=batch_size)
-
-    place = core.CPUPlace()
-    exe = Executor(place)
-
-    t = fluid.DistributeTranspiler()
-    # all parameter server endpoints list for spliting parameters
-    pserver_endpoints = os.getenv("PSERVERS")
-    # server endpoint for current node
-    current_endpoint = os.getenv("SERVER_ENDPOINT")
-    # run as trainer or parameter server
-    training_role = os.getenv(
-        "TRAINING_ROLE", "TRAINER")  # get the training role: trainer/pserver
-
-    t.transpile(
-        optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2)
-
-    if training_role == "PSERVER":
-        if not current_endpoint:
-            print("need env SERVER_ENDPOINT")
-            exit(1)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
-        exe.run(pserver_startup)
-        exe.run(pserver_prog)
-    elif training_role == "TRAINER":
-        trainer_prog = t.get_trainer_program()
-        exe.run(framework.default_startup_program())
-
-        batch_id = 0
-        for pass_id in xrange(2):
-            for data in train_data():
-                word_data = to_lodtensor(map(lambda x: x[0], data), place)
-                trg_word = to_lodtensor(map(lambda x: x[1], data), place)
-                trg_word_next = to_lodtensor(map(lambda x: x[2], data), place)
-                outs = exe.run(trainer_prog,
-                               feed={
-                                   'src_word_id': word_data,
-                                   'target_language_word': trg_word,
-                                   'target_language_next_word': trg_word_next
-                               },
-                               fetch_list=[avg_cost])
-                avg_cost_val = np.array(outs[0])
-                print('pass_id=' + str(pass_id) + ' batch=' + str(batch_id) +
-                      " avg_cost=" + str(avg_cost_val))
-                if batch_id > 3:
-                    exit(0)
-                batch_id += 1
-    else:
-        print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
-
-
-if __name__ == '__main__':
-    main()
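Note: the to_lodtensor helper deleted above (and repeated in the other deleted scripts) packs variable-length sequences into one flat tensor plus a table of cumulative offsets, which is Paddle's LoD ("level of detail") representation. Setting the Paddle-specific core.LoDTensor calls aside, the offset bookkeeping is plain numpy; below is a minimal standalone sketch (pack_sequences is a hypothetical name, not part of the deleted code):

import numpy as np

def pack_sequences(data):
    # Flatten variable-length integer sequences and build cumulative
    # LoD offsets: sequences of lengths 3, 1, 2 yield [0, 3, 4, 6].
    lod = [0]
    for seq in data:
        lod.append(lod[-1] + len(seq))
    flattened = np.concatenate(data, axis=0).astype("int64")
    flattened = flattened.reshape([len(flattened), 1])
    return flattened, lod

if __name__ == "__main__":
    flat, lod = pack_sequences([[1, 2, 3], [4], [5, 6]])
    print(lod)         # [0, 3, 4, 6]
    print(flat.shape)  # (6, 1)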
diff --git a/python/paddle/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py b/python/paddle/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py
deleted file mode 100644
index b6ad6a992d7ce23ff9f5b73283db484638f84cbe..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-import numpy as np
-import paddle.v2 as paddle
-import paddle.fluid as fluid
-import os
-
-images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype='float32')
-label = fluid.layers.data(name='label', shape=[1], dtype='int64')
-conv_pool_1 = fluid.nets.simple_img_conv_pool(
-    input=images,
-    filter_size=5,
-    num_filters=20,
-    pool_size=2,
-    pool_stride=2,
-    act="relu")
-conv_pool_2 = fluid.nets.simple_img_conv_pool(
-    input=conv_pool_1,
-    filter_size=5,
-    num_filters=50,
-    pool_size=2,
-    pool_stride=2,
-    act="relu")
-
-predict = fluid.layers.fc(input=conv_pool_2, size=10, act="softmax")
-cost = fluid.layers.cross_entropy(input=predict, label=label)
-avg_cost = fluid.layers.mean(cost)
-optimizer = fluid.optimizer.Adam(learning_rate=0.01)
-optimize_ops, params_grads = optimizer.minimize(avg_cost)
-
-accuracy = fluid.evaluator.Accuracy(input=predict, label=label)
-
-BATCH_SIZE = 50
-PASS_NUM = 3
-train_reader = paddle.batch(
-    paddle.reader.shuffle(
-        paddle.dataset.mnist.train(), buf_size=500),
-    batch_size=BATCH_SIZE)
-
-place = fluid.CPUPlace()
-exe = fluid.Executor(place)
-
-pserver_endpoints = os.getenv("PSERVERS")  # all pserver endpoints
-trainers = int(os.getenv("TRAINERS"))  # total trainer count
-current_endpoint = os.getenv("SERVER_ENDPOINT")  # current pserver endpoint
-training_role = os.getenv("TRAINING_ROLE",
-                          "TRAINER")  # get the training role: trainer/pserver
-if not current_endpoint:
-    print("need env SERVER_ENDPOINT")
-    exit(1)
-
-t = fluid.DistributeTranspiler()
-t.transpile(
-    optimize_ops, params_grads, pservers=pserver_endpoints, trainers=trainers)
-
-if training_role == "PSERVER":
-    pserver_prog = t.get_pserver_program(current_endpoint)
-    pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
-    exe.run(pserver_startup)
-    exe.run(pserver_prog)
-elif training_role == "TRAINER":
-    trainer_prog = t.get_trainer_program()
-    feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
-    # TODO(typhoonzero): change trainer startup program to fetch parameters from pserver
-    exe.run(fluid.default_startup_program())
-
-    for pass_id in range(PASS_NUM):
-        accuracy.reset(exe)
-        batch_id = 0
-        for data in train_reader():
-            loss, acc = exe.run(trainer_prog,
-                                feed=feeder.feed(data),
-                                fetch_list=[avg_cost] + accuracy.metrics)
-            pass_acc = accuracy.eval(exe)
-            if batch_id % 100 == 0:
-                print("batch_id %d, loss: %f, acc: %f" %
-                      (batch_id, loss, pass_acc))
-            batch_id += 1
-
-        pass_acc = accuracy.eval(exe)
-        print("pass_id=" + str(pass_id) + " pass_acc=" + str(pass_acc))
-else:
-    print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
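Note: every deleted script uses the same environment-driven launch contract: PSERVERS (endpoint list), SERVER_ENDPOINT (this node's endpoint), TRAINING_ROLE (PSERVER or TRAINER), and, in this file, TRAINERS (trainer count). A hypothetical single-host launcher might look like the sketch below (host, port, and process handling are illustrative, not part of the deleted code). Note also that this particular script checks SERVER_ENDPOINT before branching on TRAINING_ROLE, so even the trainer process must set it:

import os
import subprocess

# Illustrative values for one pserver and one trainer on localhost.
env = dict(os.environ,
           PSERVERS="127.0.0.1:6174",
           TRAINERS="1",
           SERVER_ENDPOINT="127.0.0.1:6174")
pserver = subprocess.Popen(
    ["python", "notest_recognize_digits_conv_dist.py"],
    env=dict(env, TRAINING_ROLE="PSERVER"))
trainer = subprocess.Popen(
    ["python", "notest_recognize_digits_conv_dist.py"],
    env=dict(env, TRAINING_ROLE="TRAINER"))
trainer.wait()   # trainer exits on its own; the pserver loops forever,
pserver.kill()   # so the launcher has to stop it.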
diff --git a/python/paddle/fluid/tests/book_distribute/notest_recognize_digits_mlp_dist.py b/python/paddle/fluid/tests/book_distribute/notest_recognize_digits_mlp_dist.py
deleted file mode 100644
index dad95c0f3fff7fb5f3521e561617a2accad86823..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/book_distribute/notest_recognize_digits_mlp_dist.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-import numpy as np
-import paddle.v2 as paddle
-import paddle.fluid as fluid
-import os
-
-BATCH_SIZE = 128
-PASS_NUM = 100
-
-images = fluid.layers.data(name='x', shape=[784], dtype='float32')
-
-# TODO(aroraabhinav) Add regularization and error clipping after
-# Issue 7432(https://github.com/PaddlePaddle/Paddle/issues/7432) is resolved.
-hidden1 = fluid.layers.fc(input=images, size=128, act='relu')
-hidden2 = fluid.layers.fc(input=hidden1, size=64, act='relu')
-predict = fluid.layers.fc(input=hidden2, size=10, act='softmax')
-
-label = fluid.layers.data(name='y', shape=[1], dtype='int64')
-
-cost = fluid.layers.cross_entropy(input=predict, label=label)
-avg_cost = fluid.layers.mean(cost)
-
-optimizer = fluid.optimizer.Momentum(learning_rate=0.001, momentum=0.9)
-optimize_ops, params_grads = optimizer.minimize(avg_cost)
-
-accuracy = fluid.evaluator.Accuracy(input=predict, label=label)
-
-train_reader = paddle.batch(
-    paddle.reader.shuffle(
-        paddle.dataset.mnist.train(), buf_size=8192),
-    batch_size=BATCH_SIZE)
-
-place = fluid.CPUPlace()
-exe = fluid.Executor(place)
-
-t = fluid.DistributeTranspiler()
-# all parameter server endpoints list for spliting parameters
-pserver_endpoints = os.getenv("PSERVERS")
-# server endpoint for current node
-current_endpoint = os.getenv("SERVER_ENDPOINT")
-# run as trainer or parameter server
-training_role = os.getenv("TRAINING_ROLE",
-                          "TRAINER")  # get the training role: trainer/pserver
-t.transpile(optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2)
-
-if training_role == "PSERVER":
-    if not current_endpoint:
-        print("need env SERVER_ENDPOINT")
-        exit(1)
-    pserver_prog = t.get_pserver_program(current_endpoint)
-    pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
-    exe.run(pserver_startup)
-    exe.run(pserver_prog)
-elif training_role == "TRAINER":
-    trainer_prog = t.get_trainer_program()
-    feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
-    exe.run(fluid.default_startup_program())
-
-    for pass_id in range(PASS_NUM):
-        accuracy.reset(exe)
-        batch_id = 0
-        for data in train_reader():
-            loss, acc = exe.run(trainer_prog,
-                                feed=feeder.feed(data),
-                                fetch_list=[avg_cost] + accuracy.metrics)
-            pass_acc = accuracy.eval(exe)
-            if batch_id % 100 == 0:
-                print("batch_id %d, loss: %f, acc: %f" %
-                      (batch_id, loss, pass_acc))
-            batch_id += 1
-
-        pass_acc = accuracy.eval(exe)
-        print("pass_id=" + str(pass_id) + " pass_acc=" + str(pass_acc))
-else:
-    print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
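Note: per the comments in these scripts, PSERVERS carries the full endpoint list used to split parameters across parameter servers, as a comma-separated "ip:port" string, while SERVER_ENDPOINT names the one endpoint served by the current node. A small sketch of how a launcher might assemble both (hosts, port, and node_index are hypothetical):

hosts = ["192.168.0.2", "192.168.0.3"]  # hypothetical pserver hosts
port = 6174                             # hypothetical shared port
pservers = ",".join("%s:%d" % (h, port) for h in hosts)
assert pservers == "192.168.0.2:6174,192.168.0.3:6174"

node_index = 1                          # hypothetical rank of this pserver node
current_endpoint = pservers.split(",")[node_index]
assert current_endpoint == "192.168.0.3:6174"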
diff --git a/python/paddle/fluid/tests/book_distribute/notest_recommender_system_dist.py b/python/paddle/fluid/tests/book_distribute/notest_recommender_system_dist.py
deleted file mode 100644
index 741ec33639585817b9c4f4e2ce93b8e8f4bb5829..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/book_distribute/notest_recommender_system_dist.py
+++ /dev/null
@@ -1,217 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import numpy as np
-import os
-import paddle.v2 as paddle
-import paddle.fluid as fluid
-import paddle.fluid.core as core
-import paddle.fluid.layers as layers
-import paddle.fluid.nets as nets
-from paddle.fluid.optimizer import SGDOptimizer
-
-IS_SPARSE = True
-BATCH_SIZE = 256
-PASS_NUM = 100
-
-
-def get_usr_combined_features():
-    USR_DICT_SIZE = paddle.dataset.movielens.max_user_id() + 1
-    uid = layers.data(name='user_id', shape=[1], dtype='int64')
-    usr_emb = layers.embedding(
-        input=uid,
-        dtype='float32',
-        size=[USR_DICT_SIZE, 32],
-        param_attr='user_table',
-        is_sparse=IS_SPARSE)
-    usr_fc = layers.fc(input=usr_emb, size=32)
-    USR_GENDER_DICT_SIZE = 2
-
-    usr_gender_id = layers.data(name='gender_id', shape=[1], dtype='int64')
-    usr_gender_emb = layers.embedding(
-        input=usr_gender_id,
-        size=[USR_GENDER_DICT_SIZE, 16],
-        param_attr='gender_table',
-        is_sparse=IS_SPARSE)
-    usr_gender_fc = layers.fc(input=usr_gender_emb, size=16)
-
-    USR_AGE_DICT_SIZE = len(paddle.dataset.movielens.age_table)
-    usr_age_id = layers.data(name='age_id', shape=[1], dtype="int64")
-    usr_age_emb = layers.embedding(
-        input=usr_age_id,
-        size=[USR_AGE_DICT_SIZE, 16],
-        is_sparse=IS_SPARSE,
-        param_attr='age_table')
-    usr_age_fc = layers.fc(input=usr_age_emb, size=16)
-
-    USR_JOB_DICT_SIZE = paddle.dataset.movielens.max_job_id() + 1
-    usr_job_id = layers.data(name='job_id', shape=[1], dtype="int64")
-    usr_job_emb = layers.embedding(
-        input=usr_job_id,
-        size=[USR_JOB_DICT_SIZE, 16],
-        param_attr='job_table',
-        is_sparse=IS_SPARSE)
-    usr_job_fc = layers.fc(input=usr_job_emb, size=16)
-
-    concat_embed = layers.concat(
-        input=[usr_fc, usr_gender_fc, usr_age_fc, usr_job_fc], axis=1)
-
-    usr_combined_features = layers.fc(input=concat_embed, size=200, act="tanh")
-    return usr_combined_features
-
-
-def get_mov_combined_features():
-    MOV_DICT_SIZE = paddle.dataset.movielens.max_movie_id() + 1
-    mov_id = layers.data(name='movie_id', shape=[1], dtype='int64')
-    mov_emb = layers.embedding(
-        input=mov_id,
-        dtype='float32',
-        size=[MOV_DICT_SIZE, 32],
-        param_attr='movie_table',
-        is_sparse=IS_SPARSE)
-    mov_fc = layers.fc(input=mov_emb, size=32)
-
-    CATEGORY_DICT_SIZE = len(paddle.dataset.movielens.movie_categories())
-    category_id = layers.data(name='category_id', shape=[1], dtype='int64')
-    mov_categories_emb = layers.embedding(
-        input=category_id, size=[CATEGORY_DICT_SIZE, 32], is_sparse=IS_SPARSE)
-    mov_categories_hidden = layers.sequence_pool(
-        input=mov_categories_emb, pool_type="sum")
-
-    MOV_TITLE_DICT_SIZE = len(paddle.dataset.movielens.get_movie_title_dict())
-    mov_title_id = layers.data(name='movie_title', shape=[1], dtype='int64')
-    mov_title_emb = layers.embedding(
-        input=mov_title_id, size=[MOV_TITLE_DICT_SIZE, 32], is_sparse=IS_SPARSE)
-    mov_title_conv = nets.sequence_conv_pool(
-        input=mov_title_emb,
-        num_filters=32,
-        filter_size=3,
-        act="tanh",
-        pool_type="sum")
-
-    concat_embed = layers.concat(
-        input=[mov_fc, mov_categories_hidden, mov_title_conv], axis=1)
-
-    mov_combined_features = layers.fc(input=concat_embed, size=200, act="tanh")
-    return mov_combined_features
-
-
-def model():
-    usr_combined_features = get_usr_combined_features()
-    mov_combined_features = get_mov_combined_features()
-
-    # need cos sim
-    inference = layers.cos_sim(X=usr_combined_features, Y=mov_combined_features)
-    scale_infer = layers.scale(x=inference, scale=5.0)
-
-    label = layers.data(name='score', shape=[1], dtype='float32')
-    square_cost = layers.square_error_cost(input=scale_infer, label=label)
-    avg_cost = layers.mean(square_cost)
-
-    return avg_cost
-
-
-def func_feed(feeding, data, place):
-    feed_tensors = {}
-    for (key, idx) in feeding.iteritems():
-        tensor = core.LoDTensor()
-        if key != "category_id" and key != "movie_title":
-            if key == "score":
-                numpy_data = np.array(map(lambda x: x[idx], data)).astype(
-                    "float32")
-            else:
-                numpy_data = np.array(map(lambda x: x[idx], data)).astype(
-                    "int64")
-        else:
-            numpy_data = map(lambda x: np.array(x[idx]).astype("int64"), data)
-            lod_info = [len(item) for item in numpy_data]
-            offset = 0
-            lod = [offset]
-            for item in lod_info:
-                offset += item
-                lod.append(offset)
-            numpy_data = np.concatenate(numpy_data, axis=0)
-            tensor.set_lod([lod])
-
-        numpy_data = numpy_data.reshape([numpy_data.shape[0], 1])
-        tensor.set(numpy_data, place)
-        feed_tensors[key] = tensor
-    return feed_tensors
-
-
-def main():
-    cost = model()
-    optimizer = SGDOptimizer(learning_rate=0.2)
-    optimize_ops, params_grads = optimizer.minimize(cost)
-
-    train_reader = paddle.batch(
-        paddle.reader.shuffle(
-            paddle.dataset.movielens.train(), buf_size=8192),
-        batch_size=BATCH_SIZE)
-
-    place = fluid.CPUPlace()
-    exe = fluid.Executor(place)
-
-    t = fluid.DistributeTranspiler()
-
-    # all parameter server endpoints list for spliting parameters
-    pserver_endpoints = os.getenv("PSERVERS")
-    # server endpoint for current node
-    current_endpoint = os.getenv("SERVER_ENDPOINT")
-    # run as trainer or parameter server
-    training_role = os.getenv("TRAINING_ROLE", "TRAINER")
-
-    t.transpile(
-        optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2)
-
-    if training_role == "PSERVER":
-        if not current_endpoint:
-            print("need env SERVER_ENDPOINT")
-            exit(1)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
-        exe.run(pserver_startup)
-        exe.run(pserver_prog)
-    elif training_role == "TRAINER":
-        exe.run(fluid.default_startup_program())
-        trainer_prog = t.get_trainer_program()
-
-        feeding = {
-            'user_id': 0,
-            'gender_id': 1,
-            'age_id': 2,
-            'job_id': 3,
-            'movie_id': 4,
-            'category_id': 5,
-            'movie_title': 6,
-            'score': 7
-        }
-
-        for pass_id in range(PASS_NUM):
-            for data in train_reader():
-                outs = exe.run(trainer_prog,
-                               feed=func_feed(feeding, data, place),
-                               fetch_list=[cost])
-                out = np.array(outs[0])
-                print("cost=" + str(out[0]))
-                if out[0] < 6.0:
-                    print("Training complete. Average cost is less than 6.0.")
-                    # if avg cost less than 6.0, we think our code is good.
-                    exit(0)
-    else:
-        print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
-
-
-if __name__ == '__main__':
-    main()
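Note: func_feed above selects columns out of each MovieLens sample tuple by the indices in the feeding dict, treating category_id and movie_title as variable-length sequences and everything else as scalars. A tiny worked example of the column selection, with a fabricated two-sample batch (all values are made up for illustration):

feeding = {'user_id': 0, 'gender_id': 1, 'age_id': 2, 'job_id': 3,
           'movie_id': 4, 'category_id': 5, 'movie_title': 6, 'score': 7}
batch = [
    (1, 0, 2, 10, 771, [4, 7], [20, 3, 9], 4.0),  # fabricated sample
    (2, 1, 3, 12, 105, [1], [11, 2], 3.5),        # fabricated sample
]
# Scalar column: one value per sample.
user_ids = [sample[feeding['user_id']] for sample in batch]
print(user_ids)  # [1, 2]
# Sequence column: one variable-length list per sample, which is why
# func_feed routes it through the LoD-offset path.
titles = [sample[feeding['movie_title']] for sample in batch]
print(titles)    # [[20, 3, 9], [11, 2]]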
diff --git a/python/paddle/fluid/tests/book_distribute/notest_understand_sentiment_conv_dist.py b/python/paddle/fluid/tests/book_distribute/notest_understand_sentiment_conv_dist.py
deleted file mode 100644
index 0467184bbfa83137effdc265a2b0b6264ad1b1a9..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/book_distribute/notest_understand_sentiment_conv_dist.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-import os
-import numpy as np
-import paddle.v2 as paddle
-import paddle.fluid as fluid
-
-
-def convolution_net(data, label, input_dim, class_dim=2, emb_dim=32,
-                    hid_dim=32):
-    emb = fluid.layers.embedding(input=data, size=[input_dim, emb_dim])
-    conv_3 = fluid.nets.sequence_conv_pool(
-        input=emb,
-        num_filters=hid_dim,
-        filter_size=3,
-        act="tanh",
-        pool_type="sqrt")
-    conv_4 = fluid.nets.sequence_conv_pool(
-        input=emb,
-        num_filters=hid_dim,
-        filter_size=4,
-        act="tanh",
-        pool_type="sqrt")
-    prediction = fluid.layers.fc(input=[conv_3, conv_4],
-                                 size=class_dim,
-                                 act="softmax")
-    cost = fluid.layers.cross_entropy(input=prediction, label=label)
-    avg_cost = fluid.layers.mean(cost)
-    adam_optimizer = fluid.optimizer.Adam(learning_rate=0.002)
-    optimize_ops, params_grads = adam_optimizer.minimize(avg_cost)
-    accuracy = fluid.evaluator.Accuracy(input=prediction, label=label)
-    return avg_cost, accuracy, accuracy.metrics[0], optimize_ops, params_grads
-
-
-def to_lodtensor(data, place):
-    seq_lens = [len(seq) for seq in data]
-    cur_len = 0
-    lod = [cur_len]
-    for l in seq_lens:
-        cur_len += l
-        lod.append(cur_len)
-    flattened_data = np.concatenate(data, axis=0).astype("int64")
-    flattened_data = flattened_data.reshape([len(flattened_data), 1])
-    res = fluid.LoDTensor()
-    res.set(flattened_data, place)
-    res.set_lod([lod])
-    return res
-
-
-def main():
-    BATCH_SIZE = 100
-    PASS_NUM = 5
-
-    word_dict = paddle.dataset.imdb.word_dict()
-    dict_dim = len(word_dict)
-    class_dim = 2
-
-    data = fluid.layers.data(
-        name="words", shape=[1], dtype="int64", lod_level=1)
-    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
-    cost, accuracy, acc_out, optimize_ops, params_grads = convolution_net(
-        data, label, input_dim=dict_dim, class_dim=class_dim)
-
-    train_data = paddle.batch(
-        paddle.reader.shuffle(
-            paddle.dataset.imdb.train(word_dict), buf_size=1000),
-        batch_size=BATCH_SIZE)
-    place = fluid.CPUPlace()
-    exe = fluid.Executor(place)
-
-    t = fluid.DistributeTranspiler()
-
-    # all parameter server endpoints list for spliting parameters
-    pserver_endpoints = os.getenv("PSERVERS")
-    # server endpoint for current node
-    current_endpoint = os.getenv("SERVER_ENDPOINT")
-    # run as trainer or parameter server
-    training_role = os.getenv(
-        "TRAINING_ROLE", "TRAINER")  # get the training role: trainer/pserver
-
-    t.transpile(
-        optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2)
-
-    if training_role == "PSERVER":
-        if not current_endpoint:
-            print("need env SERVER_ENDPOINT")
-            exit(1)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
-        exe.run(pserver_startup)
-        exe.run(pserver_prog)
-    elif training_role == "TRAINER":
-        exe.run(fluid.default_startup_program())
-        trainer_prog = t.get_trainer_program()
-        feeder = fluid.DataFeeder(feed_list=[data, label], place=place)
-
-        for pass_id in xrange(PASS_NUM):
-            accuracy.reset(exe)
-            for data in train_data():
-                cost_val, acc_val = exe.run(trainer_prog,
-                                            feed=feeder.feed(data),
-                                            fetch_list=[cost, acc_out])
-                pass_acc = accuracy.eval(exe)
-                print("cost=" + str(cost_val) + " acc=" + str(acc_val) +
-                      " pass_acc=" + str(pass_acc))
-                if cost_val < 1.0 and pass_acc > 0.8:
-                    exit(0)
-    else:
-        print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
-
-
-if __name__ == '__main__':
-    main()
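Note: these deleted scripts are Python 2 only: they rely on xrange, dict.iteritems, a list-returning map(), and (in the next file) the print statement. A minimal sketch of the Python 3 equivalents of the feed-preparation idioms, using fabricated data for illustration:

# Fabricated (words, label) samples standing in for an imdb batch.
data = [([1, 2, 3], 0), ([4, 5], 1)]

# Py2: map(lambda x: x[0], data) returns a list directly.
# Py3: map() is lazy, so materialize it (or use a comprehension).
words = [sample[0] for sample in data]
labels = list(map(lambda x: x[1], data))

for pass_id in range(2):  # Py2 code used xrange(2)
    pass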
diff --git a/python/paddle/fluid/tests/book_distribute/notest_understand_sentiment_dynamic_lstm.py b/python/paddle/fluid/tests/book_distribute/notest_understand_sentiment_dynamic_lstm.py
deleted file mode 100644
index 1e1338585232005540cd87b48ee2249ba0d04d18..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/book_distribute/notest_understand_sentiment_dynamic_lstm.py
+++ /dev/null
@@ -1,136 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import numpy as np
-import os
-import paddle.v2 as paddle
-import paddle.fluid as fluid
-
-
-def stacked_lstm_net(data,
-                     label,
-                     input_dim,
-                     class_dim=2,
-                     emb_dim=128,
-                     hid_dim=512,
-                     stacked_num=3):
-    assert stacked_num % 2 == 1
-
-    emb = fluid.layers.embedding(input=data, size=[input_dim, emb_dim])
-    # add bias attr
-
-    # TODO(qijun) linear act
-    fc1 = fluid.layers.fc(input=emb, size=hid_dim)
-    lstm1, cell1 = fluid.layers.dynamic_lstm(input=fc1, size=hid_dim)
-
-    inputs = [fc1, lstm1]
-
-    for i in range(2, stacked_num + 1):
-        fc = fluid.layers.fc(input=inputs, size=hid_dim)
-        lstm, cell = fluid.layers.dynamic_lstm(
-            input=fc, size=hid_dim, is_reverse=(i % 2) == 0)
-        inputs = [fc, lstm]
-
-    fc_last = fluid.layers.sequence_pool(input=inputs[0], pool_type='max')
-    lstm_last = fluid.layers.sequence_pool(input=inputs[1], pool_type='max')
-
-    prediction = fluid.layers.fc(input=[fc_last, lstm_last],
-                                 size=class_dim,
-                                 act='softmax')
-    cost = fluid.layers.cross_entropy(input=prediction, label=label)
-    avg_cost = fluid.layers.mean(cost)
-    adam_optimizer = fluid.optimizer.Adam(learning_rate=0.002)
-    optimize_ops, params_grads = adam_optimizer.minimize(avg_cost)
-    accuracy = fluid.evaluator.Accuracy(input=prediction, label=label)
-    return avg_cost, accuracy, accuracy.metrics[0], optimize_ops, params_grads
-
-
-def to_lodtensor(data, place):
-    seq_lens = [len(seq) for seq in data]
-    cur_len = 0
-    lod = [cur_len]
-    for l in seq_lens:
-        cur_len += l
-        lod.append(cur_len)
-    flattened_data = np.concatenate(data, axis=0).astype("int64")
-    flattened_data = flattened_data.reshape([len(flattened_data), 1])
-    res = fluid.LoDTensor()
-    res.set(flattened_data, place)
-    res.set_lod([lod])
-    return res
-
-
-def main():
-    BATCH_SIZE = 100
-    PASS_NUM = 5
-
-    word_dict = paddle.dataset.imdb.word_dict()
-    print "loaded word dict successfully"
-    dict_dim = len(word_dict)
-    class_dim = 2
-
-    data = fluid.layers.data(
-        name="words", shape=[1], dtype="int64", lod_level=1)
-    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
-    cost, accuracy, acc_out, optimize_ops, params_grads = stacked_lstm_net(
-        data, label, input_dim=dict_dim, class_dim=class_dim)
-
-    train_data = paddle.batch(
-        paddle.reader.shuffle(
-            paddle.dataset.imdb.train(word_dict), buf_size=1000),
-        batch_size=BATCH_SIZE)
-    place = fluid.CPUPlace()
-    exe = fluid.Executor(place)
-    feeder = fluid.DataFeeder(feed_list=[data, label], place=place)
-
-    t = fluid.DistributeTranspiler()
-    # all parameter server endpoints list for spliting parameters
-    pserver_endpoints = os.getenv("PSERVERS")
-    # server endpoint for current node
-    current_endpoint = os.getenv("SERVER_ENDPOINT")
-    # run as trainer or parameter server
-    training_role = os.getenv(
-        "TRAINING_ROLE", "TRAINER")  # get the training role: trainer/pserver
-
-    t.transpile(
-        optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2)
-
-    if training_role == "PSERVER":
-        if not current_endpoint:
-            print("need env SERVER_ENDPOINT")
-            exit(1)
-        pserver_prog = t.get_pserver_program(current_endpoint)
-        pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
-        exe.run(pserver_startup)
-        exe.run(pserver_prog)
-    elif training_role == "TRAINER":
-        exe.run(fluid.default_startup_program())
-        trainer_prog = t.get_trainer_program()
-        for pass_id in xrange(PASS_NUM):
-            accuracy.reset(exe)
-            for data in train_data():
-                cost_val, acc_val = exe.run(trainer_prog,
-                                            feed=feeder.feed(data),
-                                            fetch_list=[cost, acc_out])
-                pass_acc = accuracy.eval(exe)
-                print("cost=" + str(cost_val) + " acc=" + str(acc_val) +
-                      " pass_acc=" + str(pass_acc))
-                if cost_val < 1.0 and acc_val > 0.8:
-                    exit(0)
-    else:
-        print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
-
-
-if __name__ == '__main__':
-    main()
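Note: in stacked_lstm_net above, is_reverse=(i % 2) == 0 alternates the LSTM direction through the stack (layer 1 is the initial forward dynamic_lstm, which is why stacked_num must be odd). A small sketch of the direction pattern for the default stacked_num=3:

stacked_num = 3
for i in range(2, stacked_num + 1):
    direction = "reverse" if i % 2 == 0 else "forward"
    print("layer %d: %s" % (i, direction))
# layer 2: reverse
# layer 3: forward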