diff --git a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/dist_simnet_bow.py
index c3fb0df3415815e152716cd862da2d2857f7874b..33828f8a8d28d5a60437d729226d16e067ebd4d9 100644
--- a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py
+++ b/python/paddle/fluid/tests/unittests/dist_simnet_bow.py
@@ -31,18 +31,17 @@ import signal
 from functools import reduce
 from test_dist_base import TestDistRunnerBase, runtime_main
 
-DTYPE = "float32"
-DATA_URL = 'http://paddle-dist-ce-data.cdn.bcebos.com/train.1000'
+DTYPE = "int64"
+DATA_URL = 'http://paddle-dist-ce-data.cdn.bcebos.com/simnet.train.1000'
 DATA_MD5 = '4cc060b0a0939a343fc9242aa1ee2e4e'
 
 # For Net
-base_lr = 0.005
+base_lr = 0.2
 emb_lr = base_lr * 3
 dict_dim = 1451594
 emb_dim = 128
 hid_dim = 128
 margin = 0.1
-batch_size = 128
 sample_rate = 1
 
 # Fix seed for test
@@ -50,7 +49,7 @@ fluid.default_startup_program().random_seed = 1
 fluid.default_main_program().random_seed = 1
 
 
-def get_acc(cos_q_nt, cos_q_pt):
+def get_acc(cos_q_nt, cos_q_pt, batch_size):
     cond = fluid.layers.less_than(cos_q_nt, cos_q_pt)
     cond = fluid.layers.cast(cond, dtype='float64')
     cond_3 = fluid.layers.reduce_sum(cond)
@@ -82,13 +81,14 @@ def get_optimizer():
     return optimizer
 
 
-def train_network():
+def train_network(batch_size, is_distributed=False):
     # query
     q = fluid.layers.data(
         name="query_ids", shape=[1], dtype="int64", lod_level=1)
     ## embedding
     q_emb = fluid.layers.embedding(
         input=q,
+        is_distributed=is_distributed,
         size=[dict_dim, emb_dim],
         param_attr=fluid.ParamAttr(
             name="__emb__", learning_rate=emb_lr),
@@ -109,6 +109,7 @@ def train_network():
     ## embedding
     pt_emb = fluid.layers.embedding(
         input=pt,
+        is_distributed=is_distributed,
         size=[dict_dim, emb_dim],
         param_attr=fluid.ParamAttr(
             name="__emb__", learning_rate=emb_lr),
@@ -128,6 +129,7 @@ def train_network():
     ## embedding
     nt_emb = fluid.layers.embedding(
         input=nt,
+        is_distributed=is_distributed,
         size=[dict_dim, emb_dim],
         param_attr=fluid.ParamAttr(
             name="__emb__", learning_rate=emb_lr),
@@ -146,7 +148,7 @@ def train_network():
     # loss
     avg_cost = get_loss(cos_q_pt, cos_q_nt)
     # acc
-    acc = get_acc(cos_q_nt, cos_q_pt)
+    acc = get_acc(cos_q_nt, cos_q_pt, batch_size)
     return [avg_cost, acc, cos_q_pt]
 
 
@@ -175,10 +177,10 @@ def get_one_data(file_list):
                 continue
 
             for each in tmp:
-                yield [one_data[2], each[0], each[1], [0]]
+                yield [one_data[2], 0, each[0], each[1]]
 
 
-def get_batch_reader(file_list):
+def get_batch_reader(file_list, batch_size):
     def batch_reader():
         res = []
         for i in get_one_data(file_list):
@@ -191,11 +193,11 @@ def get_batch_reader(file_list):
     return batch_reader
 
 
-def get_train_reader():
+def get_train_reader(batch_size):
     # The training data set.
     train_file = os.path.join(paddle.dataset.common.DATA_HOME, "simnet",
                               "train")
-    train_reader = get_batch_reader(train_file)
+    train_reader = get_batch_reader([train_file], batch_size)
     train_feed = ["query_ids", "pos_title_ids", "neg_title_ids", "label"]
     return train_reader, train_feed
 
@@ -203,7 +205,7 @@ def get_train_reader():
 class TestDistSimnetBow2x2(TestDistRunnerBase):
     def get_model(self, batch_size=2):
         # Train program
-        avg_cost, acc, predict = train_network()
+        avg_cost, acc, predict = train_network(batch_size, False)
 
         inference_program = fluid.default_main_program().clone()
 
@@ -212,10 +214,12 @@ class TestDistSimnetBow2x2(TestDistRunnerBase):
         opt.minimize(avg_cost)
 
         # Reader
-        train_reader, _ = get_train_reader()
+        train_reader, _ = get_train_reader(batch_size)
         return inference_program, avg_cost, train_reader, train_reader, acc, predict
 
 
 if __name__ == "__main__":
+    import os
+    os.environ['CPU_NUM'] = '1'
     paddle.dataset.common.download(DATA_URL, 'simnet', DATA_MD5, "train")
     runtime_main(TestDistSimnetBow2x2)
diff --git a/python/paddle/fluid/tests/unittests/dist_text_classification.py b/python/paddle/fluid/tests/unittests/dist_text_classification.py
index fdf63b94cefe5be6bbeffd6ec779dd53a7143fd2..68e8235a1bfbbde113813af2b7e3ed25c5d3d683 100644
--- a/python/paddle/fluid/tests/unittests/dist_text_classification.py
+++ b/python/paddle/fluid/tests/unittests/dist_text_classification.py
@@ -27,17 +27,40 @@ import unittest
 from multiprocessing import Process
 import os
 import signal
+import six
+import tarfile
+import string
+import re
 from functools import reduce
 from test_dist_base import TestDistRunnerBase, runtime_main
 
 DTYPE = "float32"
-paddle.dataset.imdb.fetch()
+VOCAB_URL = 'http://paddle-dist-ce-data.bj.bcebos.com/imdb.vocab'
+VOCAB_MD5 = '23c86a0533c0151b6f12fa52b106dcc2'
+DATA_URL = 'http://paddle-dist-ce-data.bj.bcebos.com/text_classification.tar.gz'
+DATA_MD5 = '29ebfc94f11aea9362bbb7f5e9d86b8a'
 
 # Fix seed for test
 fluid.default_startup_program().random_seed = 1
 fluid.default_main_program().random_seed = 1
 
 
+# Load the dictionary.
+def load_vocab(filename):
+    vocab = {}
+    with open(filename) as f:
+        for idx, line in enumerate(f):
+            vocab[line.strip()] = idx
+    return vocab
+
+
+def get_worddict(dict_path):
+    word_dict = load_vocab(dict_path)
+    word_dict["<unk>"] = len(word_dict)
+    dict_dim = len(word_dict)
+    return (word_dict, dict_dim)
+
+
 def conv_net(input,
              dict_dim,
              emb_dim=128,
@@ -69,12 +92,10 @@ def inference_network(dict_dim):
 
 def get_reader(word_dict, batch_size):
     # The training data set.
-    train_reader = paddle.batch(
-        paddle.dataset.imdb.train(word_dict), batch_size=batch_size)
+    train_reader = paddle.batch(train(word_dict), batch_size=batch_size)
 
     # The testing data set.
-    test_reader = paddle.batch(
-        paddle.dataset.imdb.test(word_dict), batch_size=batch_size)
+    test_reader = paddle.batch(test(word_dict), batch_size=batch_size)
 
     return train_reader, test_reader
 
@@ -86,8 +107,9 @@ def get_optimizer(learning_rate):
 
 class TestDistTextClassification2x2(TestDistRunnerBase):
     def get_model(self, batch_size=2):
-        word_dict = paddle.dataset.imdb.word_dict()
-        dict_dim = word_dict["<unk>"]
+        vocab = os.path.join(paddle.dataset.common.DATA_HOME,
+                             "text_classification", "imdb.vocab")
+        word_dict, dict_dim = get_worddict(vocab)
 
         # Input data
         data = fluid.layers.data(
@@ -99,8 +121,8 @@ class TestDistTextClassification2x2(TestDistRunnerBase):
         cost = fluid.layers.cross_entropy(input=predict, label=label)
         avg_cost = fluid.layers.mean(x=cost)
         acc = fluid.layers.accuracy(input=predict, label=label)
-
         inference_program = fluid.default_main_program().clone()
+
         # Optimization
         opt = get_optimizer(learning_rate=0.001)
         opt.minimize(avg_cost)
@@ -111,5 +133,81 @@ class TestDistTextClassification2x2(TestDistRunnerBase):
         return inference_program, avg_cost, train_reader, test_reader, acc, predict
 
 
+def tokenize(pattern):
+    """
+    Read files that match the given pattern. Tokenize and yield each file.
+    """
+
+    with tarfile.open(
+            paddle.dataset.common.download(DATA_URL, 'text_classification',
+                                           DATA_MD5)) as tarf:
+        # Note that we should use tarfile.next(), which does
+        # sequential access of member files, rather than
+        # tarfile.extractfile, which does random access and might
+        # destroy hard disks.
+        tf = tarf.next()
+        while tf != None:
+            if bool(pattern.match(tf.name)):
+                # Newline and punctuation removal, and ad-hoc tokenization.
+                yield tarf.extractfile(tf).read().rstrip(six.b(
+                    "\n\r")).translate(
+                        None, six.b(string.punctuation)).lower().split()
+            tf = tarf.next()
+
+
+def reader_creator(pos_pattern, neg_pattern, word_idx):
+    UNK = word_idx['<unk>']
+    INS = []
+
+    def load(pattern, out, label):
+        for doc in tokenize(pattern):
+            out.append(([word_idx.get(w, UNK) for w in doc], label))
+
+    load(pos_pattern, INS, 0)
+    load(neg_pattern, INS, 1)
+
+    def reader():
+        for doc, label in INS:
+            yield doc, label
+
+    return reader
+
+
+def train(word_idx):
+    """
+    IMDB training set creator.
+
+    It returns a reader creator; each sample in the reader is a zero-based ID
+    sequence and a label in [0, 1].
+
+    :param word_idx: word dictionary
+    :type word_idx: dict
+    :return: Training reader creator
+    :rtype: callable
+    """
+    return reader_creator(
+        re.compile("train/pos/.*\.txt$"),
+        re.compile("train/neg/.*\.txt$"), word_idx)
+
+
+def test(word_idx):
+    """
+    IMDB test set creator.
+
+    It returns a reader creator; each sample in the reader is a zero-based ID
+    sequence and a label in [0, 1].
+
+    :param word_idx: word dictionary
+    :type word_idx: dict
+    :return: Test reader creator
+    :rtype: callable
+    """
+    return reader_creator(
+        re.compile("test/pos/.*\.txt$"),
+        re.compile("test/neg/.*\.txt$"), word_idx)
+
+
 if __name__ == "__main__":
+    paddle.dataset.common.download(VOCAB_URL, 'text_classification', VOCAB_MD5)
+    paddle.dataset.common.download(DATA_URL, 'text_classification', DATA_MD5)
     runtime_main(TestDistTextClassification2x2)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_inference_save.py b/python/paddle/fluid/tests/unittests/test_dist_inference_save_load.py
similarity index 61%
rename from python/paddle/fluid/tests/unittests/test_dist_inference_save.py
rename to python/paddle/fluid/tests/unittests/test_dist_inference_save_load.py
index 55fe38dc376cba07f52cb0266edbe89402ef0c05..be01e40a617bf150df72ab7244fdea079f55ae63 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_inference_save.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_inference_save_load.py
@@ -20,19 +20,97 @@ import os
 import sys
 import signal
 import subprocess
+import six
 import paddle.compat as cpt
 
 
-class TestDistMnist2x2(TestDistBase):
+class TestDistInferenceSaveAndLoad2x2(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
 
-    def check_with_place(self, model_file, delta=1e-3, check_error_log=False):
+    @staticmethod
+    def _save_model(dirname, feeded_var_names, target_vars):
+        import paddle.fluid as fluid
+
+        place = fluid.CPUPlace()
+        exe = fluid.Executor(place)
+
+        fluid.io.save_inference_model(dirname, feeded_var_names, target_vars,
+                                      exe)
+
+    def run_pserver(self,
+                    pserver_endpoints,
+                    trainers,
+                    current_endpoint,
+                    trainer_id,
+                    sync_mode=True):
+        import paddle
+        import paddle.fluid as fluid
+        self.get_model(batch_size=2)
+        t = self.get_transpiler(trainer_id,
+                                fluid.default_main_program(), pserver_endpoints,
+                                trainers, sync_mode)
+        pserver_prog = t.get_pserver_program(current_endpoint)
+        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
+        place = fluid.CPUPlace()
+        exe = fluid.Executor(place)
+        exe.run(startup_prog)
+        exe.run(pserver_prog)
+
+    def run_trainer(self,
+                    place,
+                    endpoints,
+                    trainer_id,
+                    trainers,
+                    is_dist=True,
+                    sync_mode=True):
+        import paddle
+        import paddle.fluid as fluid
+        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
+            self.get_model(batch_size=2)
+        if is_dist:
+            t = self.get_transpiler(trainer_id,
+                                    fluid.default_main_program(), endpoints,
+                                    trainers, sync_mode)
+            trainer_prog = t.get_trainer_program()
+        else:
+            trainer_prog = fluid.default_main_program()
+
+        startup_exe = fluid.Executor(place)
+        startup_exe.run(fluid.default_startup_program())
+
+        strategy = fluid.ExecutionStrategy()
+        strategy.num_threads = 1
+        strategy.allow_op_delay = False
+        exe = fluid.ParallelExecutor(
+            True, loss_name=avg_cost.name, exec_strategy=strategy)
+
+        feed_var_list = [
+            var for var in trainer_prog.global_block().vars.values()
+            if var.is_data
+        ]
+
+        feeder = fluid.DataFeeder(feed_var_list, place)
+        reader_generator = test_reader()
+
+        data = next(reader_generator)
+        first_loss, = exe.run(fetch_list=[avg_cost.name],
+                              feed=feeder.feed(data))
+        print(first_loss)
+
+        dirname = "/tmp/simnet.infer.model"
+        if trainer_id == 0:
+            self._save_model(dirname, [], [predict])
+
+    def check_with_save_inference(self,
+                                  model_file,
+                                  delta=1e-3,
+                                  check_error_log=False):
         # *ATTENTION* THIS TEST NEEDS AT LEAST 2GPUS TO RUN
         required_envs = {
             "PATH": os.getenv("PATH"),
-            "PYTHONPATH": os.getenv("PYTHONPATH"),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH"), + "PYTHONPATH": os.getenv("PYTHONPATH", ""), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), "FLAGS_fraction_of_gpu_memory_to_use": "0.15", "FLAGS_cudnn_deterministic": "1" } @@ -139,7 +217,7 @@ class TestDistMnist2x2(TestDistBase): @unittest.skip(reason="Not Ready, Debugging") def test_dist_save_inference_model(self): - self.check_with_place("dist_simnet_bow.py", delta=1e-7) + self.check_with_save_inference("dist_simnet_bow.py", delta=1e-7) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py index 647d459d975a6417256871b23cc5e17f34f13eda..0b3ef8c2632887751c85918cbad79edb7131dac0 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py +++ b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py @@ -22,7 +22,8 @@ class TestDistSimnetBow2x2(TestDistBase): self._sync_mode = True def test_simnet_bow(self): - self.check_with_place("dist_simnet_bow.py", delta=1e-7) + self.check_with_place( + "dist_simnet_bow.py", delta=2, check_error_log=False) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_dist_text_classification.py b/python/paddle/fluid/tests/unittests/test_dist_text_classification.py index 40bffd4e81036b1ac91d0491a4b4734ffef8104d..7638301a035412d3cd46f311c2e9a27c53729c11 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_text_classification.py +++ b/python/paddle/fluid/tests/unittests/test_dist_text_classification.py @@ -22,7 +22,7 @@ class TestDistTextClassification2x2(TestDistBase): self._sync_mode = True def test_text_classification(self): - self.check_with_place("dist_text_classification.py", delta=1e-7) + self.check_with_place("dist_text_classification.py", delta=1e-2) if __name__ == "__main__":