From edb63090e69cf01f8bd443895a5ca62b4bf16fe5 Mon Sep 17 00:00:00 2001 From: yaoxuefeng Date: Mon, 21 Sep 2020 14:42:38 +0800 Subject: [PATCH] update --- .../paddle/distributed/InMemoryDataset_cn.rst | 182 ++++++++++++++++-- .../paddle/distributed/QueueDataset_cn.rst | 49 ++++- 2 files changed, 216 insertions(+), 15 deletions(-) diff --git a/doc/paddle/api/paddle/distributed/InMemoryDataset_cn.rst b/doc/paddle/api/paddle/distributed/InMemoryDataset_cn.rst index e90fa3fac..d30d828a2 100644 --- a/doc/paddle/api/paddle/distributed/InMemoryDataset_cn.rst +++ b/doc/paddle/api/paddle/distributed/InMemoryDataset_cn.rst @@ -15,6 +15,7 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 .. code-block:: python + import paddle dataset = paddle.distributed.InMemoryDataset() .. py:method:: init(**kwargs) @@ -46,6 +47,8 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 import paddle + import paddle.fluid as fluid + import os with open("test_queue_dataset_run_a.txt", "w") as f: data = "2 1 2 2 5 4 2 2 7 2 1 3\n" data += "2 6 2 2 1 4 2 2 4 2 2 3\n" @@ -180,6 +183,50 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 dataset.update_settings(batch_size=2) +.. py:method:: set_filelist(filelist) + +在当前的worker中设置文件列表。 + +**代码示例**: + +.. 
code-block:: python + + import paddle + import os + with open("test_queue_dataset_run_a.txt", "w") as f: + data = "2 1 2 2 5 4 2 2 7 2 1 3\n" + data += "2 6 2 2 1 4 2 2 4 2 2 3\n" + data += "2 5 2 2 9 9 2 2 7 2 1 3\n" + data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + f.write(data) + with open("test_queue_dataset_run_b.txt", "w") as f: + data = "2 1 2 2 5 4 2 2 7 2 1 3\n" + data += "2 6 2 2 1 4 2 2 4 2 2 3\n" + data += "2 5 2 2 9 9 2 2 7 2 1 3\n" + data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + f.write(data) + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] + dataset.set_filelist(filelist) + os.remove("./test_queue_dataset_run_a.txt") + os.remove("./test_queue_dataset_run_b.txt") + + +参数: + - **filelist** (list) - 文件列表 + .. 
py:method:: load_into_memory() **注意:** @@ -194,6 +241,18 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 import paddle dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -211,6 +270,18 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 import paddle dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -226,6 +297,18 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 import paddle dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.preload_into_memory() @@ -241,6 +324,18 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 import paddle dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + 
use_var=slots_vars) filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() @@ -257,12 +352,23 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 .. code-block:: python import paddle - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() - dataset.global_shuffle(fleet) + dataset.global_shuffle() 参数: - **fleet** (Fleet) – fleet单例。默认为None。 @@ -277,12 +383,23 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 .. code-block:: python import paddle - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() - dataset.global_shuffle(fleet) + dataset.global_shuffle() exe = paddle.static.Executor(paddle.CPUPlace()) startup_program = paddle.static.Program() main_program = paddle.static.Program() @@ -307,12 +424,23 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 .. 
code-block:: python import paddle - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() - print dataset.get_memory_data_size(fleet) + print(dataset.get_memory_data_size()) .. py:method:: get_shuffle_data_size(fleet=None) @@ -332,13 +460,25 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 .. code-block:: python import paddle - from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet dataset = paddle.distributed.InMemoryDataset() + dataset = paddle.distributed.InMemoryDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) filelist = ["a.txt", "b.txt"] dataset.set_filelist(filelist) dataset.load_into_memory() - dataset.global_shuffle(fleet) - print dataset.get_shuffle_data_size(fleet) + dataset.global_shuffle() + print(dataset.get_shuffle_data_size()) .. py:method:: slots_shuffle(slots) @@ -352,8 +492,24 @@ InMemoryDataset会根据用户自定义的预处理指令预处理原始数据 .. 
code-block:: python import paddle - dataset = paddle.distributed.InMemoryDataset() - dataset.set_merge_by_lineid() - #suppose there is a slot 0 - dataset.slots_shuffle(['0']) + dataset = paddle.distributed.InMemoryDataset() + dataset._init_distributed_settings(fea_eval=True) + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["a.txt", "b.txt"] + dataset.set_filelist(filelist) + dataset.load_into_memory() + dataset.slots_shuffle(['slot1']) + + diff --git a/doc/paddle/api/paddle/distributed/QueueDataset_cn.rst b/doc/paddle/api/paddle/distributed/QueueDataset_cn.rst index e731c9251..a6d22d01e 100644 --- a/doc/paddle/api/paddle/distributed/QueueDataset_cn.rst +++ b/doc/paddle/api/paddle/distributed/QueueDataset_cn.rst @@ -47,6 +47,8 @@ QueueyDataset是流式处理数据使用Dataset类。与InmemoryDataset继承自 import paddle + import paddle.fluid as fluid + import os with open("test_queue_dataset_run_a.txt", "w") as f: data = "2 1 2 2 5 4 2 2 7 2 1 3\n" data += "2 6 2 2 1 4 2 2 4 2 2 3\n" @@ -63,7 +65,7 @@ QueueyDataset是流式处理数据使用Dataset类。与InmemoryDataset继承自 slots = ["slot1", "slot2", "slot3", "slot4"] slots_vars = [] for slot in slots: - var = fluid.data( + var = paddle.static.data( name=slot, shape=[None, 1], dtype="int64", lod_level=1) slots_vars.append(var) @@ -76,7 +78,6 @@ QueueyDataset是流式处理数据使用Dataset类。与InmemoryDataset继承自 use_var=slots_vars) dataset.set_filelist( ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset.load_into_memory() paddle.enable_static() @@ -90,3 +91,47 @@ QueueyDataset是流式处理数据使用Dataset类。与InmemoryDataset继承自 os.remove("./test_queue_dataset_run_a.txt") os.remove("./test_queue_dataset_run_b.txt") + + .. py:method:: set_filelist(filelist) + +在当前的worker中设置文件列表。 + +**代码示例**: + +.. 
code-block:: python + + import paddle + import os + with open("test_queue_dataset_run_a.txt", "w") as f: + data = "2 1 2 2 5 4 2 2 7 2 1 3\n" + data += "2 6 2 2 1 4 2 2 4 2 2 3\n" + data += "2 5 2 2 9 9 2 2 7 2 1 3\n" + data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + f.write(data) + with open("test_queue_dataset_run_b.txt", "w") as f: + data = "2 1 2 2 5 4 2 2 7 2 1 3\n" + data += "2 6 2 2 1 4 2 2 4 2 2 3\n" + data += "2 5 2 2 9 9 2 2 7 2 1 3\n" + data += "2 7 2 2 1 9 2 3 7 2 5 3\n" + f.write(data) + dataset = paddle.distributed.QueueDataset() + slots = ["slot1", "slot2", "slot3", "slot4"] + slots_vars = [] + for slot in slots: + var = paddle.static.data( + name=slot, shape=[None, 1], dtype="int64", lod_level=1) + slots_vars.append(var) + dataset.init( + batch_size=1, + thread_num=2, + input_type=1, + pipe_command="cat", + use_var=slots_vars) + filelist = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"] + dataset.set_filelist(filelist) + os.remove("./test_queue_dataset_run_a.txt") + os.remove("./test_queue_dataset_run_b.txt") + + +参数: + - **filelist** (list) - 文件列表 -- GitLab