Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Crayon鑫
Paddle
提交
7c953b34
P
Paddle
项目概览
Crayon鑫
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7c953b34
编写于
9月 16, 2020
作者:
Y
yaoxuefeng6
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
deprecated dataset api in fluid and mark static in new dataset api
上级
ce7dc172
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
99 addition
and
0 deletion
+99
-0
python/paddle/distributed/fleet/dataset/dataset.py
python/paddle/distributed/fleet/dataset/dataset.py
+26
-0
python/paddle/fluid/dataset.py
python/paddle/fluid/dataset.py
+73
-0
未找到文件。
python/paddle/distributed/fleet/dataset/dataset.py
浏览文件 @
7c953b34
...
@@ -240,6 +240,8 @@ class DatasetBase(object):
...
@@ -240,6 +240,8 @@ class DatasetBase(object):
class
InMemoryDataset
(
DatasetBase
):
class
InMemoryDataset
(
DatasetBase
):
"""
"""
:api_attr: Static Graph
InMemoryDataset, it will load data into memory
InMemoryDataset, it will load data into memory
and shuffle data before training.
and shuffle data before training.
...
@@ -265,6 +267,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -265,6 +267,8 @@ class InMemoryDataset(DatasetBase):
def
_init_distributed_settings
(
self
,
**
kwargs
):
def
_init_distributed_settings
(
self
,
**
kwargs
):
"""
"""
:api_attr: Static Graph
should be called only once in user's python scripts to initialize distributed-related setings of dataset instance
should be called only once in user's python scripts to initialize distributed-related setings of dataset instance
Args:
Args:
kwargs: Keyword arguments. Currently, we support following keys in **kwargs:
kwargs: Keyword arguments. Currently, we support following keys in **kwargs:
...
@@ -323,6 +327,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -323,6 +327,8 @@ class InMemoryDataset(DatasetBase):
def
update_settings
(
self
,
**
kwargs
):
def
update_settings
(
self
,
**
kwargs
):
"""
"""
:api_attr: Static Graph
should be called in user's python scripts to update setings of dataset instance
should be called in user's python scripts to update setings of dataset instance
Args:
Args:
kwargs: Keyword arguments. Currently, we support following keys in **kwargs,
kwargs: Keyword arguments. Currently, we support following keys in **kwargs,
...
@@ -400,6 +406,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -400,6 +406,8 @@ class InMemoryDataset(DatasetBase):
def
init
(
self
,
**
kwargs
):
def
init
(
self
,
**
kwargs
):
"""
"""
:api_attr: Static Graph
should be called only once in user's python scripts to initialize setings of dataset instance
should be called only once in user's python scripts to initialize setings of dataset instance
Args:
Args:
kwargs: Keyword arguments. Currently, we support following keys in **kwargs:
kwargs: Keyword arguments. Currently, we support following keys in **kwargs:
...
@@ -639,6 +647,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -639,6 +647,8 @@ class InMemoryDataset(DatasetBase):
def
load_into_memory
(
self
):
def
load_into_memory
(
self
):
"""
"""
:api_attr: Static Graph
Load data into memory
Load data into memory
Examples:
Examples:
...
@@ -655,6 +665,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -655,6 +665,8 @@ class InMemoryDataset(DatasetBase):
def
preload_into_memory
(
self
,
thread_num
=
None
):
def
preload_into_memory
(
self
,
thread_num
=
None
):
"""
"""
:api_attr: Static Graph
Load data into memory in async mode
Load data into memory in async mode
Args:
Args:
...
@@ -679,6 +691,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -679,6 +691,8 @@ class InMemoryDataset(DatasetBase):
def
wait_preload_done
(
self
):
def
wait_preload_done
(
self
):
"""
"""
:api_attr: Static Graph
Wait preload_into_memory done
Wait preload_into_memory done
Examples:
Examples:
...
@@ -696,6 +710,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -696,6 +710,8 @@ class InMemoryDataset(DatasetBase):
def
local_shuffle
(
self
):
def
local_shuffle
(
self
):
"""
"""
:api_attr: Static Graph
Local shuffle
Local shuffle
Examples:
Examples:
...
@@ -712,6 +728,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -712,6 +728,8 @@ class InMemoryDataset(DatasetBase):
def
global_shuffle
(
self
,
fleet
=
None
,
thread_num
=
12
):
def
global_shuffle
(
self
,
fleet
=
None
,
thread_num
=
12
):
"""
"""
:api_attr: Static Graph
Global shuffle.
Global shuffle.
Global shuffle can be used only in distributed mode. i.e. multiple
Global shuffle can be used only in distributed mode. i.e. multiple
processes on single machine or multiple machines training together.
processes on single machine or multiple machines training together.
...
@@ -781,6 +799,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -781,6 +799,8 @@ class InMemoryDataset(DatasetBase):
def
get_memory_data_size
(
self
,
fleet
=
None
):
def
get_memory_data_size
(
self
,
fleet
=
None
):
"""
"""
:api_attr: Static Graph
Get memory data size, user can call this function to know the num
Get memory data size, user can call this function to know the num
of ins in all workers after load into memory.
of ins in all workers after load into memory.
...
@@ -817,6 +837,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -817,6 +837,8 @@ class InMemoryDataset(DatasetBase):
def
get_shuffle_data_size
(
self
,
fleet
=
None
):
def
get_shuffle_data_size
(
self
,
fleet
=
None
):
"""
"""
:api_attr: Static Graph
Get shuffle data size, user can call this function to know the num
Get shuffle data size, user can call this function to know the num
of ins in all workers after local/global shuffle.
of ins in all workers after local/global shuffle.
...
@@ -901,6 +923,8 @@ class InMemoryDataset(DatasetBase):
...
@@ -901,6 +923,8 @@ class InMemoryDataset(DatasetBase):
class
QueueDataset
(
DatasetBase
):
class
QueueDataset
(
DatasetBase
):
"""
"""
:api_attr: Static Graph
QueueDataset, it will process data streamly.
QueueDataset, it will process data streamly.
Examples:
Examples:
...
@@ -920,6 +944,8 @@ class QueueDataset(DatasetBase):
...
@@ -920,6 +944,8 @@ class QueueDataset(DatasetBase):
def
init
(
self
,
**
kwargs
):
def
init
(
self
,
**
kwargs
):
"""
"""
:api_attr: Static Graph
should be called only once in user's python scripts to initialize setings of dataset instance
should be called only once in user's python scripts to initialize setings of dataset instance
"""
"""
super
(
QueueDataset
,
self
).
init
(
**
kwargs
)
super
(
QueueDataset
,
self
).
init
(
**
kwargs
)
...
...
python/paddle/fluid/dataset.py
浏览文件 @
7c953b34
...
@@ -335,6 +335,7 @@ class InMemoryDataset(DatasetBase):
...
@@ -335,6 +335,7 @@ class InMemoryDataset(DatasetBase):
dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset")
"""
"""
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset"
)
def
__init__
(
self
):
def
__init__
(
self
):
""" Init. """
""" Init. """
super
(
InMemoryDataset
,
self
).
__init__
()
super
(
InMemoryDataset
,
self
).
__init__
()
...
@@ -350,12 +351,18 @@ class InMemoryDataset(DatasetBase):
...
@@ -350,12 +351,18 @@ class InMemoryDataset(DatasetBase):
self
.
merge_by_lineid
=
False
self
.
merge_by_lineid
=
False
self
.
fleet_send_sleep_seconds
=
None
self
.
fleet_send_sleep_seconds
=
None
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._set_feed_type"
)
def
set_feed_type
(
self
,
data_feed_type
):
def
set_feed_type
(
self
,
data_feed_type
):
"""
"""
Set data_feed_desc
Set data_feed_desc
"""
"""
self
.
proto_desc
.
name
=
data_feed_type
self
.
proto_desc
.
name
=
data_feed_type
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._prepare_to_run"
)
def
_prepare_to_run
(
self
):
def
_prepare_to_run
(
self
):
"""
"""
Set data_feed_desc before load or shuffle,
Set data_feed_desc before load or shuffle,
...
@@ -376,16 +383,27 @@ class InMemoryDataset(DatasetBase):
...
@@ -376,16 +383,27 @@ class InMemoryDataset(DatasetBase):
self
.
dataset
.
create_channel
()
self
.
dataset
.
create_channel
()
self
.
dataset
.
create_readers
()
self
.
dataset
.
create_readers
()
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._dynamic_adjust_before_train"
)
def
_dynamic_adjust_before_train
(
self
,
thread_num
):
def
_dynamic_adjust_before_train
(
self
,
thread_num
):
if
not
self
.
is_user_set_queue_num
:
if
not
self
.
is_user_set_queue_num
:
self
.
dataset
.
dynamic_adjust_channel_num
(
thread_num
,
False
)
self
.
dataset
.
dynamic_adjust_channel_num
(
thread_num
,
False
)
self
.
dataset
.
dynamic_adjust_readers_num
(
thread_num
)
self
.
dataset
.
dynamic_adjust_readers_num
(
thread_num
)
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._dynamic_adjust_after_train"
)
def
_dynamic_adjust_after_train
(
self
):
def
_dynamic_adjust_after_train
(
self
):
if
not
self
.
is_user_set_queue_num
:
if
not
self
.
is_user_set_queue_num
:
self
.
dataset
.
dynamic_adjust_channel_num
(
self
.
thread_num
,
False
)
self
.
dataset
.
dynamic_adjust_channel_num
(
self
.
thread_num
,
False
)
self
.
dataset
.
dynamic_adjust_readers_num
(
self
.
thread_num
)
self
.
dataset
.
dynamic_adjust_readers_num
(
self
.
thread_num
)
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._set_queue_num"
)
def
set_queue_num
(
self
,
queue_num
):
def
set_queue_num
(
self
,
queue_num
):
"""
"""
Set Dataset output queue num, training threads get data from queues
Set Dataset output queue num, training threads get data from queues
...
@@ -404,6 +422,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -404,6 +422,9 @@ class InMemoryDataset(DatasetBase):
self
.
is_user_set_queue_num
=
True
self
.
is_user_set_queue_num
=
True
self
.
queue_num
=
queue_num
self
.
queue_num
=
queue_num
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._set_parse_ins_id"
)
def
set_parse_ins_id
(
self
,
parse_ins_id
):
def
set_parse_ins_id
(
self
,
parse_ins_id
):
"""
"""
Set id Dataset need to parse insid
Set id Dataset need to parse insid
...
@@ -421,6 +442,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -421,6 +442,9 @@ class InMemoryDataset(DatasetBase):
"""
"""
self
.
parse_ins_id
=
parse_ins_id
self
.
parse_ins_id
=
parse_ins_id
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._set_parse_content"
)
def
set_parse_content
(
self
,
parse_content
):
def
set_parse_content
(
self
,
parse_content
):
"""
"""
Set if Dataset need to parse content
Set if Dataset need to parse content
...
@@ -455,6 +479,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -455,6 +479,9 @@ class InMemoryDataset(DatasetBase):
"""
"""
self
.
parse_logkey
=
parse_logkey
self
.
parse_logkey
=
parse_logkey
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._set_merge_by_sid"
)
def
set_merge_by_sid
(
self
,
merge_by_sid
):
def
set_merge_by_sid
(
self
,
merge_by_sid
):
"""
"""
Set if Dataset need to merge sid. If not, one ins means one Pv.
Set if Dataset need to merge sid. If not, one ins means one Pv.
...
@@ -544,6 +571,10 @@ class InMemoryDataset(DatasetBase):
...
@@ -544,6 +571,10 @@ class InMemoryDataset(DatasetBase):
"""
"""
self
.
dataset
.
postprocess_instance
()
self
.
dataset
.
postprocess_instance
()
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._set_fleet_send_batch_size"
)
def
set_fleet_send_batch_size
(
self
,
fleet_send_batch_size
=
1024
):
def
set_fleet_send_batch_size
(
self
,
fleet_send_batch_size
=
1024
):
"""
"""
Set fleet send batch size, default is 1024
Set fleet send batch size, default is 1024
...
@@ -561,6 +592,10 @@ class InMemoryDataset(DatasetBase):
...
@@ -561,6 +592,10 @@ class InMemoryDataset(DatasetBase):
"""
"""
self
.
fleet_send_batch_size
=
fleet_send_batch_size
self
.
fleet_send_batch_size
=
fleet_send_batch_size
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._set_fleet_send_sleep_seconds"
)
def
set_fleet_send_sleep_seconds
(
self
,
fleet_send_sleep_seconds
=
0
):
def
set_fleet_send_sleep_seconds
(
self
,
fleet_send_sleep_seconds
=
0
):
"""
"""
Set fleet send sleep time, default is 0
Set fleet send sleep time, default is 0
...
@@ -578,6 +613,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -578,6 +613,9 @@ class InMemoryDataset(DatasetBase):
"""
"""
self
.
fleet_send_sleep_seconds
=
fleet_send_sleep_seconds
self
.
fleet_send_sleep_seconds
=
fleet_send_sleep_seconds
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._set_merge_by_lineid"
)
def
set_merge_by_lineid
(
self
,
merge_size
=
2
):
def
set_merge_by_lineid
(
self
,
merge_size
=
2
):
"""
"""
Set merge by line id, instances of same line id will be merged after
Set merge by line id, instances of same line id will be merged after
...
@@ -598,16 +636,27 @@ class InMemoryDataset(DatasetBase):
...
@@ -598,16 +636,27 @@ class InMemoryDataset(DatasetBase):
self
.
merge_by_lineid
=
True
self
.
merge_by_lineid
=
True
self
.
parse_ins_id
=
True
self
.
parse_ins_id
=
True
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._set_generate_unique_feasigns"
)
def
set_generate_unique_feasigns
(
self
,
generate_uni_feasigns
,
shard_num
):
def
set_generate_unique_feasigns
(
self
,
generate_uni_feasigns
,
shard_num
):
self
.
dataset
.
set_generate_unique_feasigns
(
generate_uni_feasigns
)
self
.
dataset
.
set_generate_unique_feasigns
(
generate_uni_feasigns
)
self
.
gen_uni_feasigns
=
generate_uni_feasigns
self
.
gen_uni_feasigns
=
generate_uni_feasigns
self
.
local_shard_num
=
shard_num
self
.
local_shard_num
=
shard_num
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset._generate_local_tables_unlock"
)
def
generate_local_tables_unlock
(
self
,
table_id
,
fea_dim
,
read_thread_num
,
def
generate_local_tables_unlock
(
self
,
table_id
,
fea_dim
,
read_thread_num
,
consume_thread_num
,
shard_num
):
consume_thread_num
,
shard_num
):
self
.
dataset
.
generate_local_tables_unlock
(
self
.
dataset
.
generate_local_tables_unlock
(
table_id
,
fea_dim
,
read_thread_num
,
consume_thread_num
,
shard_num
)
table_id
,
fea_dim
,
read_thread_num
,
consume_thread_num
,
shard_num
)
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset.load_into_memory"
)
def
load_into_memory
(
self
):
def
load_into_memory
(
self
):
"""
"""
Load data into memory
Load data into memory
...
@@ -624,6 +673,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -624,6 +673,9 @@ class InMemoryDataset(DatasetBase):
self
.
_prepare_to_run
()
self
.
_prepare_to_run
()
self
.
dataset
.
load_into_memory
()
self
.
dataset
.
load_into_memory
()
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset.preload_into_memory"
)
def
preload_into_memory
(
self
,
thread_num
=
None
):
def
preload_into_memory
(
self
,
thread_num
=
None
):
"""
"""
Load data into memory in async mode
Load data into memory in async mode
...
@@ -648,6 +700,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -648,6 +700,9 @@ class InMemoryDataset(DatasetBase):
self
.
dataset
.
create_preload_readers
()
self
.
dataset
.
create_preload_readers
()
self
.
dataset
.
preload_into_memory
()
self
.
dataset
.
preload_into_memory
()
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset.wait_preload_done"
)
def
wait_preload_done
(
self
):
def
wait_preload_done
(
self
):
"""
"""
Wait preload_into_memory done
Wait preload_into_memory done
...
@@ -665,6 +720,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -665,6 +720,9 @@ class InMemoryDataset(DatasetBase):
self
.
dataset
.
wait_preload_done
()
self
.
dataset
.
wait_preload_done
()
self
.
dataset
.
destroy_preload_readers
()
self
.
dataset
.
destroy_preload_readers
()
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset.local_shuffle"
)
def
local_shuffle
(
self
):
def
local_shuffle
(
self
):
"""
"""
Local shuffle
Local shuffle
...
@@ -681,6 +739,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -681,6 +739,9 @@ class InMemoryDataset(DatasetBase):
"""
"""
self
.
dataset
.
local_shuffle
()
self
.
dataset
.
local_shuffle
()
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset.global_shuffle"
)
def
global_shuffle
(
self
,
fleet
=
None
,
thread_num
=
12
):
def
global_shuffle
(
self
,
fleet
=
None
,
thread_num
=
12
):
"""
"""
Global shuffle.
Global shuffle.
...
@@ -726,6 +787,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -726,6 +787,9 @@ class InMemoryDataset(DatasetBase):
if
fleet
is
not
None
:
if
fleet
is
not
None
:
fleet
.
_role_maker
.
barrier_worker
()
fleet
.
_role_maker
.
barrier_worker
()
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset.release_memory"
)
def
release_memory
(
self
):
def
release_memory
(
self
):
"""
"""
:api_attr: Static Graph
:api_attr: Static Graph
...
@@ -774,6 +838,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -774,6 +838,9 @@ class InMemoryDataset(DatasetBase):
"""
"""
return
self
.
dataset
.
get_pv_data_size
()
return
self
.
dataset
.
get_pv_data_size
()
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset.get_memory_data_size"
)
def
get_memory_data_size
(
self
,
fleet
=
None
):
def
get_memory_data_size
(
self
,
fleet
=
None
):
"""
"""
Get memory data size, user can call this function to know the num
Get memory data size, user can call this function to know the num
...
@@ -810,6 +877,9 @@ class InMemoryDataset(DatasetBase):
...
@@ -810,6 +877,9 @@ class InMemoryDataset(DatasetBase):
return
global_data_size
[
0
]
return
global_data_size
[
0
]
return
local_data_size
[
0
]
return
local_data_size
[
0
]
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.InMemoryDataset.get_shuffle_data_size"
)
def
get_shuffle_data_size
(
self
,
fleet
=
None
):
def
get_shuffle_data_size
(
self
,
fleet
=
None
):
"""
"""
Get shuffle data size, user can call this function to know the num
Get shuffle data size, user can call this function to know the num
...
@@ -869,6 +939,9 @@ class QueueDataset(DatasetBase):
...
@@ -869,6 +939,9 @@ class QueueDataset(DatasetBase):
super
(
QueueDataset
,
self
).
__init__
()
super
(
QueueDataset
,
self
).
__init__
()
self
.
proto_desc
.
name
=
"MultiSlotDataFeed"
self
.
proto_desc
.
name
=
"MultiSlotDataFeed"
@
deprecated
(
since
=
"2.0.0"
,
update_to
=
"paddle.distributed.QueueDataset._prepare_to_run"
)
def
_prepare_to_run
(
self
):
def
_prepare_to_run
(
self
):
"""
"""
Set data_feed_desc/thread num/filelist before run,
Set data_feed_desc/thread num/filelist before run,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录