test_dataset.py 48.3 KB
Newer Older
X
xjqbest 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
X
xjqbest 已提交
14
"""
X
xjqbest 已提交
15 16
TestCases for Dataset,
including create, config, run, etc.
X
xjqbest 已提交
17
"""
X
xjqbest 已提交
18

19
import paddle
X
xjqbest 已提交
20
import paddle.fluid as fluid
21
import paddle.compat as cpt
22
import paddle.fluid.core as core
X
xjqbest 已提交
23
import os
24
import tempfile
X
xjqbest 已提交
25 26 27 28
import unittest


class TestDataset(unittest.TestCase):
X
xjqbest 已提交
29
    """  TestCases for Dataset. """
30

Z
Zeng Jinle 已提交
31 32 33 34 35
    def setUp(self):
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

X
xjqbest 已提交
36
    def test_dataset_create(self):
        """
        Testcase for dataset create.

        The three dataset classes used below must be constructible without
        arguments, while a class name that does not exist must be rejected.
        """
        # Construct directly: any unexpected exception fails the test with
        # its own traceback rather than an uninformative assertTrue(False).
        paddle.distributed.InMemoryDataset()
        paddle.distributed.QueueDataset()
        paddle.distributed.fleet.dataset.FileInstantDataset()

        # assertRaises is used instead of try/except with a bare ``except``:
        # a bare ``except`` would also catch the AssertionError meant to
        # signal "no exception was raised", so the check could never fail.
        with self.assertRaises(AttributeError):
            paddle.distributed.fleet.dataset.MyOwnDataset()

59 60 61 62 63 64 65
    def test_config(self):
        """
        Testcase for python config: values written through the setters
        are read back from the dataset attributes.
        """
        in_memory_ds = fluid.InMemoryDataset()

        # Write the three settings ...
        in_memory_ds.set_parse_ins_id(True)
        in_memory_ds.set_parse_content(True)
        in_memory_ds._set_trainer_num(1)

        # ... and check each one is visible on the python object.
        self.assertTrue(in_memory_ds.parse_ins_id)
        self.assertTrue(in_memory_ds.parse_content)
        self.assertEqual(in_memory_ds.trainer_num, 1)
70

71 72 73 74 75 76 77 78
    def test_shuffle_by_uid(self):
        """
        Testcase for shuffle_by_uid.

        Sets a uid slot and then enables shuffling by uid; the test only
        verifies that neither call raises.
        """
        uid_dataset = paddle.distributed.InMemoryDataset()
        uid_dataset._set_uid_slot('6048')
        uid_dataset._set_shuffle_by_uid(True)

79 80 81 82
    def test_run_with_dump(self):
        """
        Testcase for InMemoryDataset from create to run, with ins_id and
        content parsing enabled through _init_distributed_settings.
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        dump_a_path = os.path.join(temp_dir.name, 'test_run_with_dump_a.txt')
        dump_b_path = os.path.join(temp_dir.name, 'test_run_with_dump_b.txt')

        with open(dump_a_path, "w") as f:
            f.write("1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n"
                    "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n"
                    "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n")
        with open(dump_b_path, "w") as f:
            f.write("1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n"
                    "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n"
                    "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n"
                    "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n")

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = [
            fluid.layers.data(name=slot,
                              shape=[1],
                              dtype="int64",
                              lod_level=1) for slot in slots
        ]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.update_settings(pipe_command="cat1")
        dataset._init_distributed_settings(parse_ins_id=True,
                                           parse_content=True,
                                           fea_eval=True,
                                           candidate_size=10000)
        dataset.set_filelist([dump_a_path, dump_b_path])
        dataset.load_into_memory()
        dataset.local_shuffle()

        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
        for _ in range(2):
            try:
                exe.train_from_dataset(main_program, dataset)
            except ImportError:
                # Deliberately tolerated (presumably the trainer_desc_pb2
                # import problem on Windows -- TODO confirm).
                pass
            except Exception as e:
                # Keep the real cause in the failure message.
                self.fail("train_from_dataset raised {!r}".format(e))
138

X
xjqbest 已提交
139
    def test_dataset_config(self):
        """
        Testcase for dataset configuration: every value set on the core
        MultiSlotDataset is read back through the matching getter.
        """
        expected_files = ["a.txt", "b.txt", "c.txt"]
        expected_download_cmd = "./read_from_afs my_fs_name my_fs_ugi"

        core_dataset = fluid.core.Dataset("MultiSlotDataset")
        core_dataset.set_thread_num(12)
        core_dataset.set_filelist(["a.txt", "b.txt", "c.txt"])
        core_dataset.set_trainer_num(4)
        core_dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")
        core_dataset.set_download_cmd("./read_from_afs my_fs_name my_fs_ugi")
        core_dataset.set_enable_pv_merge(False)

        self.assertEqual(core_dataset.get_thread_num(), 12)

        # Check the length first, then every entry in order.
        files = core_dataset.get_filelist()
        self.assertEqual(len(files), 3)
        for position, expected_name in enumerate(expected_files):
            self.assertEqual(files[position], expected_name)

        self.assertEqual(core_dataset.get_trainer_num(), 4)

        fs_name, fs_ugi = core_dataset.get_hdfs_config()
        self.assertEqual(fs_name, "my_fs_name")
        self.assertEqual(fs_ugi, "my_fs_ugi")

        self.assertEqual(core_dataset.get_download_cmd(),
                         expected_download_cmd)

    def test_set_download_cmd(self):
        """
        Testcase for InMemoryDataset initialised with a download_cmd,
        from create to run. The input file names carry an "afs:" prefix.
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        filename1 = os.path.join(temp_dir.name,
                                 "afs:test_in_memory_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "afs:test_in_memory_dataset_run_b.txt")

        with open(filename1, "w") as f:
            f.write("1 1 2 3 3 4 5 5 5 5 1 1\n"
                    "1 2 2 3 4 4 6 6 6 6 1 2\n"
                    "1 3 2 3 5 4 7 7 7 7 1 3\n")
        with open(filename2, "w") as f:
            f.write("1 4 2 3 3 4 5 5 5 5 1 4\n"
                    "1 5 2 3 4 4 6 6 6 6 1 5\n"
                    "1 6 2 3 5 4 7 7 7 7 1 6\n"
                    "1 7 2 3 6 4 8 8 8 8 1 7\n")

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = [
            fluid.layers.data(name=slot,
                              shape=[1],
                              dtype="int64",
                              lod_level=1) for slot in slots
        ]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     download_cmd="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        paddle.enable_static()

        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        # A single executor is enough; an earlier, immediately overwritten
        # executor instance was never used and has been dropped.
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for _ in range(self.epoch_num):
                for data in data_loader():
                    exe.run(main_program, feed=data)
        else:
            for _ in range(self.epoch_num):
                try:
                    exe.train_from_dataset(main_program, dataset)
                except Exception as e:
                    # Keep the real cause in the failure message.
                    self.fail("train_from_dataset raised {!r}".format(e))
228

X
xjqbest 已提交
229
    def test_in_memory_dataset_run(self):
        """
        Testcase for InMemoryDataset from create to run, including slot
        shuffle, local shuffle and unique-feasign generation.
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_run_b.txt")

        with open(filename1, "w") as f:
            f.write("1 1 2 3 3 4 5 5 5 5 1 1\n"
                    "1 2 2 3 4 4 6 6 6 6 1 2\n"
                    "1 3 2 3 5 4 7 7 7 7 1 3\n")
        with open(filename2, "w") as f:
            f.write("1 4 2 3 3 4 5 5 5 5 1 4\n"
                    "1 5 2 3 4 4 6 6 6 6 1 5\n"
                    "1 6 2 3 5 4 7 7 7 7 1 6\n"
                    "1 7 2 3 6 4 8 8 8 8 1 7\n")

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = [
            fluid.layers.data(name=slot,
                              shape=[1],
                              dtype="int64",
                              lod_level=1) for slot in slots
        ]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset._init_distributed_settings(fea_eval=True, candidate_size=1)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.slots_shuffle(["slot1"])
        dataset.local_shuffle()
        dataset._set_generate_unique_feasigns(True, 15)
        dataset._generate_local_tables_unlock(0, 11, 1, 25, 15)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for _ in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for _ in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    # Keep the real cause in the failure message.
                    self.fail("train_from_dataset raised {!r}".format(e))
X
xjqbest 已提交
289

290 291 292 293
    def test_in_memory_dataset_masterpatch(self):
        """
        Testcase for InMemoryDataset with parse_ins_id enabled, from create
        to run, followed by merge_by_lineid with merge_size=2.
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_masterpatch_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_masterpatch_b.txt")

        with open(filename1, "w") as f:
            f.write("1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
                    "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
                    "1 id2 1 1 1 1 1 0 1 0\n"
                    "1 id3 1 0 1 0 1 1 1 1\n"
                    "1 id3 1 1 1 1 1 0 1 0\n"
                    "1 id4 1 0 1 0 1 1 1 1\n"
                    "1 id4 1 0 1 0 1 1 1 1\n"
                    "1 id5 1 1 1 1 1 0 1 0\n"
                    "1 id5 1 1 1 1 1 0 1 0\n")
        with open(filename2, "w") as f:
            f.write("1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
                    "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
                    "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
                    "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n")

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            # The first two slots are int64, the last two float32.
            for slot in slots[:2]:
                var = fluid.layers.data(name=slot,
                                        shape=[1],
                                        dtype="int64",
                                        lod_level=1)
                slots_vars.append(var)
            for slot in slots[2:]:
                var = fluid.layers.data(name=slot,
                                        shape=[1],
                                        dtype="float32",
                                        lod_level=1)
                slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=1,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset._init_distributed_settings(parse_ins_id=True)
        # Use the paths the data was actually written to; bare file names
        # would be resolved against the current working directory, where
        # these files do not exist.
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for _ in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError:
                # Deliberately tolerated (presumably the trainer_desc_pb2
                # import problem on Windows -- TODO confirm).
                pass
            except Exception as e:
                # Keep the real cause in the failure message.
                self.fail("train_from_dataset raised {!r}".format(e))

        # The merge size is configured through update_settings here.
        dataset.update_settings(merge_size=2)
        dataset.dataset.merge_by_lineid()
365

366 367 368 369
    def test_in_memory_dataset_masterpatch1(self):
        """
        Testcase for InMemoryDataset with parse_ins_id enabled and
        lod_level=0 inputs, from create to run, followed by
        merge_by_lineid with a merge size of 2.
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_masterpatch1_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_masterpatch1_b.txt")

        with open(filename1, "w") as f:
            f.write("1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
                    "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
                    "1 id2 1 1 1 1 1 0 1 0\n"
                    "1 id3 1 0 1 0 1 1 1 1\n"
                    "1 id3 1 1 1 1 1 0 1 0\n"
                    "1 id4 1 0 1 0 1 1 1 1\n"
                    "1 id4 1 0 1 0 1 1 1 1\n"
                    "1 id5 1 1 1 1 1 0 1 0\n"
                    "1 id5 1 1 1 1 1 0 1 0\n")
        with open(filename2, "w") as f:
            f.write("1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
                    "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
                    "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
                    "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n")

        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            var1 = fluid.layers.data(name="slot1",
                                     shape=[1],
                                     dtype="int64",
                                     lod_level=0)
            var2 = fluid.layers.data(name="slot2",
                                     shape=[1],
                                     dtype="int64",
                                     lod_level=0)
            var3 = fluid.layers.data(name="slot3",
                                     shape=[1],
                                     dtype="float32",
                                     lod_level=0)
            var4 = fluid.layers.data(name="slot4",
                                     shape=[1],
                                     dtype="float32",
                                     lod_level=0)
            slots_vars = [var1, var2, var3, var4]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=1,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset._init_distributed_settings(parse_ins_id=True)
        # Use the paths the data was actually written to; bare file names
        # would be resolved against the current working directory, where
        # these files do not exist.
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for _ in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError:
                # Deliberately tolerated (presumably the trainer_desc_pb2
                # import problem on Windows -- TODO confirm).
                pass
            except Exception as e:
                # Keep the real cause in the failure message.
                self.fail("train_from_dataset raised {!r}".format(e))

        dataset._set_merge_by_lineid(2)
        dataset.dataset.merge_by_lineid()
444

445 446 447 448 449 450
    def test_in_memory_dataset_run_2(self):
        """
        Testcase for InMemoryDataset from create to run.
        Use CUDAPlace when Paddle is compiled with CUDA, else CPUPlace
        Use float type id
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_run_b.txt")

        with open(filename1, "w") as f:
            f.write("1 1 2 3 3 4 5 5 5 5 1 1\n"
                    "1 2 2 3 4 4 6 6 6 6 1 2\n"
                    "1 3 2 3 5 4 7 7 7 7 1 3\n")
        with open(filename2, "w") as f:
            f.write("1 4 2 3 3 4 5 5 5 5 1 4\n"
                    "1 5 2 3 4 4 6 6 6 6 1 5\n"
                    "1 6 2 3 5 4 7 7 7 7 1 6\n"
                    "1 7 2 3 6 4 8 8 8 8 1 7\n")

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = [
            fluid.layers.data(name=slot,
                              shape=[1],
                              dtype="float32",
                              lod_level=1) for slot in slots
        ]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        place = (fluid.CUDAPlace(0)
                 if core.is_compiled_with_cuda() else fluid.CPUPlace())
        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())

        for _ in range(2):
            try:
                # One run with the default thread setting, then runs with
                # explicit thread counts (2 is exercised twice on purpose
                # to keep the original call sequence).
                exe.train_from_dataset(fluid.default_main_program(), dataset)
                for thread_count in (1, 2, 2, 3, 4):
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset,
                                           thread=thread_count)
            except ImportError:
                # Deliberately tolerated (presumably the trainer_desc_pb2
                # import problem on Windows -- TODO confirm).
                pass
            except Exception as e:
                # Keep the real cause in the failure message.
                self.fail("train_from_dataset raised {!r}".format(e))

        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for _ in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for _ in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.fail("train_from_dataset raised {!r}".format(e))

        # Exercise preload and merge_by_lineid with two merge sizes.
        dataset._set_merge_by_lineid(2)
        dataset._set_parse_ins_id(False)
        dataset._set_fleet_send_sleep_seconds(2)
        dataset.preload_into_memory()
        dataset.wait_preload_done()
        dataset.preload_into_memory(1)
        dataset.wait_preload_done()
        dataset.dataset.merge_by_lineid()
        dataset._set_merge_by_lineid(30)
        dataset._set_parse_ins_id(False)
        dataset.load_into_memory()
        dataset.dataset.merge_by_lineid()
        dataset.update_settings(batch_size=1,
                                thread_num=2,
                                input_type=1,
                                pipe_command="cat",
                                use_var=[],
                                fs_name="",
                                fs_ugi="",
                                download_cmd="cat",
                                merge_size=-1,
                                parse_ins_id=False,
                                parse_content=False,
                                fleet_send_batch_size=2,
                                fleet_send_sleep_seconds=2,
                                fea_eval=True)
        fleet_ptr = fluid.core.Fleet()
        fleet_ptr.set_client2client_config(1, 1, 1)
        fleet_ptr.get_cache_threshold(0)
559

X
xjqbest 已提交
560
    def test_queue_dataset_run(self):
        """
        Testcase for QueueDataset from create to run, followed by a run on
        a second QueueDataset whose file list is empty.
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            f.write("1 1 2 3 3 4 5 5 5 5 1 1\n"
                    "1 2 2 3 4 4 6 6 6 6 1 2\n"
                    "1 3 2 3 5 4 7 7 7 7 1 3\n")
        with open(filename2, "w") as f:
            f.write("1 4 2 3 3 4 5 5 5 5 1 4\n"
                    "1 5 2 3 4 4 6 6 6 6 1 5\n"
                    "1 6 2 3 5 4 7 7 7 7 1 6\n"
                    "1 7 2 3 6 4 8 8 8 8 1 7\n")

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = [
            fluid.layers.data(name=slot,
                              shape=[1],
                              dtype="int64",
                              lod_level=1) for slot in slots
        ]

        dataset = paddle.distributed.QueueDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for _ in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for _ in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    # Keep the real cause in the failure message.
                    self.fail("train_from_dataset raised {!r}".format(e))

        dataset2 = paddle.distributed.QueueDataset()
        dataset2.init(batch_size=32,
                      thread_num=3,
                      pipe_command="cat",
                      use_var=slots_vars)
        # The empty file list belongs on the dataset that is trained on
        # next (dataset2), not on the already-consumed first dataset.
        dataset2.set_filelist([])
        try:
            exe.train_from_dataset(fluid.default_main_program(), dataset2)
        except ImportError:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            self.fail("train_from_dataset raised {!r}".format(e))
X
xjqbest 已提交
626

627 628 629 630 631 632
    def test_queue_dataset_run_2(self):
        """
        Testcase for QueueDataset from create to run.
        Use CUDAPlace when Paddle is compiled with CUDA, else CPUPlace
        Use float type id
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            f.write("1 1 2 3 3 4 5 5 5 5 1 1\n"
                    "1 2 2 3 4 4 6 6 6 6 1 2\n"
                    "1 3 2 3 5 4 7 7 7 7 1 3\n")
        with open(filename2, "w") as f:
            f.write("1 4 2 3 3 4 5 5 5 5 1 4\n"
                    "1 5 2 3 4 4 6 6 6 6 1 5\n"
                    "1 6 2 3 5 4 7 7 7 7 1 6\n"
                    "1 7 2 3 6 4 8 8 8 8 1 7\n")

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = [
            fluid.layers.data(name=slot,
                              shape=[1],
                              dtype="float32",
                              lod_level=1) for slot in slots
        ]

        dataset = paddle.distributed.QueueDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])

        place = (fluid.CUDAPlace(0)
                 if core.is_compiled_with_cuda() else fluid.CPUPlace())
        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for _ in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for _ in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    # Keep the real cause in the failure message.
                    self.fail("train_from_dataset raised {!r}".format(e))
683 684 685 686 687 688 689

    def test_queue_dataset_run_3(self):
        """
        Testcase for a dataset created with input_type=1, from create to
        run. Despite the method name, the dataset built here is an
        InMemoryDataset and the slots are int64.
        Use CUDAPlace when Paddle is compiled with CUDA, else CPUPlace
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        # Both input files carry the same four records.
        with open(filename1, "w") as f:
            f.write("2 1 2 2 5 4 2 2 7 2 1 3\n"
                    "2 6 2 2 1 4 2 2 4 2 2 3\n"
                    "2 5 2 2 9 9 2 2 7 2 1 3\n"
                    "2 7 2 2 1 9 2 3 7 2 5 3\n")
        with open(filename2, "w") as f:
            f.write("2 1 2 2 5 4 2 2 7 2 1 3\n"
                    "2 6 2 2 1 4 2 2 4 2 2 3\n"
                    "2 5 2 2 9 9 2 2 7 2 1 3\n"
                    "2 7 2 2 1 9 2 3 7 2 5 3\n")

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = [
            fluid.data(name=slot,
                       shape=[None, 1],
                       dtype="int64",
                       lod_level=1) for slot in slots
        ]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=1,
                     thread_num=2,
                     input_type=1,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()

        place = (fluid.CUDAPlace(0)
                 if core.is_compiled_with_cuda() else fluid.CPUPlace())
        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for _ in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for _ in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    # Keep the real cause in the failure message.
                    self.fail("train_from_dataset raised {!r}".format(e))
743

D
danleifeng 已提交
744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
    def test_run_with_inmemory_dataset_train_debug_mode(self):
        """
        Testcase for InMemoryDataset using the SlotRecordInMemoryDataFeed
        data feed type, from create to run with debug=True.
        """
        temp_dir = tempfile.TemporaryDirectory()
        # Register cleanup right away so the directory is removed even when
        # a later step fails; a trailing cleanup() call would be skipped.
        self.addCleanup(temp_dir.cleanup)
        dump_a_path = os.path.join(temp_dir.name, 'test_run_with_dump_a.txt')
        dump_b_path = os.path.join(temp_dir.name, 'test_run_with_dump_b.txt')

        with open(dump_a_path, "w") as f:
            f.write("1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n"
                    "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n"
                    "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n")
        with open(dump_b_path, "w") as f:
            f.write("1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n"
                    "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n"
                    "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n"
                    "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n")

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = [
            fluid.layers.data(name=slot,
                              shape=[1],
                              dtype="int64",
                              lod_level=1) for slot in slots
        ]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=1,
                     pipe_command="cat",
                     data_feed_type="SlotRecordInMemoryDataFeed",
                     use_var=slots_vars)
        dataset._init_distributed_settings(parse_ins_id=True,
                                           parse_content=True,
                                           fea_eval=True,
                                           candidate_size=10000)
        dataset.set_filelist([dump_a_path, dump_b_path])
        dataset.load_into_memory()

        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
        for _ in range(2):
            try:
                exe.train_from_dataset(main_program, dataset, debug=True)
            except ImportError:
                # Deliberately tolerated (presumably the trainer_desc_pb2
                # import problem on Windows -- TODO confirm).
                pass
            except Exception as e:
                # Keep the real cause in the failure message.
                self.fail("train_from_dataset raised {!r}".format(e))

X
xjqbest 已提交
803

Z
Zeng Jinle 已提交
804
class TestDatasetWithDataLoader(TestDataset):
    """
    Variant of TestDataset whose fixture turns ``use_data_loader`` on.

    All test methods are inherited from TestDataset; only the fixture
    values set in ``setUp`` differ.
    """

    def setUp(self):
        """
        Install the parent's fixture values, then enable the data loader.

        After this call ``use_data_loader`` is True, ``epoch_num`` is 10 and
        ``drop_last`` is False.
        """
        # The parent sets use_data_loader=False, epoch_num=10 and
        # drop_last=False; only the first value is different here.
        super().setUp()
        self.use_data_loader = True


818
class TestDatasetWithFetchHandler(unittest.TestCase):
    """
    TestCases that drive Executor.train_from_dataset / infer_from_dataset
    with a QueueDataset, including the ``dataset=None`` error path and a
    run with a FetchHandler attached.
    """

    def net(self):
        """
        Build the small network used by every test in this class.

        Four int64 inputs with ``lod_level=1`` are each cast to float32 and
        average-pooled with ``sequence_pool``; the pooled results are
        concatenated on axis 1 and fed into a 32-unit tanh FC layer. No
        explicit program guard is used here.

        Returns:
            tuple: ``(slots_vars, fc)`` where ``slots_vars`` is the list of
            input variables and ``fc`` is the output of the FC layer.
        """
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        poolings = []
        for slot in slots:
            data = fluid.layers.data(name=slot,
                                     shape=[1],
                                     dtype="int64",
                                     lod_level=1)
            var = fluid.layers.cast(x=data, dtype='float32')
            pool = fluid.layers.sequence_pool(input=var, pool_type='AVERAGE')

            slots_vars.append(data)
            poolings.append(pool)

        concated = fluid.layers.concat(poolings, axis=1)
        fc = fluid.layers.fc(input=concated, act='tanh', size=32)
        return slots_vars, fc

    def get_dataset(self, inputs, files):
        """
        Create a QueueDataset that reads ``files`` through ``cat``.

        Args:
            inputs(list): variables handed to the dataset as ``use_var``
            files(list): paths handed to ``set_filelist``

        Returns:
            The initialized QueueDataset (batch size 32, 3 threads).
        """
        dataset = paddle.distributed.QueueDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=inputs)
        dataset.set_filelist(files)
        return dataset

    def _prepare_run(self):
        """
        Build the network, the dataset and a started CPU executor.

        Returns:
            tuple: ``(out, dataset, exe)`` -- the network output variable,
            the QueueDataset over both fixture files, and an Executor that
            has already run the default startup program.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        return out, dataset, exe

    def setUp(self):
        """
        Write the two text data files used as dataset input into a fresh
        temporary directory (removed again in ``tearDown``).
        """
        self.temp_dir = tempfile.TemporaryDirectory()
        self.filename1 = os.path.join(self.temp_dir.name,
                                      "test_queue_dataset_run_a.txt")
        self.filename2 = os.path.join(self.temp_dir.name,
                                      "test_queue_dataset_run_b.txt")

        with open(self.filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(self.filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

    def tearDown(self):
        """
        Remove the temporary directory created in ``setUp``.
        """
        self.temp_dir.cleanup()

    def test_dataset_none(self):
        """
        Call train_from_dataset with ``dataset=None``.

        An ImportError is tolerated with a warning; a RuntimeError must
        carry the expected message; any other exception fails the test.
        """
        # The dataset is still built, as before, but deliberately not passed.
        _, _, exe = self._prepare_run()

        # test dataset->None
        try:
            exe.train_from_dataset(fluid.default_main_program(), None)
        except ImportError:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, cpt.get_exception_message(e))
        except Exception as e:
            # Report the real error instead of an opaque "False is not true".
            self.fail("unexpected exception: {!r}".format(e))

    def test_infer_from_dataset(self):
        """
        Run infer_from_dataset on the default main program.

        An ImportError is tolerated with a warning; any other exception
        fails the test.
        """
        _, dataset, exe = self._prepare_run()

        try:
            exe.infer_from_dataset(fluid.default_main_program(), dataset)
        except ImportError:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            # Report the real error instead of an opaque "False is not true".
            self.fail("unexpected exception: {!r}".format(e))

    def test_fetch_handler(self):
        """
        Run train_from_dataset with a FetchHandler built from the network
        output's name.

        An ImportError is tolerated with a warning; a RuntimeError must
        carry the expected message; any other exception fails the test.
        """
        out, dataset, exe = self._prepare_run()

        fh = fluid.executor.FetchHandler(out.name)
        fh.help()

        try:
            exe.train_from_dataset(program=fluid.default_main_program(),
                                   dataset=dataset,
                                   fetch_handler=fh)
        except ImportError:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, cpt.get_exception_message(e))
        except Exception as e:
            # Report the real error instead of an opaque "False is not true".
            self.fail("unexpected exception: {!r}".format(e))

955

X
xujiaqi01 已提交
956 957 958 959 960 961 962 963 964 965 966 967 968
class TestDataset2(unittest.TestCase):
    """  TestCases for Dataset used together with parameter-server fleet. """

    # Contents of the two text data files every test in this class reads.
    _DATA_A = ("1 1 2 3 3 4 5 5 5 5 1 1\n"
               "1 2 2 3 4 4 6 6 6 6 1 2\n"
               "1 3 2 3 5 4 7 7 7 7 1 3\n")
    _DATA_B = ("1 4 2 3 3 4 5 5 5 5 1 4\n"
               "1 5 2 3 4 4 6 6 6 6 1 5\n"
               "1 6 2 3 5 4 7 7 7 7 1 6\n"
               "1 7 2 3 6 4 8 8 8 8 1 7\n")

    def setUp(self):
        """  Set the fixture flags (data loader off, 10 epochs, keep last). """
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

    def _write_data_files(self, stem):
        """
        Write the two data files into a fresh temporary directory.

        The directory's cleanup is registered with ``addCleanup`` as soon as
        it exists, so it is removed even when the test fails part-way.

        Args:
            stem(str): file name prefix; the files are named
                ``<stem>_a.txt`` and ``<stem>_b.txt``

        Returns:
            tuple: ``(filename1, filename2)`` paths of the written files.
        """
        temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(temp_dir.cleanup)
        filename1 = os.path.join(temp_dir.name, stem + "_a.txt")
        filename2 = os.path.join(temp_dir.name, stem + "_b.txt")
        with open(filename1, "w") as f:
            f.write(self._DATA_A)
        with open(filename2, "w") as f:
            f.write(self._DATA_B)
        return filename1, filename2

    @staticmethod
    def _build_fake_cost(train_program, startup_program):
        """
        Create four float32 inputs and a scalar fake cost in the programs.

        The cost is the mean of (first input - last input).

        Returns:
            tuple: ``(slots_vars, fake_cost)``.
        """
        with fluid.program_guard(train_program, startup_program):
            slots_vars = [
                fluid.layers.data(name=slot,
                                  shape=[1],
                                  dtype="float32",
                                  lod_level=1)
                for slot in ("slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff")
            ]
            fake_cost = fluid.layers.elementwise_sub(slots_vars[0],
                                                     slots_vars[-1])
            fake_cost = paddle.mean(fake_cost)
        return slots_vars, fake_cost

    @staticmethod
    def _fs_strategy():
        """ Return a fresh placeholder file-system strategy dict. """
        return {
            "fs_uri": "fs_uri_xxx",
            "fs_user": "fs_user_xxx",
            "fs_passwd": "fs_passwd_xxx",
            "fs_hadoop_bin": "fs_hadoop_bin_xxx"
        }

    @staticmethod
    def _init_fleet_and_minimize(fleet, fake_cost, scope, **optimizer_kwargs):
        """
        Init ``fleet`` and minimize ``fake_cost`` with a distributed Adam.

        ImportError / AttributeError raised along the way are reported with
        a warning and otherwise ignored.

        Args:
            fleet: the fleet singleton imported by the calling test
            fake_cost: loss variable to minimize
            scope: scope passed to ``minimize``
            **optimizer_kwargs: forwarded to ``fleet.distributed_optimizer``
        """
        try:
            fleet.init()
        except ImportError:
            print("warning: no mpi4py")
        adam = fluid.optimizer.Adam(learning_rate=0.000005)
        try:
            adam = fleet.distributed_optimizer(adam, **optimizer_kwargs)
            adam.minimize([fake_cost], [scope])
        except AttributeError:
            print("warning: no mpi")
        except ImportError:
            print("warning: no mpi4py")

    @staticmethod
    def _try_call(obj, method_name, *args):
        """
        Call ``obj.<method_name>(*args)``, tolerating any Exception.

        The attribute lookup happens inside the ``try`` on purpose: a
        dataset type that lacks the method is tolerated exactly like a
        method that raises.
        """
        try:
            getattr(obj, method_name)(*args)
        except Exception:
            print("warning: catch expected error")

    # Skipped up front so that no temporary directory is created and leaked;
    # the body is kept so the case can be re-enabled later.
    @unittest.skip("parameter server will add pslib UT later")
    def test_dataset_fleet(self):
        """
        Testcase for InMemoryDataset from create to run
        (distribute_transpiler fleet). Currently skipped.
        """
        filename1, filename2 = self._write_data_files(
            "test_in_memory_dataset2_run")

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
        slots_vars, fake_cost = self._build_fake_cost(train_program,
                                                      startup_program)
        with fluid.scope_guard(scope):
            exe = fluid.Executor(fluid.CPUPlace())
            self._init_fleet_and_minimize(fleet, fake_cost, scope)
            exe.run(startup_program)
            dataset = paddle.distributed.InMemoryDataset()

            dataset.init(batch_size=32,
                         thread_num=3,
                         pipe_command="cat",
                         use_var=slots_vars)
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            # Reset the fleet singleton's state after the run.
            fleet._opt_info = None
            fleet._fleet_ptr = None

    def test_dataset_fleet2(self):
        """
        Testcase for InMemoryDataset from create to run (pslib fleet),
        followed by calls on InMemoryDataset, QueueDataset and
        FileInstantDataset whose failures are tolerated.
        """
        filename1, filename2 = self._write_data_files(
            "test_in_memory_dataset2_run2")

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
        slots_vars, fake_cost = self._build_fake_cost(train_program,
                                                      startup_program)
        with fluid.scope_guard(scope):
            exe = fluid.Executor(fluid.CPUPlace())
            self._init_fleet_and_minimize(fleet,
                                          fake_cost,
                                          scope,
                                          strategy=self._fs_strategy())
            exe.run(startup_program)
            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(batch_size=32,
                         thread_num=3,
                         pipe_command="cat",
                         use_var=slots_vars)
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            self._try_call(dataset, "global_shuffle", fleet)
            # Reset the fleet singleton's state before the remaining calls.
            fleet._opt_info = None
            fleet._fleet_ptr = None

            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(fs_name="", fs_ugi="")
            # Constructed only to exercise the base-class constructor.
            paddle.distributed.fleet.DatasetBase()
            self._try_call(dataset, "_set_feed_type",
                           "MultiSlotInMemoryDataFeed")
            dataset.thread_num = 0
            self._try_call(dataset, "_prepare_to_run")
            self._try_call(dataset, "preprocess_instance")
            self._try_call(dataset, "set_current_phase", 1)
            self._try_call(dataset, "postprocess_instance")
            dataset._set_fleet_send_batch_size(1024)
            self._try_call(dataset, "global_shuffle")
            # NOTE: get_pv_data_size() was disabled by the original author;
            # the reason is not recorded here.
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()

            dataset = paddle.distributed.QueueDataset()
            self._try_call(dataset, "local_shuffle")
            self._try_call(dataset, "global_shuffle")

            dataset = paddle.distributed.fleet.FileInstantDataset()
            self._try_call(dataset, "local_shuffle")
            self._try_call(dataset, "global_shuffle")

    def test_bosps_dataset_fleet2(self):
        """
        Testcase for BoxPSDataset from create to run (pslib fleet),
        followed by calls on a second BoxPSDataset whose failures are
        tolerated.
        """
        filename1, filename2 = self._write_data_files(
            "test_in_memory_dataset2_run2")

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
        slots_vars, fake_cost = self._build_fake_cost(train_program,
                                                      startup_program)
        with fluid.scope_guard(scope):
            exe = fluid.Executor(fluid.CPUPlace())
            self._init_fleet_and_minimize(fleet,
                                          fake_cost,
                                          scope,
                                          strategy=self._fs_strategy())
            exe.run(startup_program)
            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(batch_size=32,
                         thread_num=3,
                         pipe_command="cat",
                         use_var=slots_vars)
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            self._try_call(dataset, "global_shuffle", fleet)
            # Reset the fleet singleton's state before the remaining calls.
            fleet._opt_info = None
            fleet._fleet_ptr = None

            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(rank_offset="",
                         pv_batch_size=1,
                         fs_name="",
                         fs_ugi="",
                         data_feed_type="MultiSlotInMemoryDataFeed",
                         parse_logkey=True,
                         merge_by_sid=True,
                         enable_pv_merge=True)
            # Constructed only to exercise the base-class constructor.
            paddle.distributed.fleet.DatasetBase()
            self._try_call(dataset, "_set_feed_type",
                           "MultiSlotInMemoryDataFeed")
            dataset.thread_num = 0
            self._try_call(dataset, "_prepare_to_run")
            dataset._set_parse_logkey(True)
            dataset._set_merge_by_sid(True)
            dataset._set_enable_pv_merge(True)
            self._try_call(dataset, "preprocess_instance")
            self._try_call(dataset, "set_current_phase", 1)
            self._try_call(dataset, "postprocess_instance")
            dataset._set_fleet_send_batch_size(1024)
            self._try_call(dataset, "global_shuffle")
            # NOTE: get_pv_data_size() was disabled by the original author;
            # the reason is not recorded here.
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()
1278

X
xujiaqi01 已提交
1279

X
xjqbest 已提交
1280
# Run every TestCase in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()