Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
220db4f3
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
220db4f3
编写于
12月 07, 2018
作者:
Y
Yancey1989
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
clean code
上级
cb8a24be
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
7 addition
and
84 deletion
+7
-84
paddle/fluid/framework/details/build_strategy.cc
paddle/fluid/framework/details/build_strategy.cc
+0
-1
paddle/fluid/framework/details/multi_devices_graph_pass.cc
paddle/fluid/framework/details/multi_devices_graph_pass.cc
+0
-3
paddle/fluid/framework/parallel_executor.h
paddle/fluid/framework/parallel_executor.h
+0
-2
paddle/fluid/framework/scope.cc
paddle/fluid/framework/scope.cc
+1
-4
paddle/fluid/framework/threadpool.cc
paddle/fluid/framework/threadpool.cc
+3
-12
paddle/fluid/framework/threadpool.h
paddle/fluid/framework/threadpool.h
+1
-1
paddle/fluid/framework/threadpool_test.cc
paddle/fluid/framework/threadpool_test.cc
+0
-44
paddle/fluid/operators/reader/blocking_queue.h
paddle/fluid/operators/reader/blocking_queue.h
+0
-3
paddle/fluid/operators/reader/buffered_reader.cc
paddle/fluid/operators/reader/buffered_reader.cc
+0
-3
paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
.../fluid/operators/reader/create_double_buffer_reader_op.cc
+2
-11
未找到文件。
paddle/fluid/framework/details/build_strategy.cc
浏览文件 @
220db4f3
...
...
@@ -118,7 +118,6 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
std
::
unique_ptr
<
ir
::
Graph
>
graph
(
new
ir
::
Graph
(
main_program
));
for
(
std
::
shared_ptr
<
ir
::
Pass
>
&
pass
:
pass_builder_
->
AllPasses
())
{
VLOG
(
5
)
<<
"run pass: "
<<
pass
->
Type
();
if
(
pass
->
Type
()
==
"multi_devices_pass"
)
{
pass
->
Erase
(
"places"
);
pass
->
SetNotOwned
<
const
std
::
vector
<
platform
::
Place
>>
(
"places"
,
&
places
);
...
...
paddle/fluid/framework/details/multi_devices_graph_pass.cc
浏览文件 @
220db4f3
...
...
@@ -329,7 +329,6 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
std
::
unordered_map
<
std
::
string
,
int
>
sharded_var_device
;
for
(
ir
::
Node
*
node
:
sorted_ops
)
{
VLOG
(
5
)
<<
"op name: "
<<
node
->
Op
()
->
Type
();
if
(
boost
::
get
<
int
>
(
node
->
Op
()
->
GetAttr
(
OpProtoAndCheckerMaker
::
OpRoleAttrName
()))
==
static_cast
<
int
>
(
OpRole
::
kRPC
))
{
...
...
@@ -366,11 +365,9 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
// is true only for the op that scale the final scalar loss.
// It also assumes backward op will always follow the forward op in
// the block.
VLOG
(
5
)
<<
"this is loss scale op!"
;
is_forwarding
=
false
;
}
else
{
int
op_dev_id
=
GetOpDeviceID
(
result
,
node
,
sharded_var_device
);
VLOG
(
5
)
<<
"on device id: "
<<
op_dev_id
;
if
(
op_dev_id
!=
-
1
)
{
// This op only runs on one specific device.
CreateComputationalOp
(
&
result
,
node
,
op_dev_id
);
for
(
ir
::
Node
*
n
:
node
->
outputs
)
{
...
...
paddle/fluid/framework/parallel_executor.h
浏览文件 @
220db4f3
...
...
@@ -20,8 +20,6 @@ limitations under the License. */
#include <unordered_set>
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/details/build_strategy.h"
#include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/executor.h"
...
...
paddle/fluid/framework/scope.cc
浏览文件 @
220db4f3
...
...
@@ -58,10 +58,7 @@ int64_t GetEagerDeletionThreshold() {
(
static_cast
<
int64_t
>
(
1
)
<<
30
));
}
Scope
::~
Scope
()
{
VLOG
(
5
)
<<
"~Scope()"
;
DropKids
();
}
Scope
::~
Scope
()
{
DropKids
();
}
Scope
&
Scope
::
NewScope
()
const
{
SCOPE_LOCK_GUARD
...
...
paddle/fluid/framework/threadpool.cc
浏览文件 @
220db4f3
...
...
@@ -48,18 +48,9 @@ void ThreadPool::Init() {
ThreadPool
::
ThreadPool
(
int
num_threads
)
:
running_
(
true
)
{
threads_
.
resize
(
num_threads
);
for
(
int
i
=
0
;
i
<
num_threads
;
++
i
)
{
// for (auto& thread : threads_) {
for
(
auto
&
thread
:
threads_
)
{
// TODO(Yancey1989): binding the thread on the specify CPU number
threads_
[
i
].
reset
(
new
std
::
thread
(
std
::
bind
(
&
ThreadPool
::
TaskLoop
,
this
,
i
)));
/**
sched_param sch;
int policy;
pthread_getschedparam(threads_[i]->native_handle(), &policy, &sch);
if (pthread_setschedparam(threads_[i]->native_handle(), SCHED_FIFO, &sch)) {
VLOG(1) << "Failed to setschedparam: " << errno;
}**/
thread
.
reset
(
new
std
::
thread
(
std
::
bind
(
&
ThreadPool
::
TaskLoop
,
this
)));
}
}
...
...
@@ -77,7 +68,7 @@ ThreadPool::~ThreadPool() {
}
}
void
ThreadPool
::
TaskLoop
(
int
i
)
{
void
ThreadPool
::
TaskLoop
()
{
while
(
true
)
{
Task
task
;
...
...
paddle/fluid/framework/threadpool.h
浏览文件 @
220db4f3
...
...
@@ -99,7 +99,7 @@ class ThreadPool {
// The constructor starts threads to run TaskLoop, which retrieves
// and runs tasks from the queue.
void
TaskLoop
(
int
i
);
void
TaskLoop
();
// Init is called by GetInstance.
static
void
Init
();
...
...
paddle/fluid/framework/threadpool_test.cc
浏览文件 @
220db4f3
...
...
@@ -59,47 +59,3 @@ TEST(ThreadPool, ConcurrentRun) {
}
EXPECT_EQ
(
sum
,
((
n
+
1
)
*
n
)
/
2
);
}
static
int64_t
GetTS
()
{
struct
timeval
tp
;
gettimeofday
(
&
tp
,
NULL
);
return
tp
.
tv_sec
*
1000000
+
tp
.
tv_usec
;
}
void
multi_call
(
std
::
function
<
void
()
>
call
)
{
for
(
int
i
=
0
;
i
<
500
;
++
i
)
{
call
();
}
}
TEST
(
ThreadPool
,
PERFORMANCE
)
{
auto
sum
=
[]
{
int
a
=
0
;
for
(
int
i
=
0
;
i
<
1000
;
++
i
)
{
a
+=
i
;
}
};
// framework::ThreadPool *pool = new framework::ThreadPool(2);
int64_t
start
=
GetTS
();
for
(
int
i
=
0
;
i
<
1000
;
++
i
)
{
// int64_t s = GetTS();
framework
::
Async
(
std
::
move
(
sum
));
// pool->Run(std::move(sum));
// VLOG(5) << "push to pool spent : " << GetTS() - s << " (us).";
}
VLOG
(
5
)
<<
"pool spent: "
<<
GetTS
()
-
start
<<
" (us)."
;
start
=
GetTS
();
for
(
int
i
=
0
;
i
<
1000
;
++
i
)
{
sum
();
}
VLOG
(
5
)
<<
"sequence call spent: "
<<
GetTS
()
-
start
<<
" (us)."
;
std
::
vector
<
std
::
thread
>
threads
;
start
=
GetTS
();
for
(
int
i
=
0
;
i
<
2
;
++
i
)
{
std
::
thread
t
(
multi_call
,
std
::
ref
(
sum
));
threads
.
push_back
(
std
::
move
(
t
));
}
for
(
auto
&
thread
:
threads
)
{
thread
.
join
();
}
VLOG
(
5
)
<<
"two threads spent: "
<<
GetTS
()
-
start
<<
" (us)."
;
}
paddle/fluid/operators/reader/blocking_queue.h
浏览文件 @
220db4f3
...
...
@@ -67,12 +67,9 @@ class BlockingQueue {
}
bool
Receive
(
T
*
elem
)
{
VLOG
(
1
)
<<
"blocking queue::Receive ..."
;
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex_
);
receive_cv_
.
wait
(
lock
,
[
&
]
{
return
!
queue_
.
empty
()
||
closed_
;
});
VLOG
(
1
)
<<
"queue_.empty()="
<<
queue_
.
empty
();
if
(
!
queue_
.
empty
())
{
if
(
elem
==
nullptr
)
VLOG
(
1
)
<<
"elem is nullptr"
;
PADDLE_ENFORCE_NOT_NULL
(
elem
);
*
elem
=
queue_
.
front
();
if
(
LIKELY
(
!
speed_test_mode_
))
{
...
...
paddle/fluid/operators/reader/buffered_reader.cc
浏览文件 @
220db4f3
...
...
@@ -82,13 +82,11 @@ void BufferedReader::StartImpl() {
}
void
BufferedReader
::
ReadNextImpl
(
std
::
vector
<
framework
::
LoDTensor
>
*
out
)
{
VLOG
(
1
)
<<
"ReadNextImpl start on place: "
<<
place_
;
if
(
position_
.
empty
())
{
out
->
clear
();
return
;
}
size_t
i
=
position_
.
front
().
get
();
VLOG
(
1
)
<<
"position front: "
<<
i
;
position_
.
pop
();
if
(
i
==
-
1UL
)
{
...
...
@@ -105,7 +103,6 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
ReadAsync
(
prev_pos_
);
}
prev_pos_
=
i
;
VLOG
(
1
)
<<
"success ReadNextImpl"
;
}
}
// namespace reader
...
...
paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
浏览文件 @
220db4f3
...
...
@@ -25,15 +25,9 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
private:
void
RunImpl
(
const
framework
::
Scope
&
scope
,
const
platform
::
Place
&
dev_place
)
const
override
{
VLOG
(
1
)
<<
"find var in scope: "
<<
&
scope
;
auto
*
out_var
=
scope
.
FindVar
(
Output
(
"Out"
));
VLOG
(
1
)
<<
"var "
<<
Output
(
"Out"
)
<<
" -> "
<<
out_var
;
auto
*
out
=
out_var
->
GetMutable
<
framework
::
ReaderHolder
>
();
// auto* out = scope.Var(Output("Out"))
// ->template GetMutable<framework::ReaderHolder>();
auto
*
out
=
scope
.
Var
(
Output
(
"Out"
))
->
template
GetMutable
<
framework
::
ReaderHolder
>();
if
(
out
->
Get
()
!=
nullptr
)
{
VLOG
(
1
)
<<
Output
(
"Out"
)
<<
" is not nullptr."
;
return
;
}
const
auto
&
underlying_reader
=
scope
.
FindVar
(
Input
(
"UnderlyingReader"
))
...
...
@@ -52,11 +46,8 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
sin
>>
num
;
place
=
platform
::
CUDAPlace
(
static_cast
<
int
>
(
num
));
}
VLOG
(
1
)
<<
"create buffered reader on "
<<
place
;
out
->
Reset
(
framework
::
MakeDecoratedReader
<
BufferedReader
>
(
underlying_reader
,
place
,
2
));
VLOG
(
1
)
<<
"Reset Buffered Reader in var: "
<<
scope
.
FindVar
(
Input
(
"UnderlyingReader"
));
}
};
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录