未验证 提交 8352f938 编写于 作者: W Wu Yi 提交者: GitHub

Merge pull request #9933 from typhoonzero/add_dist_unit_test

Add output comparison to the distributed unit test to ensure that distributed training behaves the same as local training
...@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ...@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include <fstream>
#include <ostream> #include <ostream>
#include <thread> // NOLINT #include <thread> // NOLINT
#include <vector> #include <vector>
...@@ -67,7 +68,7 @@ ListenAndServOp::ListenAndServOp(const std::string &type, ...@@ -67,7 +68,7 @@ ListenAndServOp::ListenAndServOp(const std::string &type,
const framework::AttributeMap &attrs) const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {} : OperatorBase(type, inputs, outputs, attrs) {}
int ListenAndServOp::GetSelectedPort() { int ListenAndServOp::GetSelectedPort() const {
return rpc_service_->GetSelectedPort(); return rpc_service_->GetSelectedPort();
} }
...@@ -99,7 +100,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -99,7 +100,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
framework::Executor executor(dev_place); framework::Executor executor(dev_place);
std::vector<int> block_list; std::vector<int> block_list;
for (size_t blkid = 1; blkid < num_blocks; ++blkid) { for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
if (blkid != prefetch_block->ID()) { if (blkid != static_cast<size_t>(prefetch_block->ID())) {
block_list.push_back(blkid); block_list.push_back(blkid);
} }
} }
...@@ -121,10 +122,14 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -121,10 +122,14 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
rpc_service_->SetProgram(program); rpc_service_->SetProgram(program);
// start the server listening after all member initialized. // start the server listening after all member initialized.
server_thread_.reset(new std::thread(RunServer, rpc_service_)); server_thread_.reset(new std::thread(RunServer, rpc_service_));
// FIXME(typhoonzero): do we need to wait until the server port is ready? VLOG(3) << "wait server thread to become ready...";
sleep(5); sleep(5);
// Write to a file of server selected port for python use.
std::ofstream port_file;
port_file.open("/tmp/paddle.selected_port");
port_file << rpc_service_->GetSelectedPort();
port_file.close();
// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false; bool exit_flag = false;
// Record received sparse variables, so that // Record received sparse variables, so that
// we could reset those after execute optimize program // we could reset those after execute optimize program
...@@ -175,7 +180,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -175,7 +180,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
parallel_blkids.push_back(1); parallel_blkids.push_back(1);
double ts = detail::GetTimestamp(); double ts = detail::GetTimestamp();
for (size_t blkid = 2; blkid < num_blocks; ++blkid) { for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
if (blkid != prefetch_block->ID()) { if (blkid != static_cast<size_t>(prefetch_block->ID())) {
if (program->Block(blkid).Parent() != last_parent_blkid) { if (program->Block(blkid).Parent() != last_parent_blkid) {
ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared, ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared,
program, &recv_scope); program, &recv_scope);
......
...@@ -39,7 +39,7 @@ class ListenAndServOp : public framework::OperatorBase { ...@@ -39,7 +39,7 @@ class ListenAndServOp : public framework::OperatorBase {
const framework::VariableNameMap &outputs, const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs); const framework::AttributeMap &attrs);
int GetSelectedPort(); int GetSelectedPort() const;
void Stop() override; void Stop() override;
......
...@@ -139,7 +139,6 @@ void StartServerNet(bool is_sparse) { ...@@ -139,7 +139,6 @@ void StartServerNet(bool is_sparse) {
attrs.insert({"PrefetchBlock", prefetch_block}); attrs.insert({"PrefetchBlock", prefetch_block});
listen_and_serv_op = listen_and_serv_op =
f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs); f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs);
LOG(INFO) << "selected port before run " << selected_port;
listen_and_serv_op->Run(scope, place); listen_and_serv_op->Run(scope, place);
LOG(INFO) << "server exit"; LOG(INFO) << "server exit";
} }
...@@ -158,16 +157,13 @@ TEST(SendRecvOp, CPUDense) { ...@@ -158,16 +157,13 @@ TEST(SendRecvOp, CPUDense) {
selected_port = static_cast<paddle::operators::ListenAndServOp *>( selected_port = static_cast<paddle::operators::ListenAndServOp *>(
listen_and_serv_op.get()) listen_and_serv_op.get())
->GetSelectedPort(); ->GetSelectedPort();
LOG(INFO) << "selected port " << selected_port;
std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port); std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
attrs.insert({"endpoints", std::vector<std::string>({endpoint})}); attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
attrs.insert({"epmap", std::vector<std::string>({endpoint})}); attrs.insert({"epmap", std::vector<std::string>({endpoint})});
auto send_op = f::OpRegistry::CreateOp( auto send_op = f::OpRegistry::CreateOp(
"send", {{"X", {"x1"}}}, "send", {{"X", {"x1"}}},
{{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs); {{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs);
LOG(INFO) << "before run " << endpoint;
send_op->Run(scope, place); send_op->Run(scope, place);
LOG(INFO) << "end run";
auto in_var = scope.Var("x1"); auto in_var = scope.Var("x1");
auto tensor = in_var->GetMutable<f::LoDTensor>(); auto tensor = in_var->GetMutable<f::LoDTensor>();
...@@ -180,7 +176,6 @@ TEST(SendRecvOp, CPUDense) { ...@@ -180,7 +176,6 @@ TEST(SendRecvOp, CPUDense) {
for (int64_t i = 0; i < target->numel(); ++i) { for (int64_t i = 0; i < target->numel(); ++i) {
EXPECT_EQ(expected[i] * 2, actual[i]); EXPECT_EQ(expected[i] * 2, actual[i]);
} }
LOG(INFO) << "before stop";
listen_and_serv_op->Stop(); listen_and_serv_op->Stop();
server_thread.join(); server_thread.join();
listen_and_serv_op.reset(nullptr); listen_and_serv_op.reset(nullptr);
...@@ -199,7 +194,6 @@ TEST(SendRecvOp, CPUSparse) { ...@@ -199,7 +194,6 @@ TEST(SendRecvOp, CPUSparse) {
selected_port = static_cast<paddle::operators::ListenAndServOp *>( selected_port = static_cast<paddle::operators::ListenAndServOp *>(
listen_and_serv_op.get()) listen_and_serv_op.get())
->GetSelectedPort(); ->GetSelectedPort();
LOG(INFO) << "selected port " << selected_port;
std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port); std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
attrs.insert({"endpoints", std::vector<std::string>({endpoint})}); attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
attrs.insert({"epmap", std::vector<std::string>({endpoint})}); attrs.insert({"epmap", std::vector<std::string>({endpoint})});
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
from .. import core from .. import core
from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program
from ..unique_name import generate as unique_name from ..unique_name import generate as unique_name
from control_flow import BlockGuard from control_flow import BlockGuard
from ..layer_helper import LayerHelper from ..layer_helper import LayerHelper
...@@ -158,6 +158,7 @@ class ListenAndServ(object): ...@@ -158,6 +158,7 @@ class ListenAndServ(object):
main_program = self.helper.main_program main_program = self.helper.main_program
current_block = main_program.current_block() current_block = main_program.current_block()
parent_block = self.parent_block() parent_block = self.parent_block()
empty_block = Program().global_block()
parent_block.append_op( parent_block.append_op(
type='listen_and_serv', type='listen_and_serv',
...@@ -166,11 +167,12 @@ class ListenAndServ(object): ...@@ -166,11 +167,12 @@ class ListenAndServ(object):
attrs={ attrs={
'endpoint': self.endpoint, 'endpoint': self.endpoint,
'Fanin': self.fan_in, 'Fanin': self.fan_in,
'OptimizeBlock': current_block 'OptimizeBlock': current_block,
'PrefetchBlock': empty_block
}) })
def Send(endpoints, send_vars, get_vars): def Send(endpoints, send_vars, get_vars=None):
""" """
Send layer Send layer
...@@ -184,7 +186,6 @@ def Send(endpoints, send_vars, get_vars): ...@@ -184,7 +186,6 @@ def Send(endpoints, send_vars, get_vars):
side when server have finished running server side program. side when server have finished running server side program.
""" """
assert (type(send_vars) == list) assert (type(send_vars) == list)
assert (type(get_vars) == list)
epmap = endpoints.split(",") epmap = endpoints.split(",")
endpoints = list(set(epmap)) endpoints = list(set(epmap))
...@@ -192,6 +193,11 @@ def Send(endpoints, send_vars, get_vars): ...@@ -192,6 +193,11 @@ def Send(endpoints, send_vars, get_vars):
helper = LayerHelper("Send", **locals()) helper = LayerHelper("Send", **locals())
rpc_client_var = default_main_program().global_block().create_var( rpc_client_var = default_main_program().global_block().create_var(
name="RPC_CLIENT_VAR", persistable=True, type=core.VarDesc.VarType.RAW) name="RPC_CLIENT_VAR", persistable=True, type=core.VarDesc.VarType.RAW)
if not get_vars:
get_vars = []
for s in send_vars:
v = helper.create_tmp_variable(dtype=s.dtype, stop_gradient=True)
get_vars.append(v)
helper.append_op( helper.append_op(
type="send", type="send",
...@@ -200,6 +206,7 @@ def Send(endpoints, send_vars, get_vars): ...@@ -200,6 +206,7 @@ def Send(endpoints, send_vars, get_vars):
"RPCClient": rpc_client_var}, "RPCClient": rpc_client_var},
attrs={"endpoints": endpoints, attrs={"endpoints": endpoints,
"epmap": epmap}) "epmap": epmap})
return get_vars
def Recv(endpoints, get_vars): def Recv(endpoints, get_vars):
......
...@@ -65,6 +65,7 @@ list(REMOVE_ITEM TEST_OPS test_registry) ...@@ -65,6 +65,7 @@ list(REMOVE_ITEM TEST_OPS test_registry)
list(REMOVE_ITEM TEST_OPS test_fetch_var) list(REMOVE_ITEM TEST_OPS test_fetch_var)
list(REMOVE_ITEM TEST_OPS test_parallel_op) list(REMOVE_ITEM TEST_OPS test_parallel_op)
list(REMOVE_ITEM TEST_OPS test_dynrnn_static_input) list(REMOVE_ITEM TEST_OPS test_dynrnn_static_input)
list(REMOVE_ITEM TEST_OPS test_dist_train)
# tests that can be bundled together in one python process for speed. # tests that can be bundled together in one python process for speed.
if(WITH_FAST_BUNDLE_TEST) if(WITH_FAST_BUNDLE_TEST)
...@@ -103,3 +104,4 @@ py_test_modules(test_registry MODULES test_registry) ...@@ -103,3 +104,4 @@ py_test_modules(test_registry MODULES test_registry)
py_test_modules(test_fetch_var MODULES test_fetch_var) py_test_modules(test_fetch_var MODULES test_fetch_var)
py_test_modules(test_dynrnn_static_input MODULES test_dynrnn_static_input) py_test_modules(test_dynrnn_static_input MODULES test_dynrnn_static_input)
py_test_modules(test_parallel_op MODULES test_parallel_op) py_test_modules(test_parallel_op MODULES test_parallel_op)
py_test_modules(test_dist_train MODULES test_dist_train)
...@@ -15,31 +15,42 @@ ...@@ -15,31 +15,42 @@
import unittest import unittest
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.layers as layers import paddle.fluid.layers as layers
import numpy import numpy
from multiprocessing import Process from multiprocessing import Process
from threading import Thread
import os, sys import os, sys
import time import time
class TestRecvOp(unittest.TestCase): class TestSendOp(unittest.TestCase):
def no_test_send(self): def test_send(self):
# Run init_serv in a thread # Run init_serv in a thread
place = fluid.CPUPlace() place = fluid.CPUPlace()
# NOTE: python thread will not work here due to GIL.
p = Process(target=self.init_serv, args=(place, )) p = Process(target=self.init_serv, args=(place, ))
p.daemon = True p.daemon = True
p.start() p.start()
time.sleep(1)
self.init_client(place) time.sleep(10)
with open("/tmp/paddle.selected_port", "r") as fn:
selected_port = int(fn.readlines()[0])
self.init_client(place, selected_port)
self.run_local(place)
self.assertTrue(numpy.allclose(self.local_out, self.dist_out))
# FIXME(typhoonzero): find a way to gracefully shutdown the server. # FIXME(typhoonzero): find a way to gracefully shutdown the server.
os.system("kill -9 %d" % p.pid) os.system("kill -9 %d" % p.pid)
p.join() p.join()
def init_serv(self, place): def init_serv(self, place):
main = fluid.Program() main = fluid.Program()
with fluid.program_guard(main): with fluid.program_guard(main):
serv = layers.ListenAndServ( serv = layers.ListenAndServ(
"127.0.0.1:6174", ["X"], optimizer_mode=False) "127.0.0.1:0", ["X"], optimizer_mode=False)
with serv.do(): with serv.do():
x = layers.data( x = layers.data(
shape=[32, 32], shape=[32, 32],
...@@ -50,10 +61,29 @@ class TestRecvOp(unittest.TestCase): ...@@ -50,10 +61,29 @@ class TestRecvOp(unittest.TestCase):
o = layers.scale(x=x, scale=10.0) o = layers.scale(x=x, scale=10.0)
main.global_block().create_var( main.global_block().create_var(
name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape) name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape)
self.server_exe = fluid.Executor(place)
self.server_exe.run(main)
def init_client(self, place, port):
main = fluid.Program()
with fluid.program_guard(main):
x = layers.data(
shape=[32, 32],
dtype='float32',
name='X',
append_batch_size=False)
fluid.initializer.Constant(value=2.3)(x, main.global_block())
get_var = main.global_block().create_var(
name="scale_0.tmp_0", # server side var
dtype="float32",
persistable=False,
shape=[32, 32])
o = layers.Send("127.0.0.1:%d" % port, [x], [get_var])
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(main) self.dist_out = exe.run(main, fetch_list=o) # o is a list
def init_client(self, place): def run_local(self, place):
main = fluid.Program() main = fluid.Program()
with fluid.program_guard(main): with fluid.program_guard(main):
x = layers.data( x = layers.data(
...@@ -61,10 +91,10 @@ class TestRecvOp(unittest.TestCase): ...@@ -61,10 +91,10 @@ class TestRecvOp(unittest.TestCase):
dtype='float32', dtype='float32',
name='X', name='X',
append_batch_size=False) append_batch_size=False)
fluid.initializer.Constant(value=1.0)(x, main.global_block()) fluid.initializer.Constant(value=2.3)(x, main.global_block())
layers.Send("127.0.0.1:6174", [x], [x]) o = layers.scale(x=x, scale=10.0)
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(main) self.local_out = exe.run(main, fetch_list=[o])
if __name__ == "__main__": if __name__ == "__main__":
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册