Unverified    Commit c67c3916    Authored by: yaoxuefeng    Committed by: GitHub

refine fleet dataset class api (#27133)

Parent c296618c
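This commit removes the `DatasetFactory` entry point and exposes the dataset classes directly under `paddle.distributed`, with a single `init()` call for common settings and `_init_distributed_settings()` for distributed-only ones. A minimal sketch of the new usage, assuming this commit is applied (the file list is a placeholder, not part of the diff):

.. code-block:: python

    import paddle

    dataset = paddle.distributed.InMemoryDataset()
    # common settings now go through one init() call instead of set_xxx() methods
    dataset.init(
        batch_size=32,
        thread_num=3,
        pipe_command="cat",
        use_var=[])
    # distributed-related knobs moved out of the public set_xxx() methods
    dataset._init_distributed_settings(parse_ins_id=True)
    dataset.set_filelist(["a.txt", "b.txt"])  # placeholder file list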
@@ -21,6 +21,7 @@ from .parallel import get_rank
 from .parallel import get_world_size
 from paddle.fluid.dygraph.parallel import prepare_context  #DEFINE_ALIAS
 from paddle.fluid.dygraph.parallel import ParallelEnv  #DEFINE_ALIAS
+from paddle.distributed.fleet.dataset import *
 from . import collective
 from .collective import *
@@ -30,11 +31,8 @@ __all__ = ["spawn"]
 # dygraph parallel apis
 __all__ += [
-    "init_parallel_env",
-    "get_rank",
-    "get_world_size",
-    "prepare_context",
-    "ParallelEnv",
+    "init_parallel_env", "get_rank", "get_world_size", "prepare_context",
+    "ParallelEnv", "InMemoryDataset", "QueueDataset"
 ]
 # collective apis
......
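With the wildcard import and the extended `__all__` above, the refactored classes become reachable from the top-level distributed package as well as from `paddle.distributed.fleet.dataset`; a small sanity sketch (not part of the diff):

.. code-block:: python

    import paddle
    from paddle.distributed.fleet.dataset import InMemoryDataset, QueueDataset

    # Both import paths should resolve to the same classes after this change.
    assert paddle.distributed.InMemoryDataset is InMemoryDataset
    assert paddle.distributed.QueueDataset is QueueDataset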
@@ -23,7 +23,6 @@ from .dataset import *
 __all__ = [
     "DistributedStrategy",
     "UtilBase",
-    "DatasetFactory",
     "UserDefinedRoleMaker",
     "PaddleCloudRoleMaker",
     "Fleet",
......
@@ -14,54 +14,11 @@
 """This is definition of dataset class, which is high performance IO."""
 import paddle
-import paddle.fluid as fluid
 from paddle.fluid.proto import data_feed_pb2
 from google.protobuf import text_format
 import paddle.fluid.core as core
-
-
-class DatasetFactory(object):
-    """
-    DatasetFactory is a factory which create dataset by its name,
-    you can create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset",
-    the default is "QueueDataset".
-
-    Example:
-        .. code-block:: python
-
-          import paddle.fluid as fluid
-          dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
-
-    """
-
-    def __init__(self):
-        """ Init. """
-        pass
-
-    def create_dataset(self, datafeed_class="QueueDataset"):
-        """
-        Create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset",
-        the default is "QueueDataset".
-
-        Args:
-            datafeed_class(str): datafeed class name, QueueDataset or InMemoryDataset.
-                                 Default is QueueDataset.
-
-        Examples:
-            .. code-block:: python
-
-              import paddle.fluid as fluid
-              dataset = fluid.DatasetFactory().create_dataset()
-
-        """
-        try:
-            dataset = globals()[datafeed_class]()
-            return dataset
-        except:
-            raise ValueError("datafeed class %s does not exist" %
-                             datafeed_class)
-
-
 class DatasetBase(object):
     """ Base dataset class. """
@@ -75,96 +32,67 @@ class DatasetBase(object):
         self.thread_num = 1
         self.filelist = []
def set_pipe_command(self, pipe_command): def init(self,
batch_size=1,
thread_num=1,
use_var=[],
pipe_command="cat",
input_type=0,
fs_name="",
fs_ugi="",
download_cmd="cat"):
""" """
Set pipe command of current dataset should be called only once in user's python scripts to initialize setings of dataset instance.
A pipe command is a UNIX pipeline command that can be used only Normally, it is called by InMemoryDataset or QueueDataset.
Examples:
.. code-block:: python
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_pipe_command("python my_script.py")
Args: Args:
pipe_command(str): pipe command batch_size(int): batch size. It will be effective during training. default is 1.
thread_num(int): thread num, it is the num of readers. default is 1.
use_var(list): list of variables. Variables which you will use. default is [].
pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command. default is "cat"
input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.
fs_name(str): fs name. default is "".
fs_ugi(str): fs ugi. default is "".
download_cmd(str): customized download command. default is "cat"
"""
self.proto_desc.pipe_command = pipe_command
def set_rank_offset(self, rank_offset):
""" """
Set rank_offset for merge_pv. It set the message of Pv. self._set_batch_size(batch_size)
self._set_thread(thread_num)
Examples: self._set_use_var(use_var)
.. code-block:: python self._set_pipe_command(pipe_command)
self._set_input_type(input_type)
import paddle.fluid as fluid self._set_hdfs_config(fs_name, fs_ugi)
dataset = fluid.DatasetFactory().create_dataset() self._set_download_cmd(download_cmd)
dataset.set_rank_offset("rank_offset")
Args:
rank_offset(str): rank_offset's name
def _set_pipe_command(self, pipe_command):
""" """
self.proto_desc.rank_offset = rank_offset Set pipe command of current dataset
A pipe command is a UNIX pipeline command that can be used only
def set_fea_eval(self, record_candidate_size, fea_eval=True):
"""
set fea eval mode for slots shuffle to debug the importance level of
slots(features), fea_eval need to be set True for slots shuffle.
Args:
record_candidate_size(int): size of instances candidate to shuffle
one slot
fea_eval(bool): whether enable fea eval mode to enable slots shuffle.
default is True.
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.fleet.dataset.DatasetBase()
dataset.set_fea_eval(1000000, True) dataset._set_pipe_command("python my_script.py")
"""
if fea_eval:
self.dataset.set_fea_eval(fea_eval, record_candidate_size)
self.fea_eval = fea_eval
def slots_shuffle(self, slots):
"""
Slots Shuffle
Slots Shuffle is a shuffle method in slots level, which is usually used
in sparse feature with large scale of instances. To compare the metric, i.e.
auc while doing slots shuffle on one or several slots with baseline to
evaluate the importance level of slots(features).
Args: Args:
slots(list[string]): the set of slots(string) to do slots shuffle. pipe_command(str): pipe command
Examples:
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_merge_by_lineid()
#suppose there is a slot 0
dataset.slots_shuffle(['0'])
""" """
if self.fea_eval: self.proto_desc.pipe_command = pipe_command
slots_set = set(slots)
self.dataset.slots_shuffle(slots_set)
def set_batch_size(self, batch_size): def _set_batch_size(self, batch_size):
""" """
Set batch size. Will be effective during training Set batch size. Will be effective during training
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset() dataset = paddle.distributed.fleet.DatasetBase()
dataset.set_batch_size(128) dataset._set_batch_size(128)
Args: Args:
batch_size(int): batch size batch_size(int): batch size
...@@ -172,32 +100,16 @@ class DatasetBase(object): ...@@ -172,32 +100,16 @@ class DatasetBase(object):
""" """
self.proto_desc.batch_size = batch_size self.proto_desc.batch_size = batch_size
def set_pv_batch_size(self, pv_batch_size): def _set_thread(self, thread_num):
"""
Set pv batch size. It will be effective during enable_pv_merge
Examples:
.. code-block:: python
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_pv_batch(128)
Args:
pv_batch_size(int): pv batch size
"""
self.proto_desc.pv_batch_size = pv_batch_size
def set_thread(self, thread_num):
""" """
Set thread num, it is the num of readers. Set thread num, it is the num of readers.
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset() dataset = paddle.distributed.fleet.DatasetBase()
dataset.set_thread(12) dataset._set_thread(12)
Args: Args:
thread_num(int): thread num thread_num(int): thread num
...@@ -212,8 +124,8 @@ class DatasetBase(object): ...@@ -212,8 +124,8 @@ class DatasetBase(object):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset() dataset = paddle.distributed.fleet.DatasetBase()
dataset.set_filelist(['a.txt', 'b.txt']) dataset.set_filelist(['a.txt', 'b.txt'])
Args: Args:
...@@ -222,19 +134,19 @@ class DatasetBase(object): ...@@ -222,19 +134,19 @@ class DatasetBase(object):
self.dataset.set_filelist(filelist) self.dataset.set_filelist(filelist)
self.filelist = filelist self.filelist = filelist
def set_input_type(self, input_type): def _set_input_type(self, input_type):
self.proto_desc.input_type = input_type self.proto_desc.input_type = input_type
def set_use_var(self, var_list): def _set_use_var(self, var_list):
""" """
Set Variables which you will use. Set Variables which you will use.
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset() dataset = paddle.distributed.fleet.DatasetBase()
dataset.set_use_var([data, label]) dataset._set_use_var([data, label])
Args: Args:
var_list(list): variable list var_list(list): variable list
...@@ -253,19 +165,19 @@ class DatasetBase(object): ...@@ -253,19 +165,19 @@ class DatasetBase(object):
slot_var.type = "uint64" slot_var.type = "uint64"
else: else:
raise ValueError( raise ValueError(
"Currently, fluid.dataset only supports dtype=float32 and dtype=int64" "Currently, paddle.distributed.fleet.dataset only supports dtype=float32 and dtype=int64"
) )
def set_hdfs_config(self, fs_name, fs_ugi): def _set_hdfs_config(self, fs_name, fs_ugi):
""" """
Set hdfs config: fs name and ugi
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset() dataset = paddle.distributed.fleet.DatasetBase()
dataset.set_hdfs_config("my_fs_name", "my_fs_ugi") dataset._set_hdfs_config("my_fs_name", "my_fs_ugi")
Args: Args:
fs_name(str): fs name fs_name(str): fs name
...@@ -273,16 +185,16 @@ class DatasetBase(object): ...@@ -273,16 +185,16 @@ class DatasetBase(object):
""" """
self.dataset.set_hdfs_config(fs_name, fs_ugi) self.dataset.set_hdfs_config(fs_name, fs_ugi)
def set_download_cmd(self, download_cmd): def _set_download_cmd(self, download_cmd):
""" """
Set customized download cmd: download_cmd Set customized download cmd: download_cmd
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset() dataset = paddle.distributed.fleet.DatasetBase()
dataset.set_download_cmd("./read_from_afs") dataset._set_download_cmd("./read_from_afs")
Args: Args:
download_cmd(str): customized download command download_cmd(str): customized download command
...@@ -297,22 +209,22 @@ class DatasetBase(object): ...@@ -297,22 +209,22 @@ class DatasetBase(object):
if self.thread_num > len(self.filelist): if self.thread_num > len(self.filelist):
self.thread_num = len(self.filelist) self.thread_num = len(self.filelist)
self.dataset.set_thread_num(self.thread_num) self.dataset.set_thread_num(self.thread_num)
self.dataset.set_data_feed_desc(self.desc()) self.dataset.set_data_feed_desc(self._desc())
self.dataset.create_readers() self.dataset.create_readers()
def _finish_to_run(self): def _finish_to_run(self):
self.dataset.destroy_readers() self.dataset.destroy_readers()
def desc(self): def _desc(self):
""" """
Returns a protobuf message for this DataFeedDesc Returns a protobuf message for this DataFeedDesc
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset() dataset = paddle.distributed.fleet.DatasetBase()
print(dataset.desc()) print(dataset._desc())
Returns: Returns:
A string message A string message
...@@ -330,10 +242,10 @@ class InMemoryDataset(DatasetBase): ...@@ -330,10 +242,10 @@ class InMemoryDataset(DatasetBase):
""" """
InMemoryDataset, it will load data into memory InMemoryDataset, it will load data into memory
and shuffle data before training. and shuffle data before training.
This class should be created by DatasetFactory
Example: Example:
dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset") import paddle
dataset = paddle.distributed.InMemoryDataset()
""" """
def __init__(self): def __init__(self):
...@@ -351,7 +263,229 @@ class InMemoryDataset(DatasetBase): ...@@ -351,7 +263,229 @@ class InMemoryDataset(DatasetBase):
self.merge_by_lineid = False self.merge_by_lineid = False
self.fleet_send_sleep_seconds = None self.fleet_send_sleep_seconds = None
def set_feed_type(self, data_feed_type): def _init_distributed_settings(self, **kwargs):
"""
should be called only once in the user's python scripts to initialize the distributed-related settings of the dataset instance
Args:
kwargs: Keyword arguments. Currently, we support following keys in **kwargs:
merge_size(int): ins size to merge, if merge_size > 0, set merge by line id,
instances of same line id will be merged after shuffle,
you should parse line id in data generator. default is -1.
parse_ins_id(bool): Set if Dataset need to parse ins_id. default is False.
parse_content(bool): Set if Dataset need to parse content. default is False.
fleet_send_batch_size(int): Set fleet send batch size in one rpc, default is 1024
fleet_send_sleep_seconds(int): Set fleet send sleep time, default is 0
fea_eval(bool): Set if Dataset need to do feature importance evaluation using slots shuffle.
default is False.
candidate_size(int): if fea_eval is set True, set the candidate size used in slots shuffle.
Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.InMemoryDataset()
dataset.init(
batch_size=1,
thread_num=2,
input_type=1,
pipe_command="cat",
use_var=[])
dataset._init_distributed_settings(
parse_ins_id=True,
parse_content=True,
fea_eval=True,
candidate_size=10000)
"""
merge_size = kwargs.get("merge_size", -1)
if merge_size > 0:
self._set_merge_by_lineid(merge_size)
parse_ins_id = kwargs.get("parse_ins_id", False)
self._set_parse_ins_id(parse_ins_id)
parse_content = kwargs.get("parse_content", False)
self._set_parse_content(parse_content)
fleet_send_batch_size = kwargs.get("fleet_send_batch_size", None)
if fleet_send_batch_size:
self._set_fleet_send_batch_size(fleet_send_batch_size)
fleet_send_sleep_seconds = kwargs.get("fleet_send_sleep_seconds", None)
if fleet_send_sleep_seconds:
self._set_fleet_send_sleep_seconds(fleet_send_sleep_seconds)
fea_eval = kwargs.get("fea_eval", False)
if fea_eval:
candidate_size = kwargs.get("candidate_size", 10000)
self._set_fea_eval(candidate_size, True)
def update_settings(self, **kwargs):
"""
should be called in the user's python scripts to update the settings of the dataset instance
Args:
kwargs: Keyword arguments. Currently, we support following keys in **kwargs,
including single node settings and advanced distributed related settings:
batch_size(int): batch size. It will be effective during training. default is 1.
thread_num(int): thread num, it is the num of readers. default is 1.
use_var(list): list of variables. Variables which you will use. default is [].
input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.
fs_name(str): fs name. default is "".
fs_ugi(str): fs ugi. default is "".
pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command. default is "cat"
download_cmd(str): customized download command. default is "cat"
data_feed_type(str): data feed type used in c++ code. default is "MultiSlotInMemoryDataFeed".
queue_num(int): Dataset output queue num, training threads get data from queues. default is -1, which means it is set to the same value as the thread num in c++.
merge_size(int): ins size to merge, if merge_size > 0, set merge by line id,
instances of same line id will be merged after shuffle,
you should parse line id in data generator. default is -1.
parse_ins_id(bool): Set if Dataset need to parse ins_id. default is False.
parse_content(bool): Set if Dataset need to parse content. default is False.
fleet_send_batch_size(int): Set fleet send batch size in one rpc, default is 1024
fleet_send_sleep_seconds(int): Set fleet send sleep time, default is 0
fea_eval(bool): Set if Dataset need to do feature importance evaluation using slots shuffle.
default is False.
candidate_size(int): if fea_eval is set True, set the candidate size used in slots shuffle.
Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.InMemoryDataset()
dataset.init(
batch_size=1,
thread_num=2,
input_type=1,
pipe_command="cat",
use_var=[])
dataset._init_distributed_settings(
parse_ins_id=True,
parse_content=True,
fea_eval=True,
candidate_size=10000)
dataset.update_settings(batch_size=2)
"""
for key in kwargs:
if key == "pipe_command":
self._set_pipe_command(kwargs[key])
elif key == "batch_size":
self._set_batch_size(kwargs[key])
elif key == "thread_num":
self._set_thread(kwargs[key])
elif key == "use_var":
self._set_use_var(kwargs[key])
elif key == "input_type":
self._set_input_type(kwargs[key])
elif key == "fs_name" and "fs_ugi" in kwargs:
self._set_hdfs_config(kwargs[key], kwargs["fs_ugi"])
elif key == "download_cmd":
self._set_download_cmd(kwargs[key])
elif key == "merge_size" and kwargs.get("merge_size", -1) > 0:
self._set_merge_by_lineid(kwargs[key])
elif key == "parse_ins_id":
self._set_parse_ins_id(kwargs[key])
elif key == "parse_content":
self._set_parse_content(kwargs[key])
elif key == "fleet_send_batch_size":
self._set_fleet_send_batch_size(kwargs[key])
elif key == "fleet_send_sleep_seconds":
self._set_fleet_send_sleep_seconds(kwargs[key])
elif key == "fea_eval" and kwargs[key] == True:
candidate_size = kwargs.get("candidate_size", 10000)
self._set_fea_eval(candidate_size, True)
def init(self, **kwargs):
"""
should be called only once in the user's python scripts to initialize the settings of the dataset instance
Args:
kwargs: Keyword arguments. Currently, we support following keys in **kwargs:
batch_size(int): batch size. It will be effective during training. default is 1.
thread_num(int): thread num, it is the num of readers. default is 1.
use_var(list): list of variables. Variables which you will use. default is [].
input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.
fs_name(str): fs name. default is "".
fs_ugi(str): fs ugi. default is "".
pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command. default is "cat"
download_cmd(str): customized download command. default is "cat"
data_feed_type(str): data feed type used in c++ code. default is "MultiSlotInMemoryDataFeed".
queue_num(int): Dataset output queue num, training threads get data from queues. default is -1, which means it is set to the same value as the thread num in c++.
Examples:
.. code-block:: python
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import os
with open("test_queue_dataset_run_a.txt", "w") as f:
data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
f.write(data)
with open("test_queue_dataset_run_b.txt", "w") as f:
data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
f.write(data)
slots = ["slot1", "slot2", "slot3", "slot4"]
slots_vars = []
for slot in slots:
var = fluid.data(
name=slot, shape=[None, 1], dtype="int64", lod_level=1)
slots_vars.append(var)
dataset = paddle.distributed.InMemoryDataset()
dataset.init(
batch_size=1,
thread_num=2,
input_type=1,
pipe_command="cat",
use_var=slots_vars)
dataset.set_filelist(
["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
dataset.load_into_memory()
exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda(
) else fluid.CUDAPlace(0))
exe.run(fluid.default_startup_program())
exe.train_from_dataset(fluid.default_main_program(),
dataset)
os.remove("./test_queue_dataset_run_a.txt")
os.remove("./test_queue_dataset_run_b.txt")
"""
batch_size = kwargs.get("batch_size", 1)
thread_num = kwargs.get("thread_num", 1)
use_var = kwargs.get("use_var", [])
input_type = kwargs.get("input_type", 0)
fs_name = kwargs.get("fs_name", "")
fs_ugi = kwargs.get("fs_ugi", "")
pipe_command = kwargs.get("pipe_command", "cat")
download_cmd = kwargs.get("download_cmd", "cat")
super(InMemoryDataset, self).init(
batch_size=batch_size,
thread_num=thread_num,
use_var=use_var,
pipe_command=pipe_command,
input_type=input_type,
fs_name=fs_name,
fs_ugi=fs_ugi,
download_cmd=download_cmd)
data_feed_type = kwargs.get("data_feed_type",
"MultiSlotInMemoryDataFeed")
self._set_feed_type(data_feed_type)
if kwargs.get("queue_num", -1) > 0:
queue_num = kwargs.get("queue_num", -1)
self._set_queue_num(queue_num)
def _set_feed_type(self, data_feed_type):
""" """
Set data_feed_desc Set data_feed_desc
""" """
...@@ -373,7 +507,7 @@ class InMemoryDataset(DatasetBase): ...@@ -373,7 +507,7 @@ class InMemoryDataset(DatasetBase):
self.dataset.set_parse_logkey(self.parse_logkey) self.dataset.set_parse_logkey(self.parse_logkey)
self.dataset.set_merge_by_sid(self.merge_by_sid) self.dataset.set_merge_by_sid(self.merge_by_sid)
self.dataset.set_enable_pv_merge(self.enable_pv_merge) self.dataset.set_enable_pv_merge(self.enable_pv_merge)
self.dataset.set_data_feed_desc(self.desc()) self.dataset.set_data_feed_desc(self._desc())
self.dataset.create_channel() self.dataset.create_channel()
self.dataset.create_readers() self.dataset.create_readers()
...@@ -387,7 +521,7 @@ class InMemoryDataset(DatasetBase): ...@@ -387,7 +521,7 @@ class InMemoryDataset(DatasetBase):
self.dataset.dynamic_adjust_channel_num(self.thread_num, False) self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
self.dataset.dynamic_adjust_readers_num(self.thread_num) self.dataset.dynamic_adjust_readers_num(self.thread_num)
def set_queue_num(self, queue_num): def _set_queue_num(self, queue_num):
""" """
Set Dataset output queue num, training threads get data from queues Set Dataset output queue num, training threads get data from queues
...@@ -397,17 +531,17 @@ class InMemoryDataset(DatasetBase): ...@@ -397,17 +531,17 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
dataset.set_queue_num(12) dataset._set_queue_num(12)
""" """
self.is_user_set_queue_num = True self.is_user_set_queue_num = True
self.queue_num = queue_num self.queue_num = queue_num
def set_parse_ins_id(self, parse_ins_id): def _set_parse_ins_id(self, parse_ins_id):
""" """
Set id Dataset need to parse insid Set if Dataset need to parse insid
Args: Args:
parse_ins_id(bool): if parse ins_id or not parse_ins_id(bool): if parse ins_id or not
...@@ -415,14 +549,14 @@ class InMemoryDataset(DatasetBase): ...@@ -415,14 +549,14 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
dataset.set_parse_ins_id(True) dataset._set_parse_ins_id(True)
""" """
self.parse_ins_id = parse_ins_id self.parse_ins_id = parse_ins_id
def set_parse_content(self, parse_content): def _set_parse_content(self, parse_content):
""" """
Set if Dataset need to parse content Set if Dataset need to parse content
...@@ -432,120 +566,14 @@ class InMemoryDataset(DatasetBase): ...@@ -432,120 +566,14 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
dataset.set_parse_content(True) dataset._set_parse_content(True)
""" """
self.parse_content = parse_content self.parse_content = parse_content
def set_parse_logkey(self, parse_logkey): def _set_fleet_send_batch_size(self, fleet_send_batch_size=1024):
"""
Set if Dataset need to parse logkey
Args:
parse_content(bool): if parse logkey or not
Examples:
.. code-block:: python
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_parse_logkey(True)
"""
self.parse_logkey = parse_logkey
def set_merge_by_sid(self, merge_by_sid):
"""
Set if Dataset need to merge sid. If not, one ins means one Pv.
Args:
merge_by_sid(bool): if merge sid or not
Examples:
.. code-block:: python
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_merge_by_sid(True)
"""
self.merge_by_sid = merge_by_sid
def set_enable_pv_merge(self, enable_pv_merge):
"""
Set if Dataset need to merge pv.
Args:
enable_pv_merge(bool): if enable_pv_merge or not
Examples:
.. code-block:: python
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_enable_pv_merge(True)
"""
self.enable_pv_merge = enable_pv_merge
def preprocess_instance(self):
"""
Merge pv instance and convey it from input_channel to input_pv_channel.
It will be effective when enable_pv_merge_ is True.
Examples:
.. code-block:: python
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.preprocess_instance()
"""
self.dataset.preprocess_instance()
def set_current_phase(self, current_phase):
"""
Set current phase in train. It is useful for untest.
current_phase : 1 for join, 0 for update.
Examples:
.. code-block:: python
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.set_current_phase(1)
"""
self.dataset.set_current_phase(current_phase)
def postprocess_instance(self):
"""
Divide pv instance and convey it to input_channel.
Examples:
.. code-block:: python
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.preprocess_instance()
exe.train_from_dataset(dataset)
dataset.postprocess_instance()
"""
self.dataset.postprocess_instance()
def set_fleet_send_batch_size(self, fleet_send_batch_size=1024):
""" """
Set fleet send batch size, default is 1024 Set fleet send batch size, default is 1024
...@@ -555,14 +583,14 @@ class InMemoryDataset(DatasetBase): ...@@ -555,14 +583,14 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
dataset.set_fleet_send_batch_size(800) dataset._set_fleet_send_batch_size(800)
""" """
self.fleet_send_batch_size = fleet_send_batch_size self.fleet_send_batch_size = fleet_send_batch_size
def set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0): def _set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0):
""" """
Set fleet send sleep time, default is 0 Set fleet send sleep time, default is 0
...@@ -572,14 +600,14 @@ class InMemoryDataset(DatasetBase): ...@@ -572,14 +600,14 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
dataset.set_fleet_send_sleep_seconds(2) dataset._set_fleet_send_sleep_seconds(2)
""" """
self.fleet_send_sleep_seconds = fleet_send_sleep_seconds self.fleet_send_sleep_seconds = fleet_send_sleep_seconds
def set_merge_by_lineid(self, merge_size=2): def _set_merge_by_lineid(self, merge_size=2):
""" """
Set merge by line id, instances of same line id will be merged after Set merge by line id, instances of same line id will be merged after
shuffle, you should parse line id in data generator. shuffle, you should parse line id in data generator.
...@@ -590,21 +618,21 @@ class InMemoryDataset(DatasetBase): ...@@ -590,21 +618,21 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
dataset.set_merge_by_lineid() dataset._set_merge_by_lineid()
""" """
self.dataset.set_merge_by_lineid(merge_size) self.dataset.set_merge_by_lineid(merge_size)
self.merge_by_lineid = True self.merge_by_lineid = True
self.parse_ins_id = True self.parse_ins_id = True
def set_generate_unique_feasigns(self, generate_uni_feasigns, shard_num): def _set_generate_unique_feasigns(self, generate_uni_feasigns, shard_num):
self.dataset.set_generate_unique_feasigns(generate_uni_feasigns) self.dataset.set_generate_unique_feasigns(generate_uni_feasigns)
self.gen_uni_feasigns = generate_uni_feasigns self.gen_uni_feasigns = generate_uni_feasigns
self.local_shard_num = shard_num self.local_shard_num = shard_num
def generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num, def _generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num,
consume_thread_num, shard_num): consume_thread_num, shard_num):
self.dataset.generate_local_tables_unlock( self.dataset.generate_local_tables_unlock(
table_id, fea_dim, read_thread_num, consume_thread_num, shard_num) table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)
...@@ -616,8 +644,8 @@ class InMemoryDataset(DatasetBase): ...@@ -616,8 +644,8 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.load_into_memory() dataset.load_into_memory()
...@@ -635,8 +663,8 @@ class InMemoryDataset(DatasetBase): ...@@ -635,8 +663,8 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.preload_into_memory() dataset.preload_into_memory()
...@@ -656,8 +684,8 @@ class InMemoryDataset(DatasetBase): ...@@ -656,8 +684,8 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.preload_into_memory() dataset.preload_into_memory()
...@@ -673,8 +701,8 @@ class InMemoryDataset(DatasetBase): ...@@ -673,8 +701,8 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.load_into_memory() dataset.load_into_memory()
...@@ -692,9 +720,9 @@ class InMemoryDataset(DatasetBase): ...@@ -692,9 +720,9 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.load_into_memory() dataset.load_into_memory()
...@@ -736,9 +764,9 @@ class InMemoryDataset(DatasetBase): ...@@ -736,9 +764,9 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.load_into_memory() dataset.load_into_memory()
...@@ -751,30 +779,6 @@ class InMemoryDataset(DatasetBase): ...@@ -751,30 +779,6 @@ class InMemoryDataset(DatasetBase):
""" """
self.dataset.release_memory() self.dataset.release_memory()
def get_pv_data_size(self):
"""
Get memory data size of Pv, user can call this function to know the pv num
of ins in all workers after load into memory.
Note:
This function may cause bad performance, because it has barrier
Returns:
The size of memory pv data.
Examples:
.. code-block:: python
import paddle.fluid as fluid
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
print dataset.get_pv_data_size()
"""
return self.dataset.get_pv_data_size()
def get_memory_data_size(self, fleet=None): def get_memory_data_size(self, fleet=None):
""" """
Get memory data size, user can call this function to know the num Get memory data size, user can call this function to know the num
...@@ -792,9 +796,9 @@ class InMemoryDataset(DatasetBase): ...@@ -792,9 +796,9 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.load_into_memory() dataset.load_into_memory()
...@@ -829,9 +833,9 @@ class InMemoryDataset(DatasetBase): ...@@ -829,9 +833,9 @@ class InMemoryDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.InMemoryDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.load_into_memory() dataset.load_into_memory()
...@@ -849,6 +853,51 @@ class InMemoryDataset(DatasetBase): ...@@ -849,6 +853,51 @@ class InMemoryDataset(DatasetBase):
return global_data_size[0] return global_data_size[0]
return local_data_size[0] return local_data_size[0]
def _set_fea_eval(self, record_candidate_size, fea_eval=True):
"""
set fea eval mode for slots shuffle to debug the importance level of
slots(features), fea_eval need to be set True for slots shuffle.
Args:
record_candidate_size(int): size of instances candidate to shuffle
one slot
fea_eval(bool): whether enable fea eval mode to enable slots shuffle.
default is True.
Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.InMemoryDataset()
dataset._set_fea_eval(1000000, True)
"""
if fea_eval:
self.dataset.set_fea_eval(fea_eval, record_candidate_size)
self.fea_eval = fea_eval
def slots_shuffle(self, slots):
"""
Slots Shuffle
Slots Shuffle is a shuffle method in slots level, which is usually used
in sparse feature with large scale of instances. To compare the metric, i.e.
auc while doing slots shuffle on one or several slots with baseline to
evaluate the importance level of slots(features).
Args:
slots(list[string]): the set of slots(string) to do slots shuffle.
Examples:
    .. code-block:: python

        import paddle
        dataset = paddle.distributed.InMemoryDataset()
        dataset._init_distributed_settings(fea_eval=True)
        #suppose there is a slot 0
        dataset.slots_shuffle(['0'])
"""
if self.fea_eval:
slots_set = set(slots)
self.dataset.slots_shuffle(slots_set)
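The feature-importance workflow now splits into a private `_set_fea_eval` (driven by `_init_distributed_settings`) and the public `slots_shuffle`; a hedged sketch of the intended flow, mirroring the updated tests below (file names are placeholders):

.. code-block:: python

    import paddle

    dataset = paddle.distributed.InMemoryDataset()
    dataset.init(batch_size=32, thread_num=2, pipe_command="cat", use_var=[])
    # enables slots shuffle; replaces the old public dataset.set_fea_eval(...)
    dataset._init_distributed_settings(fea_eval=True, candidate_size=10000)
    dataset.set_filelist(["a.txt", "b.txt"])  # placeholder files
    dataset.load_into_memory()
    dataset.slots_shuffle(["0"])  # probe the importance of slot "0"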
class QueueDataset(DatasetBase): class QueueDataset(DatasetBase):
""" """
...@@ -857,19 +906,24 @@ class QueueDataset(DatasetBase): ...@@ -857,19 +906,24 @@ class QueueDataset(DatasetBase):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("QueueDataset") dataset = paddle.distributed.QueueDataset()
""" """
def __init__(self): def __init__(self):
""" """
Initialize QueueDataset Initialize QueueDataset
This class should be created by DatasetFactory
""" """
super(QueueDataset, self).__init__() super(QueueDataset, self).__init__()
self.proto_desc.name = "MultiSlotDataFeed" self.proto_desc.name = "MultiSlotDataFeed"
def init(self, **kwargs):
"""
should be called only once in the user's python scripts to initialize the settings of the dataset instance
"""
super(QueueDataset, self).init(**kwargs)
def _prepare_to_run(self): def _prepare_to_run(self):
""" """
Set data_feed_desc/thread num/filelist before run, Set data_feed_desc/thread num/filelist before run,
...@@ -881,115 +935,154 @@ class QueueDataset(DatasetBase): ...@@ -881,115 +935,154 @@ class QueueDataset(DatasetBase):
self.thread_num = 1 self.thread_num = 1
self.dataset.set_thread_num(self.thread_num) self.dataset.set_thread_num(self.thread_num)
self.dataset.set_filelist(self.filelist) self.dataset.set_filelist(self.filelist)
self.dataset.set_data_feed_desc(self.desc()) self.dataset.set_data_feed_desc(self._desc())
self.dataset.create_readers() self.dataset.create_readers()
def local_shuffle(self):
"""
Local shuffle data.
Local shuffle is not supported in QueueDataset class FileInstantDataset(DatasetBase):
NotImplementedError will be raised """
FileInstantDataset, it will process data streamly.
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("QueueDataset") dataset = paddle.distributed.fleet.FileInstantDataset()
dataset.local_shuffle() """
Raises:
NotImplementedError: QueueDataset does not support local shuffle
def __init__(self):
""" """
raise NotImplementedError( Initialize FileInstantDataset
"QueueDataset does not support local shuffle, " """
"please use InMemoryDataset for local_shuffle") super(FileInstantDataset, self).__init__()
self.proto_desc.name = "MultiSlotFileInstantDataFeed"
def global_shuffle(self, fleet=None): def init(self, **kwargs):
""" """
Global shuffle data. should be called only once in the user's python scripts to initialize the settings of the dataset instance
"""
super(FileInstantDataset, self).init(**kwargs)
Global shuffle is not supported in QueueDataset
NotImplementedError will be raised
Args: class BoxPSDataset(InMemoryDataset):
fleet(Fleet): fleet singleton. Default None. """
BoxPSDataset: derived from InMemoryDataset.
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet dataset = paddle.distributed.fleet.BoxPSDataset()
dataset = fluid.DatasetFactory().create_dataset("QueueDataset") """
dataset.global_shuffle(fleet)
Raises: def __init__(self):
NotImplementedError: QueueDataset does not support global shuffle """
Initialize BoxPSDataset
"""
super(BoxPSDataset, self).__init__()
self.boxps = core.BoxPS(self.dataset)
self.proto_desc.name = "PaddleBoxDataFeed"
def init(self, **kwargs):
"""
should be called only once in the user's python scripts to initialize the settings of the dataset instance
""" """
raise NotImplementedError( super(BoxPSDataset, self).init(**kwargs)
"QueueDataset does not support global shuffle, "
"please use InMemoryDataset for global_shuffle")
rank_offset = kwargs.get("rank_offset", "")
self._set_rank_offset(rank_offset)
pv_batch_size = kwargs.get("pv_batch_size", 1)
self._set_pv_batch_size(pv_batch_size)
parse_logkey = kwargs.get("parse_logkey", False)
self._set_parse_logkey(parse_logkey)
merge_by_sid = kwargs.get("merge_by_sid", False)
self._set_merge_by_sid(merge_by_sid)
enable_pv_merge = kwargs.get("enable_pv_merge", False)
self._set_enable_pv_merge(enable_pv_merge)
class FileInstantDataset(DatasetBase): def _set_rank_offset(self, rank_offset):
""" """
FileInstantDataset, it will process data streamly. Set rank_offset for merge_pv. It set the message of Pv.
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory.create_dataset("FileInstantDataset") dataset = paddle.distributed.fleet.BoxPSDataset()
""" dataset._set_rank_offset("rank_offset")
def __init__(self): Args:
""" rank_offset(str): rank_offset's name
Initialize FileInstantDataset
This class should be created by DatasetFactory
"""
super(FileInstantDataset, self).__init__()
self.proto_desc.name = "MultiSlotFileInstantDataFeed"
def local_shuffle(self):
""" """
Local shuffle self.proto_desc.rank_offset = rank_offset
FileInstantDataset does not support local shuffle
def _set_pv_batch_size(self, pv_batch_size):
""" """
raise NotImplementedError( Set pv batch size. It will be effective during enable_pv_merge
"FileInstantDataset does not support local shuffle, "
"please use InMemoryDataset for local_shuffle") Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.fleet.BoxPSDataset()
dataset._set_pv_batch_size(128)
Args:
pv_batch_size(int): pv batch size
def global_shuffle(self, fleet=None):
""" """
Global shuffle self.proto_desc.pv_batch_size = pv_batch_size
FileInstantDataset does not support global shuffle
def _set_parse_logkey(self, parse_logkey):
""" """
raise NotImplementedError( Set if Dataset need to parse logkey
"FileInstantDataset does not support global shuffle, "
"please use InMemoryDataset for global_shuffle")
Args:
parse_content(bool): if parse logkey or not
Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.fleet.BoxPSDataset()
dataset._set_parse_logkey(True)
class BoxPSDataset(InMemoryDataset):
""" """
BoxPSDataset: derived from InMemoryDataset. self.parse_logkey = parse_logkey
def _set_merge_by_sid(self, merge_by_sid):
"""
Set if Dataset need to merge sid. If not, one ins means one Pv.
Args:
merge_by_sid(bool): if merge sid or not
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") dataset = paddle.distributed.fleet.BoxPSDataset()
dataset._set_merge_by_sid(True)
""" """
self.merge_by_sid = merge_by_sid
def __init__(self): def _set_enable_pv_merge(self, enable_pv_merge):
""" """
Initialize BoxPSDataset Set if Dataset need to merge pv.
This class should be created by DatasetFactory
Args:
enable_pv_merge(bool): if enable_pv_merge or not
Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.fleet.BoxPSDataset()
dataset._set_enable_pv_merge(True)
""" """
super(BoxPSDataset, self).__init__() self.enable_pv_merge = enable_pv_merge
self.boxps = core.BoxPS(self.dataset)
self.proto_desc.name = "PaddleBoxDataFeed"
def set_date(self, date): def set_date(self, date):
""" """
...@@ -1008,8 +1101,8 @@ class BoxPSDataset(InMemoryDataset): ...@@ -1008,8 +1101,8 @@ class BoxPSDataset(InMemoryDataset):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") dataset = paddle.distributed.fleet.BoxPSDataset()
dataset.begin_pass() dataset.begin_pass()
""" """
self.boxps.begin_pass() self.boxps.begin_pass()
...@@ -1021,8 +1114,8 @@ class BoxPSDataset(InMemoryDataset): ...@@ -1021,8 +1114,8 @@ class BoxPSDataset(InMemoryDataset):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") dataset = paddle.distributed.fleet.BoxPSDataset()
dataset.end_pass(True) dataset.end_pass(True)
""" """
self.boxps.end_pass(need_save_delta) self.boxps.end_pass(need_save_delta)
...@@ -1034,8 +1127,8 @@ class BoxPSDataset(InMemoryDataset): ...@@ -1034,8 +1127,8 @@ class BoxPSDataset(InMemoryDataset):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") dataset = paddle.distributed.fleet.BoxPSDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.preload_into_memory() dataset.preload_into_memory()
...@@ -1049,8 +1142,8 @@ class BoxPSDataset(InMemoryDataset): ...@@ -1049,8 +1142,8 @@ class BoxPSDataset(InMemoryDataset):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") dataset = paddle.distributed.fleet.BoxPSDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.load_into_memory() dataset.load_into_memory()
...@@ -1064,8 +1157,8 @@ class BoxPSDataset(InMemoryDataset): ...@@ -1064,8 +1157,8 @@ class BoxPSDataset(InMemoryDataset):
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") dataset = paddle.distributed.fleet.BoxPSDataset()
filelist = ["a.txt", "b.txt"] filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.preload_into_memory() dataset.preload_into_memory()
...@@ -1093,11 +1186,90 @@ class BoxPSDataset(InMemoryDataset): ...@@ -1093,11 +1186,90 @@ class BoxPSDataset(InMemoryDataset):
slots(list[string]): the set of slots(string) to do slots shuffle. slots(list[string]): the set of slots(string) to do slots shuffle.
Examples: Examples:
import paddle.fluid as fluid import paddle
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") dataset = paddle.distributed.fleet.BoxPSDataset()
dataset.set_merge_by_lineid() dataset.set_merge_by_lineid()
#suppose there is a slot 0 #suppose there is a slot 0
dataset.slots_shuffle(['0']) dataset.slots_shuffle(['0'])
""" """
slots_set = set(slots) slots_set = set(slots)
self.boxps.slots_shuffle(slots_set) self.boxps.slots_shuffle(slots_set)
def set_current_phase(self, current_phase):
"""
Set current phase in train. It is useful for unit testing.
current_phase : 1 for join, 0 for update.
Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.fleet.BoxPSDataset()
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.set_current_phase(1)
"""
self.dataset.set_current_phase(current_phase)
def get_pv_data_size(self):
"""
Get memory data size of Pv, user can call this function to know the pv num
of ins in all workers after load into memory.
Note:
This function may cause bad performance, because it has barrier
Returns:
The size of memory pv data.
Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.fleet.BoxPSDataset()
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
print(dataset.get_pv_data_size())
"""
return self.dataset.get_pv_data_size()
def preprocess_instance(self):
"""
Merge pv instance and convey it from input_channel to input_pv_channel.
It will be effective when enable_pv_merge_ is True.
Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.fleet.BoxPSDataset()
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.preprocess_instance()
"""
self.dataset.preprocess_instance()
def postprocess_instance(self):
"""
Divide pv instance and convey it to input_channel.
Examples:
.. code-block:: python
import paddle
dataset = paddle.distributed.fleet.BoxPSDataset()
filelist = ["a.txt", "b.txt"]
dataset.set_filelist(filelist)
dataset.load_into_memory()
dataset.preprocess_instance()
exe.train_from_dataset(dataset)
dataset.postprocess_instance()
"""
self.dataset.postprocess_instance()
...@@ -1726,13 +1726,13 @@ class DatasetLoader(DataLoaderBase): ...@@ -1726,13 +1726,13 @@ class DatasetLoader(DataLoaderBase):
logging.warn('thread_num {} which is set in Dataset is ignored'. logging.warn('thread_num {} which is set in Dataset is ignored'.
format(dataset.thread_num)) format(dataset.thread_num))
dataset.set_thread(thread_num) dataset._set_thread(thread_num)
if isinstance(dataset, paddle.distributed.fleet.dataset. if isinstance(dataset, paddle.distributed.fleet.dataset.
InMemoryDataset) and dataset.queue_num > thread_num: InMemoryDataset) and dataset.queue_num > thread_num:
logging.warn("queue_num {} which is set in Dataset is ignored". logging.warn("queue_num {} which is set in Dataset is ignored".
format(dataset.queue_num)) format(dataset.queue_num))
dataset.set_queue_num(thread_num) dataset._set_queue_num(thread_num)
self._dataset = dataset self._dataset = dataset
use_slots = [ use_slots = [
......
...@@ -208,14 +208,16 @@ class TestDistCTR2x2(FleetDistRunnerBase): ...@@ -208,14 +208,16 @@ class TestDistCTR2x2(FleetDistRunnerBase):
filelist = train_file_list filelist = train_file_list
# config dataset # config dataset
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() dataset = paddle.distributed.QueueDataset()
dataset.set_batch_size(batch_size)
dataset.set_use_var(self.feeds)
pipe_command = 'python ctr_dataset_reader.py' pipe_command = 'python ctr_dataset_reader.py'
dataset.set_pipe_command(pipe_command)
dataset.init(
batch_size=batch_size,
use_var=self.feeds,
pipe_command=pipe_command,
thread_num=thread_num)
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.set_thread(thread_num)
for epoch_id in range(1): for epoch_id in range(1):
pass_start = time.time() pass_start = time.time()
......
...@@ -114,14 +114,14 @@ class TestDistGpuPsCTR2x2(TestDistCTR2x2): ...@@ -114,14 +114,14 @@ class TestDistGpuPsCTR2x2(TestDistCTR2x2):
filelist.append(train_file_path) filelist.append(train_file_path)
# config dataset # config dataset
dataset = paddle.fleet.DatasetFactory().create_dataset() dataset = paddle.distributed.QueueDataset()
dataset.set_batch_size(batch_size) dataset._set_batch_size(batch_size)
dataset.set_use_var(self.feeds) dataset._set_use_var(self.feeds)
pipe_command = 'python ctr_dataset_reader.py' pipe_command = 'python ctr_dataset_reader.py'
dataset.set_pipe_command(pipe_command) dataset._set_pipe_command(pipe_command)
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.set_thread(thread_num) dataset._set_thread(thread_num)
for epoch_id in range(1): for epoch_id in range(1):
pass_start = time.time() pass_start = time.time()
......
...@@ -183,14 +183,14 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase): ...@@ -183,14 +183,14 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase):
print("filelist: {}".format(filelist)) print("filelist: {}".format(filelist))
# config dataset # config dataset
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() dataset = paddle.distributed.QueueDataset()
dataset.set_batch_size(batch_size) dataset._set_batch_size(batch_size)
dataset.set_use_var(self.feeds) dataset._set_use_var(self.feeds)
pipe_command = 'python ctr_dataset_reader.py' pipe_command = 'python ctr_dataset_reader.py'
dataset.set_pipe_command(pipe_command) dataset._set_pipe_command(pipe_command)
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.set_thread(thread_num) dataset._set_thread(thread_num)
for epoch_id in range(1): for epoch_id in range(1):
pass_start = time.time() pass_start = time.time()
......
...@@ -38,26 +38,22 @@ class TestDataset(unittest.TestCase): ...@@ -38,26 +38,22 @@ class TestDataset(unittest.TestCase):
def test_dataset_create(self): def test_dataset_create(self):
""" Testcase for dataset create. """ """ Testcase for dataset create. """
try: try:
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset")
except: except:
self.assertTrue(False) self.assertTrue(False)
try: try:
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.QueueDataset()
"QueueDataset")
except: except:
self.assertTrue(False) self.assertTrue(False)
try: try:
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.fleet.dataset.FileInstantDataset()
"FileInstantDataset")
except: except:
self.assertTrue(False) self.assertTrue(False)
try: try:
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.fleet.dataset.MyOwnDataset()
"MyOwnDataset")
self.assertTrue(False) self.assertTrue(False)
except: except:
self.assertTrue(True) self.assertTrue(True)
...@@ -95,18 +91,18 @@ class TestDataset(unittest.TestCase): ...@@ -95,18 +91,18 @@ class TestDataset(unittest.TestCase):
name=slot, shape=[1], dtype="int64", lod_level=1) name=slot, shape=[1], dtype="int64", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars)
dataset.set_thread(3) dataset.update_settings(pipe_command="cat1")
dataset._init_distributed_settings(
parse_ins_id=True,
parse_content=True,
fea_eval=True,
candidate_size=10000)
dataset.set_filelist( dataset.set_filelist(
["test_run_with_dump_a.txt", "test_run_with_dump_b.txt"]) ["test_run_with_dump_a.txt", "test_run_with_dump_b.txt"])
dataset.set_parse_ins_id(True)
dataset.set_parse_content(True)
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
dataset.set_fea_eval(10000, True)
dataset.local_shuffle() dataset.local_shuffle()
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
...@@ -176,14 +172,14 @@ class TestDataset(unittest.TestCase): ...@@ -176,14 +172,14 @@ class TestDataset(unittest.TestCase):
name=slot, shape=[1], dtype="int64", lod_level=1) name=slot, shape=[1], dtype="int64", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32,
dataset.set_thread(3) thread_num=3,
pipe_command="cat",
download_cmd="cat",
use_var=slots_vars)
dataset.set_filelist([filename1, filename2]) dataset.set_filelist([filename1, filename2])
dataset.set_pipe_command("cat")
dataset.set_download_cmd("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
...@@ -228,22 +224,19 @@ class TestDataset(unittest.TestCase): ...@@ -228,22 +224,19 @@ class TestDataset(unittest.TestCase):
name=slot, shape=[1], dtype="int64", lod_level=1) name=slot, shape=[1], dtype="int64", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars)
dataset.set_thread(3) dataset._init_distributed_settings(fea_eval=True, candidate_size=1)
dataset.set_filelist([ dataset.set_filelist([
"test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_a.txt",
"test_in_memory_dataset_run_b.txt" "test_in_memory_dataset_run_b.txt"
]) ])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
dataset.set_fea_eval(1, True)
dataset.slots_shuffle(["slot1"]) dataset.slots_shuffle(["slot1"])
dataset.local_shuffle() dataset.local_shuffle()
dataset.set_generate_unique_feasigns(True, 15) dataset._set_generate_unique_feasigns(True, 15)
dataset.generate_local_tables_unlock(0, 11, 1, 25, 15) dataset._generate_local_tables_unlock(0, 11, 1, 25, 15)
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
if self.use_data_loader: if self.use_data_loader:
...@@ -300,17 +293,14 @@ class TestDataset(unittest.TestCase):
name=slot, shape=[1], dtype="float32", lod_level=1) name=slot, shape=[1], dtype="float32", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
dataset.set_thread(1) dataset._init_distributed_settings(parse_ins_id=True)
dataset.set_parse_ins_id(True)
dataset.set_filelist([ dataset.set_filelist([
"test_in_memory_dataset_masterpatch_a.txt", "test_in_memory_dataset_masterpatch_a.txt",
"test_in_memory_dataset_masterpatch_b.txt" "test_in_memory_dataset_masterpatch_b.txt"
]) ])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
dataset.local_shuffle() dataset.local_shuffle()
...@@ -325,7 +315,8 @@ class TestDataset(unittest.TestCase):
except Exception as e: except Exception as e:
self.assertTrue(False) self.assertTrue(False)
dataset.set_merge_by_lineid(2) #dataset._set_merge_by_lineid(2)
dataset.update_settings(merge_size=2)
dataset.dataset.merge_by_lineid() dataset.dataset.merge_by_lineid()
os.remove("./test_in_memory_dataset_masterpatch_a.txt") os.remove("./test_in_memory_dataset_masterpatch_a.txt")
...@@ -367,17 +358,14 @@ class TestDataset(unittest.TestCase):
name="slot4", shape=[1], dtype="float32", lod_level=0) name="slot4", shape=[1], dtype="float32", lod_level=0)
slots_vars = [var1, var2, var3, var4] slots_vars = [var1, var2, var3, var4]
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars)
dataset.set_thread(1) dataset._init_distributed_settings(parse_ins_id=True)
dataset.set_parse_ins_id(True)
dataset.set_filelist([ dataset.set_filelist([
"test_in_memory_dataset_masterpatch1_a.txt", "test_in_memory_dataset_masterpatch1_a.txt",
"test_in_memory_dataset_masterpatch1_b.txt" "test_in_memory_dataset_masterpatch1_b.txt"
]) ])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
dataset.local_shuffle() dataset.local_shuffle()
...@@ -392,7 +380,7 @@ class TestDataset(unittest.TestCase):
except Exception as e: except Exception as e:
self.assertTrue(False) self.assertTrue(False)
dataset.set_merge_by_lineid(2) dataset._set_merge_by_lineid(2)
dataset.dataset.merge_by_lineid() dataset.dataset.merge_by_lineid()
os.remove("./test_in_memory_dataset_masterpatch1_a.txt") os.remove("./test_in_memory_dataset_masterpatch1_a.txt")
...@@ -423,16 +411,13 @@ class TestDataset(unittest.TestCase):
name=slot, shape=[1], dtype="float32", lod_level=1) name=slot, shape=[1], dtype="float32", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars)
dataset.set_thread(3)
dataset.set_filelist([ dataset.set_filelist([
"test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_a.txt",
"test_in_memory_dataset_run_b.txt" "test_in_memory_dataset_run_b.txt"
]) ])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
dataset.local_shuffle() dataset.local_shuffle()
...@@ -473,9 +458,9 @@ class TestDataset(unittest.TestCase):
except Exception as e: except Exception as e:
self.assertTrue(False) self.assertTrue(False)
dataset.set_merge_by_lineid(2) dataset._set_merge_by_lineid(2)
dataset.set_parse_ins_id(False) dataset._set_parse_ins_id(False)
dataset.set_fleet_send_sleep_seconds(2) dataset._set_fleet_send_sleep_seconds(2)
dataset.preload_into_memory() dataset.preload_into_memory()
dataset.wait_preload_done() dataset.wait_preload_done()
dataset.release_memory() dataset.release_memory()
...@@ -483,10 +468,25 @@ class TestDataset(unittest.TestCase):
dataset.wait_preload_done() dataset.wait_preload_done()
dataset.dataset.merge_by_lineid() dataset.dataset.merge_by_lineid()
dataset.release_memory() dataset.release_memory()
dataset.set_merge_by_lineid(30) dataset._set_merge_by_lineid(30)
dataset.set_parse_ins_id(False) dataset._set_parse_ins_id(False)
dataset.load_into_memory() dataset.load_into_memory()
dataset.dataset.merge_by_lineid() dataset.dataset.merge_by_lineid()
dataset.update_settings(
batch_size=1,
thread_num=2,
input_type=1,
pipe_command="cat",
use_var=[],
fs_name="",
fs_ugi="",
download_cmd="cat",
merge_size=-1,
parse_ins_id=False,
parse_content=False,
fleet_send_batch_size=2,
fleet_send_sleep_seconds=2,
fea_eval=True)
fleet_ptr = fluid.core.Fleet() fleet_ptr = fluid.core.Fleet()
fleet_ptr.set_client2client_config(1, 1, 1) fleet_ptr.set_client2client_config(1, 1, 1)
fleet_ptr.get_cache_threshold(0) fleet_ptr.get_cache_threshold(0)
...@@ -517,14 +517,11 @@ class TestDataset(unittest.TestCase):
name=slot, shape=[1], dtype="int64", lod_level=1) name=slot, shape=[1], dtype="int64", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.QueueDataset()
"QueueDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars)
dataset.set_thread(3)
dataset.set_filelist( dataset.set_filelist(
["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
...@@ -543,12 +540,9 @@ class TestDataset(unittest.TestCase):
except Exception as e: except Exception as e:
self.assertTrue(False) self.assertTrue(False)
dataset2 = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset2 = paddle.distributed.QueueDataset()
"QueueDataset") dataset2.init(
dataset2.set_use_var(slots_vars) batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars)
dataset2.set_batch_size(32)
dataset2.set_thread(3)
dataset2.set_pipe_command("cat")
dataset.set_filelist([]) dataset.set_filelist([])
try: try:
exe.train_from_dataset(fluid.default_main_program(), dataset2) exe.train_from_dataset(fluid.default_main_program(), dataset2)
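The two QueueDataset hunks above follow the same pattern. A compact sketch of the streaming path, assuming the imports and `slots_vars` from the earlier sketch and that the listed text files exist, is:

```python
# Sketch of the QueueDataset path used in the tests above. QueueDataset
# streams straight from the filelist, so there is no load_into_memory().
dataset = paddle.distributed.QueueDataset()
dataset.init(
    batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars)
dataset.set_filelist(
    ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
exe.train_from_dataset(fluid.default_main_program(), dataset)
```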
...@@ -585,14 +579,11 @@ class TestDataset(unittest.TestCase):
name=slot, shape=[1], dtype="float32", lod_level=1) name=slot, shape=[1], dtype="float32", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.QueueDataset()
"QueueDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars)
dataset.set_thread(3)
dataset.set_filelist( dataset.set_filelist(
["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda(
) else fluid.CUDAPlace(0)) ) else fluid.CUDAPlace(0))
...@@ -641,15 +632,15 @@ class TestDataset(unittest.TestCase):
name=slot, shape=[None, 1], dtype="int64", lod_level=1) name=slot, shape=[None, 1], dtype="int64", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset.init(
dataset.set_input_type(1) batch_size=1,
dataset.set_batch_size(1) thread_num=2,
dataset.set_thread(2) input_type=1,
pipe_command="cat",
use_var=slots_vars)
dataset.set_filelist( dataset.set_filelist(
["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda(
...@@ -721,13 +712,10 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
inputs(list): inputs of get_dataset inputs(list): inputs of get_dataset
files(list): files of get_dataset files(list): files of get_dataset
""" """
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.QueueDataset()
"QueueDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32, thread_num=3, pipe_command="cat", use_var=inputs)
dataset.set_thread(3)
dataset.set_filelist(files) dataset.set_filelist(files)
dataset.set_pipe_command("cat")
dataset.set_use_var(inputs)
return dataset return dataset
def setUp(self): def setUp(self):
...@@ -879,16 +867,17 @@ class TestDataset2(unittest.TestCase):
except ImportError as e: except ImportError as e:
print("warning: no mpi4py") print("warning: no mpi4py")
exe.run(startup_program) exe.run(startup_program)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset")
dataset.set_batch_size(32) dataset.init(
dataset.set_thread(3) batch_size=32,
thread_num=3,
pipe_command="cat",
use_var=slots_vars)
dataset.set_filelist([ dataset.set_filelist([
"test_in_memory_dataset2_run_a.txt", "test_in_memory_dataset2_run_a.txt",
"test_in_memory_dataset2_run_b.txt" "test_in_memory_dataset2_run_b.txt"
]) ])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
fleet._opt_info = None fleet._opt_info = None
fleet._fleet_ptr = None fleet._fleet_ptr = None
...@@ -949,16 +938,16 @@ class TestDataset2(unittest.TestCase):
except ImportError as e: except ImportError as e:
print("warning: no mpi4py") print("warning: no mpi4py")
exe.run(startup_program) exe.run(startup_program)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset.init(
dataset.set_batch_size(32) batch_size=32,
dataset.set_thread(3) thread_num=3,
pipe_command="cat",
use_var=slots_vars)
dataset.set_filelist([ dataset.set_filelist([
"test_in_memory_dataset2_run2_a.txt", "test_in_memory_dataset2_run2_a.txt",
"test_in_memory_dataset2_run2_b.txt" "test_in_memory_dataset2_run2_b.txt"
]) ])
dataset.set_pipe_command("cat")
dataset.set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
try: try:
dataset.global_shuffle(fleet) dataset.global_shuffle(fleet)
...@@ -966,14 +955,11 @@ class TestDataset2(unittest.TestCase):
print("warning: catch expected error") print("warning: catch expected error")
fleet._opt_info = None fleet._opt_info = None
fleet._fleet_ptr = None fleet._fleet_ptr = None
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset.init(fs_name="", fs_ugi="")
dataset.set_rank_offset("")
dataset.set_pv_batch_size(1)
dataset.set_hdfs_config("", "")
d = paddle.distributed.fleet.DatasetBase() d = paddle.distributed.fleet.DatasetBase()
try: try:
dataset.set_feed_type("MultiSlotInMemoryDataFeed") dataset._set_feed_type("MultiSlotInMemoryDataFeed")
except: except:
print("warning: catch expected error") print("warning: catch expected error")
dataset.thread_num = 0 dataset.thread_num = 0
...@@ -981,9 +967,6 @@ class TestDataset2(unittest.TestCase):
dataset._prepare_to_run() dataset._prepare_to_run()
except: except:
print("warning: catch expected error") print("warning: catch expected error")
dataset.set_parse_logkey(True)
dataset.set_merge_by_sid(True)
dataset.set_enable_pv_merge(True)
try: try:
dataset.preprocess_instance() dataset.preprocess_instance()
except: except:
...@@ -996,16 +979,15 @@ class TestDataset2(unittest.TestCase):
dataset.postprocess_instance() dataset.postprocess_instance()
except: except:
print("warning: catch expected error") print("warning: catch expected error")
dataset.set_fleet_send_batch_size(1024) dataset._set_fleet_send_batch_size(1024)
try: try:
dataset.global_shuffle() dataset.global_shuffle()
except: except:
print("warning: catch expected error") print("warning: catch expected error")
dataset.get_pv_data_size() #dataset.get_pv_data_size()
dataset.get_memory_data_size() dataset.get_memory_data_size()
dataset.get_shuffle_data_size() dataset.get_shuffle_data_size()
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.QueueDataset()
"QueueDataset")
try: try:
dataset.local_shuffle() dataset.local_shuffle()
except: except:
...@@ -1027,6 +1009,120 @@ class TestDataset2(unittest.TestCase):
os.remove("./test_in_memory_dataset2_run2_a.txt") os.remove("./test_in_memory_dataset2_run2_a.txt")
os.remove("./test_in_memory_dataset2_run2_b.txt") os.remove("./test_in_memory_dataset2_run2_b.txt")
def test_bosps_dataset_fleet2(self):
"""
Testcase for InMemoryDataset from create to run.
"""
with open("test_in_memory_dataset2_run2_a.txt", "w") as f:
data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
f.write(data)
with open("test_in_memory_dataset2_run2_b.txt", "w") as f:
data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
f.write(data)
train_program = fluid.Program()
startup_program = fluid.Program()
scope = fluid.Scope()
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
with fluid.program_guard(train_program, startup_program):
slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
slots_vars = []
for slot in slots:
var = fluid.layers.data(\
name=slot, shape=[1], dtype="float32", lod_level=1)
slots_vars.append(var)
fake_cost = \
fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1])
fake_cost = fluid.layers.mean(fake_cost)
with fluid.scope_guard(scope):
place = fluid.CPUPlace()
exe = fluid.Executor(place)
try:
fleet.init()
except ImportError as e:
print("warning: no mpi4py")
adam = fluid.optimizer.Adam(learning_rate=0.000005)
try:
adam = fleet.distributed_optimizer(
adam,
strategy={
"fs_uri": "fs_uri_xxx",
"fs_user": "fs_user_xxx",
"fs_passwd": "fs_passwd_xxx",
"fs_hadoop_bin": "fs_hadoop_bin_xxx"
})
adam.minimize([fake_cost], [scope])
except AttributeError as e:
print("warning: no mpi")
except ImportError as e:
print("warning: no mpi4py")
exe.run(startup_program)
dataset = paddle.distributed.fleet.BoxPSDataset()
dataset.init(
batch_size=32,
thread_num=3,
pipe_command="cat",
use_var=slots_vars)
dataset.set_filelist([
"test_in_memory_dataset2_run2_a.txt",
"test_in_memory_dataset2_run2_b.txt"
])
dataset.load_into_memory()
try:
dataset.global_shuffle(fleet)
except:
print("warning: catch expected error")
fleet._opt_info = None
fleet._fleet_ptr = None
dataset = paddle.distributed.fleet.BoxPSDataset()
dataset.init(
rank_offset="",
pv_batch_size=1,
fs_name="",
fs_ugi="",
data_feed_type="MultiSlotInMemoryDataFeed",
parse_logkey=True,
merge_by_sid=True,
enable_pv_merge=True)
d = paddle.distributed.fleet.DatasetBase()
try:
dataset._set_feed_type("MultiSlotInMemoryDataFeed")
except:
print("warning: catch expected error")
dataset.thread_num = 0
try:
dataset._prepare_to_run()
except:
print("warning: catch expected error")
dataset._set_parse_logkey(True)
dataset._set_merge_by_sid(True)
dataset._set_enable_pv_merge(True)
try:
dataset.preprocess_instance()
except:
print("warning: catch expected error")
try:
dataset.set_current_phase(1)
except:
print("warning: catch expected error")
try:
dataset.postprocess_instance()
except:
print("warning: catch expected error")
dataset._set_fleet_send_batch_size(1024)
try:
dataset.global_shuffle()
except:
print("warning: catch expected error")
#dataset.get_pv_data_size()
dataset.get_memory_data_size()
dataset.get_shuffle_data_size()
if __name__ == '__main__':
    unittest.main()
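The new `test_bosps_dataset_fleet2` case above is the first coverage of `BoxPSDataset` under the refactored API. Its second `init()` call lists the BoxPS-specific keys; mirrored below as a sketch, where all values are the placeholder values used by the test itself and `paddle.distributed.fleet.BoxPSDataset` is taken as exported by this patch.

```python
# Sketch mirroring the second init() call in the new BoxPSDataset test above;
# the values shown are the test's own placeholders.
dataset = paddle.distributed.fleet.BoxPSDataset()
dataset.init(
    rank_offset="",
    pv_batch_size=1,
    fs_name="",
    fs_ugi="",
    data_feed_type="MultiSlotInMemoryDataFeed",
    parse_logkey=True,
    merge_by_sid=True,
    enable_pv_merge=True)
```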
...@@ -97,9 +97,11 @@ class DatasetLoaderTestBase(unittest.TestCase):
def check_batch_number(self, place, randomize_batch_num=False): def check_batch_number(self, place, randomize_batch_num=False):
main_prog, startup_prog, feeds = self.build_network() main_prog, startup_prog, feeds = self.build_network()
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( if self.dataset_name == "QueueDataset":
self.dataset_name) dataset = paddle.distributed.QueueDataset()
dataset.set_batch_size(BATCH_SIZE) else:
dataset = paddle.distributed.InMemoryDataset()
dataset._set_batch_size(BATCH_SIZE)
if isinstance(place, fluid.CPUPlace): if isinstance(place, fluid.CPUPlace):
file_num = 10 file_num = 10
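Since `DatasetFactory` is removed by this patch, the loader test above now branches on the dataset name itself. For callers that still need name-based creation, a hypothetical helper could replicate the old behaviour; the helper name and error message below are illustrative only and not part of the patch.

```python
# Hypothetical replacement for the removed DatasetFactory().create_dataset():
# maps the old string names onto the new classes used throughout these tests.
def create_dataset_by_name(dataset_name):
    if dataset_name == "QueueDataset":
        return paddle.distributed.QueueDataset()
    elif dataset_name == "InMemoryDataset":
        return paddle.distributed.InMemoryDataset()
    raise ValueError("unknown dataset class: %s" % dataset_name)
```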
...@@ -128,8 +130,8 @@ class DatasetLoaderTestBase(unittest.TestCase):
fake_reader(batch_num=BATCH_NUM + random_delta_batch_size[i])) fake_reader(batch_num=BATCH_NUM + random_delta_batch_size[i]))
dataset.set_filelist(filelist) dataset.set_filelist(filelist)
dataset.set_use_var(feeds) dataset._set_use_var(feeds)
dataset.set_pipe_command("cat") dataset._set_pipe_command("cat")
if self.dataset_name == 'InMemoryDataset': if self.dataset_name == 'InMemoryDataset':
dataset.load_into_memory() dataset.load_into_memory()
......
...@@ -163,10 +163,9 @@ class TestCloudRoleMaker2(unittest.TestCase):
data = "1 1 1 1\n" data = "1 1 1 1\n"
f.write(data) f.write(data)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset")
dataset.set_filelist(["test_fleet_gloo_role_maker_1.txt"]) dataset.set_filelist(["test_fleet_gloo_role_maker_1.txt"])
dataset.set_use_var([show, label]) dataset._set_use_var([show, label])
dataset.load_into_memory() dataset.load_into_memory()
dataset.get_memory_data_size(fleet) dataset.get_memory_data_size(fleet)
dataset.get_shuffle_data_size(fleet) dataset.get_shuffle_data_size(fleet)
......
...@@ -52,18 +52,17 @@ class TestDatasetWithStat(unittest.TestCase):
name=slot, shape=[1], dtype="int64", lod_level=1) name=slot, shape=[1], dtype="int64", lod_level=1)
slots_vars.append(var) slots_vars.append(var)
dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( dataset = paddle.distributed.InMemoryDataset()
"InMemoryDataset") dataset._set_batch_size(32)
dataset.set_batch_size(32) dataset._set_thread(3)
dataset.set_thread(3)
dataset.set_filelist([ dataset.set_filelist([
"test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_a.txt",
"test_in_memory_dataset_run_b.txt" "test_in_memory_dataset_run_b.txt"
]) ])
dataset.set_pipe_command("cat") dataset._set_pipe_command("cat")
dataset.set_use_var(slots_vars) dataset._set_use_var(slots_vars)
dataset.load_into_memory() dataset.load_into_memory()
dataset.set_fea_eval(1, True) dataset._set_fea_eval(1, True)
dataset.slots_shuffle(["slot1"]) dataset.slots_shuffle(["slot1"])
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
......
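The last hunk keeps the slot-shuffle statistics test working by switching to the private setters. A compressed sketch of that order of operations, reusing the `dataset` object from the first sketch and the test's placeholder slot name, is:

```python
# Sketch: feature-evaluation shuffle as in the TestDatasetWithStat hunk above.
# _set_fea_eval(candidate_size, flag) is enabled before slots_shuffle().
dataset.load_into_memory()
dataset._set_fea_eval(1, True)
dataset.slots_shuffle(["slot1"])
```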