Paddle commit 983566d9
Authored Jun 25, 2018 by guosheng

Merge branch 'develop' of https://github.com/PaddlePaddle/paddle into fix-beam_search-dev

Parents: 8536d261, f0cf70ec
Showing 9 changed files with 61 additions and 49 deletions (+61 -49).
cmake/external/grpc.cmake                              +3   -3
paddle/fluid/operators/distributed/grpc_client.cc      +2   -1
paddle/fluid/operators/distributed/grpc_client.h       +10  -10
paddle/fluid/operators/distributed/grpc_server.cc      +10  -10
paddle/fluid/operators/distributed/rpc_client.cc       +4   -0
paddle/fluid/operators/distributed/rpc_client.h        +10  -9
paddle/fluid/operators/distributed/rpc_server.cc       +4   -3
paddle/fluid/operators/listen_and_serv_op.cc           +0   -2
python/paddle/fluid/layers/nn.py                       +18  -11
cmake/external/grpc.cmake

@@ -40,12 +40,12 @@ ExternalProject_Add(
     # NOTE(wuyi):
     # this package is generated by following steps:
     # 1. git clone -b v1.8.x https://github.com/grpc/grpc.git
-    # 2. submodule update --init
+    # 2. git submodule update --init
     # 3. keep only zlib, cares, protobuf, boringssl under "third_party",
     #    checkout and clean other dirs under third_party
     # 4. remove .git, and package the directory.
-    URL      "http://paddlepaddledeps.bj.bcebos.com/grpc-v1.8.x.tar.gz"
-    URL_MD5  "c9c58ee7d0e8929a63155af6a2ecdbd0"
+    URL      "http://paddlepaddledeps.bj.bcebos.com/grpc-v1.10.x.tar.gz"
+    URL_MD5  "1f268a2aff6759839dccd256adcc91cf"
     PREFIX   ${GRPC_SOURCES_DIR}
     UPDATE_COMMAND     ""
     CONFIGURE_COMMAND  ""
paddle/fluid/operators/distributed/grpc_client.cc

@@ -269,14 +269,15 @@ void GRPCClient::Proceed() {
 }

 std::shared_ptr<grpc::Channel> GRPCClient::GetChannel(const std::string& ep) {
-  // TODO(Yancey1989): make grpc client completely thread-safe
   std::lock_guard<std::mutex> guard(chan_mutex_);
   auto it = channels_.find(ep);
   if (it != channels_.end()) {
     return it->second;
   }

+  // Channel configurations:
   grpc::ChannelArguments args;
+  args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000);
   args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
   args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
   args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
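The added GRPC_ARG_MAX_RECONNECT_BACKOFF_MS setting caps gRPC's exponential reconnect back-off at 2 seconds, so a client whose server endpoint restarts retries the channel promptly instead of backing off for much longer. Below is a minimal, self-contained sketch of the same channel configuration, assuming only the gRPC C++ library; the MakeChannel helper, the endpoint string, and the insecure credentials are illustrative, not taken from the patch.

#include <limits>
#include <memory>
#include <string>

#include <grpc++/grpc++.h>

// Mirror the arguments GetChannel now sets: no compression, unbounded
// message sizes, and reconnect back-off capped at 2 seconds.
std::shared_ptr<grpc::Channel> MakeChannel(const std::string& ep) {
  grpc::ChannelArguments args;
  args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000);
  args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
  args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
  args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
  return grpc::CreateCustomChannel(ep, grpc::InsecureChannelCredentials(),
                                   args);
}

int main() {
  auto channel = MakeChannel("127.0.0.1:6174");  // endpoint is a placeholder
  return channel != nullptr ? 0 : 1;
}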
paddle/fluid/operators/distributed/grpc_client.h

@@ -76,6 +76,7 @@ class BaseProcessor {
   virtual void Prepare(const VarHandle& var_info, int64_t time_out) {
     context_.reset(new grpc::ClientContext());
     var_h_ = var_info;
+    context_->set_wait_for_ready(true);
     std::chrono::system_clock::time_point deadline =
         std::chrono::system_clock::now() + std::chrono::milliseconds(time_out);

@@ -85,6 +86,7 @@ class BaseProcessor {
   virtual void Prepare(int64_t time_out) {
     context_.reset(new grpc::ClientContext());
+    context_->set_wait_for_ready(true);
     std::chrono::system_clock::time_point deadline =
         std::chrono::system_clock::now() + std::chrono::milliseconds(time_out);

@@ -176,26 +178,24 @@ class GRPCClient : public RPCClient {
   bool AsyncSendVar(const std::string& ep, const platform::DeviceContext& ctx,
                     const framework::Scope& scope, const std::string& var_name,
-                    int64_t time_out = RPCClient::rpc_time_out) override;
+                    int64_t time_out = FLAGS_grpc_deadline) override;

   bool AsyncGetVar(const std::string& ep, const platform::DeviceContext& ctx,
                    const framework::Scope& scope, const std::string& var_name,
-                   int64_t time_out = RPCClient::rpc_time_out) override;
+                   int64_t time_out = FLAGS_grpc_deadline) override;

   bool AsyncPrefetchVar(const std::string& ep,
                         const platform::DeviceContext& ctx,
                         const framework::Scope& scope,
                         const std::string& in_var_name,
                         const std::string& out_var_name,
-                        int64_t time_out = RPCClient::rpc_time_out) override;
+                        int64_t time_out = FLAGS_grpc_deadline) override;

-  void AsyncSendBatchBarrier(
-      const std::string& ep,
-      int64_t time_out = RPCClient::rpc_time_out) override;
+  void AsyncSendBatchBarrier(const std::string& ep,
+                             int64_t time_out = FLAGS_grpc_deadline) override;

-  void AsyncSendFetchBarrier(
-      const std::string& ep,
-      int64_t time_out = RPCClient::rpc_time_out) override;
+  void AsyncSendFetchBarrier(const std::string& ep,
+                             int64_t time_out = FLAGS_grpc_deadline) override;

   void Wait() override;

@@ -211,7 +211,7 @@ class GRPCClient : public RPCClient {
   void Proceed();

   void AsyncSendComplete(const std::string& ep,
-                         int64_t time_out = RPCClient::rpc_time_out);
+                         int64_t time_out = FLAGS_grpc_deadline);

   std::shared_ptr<grpc::Channel> GetChannel(const std::string& ep);
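The two new context_->set_wait_for_ready(true) calls turn off gRPC's default fail-fast behaviour: an RPC issued while the channel is still connecting is queued until the channel becomes ready or the deadline set just below it expires. A small compilable sketch of that preparation step, again assuming only the gRPC C++ headers; the MakeContext helper and the standalone main are illustrative only.

#include <chrono>
#include <cstdint>
#include <memory>

#include <grpc++/grpc++.h>

// Prepare a ClientContext the way BaseProcessor::Prepare now does:
// wait-for-ready instead of fail-fast, bounded by a millisecond deadline.
std::unique_ptr<grpc::ClientContext> MakeContext(int64_t time_out_ms) {
  std::unique_ptr<grpc::ClientContext> ctx(new grpc::ClientContext());
  ctx->set_wait_for_ready(true);
  ctx->set_deadline(std::chrono::system_clock::now() +
                    std::chrono::milliseconds(time_out_ms));
  return ctx;
}

int main() {
  auto ctx = MakeContext(180000);  // 3 minutes, matching the new flag default
  return ctx ? 0 : 1;
}

Without wait_for_ready, a call issued before the remote server finishes starting typically fails immediately with UNAVAILABLE, which appears to be the behaviour these two lines are guarding against.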
paddle/fluid/operators/distributed/grpc_server.cc

@@ -97,7 +97,7 @@ class RequestSend final : public RequestBase {
   void Process() override {
     std::string varname = GetReqName();
-    VLOG(3) << "RequestSend var_name:" << varname;
+    VLOG(4) << "RequestSend var_name:" << varname;

     auto scope = request_->GetMutableLocalScope();
     auto invar = request_->GetVar();

@@ -132,7 +132,7 @@ class RequestGet final : public RequestBase {
   void Process() override {
     // proc request.
     std::string varname = request_.varname();
-    VLOG(3) << "RequestGet " << varname;
+    VLOG(4) << "RequestGet " << varname;

     auto scope = request_handler_->scope();
     auto invar = scope->FindVar(varname);

@@ -178,7 +178,7 @@ class RequestPrefetch final : public RequestBase {
     // prefetch process...
     std::string in_var_name = request_->Varname();
     std::string out_var_name = request_->OutVarname();
-    VLOG(3) << "RequestPrefetch, in_var_name: " << in_var_name
+    VLOG(4) << "RequestPrefetch, in_var_name: " << in_var_name
             << " out_var_name: " << out_var_name;

     auto scope = request_->GetMutableLocalScope();

@@ -201,10 +201,10 @@ class RequestPrefetch final : public RequestBase {
 };

 void AsyncGRPCServer::WaitServerReady() {
-  VLOG(3) << "AsyncGRPCServer is wait server ready";
+  VLOG(4) << "AsyncGRPCServer is wait server ready";
   std::unique_lock<std::mutex> lock(this->mutex_ready_);
   condition_ready_.wait(lock, [=] { return this->ready_ == 1; });
-  VLOG(3) << "AsyncGRPCServer WaitSeverReady";
+  VLOG(4) << "AsyncGRPCServer WaitSeverReady";
 }

 void AsyncGRPCServer::StartServer() {

@@ -243,7 +243,7 @@ void AsyncGRPCServer::StartServer() {
     for (int i = 0; i < threadnum; i++) {
       rpc_threads_[rpc_name].emplace_back(new std::thread(std::bind(
           &AsyncGRPCServer::HandleRequest, this, cq.get(), rpc_name, f)));
-      VLOG(3) << t.first << " creates threads!";
+      VLOG(4) << t.first << " creates threads!";
     }
   }

@@ -260,7 +260,7 @@ void AsyncGRPCServer::StartServer() {
     auto& threads = t.second;
     for (size_t i = 0; i < threads.size(); ++i) {
       threads[i]->join();
-      VLOG(3) << t.first << " threads ends!";
+      VLOG(4) << t.first << " threads ends!";
     }
   }
 }

@@ -268,7 +268,7 @@ void AsyncGRPCServer::StartServer() {
 void AsyncGRPCServer::ShutdownQueue() {
   for (auto& t : rpc_cq_) {
     t.second->Shutdown();
-    VLOG(3) << t.first << " shutdown!";
+    VLOG(4) << t.first << " queue shutdown!";
   }
 }

@@ -277,7 +277,7 @@ void AsyncGRPCServer::ShutDownImpl() {
   is_shut_down_ = true;
   ShutdownQueue();
-  VLOG(3) << "server_ shutdown!";
+  VLOG(4) << "server_ shutdown!";
   server_->Shutdown();
 }

@@ -285,7 +285,7 @@ void AsyncGRPCServer::TryToRegisterNewOne(const std::string& rpc_name,
                                           int req_id) {
   std::unique_lock<std::mutex> lock(cq_mutex_);
   if (is_shut_down_) {
-    LOG(WARNING) << "shutdown, do not TryToRegisterNewSendOne";
+    VLOG(4) << "shutdown, do not TryToRegisterNewSendOne";
     return;
   }
paddle/fluid/operators/distributed/rpc_client.cc

@@ -13,6 +13,10 @@
 // limitations under the License.

 #include "paddle/fluid/operators/distributed/rpc_client.h"
+#include "gflags/gflags.h"
+
+// default to 3min to avoid temprary network failures.
+DEFINE_int32(grpc_deadline, 180000, "deadline timeouts for grpc");

 namespace paddle {
 namespace operators {
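The new flag follows the usual gflags split: DEFINE_int32 in exactly one .cc file creates the FLAGS_grpc_deadline variable, while DECLARE_int32 in the header (added to rpc_client.h in the next file) gives every includer an extern declaration, which is what lets FLAGS_grpc_deadline appear in default arguments. A standalone sketch of the pattern follows; the demo main is not part of the patch.

#include <iostream>

#include "gflags/gflags.h"

// The definition lives in exactly one translation unit (rpc_client.cc in
// this commit); headers only declare it with DECLARE_int32(grpc_deadline);
DEFINE_int32(grpc_deadline, 180000, "deadline timeouts for grpc");

int main(int argc, char* argv[]) {
  // Overridable at run time, e.g. --grpc_deadline=60000.
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  std::cout << "grpc_deadline = " << FLAGS_grpc_deadline << " ms" << std::endl;
  return 0;
}

Compared with the static constexpr rpc_time_out (120 * 1000 ms) removed from rpc_client.h below, the flag raises the default to 3 minutes and can be tuned per process without rebuilding.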
paddle/fluid/operators/distributed/rpc_client.h

@@ -15,11 +15,14 @@
 #pragma once

 #include <string>

+#include "gflags/gflags.h"
 #include "paddle/fluid/framework/data_type.h"
 #include "paddle/fluid/framework/lod_tensor.h"
 #include "paddle/fluid/framework/scope.h"

+DECLARE_int32(grpc_deadline);
+
 namespace paddle {
 namespace operators {
 namespace distributed {

@@ -32,26 +35,26 @@ class RPCClient {
                             const platform::DeviceContext& ctx,
                             const framework::Scope& scope,
                             const std::string& var_name,
-                            int64_t time_out = rpc_time_out) = 0;
+                            int64_t time_out = FLAGS_grpc_deadline) = 0;

   virtual bool AsyncGetVar(const std::string& ep,
                            const platform::DeviceContext& ctx,
                            const framework::Scope& scope,
                            const std::string& var_name,
-                           int64_t time_out = rpc_time_out) = 0;
+                           int64_t time_out = FLAGS_grpc_deadline) = 0;

   virtual bool AsyncPrefetchVar(const std::string& ep,
                                 const platform::DeviceContext& ctx,
                                 const framework::Scope& scope,
                                 const std::string& in_var_name,
                                 const std::string& out_var_name,
-                                int64_t time_out = rpc_time_out) = 0;
+                                int64_t time_out = FLAGS_grpc_deadline) = 0;

-  virtual void AsyncSendBatchBarrier(const std::string& ep,
-                                     int64_t time_out = rpc_time_out) = 0;
+  virtual void AsyncSendBatchBarrier(
+      const std::string& ep, int64_t time_out = FLAGS_grpc_deadline) = 0;

-  virtual void AsyncSendFetchBarrier(const std::string& ep,
-                                     int64_t time_out = rpc_time_out) = 0;
+  virtual void AsyncSendFetchBarrier(
+      const std::string& ep, int64_t time_out = FLAGS_grpc_deadline) = 0;

   // SendComplete tells all the server that current trainer have no more data
   // to train, so that the pserver can reduce it's barrier count, and continue

@@ -60,8 +63,6 @@ class RPCClient {
   virtual void Wait() = 0;

-  static constexpr int64_t rpc_time_out = 120 * 1000;
-
   template <typename T>
   static RPCClient* GetInstance() {
     std::call_once(init_flag_, &RPCClient::Init<T>);
paddle/fluid/operators/distributed/rpc_server.cc

@@ -47,11 +47,12 @@ void RPCServer::WaitBarrier(const std::string& rpc_name) {
     return (barrier_counter_[rpc_name] >= client_num_ || exit_flag_.load());
   });

-  VLOG(3) << "batch_barrier_:" << barrier_counter_[rpc_name];
+  VLOG(3) << "batch_barrier_: " << rpc_name << " "
+          << barrier_counter_[rpc_name];
 }

 void RPCServer::IncreaseBatchBarrier(const std::string rpc_name) {
-  VLOG(3) << "RPCServer begin IncreaseBatchBarrier " << rpc_name;
+  VLOG(4) << "RPCServer begin IncreaseBatchBarrier " << rpc_name;
   int b = 0;
   std::unique_lock<std::mutex> lock(mutex_);
   b = ++barrier_counter_[rpc_name];

@@ -100,7 +101,7 @@ void RPCServer::SetCond(const std::string& rpc_name) {
 }

 void RPCServer::WaitCond(const std::string& rpc_name) {
-  VLOG(3) << "RPCServer WaitCond " << rpc_name;
+  VLOG(4) << "RPCServer WaitCond " << rpc_name;
   int cond = 0;
   {
     std::unique_lock<std::mutex> lock(mutex_);
paddle/fluid/operators/listen_and_serv_op.cc

@@ -164,7 +164,6 @@ void ListenAndServOp::RunSyncLoop(
 void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
                                    framework::ProgramDesc *program) const {
-  VLOG(3) << "RunAsyncLoop in";
   // grad name to block id
   std::unordered_map<std::string, int32_t> grad_to_block_id;
   std::unordered_map<int32_t, std::string> id_to_grad;

@@ -202,7 +201,6 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
   request_get_handler_->SetGradToPreparedCtx(&grad_to_prepared_ctx);
   request_prefetch_handler_->SetGradToPreparedCtx(&grad_to_prepared_ctx);

-  VLOG(3) << "RunAsyncLoop into while";
   while (true) {
     if (rpc_service_->IsExit()) {
       LOG(INFO) << "get exit!rpc_processor break!";
python/paddle/fluid/layers/nn.py

@@ -4318,14 +4318,18 @@ def reshape(x, shape, actual_shape=None, act=None, inplace=True, name=None):
                                say :attr:`actual_shape` has a higher priority
                                than :attr:`shape`.
         act (str): The non-linear activation to be applied to output variable.
-        inplace(bool): If this flag is set true, a new output tensor is created
-                       whose data is copied from input x, otherwise the output
-                       shares data with input without copying.
+        inplace(bool): If this flag is set true, the output
+                       shares data with input without copying, otherwise
+                       a new output tensor is created
+                       whose data is copied from input x.
         name (str): The name of this layer. It is optional.

     Returns:
         Variable: The output tensor.

+    Raises:
+        TypeError: if actual_shape is neither Variable nor None.
+
     Examples:
         .. code-block:: python

@@ -4337,6 +4341,11 @@ def reshape(x, shape, actual_shape=None, act=None, inplace=True, name=None):
     if not (isinstance(shape, list) or isinstance(shape, tuple)):
         raise ValueError("Input shape must be a python lsit or tuple.")
+    inputs = {"X": x}
+    if isinstance(actual_shape, Variable):
+        inputs["Shape"] = actual_shape
+    elif actual_shape is not None:
+        raise TypeError("actual_shape should either be Variable or None")

     # Validate the shape
     unk_dim_idx = -1

@@ -4357,9 +4366,7 @@ def reshape(x, shape, actual_shape=None, act=None, inplace=True, name=None):
     reshaped = helper.create_tmp_variable(dtype=x.dtype)
     helper.append_op(
         type="reshape",
-        inputs={"X": x,
-                "Shape": actual_shape}
-        if isinstance(actual_shape, Variable) else {"X": x},
+        inputs=inputs,
         attrs={"shape": shape,
                "inplace": inplace},
         outputs={"Out": reshaped})

@@ -4978,10 +4985,10 @@ def log(x):
     .. math::

-        Out = \\ln(input)
+        Out = \\ln(x)

     Args:
-        input (Variable): Input tensor.
+        x (Variable): Input tensor.

     Returns:
         Variable: The natural log of the input tensor computed element-wise.

@@ -5002,7 +5009,7 @@ def log(x):
 def relu(x):
     """
     Relu takes one input data (Tensor) and produces one output data (Tensor)
-    where the rectified linear function, y = max(0, input), is applied to
+    where the rectified linear function, y = max(0, x), is applied to
     the tensor elementwise.

     .. math::

@@ -5010,7 +5017,7 @@ def relu(x):
         Out = \\max(0, x)

     Args:
-        input (Variable): The input tensor.
+        x (Variable): The input tensor.

     Returns:
         Variable: The output tensor with the same shape as input.

@@ -5019,7 +5026,7 @@ def relu(x):
         .. code-block:: python

-            output = fluid.layers.relu(input)
+            output = fluid.layers.relu(x)
     """
     helper = LayerHelper('relu', **locals())
     dtype = helper.input_dtype(input_param_name='x')