diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 5d293665f0bcc098126ad3ec6c9bf34ff54c3b6f..a4c925b538ef916e88ec06cea6de57f31eaf069b 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include #include #include // NOLINT #include @@ -67,7 +68,7 @@ ListenAndServOp::ListenAndServOp(const std::string &type, const framework::AttributeMap &attrs) : OperatorBase(type, inputs, outputs, attrs) {} -int ListenAndServOp::GetSelectedPort() { +int ListenAndServOp::GetSelectedPort() const { return rpc_service_->GetSelectedPort(); } @@ -99,7 +100,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, framework::Executor executor(dev_place); std::vector block_list; for (size_t blkid = 1; blkid < num_blocks; ++blkid) { - if (blkid != prefetch_block->ID()) { + if (blkid != static_cast(prefetch_block->ID())) { block_list.push_back(blkid); } } @@ -121,10 +122,14 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, rpc_service_->SetProgram(program); // start the server listening after all member initialized. server_thread_.reset(new std::thread(RunServer, rpc_service_)); - // FIXME(typhoonzero): do we need to wait until the server port is ready? + VLOG(3) << "wait server thread to become ready..."; sleep(5); + // Write to a file of server selected port for python use. + std::ofstream port_file; + port_file.open("/tmp/paddle.selected_port"); + port_file << rpc_service_->GetSelectedPort(); + port_file.close(); - // TODO(typhoonzero): change this to a while_op for every cluster-batch. bool exit_flag = false; // Record received sparse variables, so that // we could reset those after execute optimize program @@ -175,7 +180,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, parallel_blkids.push_back(1); double ts = detail::GetTimestamp(); for (size_t blkid = 2; blkid < num_blocks; ++blkid) { - if (blkid != prefetch_block->ID()) { + if (blkid != static_cast(prefetch_block->ID())) { if (program->Block(blkid).Parent() != last_parent_blkid) { ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared, program, &recv_scope); diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h index 759b2a462ba5b938991aa86be9b9dc3e59fe3f7e..9744921cef7c0f13c94b7fe729561de8e181650c 100644 --- a/paddle/fluid/operators/listen_and_serv_op.h +++ b/paddle/fluid/operators/listen_and_serv_op.h @@ -39,7 +39,7 @@ class ListenAndServOp : public framework::OperatorBase { const framework::VariableNameMap &outputs, const framework::AttributeMap &attrs); - int GetSelectedPort(); + int GetSelectedPort() const; void Stop() override; diff --git a/paddle/fluid/operators/send_recv_op_test.cc b/paddle/fluid/operators/send_recv_op_test.cc index 3bf5d57809019d3ae469471c2ee2e7aac70b9faf..a342874f97460cf624ff0047915d33ba4161f19b 100644 --- a/paddle/fluid/operators/send_recv_op_test.cc +++ b/paddle/fluid/operators/send_recv_op_test.cc @@ -139,7 +139,6 @@ void StartServerNet(bool is_sparse) { attrs.insert({"PrefetchBlock", prefetch_block}); listen_and_serv_op = f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs); - LOG(INFO) << "selected port before run " << selected_port; listen_and_serv_op->Run(scope, place); LOG(INFO) << "server exit"; } @@ -158,16 +157,13 @@ TEST(SendRecvOp, CPUDense) { selected_port = static_cast( listen_and_serv_op.get()) ->GetSelectedPort(); - LOG(INFO) << "selected port " << selected_port; std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port); attrs.insert({"endpoints", std::vector({endpoint})}); attrs.insert({"epmap", std::vector({endpoint})}); auto send_op = f::OpRegistry::CreateOp( "send", {{"X", {"x1"}}}, {{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs); - LOG(INFO) << "before run " << endpoint; send_op->Run(scope, place); - LOG(INFO) << "end run"; auto in_var = scope.Var("x1"); auto tensor = in_var->GetMutable(); @@ -180,7 +176,6 @@ TEST(SendRecvOp, CPUDense) { for (int64_t i = 0; i < target->numel(); ++i) { EXPECT_EQ(expected[i] * 2, actual[i]); } - LOG(INFO) << "before stop"; listen_and_serv_op->Stop(); server_thread.join(); listen_and_serv_op.reset(nullptr); @@ -199,7 +194,6 @@ TEST(SendRecvOp, CPUSparse) { selected_port = static_cast( listen_and_serv_op.get()) ->GetSelectedPort(); - LOG(INFO) << "selected port " << selected_port; std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port); attrs.insert({"endpoints", std::vector({endpoint})}); attrs.insert({"epmap", std::vector({endpoint})}); diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index e7d6c4e2521bee133c4794ed1db669b02fc2152b..ead57ac370d1bec13c1b21e83dd4be1a7331f87e 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -13,7 +13,7 @@ # limitations under the License. from .. import core -from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program +from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program from ..unique_name import generate as unique_name from control_flow import BlockGuard from ..layer_helper import LayerHelper @@ -158,6 +158,7 @@ class ListenAndServ(object): main_program = self.helper.main_program current_block = main_program.current_block() parent_block = self.parent_block() + empty_block = Program().global_block() parent_block.append_op( type='listen_and_serv', @@ -166,11 +167,12 @@ class ListenAndServ(object): attrs={ 'endpoint': self.endpoint, 'Fanin': self.fan_in, - 'OptimizeBlock': current_block + 'OptimizeBlock': current_block, + 'PrefetchBlock': empty_block }) -def Send(endpoints, send_vars, get_vars): +def Send(endpoints, send_vars, get_vars=None): """ Send layer @@ -184,7 +186,6 @@ def Send(endpoints, send_vars, get_vars): side when server have finished running server side program. """ assert (type(send_vars) == list) - assert (type(get_vars) == list) epmap = endpoints.split(",") endpoints = list(set(epmap)) @@ -192,6 +193,11 @@ def Send(endpoints, send_vars, get_vars): helper = LayerHelper("Send", **locals()) rpc_client_var = default_main_program().global_block().create_var( name="RPC_CLIENT_VAR", persistable=True, type=core.VarDesc.VarType.RAW) + if not get_vars: + get_vars = [] + for s in send_vars: + v = helper.create_tmp_variable(dtype=s.dtype, stop_gradient=True) + get_vars.append(v) helper.append_op( type="send", @@ -200,6 +206,7 @@ def Send(endpoints, send_vars, get_vars): "RPCClient": rpc_client_var}, attrs={"endpoints": endpoints, "epmap": epmap}) + return get_vars def Recv(endpoints, get_vars): diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 8f17eeea139c86055f0d4a06b21cbf66d8395cdc..d9190408e151283ece8460286dd67818dd39da3e 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -65,6 +65,7 @@ list(REMOVE_ITEM TEST_OPS test_registry) list(REMOVE_ITEM TEST_OPS test_fetch_var) list(REMOVE_ITEM TEST_OPS test_parallel_op) list(REMOVE_ITEM TEST_OPS test_dynrnn_static_input) +list(REMOVE_ITEM TEST_OPS test_dist_train) # tests that can be bundled together in one python process for speed. if(WITH_FAST_BUNDLE_TEST) @@ -103,3 +104,4 @@ py_test_modules(test_registry MODULES test_registry) py_test_modules(test_fetch_var MODULES test_fetch_var) py_test_modules(test_dynrnn_static_input MODULES test_dynrnn_static_input) py_test_modules(test_parallel_op MODULES test_parallel_op) +py_test_modules(test_dist_train MODULES test_dist_train) diff --git a/python/paddle/fluid/tests/unittests/test_recv_op.py b/python/paddle/fluid/tests/unittests/test_dist_train.py similarity index 57% rename from python/paddle/fluid/tests/unittests/test_recv_op.py rename to python/paddle/fluid/tests/unittests/test_dist_train.py index 2ebceca7e4b7b824194d94180462870e6cfe6d21..c7fdd06f105e3b5fd906d3524d41df8f84160e63 100644 --- a/python/paddle/fluid/tests/unittests/test_recv_op.py +++ b/python/paddle/fluid/tests/unittests/test_dist_train.py @@ -15,31 +15,42 @@ import unittest import paddle.fluid as fluid +import paddle.fluid.core as core import paddle.fluid.layers as layers import numpy from multiprocessing import Process +from threading import Thread import os, sys import time -class TestRecvOp(unittest.TestCase): - def no_test_send(self): +class TestSendOp(unittest.TestCase): + def test_send(self): # Run init_serv in a thread place = fluid.CPUPlace() + # NOTE: python thread will not work here due to GIL. p = Process(target=self.init_serv, args=(place, )) p.daemon = True p.start() - time.sleep(1) - self.init_client(place) + + time.sleep(10) + with open("/tmp/paddle.selected_port", "r") as fn: + selected_port = int(fn.readlines()[0]) + self.init_client(place, selected_port) + + self.run_local(place) + self.assertTrue(numpy.allclose(self.local_out, self.dist_out)) + # FIXME(typhoonzero): find a way to gracefully shutdown the server. os.system("kill -9 %d" % p.pid) p.join() def init_serv(self, place): main = fluid.Program() + with fluid.program_guard(main): serv = layers.ListenAndServ( - "127.0.0.1:6174", ["X"], optimizer_mode=False) + "127.0.0.1:0", ["X"], optimizer_mode=False) with serv.do(): x = layers.data( shape=[32, 32], @@ -50,10 +61,29 @@ class TestRecvOp(unittest.TestCase): o = layers.scale(x=x, scale=10.0) main.global_block().create_var( name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) + + self.server_exe = fluid.Executor(place) + self.server_exe.run(main) + + def init_client(self, place, port): + main = fluid.Program() + with fluid.program_guard(main): + x = layers.data( + shape=[32, 32], + dtype='float32', + name='X', + append_batch_size=False) + fluid.initializer.Constant(value=2.3)(x, main.global_block()) + get_var = main.global_block().create_var( + name="scale_0.tmp_0", # server side var + dtype="float32", + persistable=False, + shape=[32, 32]) + o = layers.Send("127.0.0.1:%d" % port, [x], [get_var]) exe = fluid.Executor(place) - exe.run(main) + self.dist_out = exe.run(main, fetch_list=o) # o is a list - def init_client(self, place): + def run_local(self, place): main = fluid.Program() with fluid.program_guard(main): x = layers.data( @@ -61,10 +91,10 @@ class TestRecvOp(unittest.TestCase): dtype='float32', name='X', append_batch_size=False) - fluid.initializer.Constant(value=1.0)(x, main.global_block()) - layers.Send("127.0.0.1:6174", [x], [x]) + fluid.initializer.Constant(value=2.3)(x, main.global_block()) + o = layers.scale(x=x, scale=10.0) exe = fluid.Executor(place) - exe.run(main) + self.local_out = exe.run(main, fetch_list=[o]) if __name__ == "__main__":