Commit 8c1b2575 authored by: typhoonzero

add dist unitest data compare

Parent: 4c55a602
--- a/paddle/fluid/operators/listen_and_serv_op.cc
+++ b/paddle/fluid/operators/listen_and_serv_op.cc
@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
 
+#include <fstream>
 #include <ostream>
 #include <thread>  // NOLINT
 #include <vector>
@@ -67,7 +68,7 @@ ListenAndServOp::ListenAndServOp(const std::string &type,
                                  const framework::AttributeMap &attrs)
     : OperatorBase(type, inputs, outputs, attrs) {}
 
-int ListenAndServOp::GetSelectedPort() {
+int ListenAndServOp::GetSelectedPort() const {
   return rpc_service_->GetSelectedPort();
 }
 
@@ -99,7 +100,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   framework::Executor executor(dev_place);
   std::vector<int> block_list;
   for (size_t blkid = 1; blkid < num_blocks; ++blkid) {
-    if (blkid != prefetch_block->ID()) {
+    if (blkid != static_cast<size_t>(prefetch_block->ID())) {
       block_list.push_back(blkid);
     }
   }
@@ -121,10 +122,14 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
   rpc_service_->SetProgram(program);
   // start the server listening after all member initialized.
   server_thread_.reset(new std::thread(RunServer, rpc_service_));
-  // FIXME(typhoonzero): do we need to wait until the server port is ready?
+  VLOG(3) << "wait server thread to become ready...";
   sleep(5);
-  // TODO(typhoonzero): change this to a while_op for every cluster-batch.
+  // Write to a file of server selected port for python use.
+  std::ofstream port_file;
+  port_file.open("/tmp/paddle.selected_port");
+  port_file << rpc_service_->GetSelectedPort();
+  port_file.close();
 
   bool exit_flag = false;
   // Record received sparse variables, so that
   // we could reset those after execute optimize program
@@ -175,7 +180,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
       parallel_blkids.push_back(1);
       double ts = detail::GetTimestamp();
       for (size_t blkid = 2; blkid < num_blocks; ++blkid) {
-        if (blkid != prefetch_block->ID()) {
+        if (blkid != static_cast<size_t>(prefetch_block->ID())) {
           if (program->Block(blkid).Parent() != last_parent_blkid) {
             ParallelExecuteBlocks(parallel_blkids, &executor, optimize_prepared,
                                   program, &recv_scope);
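The port file written in the hunk above is meant to be consumed by the Python side. As a point of reference, here is a minimal sketch of how a Python test could poll for it; read_selected_port is a hypothetical helper, and only the path /tmp/paddle.selected_port comes from this diff:

import os
import time

def read_selected_port(path="/tmp/paddle.selected_port", timeout=10):
    # Poll until listen_and_serv has written its chosen port, then parse it.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if os.path.exists(path):
            content = open(path).read().strip()
            if content:
                return int(content)
        time.sleep(0.1)
    raise RuntimeError("port file not found: " + path)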
--- a/paddle/fluid/operators/listen_and_serv_op.h
+++ b/paddle/fluid/operators/listen_and_serv_op.h
@@ -39,7 +39,7 @@ class ListenAndServOp : public framework::OperatorBase {
                   const framework::VariableNameMap &outputs,
                   const framework::AttributeMap &attrs);
 
-  int GetSelectedPort();
+  int GetSelectedPort() const;
 
   void Stop() override;
--- a/paddle/fluid/operators/send_recv_op_test.cc
+++ b/paddle/fluid/operators/send_recv_op_test.cc
@@ -139,7 +139,6 @@ void StartServerNet(bool is_sparse) {
   attrs.insert({"PrefetchBlock", prefetch_block});
   listen_and_serv_op =
       f::OpRegistry::CreateOp("listen_and_serv", {{"X", {"x1"}}}, {}, attrs);
-  LOG(INFO) << "selected port before run " << selected_port;
   listen_and_serv_op->Run(scope, place);
   LOG(INFO) << "server exit";
 }
@@ -158,16 +157,13 @@ TEST(SendRecvOp, CPUDense) {
   selected_port = static_cast<paddle::operators::ListenAndServOp *>(
                       listen_and_serv_op.get())
                       ->GetSelectedPort();
-  LOG(INFO) << "selected port " << selected_port;
   std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
   attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
   attrs.insert({"epmap", std::vector<std::string>({endpoint})});
   auto send_op = f::OpRegistry::CreateOp(
       "send", {{"X", {"x1"}}},
       {{"Out", {"Out"}}, {"RPCClient", {"RPC_CLIENT_VAR"}}}, attrs);
-  LOG(INFO) << "before run " << endpoint;
   send_op->Run(scope, place);
-  LOG(INFO) << "end run";
 
   auto in_var = scope.Var("x1");
   auto tensor = in_var->GetMutable<f::LoDTensor>();
@@ -180,7 +176,6 @@ TEST(SendRecvOp, CPUDense) {
   for (int64_t i = 0; i < target->numel(); ++i) {
     EXPECT_EQ(expected[i] * 2, actual[i]);
   }
-  LOG(INFO) << "before stop";
   listen_and_serv_op->Stop();
   server_thread.join();
   listen_and_serv_op.reset(nullptr);
@@ -199,7 +194,6 @@ TEST(SendRecvOp, CPUSparse) {
   selected_port = static_cast<paddle::operators::ListenAndServOp *>(
                       listen_and_serv_op.get())
                       ->GetSelectedPort();
-  LOG(INFO) << "selected port " << selected_port;
   std::string endpoint = paddle::string::Sprintf("127.0.0.1:%d", selected_port);
   attrs.insert({"endpoints", std::vector<std::string>({endpoint})});
   attrs.insert({"epmap", std::vector<std::string>({endpoint})});
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from .. import core
-from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program
+from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program, Program
 from ..unique_name import generate as unique_name
 from control_flow import BlockGuard
 from ..layer_helper import LayerHelper
@@ -158,6 +158,7 @@ class ListenAndServ(object):
         main_program = self.helper.main_program
         current_block = main_program.current_block()
         parent_block = self.parent_block()
+        empty_block = Program().global_block()
 
         parent_block.append_op(
             type='listen_and_serv',
@@ -166,11 +167,12 @@ class ListenAndServ(object):
             attrs={
                 'endpoint': self.endpoint,
                 'Fanin': self.fan_in,
-                'OptimizeBlock': current_block
+                'OptimizeBlock': current_block,
+                'PrefetchBlock': empty_block
             })
 
 
-def Send(endpoints, send_vars, get_vars):
+def Send(endpoints, send_vars, get_vars=None):
     """
     Send layer
@@ -184,7 +186,6 @@ def Send(endpoints, send_vars, get_vars):
         side when server have finished running server side program.
     """
     assert (type(send_vars) == list)
-    assert (type(get_vars) == list)
 
     epmap = endpoints.split(",")
     endpoints = list(set(epmap))
@@ -192,6 +193,11 @@ def Send(endpoints, send_vars, get_vars):
     helper = LayerHelper("Send", **locals())
     rpc_client_var = default_main_program().global_block().create_var(
         name="RPC_CLIENT_VAR", persistable=True, type=core.VarDesc.VarType.RAW)
+    if not get_vars:
+        get_vars = []
+        for s in send_vars:
+            v = helper.create_tmp_variable(dtype=s.dtype, stop_gradient=True)
+            get_vars.append(v)
 
     helper.append_op(
         type="send",
@@ -200,6 +206,7 @@ def Send(endpoints, send_vars, get_vars):
                "RPCClient": rpc_client_var},
         attrs={"endpoints": endpoints,
                "epmap": epmap})
+    return get_vars
 
 
 def Recv(endpoints, get_vars):
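With this change, callers of layers.Send no longer have to pre-create the receive variables. A caller sketch under the new signature; the program setup and the endpoint are illustrative, not part of this commit:

import paddle.fluid as fluid
import paddle.fluid.layers as layers

main = fluid.Program()
with fluid.program_guard(main):
    x = layers.data(
        shape=[32, 32], dtype='float32', name='X', append_batch_size=False)
    # Before this commit the receive variables had to be passed explicitly:
    #   layers.Send("127.0.0.1:6174", [x], [x])
    # Now get_vars may be omitted; Send creates temporaries and returns them.
    received = layers.Send("127.0.0.1:6174", [x])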
new file: python/paddle/fluid/tests/unittests/test_recv_op.py

# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import paddle.fluid as fluid
import paddle.fluid.layers as layers
import numpy
from multiprocessing import Process
import os, sys
import time


class TestRecvOp(unittest.TestCase):
    def no_test_send(self):
        # Run init_serv in a subprocess
        place = fluid.CPUPlace()
        p = Process(target=self.init_serv, args=(place, ))
        p.daemon = True
        p.start()
        time.sleep(1)
        self.init_client(place)
        # FIXME(typhoonzero): find a way to gracefully shutdown the server.
        os.system("kill -9 %d" % p.pid)
        p.join()

    def init_serv(self, place):
        main = fluid.Program()
        with fluid.program_guard(main):
            serv = layers.ListenAndServ(
                "127.0.0.1:6174", ["X"], optimizer_mode=False)
            with serv.do():
                x = layers.data(
                    shape=[32, 32],
                    dtype='float32',
                    name="X",
                    append_batch_size=False)
                fluid.initializer.Constant(value=1.0)(x, main.global_block())
                o = layers.scale(x=x, scale=10.0)
                main.global_block().create_var(
                    name=o.name, persistable=False, dtype=o.dtype,
                    shape=o.shape)
        exe = fluid.Executor(place)
        exe.run(main)

    def init_client(self, place):
        main = fluid.Program()
        with fluid.program_guard(main):
            x = layers.data(
                shape=[32, 32],
                dtype='float32',
                name='X',
                append_batch_size=False)
            fluid.initializer.Constant(value=1.0)(x, main.global_block())
            layers.Send("127.0.0.1:6174", [x], [x])
        exe = fluid.Executor(place)
        exe.run(main)


if __name__ == "__main__":
    unittest.main()
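One possible direction for the FIXME about "kill -9" above, sketched purely as an illustration (stop_server is a hypothetical helper, not part of this commit): send SIGTERM first and escalate to SIGKILL only if the server process does not exit within a grace period.

import os
import signal

def stop_server(proc, grace_seconds=3):
    # proc is a multiprocessing.Process running the server.
    # Ask it to exit, then escalate if it is still alive.
    os.kill(proc.pid, signal.SIGTERM)
    proc.join(grace_seconds)
    if proc.is_alive():
        os.kill(proc.pid, signal.SIGKILL)
        proc.join()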