From c49560da7a9ba0104e4bc614b3100f5a9d7493d4 Mon Sep 17 00:00:00 2001
From: Chengmo
Date: Wed, 2 Sep 2020 18:34:42 +0800
Subject: [PATCH] supplement bug fix of parameter server (#26217) (#26909)

* fix fluid.embedding
---
 .../distributed_lookup_table_op.cc            |  25 +-
 .../distributed_lookup_table_op.h             |  21 ++
 .../fleet/runtime/parameter_server_runtime.py |   7 +-
 .../fleet/parameter_server/ir/public.py       |  12 +-
 .../fleet/parameter_server/ir/trainer_pass.py |  11 +-
 .../fluid/tests/unittests/CMakeLists.txt      |   2 -
 .../fluid/tests/unittests/dist_fleet_ctr.py   |   7 +-
 ...simnet_bow.py => dist_fleet_simnet_bow.py} | 217 +++++++++---------
 .../tests/unittests/simnet_dataset_reader.py  |  33 +++
 .../tests/unittests/test_dist_fleet_ctr.py    |  35 ---
 .../tests/unittests/test_dist_fleet_geo.py    |   4 +-
 .../unittests/test_dist_fleet_grad_clip.py    |   4 +-
 .../tests/unittests/test_dist_fleet_simnet.py |  56 +++++
 .../tests/unittests/test_dist_simnet_bow.py   | 161 -------------
 14 files changed, 255 insertions(+), 340 deletions(-)
 rename python/paddle/fluid/tests/unittests/{dist_simnet_bow.py => dist_fleet_simnet_bow.py} (55%)
 create mode 100644 python/paddle/fluid/tests/unittests/simnet_dataset_reader.py
 create mode 100644 python/paddle/fluid/tests/unittests/test_dist_fleet_simnet.py
 delete mode 100644 python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py

diff --git a/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc
index 8c093d12585..6dfa2670c14 100644
--- a/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc
+++ b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc
@@ -25,25 +25,32 @@ class DistributedLookupTableOp : public framework::OperatorWithKernel {
   using framework::OperatorWithKernel::OperatorWithKernel;
 
   void InferShape(framework::InferShapeContext *ctx) const override {
-    PADDLE_ENFORCE(ctx->HasInputs("Ids"),
-                   "Input(Ids) of LookupTableOp should not be null.");
-    PADDLE_ENFORCE(ctx->HasInput("W"),
-                   "Input(W) of LookupTableOp should not be null.");
-    PADDLE_ENFORCE(ctx->HasOutputs("Outputs"),
-                   "Output(Outs) of LookupTableOp should not be null.");
+    PADDLE_ENFORCE_EQ(ctx->HasInputs("Ids"), true,
+                      platform::errors::InvalidArgument(
+                          "Input(Ids) of LookupTableOp should not be null."));
+    PADDLE_ENFORCE_EQ(ctx->HasInput("W"), true,
+                      platform::errors::InvalidArgument(
+                          "Input(W) of LookupTableOp should not be null."));
+    PADDLE_ENFORCE_EQ(ctx->HasOutputs("Outputs"), true,
+                      platform::errors::InvalidArgument(
+                          "Output(Outs) of LookupTableOp should not be null."));
 
     auto ids_dims = ctx->GetInputsDim("Ids");
     auto table_dims = ctx->GetInputDim("W");
 
-    PADDLE_ENFORCE_EQ(table_dims.size(), 2,
-                      "Only 2 dimensions of the 'Embedding' is supported.");
+    PADDLE_ENFORCE_EQ(
+        table_dims.size(), 2,
+        platform::errors::InvalidArgument(
+            "Only 2 dimensions of the 'Embedding' is supported."));
 
     for (auto &ids_dim : ids_dims) {
       PADDLE_ENFORCE_EQ(ids_dim.size(), 2,
-                        "The dimension of the 'Ids' tensor must be 2.");
+                        platform::errors::InvalidArgument(
+                            "The dimension of the 'Ids' tensor must be 2."));
     }
 
     auto endpoints = ctx->Attrs().Get<std::vector<std::string>>("endpoints");
 
+    // for fluid.embedding
     auto lookup_table_version =
         ctx->Attrs().Get<std::string>("lookup_table_version");
 
diff --git a/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.h b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.h
index a71451c78a8..6387120bc87 100644
--- a/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.h
+++ b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.h
@@ -35,9 +35,30 @@ class DistributedLookupTableKernel : public framework::OpKernel<T> {
     auto endpoints = context.Attr<std::vector<std::string>>("endpoints");
     auto is_distributed = context.Attr<bool>("is_distributed");
 
+    auto lookup_table_version =
+        context.Attr<std::string>("lookup_table_version");
+
     operators::distributed::prefetchs(id_names, out_names, embedding_name,
                                       is_distributed, lookup_tables, endpoints,
                                       context, context.scope());
+
+    if (lookup_table_version == "lookup_table_v2") {
+      auto &scope = context.scope();
+      auto emb_dim =
+          scope.FindVar(embedding_name)->Get<framework::LoDTensor>().dims()[1];
+
+      for (size_t i = 0; i < id_names.size(); ++i) {
+        auto *id_var = scope.FindVar(id_names[i]);
+        auto *out_var = scope.FindVar(out_names[i]);
+        auto *id_tensor = id_var->GetMutable<framework::LoDTensor>();
+        auto *out_tensor = out_var->GetMutable<framework::LoDTensor>();
+
+        auto id_dims = id_tensor->dims();
+        out_tensor->Resize(framework::make_ddim(
+            {static_cast<int64_t>(id_dims[0]), static_cast<int64_t>(id_dims[1]),
+             static_cast<int64_t>(emb_dim)}));
+      }
+    }
   }
 };
 
diff --git a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
index 1741f10ccb1..870c3fe8be4 100644
--- a/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
+++ b/python/paddle/distributed/fleet/runtime/parameter_server_runtime.py
@@ -154,15 +154,16 @@ class ParameterServerRuntime(RuntimeBase):
                 kwargs["sparse_attrs"] = get_sparse_attrs()
             return kwargs
 
-        from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops
+        from paddle.fluid.incubate.fleet.parameter_server.ir.public import _get_lr_ops, _has_global_step
 
         from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import \
             SyncStrategy, GeoStrategy
 
         trainer_config = self.async_strategy.get_trainer_runtime_config()
 
-        lrs = _get_lr_ops(self.origin_main_program)
-        if len(lrs) > 0:
+        lrs = _has_global_step(_get_lr_ops(self.origin_main_program))
+
+        if lrs:
             kwargs = {"need_global_step": "1"}
         else:
             kwargs = {"need_global_step": "0"}
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
index 378c8fc23d7..216478479a7 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
@@ -42,6 +42,9 @@ op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
 LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched
 OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize
 
+SPARSE_OP_LIST = ["lookup_table", "lookup_table_v2"]
+SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
+
 
 def _get_lr_ops(program):
     lr_ops = []
@@ -66,7 +69,7 @@ def _has_global_step(lr_ops):
 
 
 def is_sparse_op(op):
-    if op.type == "lookup_table" and op.attr('is_sparse') is True and op.attr(
+    if op.type in SPARSE_OP_LIST and op.attr('is_sparse') is True and op.attr(
             'is_distributed') is False:
         return True
 
@@ -78,7 +81,7 @@ def is_sparse_op(op):
 
 
 def is_distributed_sparse_op(op):
-    if op.type == "lookup_table" and op.attr('is_distributed') is True:
+    if op.type in SPARSE_OP_LIST and op.attr('is_distributed') is True:
         return True
 
     if op.type == "distributed_lookup_table" and op.attr(
@@ -802,11 +805,10 @@ class CompileTimeStrategy(object):
 
         def _get_sparse_varnames():
             varnames = []
-            op_types = {"lookup_table": "W"}
             for op in origin_program.global_block().ops:
-                if op.type in op_types.keys() \
+                if op.type in SPARSE_OP_TYPE_DICT.keys() \
                         and op.attr('remote_prefetch') is True:
-                    param_name = op.input(op_types[op.type])[0]
+                    param_name = op.input(SPARSE_OP_TYPE_DICT[op.type])[0]
                     varnames.append(param_name)
 
             return list(set(varnames))
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
index 201b3863a4b..5e6b8ca6399 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
@@ -40,6 +40,8 @@ LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched
 OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize
 op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
 
+SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
+
 DEVICE_LIST = ["cpu", "gpu", "xpu"]
 COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"]
 DEFAULT_DEVICE = 'cpu'
@@ -81,11 +83,10 @@ def distributed_ops_pass(program, config):
 
     def _get_pull_sparse_ops(_program):
         pull_sparse_ops = {}
-        op_types = {"lookup_table": "W"}
         for op in _program.global_block().ops:
-            if op.type in op_types.keys() \
+            if op.type in SPARSE_OP_TYPE_DICT.keys() \
                     and op.attr('remote_prefetch') is True:
-                param_name = op.input(op_types[op.type])[0]
+                param_name = op.input(SPARSE_OP_TYPE_DICT[op.type])[0]
                 ops = pull_sparse_ops.get(param_name, [])
                 ops.append(op)
                 pull_sparse_ops[param_name] = ops
@@ -101,6 +102,7 @@ def distributed_ops_pass(program, config):
         w = program.global_block().vars[ops[0].input("W")[0]]
         padding_idx = ops[0].attr("padding_idx")
         is_distributed = ops[0].attr("is_distributed")
+        op_type = ops[0].type
 
         outputs = [
             program.global_block().vars[op.output("Out")[0]] for op in ops
@@ -149,7 +151,8 @@ def distributed_ops_pass(program, config):
                     "is_distributed": is_distributed,
                     "pserver_num": len(pserver_endpoints),
                     "padding_idx": padding_idx,
-                    "trainer_id": trainer_id
+                    "trainer_id": trainer_id,
+                    "lookup_table_version": op_type
                 })
         else:
             raise ValueError(
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 6220bf62c79..b78c597de83 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -432,8 +432,6 @@ if(WITH_DISTRIBUTE)
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_mnist_lars")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_mnist_train")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_save_load")
-    list(REMOVE_ITEM DIST_TEST_OPS "test_dist_simnet_bow")
-    list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_ctr")
    list(REMOVE_ITEM DIST_TEST_OPS "test_dist_text_classification")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_train")
     list(REMOVE_ITEM DIST_TEST_OPS "test_dist_word2vec")
diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
index 73b546b95cf..dc39472d7ae 100644
--- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
@@ -196,8 +196,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         fleet.stop_worker()
 
     def do_dataset_training(self, fleet):
-        dnn_input_dim, lr_input_dim, train_file_path = ctr_dataset_reader.prepare_data(
-        )
+        train_file_list = ctr_dataset_reader.prepare_fake_data()
 
         exe = fluid.Executor(fluid.CPUPlace())
 
@@ -206,9 +205,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         thread_num = 2
         batch_size = 128
-        filelist = []
-        for _ in range(thread_num):
-            filelist.append(train_file_path)
+        filelist = train_file_list
 
         # config dataset
         dataset = paddle.distributed.fleet.DatasetFactory().create_dataset()
diff --git a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/dist_fleet_simnet_bow.py
similarity index 55%
rename from python/paddle/fluid/tests/unittests/dist_simnet_bow.py
rename to python/paddle/fluid/tests/unittests/dist_fleet_simnet_bow.py
index 9fcba2aede1..7d5ca4fc6e3 100644
--- a/python/paddle/fluid/tests/unittests/dist_simnet_bow.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_simnet_bow.py
@@ -19,6 +19,8 @@ import argparse
 import time
 import math
 import random
+import shutil
+import tempfile
 
 import paddle
 import paddle.fluid as fluid
@@ -29,7 +31,8 @@ from multiprocessing import Process
 import os
 import signal
 from functools import reduce
-from test_dist_base import TestDistRunnerBase, runtime_main
+from test_dist_fleet_base import runtime_main, FleetDistRunnerBase
+from paddle.distributed.fleet.base.util_factory import fleet_util
 
 DTYPE = "int64"
 DATA_URL = 'http://paddle-dist-ce-data.bj.bcebos.com/simnet.train.1000'
@@ -49,6 +52,18 @@ fluid.default_startup_program().random_seed = 1
 fluid.default_main_program().random_seed = 1
 
 
+def fake_simnet_reader():
+    def reader():
+        for _ in range(1000):
+            q = np.random.random_integers(0, 1500 - 1, size=1).tolist()
+            label = np.random.random_integers(0, 1, size=1).tolist()
+            pt = np.random.random_integers(0, 1500 - 1, size=1).tolist()
+            nt = np.random.random_integers(0, 1500 - 1, size=1).tolist()
+            yield [q, label, pt, nt]
+
+    return reader
+
+
 def get_acc(cos_q_nt, cos_q_pt, batch_size):
     cond = fluid.layers.less_than(cos_q_nt, cos_q_pt)
     cond = fluid.layers.cast(cond, dtype='float64')
@@ -75,34 +90,40 @@ def get_loss(cos_q_pt, cos_q_nt):
     return avg_cost
 
 
-def get_optimizer(op="sgd"):
-    if op.upper() == "sgd".upper():
-        optimizer = fluid.optimizer.SGD(learning_rate=base_lr)
-    elif op.upper() == "adam".upper():
-        optimizer = fluid.optimizer.Adam(learning_rate=base_lr)
-    else:
-        optimizer = fluid.optimizer.SGD(learning_rate=base_lr)
-    return optimizer
-
-
 def train_network(batch_size,
                   is_distributed=False,
                   is_sparse=False,
-                  is_self_contained_lr=False):
+                  is_self_contained_lr=False,
+                  is_pyreader=False):
     # query
     q = fluid.layers.data(
         name="query_ids", shape=[1], dtype="int64", lod_level=1)
+    # label data
+    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
+    # pt
+    pt = fluid.layers.data(
+        name="pos_title_ids", shape=[1], dtype="int64", lod_level=1)
+    # nt
+    nt = fluid.layers.data(
+        name="neg_title_ids", shape=[1], dtype="int64", lod_level=1)
+
+    datas = [q, label, pt, nt]
+
+    reader = None
+    if is_pyreader:
+        reader = fluid.io.PyReader(
+            feed_list=datas,
+            capacity=64,
+            iterable=False,
+            use_double_buffer=False)
+
     # embedding
     q_emb = fluid.embedding(
         input=q,
         is_distributed=is_distributed,
         size=[dict_dim, emb_dim],
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__emb__",
-            learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__emb__"),
+            initializer=fluid.initializer.Constant(value=0.01), name="__emb__"),
         is_sparse=is_sparse)
     q_emb = fluid.layers.reshape(q_emb, [-1, emb_dim])
     # vsum
@@ -115,12 +136,8 @@ def train_network(batch_size,
         param_attr=fluid.ParamAttr(
             initializer=fluid.initializer.Constant(value=0.01),
             name="__q_fc__",
-            learning_rate=base_lr))
-    # label data
-    label = fluid.layers.data(name="label", shape=[1], dtype="int64")
-    # pt
-    pt = fluid.layers.data(
-        name="pos_title_ids", shape=[1], dtype="int64", lod_level=1)
+            learning_rate=base_lr), )
+
     # embedding
     pt_emb = fluid.embedding(
         input=pt,
@@ -129,9 +146,7 @@ def train_network(batch_size,
         param_attr=fluid.ParamAttr(
             initializer=fluid.initializer.Constant(value=0.01),
             name="__emb__",
-            learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__emb__"),
+            learning_rate=emb_lr),
         is_sparse=is_sparse)
     pt_emb = fluid.layers.reshape(pt_emb, [-1, emb_dim])
     # vsum
@@ -142,24 +157,16 @@ def train_network(batch_size,
         input=pt_ss,
         size=hid_dim,
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__fc__",
-            learning_rate=base_lr),
+            initializer=fluid.initializer.Constant(value=0.01), name="__fc__"),
         bias_attr=fluid.ParamAttr(name="__fc_b__"))
-    # nt
-    nt = fluid.layers.data(
-        name="neg_title_ids", shape=[1], dtype="int64", lod_level=1)
+
     # embedding
     nt_emb = fluid.embedding(
         input=nt,
         is_distributed=is_distributed,
         size=[dict_dim, emb_dim],
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__emb__",
-            learning_rate=emb_lr) if is_self_contained_lr else fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__emb__"),
+            initializer=fluid.initializer.Constant(value=0.01), name="__emb__"),
         is_sparse=is_sparse)
     nt_emb = fluid.layers.reshape(nt_emb, [-1, emb_dim])
     # vsum
@@ -170,9 +177,7 @@ def train_network(batch_size,
         input=nt_ss,
         size=hid_dim,
         param_attr=fluid.ParamAttr(
-            initializer=fluid.initializer.Constant(value=0.01),
-            name="__fc__",
-            learning_rate=base_lr),
+            initializer=fluid.initializer.Constant(value=0.01), name="__fc__"),
         bias_attr=fluid.ParamAttr(name="__fc_b__"))
     cos_q_pt = fluid.layers.cos_sim(q_fc, pt_fc)
     cos_q_nt = fluid.layers.cos_sim(q_fc, nt_fc)
@@ -180,79 +185,67 @@ def train_network(batch_size,
     avg_cost = get_loss(cos_q_pt, cos_q_nt)
     # acc
     acc = get_acc(cos_q_nt, cos_q_pt, batch_size)
-    return [avg_cost, acc, cos_q_pt]
-
-
-def combination(x, y):
-    res = [[[xi, yi] for yi in y] for xi in x]
-    return res[0]
-
-
-def get_one_data(file_list):
-    for file in file_list:
-        contents = []
-        with open(file, "r") as fin:
-            for i in fin:
-                contents.append(i.strip())
-            for index, q in enumerate(contents):
-                try:
-                    one_data = [[int(j) for j in i.split(" ")]
-                                for i in q.split(";")[:-1]]
-                    if one_data[1][0] + one_data[1][1] != len(one_data) - 3:
-                        q = fin.readline()
-                        continue
-                    tmp = combination(one_data[3:3 + one_data[1][0]],
-                                      one_data[3 + one_data[1][0]:])
-                except Exception as e:
-                    continue
-
-                for each in tmp:
-                    yield [one_data[2], 0, each[0], each[1]]
-
-
-def get_batch_reader(file_list, batch_size):
-    def batch_reader():
-        res = []
-        for i in get_one_data(file_list):
-            if random.random() <= sample_rate:
-                res.append(i)
-            if len(res) >= batch_size:
-                yield res
-                res = []
-
-    return batch_reader
-
-
-def get_train_reader(batch_size):
-    # The training data set.
-    train_file = os.path.join(paddle.dataset.common.DATA_HOME, "simnet",
-                              "train")
-    train_reader = get_batch_reader([train_file], batch_size)
-    train_feed = ["query_ids", "pos_title_ids", "neg_title_ids", "label"]
-    return train_reader, train_feed
-
-
-class TestDistSimnetBow2x2(TestDistRunnerBase):
-    def get_model(self, batch_size=2):
-        # Train program
-        avg_cost, acc, predict = \
-            train_network(batch_size,
-                          bool(int(os.environ["IS_DISTRIBUTED"])),
-                          bool(int(os.environ["IS_SPARSE"])),
-                          bool(int(os.environ["IS_SELF_CONTAINED_LR"])))
-
-        inference_program = fluid.default_main_program().clone()
-
-        # Optimization
-        opt = os.getenv('OPTIMIZER', 'sgd')
-        opt = get_optimizer(opt)
-        opt.minimize(avg_cost)
-
-        # Reader
-        train_reader, _ = get_train_reader(batch_size)
-        return inference_program, avg_cost, train_reader, train_reader, acc, predict
+    return avg_cost, acc, cos_q_pt, reader
+
+
+class TestDistSimnetBow2x2(FleetDistRunnerBase):
+    """
+    For test SimnetBow model, use Fleet api
+    """
+
+    def net(self, args, batch_size=4, lr=0.01):
+        avg_cost, _, predict, self.reader = \
+            train_network(batch_size=batch_size, is_distributed=False,
+                          is_sparse=True, is_self_contained_lr=False, is_pyreader=(args.reader == "pyreader"))
+        self.avg_cost = avg_cost
+        self.predict = predict
+
+        return avg_cost
+
+    def check_model_right(self, dirname):
+        model_filename = os.path.join(dirname, "__model__")
+
+        with open(model_filename, "rb") as f:
+            program_desc_str = f.read()
+
+        program = fluid.Program.parse_from_string(program_desc_str)
+        with open(os.path.join(dirname, "__model__.proto"), "w") as wn:
+            wn.write(str(program))
+
+    def do_pyreader_training(self, fleet):
+        """
+        do training using dataset, using fetch handler to catch variable
+        Args:
+            fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
+        """
+
+        exe = fluid.Executor(fluid.CPUPlace())
+        fleet.init_worker()
+        exe.run(fluid.default_startup_program())
+        batch_size = 4
+        # reader
+        train_reader = paddle.batch(fake_simnet_reader(), batch_size=batch_size)
+        self.reader.decorate_sample_list_generator(train_reader)
+        for epoch_id in range(1):
+            self.reader.start()
+            try:
+                pass_start = time.time()
+                while True:
+                    loss_val = exe.run(program=fluid.default_main_program(),
+                                       fetch_list=[self.avg_cost.name])
+                    loss_val = np.mean(loss_val)
+                    message = "TRAIN ---> pass: {} loss: {}\n".format(epoch_id,
+                                                                      loss_val)
+                    fleet_util.print_on_rank(message, 0)
+
+                pass_time = time.time() - pass_start
+            except fluid.core.EOFException:
+                self.reader.reset()
+        fleet.stop_worker()
+
+    def do_dataset_training(self, fleet):
+        pass
 
 
 if __name__ == "__main__":
-    paddle.dataset.common.download(DATA_URL, 'simnet', DATA_MD5, "train")
     runtime_main(TestDistSimnetBow2x2)
diff --git a/python/paddle/fluid/tests/unittests/simnet_dataset_reader.py b/python/paddle/fluid/tests/unittests/simnet_dataset_reader.py
new file mode 100644
index 00000000000..41eadc13a2a
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/simnet_dataset_reader.py
@@ -0,0 +1,33 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import os
+import logging
+import tarfile
+
+import random
+
+import paddle
+import paddle.fluid.incubate.data_generator as data_generator
+
+logging.basicConfig()
+logger = logging.getLogger("paddle")
+logger.setLevel(logging.INFO)
+
+
+class DatasetSimnetReader(data_generator.MultiSlotDataGenerator):
+    def generate_sample(self, line):
+        pass
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
index b506f179143..e2336caac1c 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
@@ -156,40 +156,5 @@ class TestDistCtrHalfAsync2x2(TestFleetBase):
             "dist_fleet_ctr.py", delta=1e-5, check_error_log=True)
 
 
-class TestDistCtrPsGpuPyreaderAsync2x2(TestFleetBase):
-    def _setup_config(self):
-        self._mode = "async"
-        self._reader = "pyreader"
-
-    def check_with_place(self,
-                         model_file,
-                         delta=1e-3,
-                         check_error_log=False,
-                         need_envs={}):
-        required_envs = {
-            "PATH": os.getenv("PATH", ""),
-            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
-            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
-            "FLAGS_rpc_deadline": "30000",  # 5sec to fail fast
-            "http_proxy": "",
-            "FLAGS_communicator_send_queue_size": "2",
-            "FLAGS_communicator_max_merge_var_num": "2",
-            "CPU_NUM": "2",
-            "SAVE_MODEL": "1"
-        }
-
-        required_envs.update(need_envs)
-
-        if check_error_log:
-            required_envs["GLOG_v"] = "3"
-            required_envs["GLOG_logtostderr"] = "1"
-
-        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
-
-    def test_dist_train(self):
-        self.check_with_place(
-            "dist_fleet_ctr_ps_gpu.py", delta=1e-5, check_error_log=True)
-
-
 if __name__ == "__main__":
     unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py
index 0fe7c386c1e..7d18e935f58 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_geo.py
@@ -21,7 +21,7 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
 from test_dist_fleet_base import TestFleetBase
-from dist_simnet_bow import train_network
+from dist_fleet_simnet_bow import train_network
 
 
 class TestDistGeoCtr_2x2(TestFleetBase):
@@ -72,7 +72,7 @@ class TestGeoSgdTranspiler(unittest.TestCase):
 
         strategy = StrategyFactory.create_geo_strategy(5)
 
-        avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
+        avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)
 
         optimizer = fluid.optimizer.SGD(0.1)
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py
index 46616f3dde4..3c68af474cf 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py
@@ -21,7 +21,7 @@ import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
 from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
 from test_dist_fleet_base import TestFleetBase
-from dist_simnet_bow import train_network
+from dist_fleet_simnet_bow import train_network
 
 
 @unittest.skip(reason="Skip unstable ut, add it after PR 22957 merged")
@@ -44,7 +44,7 @@ class TestDistGeoClipByGlobalNormTranspiler(unittest.TestCase):
         strategy.geo_sgd_mode = True
         strategy.geo_sgd_need_push_nums = 5
 
-        avg_cost, _, _ = train_network(batch_size, is_distribute, is_sparse)
+        avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)
 
         fluid.clip.set_gradient_clip(
             clip=fluid.clip.GradientClipByGlobalNorm(2.0))
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_simnet.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_simnet.py
new file mode 100644
index 00000000000..ec34993905e
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_simnet.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import os
+import unittest
+import tempfile
+from test_dist_fleet_base import TestFleetBase
+
+
+class TestDistSimnetASync2x2(TestFleetBase):
+    def _setup_config(self):
+        self._mode = "async"
+        self._reader = "pyreader"
+
+    def check_with_place(self,
+                         model_file,
+                         delta=1e-3,
+                         check_error_log=False,
+                         need_envs={}):
+        required_envs = {
+            "PATH": os.getenv("PATH", ""),
+            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
+            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
+            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
+            "http_proxy": "",
+            "CPU_NUM": "2"
+        }
+
+        required_envs.update(need_envs)
+
+        if check_error_log:
+            required_envs["GLOG_v"] = "3"
+            required_envs["GLOG_logtostderr"] = "1"
+
+        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
+
+    def test_dist_train(self):
+        self.check_with_place(
+            "dist_fleet_simnet_bow.py", delta=1e-5, check_error_log=True)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py
deleted file mode 100644
index 3189f092413..00000000000
--- a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from __future__ import print_function
-
-import os
-import unittest
-
-from test_dist_base import TestDistBase
-
-import os
-flag_name = os.path.splitext(__file__)[0]
-
-
-class TestDistSimnetBowDense2x2(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = True
-        self._enforce_place = "CPU"
-
-    def test_simnet_bow(self):
-        need_envs = {
-            "IS_DISTRIBUTED": '0',
-            "IS_SPARSE": '0',
-            'IS_SELF_CONTAINED_LR': '1'
-        }
-        self.check_with_place(
-            "dist_simnet_bow.py",
-            delta=1e-5,
-            check_error_log=True,
-            need_envs=need_envs,
-            log_name=flag_name)
-
-
-class TestDistSimnetBow2x2DenseAsync(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = False
-        self._enforce_place = "CPU"
-
-    # FIXME(typhoonzero): fix async tests later
-    def notest_simnet_bow(self):
-        need_envs = {
-            "IS_DISTRIBUTED": '0',
-            "IS_SPARSE": '0',
-            'IS_SELF_CONTAINED_LR': '1',
-        }
-        self.check_with_place(
-            "dist_simnet_bow.py",
-            delta=100,
-            check_error_log=True,
-            need_envs=need_envs,
-            log_name=flag_name)
-
-
-class TestDistSimnetBowSparse2x2(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = True
-        self._enforce_place = "CPU"
-
-    def test_simnet_bow(self):
-        need_envs = {
-            "IS_DISTRIBUTED": '0',
-            "IS_SPARSE": '1',
-            'IS_SELF_CONTAINED_LR': '1'
-        }
-        self.check_with_place(
-            "dist_simnet_bow.py",
-            delta=1e-5,
-            check_error_log=True,
-            need_envs=need_envs,
-            log_name=flag_name)
-
-
-class TestDistSimnetBow2x2SparseAsync(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = False
-        self._enforce_place = "CPU"
-
-    def test_simnet_bow(self):
-        need_envs = {
-            "IS_DISTRIBUTED": '0',
-            "IS_SPARSE": '1',
-            'IS_SELF_CONTAINED_LR': '1'
-        }
-        self.check_with_place(
-            "dist_simnet_bow.py",
-            delta=100,
-            check_error_log=True,
-            need_envs=need_envs,
-            log_name=flag_name)
-
-
-# FIXME(tangwei): Learningrate variable is not created on pserver.
-class TestDistSimnetBow2x2LookupTableSync(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = True
-        self._enforce_place = "CPU"
-
-    def test_simnet_bow(self):
-        need_envs = {
-            "IS_DISTRIBUTED": '0',
-            "IS_SPARSE": '1',
-            'IS_SELF_CONTAINED_LR': '1'
-        }
-        self.check_with_place(
-            "dist_simnet_bow.py",
-            delta=1e-5,
-            check_error_log=True,
-            need_envs=need_envs,
-            log_name=flag_name)
-
-
-class TestDistSimnetBow2x2LookupTableAsync(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = False
-        self._enforce_place = "CPU"
-
-    def test_simnet_bow(self):
-        need_envs = {
-            "IS_DISTRIBUTED": '0',
-            "IS_SPARSE": '1',
-            'IS_SELF_CONTAINED_LR': '1'
-        }
-        self.check_with_place(
-            "dist_simnet_bow.py",
-            delta=100,
-            check_error_log=True,
-            need_envs=need_envs,
-            log_name=flag_name)
-
-
-class TestDistSimnetBow2x2LookupTableNotContainLRSync(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = True
-        self._enforce_place = "CPU"
-
-    def test_simnet_bow(self):
-        need_envs = {
-            "IS_DISTRIBUTED": '0',
-            "IS_SPARSE": '1',
-            'IS_SELF_CONTAINED_LR': '0'
-        }
-        self.check_with_place(
-            "dist_simnet_bow.py",
-            delta=1e-5,
-            check_error_log=True,
-            need_envs=need_envs,
-            log_name=flag_name)
-
-
-if __name__ == "__main__":
-    unittest.main()
--
GitLab
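
Note (illustration only, not part of the patch): the kernel change in
distributed_lookup_table_op.h is easiest to read as a shape rule. The
"lookup_table_v2" op (emitted by fluid.embedding) takes dense 2-D ids of shape
[batch, seq_len], so after the prefetch its output tensor is resized to
[batch, seq_len, emb_dim]; the older "lookup_table" op (fluid.layers.embedding)
keeps the 2-D [num_ids, emb_dim] output. The plain-Python sketch below mirrors
that Resize logic; the helper name expected_output_shape is hypothetical and
exists only for this illustration.

# Hypothetical plain-Python mirror of the Resize logic added in the kernel.
def expected_output_shape(id_dims, emb_dim, lookup_table_version):
    if lookup_table_version == "lookup_table_v2":
        # fluid.embedding: ids are dense 2-D [batch, seq_len], so the output
        # keeps both ids dimensions plus the embedding width.
        return [id_dims[0], id_dims[1], emb_dim]
    # fluid.layers.embedding (lookup_table): the prefetched output is already
    # the flattened 2-D [num_ids, emb_dim], so no resize is applied.
    return [id_dims[0], emb_dim]

assert expected_output_shape([4, 8], 16, "lookup_table_v2") == [4, 8, 16]
assert expected_output_shape([32, 1], 16, "lookup_table") == [32, 16]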