Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
7d653f0a
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
7d653f0a
编写于
4月 06, 2019
作者:
G
guru4elephant
提交者:
GitHub
4月 06, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16652 from xjqbest/dataset_merge_develop
fix dataset bug
上级
ea8655db
126d2a2f
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
81 addition
and
27 deletion
+81
-27
paddle/fluid/framework/data_feed.cc
paddle/fluid/framework/data_feed.cc
+19
-3
paddle/fluid/framework/data_feed.h
paddle/fluid/framework/data_feed.h
+3
-0
paddle/fluid/framework/data_set.cc
paddle/fluid/framework/data_set.cc
+11
-0
paddle/fluid/framework/data_set.h
paddle/fluid/framework/data_set.h
+7
-0
paddle/fluid/framework/fleet/fleet_wrapper.cc
paddle/fluid/framework/fleet/fleet_wrapper.cc
+1
-0
paddle/fluid/framework/io/shell.cc
paddle/fluid/framework/io/shell.cc
+1
-1
paddle/fluid/pybind/data_set_py.cc
paddle/fluid/pybind/data_set_py.cc
+4
-0
python/paddle/fluid/dataset.py
python/paddle/fluid/dataset.py
+2
-0
python/paddle/fluid/executor.py
python/paddle/fluid/executor.py
+2
-2
python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
.../paddle/fluid/incubate/fleet/parameter_server/__init__.py
+21
-8
python/paddle/fluid/tests/unittests/test_dataset.py
python/paddle/fluid/tests/unittests/test_dataset.py
+7
-11
python/paddle/fluid/trainer_factory.py
python/paddle/fluid/trainer_factory.py
+3
-2
未找到文件。
paddle/fluid/framework/data_feed.cc
浏览文件 @
7d653f0a
...
...
@@ -242,6 +242,11 @@ void InMemoryDataFeed<T>::SetTrainerNum(int trainer_num) {
trainer_num_
=
trainer_num
;
}
template
<
typename
T
>
void
InMemoryDataFeed
<
T
>::
SetFleetSendBatchSize
(
int64_t
size
)
{
fleet_send_batch_size_
=
size
;
}
template
<
typename
T
>
void
InMemoryDataFeed
<
T
>::
PutInsToChannel
(
const
std
::
string
&
ins_str
)
{
#ifdef _LINUX
...
...
@@ -361,8 +366,13 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
VLOG
(
3
)
<<
"GlobalShuffle() begin, thread_id="
<<
thread_id_
;
auto
fleet_ptr
=
FleetWrapper
::
GetInstance
();
std
::
vector
<
std
::
vector
<
T
*>>
send_vec
(
trainer_num_
);
std
::
vector
<
int
>
send_index
(
trainer_num_
);
uint64_t
reserve_len
=
fleet_send_batch_size_
/
trainer_num_
;
for
(
auto
&
vec
:
send_vec
)
{
vec
.
reserve
(
fleet_send_batch_size_
);
vec
.
reserve
(
reserve_len
);
}
for
(
int
i
=
0
;
i
<
trainer_num_
;
++
i
)
{
send_index
[
i
]
=
i
;
}
std
::
vector
<
std
::
future
<
int32_t
>>
total_status
;
auto
interval
=
GetMemoryDataInterval
();
...
...
@@ -375,7 +385,10 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
int64_t
node_id
=
random_num
%
trainer_num_
;
send_vec
[
node_id
].
push_back
(
&
((
*
memory_data_
)[
i
]));
if
(
i
%
fleet_send_batch_size_
==
0
&&
i
!=
0
)
{
for
(
int
j
=
0
;
j
<
send_vec
.
size
();
++
j
)
{
// shuffle the sequence of sending to avoid network timeout error
std
::
random_shuffle
(
send_index
.
begin
(),
send_index
.
end
());
for
(
int
index
=
0
;
index
<
send_index
.
size
();
++
index
)
{
int
j
=
send_index
[
index
];
std
::
string
send_str
;
SerializeIns
(
send_vec
[
j
],
&
send_str
);
VLOG
(
3
)
<<
"send str_length="
<<
send_str
.
length
()
...
...
@@ -388,7 +401,10 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
}
}
}
for
(
int
j
=
0
;
j
<
send_vec
.
size
();
++
j
)
{
// shuffle the sequence of sending to avoid network timeout error
std
::
random_shuffle
(
send_index
.
begin
(),
send_index
.
end
());
for
(
int
index
=
0
;
index
<
send_index
.
size
();
++
index
)
{
int
j
=
send_index
[
index
];
if
(
send_vec
[
j
].
size
()
!=
0
)
{
std
::
string
send_str
;
SerializeIns
(
send_vec
[
j
],
&
send_str
);
...
...
paddle/fluid/framework/data_feed.h
浏览文件 @
7d653f0a
...
...
@@ -94,6 +94,8 @@ class DataFeed {
virtual
void
SetThreadNum
(
int
thread_num
)
{}
// This function will do nothing at default
virtual
void
SetTrainerNum
(
int
trainer_num
)
{}
// This function will do nothing at default
virtual
void
SetFleetSendBatchSize
(
int64_t
size
)
{}
virtual
void
SetFileListMutex
(
std
::
mutex
*
mutex
)
{
mutex_for_pick_file_
=
mutex
;
}
...
...
@@ -212,6 +214,7 @@ class InMemoryDataFeed : public PrivateQueueDataFeed<T> {
virtual
void
SetThreadId
(
int
thread_id
);
virtual
void
SetThreadNum
(
int
thread_num
);
virtual
void
SetTrainerNum
(
int
trainer_num
);
virtual
void
SetFleetSendBatchSize
(
int64_t
size
);
virtual
void
PutInsToChannel
(
const
std
::
string
&
ins_str
);
virtual
void
FillMemoryDataToChannel
();
virtual
void
FillChannelToMemoryData
();
...
...
paddle/fluid/framework/data_set.cc
浏览文件 @
7d653f0a
...
...
@@ -64,6 +64,17 @@ void DatasetImpl<T>::SetTrainerNum(int trainer_num) {
}
}
// if you run distributed, and want to do global shuffle,
// set this before global shuffle.
// be sure you call CreateReaders before SetFleetSendBatchSize
template
<
typename
T
>
void
DatasetImpl
<
T
>::
SetFleetSendBatchSize
(
int64_t
size
)
{
fleet_send_batch_size_
=
size
;
for
(
auto
reader
:
readers_
)
{
reader
->
SetFleetSendBatchSize
(
size
);
}
}
template
<
typename
T
>
void
DatasetImpl
<
T
>::
SetHdfsConfig
(
const
std
::
string
&
fs_name
,
const
std
::
string
&
fs_ugi
)
{
...
...
paddle/fluid/framework/data_set.h
浏览文件 @
7d653f0a
...
...
@@ -47,6 +47,8 @@ class Dataset {
virtual
void
SetThreadNum
(
int
thread_num
)
=
0
;
// set workers' num
virtual
void
SetTrainerNum
(
int
trainer_num
)
=
0
;
// set fleet send batch size
virtual
void
SetFleetSendBatchSize
(
int64_t
size
)
=
0
;
// set fs name and ugi
virtual
void
SetHdfsConfig
(
const
std
::
string
&
fs_name
,
const
std
::
string
&
fs_ugi
)
=
0
;
...
...
@@ -59,6 +61,8 @@ class Dataset {
virtual
int
GetThreadNum
()
=
0
;
// get worker num
virtual
int
GetTrainerNum
()
=
0
;
// get fleet send batch size
virtual
int64_t
GetFleetSendBatchSize
()
=
0
;
// get hdfs config
virtual
std
::
pair
<
std
::
string
,
std
::
string
>
GetHdfsConfig
()
=
0
;
// get data fedd desc
...
...
@@ -98,6 +102,7 @@ class DatasetImpl : public Dataset {
virtual
void
SetFileList
(
const
std
::
vector
<
std
::
string
>&
filelist
);
virtual
void
SetThreadNum
(
int
thread_num
);
virtual
void
SetTrainerNum
(
int
trainer_num
);
virtual
void
SetFleetSendBatchSize
(
int64_t
size
);
virtual
void
SetHdfsConfig
(
const
std
::
string
&
fs_name
,
const
std
::
string
&
fs_ugi
);
virtual
void
SetDataFeedDesc
(
const
std
::
string
&
data_feed_desc_str
);
...
...
@@ -105,6 +110,7 @@ class DatasetImpl : public Dataset {
virtual
const
std
::
vector
<
std
::
string
>&
GetFileList
()
{
return
filelist_
;
}
virtual
int
GetThreadNum
()
{
return
thread_num_
;
}
virtual
int
GetTrainerNum
()
{
return
trainer_num_
;
}
virtual
int64_t
GetFleetSendBatchSize
()
{
return
fleet_send_batch_size_
;
}
virtual
std
::
pair
<
std
::
string
,
std
::
string
>
GetHdfsConfig
()
{
return
std
::
make_pair
(
fs_name_
,
fs_ugi_
);
}
...
...
@@ -137,6 +143,7 @@ class DatasetImpl : public Dataset {
std
::
string
fs_name_
;
std
::
string
fs_ugi_
;
unsigned
int
rand_seed
;
int64_t
fleet_send_batch_size_
;
};
// use std::vector<MultiSlotType> as data type
...
...
paddle/fluid/framework/fleet/fleet_wrapper.cc
浏览文件 @
7d653f0a
...
...
@@ -237,6 +237,7 @@ void FleetWrapper::PushDenseParamSync(
std
::
vector
<
paddle
::
ps
::
Region
>
regions
;
for
(
auto
&
t
:
var_names
)
{
Variable
*
var
=
scope
.
FindVar
(
t
);
CHECK
(
var
!=
nullptr
)
<<
"var["
<<
t
<<
"] not found"
;
LoDTensor
*
tensor
=
var
->
GetMutable
<
LoDTensor
>
();
float
*
g
=
tensor
->
mutable_data
<
float
>
(
place
);
paddle
::
ps
::
Region
reg
(
g
,
tensor
->
numel
());
...
...
paddle/fluid/framework/io/shell.cc
浏览文件 @
7d653f0a
...
...
@@ -126,7 +126,7 @@ static int shell_popen_fork_internal(const char* real_cmd, bool do_read,
}
close_open_fds_internal
();
if
(
execl
(
"/bin/
sh"
,
"
sh"
,
"-c"
,
real_cmd
,
NULL
)
<
0
)
{
if
(
execl
(
"/bin/
bash"
,
"ba
sh"
,
"-c"
,
real_cmd
,
NULL
)
<
0
)
{
return
-
1
;
}
exit
(
127
);
...
...
paddle/fluid/pybind/data_set_py.cc
浏览文件 @
7d653f0a
...
...
@@ -50,11 +50,15 @@ void BindDataset(py::module* m) {
.
def
(
"set_filelist"
,
&
framework
::
Dataset
::
SetFileList
)
.
def
(
"set_thread_num"
,
&
framework
::
Dataset
::
SetThreadNum
)
.
def
(
"set_trainer_num"
,
&
framework
::
Dataset
::
SetTrainerNum
)
.
def
(
"set_fleet_send_batch_size"
,
&
framework
::
Dataset
::
SetFleetSendBatchSize
)
.
def
(
"set_hdfs_config"
,
&
framework
::
Dataset
::
SetHdfsConfig
)
.
def
(
"set_data_feed_desc"
,
&
framework
::
Dataset
::
SetDataFeedDesc
)
.
def
(
"get_filelist"
,
&
framework
::
Dataset
::
GetFileList
)
.
def
(
"get_thread_num"
,
&
framework
::
Dataset
::
GetThreadNum
)
.
def
(
"get_trainer_num"
,
&
framework
::
Dataset
::
GetTrainerNum
)
.
def
(
"get_fleet_send_batch_size"
,
&
framework
::
Dataset
::
GetFleetSendBatchSize
)
.
def
(
"get_hdfs_config"
,
&
framework
::
Dataset
::
GetHdfsConfig
)
.
def
(
"get_data_feed_desc"
,
&
framework
::
Dataset
::
GetDataFeedDesc
)
.
def
(
"register_client2client_msg_handler"
,
...
...
python/paddle/fluid/dataset.py
浏览文件 @
7d653f0a
...
...
@@ -241,11 +241,13 @@ class InMemoryDataset(DatasetBase):
fleet: fleet singleton. Default None.
"""
trainer_num
=
1
fleet_send_batch_size
=
80000
if
fleet
is
not
None
:
fleet
.
fleet_instance
.
role_maker_
.
_barrier_worker
()
trainer_num
=
fleet
.
worker_num
()
self
.
dataset
.
register_client2client_msg_handler
()
self
.
dataset
.
set_trainer_num
(
trainer_num
)
self
.
dataset
.
set_fleet_send_batch_size
(
fleet_send_batch_size
)
if
fleet
is
not
None
:
fleet
.
fleet_instance
.
role_maker_
.
_barrier_worker
()
self
.
dataset
.
global_shuffle
()
...
...
python/paddle/fluid/executor.py
浏览文件 @
7d653f0a
...
...
@@ -712,7 +712,7 @@ class Executor(object):
if
dataset
==
None
:
raise
RuntimeError
(
"dataset is needed and should be initialized"
)
if
self
.
place
==
paddle
.
fluid
.
CUDAPlace
(
):
if
not
isinstance
(
self
.
place
,
core
.
CPUPlace
):
raise
RuntimeError
(
"infer_from_dataset is verified on CPUPlace"
"We will open CUDAPlace in the future"
)
...
...
@@ -796,7 +796,7 @@ class Executor(object):
if
dataset
==
None
:
raise
RuntimeError
(
"dataset is need and should be initialized"
)
if
self
.
place
==
paddle
.
fluid
.
CUDAPlace
(
):
if
not
isinstance
(
self
.
place
,
core
.
CPUPlace
):
raise
RuntimeError
(
"train_from_dataset is verified on CPUPlace"
"We will open CUDAPlace in the future"
)
...
...
python/paddle/fluid/incubate/fleet/parameter_server/__init__.py
浏览文件 @
7d653f0a
...
...
@@ -123,18 +123,25 @@ class Fleet(object):
print
(
"You should run DistributedOptimizer.minimize() first"
)
sys
.
exit
(
-
1
)
def
init_worker
(
self
,
programs
):
def
init_worker
(
self
,
programs
,
scopes
=
None
):
"""
init_worker(): will be called by user. When a user knows current process is_server(), he/she
should call init_worker() to initialize global information about worker and connect
worker with pserver.
worker with pserver.
You should run startup program before init_worker.
Args:
programs(Program|list): a Program or a list of Programs
scopes(Scope|list): a Scope or a list of Scopes, default None.
"""
if
not
isinstance
(
programs
,
list
):
programs
=
[
programs
]
if
scopes
is
None
:
scopes
=
[
fluid
.
global_scope
()]
*
len
(
programs
)
if
len
(
scopes
)
!=
len
(
programs
):
print
(
"You should make sure len(scopes) == len(programs) or set scopes None"
)
sys
.
exit
(
-
1
)
if
self
.
_opt_info
:
if
"fleet_desc"
in
self
.
_opt_info
:
self
.
_dist_desc_str
=
text_format
.
MessageToString
(
...
...
@@ -160,7 +167,7 @@ class Fleet(object):
self
.
role_maker_
.
_barrier_worker
()
if
self
.
role_maker_
.
_is_first_worker
():
tables
=
self
.
_dist_desc
.
trainer_param
.
dense_table
for
prog
in
programs
:
for
prog
,
scope
in
zip
(
programs
,
scopes
)
:
prog_id
=
str
(
id
(
prog
))
prog_conf
=
self
.
_opt_info
[
'program_configs'
][
prog_id
]
prog_tables
=
{}
...
...
@@ -174,10 +181,16 @@ class Fleet(object):
continue
var_name_list
=
[]
for
i
in
range
(
0
,
len
(
table
.
dense_variable_name
)):
var_name_list
.
append
(
table
.
dense_variable_name
[
i
])
self
.
_fleet_ptr
.
init_model
(
prog
.
desc
,
int
(
table
.
table_id
),
var_name_list
)
var_name
=
table
.
dense_variable_name
[
i
]
if
scope
.
find_var
(
var_name
)
is
None
:
print
(
"var "
+
var_name
+
" not found in scope, "
+
"you should run startup program first"
)
sys
.
exit
(
-
1
)
var_name_list
.
append
(
var_name
)
self
.
_fleet_ptr
.
init_model
(
scope
,
int
(
table
.
table_id
),
var_name_list
)
# barrier for init model done
self
.
role_maker_
.
_barrier_worker
()
else
:
...
...
python/paddle/fluid/tests/unittests/test_dataset.py
浏览文件 @
7d653f0a
...
...
@@ -29,7 +29,6 @@ class TestDataset(unittest.TestCase):
def
test_dataset_create
(
self
):
""" Testcase for dataset create. """
return
try
:
dataset
=
fluid
.
DatasetFactory
().
create_dataset
(
"InMemoryDataset"
)
except
:
...
...
@@ -48,7 +47,6 @@ class TestDataset(unittest.TestCase):
def
test_dataset_config
(
self
):
""" Testcase for dataset configuration. """
return
dataset
=
fluid
.
core
.
Dataset
(
"MultiSlotDataset"
)
dataset
.
set_thread_num
(
12
)
dataset
.
set_filelist
([
"a.txt"
,
"b.txt"
,
"c.txt"
])
...
...
@@ -75,7 +73,6 @@ class TestDataset(unittest.TestCase):
"""
Testcase for InMemoryDataset from create to run.
"""
return
with
open
(
"test_in_memory_dataset_run_a.txt"
,
"w"
)
as
f
:
data
=
"1 1 2 3 3 4 5 5 5 5 1 1
\n
"
data
+=
"1 2 2 3 4 4 6 6 6 6 1 2
\n
"
...
...
@@ -112,9 +109,10 @@ class TestDataset(unittest.TestCase):
for
i
in
range
(
2
):
try
:
exe
.
train_from_dataset
(
fluid
.
default_main_program
(),
dataset
)
except
:
#self.assertTrue(False)
except
ImportError
as
e
:
pass
except
Exception
as
e
:
self
.
assertTrue
(
False
)
os
.
remove
(
"./test_in_memory_dataset_run_a.txt"
)
os
.
remove
(
"./test_in_memory_dataset_run_b.txt"
)
...
...
@@ -123,7 +121,6 @@ class TestDataset(unittest.TestCase):
"""
Testcase for QueueDataset from create to run.
"""
return
with
open
(
"test_queue_dataset_run_a.txt"
,
"w"
)
as
f
:
data
=
"1 1 2 3 3 4 5 5 5 5 1 1
\n
"
data
+=
"1 2 2 3 4 4 6 6 6 6 1 2
\n
"
...
...
@@ -156,15 +153,14 @@ class TestDataset(unittest.TestCase):
for
i
in
range
(
2
):
try
:
exe
.
train_from_dataset
(
fluid
.
default_main_program
(),
dataset
)
except
:
#self.assertTrue(False)
except
ImportError
as
e
:
pass
except
Exception
as
e
:
self
.
assertTrue
(
False
)
os
.
remove
(
"./test_queue_dataset_run_a.txt"
)
os
.
remove
(
"./test_queue_dataset_run_b.txt"
)
if
__name__
==
'__main__'
:
#unittest.main()
import
sys
sys
.
exit
(
0
)
unittest
.
main
()
python/paddle/fluid/trainer_factory.py
浏览文件 @
7d653f0a
...
...
@@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from
.trainer_desc
import
MultiTrainer
,
DistMultiTrainer
from
.device_worker
import
Hogwild
,
DownpourSGD
__all__
=
[
"TrainerFactory"
]
...
...
@@ -20,8 +23,6 @@ class TrainerFactory(object):
pass
def
_create_trainer
(
self
,
opt_info
=
None
):
from
.trainer_desc
import
MultiTrainer
,
DistMultiTrainer
from
.device_worker
import
Hogwild
,
DownpourSGD
trainer
=
None
device_worker
=
None
if
opt_info
==
None
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录