#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCases for Dataset,
including create, config, run, etc.
"""

import paddle
import paddle.fluid as fluid
import paddle.compat as cpt
import paddle.fluid.core as core
import numpy as np
import os
import shutil
import tempfile
import unittest


class TestDataset(unittest.TestCase):
    """  TestCases for Dataset. """

    def setUp(self):
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

    def test_dataset_create(self):
        """ Testcase for dataset create. """
        try:
            dataset = paddle.distributed.InMemoryDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.QueueDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.fleet.dataset.FileInstantDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.fleet.dataset.MyOwnDataset()
            self.assertTrue(False)
        except:
            self.assertTrue(True)

    def test_config(self):
        """
        Testcase for python config.
        """
        dataset = fluid.InMemoryDataset()
        dataset.set_parse_ins_id(True)
        dataset.set_parse_content(True)
        dataset._set_trainer_num(1)
        self.assertTrue(dataset.parse_ins_id)
        self.assertTrue(dataset.parse_content)
        self.assertEqual(dataset.trainer_num, 1)

    def test_shuffle_by_uid(self):
        """
        Testcase for shuffle_by_uid.
        """
        dataset = paddle.distributed.InMemoryDataset()
        dataset._set_uid_slot('6048')
        dataset._set_shuffle_by_uid(True)

    def test_run_with_dump(self):
        """
        Testcase for InMemoryDataset from create to run.
        """

        temp_dir = tempfile.TemporaryDirectory()
        dump_a_path = os.path.join(temp_dir.name, 'test_run_with_dump_a.txt')
        dump_b_path = os.path.join(temp_dir.name, 'test_run_with_dump_b.txt')

        with open(dump_a_path, "w") as f:
            data = "1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(dump_b_path, "w") as f:
            data = "1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="int64",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.update_settings(pipe_command="cat1")
        dataset._init_distributed_settings(parse_ins_id=True,
                                           parse_content=True,
                                           fea_eval=True,
                                           candidate_size=10000)
        dataset.set_filelist([dump_a_path, dump_b_path])
        dataset.load_into_memory()
        dataset.local_shuffle()

        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
        for i in range(2):
            try:
                exe.train_from_dataset(main_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        temp_dir.cleanup()

    def test_dataset_config(self):
        """ Testcase for dataset configuration. """
        dataset = fluid.core.Dataset("MultiSlotDataset")
        dataset.set_thread_num(12)
        dataset.set_filelist(["a.txt", "b.txt", "c.txt"])
        dataset.set_trainer_num(4)
        dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")
        dataset.set_download_cmd("./read_from_afs my_fs_name my_fs_ugi")
        dataset.set_enable_pv_merge(False)

        thread_num = dataset.get_thread_num()
        self.assertEqual(thread_num, 12)

        filelist = dataset.get_filelist()
        self.assertEqual(len(filelist), 3)
        self.assertEqual(filelist[0], "a.txt")
        self.assertEqual(filelist[1], "b.txt")
        self.assertEqual(filelist[2], "c.txt")

        trainer_num = dataset.get_trainer_num()
        self.assertEqual(trainer_num, 4)

        name, ugi = dataset.get_hdfs_config()
        self.assertEqual(name, "my_fs_name")
        self.assertEqual(ugi, "my_fs_ugi")

        download_cmd = dataset.get_download_cmd()
        self.assertEqual(download_cmd, "./read_from_afs my_fs_name my_fs_ugi")

    def test_set_download_cmd(self):
        """
        Testcase for InMemoryDataset with a user-defined download_cmd.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name,
                                 "afs:test_in_memory_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "afs:test_in_memory_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="int64",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     download_cmd="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(main_program, feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(main_program, dataset)
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_in_memory_dataset_run(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="int64",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
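        # fea_eval with a candidate_size turns on feature-importance evaluation,
        # which the slots_shuffle() call below relies on (assumption based on
        # the dataset API naming).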
        dataset._init_distributed_settings(fea_eval=True, candidate_size=1)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.slots_shuffle(["slot1"])
        dataset.local_shuffle()
        dataset._set_generate_unique_feasigns(True, 15)
        dataset._generate_local_tables_unlock(0, 11, 1, 25, 15)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_in_memory_dataset_masterpatch(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_masterpatch_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_masterpatch_b.txt")

        with open(filename1, "w") as f:
            data = "1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 id2 1 1 1 1 1 0 1 0\n"
            data += "1 id3 1 0 1 0 1 1 1 1\n"
            data += "1 id3 1 1 1 1 1 0 1 0\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            for slot in slots[:2]:
                var = fluid.layers.data(name=slot,
                                        shape=[1],
                                        dtype="int64",
                                        lod_level=1)
                slots_vars.append(var)
            for slot in slots[2:]:
                var = fluid.layers.data(name=slot,
                                        shape=[1],
                                        dtype="float32",
                                        lod_level=1)
                slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=1,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset._init_distributed_settings(parse_ins_id=True)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for i in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        #dataset._set_merge_by_lineid(2)
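        # merge_size=2 asks the dataset to merge instances that share the same
        # line id (ins_id) when merge_by_lineid() is called below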
        dataset.update_settings(merge_size=2)
        dataset.dataset.merge_by_lineid()

        temp_dir.cleanup()

    def test_in_memory_dataset_masterpatch1(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_masterpatch1_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_masterpatch1_b.txt")

        with open(filename1, "w") as f:
            data = "1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 id2 1 1 1 1 1 0 1 0\n"
            data += "1 id3 1 0 1 0 1 1 1 1\n"
            data += "1 id3 1 1 1 1 1 0 1 0\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots_vars = []
        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            var1 = fluid.layers.data(name="slot1",
                                     shape=[1],
                                     dtype="int64",
                                     lod_level=0)
            var2 = fluid.layers.data(name="slot2",
                                     shape=[1],
                                     dtype="int64",
                                     lod_level=0)
            var3 = fluid.layers.data(name="slot3",
                                     shape=[1],
                                     dtype="float32",
                                     lod_level=0)
            var4 = fluid.layers.data(name="slot4",
                                     shape=[1],
                                     dtype="float32",
                                     lod_level=0)
            slots_vars = [var1, var2, var3, var4]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=1,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset._init_distributed_settings(parse_ins_id=True)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for i in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        dataset._set_merge_by_lineid(2)
        dataset.dataset.merge_by_lineid()

        temp_dir.cleanup()

    def test_in_memory_dataset_run_2(self):
        """
        Testcase for InMemoryDataset from create to run.
        Use CUDAPlace
        Use float type id
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="float32",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace(
        ) if not core.is_compiled_with_cuda() else fluid.CUDAPlace(0))
        exe.run(fluid.default_startup_program())

        for i in range(2):
            try:
                exe.train_from_dataset(fluid.default_main_program(), dataset)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=1)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=2)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=2)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=3)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=4)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        dataset._set_merge_by_lineid(2)
        dataset._set_parse_ins_id(False)
        dataset._set_fleet_send_sleep_seconds(2)
        dataset.preload_into_memory()
        dataset.wait_preload_done()
        dataset.preload_into_memory(1)
        dataset.wait_preload_done()
        dataset.dataset.merge_by_lineid()
        dataset._set_merge_by_lineid(30)
        dataset._set_parse_ins_id(False)
        dataset.load_into_memory()
        dataset.dataset.merge_by_lineid()
        dataset.update_settings(batch_size=1,
                                thread_num=2,
                                input_type=1,
                                pipe_command="cat",
                                use_var=[],
                                fs_name="",
                                fs_ugi="",
                                download_cmd="cat",
                                merge_size=-1,
                                parse_ins_id=False,
                                parse_content=False,
                                fleet_send_batch_size=2,
                                fleet_send_sleep_seconds=2,
                                fea_eval=True)
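        # exercise the low-level Fleet C++ interface exposed through fluid.core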
        fleet_ptr = fluid.core.Fleet()
        fleet_ptr.set_client2client_config(1, 1, 1)
        fleet_ptr.get_cache_threshold(0)

        temp_dir.cleanup()

    def test_queue_dataset_run(self):
        """
        Testcase for QueueDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="int64",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.QueueDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        dataset2 = paddle.distributed.QueueDataset()
        dataset2.init(batch_size=32,
                      thread_num=3,
                      pipe_command="cat",
                      use_var=slots_vars)
        dataset.set_filelist([])
        try:
            exe.train_from_dataset(fluid.default_main_program(), dataset2)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            self.assertTrue(False)

        temp_dir.cleanup()

    def test_queue_dataset_run_2(self):
        """
        Testcase for QueueDataset from create to run.
        Use CUDAPlace
        Use float type id
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="float32",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.QueueDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])

        exe = fluid.Executor(fluid.CPUPlace(
        ) if not core.is_compiled_with_cuda() else fluid.CUDAPlace(0))
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_queue_dataset_run_3(self):
        """
        Testcase for InMemoryDataset from create to run.
        Use CUDAPlace when available.
        Use int64 type ids with input_type=1.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name, "test_queue_dataset_run_a.txt")
        filename2 = os.path.join(temp_dir.name, "test_queue_dataset_run_b.txt")

        with open(filename1, "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
            data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
            data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
            data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
            data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
            data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
            data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.data(name=slot,
                             shape=[None, 1],
                             dtype="int64",
                             lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=1,
                     thread_num=2,
                     input_type=1,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()

        exe = fluid.Executor(fluid.CPUPlace(
        ) if not core.is_compiled_with_cuda() else fluid.CUDAPlace(0))
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        temp_dir.cleanup()

    def test_run_with_inmemory_dataset_train_debug_mode(self):
        """
        Testcase for InMemoryDataset from create to run.
        """

        temp_dir = tempfile.TemporaryDirectory()
        dump_a_path = os.path.join(temp_dir.name, 'test_run_with_dump_a.txt')
        dump_b_path = os.path.join(temp_dir.name, 'test_run_with_dump_b.txt')

        with open(dump_a_path, "w") as f:
            data = "1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(dump_b_path, "w") as f:
            data = "1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="int64",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
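        # use the SlotRecord in-memory data feed instead of the default
        # MultiSlotInMemoryDataFeed, then train in debug mode below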
        dataset.init(batch_size=32,
                     thread_num=1,
                     pipe_command="cat",
                     data_feed_type="SlotRecordInMemoryDataFeed",
                     use_var=slots_vars)
        dataset._init_distributed_settings(parse_ins_id=True,
                                           parse_content=True,
                                           fea_eval=True,
                                           candidate_size=10000)
        dataset.set_filelist([dump_a_path, dump_b_path])
        dataset.load_into_memory()

        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
        for i in range(2):
            try:
                exe.train_from_dataset(main_program, dataset, debug=True)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        temp_dir.cleanup()


class TestDatasetWithDataLoader(TestDataset):
    """
    Test Dataset With Data Loader class. TestCases.
    """

    def setUp(self):
        """
        Test Dataset With Data Loader, setUp.
        """
        self.use_data_loader = True
        self.epoch_num = 10
        self.drop_last = False


class TestDatasetWithFetchHandler(unittest.TestCase):
    """
    Test Dataset With Fetch Handler. TestCases.
    """

    def net(self):
        """
        Build a small network over the four dataset slots; return its inputs and output.
        """
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        poolings = []
        for slot in slots:
            data = fluid.layers.data(name=slot,
                                     shape=[1],
                                     dtype="int64",
                                     lod_level=1)
            var = fluid.layers.cast(x=data, dtype='float32')
            pool = fluid.layers.sequence_pool(input=var, pool_type='AVERAGE')

            slots_vars.append(data)
            poolings.append(pool)

        concated = fluid.layers.concat(poolings, axis=1)
        fc = fluid.layers.fc(input=concated, act='tanh', size=32)
        return slots_vars, fc

    def get_dataset(self, inputs, files):
        """
        Create a QueueDataset configured with the given inputs and file list.

        Args:
            inputs(list): feed variables used by the dataset
            files(list): data files for the dataset
        """
        dataset = paddle.distributed.QueueDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=inputs)
        dataset.set_filelist(files)
        return dataset

    def setUp(self):
        """
        Create temporary data files shared by the test cases.
        """
        self.temp_dir = tempfile.TemporaryDirectory()
        self.filename1 = os.path.join(self.temp_dir.name,
                                      "test_queue_dataset_run_a.txt")
        self.filename2 = os.path.join(self.temp_dir.name,
                                      "test_queue_dataset_run_b.txt")

        with open(self.filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(self.filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

    def tearDown(self):
        """
        Remove the temporary data files.
        """
        self.temp_dir.cleanup()

    def test_dataset_none(self):
        """
        Test train_from_dataset when the dataset argument is None.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        # test dataset->None
        try:
            exe.train_from_dataset(fluid.default_main_program(), None)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, cpt.get_exception_message(e))
        except Exception as e:
            self.assertTrue(False)

    def test_infer_from_dataset(self):
        """
        Test infer_from_dataset with a QueueDataset.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        try:
            exe.infer_from_dataset(fluid.default_main_program(), dataset)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            self.assertTrue(False)

    def test_fetch_handler(self):
        """
        Test train_from_dataset with a FetchHandler attached.
        """
        slots_vars, out = self.net()
        files = [self.filename1, self.filename2]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        fh = fluid.executor.FetchHandler(out.name)
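        # FetchHandler periodically fetches out.name while train_from_dataset
        # runs; help() prints guidance on implementing a handler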
        fh.help()

        try:
            exe.train_from_dataset(program=fluid.default_main_program(),
                                   dataset=dataset,
                                   fetch_handler=fh)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, cpt.get_exception_message(e))
        except Exception as e:
            self.assertTrue(False)


class TestDataset2(unittest.TestCase):
    """  TestCases for Dataset. """

    def setUp(self):
        """  TestCases for Dataset. """
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

    def test_dataset_fleet(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset2_run_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset2_run_b.txt")

        self.skipTest("parameter server will add pslib UT later")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(\
                    name=slot, shape=[1], dtype="float32", lod_level=1)
                slots_vars.append(var)
            fake_cost = \
                fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1])
            fake_cost = paddle.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(adam)
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.InMemoryDataset()

            dataset.init(batch_size=32,
                         thread_num=3,
                         pipe_command="cat",
                         use_var=slots_vars)
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            fleet._opt_info = None
            fleet._fleet_ptr = None

        temp_dir.cleanup()

    def test_dataset_fleet2(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset2_run2_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset2_run2_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(\
                    name=slot, shape=[1], dtype="float32", lod_level=1)
                slots_vars.append(var)
            fake_cost = \
                fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1])
            fake_cost = paddle.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(adam,
                                                   strategy={
                                                       "fs_uri":
                                                       "fs_uri_xxx",
                                                       "fs_user":
                                                       "fs_user_xxx",
                                                       "fs_passwd":
                                                       "fs_passwd_xxx",
                                                       "fs_hadoop_bin":
                                                       "fs_hadoop_bin_xxx"
                                                   })
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
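            # the calls below poke dataset / fleet APIs that normally need a
            # pslib parameter-server environment, so failures are caught and logged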
            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(batch_size=32,
                         thread_num=3,
                         pipe_command="cat",
                         use_var=slots_vars)
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            try:
                dataset.global_shuffle(fleet)
            except:
                print("warning: catch expected error")
            fleet._opt_info = None
            fleet._fleet_ptr = None
            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(fs_name="", fs_ugi="")
            d = paddle.distributed.fleet.DatasetBase()
            try:
                dataset._set_feed_type("MultiSlotInMemoryDataFeed")
            except:
                print("warning: catch expected error")
            dataset.thread_num = 0
            try:
                dataset._prepare_to_run()
            except:
                print("warning: catch expected error")
            try:
                dataset.preprocess_instance()
            except:
                print("warning: catch expected error")
            try:
                dataset.set_current_phase(1)
            except:
                print("warning: catch expected error")
            try:
                dataset.postprocess_instance()
            except:
                print("warning: catch expected error")
            dataset._set_fleet_send_batch_size(1024)
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            #dataset.get_pv_data_size()
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()
            dataset = paddle.distributed.QueueDataset()
            try:
                dataset.local_shuffle()
            except:
                print("warning: catch expected error")
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            dataset = paddle.distributed.fleet.FileInstantDataset()
            try:
                dataset.local_shuffle()
            except:
                print("warning: catch expected error")
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")

        temp_dir.cleanup()

    def test_bosps_dataset_fleet2(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        temp_dir = tempfile.TemporaryDirectory()
        filename1 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset2_run2_a.txt")
        filename2 = os.path.join(temp_dir.name,
                                 "test_in_memory_dataset2_run2_b.txt")

        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(\
                    name=slot, shape=[1], dtype="float32", lod_level=1)
                slots_vars.append(var)
            fake_cost = \
                fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1])
            fake_cost = paddle.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(adam,
                                                   strategy={
                                                       "fs_uri":
                                                       "fs_uri_xxx",
                                                       "fs_user":
                                                       "fs_user_xxx",
                                                       "fs_passwd":
                                                       "fs_passwd_xxx",
                                                       "fs_hadoop_bin":
                                                       "fs_hadoop_bin_xxx"
                                                   })
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(batch_size=32,
                         thread_num=3,
                         pipe_command="cat",
                         use_var=slots_vars)
            dataset.set_filelist([filename1, filename2])
            dataset.load_into_memory()
            try:
                dataset.global_shuffle(fleet)
            except:
                print("warning: catch expected error")
            fleet._opt_info = None
            fleet._fleet_ptr = None
            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(rank_offset="",
                         pv_batch_size=1,
                         fs_name="",
                         fs_ugi="",
                         data_feed_type="MultiSlotInMemoryDataFeed",
                         parse_logkey=True,
                         merge_by_sid=True,
                         enable_pv_merge=True)
            d = paddle.distributed.fleet.DatasetBase()
            try:
                dataset._set_feed_type("MultiSlotInMemoryDataFeed")
            except:
                print("warning: catch expected error")
            dataset.thread_num = 0
            try:
                dataset._prepare_to_run()
            except:
                print("warning: catch expected error")
            dataset._set_parse_logkey(True)
            dataset._set_merge_by_sid(True)
            dataset._set_enable_pv_merge(True)
            try:
                dataset.preprocess_instance()
            except:
                print("warning: catch expected error")
            try:
                dataset.set_current_phase(1)
            except:
                print("warning: catch expected error")
            try:
                dataset.postprocess_instance()
            except:
                print("warning: catch expected error")
            dataset._set_fleet_send_batch_size(1024)
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            #dataset.get_pv_data_size()
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()
        temp_dir.cleanup()


if __name__ == '__main__':
    unittest.main()