From c67c39168282bd3da37b4905b8d1856ca30275f2 Mon Sep 17 00:00:00 2001 From: yaoxuefeng Date: Wed, 16 Sep 2020 13:37:07 +0800 Subject: [PATCH] refine fleet dataset class api (#27133) --- python/paddle/distributed/__init__.py | 8 +- python/paddle/distributed/fleet/__init__.py | 1 - .../distributed/fleet/dataset/dataset.py | 1000 ++++++++++------- python/paddle/fluid/reader.py | 4 +- .../fluid/tests/unittests/dist_fleet_ctr.py | 12 +- .../tests/unittests/dist_fleet_ctr_ps_gpu.py | 10 +- .../tests/unittests/dist_fleet_heter_ctr.py | 10 +- .../fluid/tests/unittests/test_dataset.py | 328 ++++-- .../unittests/test_dataset_dataloader.py | 12 +- .../tests/unittests/test_fleet_rolemaker_2.py | 5 +- .../fluid/tests/unittests/test_monitor.py | 13 +- 11 files changed, 835 insertions(+), 568 deletions(-) diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index b7357eef7ad..27c82227316 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -21,6 +21,7 @@ from .parallel import get_rank from .parallel import get_world_size from paddle.fluid.dygraph.parallel import prepare_context #DEFINE_ALIAS from paddle.fluid.dygraph.parallel import ParallelEnv #DEFINE_ALIAS +from paddle.distributed.fleet.dataset import * from . import collective from .collective import * @@ -30,11 +31,8 @@ __all__ = ["spawn"] # dygraph parallel apis __all__ += [ - "init_parallel_env", - "get_rank", - "get_world_size", - "prepare_context", - "ParallelEnv", + "init_parallel_env", "get_rank", "get_world_size", "prepare_context", + "ParallelEnv", "InMemoryDataset", "QueueDataset" ] # collective apis diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index 5f0cf9f93d6..2539fa57a34 100644 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -23,7 +23,6 @@ from .dataset import * __all__ = [ "DistributedStrategy", "UtilBase", - "DatasetFactory", "UserDefinedRoleMaker", "PaddleCloudRoleMaker", "Fleet", diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index f6504cacd96..5bd971181ed 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -14,54 +14,11 @@ """This is definition of dataset class, which is high performance IO.""" import paddle -import paddle.fluid as fluid from paddle.fluid.proto import data_feed_pb2 from google.protobuf import text_format import paddle.fluid.core as core -class DatasetFactory(object): - """ - DatasetFactory is a factory which create dataset by its name, - you can create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset", - the default is "QueueDataset". - - Example: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - - """ - - def __init__(self): - """ Init. """ - pass - - def create_dataset(self, datafeed_class="QueueDataset"): - """ - Create "QueueDataset" or "InMemoryDataset", or "FileInstantDataset", - the default is "QueueDataset". - - Args: - datafeed_class(str): datafeed class name, QueueDataset or InMemoryDataset. - Default is QueueDataset. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - - """ - try: - dataset = globals()[datafeed_class]() - return dataset - except: - raise ValueError("datafeed class %s does not exist" % - datafeed_class) - - class DatasetBase(object): """ Base dataset class. """ @@ -75,96 +32,67 @@ class DatasetBase(object): self.thread_num = 1 self.filelist = [] - def set_pipe_command(self, pipe_command): + def init(self, + batch_size=1, + thread_num=1, + use_var=[], + pipe_command="cat", + input_type=0, + fs_name="", + fs_ugi="", + download_cmd="cat"): """ - Set pipe command of current dataset - A pipe command is a UNIX pipeline command that can be used only - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_pipe_command("python my_script.py") + should be called only once in user's python scripts to initialize setings of dataset instance. + Normally, it is called by InMemoryDataset or QueueDataset. Args: - pipe_command(str): pipe command + batch_size(int): batch size. It will be effective during training. default is 1. + thread_num(int): thread num, it is the num of readers. default is 1. + use_var(list): list of variables. Variables which you will use. default is []. + pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is "cat" + input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut is 0. + fs_name(str): fs name. default is "". + fs_ugi(str): fs ugi. default is "". + download_cmd(str): customized download command. default is "cat" - """ - self.proto_desc.pipe_command = pipe_command - def set_rank_offset(self, rank_offset): """ - Set rank_offset for merge_pv. It set the message of Pv. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_rank_offset("rank_offset") - - Args: - rank_offset(str): rank_offset's name + self._set_batch_size(batch_size) + self._set_thread(thread_num) + self._set_use_var(use_var) + self._set_pipe_command(pipe_command) + self._set_input_type(input_type) + self._set_hdfs_config(fs_name, fs_ugi) + self._set_download_cmd(download_cmd) + def _set_pipe_command(self, pipe_command): """ - self.proto_desc.rank_offset = rank_offset + Set pipe command of current dataset + A pipe command is a UNIX pipeline command that can be used only - def set_fea_eval(self, record_candidate_size, fea_eval=True): - """ - set fea eval mode for slots shuffle to debug the importance level of - slots(features), fea_eval need to be set True for slots shuffle. - - Args: - record_candidate_size(int): size of instances candidate to shuffle - one slot - fea_eval(bool): whether enable fea eval mode to enable slots shuffle. - default is True. - Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_fea_eval(1000000, True) + import paddle + dataset = paddle.distributed.fleet.dataset.DatasetBase() + dataset._set_pipe_command("python my_script.py") - """ - if fea_eval: - self.dataset.set_fea_eval(fea_eval, record_candidate_size) - self.fea_eval = fea_eval - - def slots_shuffle(self, slots): - """ - Slots Shuffle - Slots Shuffle is a shuffle method in slots level, which is usually used - in sparse feature with large scale of instances. To compare the metric, i.e. - auc while doing slots shuffle on one or several slots with baseline to - evaluate the importance level of slots(features). - Args: - slots(list[string]): the set of slots(string) to do slots shuffle. + pipe_command(str): pipe command - Examples: - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_merge_by_lineid() - #suppose there is a slot 0 - dataset.slots_shuffle(['0']) """ - if self.fea_eval: - slots_set = set(slots) - self.dataset.slots_shuffle(slots_set) + self.proto_desc.pipe_command = pipe_command - def set_batch_size(self, batch_size): + def _set_batch_size(self, batch_size): """ Set batch size. Will be effective during training Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_batch_size(128) + import paddle + dataset = paddle.distributed.fleet.DatasetBase() + dataset._set_batch_size(128) Args: batch_size(int): batch size @@ -172,32 +100,16 @@ class DatasetBase(object): """ self.proto_desc.batch_size = batch_size - def set_pv_batch_size(self, pv_batch_size): - """ - Set pv batch size. It will be effective during enable_pv_merge - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_pv_batch(128) - Args: - pv_batch_size(int): pv batch size - - """ - self.proto_desc.pv_batch_size = pv_batch_size - - def set_thread(self, thread_num): + def _set_thread(self, thread_num): """ Set thread num, it is the num of readers. Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_thread(12) + import paddle + dataset = paddle.distributed.fleet.DatasetBase() + dataset._set_thread(12) Args: thread_num(int): thread num @@ -212,8 +124,8 @@ class DatasetBase(object): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() + import paddle + dataset = paddle.distributed.fleet.DatasetBase() dataset.set_filelist(['a.txt', 'b.txt']) Args: @@ -222,19 +134,19 @@ class DatasetBase(object): self.dataset.set_filelist(filelist) self.filelist = filelist - def set_input_type(self, input_type): + def _set_input_type(self, input_type): self.proto_desc.input_type = input_type - def set_use_var(self, var_list): + def _set_use_var(self, var_list): """ Set Variables which you will use. Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_use_var([data, label]) + import paddle + dataset = paddle.distributed.fleet.DatasetBase() + dataset._set_use_var([data, label]) Args: var_list(list): variable list @@ -253,19 +165,19 @@ class DatasetBase(object): slot_var.type = "uint64" else: raise ValueError( - "Currently, fluid.dataset only supports dtype=float32 and dtype=int64" + "Currently, paddle.distributed.fleet.dataset only supports dtype=float32 and dtype=int64" ) - def set_hdfs_config(self, fs_name, fs_ugi): + def _set_hdfs_config(self, fs_name, fs_ugi): """ Set hdfs config: fs name ad ugi Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_hdfs_config("my_fs_name", "my_fs_ugi") + import paddle + dataset = paddle.distributed.fleet.DatasetBase() + dataset._set_hdfs_config("my_fs_name", "my_fs_ugi") Args: fs_name(str): fs name @@ -273,16 +185,16 @@ class DatasetBase(object): """ self.dataset.set_hdfs_config(fs_name, fs_ugi) - def set_download_cmd(self, download_cmd): + def _set_download_cmd(self, download_cmd): """ Set customized download cmd: download_cmd Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_download_cmd("./read_from_afs") + import paddle + dataset = paddle.distributed.fleet.DatasetBase() + dataset._set_download_cmd("./read_from_afs") Args: download_cmd(str): customized download command @@ -297,22 +209,22 @@ class DatasetBase(object): if self.thread_num > len(self.filelist): self.thread_num = len(self.filelist) self.dataset.set_thread_num(self.thread_num) - self.dataset.set_data_feed_desc(self.desc()) + self.dataset.set_data_feed_desc(self._desc()) self.dataset.create_readers() def _finish_to_run(self): self.dataset.destroy_readers() - def desc(self): + def _desc(self): """ Returns a protobuf message for this DataFeedDesc Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset() - print(dataset.desc()) + import paddle + dataset = paddle.distributed.fleet.DatasetBase() + print(dataset._desc()) Returns: A string message @@ -330,10 +242,10 @@ class InMemoryDataset(DatasetBase): """ InMemoryDataset, it will load data into memory and shuffle data before training. - This class should be created by DatasetFactory Example: - dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.InMemoryDataset() """ def __init__(self): @@ -351,7 +263,229 @@ class InMemoryDataset(DatasetBase): self.merge_by_lineid = False self.fleet_send_sleep_seconds = None - def set_feed_type(self, data_feed_type): + def _init_distributed_settings(self, **kwargs): + """ + should be called only once in user's python scripts to initialize distributed-related setings of dataset instance + Args: + kwargs: Keyword arguments. Currently, we support following keys in **kwargs: + + merge_size(int): ins size to merge, if merge_size > 0, set merge by line id, + instances of same line id will be merged after shuffle, + you should parse line id in data generator. default is -1. + parse_ins_id(bool): Set if Dataset need to parse ins_id. default is False. + parse_content(bool): Set if Dataset need to parse content. default is False. + fleet_send_batch_size(int): Set fleet send batch size in one rpc, default is 1024 + fleet_send_sleep_seconds(int): Set fleet send sleep time, default is 0 + fea_eval(bool): Set if Dataset need to do feature importance evaluation using slots shuffle. + default is False. + candidate_size(int): if fea_eval is set True, set the candidate size used in slots shuffle. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=[]) + dataset._init_distributed_settings( + parse_ins_id=True, + parse_content=True, + fea_eval=True, + candidate_size=10000) + + """ + merge_size = kwargs.get("merge_size", -1) + if merge_size > 0: + self._set_merge_by_lineid(merge_size) + + parse_ins_id = kwargs.get("parse_ins_id", False) + self._set_parse_ins_id(parse_ins_id) + + parse_content = kwargs.get("parse_content", False) + self._set_parse_content(parse_content) + + fleet_send_batch_size = kwargs.get("fleet_send_batch_size", None) + if fleet_send_batch_size: + self._set_fleet_send_batch_size(fleet_send_batch_size) + + fleet_send_sleep_seconds = kwargs.get("fleet_send_sleep_seconds", None) + if fleet_send_sleep_seconds: + self._set_fleet_send_sleep_seconds(fleet_send_sleep_seconds) + + fea_eval = kwargs.get("fea_eval", False) + if fea_eval: + candidate_size = kwargs.get("candidate_size", 10000) + self._set_fea_eval(candidate_size, True) + + def update_settings(self, **kwargs): + """ + should be called in user's python scripts to update setings of dataset instance + Args: + kwargs: Keyword arguments. Currently, we support following keys in **kwargs, + including single node settings and advanced distributed related settings: + + batch_size(int): batch size. It will be effective during training. default is 1. + thread_num(int): thread num, it is the num of readers. default is 1. + use_var(list): list of variables. Variables which you will use. default is []. + input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut is 0. + fs_name(str): fs name. default is "". + fs_ugi(str): fs ugi. default is "". + pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is "cat" + download_cmd(str): customized download command. default is "cat" + data_feed_type(str): data feed type used in c++ code. default is "MultiSlotInMemoryDataFeed". + queue_num(int): Dataset output queue num, training threads get data from queues. default is-1, which is set same as thread number in c++. + + merge_size(int): ins size to merge, if merge_size > 0, set merge by line id, + instances of same line id will be merged after shuffle, + you should parse line id in data generator. default is -1. + parse_ins_id(bool): Set if Dataset need to parse ins_id. default is False. + parse_content(bool): Set if Dataset need to parse content. default is False. + fleet_send_batch_size(int): Set fleet send batch size in one rpc, default is 1024 + fleet_send_sleep_seconds(int): Set fleet send sleep time, default is 0 + fea_eval(bool): Set if Dataset need to do feature importance evaluation using slots shuffle. + default is False. + candidate_size(int): if fea_eval is set True, set the candidate size used in slots shuffle. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=[]) + dataset._init_distributed_settings( + parse_ins_id=True, + parse_content=True, + fea_eval=True, + candidate_size=10000) + dataset.update_settings(batch_size=2) + + """ + for key in kwargs: + if key == "pipe_command": + self._set_pipe_command(kwargs[key]) + elif key == "batch_size": + self._set_batch_size(kwargs[key]) + elif key == "thread_num": + self._set_thread(kwargs[key]) + elif key == "use_var": + self._set_use_var(kwargs[key]) + elif key == "input_type": + self._set_input_type(kwargs[key]) + elif key == "fs_name" and "fs_ugi" in kwargs: + self._set_hdfs_config(kwargs[key], kwargs["fs_ugi"]) + elif key == "download_cmd": + self._set_download_cmd(kwargs[key]) + elif key == "merge_size" and kwargs.get("merge_size", -1) > 0: + self._set_merge_by_lineid(kwargs[key]) + elif key == "parse_ins_id": + self._set_parse_ins_id(kwargs[key]) + elif key == "parse_content": + self._set_parse_content(kwargs[key]) + elif key == "fleet_send_batch_size": + self._set_fleet_send_batch_size(kwargs[key]) + elif key == "fleet_send_sleep_seconds": + self._set_fleet_send_sleep_seconds(kwargs[key]) + elif key == "fea_eval" and kwargs[key] == True: + candidate_size = kwargs.get("candidate_size", 10000) + self._set_fea_eval(candidate_size, True) + + def init(self, **kwargs): + """ + should be called only once in user's python scripts to initialize setings of dataset instance + Args: + kwargs: Keyword arguments. Currently, we support following keys in **kwargs: + + batch_size(int): batch size. It will be effective during training. default is 1. + thread_num(int): thread num, it is the num of readers. default is 1. + use_var(list): list of variables. Variables which you will use. default is []. + input_type(int): the input type of generated input. 0 is for one sample, 1 is for one batch. defalut is 0. + fs_name(str): fs name. default is "". + fs_ugi(str): fs ugi. default is "". + pipe_command(str): pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is "cat" + download_cmd(str): customized download command. default is "cat" + data_feed_type(str): data feed type used in c++ code. default is "MultiSlotInMemoryDataFeed". + queue_num(int): Dataset output queue num, training threads get data from queues. default is -1, which is set same as thread number in c++. + + Examples: + .. code-block:: python + + import paddle + with open("test_queue_dataset_run_a.txt", "w") as f: + data = "2 1 2 2 5 4 2 2 7 2 1 3\n" + data += "2 6 2 2 1 4 2 2 4 2 2 3\n" + data += "2 5 2 2 9 9 2 2 7 2 1 3\n" + data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + f.write(data) + with open("test_queue_dataset_run_b.txt", "w") as f: + data = "2 1 2 2 5 4 2 2 7 2 1 3\n" + data += "2 6 2 2 1 4 2 2 4 2 2 3\n" + data += "2 5 2 2 9 9 2 2 7 2 1 3\n" + data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + f.write(data) + + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = fluid.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + dataset.set_filelist( + ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) + dataset.load_into_memory() + + exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( + ) else fluid.CUDAPlace(0)) + exe.run(fluid.default_startup_program()) + exe.train_from_dataset(fluid.default_main_program(), + dataset) + os.remove("./test_queue_dataset_run_a.txt") + os.remove("./test_queue_dataset_run_b.txt") + """ + batch_size = kwargs.get("batch_size", 1) + thread_num = kwargs.get("thread_num", 1) + use_var = kwargs.get("use_var", []) + input_type = kwargs.get("input_type", 0) + fs_name = kwargs.get("fs_name", "") + fs_ugi = kwargs.get("fs_ugi", "") + pipe_command = kwargs.get("pipe_command", "cat") + download_cmd = kwargs.get("download_cmd", "cat") + + super(InMemoryDataset, self).init( + batch_size=batch_size, + thread_num=thread_num, + use_var=use_var, + pipe_command=pipe_command, + input_type=input_type, + fs_name=fs_name, + fs_ugi=fs_ugi, + download_cmd=download_cmd) + + data_feed_type = kwargs.get("data_feed_type", + "MultiSlotInMemoryDataFeed") + self._set_feed_type(data_feed_type) + + if kwargs.get("queue_num", -1) > 0: + queue_num = kwargs.get("queue_num", -1) + self._set_queue_num(queue_num) + + def _set_feed_type(self, data_feed_type): """ Set data_feed_desc """ @@ -373,7 +507,7 @@ class InMemoryDataset(DatasetBase): self.dataset.set_parse_logkey(self.parse_logkey) self.dataset.set_merge_by_sid(self.merge_by_sid) self.dataset.set_enable_pv_merge(self.enable_pv_merge) - self.dataset.set_data_feed_desc(self.desc()) + self.dataset.set_data_feed_desc(self._desc()) self.dataset.create_channel() self.dataset.create_readers() @@ -387,7 +521,7 @@ class InMemoryDataset(DatasetBase): self.dataset.dynamic_adjust_channel_num(self.thread_num, False) self.dataset.dynamic_adjust_readers_num(self.thread_num) - def set_queue_num(self, queue_num): + def _set_queue_num(self, queue_num): """ Set Dataset output queue num, training threads get data from queues @@ -397,17 +531,17 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_queue_num(12) + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset._set_queue_num(12) """ self.is_user_set_queue_num = True self.queue_num = queue_num - def set_parse_ins_id(self, parse_ins_id): + def _set_parse_ins_id(self, parse_ins_id): """ - Set id Dataset need to parse insid + Set if Dataset need to parse insid Args: parse_ins_id(bool): if parse ins_id or not @@ -415,14 +549,14 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_parse_ins_id(True) + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset._set_parse_ins_id(True) """ self.parse_ins_id = parse_ins_id - def set_parse_content(self, parse_content): + def _set_parse_content(self, parse_content): """ Set if Dataset need to parse content @@ -432,120 +566,14 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_parse_content(True) + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset._set_parse_content(True) """ self.parse_content = parse_content - def set_parse_logkey(self, parse_logkey): - """ - Set if Dataset need to parse logkey - - Args: - parse_content(bool): if parse logkey or not - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_parse_logkey(True) - - """ - self.parse_logkey = parse_logkey - - def set_merge_by_sid(self, merge_by_sid): - """ - Set if Dataset need to merge sid. If not, one ins means one Pv. - - Args: - merge_by_sid(bool): if merge sid or not - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_merge_by_sid(True) - - """ - self.merge_by_sid = merge_by_sid - - def set_enable_pv_merge(self, enable_pv_merge): - """ - Set if Dataset need to merge pv. - - Args: - enable_pv_merge(bool): if enable_pv_merge or not - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_enable_pv_merge(True) - - """ - self.enable_pv_merge = enable_pv_merge - - def preprocess_instance(self): - """ - Merge pv instance and convey it from input_channel to input_pv_channel. - It will be effective when enable_pv_merge_ is True. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.preprocess_instance() - - """ - self.dataset.preprocess_instance() - - def set_current_phase(self, current_phase): - """ - Set current phase in train. It is useful for untest. - current_phase : 1 for join, 0 for update. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.set_current_phase(1) - - """ - self.dataset.set_current_phase(current_phase) - - def postprocess_instance(self): - """ - Divide pv instance and convey it to input_channel. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.preprocess_instance() - exe.train_from_dataset(dataset) - dataset.postprocess_instance() - - """ - self.dataset.postprocess_instance() - - def set_fleet_send_batch_size(self, fleet_send_batch_size=1024): + def _set_fleet_send_batch_size(self, fleet_send_batch_size=1024): """ Set fleet send batch size, default is 1024 @@ -555,14 +583,14 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_fleet_send_batch_size(800) + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset._set_fleet_send_batch_size(800) """ self.fleet_send_batch_size = fleet_send_batch_size - def set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0): + def _set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0): """ Set fleet send sleep time, default is 0 @@ -572,14 +600,14 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_fleet_send_sleep_seconds(2) + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset._set_fleet_send_sleep_seconds(2) """ self.fleet_send_sleep_seconds = fleet_send_sleep_seconds - def set_merge_by_lineid(self, merge_size=2): + def _set_merge_by_lineid(self, merge_size=2): """ Set merge by line id, instances of same line id will be merged after shuffle, you should parse line id in data generator. @@ -590,22 +618,22 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - dataset.set_merge_by_lineid() + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset._set_merge_by_lineid() """ self.dataset.set_merge_by_lineid(merge_size) self.merge_by_lineid = True self.parse_ins_id = True - def set_generate_unique_feasigns(self, generate_uni_feasigns, shard_num): + def _set_generate_unique_feasigns(self, generate_uni_feasigns, shard_num): self.dataset.set_generate_unique_feasigns(generate_uni_feasigns) self.gen_uni_feasigns = generate_uni_feasigns self.local_shard_num = shard_num - def generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num, - consume_thread_num, shard_num): + def _generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num, + consume_thread_num, shard_num): self.dataset.generate_local_tables_unlock( table_id, fea_dim, read_thread_num, consume_thread_num, shard_num) @@ -616,8 +644,8 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -635,8 +663,8 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -656,8 +684,8 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -673,8 +701,8 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -692,9 +720,9 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid + import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -736,9 +764,9 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid + import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -751,30 +779,6 @@ class InMemoryDataset(DatasetBase): """ self.dataset.release_memory() - def get_pv_data_size(self): - """ - Get memory data size of Pv, user can call this function to know the pv num - of ins in all workers after load into memory. - - Note: - This function may cause bad performance, because it has barrier - - Returns: - The size of memory pv data. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - print dataset.get_pv_data_size() - - """ - return self.dataset.get_pv_data_size() - def get_memory_data_size(self, fleet=None): """ Get memory data size, user can call this function to know the num @@ -792,9 +796,9 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid + import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -829,9 +833,9 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid + import paddle from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -849,6 +853,51 @@ class InMemoryDataset(DatasetBase): return global_data_size[0] return local_data_size[0] + def _set_fea_eval(self, record_candidate_size, fea_eval=True): + """ + set fea eval mode for slots shuffle to debug the importance level of + slots(features), fea_eval need to be set True for slots shuffle. + + Args: + record_candidate_size(int): size of instances candidate to shuffle + one slot + fea_eval(bool): whether enable fea eval mode to enable slots shuffle. + default is True. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset._set_fea_eval(1000000, True) + + """ + if fea_eval: + self.dataset.set_fea_eval(fea_eval, record_candidate_size) + self.fea_eval = fea_eval + + def slots_shuffle(self, slots): + """ + Slots Shuffle + Slots Shuffle is a shuffle method in slots level, which is usually used + in sparse feature with large scale of instances. To compare the metric, i.e. + auc while doing slots shuffle on one or several slots with baseline to + evaluate the importance level of slots(features). + + Args: + slots(list[string]): the set of slots(string) to do slots shuffle. + + Examples: + import paddle + dataset = paddle.distributed.InMemoryDataset() + dataset.set_merge_by_lineid() + #suppose there is a slot 0 + dataset.slots_shuffle(['0']) + """ + if self.fea_eval: + slots_set = set(slots) + self.dataset.slots_shuffle(slots_set) + class QueueDataset(DatasetBase): """ @@ -857,19 +906,24 @@ class QueueDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("QueueDataset") + import paddle + dataset = paddle.distributed.QueueDataset() """ def __init__(self): """ Initialize QueueDataset - This class should be created by DatasetFactory """ super(QueueDataset, self).__init__() self.proto_desc.name = "MultiSlotDataFeed" + def init(self, **kwargs): + """ + should be called only once in user's python scripts to initialize setings of dataset instance + """ + super(QueueDataset, self).init(**kwargs) + def _prepare_to_run(self): """ Set data_feed_desc/thread num/filelist before run, @@ -881,57 +935,9 @@ class QueueDataset(DatasetBase): self.thread_num = 1 self.dataset.set_thread_num(self.thread_num) self.dataset.set_filelist(self.filelist) - self.dataset.set_data_feed_desc(self.desc()) + self.dataset.set_data_feed_desc(self._desc()) self.dataset.create_readers() - def local_shuffle(self): - """ - Local shuffle data. - - Local shuffle is not supported in QueueDataset - NotImplementedError will be raised - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("QueueDataset") - dataset.local_shuffle() - - Raises: - NotImplementedError: QueueDataset does not support local shuffle - - """ - raise NotImplementedError( - "QueueDataset does not support local shuffle, " - "please use InMemoryDataset for local_shuffle") - - def global_shuffle(self, fleet=None): - """ - Global shuffle data. - - Global shuffle is not supported in QueueDataset - NotImplementedError will be raised - - Args: - fleet(Fleet): fleet singleton. Default None. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = fluid.DatasetFactory().create_dataset("QueueDataset") - dataset.global_shuffle(fleet) - - Raises: - NotImplementedError: QueueDataset does not support global shuffle - - """ - raise NotImplementedError( - "QueueDataset does not support global shuffle, " - "please use InMemoryDataset for global_shuffle") - class FileInstantDataset(DatasetBase): """ @@ -940,35 +946,22 @@ class FileInstantDataset(DatasetBase): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory.create_dataset("FileInstantDataset") + import paddle + dataset = paddle.distributed.fleet.FileInstantDataset() """ def __init__(self): """ Initialize FileInstantDataset - This class should be created by DatasetFactory """ super(FileInstantDataset, self).__init__() self.proto_desc.name = "MultiSlotFileInstantDataFeed" - def local_shuffle(self): + def init(self, **kwargs): """ - Local shuffle - FileInstantDataset does not support local shuffle + should be called only once in user's python scripts to initialize setings of dataset instance """ - raise NotImplementedError( - "FileInstantDataset does not support local shuffle, " - "please use InMemoryDataset for local_shuffle") - - def global_shuffle(self, fleet=None): - """ - Global shuffle - FileInstantDataset does not support global shuffle - """ - raise NotImplementedError( - "FileInstantDataset does not support global shuffle, " - "please use InMemoryDataset for global_shuffle") + super(FileInstantDataset, self).init(**kwargs) class BoxPSDataset(InMemoryDataset): @@ -978,19 +971,119 @@ class BoxPSDataset(InMemoryDataset): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() """ def __init__(self): """ Initialize BoxPSDataset - This class should be created by DatasetFactory """ super(BoxPSDataset, self).__init__() self.boxps = core.BoxPS(self.dataset) self.proto_desc.name = "PaddleBoxDataFeed" + def init(self, **kwargs): + """ + should be called only once in user's python scripts to initialize setings of dataset instance + """ + super(BoxPSDataset, self).init(**kwargs) + + rank_offset = kwargs.get("rank_offset", "") + self._set_rank_offset(rank_offset) + pv_batch_size = kwargs.get("pv_batch_size", 1) + self._set_pv_batch_size(pv_batch_size) + parse_logkey = kwargs.get("parse_logkey", False) + self._set_parse_logkey(parse_logkey) + merge_by_sid = kwargs.get("merge_by_sid", False) + self._set_merge_by_sid(merge_by_sid) + enable_pv_merge = kwargs.get("enable_pv_merge", False) + self._set_enable_pv_merge(enable_pv_merge) + + def _set_rank_offset(self, rank_offset): + """ + Set rank_offset for merge_pv. It set the message of Pv. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() + dataset._set_rank_offset("rank_offset") + + Args: + rank_offset(str): rank_offset's name + + """ + self.proto_desc.rank_offset = rank_offset + + def _set_pv_batch_size(self, pv_batch_size): + """ + Set pv batch size. It will be effective during enable_pv_merge + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() + dataset._set_pv_batch_size(128) + Args: + pv_batch_size(int): pv batch size + + """ + self.proto_desc.pv_batch_size = pv_batch_size + + def _set_parse_logkey(self, parse_logkey): + """ + Set if Dataset need to parse logkey + + Args: + parse_content(bool): if parse logkey or not + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() + dataset._set_parse_logkey(True) + + """ + self.parse_logkey = parse_logkey + + def _set_merge_by_sid(self, merge_by_sid): + """ + Set if Dataset need to merge sid. If not, one ins means one Pv. + + Args: + merge_by_sid(bool): if merge sid or not + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() + dataset._set_merge_by_sid(True) + + """ + self.merge_by_sid = merge_by_sid + + def _set_enable_pv_merge(self, enable_pv_merge): + """ + Set if Dataset need to merge pv. + + Args: + enable_pv_merge(bool): if enable_pv_merge or not + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() + dataset._set_enable_pv_merge(True) + + """ + self.enable_pv_merge = enable_pv_merge + def set_date(self, date): """ Workaround for date @@ -1008,8 +1101,8 @@ class BoxPSDataset(InMemoryDataset): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() dataset.begin_pass() """ self.boxps.begin_pass() @@ -1021,8 +1114,8 @@ class BoxPSDataset(InMemoryDataset): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() dataset.end_pass(True) """ self.boxps.end_pass(need_save_delta) @@ -1034,8 +1127,8 @@ class BoxPSDataset(InMemoryDataset): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -1049,8 +1142,8 @@ class BoxPSDataset(InMemoryDataset): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -1064,8 +1157,8 @@ class BoxPSDataset(InMemoryDataset): Examples: .. code-block:: python - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -1093,11 +1186,90 @@ class BoxPSDataset(InMemoryDataset): slots(list[string]): the set of slots(string) to do slots shuffle. Examples: - import paddle.fluid as fluid - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() dataset.set_merge_by_lineid() #suppose there is a slot 0 dataset.slots_shuffle(['0']) """ slots_set = set(slots) self.boxps.slots_shuffle(slots_set) + + def set_current_phase(self, current_phase): + """ + Set current phase in train. It is useful for untest. + current_phase : 1 for join, 0 for update. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.set_current_phase(1) + + """ + self.dataset.set_current_phase(current_phase) + + def get_pv_data_size(self): + """ + Get memory data size of Pv, user can call this function to know the pv num + of ins in all workers after load into memory. + + Note: + This function may cause bad performance, because it has barrier + + Returns: + The size of memory pv data. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + print dataset.get_pv_data_size() + + """ + return self.dataset.get_pv_data_size() + + def preprocess_instance(self): + """ + Merge pv instance and convey it from input_channel to input_pv_channel. + It will be effective when enable_pv_merge_ is True. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.preprocess_instance() + + """ + self.dataset.preprocess_instance() + + def postprocess_instance(self): + """ + Divide pv instance and convey it to input_channel. + + Examples: + .. code-block:: python + + import paddle + dataset = paddle.distributed.fleet.BoxPSDataset() + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.preprocess_instance() + exe.train_from_dataset(dataset) + dataset.postprocess_instance() + + """ + self.dataset.postprocess_instance() diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index f2bb567b95b..533222531f9 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -1726,13 +1726,13 @@ class DatasetLoader(DataLoaderBase): logging.warn('thread_num {} which is set in Dataset is ignored'. format(dataset.thread_num)) - dataset.set_thread(thread_num) + dataset._set_thread(thread_num) if isinstance(dataset, paddle.distributed.fleet.dataset. InMemoryDataset) and dataset.queue_num > thread_num: logging.warn("queue_num {} which is set in Dataset is ignored". format(dataset.queue_num)) - dataset.set_queue_num(thread_num) + dataset._set_queue_num(thread_num) self._dataset = dataset use_slots = [ diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index dc39472d7ae..1b0ce0c03e7 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -208,14 +208,16 @@ class TestDistCTR2x2(FleetDistRunnerBase): filelist = train_file_list # config dataset - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() - dataset.set_batch_size(batch_size) - dataset.set_use_var(self.feeds) + dataset = paddle.distributed.QueueDataset() pipe_command = 'python ctr_dataset_reader.py' - dataset.set_pipe_command(pipe_command) + + dataset.init( + batch_size=batch_size, + use_var=self.feeds, + pipe_command=pipe_command, + thread_num=thread_num) dataset.set_filelist(filelist) - dataset.set_thread(thread_num) for epoch_id in range(1): pass_start = time.time() diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py index 03d0fa447da..0e3c8099277 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr_ps_gpu.py @@ -114,14 +114,14 @@ class TestDistGpuPsCTR2x2(TestDistCTR2x2): filelist.append(train_file_path) # config dataset - dataset = paddle.fleet.DatasetFactory().create_dataset() - dataset.set_batch_size(batch_size) - dataset.set_use_var(self.feeds) + dataset = paddle.distributed.QueueDataset() + dataset._set_batch_size(batch_size) + dataset._set_use_var(self.feeds) pipe_command = 'python ctr_dataset_reader.py' - dataset.set_pipe_command(pipe_command) + dataset._set_pipe_command(pipe_command) dataset.set_filelist(filelist) - dataset.set_thread(thread_num) + dataset._set_thread(thread_num) for epoch_id in range(1): pass_start = time.time() diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py index 7a4e7534f07..a5633bb0450 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_heter_ctr.py @@ -183,14 +183,14 @@ class TestHeterPsCTR2x2(FleetDistHeterRunnerBase): print("filelist: {}".format(filelist)) # config dataset - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset() - dataset.set_batch_size(batch_size) - dataset.set_use_var(self.feeds) + dataset = paddle.distributed.QueueDataset() + dataset._set_batch_size(batch_size) + dataset._set_use_var(self.feeds) pipe_command = 'python ctr_dataset_reader.py' - dataset.set_pipe_command(pipe_command) + dataset._set_pipe_command(pipe_command) dataset.set_filelist(filelist) - dataset.set_thread(thread_num) + dataset._set_thread(thread_num) for epoch_id in range(1): pass_start = time.time() diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py index 582bb3dcc68..208956b825e 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset.py +++ b/python/paddle/fluid/tests/unittests/test_dataset.py @@ -38,26 +38,22 @@ class TestDataset(unittest.TestCase): def test_dataset_create(self): """ Testcase for dataset create. """ try: - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() except: self.assertTrue(False) try: - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") + dataset = paddle.distributed.QueueDataset() except: self.assertTrue(False) try: - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "FileInstantDataset") + dataset = paddle.distributed.fleet.dataset.FileInstantDataset() except: self.assertTrue(False) try: - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "MyOwnDataset") + dataset = paddle.distributed.fleet.dataset.MyOwnDataset() self.assertTrue(False) except: self.assertTrue(True) @@ -95,18 +91,18 @@ class TestDataset(unittest.TestCase): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) + dataset.update_settings(pipe_command="cat1") + dataset._init_distributed_settings( + parse_ins_id=True, + parse_content=True, + fea_eval=True, + candidate_size=10000) dataset.set_filelist( ["test_run_with_dump_a.txt", "test_run_with_dump_b.txt"]) - dataset.set_parse_ins_id(True) - dataset.set_parse_content(True) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) dataset.load_into_memory() - dataset.set_fea_eval(10000, True) dataset.local_shuffle() exe = fluid.Executor(fluid.CPUPlace()) @@ -176,14 +172,14 @@ class TestDataset(unittest.TestCase): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=32, + thread_num=3, + pipe_command="cat", + download_cmd="cat", + use_var=slots_vars) dataset.set_filelist([filename1, filename2]) - dataset.set_pipe_command("cat") - dataset.set_download_cmd("cat") - dataset.set_use_var(slots_vars) dataset.load_into_memory() exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) @@ -228,22 +224,19 @@ class TestDataset(unittest.TestCase): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) + dataset._init_distributed_settings(fea_eval=True, candidate_size=1) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) dataset.load_into_memory() - dataset.set_fea_eval(1, True) dataset.slots_shuffle(["slot1"]) dataset.local_shuffle() - dataset.set_generate_unique_feasigns(True, 15) - dataset.generate_local_tables_unlock(0, 11, 1, 25, 15) + dataset._set_generate_unique_feasigns(True, 15) + dataset._generate_local_tables_unlock(0, 11, 1, 25, 15) exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) if self.use_data_loader: @@ -300,17 +293,14 @@ class TestDataset(unittest.TestCase): name=slot, shape=[1], dtype="float32", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(1) - dataset.set_parse_ins_id(True) + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) + dataset._init_distributed_settings(parse_ins_id=True) dataset.set_filelist([ "test_in_memory_dataset_masterpatch_a.txt", "test_in_memory_dataset_masterpatch_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) dataset.load_into_memory() dataset.local_shuffle() @@ -325,7 +315,8 @@ class TestDataset(unittest.TestCase): except Exception as e: self.assertTrue(False) - dataset.set_merge_by_lineid(2) + #dataset._set_merge_by_lineid(2) + dataset.update_settings(merge_size=2) dataset.dataset.merge_by_lineid() os.remove("./test_in_memory_dataset_masterpatch_a.txt") @@ -367,17 +358,14 @@ class TestDataset(unittest.TestCase): name="slot4", shape=[1], dtype="float32", lod_level=0) slots_vars = [var1, var2, var3, var4] - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(1) - dataset.set_parse_ins_id(True) + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars) + dataset._init_distributed_settings(parse_ins_id=True) dataset.set_filelist([ "test_in_memory_dataset_masterpatch1_a.txt", "test_in_memory_dataset_masterpatch1_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) dataset.load_into_memory() dataset.local_shuffle() @@ -392,7 +380,7 @@ class TestDataset(unittest.TestCase): except Exception as e: self.assertTrue(False) - dataset.set_merge_by_lineid(2) + dataset._set_merge_by_lineid(2) dataset.dataset.merge_by_lineid() os.remove("./test_in_memory_dataset_masterpatch1_a.txt") @@ -423,16 +411,13 @@ class TestDataset(unittest.TestCase): name=slot, shape=[1], dtype="float32", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) dataset.load_into_memory() dataset.local_shuffle() @@ -473,9 +458,9 @@ class TestDataset(unittest.TestCase): except Exception as e: self.assertTrue(False) - dataset.set_merge_by_lineid(2) - dataset.set_parse_ins_id(False) - dataset.set_fleet_send_sleep_seconds(2) + dataset._set_merge_by_lineid(2) + dataset._set_parse_ins_id(False) + dataset._set_fleet_send_sleep_seconds(2) dataset.preload_into_memory() dataset.wait_preload_done() dataset.release_memory() @@ -483,10 +468,25 @@ class TestDataset(unittest.TestCase): dataset.wait_preload_done() dataset.dataset.merge_by_lineid() dataset.release_memory() - dataset.set_merge_by_lineid(30) - dataset.set_parse_ins_id(False) + dataset._set_merge_by_lineid(30) + dataset._set_parse_ins_id(False) dataset.load_into_memory() dataset.dataset.merge_by_lineid() + dataset.update_settings( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=[], + fs_name="", + fs_ugi="", + download_cmd="cat", + merge_size=-1, + parse_ins_id=False, + parse_content=False, + fleet_send_batch_size=2, + fleet_send_sleep_seconds=2, + fea_eval=True) fleet_ptr = fluid.core.Fleet() fleet_ptr.set_client2client_config(1, 1, 1) fleet_ptr.get_cache_threshold(0) @@ -517,14 +517,11 @@ class TestDataset(unittest.TestCase): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.QueueDataset() + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) @@ -543,12 +540,9 @@ class TestDataset(unittest.TestCase): except Exception as e: self.assertTrue(False) - dataset2 = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") - dataset2.set_use_var(slots_vars) - dataset2.set_batch_size(32) - dataset2.set_thread(3) - dataset2.set_pipe_command("cat") + dataset2 = paddle.distributed.QueueDataset() + dataset2.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist([]) try: exe.train_from_dataset(fluid.default_main_program(), dataset2) @@ -585,14 +579,11 @@ class TestDataset(unittest.TestCase): name=slot, shape=[1], dtype="float32", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.QueueDataset() + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( ) else fluid.CUDAPlace(0)) @@ -641,15 +632,15 @@ class TestDataset(unittest.TestCase): name=slot, shape=[None, 1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_input_type(1) - dataset.set_batch_size(1) - dataset.set_thread(2) + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) dataset.load_into_memory() exe = fluid.Executor(fluid.CPUPlace() if not core.is_compiled_with_cuda( @@ -721,13 +712,10 @@ class TestDatasetWithFetchHandler(unittest.TestCase): inputs(list): inputs of get_dataset files(list): files of get_dataset """ - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.QueueDataset() + dataset.init( + batch_size=32, thread_num=3, pipe_command="cat", use_var=inputs) dataset.set_filelist(files) - dataset.set_pipe_command("cat") - dataset.set_use_var(inputs) return dataset def setUp(self): @@ -879,16 +867,17 @@ class TestDataset2(unittest.TestCase): except ImportError as e: print("warning: no mpi4py") exe.run(startup_program) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.InMemoryDataset() + + dataset.init( + batch_size=32, + thread_num=3, + pipe_command="cat", + use_var=slots_vars) dataset.set_filelist([ "test_in_memory_dataset2_run_a.txt", "test_in_memory_dataset2_run_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) dataset.load_into_memory() fleet._opt_info = None fleet._fleet_ptr = None @@ -949,16 +938,16 @@ class TestDataset2(unittest.TestCase): except ImportError as e: print("warning: no mpi4py") exe.run(startup_program) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.InMemoryDataset() + dataset.init( + batch_size=32, + thread_num=3, + pipe_command="cat", + use_var=slots_vars) dataset.set_filelist([ "test_in_memory_dataset2_run2_a.txt", "test_in_memory_dataset2_run2_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) dataset.load_into_memory() try: dataset.global_shuffle(fleet) @@ -966,14 +955,11 @@ class TestDataset2(unittest.TestCase): print("warning: catch expected error") fleet._opt_info = None fleet._fleet_ptr = None - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_rank_offset("") - dataset.set_pv_batch_size(1) - dataset.set_hdfs_config("", "") + dataset = paddle.distributed.InMemoryDataset() + dataset.init(fs_name="", fs_ugi="") d = paddle.distributed.fleet.DatasetBase() try: - dataset.set_feed_type("MultiSlotInMemoryDataFeed") + dataset._set_feed_type("MultiSlotInMemoryDataFeed") except: print("warning: catch expected error") dataset.thread_num = 0 @@ -981,9 +967,6 @@ class TestDataset2(unittest.TestCase): dataset._prepare_to_run() except: print("warning: catch expected error") - dataset.set_parse_logkey(True) - dataset.set_merge_by_sid(True) - dataset.set_enable_pv_merge(True) try: dataset.preprocess_instance() except: @@ -996,16 +979,15 @@ class TestDataset2(unittest.TestCase): dataset.postprocess_instance() except: print("warning: catch expected error") - dataset.set_fleet_send_batch_size(1024) + dataset._set_fleet_send_batch_size(1024) try: dataset.global_shuffle() except: print("warning: catch expected error") - dataset.get_pv_data_size() + #dataset.get_pv_data_size() dataset.get_memory_data_size() dataset.get_shuffle_data_size() - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "QueueDataset") + dataset = paddle.distributed.QueueDataset() try: dataset.local_shuffle() except: @@ -1027,6 +1009,120 @@ class TestDataset2(unittest.TestCase): os.remove("./test_in_memory_dataset2_run2_a.txt") os.remove("./test_in_memory_dataset2_run2_b.txt") + def test_bosps_dataset_fleet2(self): + """ + Testcase for InMemoryDataset from create to run. + """ + with open("test_in_memory_dataset2_run2_a.txt", "w") as f: + data = "1 1 2 3 3 4 5 5 5 5 1 1\n" + data += "1 2 2 3 4 4 6 6 6 6 1 2\n" + data += "1 3 2 3 5 4 7 7 7 7 1 3\n" + f.write(data) + with open("test_in_memory_dataset2_run2_b.txt", "w") as f: + data = "1 4 2 3 3 4 5 5 5 5 1 4\n" + data += "1 5 2 3 4 4 6 6 6 6 1 5\n" + data += "1 6 2 3 5 4 7 7 7 7 1 6\n" + data += "1 7 2 3 6 4 8 8 8 8 1 7\n" + f.write(data) + + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + with fluid.program_guard(train_program, startup_program): + slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"] + slots_vars = [] + for slot in slots: + var = fluid.layers.data(\ + name=slot, shape=[1], dtype="float32", lod_level=1) + slots_vars.append(var) + fake_cost = \ + fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1]) + fake_cost = fluid.layers.mean(fake_cost) + with fluid.scope_guard(scope): + place = fluid.CPUPlace() + exe = fluid.Executor(place) + try: + fleet.init() + except ImportError as e: + print("warning: no mpi4py") + adam = fluid.optimizer.Adam(learning_rate=0.000005) + try: + adam = fleet.distributed_optimizer( + adam, + strategy={ + "fs_uri": "fs_uri_xxx", + "fs_user": "fs_user_xxx", + "fs_passwd": "fs_passwd_xxx", + "fs_hadoop_bin": "fs_hadoop_bin_xxx" + }) + adam.minimize([fake_cost], [scope]) + except AttributeError as e: + print("warning: no mpi") + except ImportError as e: + print("warning: no mpi4py") + exe.run(startup_program) + dataset = paddle.distributed.fleet.BoxPSDataset() + dataset.init( + batch_size=32, + thread_num=3, + pipe_command="cat", + use_var=slots_vars) + dataset.set_filelist([ + "test_in_memory_dataset2_run2_a.txt", + "test_in_memory_dataset2_run2_b.txt" + ]) + dataset.load_into_memory() + try: + dataset.global_shuffle(fleet) + except: + print("warning: catch expected error") + fleet._opt_info = None + fleet._fleet_ptr = None + dataset = paddle.distributed.fleet.BoxPSDataset() + dataset.init( + rank_offset="", + pv_batch_size=1, + fs_name="", + fs_ugi="", + data_feed_type="MultiSlotInMemoryDataFeed", + parse_logkey=True, + merge_by_sid=True, + enable_pv_merge=True) + d = paddle.distributed.fleet.DatasetBase() + try: + dataset._set_feed_type("MultiSlotInMemoryDataFeed") + except: + print("warning: catch expected error") + dataset.thread_num = 0 + try: + dataset._prepare_to_run() + except: + print("warning: catch expected error") + dataset._set_parse_logkey(True) + dataset._set_merge_by_sid(True) + dataset._set_enable_pv_merge(True) + try: + dataset.preprocess_instance() + except: + print("warning: catch expected error") + try: + dataset.set_current_phase(1) + except: + print("warning: catch expected error") + try: + dataset.postprocess_instance() + except: + print("warning: catch expected error") + dataset._set_fleet_send_batch_size(1024) + try: + dataset.global_shuffle() + except: + print("warning: catch expected error") + #dataset.get_pv_data_size() + dataset.get_memory_data_size() + dataset.get_shuffle_data_size() + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py b/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py index c13c33f209f..9195ac277b9 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py +++ b/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py @@ -97,9 +97,11 @@ class DatasetLoaderTestBase(unittest.TestCase): def check_batch_number(self, place, randomize_batch_num=False): main_prog, startup_prog, feeds = self.build_network() - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - self.dataset_name) - dataset.set_batch_size(BATCH_SIZE) + if self.dataset_name == "QueueDataset": + dataset = paddle.distributed.QueueDataset() + else: + dataset = paddle.distributed.InMemoryDataset() + dataset._set_batch_size(BATCH_SIZE) if isinstance(place, fluid.CPUPlace): file_num = 10 @@ -128,8 +130,8 @@ class DatasetLoaderTestBase(unittest.TestCase): fake_reader(batch_num=BATCH_NUM + random_delta_batch_size[i])) dataset.set_filelist(filelist) - dataset.set_use_var(feeds) - dataset.set_pipe_command("cat") + dataset._set_use_var(feeds) + dataset._set_pipe_command("cat") if self.dataset_name == 'InMemoryDataset': dataset.load_into_memory() diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py index eb5d9eb6660..a831f6e838e 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py @@ -163,10 +163,9 @@ class TestCloudRoleMaker2(unittest.TestCase): data = "1 1 1 1\n" f.write(data) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") + dataset = paddle.distributed.InMemoryDataset() dataset.set_filelist(["test_fleet_gloo_role_maker_1.txt"]) - dataset.set_use_var([show, label]) + dataset._set_use_var([show, label]) dataset.load_into_memory() dataset.get_memory_data_size(fleet) dataset.get_shuffle_data_size(fleet) diff --git a/python/paddle/fluid/tests/unittests/test_monitor.py b/python/paddle/fluid/tests/unittests/test_monitor.py index f6207edb41c..cf273876b1f 100644 --- a/python/paddle/fluid/tests/unittests/test_monitor.py +++ b/python/paddle/fluid/tests/unittests/test_monitor.py @@ -52,18 +52,17 @@ class TestDatasetWithStat(unittest.TestCase): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = paddle.distributed.fleet.DatasetFactory().create_dataset( - "InMemoryDataset") - dataset.set_batch_size(32) - dataset.set_thread(3) + dataset = paddle.distributed.InMemoryDataset() + dataset._set_batch_size(32) + dataset._set_thread(3) dataset.set_filelist([ "test_in_memory_dataset_run_a.txt", "test_in_memory_dataset_run_b.txt" ]) - dataset.set_pipe_command("cat") - dataset.set_use_var(slots_vars) + dataset._set_pipe_command("cat") + dataset._set_use_var(slots_vars) dataset.load_into_memory() - dataset.set_fea_eval(1, True) + dataset._set_fea_eval(1, True) dataset.slots_shuffle(["slot1"]) exe = fluid.Executor(fluid.CPUPlace()) -- GitLab