diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py
index 50bb5d29240a7d550135dd46742b6148c43bdac6..80c5107e5b09d8e6eb516cf2d287b041117c9b25 100644
--- a/python/paddle/distributed/fleet/dataset/dataset.py
+++ b/python/paddle/distributed/fleet/dataset/dataset.py
@@ -240,6 +240,8 @@ class DatasetBase(object):

 class InMemoryDataset(DatasetBase):
     """
+    :api_attr: Static Graph
+
     InMemoryDataset, it will load data into memory
     and shuffle data before training.

@@ -265,6 +267,8 @@ class InMemoryDataset(DatasetBase):

     def _init_distributed_settings(self, **kwargs):
         """
+        :api_attr: Static Graph
+
         should be called only once in user's python scripts to initialize distributed-related settings of dataset instance
         Args:
             kwargs: Keyword arguments. Currently, we support following keys in **kwargs:
@@ -323,6 +327,8 @@ class InMemoryDataset(DatasetBase):

     def update_settings(self, **kwargs):
         """
+        :api_attr: Static Graph
+
         should be called in user's python scripts to update settings of dataset instance
         Args:
             kwargs: Keyword arguments. Currently, we support following keys in **kwargs,
@@ -400,6 +406,8 @@ class InMemoryDataset(DatasetBase):

     def init(self, **kwargs):
         """
+        :api_attr: Static Graph
+
         should be called only once in user's python scripts to initialize settings of dataset instance
         Args:
             kwargs: Keyword arguments. Currently, we support following keys in **kwargs:
@@ -639,6 +647,8 @@ class InMemoryDataset(DatasetBase):

     def load_into_memory(self):
         """
+        :api_attr: Static Graph
+
         Load data into memory

         Examples:
@@ -655,6 +665,8 @@ class InMemoryDataset(DatasetBase):

     def preload_into_memory(self, thread_num=None):
         """
+        :api_attr: Static Graph
+
         Load data into memory in async mode

         Args:
@@ -679,6 +691,8 @@ class InMemoryDataset(DatasetBase):

     def wait_preload_done(self):
         """
+        :api_attr: Static Graph
+
         Wait preload_into_memory done

         Examples:
@@ -696,6 +710,8 @@ class InMemoryDataset(DatasetBase):

     def local_shuffle(self):
         """
+        :api_attr: Static Graph
+
         Local shuffle

         Examples:
@@ -712,6 +728,8 @@ class InMemoryDataset(DatasetBase):

     def global_shuffle(self, fleet=None, thread_num=12):
         """
+        :api_attr: Static Graph
+
         Global shuffle.
         Global shuffle can be used only in distributed mode, i.e. multiple
         processes on a single machine or multiple machines training together.
@@ -781,6 +799,8 @@ class InMemoryDataset(DatasetBase):

     def get_memory_data_size(self, fleet=None):
         """
+        :api_attr: Static Graph
+
         Get memory data size, user can call this function to know the num
         of ins in all workers after load into memory.

@@ -817,6 +837,8 @@ class InMemoryDataset(DatasetBase):

     def get_shuffle_data_size(self, fleet=None):
         """
+        :api_attr: Static Graph
+
         Get shuffle data size, user can call this function to know the num
         of ins in all workers after local/global shuffle.

@@ -901,6 +923,8 @@ class InMemoryDataset(DatasetBase):

 class QueueDataset(DatasetBase):
     """
+    :api_attr: Static Graph
+
     QueueDataset, it will process data in a streaming fashion.

     Examples:
@@ -920,6 +944,8 @@ class QueueDataset(DatasetBase):

     def init(self, **kwargs):
         """
+        :api_attr: Static Graph
+
         should be called only once in user's python scripts to initialize settings of dataset instance
         """
         super(QueueDataset, self).init(**kwargs)
diff --git a/python/paddle/fluid/dataset.py b/python/paddle/fluid/dataset.py
index 87b1ce2511e78714e066325b4d7c3b351b08cf13..7de2c0a114ba3015ab21040c1fa0ec0ada476623 100644
--- a/python/paddle/fluid/dataset.py
+++ b/python/paddle/fluid/dataset.py
@@ -335,6 +335,7 @@ class InMemoryDataset(DatasetBase):
           dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset")
     """

+    @deprecated(since="2.0.0", update_to="paddle.distributed.InMemoryDataset")
     def __init__(self):
         """ Init. """
         super(InMemoryDataset, self).__init__()
@@ -350,12 +351,18 @@ class InMemoryDataset(DatasetBase):
         self.merge_by_lineid = False
         self.fleet_send_sleep_seconds = None

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._set_feed_type")
     def set_feed_type(self, data_feed_type):
         """
         Set data_feed_desc
         """
         self.proto_desc.name = data_feed_type

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._prepare_to_run")
     def _prepare_to_run(self):
         """
         Set data_feed_desc before load or shuffle,
@@ -376,16 +383,27 @@ class InMemoryDataset(DatasetBase):
         self.dataset.create_channel()
         self.dataset.create_readers()

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._dynamic_adjust_before_train"
+    )
     def _dynamic_adjust_before_train(self, thread_num):
         if not self.is_user_set_queue_num:
             self.dataset.dynamic_adjust_channel_num(thread_num, False)
         self.dataset.dynamic_adjust_readers_num(thread_num)

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._dynamic_adjust_after_train"
+    )
     def _dynamic_adjust_after_train(self):
         if not self.is_user_set_queue_num:
             self.dataset.dynamic_adjust_channel_num(self.thread_num, False)
         self.dataset.dynamic_adjust_readers_num(self.thread_num)

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._set_queue_num")
     def set_queue_num(self, queue_num):
         """
         Set Dataset output queue num, training threads get data from queues
@@ -404,6 +422,9 @@ class InMemoryDataset(DatasetBase):
         self.is_user_set_queue_num = True
         self.queue_num = queue_num

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._set_parse_ins_id")
     def set_parse_ins_id(self, parse_ins_id):
         """
         Set if Dataset needs to parse insid
@@ -421,6 +442,9 @@ class InMemoryDataset(DatasetBase):
         """
         self.parse_ins_id = parse_ins_id

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._set_parse_content")
     def set_parse_content(self, parse_content):
         """
         Set if Dataset needs to parse content
@@ -455,6 +479,9 @@ class InMemoryDataset(DatasetBase):
         """
         self.parse_logkey = parse_logkey

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._set_merge_by_sid")
     def set_merge_by_sid(self, merge_by_sid):
         """
         Set if Dataset needs to merge sid. If not, one ins means one Pv.
@@ -544,6 +571,10 @@ class InMemoryDataset(DatasetBase):
         """
         self.dataset.postprocess_instance()

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._set_fleet_send_batch_size"
+    )
     def set_fleet_send_batch_size(self, fleet_send_batch_size=1024):
         """
         Set fleet send batch size, default is 1024
@@ -561,6 +592,10 @@ class InMemoryDataset(DatasetBase):
         """
         self.fleet_send_batch_size = fleet_send_batch_size

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._set_fleet_send_sleep_seconds"
+    )
     def set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0):
         """
         Set fleet send sleep time, default is 0
@@ -578,6 +613,9 @@ class InMemoryDataset(DatasetBase):
         """
         self.fleet_send_sleep_seconds = fleet_send_sleep_seconds

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._set_merge_by_lineid")
     def set_merge_by_lineid(self, merge_size=2):
         """
         Set merge by line id, instances of same line id will be merged after
@@ -598,16 +636,27 @@ class InMemoryDataset(DatasetBase):
         self.merge_by_lineid = True
         self.parse_ins_id = True

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._set_generate_unique_feasigns"
+    )
     def set_generate_unique_feasigns(self, generate_uni_feasigns, shard_num):
         self.dataset.set_generate_unique_feasigns(generate_uni_feasigns)
         self.gen_uni_feasigns = generate_uni_feasigns
         self.local_shard_num = shard_num

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset._generate_local_tables_unlock"
+    )
     def generate_local_tables_unlock(self, table_id, fea_dim, read_thread_num,
                                      consume_thread_num, shard_num):
         self.dataset.generate_local_tables_unlock(
             table_id, fea_dim, read_thread_num, consume_thread_num, shard_num)

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset.load_into_memory")
     def load_into_memory(self):
         """
         Load data into memory
@@ -624,6 +673,9 @@ class InMemoryDataset(DatasetBase):
         self._prepare_to_run()
         self.dataset.load_into_memory()

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset.preload_into_memory")
     def preload_into_memory(self, thread_num=None):
         """
         Load data into memory in async mode
@@ -648,6 +700,9 @@ class InMemoryDataset(DatasetBase):
         self.dataset.create_preload_readers()
         self.dataset.preload_into_memory()

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset.wait_preload_done")
     def wait_preload_done(self):
         """
         Wait preload_into_memory done
@@ -665,6 +720,9 @@ class InMemoryDataset(DatasetBase):
         self.dataset.wait_preload_done()
         self.dataset.destroy_preload_readers()

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset.local_shuffle")
     def local_shuffle(self):
         """
         Local shuffle
@@ -681,6 +739,9 @@ class InMemoryDataset(DatasetBase):
         """
         self.dataset.local_shuffle()

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset.global_shuffle")
     def global_shuffle(self, fleet=None, thread_num=12):
         """
         Global shuffle.
@@ -726,6 +787,9 @@ class InMemoryDataset(DatasetBase):
         if fleet is not None:
             fleet._role_maker.barrier_worker()

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset.release_memory")
     def release_memory(self):
         """
         :api_attr: Static Graph
@@ -774,6 +838,9 @@ class InMemoryDataset(DatasetBase):
         """
         return self.dataset.get_pv_data_size()

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset.get_memory_data_size")
     def get_memory_data_size(self, fleet=None):
         """
         Get memory data size, user can call this function to know the num
@@ -810,6 +877,9 @@ class InMemoryDataset(DatasetBase):
             return global_data_size[0]
         return local_data_size[0]

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.InMemoryDataset.get_shuffle_data_size")
     def get_shuffle_data_size(self, fleet=None):
         """
         Get shuffle data size, user can call this function to know the num
@@ -869,6 +939,9 @@ class QueueDataset(DatasetBase):
         super(QueueDataset, self).__init__()
         self.proto_desc.name = "MultiSlotDataFeed"

+    @deprecated(
+        since="2.0.0",
+        update_to="paddle.distributed.QueueDataset._prepare_to_run")
     def _prepare_to_run(self):
         """
         Set data_feed_desc/thread num/filelist before run,
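
The `@deprecated(since=..., update_to=...)` markers applied throughout `python/paddle/fluid/dataset.py` come from Paddle's deprecation helper (already imported in that module; its real implementation lives under `paddle.utils`). For readers unfamiliar with the pattern, here is an illustrative sketch of a decorator with the same call shape; this is an approximation, not Paddle's actual code.

import functools
import warnings


def deprecated(since="", update_to="", reason=""):
    """Sketch of a deprecation decorator matching the usage in this diff."""

    def decorator(func):
        msg = "API '{}.{}' is deprecated since {}.".format(
            func.__module__, func.__name__, since)
        if update_to:
            msg += " Please use '{}' instead.".format(update_to)
        if reason:
            msg += " Reason: {}.".format(reason)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Warn on every call; stacklevel=2 points the warning at the caller.
            warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
            return func(*args, **kwargs)

        return wrapper

    return decorator

With this in place, calling a decorated method such as `set_merge_by_lineid()` emits a `DeprecationWarning` that names the replacement (`paddle.distributed.InMemoryDataset._set_merge_by_lineid`) while the old behavior still executes unchanged, which is exactly the soft-migration effect this patch is after.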
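For context on the migration target, the static-graph `paddle.distributed.InMemoryDataset` exercised in the first file can be driven end to end roughly as follows. This is a minimal sketch assembled from the methods visible in this diff (`init`, `load_into_memory`, `local_shuffle`, `get_memory_data_size`); the slot names and file names are placeholders, and `set_filelist()` is assumed to be inherited from `DatasetBase` since it does not appear in the hunks above.

import paddle

paddle.enable_static()

dataset = paddle.distributed.InMemoryDataset()

# Declare the input variables the dataset will feed (placeholder slots).
slots = ["slot1", "slot2"]
slots_vars = [
    paddle.static.data(name=slot, shape=[None, 1], dtype="int64", lod_level=1)
    for slot in slots
]

# init() must be called exactly once per dataset instance.
dataset.init(
    batch_size=1, thread_num=2, pipe_command="cat", use_var=slots_vars)

# Placeholder file list; assumed to come from DatasetBase.
dataset.set_filelist(["demo_data_0.txt", "demo_data_1.txt"])

dataset.load_into_memory()              # read everything into memory
dataset.local_shuffle()                 # shuffle within this worker
print(dataset.get_memory_data_size())   # instance count after loading

In a distributed job, `_init_distributed_settings()` would be called once after `init()`, and `global_shuffle(fleet=...)` would replace `local_shuffle()`, per the docstrings annotated above.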