Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Paddle
提交
18d6254d
P
Paddle
项目概览
PaddlePaddle
/
Paddle
大约 2 年 前同步成功
通知
2325
Star
20933
Fork
5424
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1423
列表
看板
标记
里程碑
合并请求
543
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1,423
Issue
1,423
列表
看板
标记
里程碑
合并请求
543
合并请求
543
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
18d6254d
编写于
4月 26, 2018
作者:
Y
yangyaming
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' of
https://github.com/PaddlePaddle/Paddle
into fix-10219
上级
bf824d85
c4af8faf
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
390 addition
and
76 deletion
+390
-76
benchmark/fluid/machine_translation.py
benchmark/fluid/machine_translation.py
+1
-1
benchmark/fluid/mnist.py
benchmark/fluid/mnist.py
+1
-1
benchmark/fluid/resnet.py
benchmark/fluid/resnet.py
+1
-1
benchmark/fluid/stacked_dynamic_lstm.py
benchmark/fluid/stacked_dynamic_lstm.py
+3
-3
benchmark/fluid/vgg.py
benchmark/fluid/vgg.py
+1
-1
doc/v2/api/data/dataset.rst
doc/v2/api/data/dataset.rst
+14
-14
paddle/fluid/operators/reader/CMakeLists.txt
paddle/fluid/operators/reader/CMakeLists.txt
+2
-0
paddle/fluid/operators/reader/blocking_queue.h
paddle/fluid/operators/reader/blocking_queue.h
+112
-0
paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
.../fluid/operators/reader/create_double_buffer_reader_op.cc
+10
-22
paddle/fluid/operators/reader/open_files_op.cc
paddle/fluid/operators/reader/open_files_op.cc
+14
-24
paddle/fluid/operators/reader/reader_blocking_queue_test.cc
paddle/fluid/operators/reader/reader_blocking_queue_test.cc
+219
-0
paddle/fluid/platform/dynload/cublas.h
paddle/fluid/platform/dynload/cublas.h
+6
-4
paddle/fluid/platform/dynload/cudnn.h
paddle/fluid/platform/dynload/cudnn.h
+1
-1
paddle/fluid/platform/dynload/cupti.h
paddle/fluid/platform/dynload/cupti.h
+1
-1
paddle/fluid/platform/dynload/curand.h
paddle/fluid/platform/dynload/curand.h
+1
-1
paddle/fluid/platform/dynload/nccl.h
paddle/fluid/platform/dynload/nccl.h
+1
-1
paddle/fluid/platform/dynload/warpctc.h
paddle/fluid/platform/dynload/warpctc.h
+1
-1
paddle/scripts/paddle_docker_build.sh
paddle/scripts/paddle_docker_build.sh
+1
-0
未找到文件。
benchmark/fluid/machine_translation.py
浏览文件 @
18d6254d
...
@@ -21,7 +21,7 @@ import argparse
...
@@ -21,7 +21,7 @@ import argparse
import
time
import
time
import
distutils.util
import
distutils.util
import
paddle
.v2
as
paddle
import
paddle
import
paddle.fluid
as
fluid
import
paddle.fluid
as
fluid
import
paddle.fluid.core
as
core
import
paddle.fluid.core
as
core
import
paddle.fluid.framework
as
framework
import
paddle.fluid.framework
as
framework
...
...
benchmark/fluid/mnist.py
浏览文件 @
18d6254d
...
@@ -20,7 +20,7 @@ import numpy as np
...
@@ -20,7 +20,7 @@ import numpy as np
import
argparse
import
argparse
import
time
import
time
import
paddle
.v2
as
paddle
import
paddle
import
paddle.fluid
as
fluid
import
paddle.fluid
as
fluid
import
paddle.fluid.profiler
as
profiler
import
paddle.fluid.profiler
as
profiler
...
...
benchmark/fluid/resnet.py
浏览文件 @
18d6254d
...
@@ -23,7 +23,7 @@ import time
...
@@ -23,7 +23,7 @@ import time
import
cProfile
,
pstats
,
StringIO
import
cProfile
,
pstats
,
StringIO
import
paddle
.v2
as
paddle
import
paddle
import
paddle.fluid
as
fluid
import
paddle.fluid
as
fluid
import
paddle.fluid.core
as
core
import
paddle.fluid.core
as
core
import
paddle.fluid.profiler
as
profiler
import
paddle.fluid.profiler
as
profiler
...
...
benchmark/fluid/stacked_dynamic_lstm.py
浏览文件 @
18d6254d
...
@@ -23,10 +23,10 @@ import random
...
@@ -23,10 +23,10 @@ import random
import
time
import
time
import
numpy
import
numpy
import
paddle
.v2
as
paddle
import
paddle
import
paddle.
v2.
dataset.imdb
as
imdb
import
paddle.dataset.imdb
as
imdb
import
paddle.fluid
as
fluid
import
paddle.fluid
as
fluid
from
paddle.v2
import
batch
import
paddle.batch
as
batch
import
paddle.fluid.profiler
as
profiler
import
paddle.fluid.profiler
as
profiler
...
...
benchmark/fluid/vgg.py
浏览文件 @
18d6254d
...
@@ -17,7 +17,7 @@ from __future__ import print_function
...
@@ -17,7 +17,7 @@ from __future__ import print_function
import
sys
import
sys
import
time
import
time
import
numpy
as
np
import
numpy
as
np
import
paddle
.v2
as
paddle
import
paddle
import
paddle.fluid
as
fluid
import
paddle.fluid
as
fluid
import
paddle.fluid.core
as
core
import
paddle.fluid.core
as
core
import
argparse
import
argparse
...
...
doc/v2/api/data/dataset.rst
浏览文件 @
18d6254d
Dataset
Dataset
=======
=======
.. automodule:: paddle.
v2.
dataset
.. automodule:: paddle.dataset
:members:
:members:
:noindex:
:noindex:
mnist
mnist
+++++
+++++
.. automodule:: paddle.
v2.
dataset.mnist
.. automodule:: paddle.dataset.mnist
:members:
:members:
:noindex:
:noindex:
cifar
cifar
+++++
+++++
.. automodule:: paddle.
v2.
dataset.cifar
.. automodule:: paddle.dataset.cifar
:members:
:members:
:noindex:
:noindex:
conll05
conll05
+++++++
+++++++
.. automodule:: paddle.
v2.
dataset.conll05
.. automodule:: paddle.dataset.conll05
:members: get_dict,get_embedding,test
:members: get_dict,get_embedding,test
:noindex:
:noindex:
imdb
imdb
++++
++++
.. automodule:: paddle.
v2.
dataset.imdb
.. automodule:: paddle.dataset.imdb
:members:
:members:
:noindex:
:noindex:
imikolov
imikolov
++++++++
++++++++
.. automodule:: paddle.
v2.
dataset.imikolov
.. automodule:: paddle.dataset.imikolov
:members:
:members:
:noindex:
:noindex:
movielens
movielens
+++++++++
+++++++++
.. automodule:: paddle.
v2.
dataset.movielens
.. automodule:: paddle.dataset.movielens
:members:
:members:
:noindex:
:noindex:
.. autoclass:: paddle.
v2.
dataset.movielens.MovieInfo
.. autoclass:: paddle.dataset.movielens.MovieInfo
:noindex:
:noindex:
.. autoclass:: paddle.
v2.
dataset.movielens.UserInfo
.. autoclass:: paddle.dataset.movielens.UserInfo
:noindex:
:noindex:
sentiment
sentiment
+++++++++
+++++++++
.. automodule:: paddle.
v2.
dataset.sentiment
.. automodule:: paddle.dataset.sentiment
:members:
:members:
:noindex:
:noindex:
uci_housing
uci_housing
+++++++++++
+++++++++++
.. automodule:: paddle.
v2.
dataset.uci_housing
.. automodule:: paddle.dataset.uci_housing
:members:
:members:
:noindex:
:noindex:
wmt14
wmt14
+++++
+++++
.. automodule:: paddle.
v2.
dataset.wmt14
.. automodule:: paddle.dataset.wmt14
:members:
:members:
:noindex:
:noindex:
wmt16
wmt16
+++++
+++++
.. automodule:: paddle.
v2.
dataset.wmt16
.. automodule:: paddle.dataset.wmt16
:members:
:members:
:noindex:
:noindex:
paddle/fluid/operators/reader/CMakeLists.txt
浏览文件 @
18d6254d
...
@@ -23,5 +23,7 @@ reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_o
...
@@ -23,5 +23,7 @@ reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_o
reader_library
(
create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc
)
reader_library
(
create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc
)
reader_library
(
create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc
)
reader_library
(
create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc
)
reader_library
(
create_threaded_reader_op SRCS create_threaded_reader_op.cc
)
reader_library
(
create_threaded_reader_op SRCS create_threaded_reader_op.cc
)
cc_test
(
reader_blocking_queue_test SRCS reader_blocking_queue_test.cc
)
# Export local libraries to parent
# Export local libraries to parent
set
(
READER_LIBRARY
${
LOCAL_READER_LIBS
}
PARENT_SCOPE
)
set
(
READER_LIBRARY
${
LOCAL_READER_LIBS
}
PARENT_SCOPE
)
paddle/fluid/operators/reader/blocking_queue.h
0 → 100644
浏览文件 @
18d6254d
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <condition_variable> // NOLINT
#include <deque>
#include "paddle/fluid/platform/enforce.h"
namespace
paddle
{
namespace
operators
{
namespace
reader
{
// A bounded, thread-safe FIFO queue with blocking Send/Receive and an
// explicit Close() that wakes up every waiting sender and receiver.
//
// BlockingQueue is meant for buffered reading and is supposed to be used only
// inside the reader package. Ideally framework::Channel would be used instead,
// but it currently has a deadlock bug. BlockingQueue is a workaround: a
// simplified version of framework::Channel that does not support GPU and only
// implements a buffered blocking queue.
template <typename T>
class BlockingQueue {
 public:
  // Creates a queue that holds at most `capacity` elements.
  // `capacity` must be greater than 0 (enforced).
  explicit BlockingQueue(size_t capacity)
      : capacity_(capacity), closed_(false) {
    PADDLE_ENFORCE_GT(
        capacity_, 0,
        "The capacity of a reader::BlockingQueue must be greater than 0.");
  }

  // Copies `elem` into the queue, blocking while the queue is full.
  // Returns false (and enqueues nothing) if the queue is closed, either
  // before the call or while waiting for free space.
  bool Send(const T& elem) {
    std::unique_lock<std::mutex> lock(mutex_);
    send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
    if (closed_) {
      VLOG(5)
          << "WARNING: Sending an element to a closed reader::BlockingQueue.";
      return false;
    }
    PADDLE_ENFORCE_LT(queue_.size(), capacity_);
    queue_.push_back(elem);
    receive_cv_.notify_one();
    return true;
  }

  // Same as Send(const T&), but moves `elem` into the queue.
  bool Send(T&& elem) {
    std::unique_lock<std::mutex> lock(mutex_);
    send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
    if (closed_) {
      VLOG(5)
          << "WARNING: Sending an element to a closed reader::BlockingQueue.";
      return false;
    }
    PADDLE_ENFORCE_LT(queue_.size(), capacity_);
    queue_.emplace_back(std::move(elem));
    receive_cv_.notify_one();
    return true;
  }

  // Pops the oldest element into `*elem`, blocking while the queue is empty
  // and still open. Elements that were enqueued before Close() can still be
  // received afterwards. Returns false only when the queue is both closed and
  // empty; `*elem` is left untouched in that case.
  bool Receive(T* elem) {
    std::unique_lock<std::mutex> lock(mutex_);
    receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; });
    if (!queue_.empty()) {
      PADDLE_ENFORCE_NOT_NULL(elem);
      // The front element is discarded right after, so move it out instead of
      // copying it (falls back to copy-assignment when T has no move-assign).
      *elem = std::move(queue_.front());
      queue_.pop_front();
      send_cv_.notify_one();
      return true;
    } else {
      PADDLE_ENFORCE(closed_);
      return false;
    }
  }

  // Marks the queue as closed and wakes up all blocked senders and receivers.
  void Close() {
    std::lock_guard<std::mutex> lock(mutex_);
    closed_ = true;
    send_cv_.notify_all();
    receive_cv_.notify_all();
  }

  // Returns true once Close() has been called.
  bool IsClosed() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return closed_;
  }

  // Returns the maximum number of elements the queue can hold.
  // capacity_ is only written in the constructor, so no locking is needed.
  size_t Cap() const { return capacity_; }

 private:
  size_t capacity_;
  bool closed_;
  std::deque<T> queue_;

  // mutable so that const observers such as IsClosed() can lock it.
  mutable std::mutex mutex_;
  std::condition_variable receive_cv_;
  std::condition_variable send_cv_;
};
}
// namespace reader
}
// namespace operators
}
// namespace paddle
paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
浏览文件 @
18d6254d
...
@@ -14,7 +14,7 @@
...
@@ -14,7 +14,7 @@
#include <thread> // NOLINT
#include <thread> // NOLINT
#include "paddle/fluid/
framework/channel
.h"
#include "paddle/fluid/
operators/reader/blocking_queue
.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace
paddle
{
namespace
paddle
{
...
@@ -23,13 +23,13 @@ namespace reader {
...
@@ -23,13 +23,13 @@ namespace reader {
// 'Double buffer' means we shall maintain two batches of input data at the same
// 'Double buffer' means we shall maintain two batches of input data at the same
// time. So the kCacheSize should be at least 2.
// time. So the kCacheSize should be at least 2.
static
constexpr
size_t
kCacheSize
=
2
;
static
constexpr
size_t
kCacheSize
=
3
;
// There will be two batches out of the channel during training:
// There will be two batches out of the channel during training:
// 1. the one waiting to be sent to the channel
// 1. the one waiting to be sent to the channel
// 2. the one just be received from the channel, which is also being used by
// 2. the one just be received from the channel, which is also being used by
// subsequent operators.
// subsequent operators.
// So the channel size should be kCacheSize - 2
// So the channel size should be kCacheSize - 2
static
constexpr
size_t
kChannelSize
=
0
;
// kCacheSize - 2
static
constexpr
size_t
kChannelSize
=
1
;
// kCacheSize - 2
class
DoubleBufferReader
:
public
framework
::
DecoratedReader
{
class
DoubleBufferReader
:
public
framework
::
DecoratedReader
{
public:
public:
...
@@ -55,10 +55,8 @@ class DoubleBufferReader : public framework::DecoratedReader {
...
@@ -55,10 +55,8 @@ class DoubleBufferReader : public framework::DecoratedReader {
~
DoubleBufferReader
()
{
EndPrefetcher
();
}
~
DoubleBufferReader
()
{
EndPrefetcher
();
}
private:
private:
bool
HasNext
()
const
;
void
StartPrefetcher
()
{
void
StartPrefetcher
()
{
channel_
=
framework
::
MakeChannel
<
size_t
>
(
kChannelSize
);
channel_
=
new
reader
::
BlockingQueue
<
size_t
>
(
kChannelSize
);
prefetcher_
=
std
::
thread
([
this
]
{
PrefetchThreadFunc
();
});
prefetcher_
=
std
::
thread
([
this
]
{
PrefetchThreadFunc
();
});
}
}
...
@@ -74,7 +72,7 @@ class DoubleBufferReader : public framework::DecoratedReader {
...
@@ -74,7 +72,7 @@ class DoubleBufferReader : public framework::DecoratedReader {
void
PrefetchThreadFunc
();
void
PrefetchThreadFunc
();
std
::
thread
prefetcher_
;
std
::
thread
prefetcher_
;
framework
::
Channel
<
size_t
>*
channel_
;
reader
::
BlockingQueue
<
size_t
>*
channel_
;
platform
::
Place
place_
;
platform
::
Place
place_
;
std
::
vector
<
std
::
vector
<
framework
::
LoDTensor
>>
cpu_tensor_cache_
;
std
::
vector
<
std
::
vector
<
framework
::
LoDTensor
>>
cpu_tensor_cache_
;
std
::
vector
<
std
::
vector
<
framework
::
LoDTensor
>>
gpu_tensor_cache_
;
std
::
vector
<
std
::
vector
<
framework
::
LoDTensor
>>
gpu_tensor_cache_
;
...
@@ -139,17 +137,16 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
...
@@ -139,17 +137,16 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
};
};
void
DoubleBufferReader
::
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>*
out
)
{
void
DoubleBufferReader
::
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>*
out
)
{
out
->
clear
();
size_t
cached_tensor_id
;
if
(
HasNext
())
{
if
(
channel_
->
Receive
(
&
cached_tensor_id
))
{
size_t
cached_tensor_id
;
channel_
->
Receive
(
&
cached_tensor_id
);
if
(
platform
::
is_gpu_place
(
place_
))
{
if
(
platform
::
is_gpu_place
(
place_
))
{
*
out
=
gpu_tensor_cache_
[
cached_tensor_id
];
*
out
=
gpu_tensor_cache_
[
cached_tensor_id
];
ctxs_
[
cached_tensor_id
]
->
Wait
();
}
else
{
}
else
{
// CPU place
// CPU place
*
out
=
cpu_tensor_cache_
[
cached_tensor_id
];
*
out
=
cpu_tensor_cache_
[
cached_tensor_id
];
}
}
}
else
{
out
->
clear
();
}
}
}
}
...
@@ -159,12 +156,6 @@ void DoubleBufferReader::ReInit() {
...
@@ -159,12 +156,6 @@ void DoubleBufferReader::ReInit() {
StartPrefetcher
();
StartPrefetcher
();
}
}
bool
DoubleBufferReader
::
HasNext
()
const
{
while
(
!
channel_
->
IsClosed
()
&&
!
channel_
->
CanReceive
())
{
}
return
channel_
->
CanReceive
();
}
void
DoubleBufferReader
::
PrefetchThreadFunc
()
{
void
DoubleBufferReader
::
PrefetchThreadFunc
()
{
VLOG
(
5
)
<<
"A new prefetch thread starts."
;
VLOG
(
5
)
<<
"A new prefetch thread starts."
;
size_t
cached_tensor_id
=
0
;
size_t
cached_tensor_id
=
0
;
...
@@ -185,10 +176,7 @@ void DoubleBufferReader::PrefetchThreadFunc() {
...
@@ -185,10 +176,7 @@ void DoubleBufferReader::PrefetchThreadFunc() {
gpu_batch
[
i
].
set_lod
(
cpu_batch
[
i
].
lod
());
gpu_batch
[
i
].
set_lod
(
cpu_batch
[
i
].
lod
());
}
}
}
}
try
{
if
(
!
channel_
->
Send
(
cached_tensor_id
))
{
size_t
tmp
=
cached_tensor_id
;
channel_
->
Send
(
&
tmp
);
}
catch
(
paddle
::
platform
::
EnforceNotMet
e
)
{
VLOG
(
5
)
<<
"WARNING: The double buffer channel has been closed. The "
VLOG
(
5
)
<<
"WARNING: The double buffer channel has been closed. The "
"prefetch thread will terminate."
;
"prefetch thread will terminate."
;
break
;
break
;
...
...
paddle/fluid/operators/reader/open_files_op.cc
浏览文件 @
18d6254d
...
@@ -14,7 +14,7 @@
...
@@ -14,7 +14,7 @@
#include <thread> // NOLINT
#include <thread> // NOLINT
#include "paddle/fluid/
framework/channel
.h"
#include "paddle/fluid/
operators/reader/blocking_queue
.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
#include "paddle/fluid/operators/reader/reader_op_registry.h"
namespace
paddle
{
namespace
paddle
{
...
@@ -37,7 +37,6 @@ class MultiFileReader : public framework::ReaderBase {
...
@@ -37,7 +37,6 @@ class MultiFileReader : public framework::ReaderBase {
~
MultiFileReader
()
{
EndScheduler
();
}
~
MultiFileReader
()
{
EndScheduler
();
}
private:
private:
bool
HasNext
();
void
StartNewScheduler
();
void
StartNewScheduler
();
void
EndScheduler
();
void
EndScheduler
();
void
ScheduleThreadFunc
();
void
ScheduleThreadFunc
();
...
@@ -48,15 +47,14 @@ class MultiFileReader : public framework::ReaderBase {
...
@@ -48,15 +47,14 @@ class MultiFileReader : public framework::ReaderBase {
std
::
thread
scheduler_
;
std
::
thread
scheduler_
;
std
::
vector
<
std
::
thread
>
prefetchers_
;
std
::
vector
<
std
::
thread
>
prefetchers_
;
size_t
buffer_size_
;
size_t
buffer_size_
;
framework
::
Channel
<
size_t
>*
waiting_file_idx_
;
reader
::
BlockingQueue
<
size_t
>*
waiting_file_idx_
;
framework
::
Channel
<
size_t
>*
available_thread_idx_
;
reader
::
BlockingQueue
<
size_t
>*
available_thread_idx_
;
framework
::
Channel
<
std
::
vector
<
framework
::
LoDTensor
>>*
buffer_
;
reader
::
BlockingQueue
<
std
::
vector
<
framework
::
LoDTensor
>>*
buffer_
;
};
};
void
MultiFileReader
::
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>*
out
)
{
void
MultiFileReader
::
ReadNext
(
std
::
vector
<
framework
::
LoDTensor
>*
out
)
{
out
->
clear
();
if
(
!
buffer_
->
Receive
(
out
))
{
if
(
HasNext
())
{
out
->
clear
();
buffer_
->
Receive
(
out
);
}
}
}
}
...
@@ -65,25 +63,19 @@ void MultiFileReader::ReInit() {
...
@@ -65,25 +63,19 @@ void MultiFileReader::ReInit() {
StartNewScheduler
();
StartNewScheduler
();
}
}
bool
MultiFileReader
::
HasNext
()
{
while
(
!
buffer_
->
IsClosed
()
&&
!
buffer_
->
CanReceive
())
{
}
return
buffer_
->
CanReceive
();
}
void
MultiFileReader
::
StartNewScheduler
()
{
void
MultiFileReader
::
StartNewScheduler
()
{
size_t
thread_num
=
prefetchers_
.
size
();
size_t
thread_num
=
prefetchers_
.
size
();
waiting_file_idx_
=
framework
::
MakeChannel
<
size_t
>
(
file_names_
.
size
());
waiting_file_idx_
=
new
reader
::
BlockingQueue
<
size_t
>
(
file_names_
.
size
());
available_thread_idx_
=
framework
::
MakeChannel
<
size_t
>
(
thread_num
);
available_thread_idx_
=
new
reader
::
BlockingQueue
<
size_t
>
(
thread_num
);
buffer_
=
buffer_
=
new
reader
::
BlockingQueue
<
std
::
vector
<
framework
::
LoDTensor
>>
(
framework
::
MakeChannel
<
std
::
vector
<
framework
::
LoDTensor
>>
(
buffer_size_
);
buffer_size_
);
for
(
size_t
i
=
0
;
i
<
file_names_
.
size
();
++
i
)
{
for
(
size_t
i
=
0
;
i
<
file_names_
.
size
();
++
i
)
{
waiting_file_idx_
->
Send
(
&
i
);
waiting_file_idx_
->
Send
(
i
);
}
}
waiting_file_idx_
->
Close
();
waiting_file_idx_
->
Close
();
for
(
size_t
i
=
0
;
i
<
thread_num
;
++
i
)
{
for
(
size_t
i
=
0
;
i
<
thread_num
;
++
i
)
{
available_thread_idx_
->
Send
(
&
i
);
available_thread_idx_
->
Send
(
i
);
}
}
scheduler_
=
std
::
thread
([
this
]
{
ScheduleThreadFunc
();
});
scheduler_
=
std
::
thread
([
this
]
{
ScheduleThreadFunc
();
});
...
@@ -149,7 +141,7 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name,
...
@@ -149,7 +141,7 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name,
break
;
break
;
}
}
try
{
try
{
buffer_
->
Send
(
&
ins
);
buffer_
->
Send
(
std
::
move
(
ins
)
);
}
catch
(
paddle
::
platform
::
EnforceNotMet
e
)
{
}
catch
(
paddle
::
platform
::
EnforceNotMet
e
)
{
VLOG
(
5
)
<<
"WARNING: The buffer channel has been closed. The prefetch "
VLOG
(
5
)
<<
"WARNING: The buffer channel has been closed. The prefetch "
"thread of file '"
"thread of file '"
...
@@ -158,9 +150,7 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name,
...
@@ -158,9 +150,7 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name,
}
}
}
}
try
{
if
(
!
available_thread_idx_
->
Send
(
thread_idx
))
{
available_thread_idx_
->
Send
(
&
thread_idx
);
}
catch
(
paddle
::
platform
::
EnforceNotMet
e
)
{
VLOG
(
5
)
<<
"WARNING: The available_thread_idx_ channel has been closed. "
VLOG
(
5
)
<<
"WARNING: The available_thread_idx_ channel has been closed. "
"Fail to send thread_idx."
;
"Fail to send thread_idx."
;
}
}
...
...
paddle/fluid/operators/reader/reader_blocking_queue_test.cc
0 → 100644
浏览文件 @
18d6254d
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <chrono> // NOLINT
#include <set>
#include <thread> // NOLINT
#include <vector>
#include "gtest/gtest.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
using
paddle
::
operators
::
reader
::
BlockingQueue
;
// The queue must report exactly the capacity it was constructed with.
TEST(BlockingQueue, CapacityTest) {
  const size_t expected_cap = 10;
  BlockingQueue<int> queue(expected_cap);
  EXPECT_EQ(queue.Cap(), expected_cap);
}
void
FirstInFirstOut
(
size_t
queue_cap
,
size_t
elem_num
,
size_t
send_time_gap
,
size_t
receive_time_gap
)
{
BlockingQueue
<
size_t
>
q
(
queue_cap
);
std
::
thread
sender
([
&
]()
{
for
(
size_t
i
=
0
;
i
<
elem_num
;
++
i
)
{
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
send_time_gap
));
EXPECT_TRUE
(
q
.
Send
(
i
));
}
q
.
Close
();
});
size_t
count
=
0
;
while
(
true
)
{
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
receive_time_gap
));
size_t
elem
;
if
(
!
q
.
Receive
(
&
elem
))
{
break
;
}
EXPECT_EQ
(
elem
,
count
++
);
}
sender
.
join
();
EXPECT_EQ
(
count
,
elem_num
);
EXPECT_TRUE
(
q
.
IsClosed
());
}
// Runs the FIFO check with a fast/slow sender and receiver, for a queue that
// is smaller and one that is larger than the number of elements sent.
TEST(BlockingQueue, FirstInFirstOutTest) {
  // Columns: queue_cap, elem_num, send_time_gap (ms), receive_time_gap (ms).
  const size_t scenarios[][4] = {
      {2, 5, 2, 50},
      {2, 5, 50, 2},
      {10, 3, 50, 2},
      {10, 3, 2, 50},
  };
  for (const auto& s : scenarios) {
    FirstInFirstOut(s[0], s[1], s[2], s[3]);
  }
}
// A sender that tries to push more elements than the queue can hold must
// block once the queue is full, and must be released (with Send returning
// false) when the queue is closed. Elements sent before Close() stay
// receivable afterwards.
TEST(BlockingQueue, SenderBlockingTest) {
  const size_t queue_cap = 2;
  BlockingQueue<size_t> queue(queue_cap);

  size_t send_count = 0;
  std::thread producer([&]() {
    size_t value = 0;
    // Stops at the first failed Send; a failed Send is not counted.
    while (value < 5 && queue.Send(value)) {
      ++send_count;
      ++value;
    }
  });

  // Give the producer time to fill the queue and block on the next Send.
  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  queue.Close();
  producer.join();
  EXPECT_EQ(send_count, queue_cap);

  // Drain whatever made it into the queue before it was closed.
  std::vector<size_t> drained;
  for (;;) {
    size_t value;
    if (!queue.Receive(&value)) {
      break;
    }
    drained.push_back(value);
  }

  EXPECT_EQ(drained.size(), queue_cap);
  size_t idx = 0;
  while (idx < drained.size()) {
    EXPECT_EQ(drained[idx], idx);
    ++idx;
  }
}
// A receiver started on an empty queue must block until elements arrive,
// receive them in order, and terminate once the queue is closed and drained.
TEST(BlockingQueue, ReceiverBlockingTest) {
  const size_t queue_cap = 5;
  BlockingQueue<size_t> queue(queue_cap);

  std::vector<size_t> received;
  std::thread consumer([&]() {
    size_t value;
    for (;;) {
      const bool ok = queue.Receive(&value);
      if (!ok) {
        break;
      }
      received.push_back(value);
    }
  });

  std::vector<size_t> to_send{2, 1, 7};
  for (auto item : to_send) {
    queue.Send(item);
  }
  queue.Close();
  consumer.join();

  EXPECT_EQ(received.size(), to_send.size());
  size_t idx = 0;
  while (idx < to_send.size()) {
    EXPECT_EQ(received[idx], to_send[idx]);
    ++idx;
  }
}
// Checks that the two groups of vectors contain the same elements once each
// group is flattened, ignoring both the grouping and the order.
//
// Multisets are used so that a duplicated or dropped element is reported
// instead of being silently collapsed, and the element-wise comparison stops
// at the end of the shorter sequence so that a size mismatch (already
// reported by the non-fatal size check) never dereferences an end iterator.
void CheckIsUnorderedSame(const std::vector<std::vector<size_t>>& v1,
                          const std::vector<std::vector<size_t>>& v2) {
  std::multiset<size_t> s1;
  std::multiset<size_t> s2;
  for (const auto& vec : v1) {
    s1.insert(vec.begin(), vec.end());
  }
  for (const auto& vec : v2) {
    s2.insert(vec.begin(), vec.end());
  }

  EXPECT_EQ(s1.size(), s2.size());
  auto it1 = s1.begin();
  auto it2 = s2.begin();
  for (; it1 != s1.end() && it2 != s2.end(); ++it1, ++it2) {
    EXPECT_EQ(*it1, *it2);
  }
}
void
MultiSenderMultiReceiver
(
const
size_t
queue_cap
,
const
std
::
vector
<
std
::
vector
<
size_t
>>&
to_send
,
size_t
receiver_num
,
size_t
send_time_gap
,
size_t
receive_time_gap
)
{
BlockingQueue
<
size_t
>
q
(
queue_cap
);
size_t
sender_num
=
to_send
.
size
();
std
::
vector
<
std
::
thread
>
senders
;
for
(
size_t
s_idx
=
0
;
s_idx
<
sender_num
;
++
s_idx
)
{
senders
.
emplace_back
(
std
::
thread
([
&
,
s_idx
]
{
for
(
size_t
elem
:
to_send
[
s_idx
])
{
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
send_time_gap
));
EXPECT_TRUE
(
q
.
Send
(
elem
));
}
}));
}
std
::
vector
<
std
::
thread
>
receivers
;
std
::
mutex
mu
;
std
::
vector
<
std
::
vector
<
size_t
>>
res
;
for
(
size_t
r_idx
=
0
;
r_idx
<
receiver_num
;
++
r_idx
)
{
receivers
.
emplace_back
(
std
::
thread
([
&
]
{
std
::
vector
<
size_t
>
receiver_res
;
while
(
true
)
{
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
receive_time_gap
));
size_t
elem
;
if
(
!
q
.
Receive
(
&
elem
))
{
break
;
}
receiver_res
.
push_back
(
elem
);
}
std
::
lock_guard
<
std
::
mutex
>
lock
(
mu
);
res
.
push_back
(
receiver_res
);
}));
}
for
(
auto
&
t
:
senders
)
{
t
.
join
();
}
q
.
Close
();
for
(
auto
&
t
:
receivers
)
{
t
.
join
();
}
CheckIsUnorderedSame
(
to_send
,
res
);
}
// Exercises the queue with several concurrent senders and receivers under
// different capacities, receiver counts and send/receive delays.
TEST(BlockingQueue, MultiSenderMultiReaderTest) {
  using Batches = std::vector<std::vector<size_t>>;

  // Runs the five standard scenarios for one data set:
  // baseline, large queue, many receivers, slow senders, slow receivers.
  auto run_scenarios = [](const Batches& data, size_t large_cap,
                          size_t receivers, size_t many_receivers) {
    MultiSenderMultiReceiver(2, data, receivers, 0, 0);
    MultiSenderMultiReceiver(large_cap, data, receivers, 0, 0);
    MultiSenderMultiReceiver(2, data, many_receivers, 0, 0);
    MultiSenderMultiReceiver(2, data, receivers, 50, 0);
    MultiSenderMultiReceiver(2, data, receivers, 0, 50);
  };

  Batches to_send_1{{2, 3, 4}, {9}, {0, 7, 15, 6}};
  run_scenarios(to_send_1, 10, 2, 20);

  // Includes a sender that has nothing to send.
  Batches to_send_2{{2, 3, 4}, {}, {0, 7, 15, 6, 9, 32}};
  run_scenarios(to_send_2, 20, 3, 30);
}
// Minimal user-defined element type used to exercise BlockingQueue with a
// class that has explicit copy/move constructors and copy assignment.
struct MyClass {
  // Default-constructs with val_ == 0.
  MyClass() : val_(0) {}

  // Constructs holding `val`.
  explicit MyClass(int val) : val_(val) {}

  MyClass(const MyClass& b) : val_(b.val_) {}

  // The "move" intentionally leaves the source untouched: the test compares
  // the moved-from object's val_ against the received copy.
  MyClass(MyClass&& b) noexcept : val_(b.val_) {}

  // Returns *this so that assignments can be chained like built-in types.
  MyClass& operator=(const MyClass& b) {
    val_ = b.val_;
    return *this;
  }

  int val_;
};
// Round-trips a user-defined class through the queue using the rvalue Send
// overload and checks that the received value matches the one that was sent.
TEST(BlockingQueue, MyClassTest) {
  BlockingQueue<MyClass> queue(2);

  MyClass sent(200);
  queue.Send(std::move(sent));

  MyClass received;
  queue.Receive(&received);

  // MyClass's move constructor keeps the source value, so `sent` is still
  // comparable after the move.
  EXPECT_EQ(sent.val_, received.val_);
}
paddle/fluid/platform/dynload/cublas.h
浏览文件 @
18d6254d
...
@@ -14,10 +14,12 @@
...
@@ -14,10 +14,12 @@
#pragma once
#pragma once
#include <cublasXt.h>
#include <cublas_v2.h>
#include <cublas_v2.h>
#include <cuda.h>
#include <cuda.h>
#include <dlfcn.h>
#include <dlfcn.h>
#include <mutex> // NOLINT
#include <mutex> // NOLINT
#include <type_traits>
#include "paddle/fluid/platform/dynload/dynamic_loader.h"
#include "paddle/fluid/platform/dynload/dynamic_loader.h"
namespace
paddle
{
namespace
paddle
{
...
@@ -37,14 +39,14 @@ extern void *cublas_dso_handle;
...
@@ -37,14 +39,14 @@ extern void *cublas_dso_handle;
#ifdef PADDLE_USE_DSO
#ifdef PADDLE_USE_DSO
#define DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(__name) \
#define DECLARE_DYNAMIC_LOAD_CUBLAS_WRAP(__name) \
struct DynLoad__##__name { \
struct DynLoad__##__name { \
using FUNC_TYPE = decltype(&::__name); \
template <typename... Args> \
template <typename... Args> \
inline cublasStatus_t operator()(Args... args) { \
inline cublasStatus_t operator()(Args... args) { \
typedef cublasStatus_t (*cublasFunc)(Args...); \
std::call_once(cublas_dso_flag, []() { \
std::call_once(cublas_dso_flag, []() { \
cublas_dso_handle = paddle::platform::dynload::GetCublasDsoHandle(); \
cublas_dso_handle = paddle::platform::dynload::GetCublasDsoHandle(); \
}); \
}); \
void *p_##__name = dlsym(cublas_dso_handle, #__name); \
void *p_##__name = dlsym(cublas_dso_handle, #__name); \
return reinterpret_cast<
cublasFunc>(p_##__name)(args...);
\
return reinterpret_cast<
FUNC_TYPE>(p_##__name)(args...);
\
} \
} \
}; \
}; \
extern DynLoad__##__name __name
extern DynLoad__##__name __name
...
@@ -71,8 +73,8 @@ extern void *cublas_dso_handle;
...
@@ -71,8 +73,8 @@ extern void *cublas_dso_handle;
__macro(cublasDgemm_v2); \
__macro(cublasDgemm_v2); \
__macro(cublasHgemm); \
__macro(cublasHgemm); \
__macro(cublasSgemmEx); \
__macro(cublasSgemmEx); \
__macro(cublasSgeam
_v2);
\
__macro(cublasSgeam
);
\
__macro(cublasDgeam
_v2);
\
__macro(cublasDgeam
);
\
__macro(cublasCreate_v2); \
__macro(cublasCreate_v2); \
__macro(cublasDestroy_v2); \
__macro(cublasDestroy_v2); \
__macro(cublasSetStream_v2); \
__macro(cublasSetStream_v2); \
...
...
paddle/fluid/platform/dynload/cudnn.h
浏览文件 @
18d6254d
...
@@ -34,7 +34,7 @@ extern void EnforceCUDNNLoaded(const char* fn_name);
...
@@ -34,7 +34,7 @@ extern void EnforceCUDNNLoaded(const char* fn_name);
struct DynLoad__##__name { \
struct DynLoad__##__name { \
template <typename... Args> \
template <typename... Args> \
auto operator()(Args... args) -> decltype(__name(args...)) { \
auto operator()(Args... args) -> decltype(__name(args...)) { \
using cudnn_func = decltype(
__name(args...)) (*)(Args...);
\
using cudnn_func = decltype(
&::__name);
\
std::call_once(cudnn_dso_flag, []() { \
std::call_once(cudnn_dso_flag, []() { \
cudnn_dso_handle = paddle::platform::dynload::GetCUDNNDsoHandle(); \
cudnn_dso_handle = paddle::platform::dynload::GetCUDNNDsoHandle(); \
}); \
}); \
...
...
paddle/fluid/platform/dynload/cupti.h
浏览文件 @
18d6254d
...
@@ -41,7 +41,7 @@ extern void *cupti_dso_handle;
...
@@ -41,7 +41,7 @@ extern void *cupti_dso_handle;
struct DynLoad__##__name { \
struct DynLoad__##__name { \
template <typename... Args> \
template <typename... Args> \
inline CUptiResult CUPTIAPI operator()(Args... args) { \
inline CUptiResult CUPTIAPI operator()(Args... args) { \
typedef CUptiResult CUPTIAPI (*cuptiFunc)(Args...);
\
using cuptiFunc = decltype(&::__name);
\
std::call_once(cupti_dso_flag, []() { \
std::call_once(cupti_dso_flag, []() { \
cupti_dso_handle = paddle::platform::dynload::GetCUPTIDsoHandle(); \
cupti_dso_handle = paddle::platform::dynload::GetCUPTIDsoHandle(); \
}); \
}); \
...
...
paddle/fluid/platform/dynload/curand.h
浏览文件 @
18d6254d
...
@@ -30,7 +30,7 @@ extern void *curand_dso_handle;
...
@@ -30,7 +30,7 @@ extern void *curand_dso_handle;
struct DynLoad__##__name { \
struct DynLoad__##__name { \
template <typename... Args> \
template <typename... Args> \
curandStatus_t operator()(Args... args) { \
curandStatus_t operator()(Args... args) { \
typedef curandStatus_t (*curandFunc)(Args...);
\
using curandFunc = decltype(&::__name);
\
std::call_once(curand_dso_flag, []() { \
std::call_once(curand_dso_flag, []() { \
curand_dso_handle = paddle::platform::dynload::GetCurandDsoHandle(); \
curand_dso_handle = paddle::platform::dynload::GetCurandDsoHandle(); \
}); \
}); \
...
...
paddle/fluid/platform/dynload/nccl.h
浏览文件 @
18d6254d
...
@@ -33,7 +33,7 @@ extern void* nccl_dso_handle;
...
@@ -33,7 +33,7 @@ extern void* nccl_dso_handle;
struct DynLoad__##__name { \
struct DynLoad__##__name { \
template <typename... Args> \
template <typename... Args> \
auto operator()(Args... args) -> decltype(__name(args...)) { \
auto operator()(Args... args) -> decltype(__name(args...)) { \
using nccl_func = decltype(
__name(args...)) (*)(Args...);
\
using nccl_func = decltype(
&::__name);
\
std::call_once(nccl_dso_flag, []() { \
std::call_once(nccl_dso_flag, []() { \
nccl_dso_handle = paddle::platform::dynload::GetNCCLDsoHandle(); \
nccl_dso_handle = paddle::platform::dynload::GetNCCLDsoHandle(); \
}); \
}); \
...
...
paddle/fluid/platform/dynload/warpctc.h
浏览文件 @
18d6254d
...
@@ -36,7 +36,7 @@ extern void* warpctc_dso_handle;
...
@@ -36,7 +36,7 @@ extern void* warpctc_dso_handle;
struct DynLoad__##__name { \
struct DynLoad__##__name { \
template <typename... Args> \
template <typename... Args> \
auto operator()(Args... args) -> decltype(__name(args...)) { \
auto operator()(Args... args) -> decltype(__name(args...)) { \
using warpctcFunc = decltype(
__name(args...)) (*)(Args...);
\
using warpctcFunc = decltype(
&::__name);
\
std::call_once(warpctc_dso_flag, []() { \
std::call_once(warpctc_dso_flag, []() { \
warpctc_dso_handle = paddle::platform::dynload::GetWarpCTCDsoHandle(); \
warpctc_dso_handle = paddle::platform::dynload::GetWarpCTCDsoHandle(); \
}); \
}); \
...
...
paddle/scripts/paddle_docker_build.sh
浏览文件 @
18d6254d
...
@@ -75,6 +75,7 @@ function main() {
...
@@ -75,6 +75,7 @@ function main() {
build_android
)
build_android
)
start_build_docker
start_build_docker
docker
exec
${
CONTAINER_ID
}
bash
-c
"./paddle/scripts/paddle_build.sh
$@
"
docker
exec
${
CONTAINER_ID
}
bash
-c
"./paddle/scripts/paddle_build.sh
$@
"
;;
*
)
*
)
if
container_running
"
${
CONTAINER_ID
}
"
;
then
if
container_running
"
${
CONTAINER_ID
}
"
;
then
docker
exec
${
CONTAINER_ID
}
bash
-c
"./paddle/scripts/paddle_build.sh
$@
"
docker
exec
${
CONTAINER_ID
}
bash
-c
"./paddle/scripts/paddle_build.sh
$@
"
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录