#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCases for Dataset,
including create, config, run, etc.
"""

import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import os
import tempfile
import unittest


class TestDataset(unittest.TestCase):
    """TestCases for Dataset."""

    def setUp(self):
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

    def test_dataset_create(self):
        """Testcase for dataset create."""
        try:
            dataset = paddle.distributed.InMemoryDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.QueueDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.fleet.dataset.FileInstantDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.fleet.dataset.MyOwnDataset()
            self.assertTrue(False)
        except:
            self.assertTrue(True)

    def test_config(self):
        """
        Testcase for python config.
        """
        dataset = fluid.InMemoryDataset()
        dataset.set_parse_ins_id(True)
        dataset.set_parse_content(True)
        dataset._set_trainer_num(1)
        self.assertTrue(dataset.parse_ins_id)
        self.assertTrue(dataset.parse_content)
        self.assertEqual(dataset.trainer_num, 1)

    def test_shuffle_by_uid(self):
        """
        Testcase for shuffle_by_uid.
        """
        dataset = paddle.distributed.InMemoryDataset()
        dataset._set_uid_slot('6048')
        dataset._set_shuffle_by_uid(True)

    def test_run_with_dump(self):
        """
        Testcase for InMemoryDataset from create to run.
        """

        temp_dir = tempfile.TemporaryDirectory()
        dump_a_path = os.path.join(temp_dir.name, 'test_run_with_dump_a.txt')
        dump_b_path = os.path.join(temp_dir.name, 'test_run_with_dump_b.txt')

        with open(dump_a_path, "w") as f:
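            # Synthetic slot samples; the extra letter columns exercise the
            # parse_ins_id / parse_content settings enabled further below.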
            data = "1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(dump_b_path, "w") as f:
            data = "1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset.update_settings(pipe_command="cat1")
        dataset._init_distributed_settings(
            parse_ins_id=True,
            parse_content=True,
            fea_eval=True,
            candidate_size=10000,
        )
        dataset.set_filelist([dump_a_path, dump_b_path])
        dataset.load_into_memory()
        dataset.local_shuffle()

        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
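        # train_from_dataset may raise ImportError on Windows when
        # trainer_desc_pb2 is unavailable; any other exception fails the test.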
        for i in range(2):
            try:
                exe.train_from_dataset(main_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        temp_dir.cleanup()

    def test_dataset_config(self):
        """Testcase for dataset configuration."""
        dataset = fluid.core.Dataset("MultiSlotDataset")
        dataset.set_thread_num(12)
        dataset.set_filelist(["a.txt", "b.txt", "c.txt"])
        dataset.set_trainer_num(4)
        dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")
        dataset.set_download_cmd("./read_from_afs my_fs_name my_fs_ugi")
        dataset.set_enable_pv_merge(False)

        thread_num = dataset.get_thread_num()
        self.assertEqual(thread_num, 12)

        filelist = dataset.get_filelist()
        self.assertEqual(len(filelist), 3)
        self.assertEqual(filelist[0], "a.txt")
        self.assertEqual(filelist[1], "b.txt")
        self.assertEqual(filelist[2], "c.txt")

        trainer_num = dataset.get_trainer_num()
        self.assertEqual(trainer_num, 4)

        name, ugi = dataset.get_hdfs_config()
        self.assertEqual(name, "my_fs_name")
        self.assertEqual(ugi, "my_fs_ugi")

        download_cmd = dataset.get_download_cmd()
        self.assertEqual(download_cmd, "./read_from_afs my_fs_name my_fs_ugi")

    def test_set_download_cmd(self):
        """
        Testcase for InMemoryDataset with a custom download_cmd, from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "afs:test_in_memory_dataset_run_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "afs:test_in_memory_dataset_run_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32,
            thread_num=3,
            pipe_command="cat",
            download_cmd="cat",
            use_var=slots_vars,
        )
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(main_program, feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(main_program, dataset)
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_in_memory_dataset_run(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_run_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_run_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset._init_distributed_settings(fea_eval=True, candidate_size=1)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.slots_shuffle(["slot1"])
        dataset.local_shuffle()
        dataset._set_generate_unique_feasigns(True, 15)
        dataset._generate_local_tables_unlock(0, 11, 1, 25, 15)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_in_memory_dataset_masterpatch(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_masterpatch_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_masterpatch_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 id2 1 1 1 1 1 0 1 0\n"
            data += "1 id3 1 0 1 0 1 1 1 1\n"
            data += "1 id3 1 1 1 1 1 0 1 0\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            for slot in slots[:2]:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="int64", lod_level=1
                )
                slots_vars.append(var)
            for slot in slots[2:]:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="float32", lod_level=1
                )
                slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars
        )
        dataset._init_distributed_settings(parse_ins_id=True)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for i in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        # dataset._set_merge_by_lineid(2)
        dataset.update_settings(merge_size=2)
        dataset.dataset.merge_by_lineid()

        temp_dir.cleanup()

    def test_in_memory_dataset_masterpatch1(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_masterpatch1_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_masterpatch1_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 id2 1 1 1 1 1 0 1 0\n"
            data += "1 id3 1 0 1 0 1 1 1 1\n"
            data += "1 id3 1 1 1 1 1 0 1 0\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots_vars = []
        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            var1 = fluid.layers.data(
                name="slot1", shape=[1], dtype="int64", lod_level=0
            )
            var2 = fluid.layers.data(
                name="slot2", shape=[1], dtype="int64", lod_level=0
            )
            var3 = fluid.layers.data(
                name="slot3", shape=[1], dtype="float32", lod_level=0
            )
            var4 = fluid.layers.data(
                name="slot4", shape=[1], dtype="float32", lod_level=0
            )
            slots_vars = [var1, var2, var3, var4]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=1, pipe_command="cat", use_var=slots_vars
        )
        dataset._init_distributed_settings(parse_ins_id=True)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for i in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        dataset._set_merge_by_lineid(2)
        dataset.dataset.merge_by_lineid()

        temp_dir.cleanup()

    def test_in_memory_dataset_run_2(self):
        """
        Testcase for InMemoryDataset from create to run.
        Use CUDAPlace
        Use float type id
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_run_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset_run_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="float32", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(
            fluid.CPUPlace()
            if not core.is_compiled_with_cuda()
            else fluid.CUDAPlace(0)
        )
        exe.run(fluid.default_startup_program())

        for i in range(2):
            try:
                exe.train_from_dataset(fluid.default_main_program(), dataset)
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=1
                )
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=2
                )
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=2
                )
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=3
                )
                exe.train_from_dataset(
                    fluid.default_main_program(), dataset, thread=4
                )
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        dataset._set_merge_by_lineid(2)
        dataset._set_parse_ins_id(False)
        dataset._set_fleet_send_sleep_seconds(2)
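        # Exercise asynchronous preload: kick off loading, wait for it to
        # finish, then repeat with an explicit thread count.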
        dataset.preload_into_memory()
        dataset.wait_preload_done()
        dataset.preload_into_memory(1)
        dataset.wait_preload_done()
        dataset.dataset.merge_by_lineid()
        dataset._set_merge_by_lineid(30)
        dataset._set_parse_ins_id(False)
        dataset.load_into_memory()
        dataset.dataset.merge_by_lineid()
        dataset.update_settings(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=[],
            fs_name="",
            fs_ugi="",
            download_cmd="cat",
            merge_size=-1,
            parse_ins_id=False,
            parse_content=False,
            fleet_send_batch_size=2,
            fleet_send_sleep_seconds=2,
            fea_eval=True,
        )
        fleet_ptr = fluid.core.Fleet()
        fleet_ptr.set_client2client_config(1, 1, 1)
        fleet_ptr.get_cache_threshold(0)

        temp_dir.cleanup()

    def test_queue_dataset_run(self):
        """
        Testcase for QueueDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.QueueDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset.set_filelist([filename1, filename2])

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        dataset2 = paddle.distributed.QueueDataset()
        dataset2.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset.set_filelist([])
        try:
            exe.train_from_dataset(fluid.default_main_program(), dataset2)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            self.assertTrue(False)

        temp_dir.cleanup()

    def test_queue_dataset_run_2(self):
        """
        Testcase for QueueDataset from create to run.
        Use CUDAPlace
        Use float type id
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="float32", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.QueueDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset.set_filelist([filename1, filename2])

        exe = fluid.Executor(
            fluid.CPUPlace()
            if not core.is_compiled_with_cuda()
            else fluid.CUDAPlace(0)
        )
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_queue_dataset_run_3(self):
        """
        Testcase for InMemoryDataset with input_type=1, from create to run.
        Use CUDAPlace when available.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
            data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
            data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
            data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
            data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
            data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
            data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.data(
                name=slot, shape=[None, 1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(
            batch_size=1,
            thread_num=2,
            input_type=1,
            pipe_command="cat",
            use_var=slots_vars,
        )
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()

        exe = fluid.Executor(
            fluid.CPUPlace()
            if not core.is_compiled_with_cuda()
            else fluid.CUDAPlace(0)
        )
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last
            )
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(
                        fluid.default_main_program(), dataset
                    )
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_run_with_inmemory_dataset_train_debug_mode(self):
        """
        Testcase for InMemoryDataset trained in debug mode, from create to run.
        """

        temp_dir = tempfile.TemporaryDirectory()
        dump_a_path = os.path.join(temp_dir.name, 'test_run_with_dump_a.txt')
        dump_b_path = os.path.join(temp_dir.name, 'test_run_with_dump_b.txt')

        with open(dump_a_path, "w") as f:
            data = "1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(dump_b_path, "w") as f:
            data = "1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
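        # This variant feeds data through SlotRecordInMemoryDataFeed and runs
        # train_from_dataset with debug=True.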
        dataset.init(
            batch_size=32,
            thread_num=1,
            pipe_command="cat",
            data_feed_type="SlotRecordInMemoryDataFeed",
            use_var=slots_vars,
        )
        dataset._init_distributed_settings(
            parse_ins_id=True,
            parse_content=True,
            fea_eval=True,
            candidate_size=10000,
        )
        dataset.set_filelist([dump_a_path, dump_b_path])
        dataset.load_into_memory()

        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
        for i in range(2):
            try:
                exe.train_from_dataset(main_program, dataset, debug=True)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        temp_dir.cleanup()


class TestDatasetWithDataLoader(TestDataset):
    """
    Test Dataset With Data Loader class. TestCases.
    """

    def setUp(self):
        """
        Test Dataset With Data Loader, setUp.
        """
        self.use_data_loader = True
        self.epoch_num = 10
        self.drop_last = False


class TestDatasetWithFetchHandler(unittest.TestCase):
    """
    Test Dataset With Fetch Handler. TestCases.
    """

    def net(self):
        """
        Helper that builds a small network over the slot inputs.
        """
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        poolings = []
        for slot in slots:
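            # Cast each int64 slot to float and average-pool it over the sequence.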
            data = fluid.layers.data(
                name=slot, shape=[1], dtype="int64", lod_level=1
            )
            var = fluid.layers.cast(x=data, dtype='float32')
            pool = fluid.layers.sequence_pool(input=var, pool_type='AVERAGE')

            slots_vars.append(data)
            poolings.append(pool)

        concated = fluid.layers.concat(poolings, axis=1)
        fc = fluid.layers.fc(input=concated, act='tanh', size=32)
        return slots_vars, fc

    def get_dataset(self, inputs, files):
        """
        Helper that builds a QueueDataset from the given inputs and files.

        Args:
            inputs(list): input variables of the dataset
            files(list): files for the dataset filelist
        """
        dataset = paddle.distributed.QueueDataset()
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=inputs
        )
        dataset.set_filelist(files)
        return dataset

    def setUp(self):
        """
        Test Dataset With Fetch Handler. TestCases.
        """
        self.temp_dir = tempfile.TemporaryDirectory()
        self.filename1 = os.path.join(
            self.temp_dir.name, "test_queue_dataset_run_a.txt"
        )
        self.filename2 = os.path.join(
            self.temp_dir.name, "test_queue_dataset_run_b.txt"
        )

        with open(self.filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(self.filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

    def tearDown(self):
        """
        Test Dataset With Fetch Handler. TestCases.
        """
        self.temp_dir.cleanup()

    def test_dataset_none(self):
        """
        Test Dataset With Fetch Handler. TestCases.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        # test dataset->None
        try:
            exe.train_from_dataset(fluid.default_main_program(), None)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, str(e))
        except Exception as e:
            self.assertTrue(False)

    def test_infer_from_dataset(self):
        """
        Test Dataset With Fetch Handler. TestCases.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        try:
            exe.infer_from_dataset(fluid.default_main_program(), dataset)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            self.assertTrue(False)

    def test_fetch_handler(self):
        """
        Test Dataset With Fetch Handler. TestCases.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        fh = fluid.executor.FetchHandler(out.name)
        fh.help()

        try:
            exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_handler=fh,
            )
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, str(e))
        except Exception as e:
            self.assertTrue(False)


class TestDataset2(unittest.TestCase):
    """TestCases for Dataset."""

    def setUp(self):
        """TestCases for Dataset."""
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

    def test_dataset_fleet(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run_b.txt"
        )

        self.skipTest("parameter server will add pslib UT later")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
            fleet,
        )

        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="float32", lod_level=1
                )
                slots_vars.append(var)
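            # A toy cost (difference of the first and last slots, averaged) so
            # the optimizer has something to minimize.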
            fake_cost = fluid.layers.elementwise_sub(
                slots_vars[0], slots_vars[-1]
            )
            fake_cost = paddle.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(adam)
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.InMemoryDataset()

            dataset.init(
                batch_size=32,
                thread_num=3,
                pipe_command="cat",
                use_var=slots_vars,
            )
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            fleet._opt_info = None
            fleet._fleet_ptr = None

        temp_dir.cleanup()

    def test_dataset_fleet2(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run2_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run2_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="float32", lod_level=1
                )
                slots_vars.append(var)
            fake_cost = fluid.layers.elementwise_sub(
                slots_vars[0], slots_vars[-1]
            )
            fake_cost = paddle.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(
                    adam,
                    strategy={
                        "fs_uri": "fs_uri_xxx",
                        "fs_user": "fs_user_xxx",
                        "fs_passwd": "fs_passwd_xxx",
                        "fs_hadoop_bin": "fs_hadoop_bin_xxx",
                    },
                )
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(
                batch_size=32,
                thread_num=3,
                pipe_command="cat",
                use_var=slots_vars,
            )
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            try:
                dataset.global_shuffle(fleet)
            except:
                print("warning: catch expected error")
            fleet._opt_info = None
            fleet._fleet_ptr = None
            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(fs_name="", fs_ugi="")
            d = paddle.distributed.fleet.DatasetBase()
            try:
                dataset._set_feed_type("MultiSlotInMemoryDataFeed")
            except:
                print("warning: catch expected error")
            dataset.thread_num = 0
            try:
                dataset._prepare_to_run()
            except:
                print("warning: catch expected error")
            try:
                dataset.preprocess_instance()
            except:
                print("warning: catch expected error")
            try:
                dataset.set_current_phase(1)
            except:
                print("warning: catch expected error")
            try:
                dataset.postprocess_instance()
            except:
                print("warning: catch expected error")
            dataset._set_fleet_send_batch_size(1024)
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            # dataset.get_pv_data_size()
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()
            dataset = paddle.distributed.QueueDataset()
            try:
                dataset.local_shuffle()
            except:
                print("warning: catch expected error")
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            dataset = paddle.distributed.fleet.FileInstantDataset()
            try:
                dataset.local_shuffle()
            except:
                print("warning: catch expected error")
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")

        temp_dir.cleanup()

    def test_bosps_dataset_fleet2(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run2_a.txt"
        )
        filename2 = os.path.join(
            temp_dir.name, "test_in_memory_dataset2_run2_b.txt"
        )

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(
                    name=slot, shape=[1], dtype="float32", lod_level=1
                )
                slots_vars.append(var)
            fake_cost = fluid.layers.elementwise_sub(
                slots_vars[0], slots_vars[-1]
            )
            fake_cost = paddle.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(
                    adam,
                    strategy={
                        "fs_uri": "fs_uri_xxx",
                        "fs_user": "fs_user_xxx",
                        "fs_passwd": "fs_passwd_xxx",
                        "fs_hadoop_bin": "fs_hadoop_bin_xxx",
                    },
                )
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(
                batch_size=32,
                thread_num=3,
                pipe_command="cat",
                use_var=slots_vars,
            )
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            try:
                dataset.global_shuffle(fleet)
            except:
                print("warning: catch expected error")
            fleet._opt_info = None
            fleet._fleet_ptr = None
            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(
                rank_offset="",
                pv_batch_size=1,
                fs_name="",
                fs_ugi="",
                data_feed_type="MultiSlotInMemoryDataFeed",
                parse_logkey=True,
                merge_by_sid=True,
                enable_pv_merge=True,
            )
            d = paddle.distributed.fleet.DatasetBase()
            try:
                dataset._set_feed_type("MultiSlotInMemoryDataFeed")
            except:
                print("warning: catch expected error")
            dataset.thread_num = 0
            try:
                dataset._prepare_to_run()
            except:
                print("warning: catch expected error")
            dataset._set_parse_logkey(True)
            dataset._set_merge_by_sid(True)
            dataset._set_enable_pv_merge(True)
            try:
                dataset.preprocess_instance()
            except:
                print("warning: catch expected error")
            try:
                dataset.set_current_phase(1)
            except:
                print("warning: catch expected error")
            try:
                dataset.postprocess_instance()
            except:
                print("warning: catch expected error")
            dataset._set_fleet_send_batch_size(1024)
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            # dataset.get_pv_data_size()
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()
        temp_dir.cleanup()


if __name__ == '__main__':
    unittest.main()