From cddc70964d351deb2008c6b29bf5743e750a2873 Mon Sep 17 00:00:00 2001 From: ShenLiang Date: Thu, 26 Nov 2020 22:28:26 +0800 Subject: [PATCH] fix InMemoryDataset doc (#28688) * add Inmemorydataset --- .../distributed/fleet/dataset/dataset.py | 311 +++++++++++++----- 1 file changed, 224 insertions(+), 87 deletions(-) diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index ce14909f2ec..10c27ea91d2 100644 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -241,13 +241,16 @@ class DatasetBase(object): class InMemoryDataset(DatasetBase): """ :api_attr: Static Graph + + It will load data into memory and shuffle data before training. - InMemoryDataset, it will load data into memory - and shuffle data before training. + Examples: + .. code-block:: python + + import paddle + paddle.enable_static() + dataset = paddle.distributed.InMemoryDataset() - Example: - import paddle - dataset = paddle.distributed.InMemoryDataset() """ def __init__(self): @@ -288,6 +291,7 @@ class InMemoryDataset(DatasetBase): .. code-block:: python import paddle + paddle.enable_static() dataset = paddle.distributed.InMemoryDataset() dataset.init( batch_size=1, @@ -329,11 +333,11 @@ class InMemoryDataset(DatasetBase): """ :api_attr: Static Graph - should be called in user's python scripts to update setings of dataset instance + should be called in user's python scripts to update settings of dataset instance. + Args: kwargs: Keyword arguments. Currently, we support following keys in **kwargs, including single node settings and advanced distributed related settings: - batch_size(int): batch size. It will be effective during training. default is 1. thread_num(int): thread num, it is the num of readers. default is 1. use_var(list): list of variables. Variables which you will use. default is []. @@ -359,20 +363,22 @@ class InMemoryDataset(DatasetBase): Examples: .. 
code-block:: python - import paddle - dataset = paddle.distributed.InMemoryDataset() - dataset.init( + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + dataset.init( batch_size=1, thread_num=2, input_type=1, pipe_command="cat", use_var=[]) - dataset._init_distributed_settings( + dataset._init_distributed_settings( parse_ins_id=True, parse_content=True, fea_eval=True, candidate_size=10000) - dataset.update_settings(batch_size=2) + dataset.update_settings(batch_size=2) """ for key in kwargs: @@ -409,6 +415,7 @@ class InMemoryDataset(DatasetBase): :api_attr: Static Graph should be called only once in user's python scripts to initialize setings of dataset instance + Args: kwargs: Keyword arguments. Currently, we support following keys in **kwargs: @@ -427,23 +434,20 @@ class InMemoryDataset(DatasetBase): .. code-block:: python import paddle + import os + paddle.enable_static() + with open("test_queue_dataset_run_a.txt", "w") as f: - data = "2 1 2 2 5 4 2 2 7 2 1 3\n" - data += "2 6 2 2 1 4 2 2 4 2 2 3\n" - data += "2 5 2 2 9 9 2 2 7 2 1 3\n" - data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + data = "2 1 2 2 5 4 2 2 7 2 1 3" f.write(data) with open("test_queue_dataset_run_b.txt", "w") as f: - data = "2 1 2 2 5 4 2 2 7 2 1 3\n" - data += "2 6 2 2 1 4 2 2 4 2 2 3\n" - data += "2 5 2 2 9 9 2 2 7 2 1 3\n" - data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + data = "2 1 2 2 5 4 2 2 7 2 1 3" f.write(data) slots = ["slot1", "slot2", "slot3", "slot4"] slots_vars = [] for slot in slots: - var = fluid.data( + var = paddle.static.data( name=slot, shape=[None, 1], dtype="int64", lod_level=1) slots_vars.append(var) @@ -457,10 +461,8 @@ class InMemoryDataset(DatasetBase): dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) dataset.load_into_memory() - - paddle.enable_static() - place = paddle.CUDAPlace(0) if paddle.fluid.core.is_compiled_with_cuda() else paddle.CPUPlace() + place = paddle.CPUPlace() exe = 
paddle.static.Executor(place) startup_program = paddle.static.Program() main_program = paddle.static.Program() @@ -470,6 +472,7 @@ class InMemoryDataset(DatasetBase): os.remove("./test_queue_dataset_run_a.txt") os.remove("./test_queue_dataset_run_b.txt") + """ batch_size = kwargs.get("batch_size", 1) thread_num = kwargs.get("thread_num", 1) @@ -545,6 +548,7 @@ class InMemoryDataset(DatasetBase): .. code-block:: python import paddle + paddle.enable_static() dataset = paddle.distributed.InMemoryDataset() dataset._set_queue_num(12) @@ -563,6 +567,7 @@ class InMemoryDataset(DatasetBase): .. code-block:: python import paddle + paddle.enable_static() dataset = paddle.distributed.InMemoryDataset() dataset._set_parse_ins_id(True) @@ -580,6 +585,7 @@ class InMemoryDataset(DatasetBase): .. code-block:: python import paddle + paddle.enable_static() dataset = paddle.distributed.InMemoryDataset() dataset._set_parse_content(True) @@ -597,6 +603,7 @@ class InMemoryDataset(DatasetBase): .. code-block:: python import paddle + paddle.enable_static() dataset = paddle.distributed.InMemoryDataset() dataset._set_fleet_send_batch_size(800) @@ -614,6 +621,7 @@ class InMemoryDataset(DatasetBase): .. code-block:: python import paddle + paddle.enable_static() dataset = paddle.distributed.InMemoryDataset() dataset._set_fleet_send_sleep_seconds(2) @@ -632,6 +640,7 @@ class InMemoryDataset(DatasetBase): .. code-block:: python import paddle + paddle.enable_static() dataset = paddle.distributed.InMemoryDataset() dataset._set_merge_by_lineid() @@ -659,11 +668,25 @@ class InMemoryDataset(DatasetBase): Examples: .. 
code-block:: python - import paddle - dataset = paddle.distributed.InMemoryDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() """ self._prepare_to_run() self.dataset.load_into_memory() @@ -680,12 +703,26 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.InMemoryDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.preload_into_memory() - dataset.wait_preload_done() + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.preload_into_memory() + dataset.wait_preload_done() """ self._prepare_to_run() if thread_num is None: @@ -703,12 +740,26 @@ class InMemoryDataset(DatasetBase): Examples: .. 
code-block:: python - import paddle - dataset = paddle.distributed.InMemoryDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.preload_into_memory() - dataset.wait_preload_done() + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.preload_into_memory() + dataset.wait_preload_done() """ self.dataset.wait_preload_done() self.dataset.destroy_preload_readers() @@ -722,12 +773,26 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.InMemoryDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.local_shuffle() + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.local_shuffle() """ self.dataset.local_shuffle() @@ -743,13 +808,26 @@ class InMemoryDataset(DatasetBase): Examples: .. 
code-block:: python - import paddle - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.InMemoryDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.global_shuffle(fleet) + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.global_shuffle() Args: fleet(Fleet): fleet singleton. Default None. @@ -787,19 +865,32 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.InMemoryDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.global_shuffle(fleet) - exe = paddle.static.Executor(paddle.CPUPlace()) - startup_program = paddle.static.Program() - main_program = paddle.static.Program() - exe.run(startup_program) - exe.train_from_dataset(main_program, dataset) - dataset.release_memory() + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.global_shuffle() + exe = 
paddle.static.Executor(paddle.CPUPlace()) + startup_program = paddle.static.Program() + main_program = paddle.static.Program() + exe.run(startup_program) + exe.train_from_dataset(main_program, dataset) + dataset.release_memory() """ self.dataset.release_memory() @@ -823,13 +914,26 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.InMemoryDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - print dataset.get_memory_data_size(fleet) + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + print(dataset.get_memory_data_size()) """ import numpy as np @@ -862,14 +966,28 @@ class InMemoryDataset(DatasetBase): Examples: .. 
code-block:: python - import paddle - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet - dataset = paddle.distributed.InMemoryDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.global_shuffle(fleet) - print dataset.get_shuffle_data_size(fleet) + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.global_shuffle() + print(dataset.get_shuffle_data_size()) """ import numpy as np @@ -897,6 +1015,7 @@ class InMemoryDataset(DatasetBase): .. code-block:: python import paddle + paddle.enable_static() dataset = paddle.distributed.InMemoryDataset() dataset._set_fea_eval(1000000, True) @@ -917,11 +1036,29 @@ class InMemoryDataset(DatasetBase): slots(list[string]): the set of slots(string) to do slots shuffle. Examples: - import paddle - dataset = paddle.distributed.InMemoryDataset() - dataset.set_merge_by_lineid() - #suppose there is a slot 0 - dataset.slots_shuffle(['0']) + .. 
code-block:: python + + import paddle + paddle.enable_static() + + dataset = paddle.distributed.InMemoryDataset() + dataset._init_distributed_settings(fea_eval=True) + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.slots_shuffle(['slot1']) """ if self.fea_eval: slots_set = set(slots) -- GitLab