#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCases for Dataset,
including create, config, run, etc.
"""

import os
import tempfile
import unittest

import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core


class TestDataset(unittest.TestCase):
    """TestCases for Dataset."""

    def setUp(self):
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

    def test_dataset_create(self):
        """Testcase for dataset create."""
        try:
            dataset = paddle.distributed.InMemoryDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.QueueDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.fleet.dataset.FileInstantDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.fleet.dataset.MyOwnDataset()
            self.assertTrue(False)
        except:
            self.assertTrue(True)

    def test_config(self):
        """
        Testcase for python config.
        """
        dataset = fluid.InMemoryDataset()
        dataset.set_parse_ins_id(True)
        dataset.set_parse_content(True)
        dataset._set_trainer_num(1)
        self.assertTrue(dataset.parse_ins_id)
        self.assertTrue(dataset.parse_content)
        self.assertEqual(dataset.trainer_num, 1)

    def test_shuffle_by_uid(self):
        """
        Testcase for shuffle_by_uid.
        """
        dataset = paddle.distributed.InMemoryDataset()
        dataset._set_uid_slot('6048')
        dataset._set_shuffle_by_uid(True)

    def test_run_with_dump(self):
        """
        Testcase for InMemoryDataset from create to run.
        """

        temp_dir = tempfile.TemporaryDirectory()
        dump_a_path = os.path.join(temp_dir.name, 'test_run_with_dump_a.txt')
        dump_b_path = os.path.join(temp_dir.name, 'test_run_with_dump_b.txt')

        with open(dump_a_path, "w") as f:
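            # Plain-text samples in the slot format consumed through pipe_command="cat";
            # with parse_ins_id/parse_content enabled below, the leading fields are read
            # as instance id and content (assumed layout for this test data).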
            data = "1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(dump_b_path, "w") as f:
            data = "1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
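        # update_settings() may override fields that were passed to init(); here it
        # replaces the pipe command after the dataset has been constructed.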
        dataset.update_settings(pipe_command="cat1")
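        # Distributed-side settings: parse instance id and content from each sample and
        # enable feature evaluation (slots_shuffle) with a bounded candidate size.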
        dataset._init_distributed_settings(
            parse_ins_id=True,
            parse_content=True,
            fea_eval=True,
            candidate_size=10000,
        )
        dataset.set_filelist([dump_a_path, dump_b_path])
        dataset.load_into_memory()
        dataset.local_shuffle()

        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
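        # Run two passes over the dataset; an ImportError (e.g. the trainer_desc_pb2
        # import problem on Windows) is tolerated, any other exception fails the test.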
        for i in range(2):
            try:
                exe.train_from_dataset(main_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        temp_dir.cleanup()

    def test_dataset_config(self):
        """Testcase for dataset configuration."""
        dataset = fluid.core.Dataset("MultiSlotDataset")
        dataset.set_thread_num(12)
        dataset.set_filelist(["a.txt", "b.txt", "c.txt"])
        dataset.set_trainer_num(4)
        dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")
        dataset.set_download_cmd("./read_from_afs my_fs_name my_fs_ugi")
        dataset.set_enable_pv_merge(False)

        thread_num = dataset.get_thread_num()
        self.assertEqual(thread_num, 12)

        filelist = dataset.get_filelist()
        self.assertEqual(len(filelist), 3)
        self.assertEqual(filelist[0], "a.txt")
        self.assertEqual(filelist[1], "b.txt")
        self.assertEqual(filelist[2], "c.txt")

        trainer_num = dataset.get_trainer_num()
        self.assertEqual(trainer_num, 4)

        name, ugi = dataset.get_hdfs_config()
        self.assertEqual(name, "my_fs_name")
        self.assertEqual(ugi, "my_fs_ugi")

        download_cmd = dataset.get_download_cmd()
        self.assertEqual(download_cmd, "./read_from_afs my_fs_name my_fs_ugi")

    def test_set_download_cmd(self):
        """
        Testcase for InMemoryDataset with a custom download_cmd, from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "afs:test_in_memory_dataset_run_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "afs:test_in_memory_dataset_run_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32,
            thread_num=3,
            pipe_command="cat",
            download_cmd="cat",
            use_var=slots_vars,
        )
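        # download_cmd="cat" is a stand-in for a real remote download command in this test.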
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(main_program, feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(main_program, dataset)
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_in_memory_dataset_run(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_run_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_run_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
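        # fea_eval=True enables feature-evaluation support so slots_shuffle() can be
        # called below; candidate_size caps the number of sampled candidates.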
        dataset._init_distributed_settings(fea_eval=True, candidate_size=1)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.slots_shuffle(["slot1"])
        dataset.local_shuffle()
        dataset._set_generate_unique_feasigns(True, 15)
        dataset._generate_local_tables_unlock(0, 11, 1, 25, 15)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_in_memory_dataset_masterpatch(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_masterpatch_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_masterpatch_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 id2 1 1 1 1 1 0 1 0\n"
            data += "1 id3 1 0 1 0 1 1 1 1\n"
            data += "1 id3 1 1 1 1 1 0 1 0\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            for slot in slots[:2]:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="int64", lod_level=1
                )
                slots_vars.append(var)
            for slot in slots[2:]:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="float32", lod_level=1
                )
                slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars
        )
        dataset._init_distributed_settings(parse_ins_id=True)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for i in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        # dataset._set_merge_by_lineid(2)
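        # Setting merge_size asks merge_by_lineid() below to combine instances that
        # share a line id (the ins_id parsed above).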
        dataset.update_settings(merge_size=2)
        dataset.dataset.merge_by_lineid()

        temp_dir.cleanup()

    def test_in_memory_dataset_masterpatch1(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_masterpatch1_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_masterpatch1_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 id2 1 1 1 1 1 0 1 0\n"
            data += "1 id3 1 0 1 0 1 1 1 1\n"
            data += "1 id3 1 1 1 1 1 0 1 0\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots_vars = []
        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            var1 = fluid.layers.data(
                name="slot1", shape=[1], dtype="int64", lod_level=0
            )
            var2 = fluid.layers.data(
                name="slot2", shape=[1], dtype="int64", lod_level=0
            )
            var3 = fluid.layers.data(
                name="slot3", shape=[1], dtype="float32", lod_level=0
            )
            var4 = fluid.layers.data(
                name="slot4", shape=[1], dtype="float32", lod_level=0
            )
            slots_vars = [var1, var2, var3, var4]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars
        )
        dataset._init_distributed_settings(parse_ins_id=True)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for i in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        dataset._set_merge_by_lineid(2)
        dataset.dataset.merge_by_lineid()

        temp_dir.cleanup()

    def test_in_memory_dataset_run_2(self):
        """
        Testcase for InMemoryDataset from create to run.
        Use CUDAPlace
        Use float type id
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_run_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_run_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="float32", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(
            fluid.CPUPlace()
            if not core.is_compiled_with_cuda()
            else fluid.CUDAPlace(0)
        )
        exe.run(fluid.default_startup_program())

        for i in range(2):
            try:
                exe.train_from_dataset(fluid.default_main_program(), dataset)
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=1
                )
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=2
                )
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=2
                )
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=3
                )
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=4
                )
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        dataset._set_merge_by_lineid(2)
        dataset._set_parse_ins_id(False)
        dataset._set_fleet_send_sleep_seconds(2)
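        # preload_into_memory() loads data asynchronously; wait_preload_done() blocks
        # until the preload threads have finished.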
        dataset.preload_into_memory()
        dataset.wait_preload_done()
        dataset.preload_into_memory(1)
        dataset.wait_preload_done()
        dataset.dataset.merge_by_lineid()
        dataset._set_merge_by_lineid(30)
        dataset._set_parse_ins_id(False)
        dataset.load_into_memory()
        dataset.dataset.merge_by_lineid()
        dataset.update_settings(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=[],
            fs_name="",
            fs_ugi="",
            download_cmd="cat",
            merge_size=-1,
            parse_ins_id=False,
            parse_content=False,
            fleet_send_batch_size=2,
            fleet_send_sleep_seconds=2,
            fea_eval=True,
        )
        fleet_ptr = fluid.core.Fleet()
        fleet_ptr.set_client2client_config(1, 1, 1)
        fleet_ptr.get_cache_threshold(0)

        temp_dir.cleanup()

    def test_queue_dataset_run(self):
        """
        Testcase for QueueDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.QueueDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
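        # QueueDataset streams samples straight from the file list, so there is no
        # load_into_memory()/local_shuffle() step in this flow.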
        dataset.set_filelist([filename1, filename2])

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        dataset2 = paddle.distributed.QueueDataset()
        dataset2.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset.set_filelist([])
        try:
            exe.train_from_dataset(fluid.default_main_program(), dataset2)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            self.assertTrue(False)

        temp_dir.cleanup()

    def test_queue_dataset_run_2(self):
        """
        Testcase for QueueDataset from create to run.
        Use CUDAPlace
        Use float type id
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="float32", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.QueueDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset.set_filelist([filename1, filename2])

        exe = fluid.Executor(
            fluid.CPUPlace()
            if not core.is_compiled_with_cuda()
            else fluid.CUDAPlace(0)
        )
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_queue_dataset_run_3(self):
        """
        Testcase for InMemoryDataset with batch input (input_type=1), from create to run.
        Use CUDAPlace when available.
        Use int64 type id.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
            data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
            data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
            data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
            data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
            data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
            data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
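        # input_type=1 selects batch-style input for the data feed (assumed meaning of
        # the input_type flag passed to init()).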
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars,
        )
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()

        exe = fluid.Executor(
            fluid.CPUPlace()
            if not core.is_compiled_with_cuda()
            else fluid.CUDAPlace(0)
        )
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_run_with_inmemory_dataset_train_debug_mode(self):
        """
        Testcase for InMemoryDataset trained in debug mode, from create to run.
        """

        temp_dir = tempfile.TemporaryDirectory()
        dump_a_path = os.path.join(temp_dir.name, 'test_run_with_dump_a.txt')
        dump_b_path = os.path.join(temp_dir.name, 'test_run_with_dump_b.txt')

        with open(dump_a_path, "w") as f:
            data = "1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(dump_b_path, "w") as f:
            data = "1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
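        # This variant uses the SlotRecordInMemoryDataFeed and later calls
        # train_from_dataset(..., debug=True) to exercise the debug path.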
        dataset.init(
            batch_size=32,
            thread_num=1,
            pipe_command="cat",
            data_feed_type="SlotRecordInMemoryDataFeed",
            use_var=slots_vars,
        )
        dataset._init_distributed_settings(
            parse_ins_id=True,
            parse_content=True,
            fea_eval=True,
            candidate_size=10000,
        )
        dataset.set_filelist([dump_a_path, dump_b_path])
        dataset.load_into_memory()

        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
        for i in range(2):
            try:
                exe.train_from_dataset(main_program, dataset, debug=True)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        temp_dir.cleanup()


class TestDatasetWithDataLoader(TestDataset):
    """
    TestCases for Dataset run through a DataLoader.
    """

    def setUp(self):
        """
        Test Dataset With Data Loader, setUp.
        """
        self.use_data_loader = True
        self.epoch_num = 10
        self.drop_last = False


class TestDatasetWithFetchHandler(unittest.TestCase):
    """
    Test Dataset With Fetch Handler. TestCases.
    """

    def net(self):
        """
        Build the small slot network shared by the fetch handler tests.
        """
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        poolings = []
        for slot in slots:
            data = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            var = fluid.layers.cast(x=data, dtype='float32')
            pool = fluid.layers.sequence_pool(input=var, pool_type='AVERAGE')

            slots_vars.append(data)
            poolings.append(pool)

        concated = fluid.layers.concat(poolings, axis=1)
        fc = fluid.layers.fc(input=concated, act='tanh', size=32)
        return slots_vars, fc

    def get_dataset(self, inputs, files):
        """
        Build a QueueDataset from the given input vars and file list.

        Args:
            inputs(list): feed variables for the dataset
            files(list): file list for the dataset
        """
        dataset = paddle.distributed.QueueDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=inputs
        )
        dataset.set_filelist(files)
        return dataset

    def setUp(self):
        """
        Create the temporary sample files shared by the tests.
        """
        self.temp_dir = tempfile.TemporaryDirectory()
        self.filename1 = os.path.join(
            self.temp_dir.name, "test_queue_dataset_run_a.txt"
        )
        self.filename2 = os.path.join(
            self.temp_dir.name, "test_queue_dataset_run_b.txt"
        )

        with open(self.filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(self.filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

    def tearDown(self):
        """
        Clean up the temporary sample files.
        """
        self.temp_dir.cleanup()

    def test_dataset_none(self):
        """
        Testcase for passing dataset=None to train_from_dataset.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        # test dataset->None
        try:
            exe.train_from_dataset(fluid.default_main_program(), None)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, str(e))
        except Exception as e:
            self.assertTrue(False)

    def test_infer_from_dataset(self):
X
xujiaqi01 已提交
936 937 938
        """
        Testcase for infer_from_dataset.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        try:
            exe.infer_from_dataset(fluid.default_main_program(), dataset)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            self.assertTrue(False)

    def test_fetch_handler(self):
        """
        Testcase for train_from_dataset with a FetchHandler.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        fh = fluid.executor.FetchHandler(out.name)
        fh.help()

        try:
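            # Hand the FetchHandler to train_from_dataset so the value of `out` can be
            # fetched while the dataset-driven training runs.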
            exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_handler=fh,
            )
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, str(e))
        except Exception as e:
            self.assertTrue(False)


class TestDataset2(unittest.TestCase):
    """TestCases for Dataset."""

    def setUp(self):
        """TestCases for Dataset."""
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

    def test_dataset_fleet(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run_b.txt"
        )

        self.skipTest("parameter server will add pslib UT later")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
            fleet,
        )

        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="float32", lod_level=1
                )
                slots_vars.append(var)
            fake_cost = paddle.subtract(slots_vars[0], slots_vars[-1])
            fake_cost = paddle.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(adam)
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.InMemoryDataset()

            dataset.init(
                batch_size=32,
                thread_num=3,
                pipe_command="cat",
                use_var=slots_vars,
            )
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            fleet._opt_info = None
            fleet._fleet_ptr = None

        temp_dir.cleanup()

    def test_dataset_fleet2(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run2_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run2_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="float32", lod_level=1
                )
                slots_vars.append(var)
            fake_cost = paddle.subtract(slots_vars[0], slots_vars[-1])
            fake_cost = paddle.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(
                    adam,
                    strategy={
                        "fs_uri": "fs_uri_xxx",
                        "fs_user": "fs_user_xxx",
                        "fs_passwd": "fs_passwd_xxx",
                        "fs_hadoop_bin": "fs_hadoop_bin_xxx",
                    },
                )
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(
                batch_size=32,
                thread_num=3,
                pipe_command="cat",
                use_var=slots_vars,
            )
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            try:
                dataset.global_shuffle(fleet)
            except:
                print("warning: catch expected error")
            fleet._opt_info = None
            fleet._fleet_ptr = None
            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(fs_name="", fs_ugi="")
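            # The calls below exercise error paths on a bare dataset; they are expected
            # to raise, and the test only checks that they fail gracefully.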
            d = paddle.distributed.fleet.DatasetBase()
            try:
                dataset._set_feed_type("MultiSlotInMemoryDataFeed")
            except:
                print("warning: catch expected error")
            dataset.thread_num = 0
            try:
                dataset._prepare_to_run()
            except:
                print("warning: catch expected error")
            try:
                dataset.preprocess_instance()
            except:
                print("warning: catch expected error")
            try:
                dataset.set_current_phase(1)
            except:
                print("warning: catch expected error")
            try:
                dataset.postprocess_instance()
            except:
                print("warning: catch expected error")
            dataset._set_fleet_send_batch_size(1024)
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            # dataset.get_pv_data_size()
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()
            dataset = paddle.distributed.QueueDataset()
            try:
                dataset.local_shuffle()
            except:
                print("warning: catch expected error")
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            dataset = paddle.distributed.fleet.FileInstantDataset()
            try:
                dataset.local_shuffle()
            except:
                print("warning: catch expected error")
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")

        temp_dir.cleanup()

    def test_bosps_dataset_fleet2(self):
        """
        Testcase for BoxPSDataset with fleet, from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run2_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run2_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="float32", lod_level=1
                )
                slots_vars.append(var)
            fake_cost = paddle.subtract(slots_vars[0], slots_vars[-1])
            fake_cost = paddle.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(
                    adam,
                    strategy={
                        "fs_uri": "fs_uri_xxx",
                        "fs_user": "fs_user_xxx",
                        "fs_passwd": "fs_passwd_xxx",
                        "fs_hadoop_bin": "fs_hadoop_bin_xxx",
                    },
                )
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(
                batch_size=32,
                thread_num=3,
                pipe_command="cat",
                use_var=slots_vars,
            )
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            try:
                dataset.global_shuffle(fleet)
            except:
                print("warning: catch expected error")
            fleet._opt_info = None
            fleet._fleet_ptr = None
            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(
                rank_offset="",
                pv_batch_size=1,
                fs_name="",
                fs_ugi="",
                data_feed_type="MultiSlotInMemoryDataFeed",
                parse_logkey=True,
                merge_by_sid=True,
                enable_pv_merge=True,
            )
            d = paddle.distributed.fleet.DatasetBase()
            try:
                dataset._set_feed_type("MultiSlotInMemoryDataFeed")
            except:
                print("warning: catch expected error")
            dataset.thread_num = 0
            try:
                dataset._prepare_to_run()
            except:
                print("warning: catch expected error")
            dataset._set_parse_logkey(True)
            dataset._set_merge_by_sid(True)
            dataset._set_enable_pv_merge(True)
            try:
                dataset.preprocess_instance()
            except:
                print("warning: catch expected error")
            try:
                dataset.set_current_phase(1)
            except:
                print("warning: catch expected error")
            try:
                dataset.postprocess_instance()
            except:
                print("warning: catch expected error")
            dataset._set_fleet_send_batch_size(1024)
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            # dataset.get_pv_data_size()
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()
        temp_dir.cleanup()


if __name__ == '__main__':
    unittest.main()