From 20c8432ae3e60e47f2e3f228f6bbeb64a8619876 Mon Sep 17 00:00:00 2001
From: xujiaqi01 <173596896@qq.com>
Date: Thu, 6 Aug 2020 17:48:57 +0800
Subject: [PATCH] move dataset from paddle.fluid to paddle.fleet (#25887)

* move dataset to fleet test=develop
* fix test=develop
* fix test=develop
* fix test=develop
* test=develop
* test=develop
* test=develop
* test=develop
* test=develop
* test=develop
* test=develop
---
 python/paddle/fleet/__init__.py               |    7 +-
 python/paddle/fleet/dataset/__init__.py       |    2 +
 python/paddle/fleet/dataset/dataset.py        | 1103 +++++++++++++++++
 python/paddle/fluid/reader.py                 |    5 +-
 .../fluid/tests/unittests/dist_fleet_ctr.py   |    2 +-
 .../fluid/tests/unittests/test_dataset.py     |  111 +-
 .../unittests/test_dataset_dataloader.py      |    4 +-
 .../tests/unittests/test_fleet_rolemaker_2.py |    4 +-
 .../fluid/tests/unittests/test_monitor.py     |    4 +-
 .../fluid/tests/unittests/test_pipeline.py    |    3 +-
 10 files changed, 1213 insertions(+), 32 deletions(-)
 create mode 100644 python/paddle/fleet/dataset/dataset.py

diff --git a/python/paddle/fleet/__init__.py b/python/paddle/fleet/__init__.py
index b25c362ce9..cc5ce0f2b7 100644
--- a/python/paddle/fleet/__init__.py
+++ b/python/paddle/fleet/__init__.py
@@ -16,10 +16,13 @@
 from .base.distributed_strategy import DistributedStrategy
 from .base.fleet_base import Fleet
 from .base.util_factory import UtilBase
-
+from .dataset import *
 #from .base.role_maker import PaddleCloudRoleMaker
 
-__all__ = ["DistributedStrategy", "UtilBase"]
+__all__ = [
+    "DistributedStrategy", "UtilBase", "DatasetFactory", "DatasetBase",
+    "InMemoryDataset", "QueueDataset"
+]
 
 fleet = Fleet()
 init = fleet.init
diff --git a/python/paddle/fleet/dataset/__init__.py b/python/paddle/fleet/dataset/__init__.py
index 8647330f32..af33c4eafb 100644
--- a/python/paddle/fleet/dataset/__init__.py
+++ b/python/paddle/fleet/dataset/__init__.py
@@ -10,3 +10,5 @@
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
+
+from .dataset import *
diff --git a/python/paddle/fleet/dataset/dataset.py b/python/paddle/fleet/dataset/dataset.py
new file mode 100644
index 0000000000..f6504cacd9
--- /dev/null
+++ b/python/paddle/fleet/dataset/dataset.py
@@ -0,0 +1,1103 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""This is the definition of the dataset classes, which provide high-performance IO."""
+
+import paddle
+import paddle.fluid as fluid
+from paddle.fluid.proto import data_feed_pb2
+from google.protobuf import text_format
+import paddle.fluid.core as core
+
+
+class DatasetFactory(object):
+    """
+    DatasetFactory is a factory which creates a dataset by its name.
+    You can create "QueueDataset", "InMemoryDataset" or "FileInstantDataset";
+    the default is "QueueDataset".
+
+    Example:
+        .. code-block:: python
+
+          import paddle.fluid as fluid
+          dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+
+    """
+
+    def __init__(self):
+        """ Init. """
+        pass
+
+    def create_dataset(self, datafeed_class="QueueDataset"):
+        """
+        Create "QueueDataset", "InMemoryDataset" or "FileInstantDataset";
+        the default is "QueueDataset".
+
+        Args:
+            datafeed_class(str): datafeed class name, QueueDataset or InMemoryDataset.
+                                 Default is QueueDataset.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+
+        """
+        try:
+            dataset = globals()[datafeed_class]()
+            return dataset
+        except:
+            raise ValueError("datafeed class %s does not exist" %
+                             datafeed_class)
+
+
+class DatasetBase(object):
+    """ Base dataset class. """
+
+    def __init__(self):
+        """ Init. """
+        # define class name here
+        # to decide whether we need to create an in-memory instance
+        self.proto_desc = data_feed_pb2.DataFeedDesc()
+        self.proto_desc.pipe_command = "cat"
+        self.dataset = core.Dataset("MultiSlotDataset")
+        self.thread_num = 1
+        self.filelist = []
+
+    def set_pipe_command(self, pipe_command):
+        """
+        Set the pipe command of the current dataset.
+        A pipe command is a UNIX pipeline command used to preprocess the input data.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_pipe_command("python my_script.py")
+
+        Args:
+            pipe_command(str): pipe command
+
+        """
+        self.proto_desc.pipe_command = pipe_command
+
+    def set_rank_offset(self, rank_offset):
+        """
+        Set rank_offset for merge_pv. It sets the message of a Pv.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_rank_offset("rank_offset")
+
+        Args:
+            rank_offset(str): rank_offset's name
+
+        """
+        self.proto_desc.rank_offset = rank_offset
+
+    def set_fea_eval(self, record_candidate_size, fea_eval=True):
+        """
+        Set fea eval mode for slots shuffle, to debug the importance level of
+        slots (features). fea_eval needs to be set to True for slots shuffle.
+
+        Args:
+            record_candidate_size(int): size of instances candidate to shuffle
+                                        one slot
+            fea_eval(bool): whether to enable fea eval mode, which enables slots shuffle.
+                            default is True.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_fea_eval(1000000, True)
+
+        """
+        if fea_eval:
+            self.dataset.set_fea_eval(fea_eval, record_candidate_size)
+        self.fea_eval = fea_eval
+
+    def slots_shuffle(self, slots):
+        """
+        Slots Shuffle
+        Slots Shuffle is a shuffle method at the slot level, which is usually used
+        for sparse features with a large scale of instances. Compare the metric,
+        e.g. auc, while doing slots shuffle on one or several slots with the
+        baseline, to evaluate the importance level of slots (features).
+
+        Args:
+            slots(list[string]): the set of slots (string) to do slots shuffle.
+
+        Examples:
+            import paddle.fluid as fluid
+            dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+            dataset.set_merge_by_lineid()
+            #suppose there is a slot 0
+            dataset.slots_shuffle(['0'])
+        """
+        if self.fea_eval:
+            slots_set = set(slots)
+            self.dataset.slots_shuffle(slots_set)
+
+    def set_batch_size(self, batch_size):
+        """
+        Set batch size. It will be effective during training.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_batch_size(128)
+
+        Args:
+            batch_size(int): batch size
+
+        """
+        self.proto_desc.batch_size = batch_size
+
+    def set_pv_batch_size(self, pv_batch_size):
+        """
+        Set pv batch size. It will be effective during enable_pv_merge.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_pv_batch_size(128)
+
+        Args:
+            pv_batch_size(int): pv batch size
+
+        """
+        self.proto_desc.pv_batch_size = pv_batch_size
+
+    def set_thread(self, thread_num):
+        """
+        Set thread num, which is the number of readers.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_thread(12)
+
+        Args:
+            thread_num(int): thread num
+        """
+        self.dataset.set_thread_num(thread_num)
+        self.thread_num = thread_num
+
+    def set_filelist(self, filelist):
+        """
+        Set the file list in the current worker.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_filelist(['a.txt', 'b.txt'])
+
+        Args:
+            filelist(list): file list
+        """
+        self.dataset.set_filelist(filelist)
+        self.filelist = filelist
+
+    def set_input_type(self, input_type):
+        self.proto_desc.input_type = input_type
+
+    def set_use_var(self, var_list):
+        """
+        Set the Variables which you will use.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_use_var([data, label])
+
+        Args:
+            var_list(list): variable list
+        """
+        multi_slot = self.proto_desc.multi_slot_desc
+        for var in var_list:
+            slot_var = multi_slot.slots.add()
+            slot_var.is_used = True
+            slot_var.name = var.name
+            if var.lod_level == 0:
+                slot_var.is_dense = True
+                slot_var.shape.extend(var.shape)
+            if var.dtype == core.VarDesc.VarType.FP32:
+                slot_var.type = "float"
+            elif var.dtype == core.VarDesc.VarType.INT64:
+                slot_var.type = "uint64"
+            else:
+                raise ValueError(
+                    "Currently, fluid.dataset only supports dtype=float32 and dtype=int64"
+                )
+
+    def set_hdfs_config(self, fs_name, fs_ugi):
+        """
+        Set hdfs config: fs name and ugi.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")
+
+        Args:
+            fs_name(str): fs name
+            fs_ugi(str): fs ugi
+        """
+        self.dataset.set_hdfs_config(fs_name, fs_ugi)
+
+    def set_download_cmd(self, download_cmd):
+        """
+        Set a customized download command: download_cmd.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              dataset.set_download_cmd("./read_from_afs")
+
+        Args:
+            download_cmd(str): customized download command
+        """
+        self.dataset.set_download_cmd(download_cmd)
+
+    def _prepare_to_run(self):
+        """
+        Set data_feed_desc before load or shuffle;
+        users do not need to call this function.
+        """
+        if self.thread_num > len(self.filelist):
+            self.thread_num = len(self.filelist)
+        self.dataset.set_thread_num(self.thread_num)
+        self.dataset.set_data_feed_desc(self.desc())
+        self.dataset.create_readers()
+
+    def _finish_to_run(self):
+        self.dataset.destroy_readers()
+
+    def desc(self):
+        """
+        Returns a protobuf message for this DataFeedDesc.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset()
+              print(dataset.desc())
+
+        Returns:
+            A string message
+        """
+        return text_format.MessageToString(self.proto_desc)
+
+    def _dynamic_adjust_before_train(self, thread_num):
+        pass
+
+    def _dynamic_adjust_after_train(self):
+        pass
+
+
+class InMemoryDataset(DatasetBase):
+    """
+    InMemoryDataset loads data into memory
+    and shuffles the data before training.
+    This class should be created by DatasetFactory.
+
+    Example:
+        dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset")
+    """
+
+    def __init__(self):
+        """ Init. """
+        super(InMemoryDataset, self).__init__()
+        self.proto_desc.name = "MultiSlotInMemoryDataFeed"
+        self.fleet_send_batch_size = None
+        self.is_user_set_queue_num = False
+        self.queue_num = None
+        self.parse_ins_id = False
+        self.parse_content = False
+        self.parse_logkey = False
+        self.merge_by_sid = True
+        self.enable_pv_merge = False
+        self.merge_by_lineid = False
+        self.fleet_send_sleep_seconds = None
+
+    def set_feed_type(self, data_feed_type):
+        """
+        Set data_feed_desc
+        """
+        self.proto_desc.name = data_feed_type
+
+    def _prepare_to_run(self):
+        """
+        Set data_feed_desc before load or shuffle;
+        users do not need to call this function.
+        """
+        if self.thread_num <= 0:
+            self.thread_num = 1
+        self.dataset.set_thread_num(self.thread_num)
+        if self.queue_num is None:
+            self.queue_num = self.thread_num
+        self.dataset.set_queue_num(self.queue_num)
+        self.dataset.set_parse_ins_id(self.parse_ins_id)
+        self.dataset.set_parse_content(self.parse_content)
+        self.dataset.set_parse_logkey(self.parse_logkey)
+        self.dataset.set_merge_by_sid(self.merge_by_sid)
+        self.dataset.set_enable_pv_merge(self.enable_pv_merge)
+        self.dataset.set_data_feed_desc(self.desc())
+        self.dataset.create_channel()
+        self.dataset.create_readers()
+
+    def _dynamic_adjust_before_train(self, thread_num):
+        if not self.is_user_set_queue_num:
+            self.dataset.dynamic_adjust_channel_num(thread_num, False)
+        self.dataset.dynamic_adjust_readers_num(thread_num)
+
+    def _dynamic_adjust_after_train(self):
+        if not self.is_user_set_queue_num:
+            self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
+        self.dataset.dynamic_adjust_readers_num(self.thread_num)
+
+    def set_queue_num(self, queue_num):
+        """
+        Set the Dataset output queue num; training threads get data from the queues.
+
+        Args:
+            queue_num(int): dataset output queue num
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_queue_num(12)
+
+        """
+        self.is_user_set_queue_num = True
+        self.queue_num = queue_num
+
+    def set_parse_ins_id(self, parse_ins_id):
+        """
+        Set if the Dataset needs to parse ins_id.
+
+        Args:
+            parse_ins_id(bool): if parse ins_id or not
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_parse_ins_id(True)
+
+        """
+        self.parse_ins_id = parse_ins_id
+
+    def set_parse_content(self, parse_content):
+        """
+        Set if the Dataset needs to parse content.
+
+        Args:
+            parse_content(bool): if parse content or not
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_parse_content(True)
+
+        """
+        self.parse_content = parse_content
+
+    def set_parse_logkey(self, parse_logkey):
+        """
+        Set if the Dataset needs to parse logkey.
+
+        Args:
+            parse_logkey(bool): if parse logkey or not
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_parse_logkey(True)
+
+        """
+        self.parse_logkey = parse_logkey
+
+    def set_merge_by_sid(self, merge_by_sid):
+        """
+        Set if the Dataset needs to merge by sid. If not, one ins means one Pv.
+
+        Args:
+            merge_by_sid(bool): if merge sid or not
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_merge_by_sid(True)
+
+        """
+        self.merge_by_sid = merge_by_sid
+
+    def set_enable_pv_merge(self, enable_pv_merge):
+        """
+        Set if the Dataset needs to merge pv.
+
+        Args:
+            enable_pv_merge(bool): if enable_pv_merge or not
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_enable_pv_merge(True)
+
+        """
+        self.enable_pv_merge = enable_pv_merge
+
+    def preprocess_instance(self):
+        """
+        Merge pv instances and convey them from input_channel to input_pv_channel.
+        It will be effective when enable_pv_merge_ is True.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+              dataset.preprocess_instance()
+
+        """
+        self.dataset.preprocess_instance()
+
+    def set_current_phase(self, current_phase):
+        """
+        Set the current phase in training. It is useful for unittest.
+        current_phase: 1 for join, 0 for update.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+              dataset.set_current_phase(1)
+
+        """
+        self.dataset.set_current_phase(current_phase)
+
+    def postprocess_instance(self):
+        """
+        Divide pv instances and convey them to input_channel.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+              dataset.preprocess_instance()
+              exe.train_from_dataset(dataset)
+              dataset.postprocess_instance()
+
+        """
+        self.dataset.postprocess_instance()
+
+    def set_fleet_send_batch_size(self, fleet_send_batch_size=1024):
+        """
+        Set fleet send batch size; default is 1024.
+
+        Args:
+            fleet_send_batch_size(int): fleet send batch size
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_fleet_send_batch_size(800)
+
+        """
+        self.fleet_send_batch_size = fleet_send_batch_size
+
+    def set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0):
+        """
+        Set fleet send sleep time; default is 0.
+
+        Args:
+            fleet_send_sleep_seconds(int): fleet send sleep time
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_fleet_send_sleep_seconds(2)
+
+        """
+        self.fleet_send_sleep_seconds = fleet_send_sleep_seconds
+
+    def set_merge_by_lineid(self, merge_size=2):
+        """
+        Set merge by line id; instances of the same line id will be merged after
+        shuffle. You should parse the line id in your data generator.
+
+        Args:
+            merge_size(int): ins size to merge. default is 2.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              dataset.set_merge_by_lineid()
+
+        """
+        self.dataset.set_merge_by_lineid(merge_size)
+        self.merge_by_lineid = True
+        self.parse_ins_id = True
+
+    def set_generate_unique_feasigns(self, generate_uni_feasigns, shard_num):
+        self.dataset.set_generate_unique_feasigns(generate_uni_feasigns)
+        self.gen_uni_feasigns = generate_uni_feasigns
+        self.local_shard_num = shard_num
+
+    def generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num,
+                                     consume_thread_num, shard_num):
+        self.dataset.generate_local_tables_unlock(
+            table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)
+
+    def load_into_memory(self):
+        """
+        Load data into memory.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+        """
+        self._prepare_to_run()
+        self.dataset.load_into_memory()
+
+    def preload_into_memory(self, thread_num=None):
+        """
+        Load data into memory in async mode.
+
+        Args:
+            thread_num(int): preload thread num
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.preload_into_memory()
+              dataset.wait_preload_done()
+        """
+        self._prepare_to_run()
+        if thread_num is None:
+            thread_num = self.thread_num
+        self.dataset.set_preload_thread_num(thread_num)
+        self.dataset.create_preload_readers()
+        self.dataset.preload_into_memory()
+
+    def wait_preload_done(self):
+        """
+        Wait until preload_into_memory is done.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.preload_into_memory()
+              dataset.wait_preload_done()
+        """
+        self.dataset.wait_preload_done()
+        self.dataset.destroy_preload_readers()
+
+    def local_shuffle(self):
+        """
+        Local shuffle.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+              dataset.local_shuffle()
+        """
+        self.dataset.local_shuffle()
+
+    def global_shuffle(self, fleet=None, thread_num=12):
+        """
+        Global shuffle.
+        Global shuffle can be used only in distributed mode, i.e. multiple
+        processes on a single machine or multiple machines training together.
+        If you run in distributed mode, you should pass fleet instead of None.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+              dataset.global_shuffle(fleet)
+
+        Args:
+            fleet(Fleet): fleet singleton. Default is None.
+            thread_num(int): shuffle thread num. Default is 12.
+
+        """
+        trainer_num = 1
+        if fleet is not None:
+            fleet._role_maker.barrier_worker()
+            trainer_num = fleet.worker_num()
+        if self.fleet_send_batch_size is None:
+            self.fleet_send_batch_size = 1024
+        if self.fleet_send_sleep_seconds is None:
+            self.fleet_send_sleep_seconds = 0
+        self.dataset.register_client2client_msg_handler()
+        self.dataset.set_trainer_num(trainer_num)
+        self.dataset.set_fleet_send_batch_size(self.fleet_send_batch_size)
+        self.dataset.set_fleet_send_sleep_seconds(self.fleet_send_sleep_seconds)
+        if fleet is not None:
+            fleet._role_maker.barrier_worker()
+        self.dataset.global_shuffle(thread_num)
+        if fleet is not None:
+            fleet._role_maker.barrier_worker()
+        if self.merge_by_lineid:
+            self.dataset.merge_by_lineid()
+        if fleet is not None:
+            fleet._role_maker.barrier_worker()
+
+    def release_memory(self):
+        """
+        :api_attr: Static Graph
+
+        Release InMemoryDataset memory data, when the data will not be used again.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+              dataset.global_shuffle(fleet)
+              exe = fluid.Executor(fluid.CPUPlace())
+              exe.run(fluid.default_startup_program())
+              exe.train_from_dataset(fluid.default_main_program(), dataset)
+              dataset.release_memory()
+
+        """
+        self.dataset.release_memory()
+
+    def get_pv_data_size(self):
+        """
+        Get the memory data size of Pv; users can call this function to know the pv num
+        of ins in all workers after loading into memory.
+
+        Note:
+            This function may cause bad performance, because it has a barrier.
+
+        Returns:
+            The size of memory pv data.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+              print(dataset.get_pv_data_size())
+
+        """
+        return self.dataset.get_pv_data_size()
+
+    def get_memory_data_size(self, fleet=None):
+        """
+        Get the memory data size; users can call this function to know the num
+        of ins in all workers after loading into memory.
+
+        Note:
+            This function may cause bad performance, because it has a barrier.
+
+        Args:
+            fleet(Fleet): Fleet Object.
+
+        Returns:
+            The size of memory data.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+              print(dataset.get_memory_data_size(fleet))
+
+        """
+        import numpy as np
+        local_data_size = self.dataset.get_memory_data_size()
+        local_data_size = np.array([local_data_size])
+        if fleet is not None:
+            global_data_size = local_data_size * 0
+            fleet._role_maker.all_reduce_worker(local_data_size,
+                                                global_data_size)
+            return global_data_size[0]
+        return local_data_size[0]
+
+    def get_shuffle_data_size(self, fleet=None):
+        """
+        Get the shuffle data size; users can call this function to know the num
+        of ins in all workers after local/global shuffle.
+
+        Note:
+            This function may cause bad performance to local shuffle,
+            because it has a barrier. It does not affect global shuffle.
+
+        Args:
+            fleet(Fleet): Fleet Object.
+
+        Returns:
+            The size of shuffle data.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+              dataset.global_shuffle(fleet)
+              print(dataset.get_shuffle_data_size(fleet))
+
+        """
+        import numpy as np
+        local_data_size = self.dataset.get_shuffle_data_size()
+        local_data_size = np.array([local_data_size])
+        if fleet is not None:
+            global_data_size = local_data_size * 0
+            fleet._role_maker.all_reduce_worker(local_data_size,
+                                                global_data_size)
+            return global_data_size[0]
+        return local_data_size[0]
+
+
+class QueueDataset(DatasetBase):
+    """
+    QueueDataset processes data in a streaming manner.
+
+    Examples:
+        .. code-block:: python
+
+          import paddle.fluid as fluid
+          dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
+
+    """
+
+    def __init__(self):
+        """
+        Initialize QueueDataset.
+        This class should be created by DatasetFactory.
+        """
+        super(QueueDataset, self).__init__()
+        self.proto_desc.name = "MultiSlotDataFeed"
+
+    def _prepare_to_run(self):
+        """
+        Set data_feed_desc/thread num/filelist before run;
+        users do not need to call this function.
+        """
+        if self.thread_num > len(self.filelist):
+            self.thread_num = len(self.filelist)
+        if self.thread_num == 0:
+            self.thread_num = 1
+        self.dataset.set_thread_num(self.thread_num)
+        self.dataset.set_filelist(self.filelist)
+        self.dataset.set_data_feed_desc(self.desc())
+        self.dataset.create_readers()
+
+    def local_shuffle(self):
+        """
+        Local shuffle data.
+
+        Local shuffle is not supported in QueueDataset;
+        NotImplementedError will be raised.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
+              dataset.local_shuffle()
+
+        Raises:
+            NotImplementedError: QueueDataset does not support local shuffle
+
+        """
+        raise NotImplementedError(
+            "QueueDataset does not support local shuffle, "
+            "please use InMemoryDataset for local_shuffle")
+
+    def global_shuffle(self, fleet=None):
+        """
+        Global shuffle data.
+
+        Global shuffle is not supported in QueueDataset;
+        NotImplementedError will be raised.
+
+        Args:
+            fleet(Fleet): fleet singleton. Default is None.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+              dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
+              dataset.global_shuffle(fleet)
+
+        Raises:
+            NotImplementedError: QueueDataset does not support global shuffle
+
+        """
+        raise NotImplementedError(
+            "QueueDataset does not support global shuffle, "
+            "please use InMemoryDataset for global_shuffle")
+
+
+class FileInstantDataset(DatasetBase):
+    """
+    FileInstantDataset processes data in a streaming manner.
+
+    Examples:
+        .. code-block:: python
+
+          import paddle.fluid as fluid
+          dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
+    """
+
+    def __init__(self):
+        """
+        Initialize FileInstantDataset.
+        This class should be created by DatasetFactory.
+        """
+        super(FileInstantDataset, self).__init__()
+        self.proto_desc.name = "MultiSlotFileInstantDataFeed"
+
+    def local_shuffle(self):
+        """
+        Local shuffle.
+        FileInstantDataset does not support local shuffle.
+        """
+        raise NotImplementedError(
+            "FileInstantDataset does not support local shuffle, "
+            "please use InMemoryDataset for local_shuffle")
+
+    def global_shuffle(self, fleet=None):
+        """
+        Global shuffle.
+        FileInstantDataset does not support global shuffle.
+        """
+        raise NotImplementedError(
+            "FileInstantDataset does not support global shuffle, "
+            "please use InMemoryDataset for global_shuffle")
+
+
+class BoxPSDataset(InMemoryDataset):
+    """
+    BoxPSDataset: derived from InMemoryDataset.
+
+    Examples:
+        .. code-block:: python
+
+          import paddle.fluid as fluid
+          dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
+    """
+
+    def __init__(self):
+        """
+        Initialize BoxPSDataset.
+        This class should be created by DatasetFactory.
+        """
+        super(BoxPSDataset, self).__init__()
+        self.boxps = core.BoxPS(self.dataset)
+        self.proto_desc.name = "PaddleBoxDataFeed"
+
+    def set_date(self, date):
+        """
+        Workaround for date: parse a date string of format "yyyymmdd" and pass it to BoxPS.
+        """
+        year = int(date[:4])
+        month = int(date[4:6])
+        day = int(date[6:])
+        self.boxps.set_date(year, month, day)
+
+    def begin_pass(self):
+        """
+        Begin Pass.
+        Notify BoxPS to load the sparse parameters of the next pass to GPU Memory.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
+              dataset.begin_pass()
+        """
+        self.boxps.begin_pass()
+
+    def end_pass(self, need_save_delta):
+        """
+        End Pass.
+        Notify BoxPS that the current pass ended.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
+              dataset.end_pass(True)
+        """
+        self.boxps.end_pass(need_save_delta)
+
+    def wait_preload_done(self):
+        """
+        Wait async preload done.
+        Wait until the feed pass is done.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.preload_into_memory()
+              dataset.wait_preload_done()
+        """
+        self.boxps.wait_feed_pass_done()
+
+    def load_into_memory(self):
+        """
+        Load the next pass into memory and notify boxps to fetch its emb from SSD.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.load_into_memory()
+        """
+        self._prepare_to_run()
+        self.boxps.load_into_memory()
+
+    def preload_into_memory(self):
+        """
+        Begin async preload of the next pass while the current pass may still be training.
+
+        Examples:
+            .. code-block:: python
+
+              import paddle.fluid as fluid
+              dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
+              filelist = ["a.txt", "b.txt"]
+              dataset.set_filelist(filelist)
+              dataset.preload_into_memory()
+        """
+        self._prepare_to_run()
+        self.boxps.preload_into_memory()
+
+    def _dynamic_adjust_before_train(self, thread_num):
+        if not self.is_user_set_queue_num:
+            self.dataset.dynamic_adjust_channel_num(thread_num, True)
+        self.dataset.dynamic_adjust_readers_num(thread_num)
+
+    def _dynamic_adjust_after_train(self):
+        pass
+
+    def slots_shuffle(self, slots):
+        """
+        Slots Shuffle
+        Slots Shuffle is a shuffle method at the slot level, which is usually used
+        for sparse features with a large scale of instances. Compare the metric,
+        e.g. auc, while doing slots shuffle on one or several slots with the
+        baseline, to evaluate the importance level of slots (features).
+
+        Args:
+            slots(list[string]): the set of slots (string) to do slots shuffle.
+
+        Examples:
+            import paddle.fluid as fluid
+            dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
+            dataset.set_merge_by_lineid()
+            #suppose there is a slot 0
+            dataset.slots_shuffle(['0'])
+        """
+        slots_set = set(slots)
+        self.boxps.slots_shuffle(slots_set)
diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py
index 0289ecea34..1f96bbc4ce 100644
--- a/python/paddle/fluid/reader.py
+++ b/python/paddle/fluid/reader.py
@@ -28,7 +28,6 @@ from .layers.io import monkey_patch_reader_methods, _copy_reader_var_, double_bu
 from .unique_name import UniqueNameGenerator
 import logging
 import warnings
-from .dataset import DatasetBase, InMemoryDataset
 
 ### Dygraph DataLoader configs ###
 import os
@@ -1670,7 +1669,7 @@ class PyReader(DataLoaderBase):
 
 class DatasetLoader(DataLoaderBase):
     def __init__(self, dataset, places, drop_last):
-        assert isinstance(dataset,
+        assert isinstance(dataset, paddle.fleet.dataset.
                           DatasetBase), "dataset must be type of DatasetBase"
         assert not in_dygraph_mode(
         ), "DatasetLoader is not supported in dygraph mode yet"
@@ -1686,7 +1685,7 @@ class DatasetLoader(DataLoaderBase):
 
         dataset.set_thread(thread_num)
 
-        if isinstance(dataset,
+        if isinstance(dataset, paddle.fleet.dataset.
                       InMemoryDataset) and dataset.queue_num > thread_num:
             logging.warn("queue_num {} which is set in Dataset is ignored".
                          format(dataset.queue_num))
diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
index 6bf95b9d67..56ca3105de 100644
--- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py
@@ -210,7 +210,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
             filelist.append(train_file_path)
 
         # config dataset
-        dataset = fluid.DatasetFactory().create_dataset()
+        dataset = paddle.fleet.DatasetFactory().create_dataset()
         dataset.set_batch_size(batch_size)
         dataset.set_use_var(self.feeds)
         pipe_command = 'python ctr_dataset_reader.py'
diff --git a/python/paddle/fluid/tests/unittests/test_dataset.py b/python/paddle/fluid/tests/unittests/test_dataset.py
index cc2cee6029..90d5f58539 100644
--- a/python/paddle/fluid/tests/unittests/test_dataset.py
+++ b/python/paddle/fluid/tests/unittests/test_dataset.py
@@ -17,6 +17,7 @@ including create, config, run, etc.
 """
 
 from __future__ import print_function
+import paddle
 import paddle.fluid as fluid
 import paddle.compat as cpt
 import paddle.fluid.core as core
@@ -37,23 +38,26 @@ class TestDataset(unittest.TestCase):
     def test_dataset_create(self):
         """ Testcase for dataset create. """
         try:
-            dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+            dataset = paddle.fleet.DatasetFactory().create_dataset(
+                "InMemoryDataset")
         except:
             self.assertTrue(False)
 
         try:
-            dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
+            dataset = paddle.fleet.DatasetFactory().create_dataset(
+                "QueueDataset")
         except:
             self.assertTrue(False)
 
         try:
-            dataset = fluid.DatasetFactory().create_dataset(
+            dataset = paddle.fleet.DatasetFactory().create_dataset(
                 "FileInstantDataset")
         except:
             self.assertTrue(False)
 
         try:
-            dataset = fluid.DatasetFactory().create_dataset("MyOwnDataset")
+            dataset = paddle.fleet.DatasetFactory().create_dataset(
+                "MyOwnDataset")
             self.assertTrue(False)
         except:
             self.assertTrue(True)
@@ -91,7 +95,8 @@ class TestDataset(unittest.TestCase):
                 name=slot, shape=[1], dtype="int64", lod_level=1)
             slots_vars.append(var)
 
-        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset(
+            "InMemoryDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(3)
         dataset.set_filelist(
@@ -125,7 +130,7 @@ class TestDataset(unittest.TestCase):
         dataset.set_trainer_num(4)
         dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")
         dataset.set_download_cmd("./read_from_afs my_fs_name my_fs_ugi")
-        dataset.enable_pv_merge()
+        dataset.set_enable_pv_merge(False)
 
         thread_num = dataset.get_thread_num()
         self.assertEqual(thread_num, 12)
@@ -171,7 +176,8 @@ class TestDataset(unittest.TestCase):
                 name=slot, shape=[1], dtype="int64", lod_level=1)
             slots_vars.append(var)
 
-        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset(
+            "InMemoryDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(3)
         dataset.set_filelist([filename1, filename2])
@@ -222,7 +228,8 @@ class TestDataset(unittest.TestCase):
                 name=slot, shape=[1], dtype="int64", lod_level=1)
             slots_vars.append(var)
 
-        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset(
+            "InMemoryDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(3)
         dataset.set_filelist([
@@ -293,7 +300,8 @@ class TestDataset(unittest.TestCase):
                 name=slot, shape=[1], dtype="float32", lod_level=1)
             slots_vars.append(var)
 
-        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset(
+            "InMemoryDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(1)
         dataset.set_parse_ins_id(True)
@@ -359,7 +367,8 @@ class TestDataset(unittest.TestCase):
             name="slot4", shape=[1], dtype="float32", lod_level=0)
         slots_vars = [var1, var2, var3, var4]
 
-        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset(
+            "InMemoryDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(1)
         dataset.set_parse_ins_id(True)
@@ -414,7 +423,8 @@ class TestDataset(unittest.TestCase):
                 name=slot, shape=[1], dtype="float32", lod_level=1)
             slots_vars.append(var)
 
-        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset(
+            "InMemoryDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(3)
         dataset.set_filelist([
@@ -507,7 +517,7 @@ class TestDataset(unittest.TestCase):
                 name=slot, shape=[1], dtype="int64", lod_level=1)
             slots_vars.append(var)
 
-        dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset("QueueDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(3)
         dataset.set_filelist(
@@ -532,7 +542,7 @@ class TestDataset(unittest.TestCase):
         except Exception as e:
             self.assertTrue(False)
 
-        dataset2 = fluid.DatasetFactory().create_dataset("QueueDataset")
+        dataset2 = paddle.fleet.DatasetFactory().create_dataset("QueueDataset")
         dataset2.set_use_var(slots_vars)
         dataset2.set_batch_size(32)
         dataset2.set_thread(3)
@@ -573,7 +583,7 @@ class TestDataset(unittest.TestCase):
                 name=slot, shape=[1], dtype="float32", lod_level=1)
             slots_vars.append(var)
 
-        dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset("QueueDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(3)
         dataset.set_filelist(
@@ -628,7 +638,8 @@ class TestDataset(unittest.TestCase):
                 name=slot, shape=[None, 1], dtype="int64", lod_level=1)
             slots_vars.append(var)
 
-        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset(
+            "InMemoryDataset")
        dataset.set_input_type(1)
         dataset.set_batch_size(1)
         dataset.set_thread(2)
@@ -707,7 +718,7 @@ class TestDatasetWithFetchHandler(unittest.TestCase):
             inputs(list): inputs of get_dataset
             files(list): files of get_dataset
         """
-        dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset("QueueDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(3)
         dataset.set_filelist(files)
@@ -864,7 +875,8 @@ class TestDataset2(unittest.TestCase):
         except ImportError as e:
             print("warning: no mpi4py")
         exe.run(startup_program)
-        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
+        dataset = paddle.fleet.DatasetFactory().create_dataset(
+            "InMemoryDataset")
         dataset.set_batch_size(32)
         dataset.set_thread(3)
         dataset.set_filelist([
@@ -884,9 +896,6 @@ class TestDataset2(unittest.TestCase):
         """
         Testcase for InMemoryDataset from create to run.
""" - - self.skipTest("parameter server will add pslib UT later") - with open("test_in_memory_dataset2_run2_a.txt", "w") as f: data = "1 1 2 3 3 4 5 5 5 5 1 1\n" data += "1 2 2 3 4 4 6 6 6 6 1 2\n" @@ -902,7 +911,7 @@ class TestDataset2(unittest.TestCase): train_program = fluid.Program() startup_program = fluid.Program() scope = fluid.Scope() - from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet with fluid.program_guard(train_program, startup_program): slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"] slots_vars = [] @@ -936,7 +945,8 @@ class TestDataset2(unittest.TestCase): except ImportError as e: print("warning: no mpi4py") exe.run(startup_program) - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.fleet.DatasetFactory().create_dataset( + "InMemoryDataset") dataset.set_batch_size(32) dataset.set_thread(3) dataset.set_filelist([ @@ -952,6 +962,63 @@ class TestDataset2(unittest.TestCase): print("warning: catch expected error") fleet._opt_info = None fleet._fleet_ptr = None + dataset = paddle.fleet.DatasetFactory().create_dataset( + "InMemoryDataset") + dataset.set_rank_offset("") + dataset.set_pv_batch_size(1) + dataset.set_hdfs_config("", "") + d = paddle.fleet.DatasetBase() + try: + dataset.set_feed_type("MultiSlotInMemoryDataFeed") + except: + print("warning: catch expected error") + dataset.thread_num = 0 + try: + dataset._prepare_to_run() + except: + print("warning: catch expected error") + dataset.set_parse_logkey(True) + dataset.set_merge_by_sid(True) + dataset.set_enable_pv_merge(True) + try: + dataset.preprocess_instance() + except: + print("warning: catch expected error") + try: + dataset.set_current_phase(1) + except: + print("warning: catch expected error") + try: + dataset.postprocess_instance() + except: + print("warning: catch expected error") + dataset.set_fleet_send_batch_size(1024) + try: + dataset.global_shuffle() + except: + print("warning: catch expected error") + dataset.get_pv_data_size() + dataset.get_memory_data_size() + dataset.get_shuffle_data_size() + dataset = paddle.fleet.DatasetFactory().create_dataset( + "QueueDataset") + try: + dataset.local_shuffle() + except: + print("warning: catch expected error") + try: + dataset.global_shuffle() + except: + print("warning: catch expected error") + dataset = paddle.fleet.FileInstantDataset() + try: + dataset.local_shuffle() + except: + print("warning: catch expected error") + try: + dataset.global_shuffle() + except: + print("warning: catch expected error") os.remove("./test_in_memory_dataset2_run2_a.txt") os.remove("./test_in_memory_dataset2_run2_b.txt") diff --git a/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py b/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py index 10aefbb222..22d59e78ff 100644 --- a/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py +++ b/python/paddle/fluid/tests/unittests/test_dataset_dataloader.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import paddle import paddle.fluid as fluid import numpy as np import six @@ -96,7 +97,8 @@ class DatasetLoaderTestBase(unittest.TestCase): def check_batch_number(self, place, randomize_batch_num=False): main_prog, startup_prog, feeds = self.build_network() - dataset = fluid.DatasetFactory().create_dataset(self.dataset_name) + dataset = paddle.fleet.DatasetFactory().create_dataset( + self.dataset_name) dataset.set_batch_size(BATCH_SIZE) if isinstance(place, fluid.CPUPlace): diff --git a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py index 88a9d23585..351dc0a5d0 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_rolemaker_2.py @@ -14,6 +14,7 @@ """Test cases for role makers.""" from __future__ import print_function +import paddle import os import unittest @@ -162,7 +163,8 @@ class TestCloudRoleMaker2(unittest.TestCase): data = "1 1 1 1\n" f.write(data) - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.fleet.DatasetFactory().create_dataset( + "InMemoryDataset") dataset.set_filelist(["test_fleet_gloo_role_maker_1.txt"]) dataset.set_use_var([show, label]) dataset.load_into_memory() diff --git a/python/paddle/fluid/tests/unittests/test_monitor.py b/python/paddle/fluid/tests/unittests/test_monitor.py index 39601eb0e1..2d4c8f61c0 100644 --- a/python/paddle/fluid/tests/unittests/test_monitor.py +++ b/python/paddle/fluid/tests/unittests/test_monitor.py @@ -16,6 +16,7 @@ TestCases for Monitor """ from __future__ import print_function +import paddle import paddle.fluid as fluid import paddle.fluid.core as core import numpy as np @@ -51,7 +52,8 @@ class TestDatasetWithStat(unittest.TestCase): name=slot, shape=[1], dtype="int64", lod_level=1) slots_vars.append(var) - dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset = paddle.fleet.DatasetFactory().create_dataset( + "InMemoryDataset") dataset.set_batch_size(32) dataset.set_thread(3) dataset.set_filelist([ diff --git a/python/paddle/fluid/tests/unittests/test_pipeline.py b/python/paddle/fluid/tests/unittests/test_pipeline.py index fe31add697..dd1cf29eff 100644 --- a/python/paddle/fluid/tests/unittests/test_pipeline.py +++ b/python/paddle/fluid/tests/unittests/test_pipeline.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ # limitations under the License. from __future__ import print_function +import paddle import paddle.fluid as fluid import paddle.fluid.layers as layers import numpy as np -- GitLab
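
Usage after this change: a minimal end-to-end sketch distilled from the updated unit tests above (the file names "a.txt"/"b.txt" and the slot layout are illustrative, not part of the patch):

    import paddle
    import paddle.fluid as fluid

    # Two sparse int64 slots in MultiSlot text format, as in test_dataset.py.
    slots = ["slot1", "slot2"]
    slots_vars = []
    for slot in slots:
        var = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1)
        slots_vars.append(var)

    # The factory now lives under paddle.fleet instead of paddle.fluid.
    dataset = paddle.fleet.DatasetFactory().create_dataset("InMemoryDataset")
    dataset.set_batch_size(32)
    dataset.set_thread(2)
    dataset.set_filelist(["a.txt", "b.txt"])  # illustrative file names
    dataset.set_use_var(slots_vars)
    dataset.load_into_memory()
    dataset.local_shuffle()

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())
    exe.train_from_dataset(fluid.default_main_program(), dataset)
    dataset.release_memory()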