diff --git a/paddle/fluid/distributed/service/communicator.cc b/paddle/fluid/distributed/service/communicator.cc
index 1cffb16a0a4ffdde35897e667b00d368a6349a5f..34472d30cb6c8810dc8af90d53456f6ab93842d2 100644
--- a/paddle/fluid/distributed/service/communicator.cc
+++ b/paddle/fluid/distributed/service/communicator.cc
@@ -385,6 +385,7 @@ void Communicator::SendGlobalStep(const CommContext &ctx, int batches,
   if (batches == 0) {
     return;
   }
+  platform::RecordEvent record_event("Communicator->SendGlobalStep");
   auto &table_id = ctx.table_id;
   size_t request_call_num = _worker_ptr->get_server_nums();
 
@@ -788,6 +789,7 @@ void SyncCommunicator::BarrierRecv() {
 
 void GeoCommunicator::Send(const std::vector<std::string> &var_names,
                            const framework::Scope &scope) {
+  platform::RecordEvent record_event("GeoCommunicator->Send");
   waiting_ = false;
   auto before_send = GetCurrentUS();
   auto table_name = var_names[0];
@@ -1024,6 +1026,7 @@ void GeoCommunicator::InitSparse(const std::string &var_name, int table_id) {
 
 std::vector<int64_t> GeoCommunicator::MergeSparseIds(
     const std::string &send_varname) {
+  platform::RecordEvent record_event("GeoCommunicator->MergeSparseIds");
   size_t merge_num = 0, wait_times = 0;
   std::unordered_set<int64_t> sparse_ids;
   while (merge_num < static_cast<size_t>(max_merge_var_num_)) {
diff --git a/paddle/fluid/distributed/table/common_dense_table.cc b/paddle/fluid/distributed/table/common_dense_table.cc
index 96bc104cb812bed2887961210dfb8a99cf182b66..13c177453ffe8fc7d51d26ac90b816886363f45c 100644
--- a/paddle/fluid/distributed/table/common_dense_table.cc
+++ b/paddle/fluid/distributed/table/common_dense_table.cc
@@ -28,6 +28,8 @@ void CommonDenseTable::create_initializer(const std::string& attr,
     initializers_[name] = new FillConstantInitializer(slices);
   } else if (slices[0] == "uniform_random") {
     initializers_[name] = new UniformInitializer(slices);
+  } else if (slices[0] == "truncated_gaussian_random") {
+    initializers_[name] = new TruncatedGaussianInitializer(slices);
   } else {
     PADDLE_THROW(
         platform::errors::InvalidArgument("%s can not be supported", name));
diff --git a/paddle/fluid/distributed/table/depends/initializers.h b/paddle/fluid/distributed/table/depends/initializers.h
index 8d45e83f92d85ff6671a600689524437344de34e..494565e866a47af51cf30254be1444f106b89a83 100644
--- a/paddle/fluid/distributed/table/depends/initializers.h
+++ b/paddle/fluid/distributed/table/depends/initializers.h
@@ -17,12 +17,15 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
 
 #include "paddle/fluid/framework/generator.h"
 
+#include "paddle/fluid/operators/truncated_gaussian_random_op.h"
+
 namespace paddle {
 namespace distributed {
 
@@ -108,6 +111,40 @@ class GaussianInitializer : public Initializer {
   std::normal_distribution<float> dist_;
 };
 
+class TruncatedGaussianInitializer : public Initializer {
+ public:
+  explicit TruncatedGaussianInitializer(const std::vector<std::string> &attrs) {
+    name_ = attrs[0];
+    seed_ = static_cast<unsigned int>(std::stoi(attrs[1]));
+    mean_ = std::stof(attrs[2]);
+    std_ = std::stof(attrs[3]);
+
+    std::uniform_real_distribution<float> dist_(
+        std::numeric_limits<float>::min(), 1.0);
+    random_engine_ = framework::GetCPURandomEngine(seed_);
+  }
+
+  float GetValue() override {
+    paddle::operators::TruncatedNormal<float> truncated_normal(mean_, std_);
+    float value = truncated_normal(dist_(*random_engine_));
+    return value;
+  }
+
+  void GetValue(float *value, int numel) {
+    paddle::operators::TruncatedNormal<float> truncated_normal(mean_, std_);
+    for (int x = 0; x < numel; ++x) {
+      value[x] = truncated_normal(dist_(*random_engine_));
+    }
+  }
+
+ private:
+  float std_;
+  float mean_;
+
+  std::shared_ptr<std::mt19937_64> random_engine_;
+  std::uniform_real_distribution<float> dist_;
+};
+
 class FillConstantInitializer : public Initializer {
  public:
   explicit FillConstantInitializer(const std::vector<std::string> &attrs) {
diff --git a/paddle/fluid/distributed/table/depends/large_scale_kv.h b/paddle/fluid/distributed/table/depends/large_scale_kv.h
index 37ceb650664757c01062d4efb741ad9e8a852cfe..306e8e07beaf53505bc454513687b62d55f7cc67 100644
--- a/paddle/fluid/distributed/table/depends/large_scale_kv.h
+++ b/paddle/fluid/distributed/table/depends/large_scale_kv.h
@@ -125,6 +125,9 @@ class ValueBlock {
     } else if (slices[0] == "uniform_random") {
       initializers_.emplace_back(
           std::make_shared<UniformInitializer>(slices));
+    } else if (slices[0] == "truncated_gaussian_random") {
+      initializers_.emplace_back(
+          std::make_shared<TruncatedGaussianInitializer>(slices));
     } else {
       PADDLE_THROW(platform::errors::InvalidArgument(
           "%s can not be supported", attr));
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 3ddd7cc91823d17c1e63e720ea189466c8d4061b..c36044873db3a3fe90a14fd271679afbd5cb5516 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -1121,8 +1121,6 @@ void ParallelExecutor::BCastParamsToDevices(
 FetchResultType ParallelExecutor::Run(
     const std::vector<std::string> &fetch_tensors, bool return_merged) {
   VLOG(3) << "enter ParallelExecutor Run";
-  platform::RecordEvent parallel_executor_event(
-      "ParallelExecutor::Run", paddle::platform::EventRole::kSpecial);
 #ifdef WITH_GPERFTOOLS
   if (gProfileStarted) {
     ProfilerFlush();
diff --git a/python/paddle/distributed/fleet/data_generator/data_generator.py b/python/paddle/distributed/fleet/data_generator/data_generator.py
index 669d2ea24a0c788bbe1c0cff38a843bd96e29016..9d743fc38bf3982454bd0e27779567bfb9f0717e 100644
--- a/python/paddle/distributed/fleet/data_generator/data_generator.py
+++ b/python/paddle/distributed/fleet/data_generator/data_generator.py
@@ -32,11 +32,11 @@ class DataGenerator(object):
         '''
         Set batch size of current DataGenerator
         This is necessary only if a user wants to define generator_batch
-
+
         Example:
             .. code-block:: python
-
+
                 import paddle.distributed.fleet.data_generator as dg
                 class MyData(dg.DataGenerator):
@@ -52,7 +52,7 @@ class DataGenerator(object):
                             yield ("words", s[1].extend([s[1][0]]))
                 mydata = MyData()
                 mydata.set_batch(128)
-
+
         '''
         self.batch_size_ = batch_size
@@ -63,7 +63,7 @@ class DataGenerator(object):
         Example:
             .. code-block:: python
-
+
                 import paddle.distributed.fleet.data_generator as dg
                 class MyData(dg.DataGenerator):
@@ -100,9 +100,9 @@ class DataGenerator(object):
        generated.
         Example:
-
+
             .. code-block:: python
-
+
                 import paddle.distributed.fleet.data_generator as dg
                 class MyData(dg.DataGenerator):
@@ -161,7 +161,7 @@ class DataGenerator(object):
         The data format is list or tuple:
             [(name, [feasign, ...]), ...]
             or ((name, [feasign, ...]), ...)
-
+
         For example:
             [("words", [1926, 08, 17]), ("label", [1])]
             or (("words", [1926, 08, 17]), ("label", [1]))
@@ -174,7 +174,7 @@ class DataGenerator(object):
         Example:
             .. code-block:: python
-
+
                 import paddle.distributed.fleet.data_generator as dg
                 class MyData(dg.DataGenerator):
@@ -206,7 +206,7 @@ class DataGenerator(object):
         Example:
             .. code-block:: python
-
+
                 import paddle.distributed.fleet.data_generator as dg
                 class MyData(dg.DataGenerator):
@@ -259,6 +259,9 @@ class MultiSlotStringDataGenerator(DataGenerator):
         Returns:
             Return a string data that can be read directly by the MultiSlotDataFeed.
         '''
+        if sys.version > '3' and isinstance(line, zip):
+            line = list(line)
+
         if not isinstance(line, list) and not isinstance(line, tuple):
             raise ValueError(
                 "the output of process() must be in list or tuple type"
@@ -289,7 +292,7 @@ class MultiSlotDataGenerator(DataGenerator):
             >>> [ids_num id1 id2 ...] ...
         The proto_info will be in this format:
             >>> [(name, type), ...]
-
+
         For example, if the input is like this:
             >>> [("words", [1926, 08, 17]), ("label", [1])]
             >>> or (("words", [1926, 08, 17]), ("label", [1]))
@@ -304,6 +307,9 @@ class MultiSlotDataGenerator(DataGenerator):
         Returns:
             Return a string data that can be read directly by the MultiSlotDataFeed.
         '''
+        if sys.version > '3' and isinstance(line, zip):
+            line = list(line)
+
         if not isinstance(line, list) and not isinstance(line, tuple):
             raise ValueError(
                 "the output of process() must be in list or tuple type"
diff --git a/python/paddle/distributed/fleet/runtime/the_one_ps.py b/python/paddle/distributed/fleet/runtime/the_one_ps.py
index 17ac6c021afe99222de1bac555719161e5db38ca..aa7df57e3c58bdc0c1cce80f7e03b972bd91f5d6 100644
--- a/python/paddle/distributed/fleet/runtime/the_one_ps.py
+++ b/python/paddle/distributed/fleet/runtime/the_one_ps.py
@@ -150,7 +150,8 @@ class CommonAccessor:
         oop = None
 
         for op in optimizer_ops:
-            if op.input("Param")[0] == param_name:
+            if ("Param" in op.input_names) and (
+                    op.input("Param")[0] == param_name):
                 oop = op
                 break
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
index b987e01bba46ec1dff516381bc534d384b928c9e..baf8add04caad086fd2de26e3dd7f3dd55d4feba 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
@@ -31,7 +31,7 @@ from paddle.fluid.incubate.fleet.parameter_server.ir.ps_dispatcher import RoundR
 from paddle.fluid.transpiler.details.program_utils import delete_ops
 
 OP_NAME_SCOPE = "op_namescope"
-CLIP_OP_NAME_SCOPE = "@CLIP"
+CLIP_OP_NAME_SCOPE = "gradient_clip"
 STEP_COUNTER = "@PS_STEP_COUNTER@"
 LEARNING_RATE_DECAY_COUNTER = "@LR_DECAY_COUNTER@"
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
index 2292d4c0a4d6f311976eb040dbc7e1a003c8d07a..08e64c15c483b73943a50dc386a7302ae875480d 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py
@@ -32,7 +32,7 @@ from paddle.fluid.incubate.fleet.parameter_server.ir.public import get_sparse_ta
 from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode
 
 OP_NAME_SCOPE = "op_namescope"
-CLIP_OP_NAME_SCOPE = "@CLIP"
+CLIP_OP_NAME_SCOPE = "gradient_clip"
 STEP_COUNTER = "@PS_STEP_COUNTER@"
 OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
 RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName()
diff --git a/python/paddle/fluid/tests/unittests/test_data_generator.py b/python/paddle/fluid/tests/unittests/test_data_generator.py
index 6381cb364026369734f1de3747d50cc1ca17d5ef..69d8e01fd464afc724d286740b6c8f42929dd387 100644
--- a/python/paddle/fluid/tests/unittests/test_data_generator.py
+++ b/python/paddle/fluid/tests/unittests/test_data_generator.py
@@ -95,6 +95,32 @@ class MyMultiSlotDataGenerator_error_5(fleet.MultiSlotDataGenerator):
         return data_iter
 
 
+class MyMultiSlotStringDataGenerator_zip(fleet.MultiSlotStringDataGenerator):
+    def generate_sample(self, line):
+        def data_iter():
+            for i in range(40):
+                if i == 1:
+                    yield None
+                feature_name = ["words", "label"]
+                data = [["1", "2", "3", "4"], ["0"]]
+                yield zip(feature_name, data)
+
+        return data_iter
+
+
+class MyMultiSlotDataGenerator_zip(fleet.MultiSlotDataGenerator):
+    def generate_sample(self, line):
+        def data_iter():
+            for i in range(40):
+                if i == 1:
+                    yield None
+                feature_name = ["words", "label"]
+                data = [[1, 2, 3, 4], [0]]
+                yield zip(feature_name, data)
+
+        return data_iter
+
+
 class TestMultiSlotDataGenerator(unittest.TestCase):
     def test_MultiSlotDataGenerator_basic(self):
         my_ms_dg = MyMultiSlotDataGenerator()
@@ -149,5 +175,19 @@ class TestMultiSlotDataGenerator_error_5(unittest.TestCase):
             my_ms_dg.run_from_memory()
 
 
+class TestMultiSlotStringDataGeneratorZip(unittest.TestCase):
+    def test_MultiSlotStringDataGenerator_zip(self):
+        my_ms_dg = MyMultiSlotStringDataGenerator_zip()
+        my_ms_dg.set_batch(1)
+        my_ms_dg.run_from_memory()
+
+
+class TestMultiSlotDataGeneratorZip(unittest.TestCase):
+    def test_MultiSlotDataGenerator_zip(self):
+        my_ms_dg = MyMultiSlotDataGenerator_zip()
+        my_ms_dg.set_batch(1)
+        my_ms_dg.run_from_memory()
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py
index 03d7251f8292fb0bdd1c34763ad2fbdbd1cf5707..e84e91de0ba79ac195540dce620034e30e70f0d1 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py
@@ -18,6 +18,7 @@ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distribu
 import paddle.distributed.fleet as fleet
 import paddle.distributed.fleet.base.role_maker as role_maker
 import paddle.fluid as fluid
+import paddle
 """
 high level unit test for distribute fleet.
""" @@ -112,23 +113,21 @@ class FleetDistRunnerBase(object): def build_optimizer(self, avg_cost, strategy): use_grad_clip = int(os.getenv('GRAD_CLIP', 0)) + grad_clip = None if use_grad_clip: # 1: clip_by_value; 2: clip_by_norm; 3:clip_by_global_norm if use_grad_clip == 1: - fluid.clip.set_gradient_clip( - clip=fluid.clip.GradientClipByValue(2.0)) + grad_clip = paddle.nn.ClipGradByValue(min=-5.0, max=5.0) elif use_grad_clip == 2: - fluid.clip.set_gradient_clip( - clip=fluid.clip.GradientClipByNorm(2.0)) + grad_clip = paddle.nn.ClipGradByNorm(2.0) elif use_grad_clip == 3: - fluid.clip.set_gradient_clip( - clip=fluid.clip.GradientClipByGlobalNorm(2.0)) + grad_clip = paddle.nn.ClipGradByGlobalNorm(2.0) use_decay = int(os.getenv("USE_DECAY", "0")) if use_decay: scheduler = paddle.optimizer.lr.ExponentialDecay( learning_rate=LEARNING_RATE, gamma=0.999, verbose=True) - optimizer = fluid.optimizer.SGD(scheduler) + optimizer = fluid.optimizer.SGD(scheduler, grad_clip=grad_clip) """ # learning rate decay method before 2.0 optimizer = fluid.optimizer.SGD( @@ -139,7 +138,7 @@ class FleetDistRunnerBase(object): staircase=True)) """ else: - optimizer = fluid.optimizer.SGD(LEARNING_RATE) + optimizer = fluid.optimizer.SGD(LEARNING_RATE, grad_clip=grad_clip) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) optimizer.minimize(avg_cost) diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py index 3c68af474cf7cae96a9fa62688460f84123438f5..f9509d60072f877f0713d6da23018153dc138304 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_grad_clip.py @@ -16,53 +16,66 @@ from __future__ import print_function import os import unittest -import paddle.fluid as fluid -import paddle.fluid.incubate.fleet.base.role_maker as role_maker -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig from test_dist_fleet_base import TestFleetBase -from dist_fleet_simnet_bow import train_network -@unittest.skip(reason="Skip unstable ut, add it after PR 22957 merged") -class TestDistGeoClipByGlobalNormTranspiler(unittest.TestCase): - def test_pserver(self): - role = role_maker.UserDefinedRoleMaker( - current_id=0, - role=role_maker.Role.SERVER, - worker_num=2, - server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"]) +class TestDistGeoClipByGlobalNorm(TestFleetBase): + def _setup_config(self): + self._mode = "geo" + self._reader = "dataset" + self._geo_sgd_need_push_nums = 5 + self._grad_clip_mode = 3 - fleet.init(role) + def check_with_place(self, + model_file, + delta=1e-3, + check_error_log=False, + need_envs={}): + required_envs = { + "PATH": os.getenv("PATH", ""), + "PYTHONPATH": os.getenv("PYTHONPATH", ""), + "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""), + "FLAGS_rpc_deadline": "5000", # 5sec to fail fast + "http_proxy": "" + } + required_envs.update(need_envs) - batch_size = 128 - is_sparse = True - is_distribute = False + tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs) - strategy = DistributeTranspilerConfig() - strategy.sync_mode = False - strategy.geo_sgd_mode = True - strategy.geo_sgd_need_push_nums = 5 + def test_dist_train(self): + self.check_with_place( + "dist_fleet_ctr.py", delta=1e-5, check_error_log=True) - avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse) - 
-        fluid.clip.set_gradient_clip(
-            clip=fluid.clip.GradientClipByGlobalNorm(2.0))
+    def _setup_config(self):
+        self._sync_mode = False
+        self._grad_clip_mode = 2
 
-        optimizer = fluid.optimizer.SGD(0.1)
-        optimizer = fleet.distributed_optimizer(optimizer, strategy)
-        optimizer.minimize(avg_cost)
+    def check_with_place(self,
+                         model_file,
+                         delta=1e-3,
+                         check_error_log=False,
+                         need_envs={}):
+        required_envs = {
+            "PATH": os.getenv("PATH", ""),
+            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
+            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
+            "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
+            "http_proxy": ""
+        }
+        required_envs.update(need_envs)
+
+        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs)
 
-        pserver_startup_program = fleet.startup_program
-        pserver_mian_program = fleet.main_program
+    def test_dist_train(self):
+        self.check_with_place(
+            "dist_fleet_ctr.py", delta=1e-5, check_error_log=True)
 
 
-@unittest.skip(reason="Skip unstable ut, add it after PR 22957 merged")
-class TestDistGeoClipByGlobalNorm(TestFleetBase):
+class TestDistASyncClipByValue(TestFleetBase):
     def _setup_config(self):
-        self._mode = "geo"
+        self._mode = "async"
         self._reader = "dataset"
-        self._geo_sgd_need_push_nums = 5
-        self._grad_clip_mode = 3
+        self._grad_clip_mode = 1
 
     def check_with_place(self,
                          model_file,
@@ -84,8 +97,11 @@ class TestDistGeoClipByGlobalNorm(TestFleetBase):
         self.check_with_place(
             "dist_fleet_ctr.py", delta=1e-5, check_error_log=True)
 
+
+class TestDistASyncClipByNorm(TestFleetBase):
     def _setup_config(self):
-        self._sync_mode = False
+        self._mode = "async"
+        self._reader = "dataset"
         self._grad_clip_mode = 2
 
     def check_with_place(self,
@@ -109,7 +125,6 @@ class TestDistGeoClipByGlobalNorm(TestFleetBase):
             "dist_fleet_ctr.py", delta=1e-5, check_error_log=True)
 
 
-@unittest.skip(reason="Skip unstable ut, add it after PR 22957 merged")
 class TestDistASyncClipByGlobalNorm(TestFleetBase):
     def _setup_config(self):
         self._mode = "async"