#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCases for Dataset,
including create, config, run, etc.
"""

from __future__ import print_function
import paddle
import paddle.fluid as fluid
import paddle.compat as cpt
import paddle.fluid.core as core
import numpy as np
import os
import shutil
import tempfile
import unittest


class TestDataset(unittest.TestCase):
    """  TestCases for Dataset. """

    def setUp(self):
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

    def test_dataset_create(self):
        """ Testcase for dataset create. """
        try:
            dataset = paddle.distributed.InMemoryDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.QueueDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.fleet.dataset.FileInstantDataset()
        except:
            self.assertTrue(False)

        try:
            dataset = paddle.distributed.fleet.dataset.MyOwnDataset()
            self.assertTrue(False)
        except:
            self.assertTrue(True)

    def test_config(self):
        """
        Testcase for python config.
        """
        dataset = fluid.InMemoryDataset()
        dataset.set_parse_ins_id(True)
        dataset.set_parse_content(True)
        dataset._set_trainer_num(1)
        self.assertTrue(dataset.parse_ins_id)
        self.assertTrue(dataset.parse_content)
        self.assertEqual(dataset.trainer_num, 1)

    def test_shuffle_by_uid(self):
        """
        Testcase for shuffle_by_uid.
        """
        dataset = paddle.distributed.InMemoryDataset()
        dataset._set_uid_slot('6048')
        dataset._set_shuffle_by_uid(True)

    def test_run_with_dump(self):
        """
        Testcase for InMemoryDataset from create to run.
        """

        temp_dir = tempfile.TemporaryDirectory()
        dump_a_path = os.path.join(temp_dir.name, 'test_run_with_dump_a.txt')
        dump_b_path = os.path.join(temp_dir.name, 'test_run_with_dump_b.txt')

        with open(dump_a_path, "w") as f:
            data = "1 a 1 a 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 b 1 b 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 c 1 c 1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(dump_b_path, "w") as f:
            data = "1 d 1 d 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 e 1 e 1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 f 1 f 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 g 1 g 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="int64",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.update_settings(pipe_command="cat1")
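        # parse_ins_id/parse_content tell the feed that each line carries an
        # instance id and a content field; fea_eval plus candidate_size enables
        # slots_shuffle-based feature evaluation (private distributed settings).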
        dataset._init_distributed_settings(parse_ins_id=True,
                                           parse_content=True,
                                           fea_eval=True,
                                           candidate_size=10000)
        dataset.set_filelist([dump_a_path, dump_b_path])
        dataset.load_into_memory()
        dataset.local_shuffle()

        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
        for i in range(2):
            try:
                exe.train_from_dataset(main_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        temp_dir.cleanup()

    def test_dataset_config(self):
        """ Testcase for dataset configuration. """
        dataset = fluid.core.Dataset("MultiSlotDataset")
        dataset.set_thread_num(12)
        dataset.set_filelist(["a.txt", "b.txt", "c.txt"])
        dataset.set_trainer_num(4)
        dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")
        dataset.set_download_cmd("./read_from_afs my_fs_name my_fs_ugi")
        dataset.set_enable_pv_merge(False)

        thread_num = dataset.get_thread_num()
        self.assertEqual(thread_num, 12)

        filelist = dataset.get_filelist()
        self.assertEqual(len(filelist), 3)
        self.assertEqual(filelist[0], "a.txt")
        self.assertEqual(filelist[1], "b.txt")
        self.assertEqual(filelist[2], "c.txt")

        trainer_num = dataset.get_trainer_num()
        self.assertEqual(trainer_num, 4)

        name, ugi = dataset.get_hdfs_config()
        self.assertEqual(name, "my_fs_name")
        self.assertEqual(ugi, "my_fs_ugi")

        download_cmd = dataset.get_download_cmd()
        self.assertEqual(download_cmd, "./read_from_afs my_fs_name my_fs_ugi")

    def test_set_download_cmd(self):
        """
        Testcase for InMemoryDataset with a customized download_cmd.
        """
        filename1 = "afs:test_in_memory_dataset_run_a.txt"
        filename2 = "afs:test_in_memory_dataset_run_b.txt"
        with open(filename1, "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open(filename2, "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="int64",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
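        # download_cmd is run over each file before pipe_command consumes it;
        # plain "cat" keeps this test local.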
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     download_cmd="cat",
                     use_var=slots_vars)
        dataset.set_filelist([filename1, filename2])
        dataset.load_into_memory()
        paddle.enable_static()

        exe = paddle.static.Executor(paddle.CPUPlace())
        startup_program = paddle.static.Program()
        main_program = paddle.static.Program()
        exe.run(startup_program)
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(main_program, feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(main_program, dataset)
                except Exception as e:
                    self.assertTrue(False)

        os.remove(filename1)
        os.remove(filename2)

    def test_in_memory_dataset_run(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        with open("test_in_memory_dataset_run_a.txt", "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open("test_in_memory_dataset_run_b.txt", "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="int64",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset._init_distributed_settings(fea_eval=True, candidate_size=1)
        dataset.set_filelist([
            "test_in_memory_dataset_run_a.txt",
            "test_in_memory_dataset_run_b.txt"
        ])
        dataset.load_into_memory()
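        # slots_shuffle reshuffles the listed slots across instances; together
        # with fea_eval it probes how much a slot contributes to the model.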
        dataset.slots_shuffle(["slot1"])
        dataset.local_shuffle()
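        # Private helpers: generate unique feature signs and build local
        # tables (the integer arguments are internal table/shard parameters).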
        dataset._set_generate_unique_feasigns(True, 15)
        dataset._generate_local_tables_unlock(0, 11, 1, 25, 15)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        os.remove("./test_in_memory_dataset_run_a.txt")
        os.remove("./test_in_memory_dataset_run_b.txt")

    def test_in_memory_dataset_masterpatch(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        with open("test_in_memory_dataset_masterpatch_a.txt", "w") as f:
            data = "1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 id2 1 1 1 1 1 0 1 0\n"
            data += "1 id3 1 0 1 0 1 1 1 1\n"
            data += "1 id3 1 1 1 1 1 0 1 0\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            f.write(data)
        with open("test_in_memory_dataset_masterpatch_b.txt", "w") as f:
            data = "1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            for slot in slots[:2]:
                var = fluid.layers.data(name=slot,
                                        shape=[1],
                                        dtype="int64",
                                        lod_level=1)
                slots_vars.append(var)
            for slot in slots[2:]:
                var = fluid.layers.data(name=slot,
                                        shape=[1],
                                        dtype="float32",
                                        lod_level=1)
                slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=1,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset._init_distributed_settings(parse_ins_id=True)
        dataset.set_filelist([
            "test_in_memory_dataset_masterpatch_a.txt",
            "test_in_memory_dataset_masterpatch_b.txt"
        ])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for i in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        #dataset._set_merge_by_lineid(2)
        dataset.update_settings(merge_size=2)
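        # merge_size=2 makes merge_by_lineid() below merge in-memory instances
        # that share the same ins_id (line id).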
        dataset.dataset.merge_by_lineid()

        os.remove("./test_in_memory_dataset_masterpatch_a.txt")
        os.remove("./test_in_memory_dataset_masterpatch_b.txt")

    def test_in_memory_dataset_masterpatch1(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        with open("test_in_memory_dataset_masterpatch1_a.txt", "w") as f:
            data = "1 id1 1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 id1 1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 id2 1 1 1 1 1 0 1 0\n"
            data += "1 id3 1 0 1 0 1 1 1 1\n"
            data += "1 id3 1 1 1 1 1 0 1 0\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id4 1 0 1 0 1 1 1 1\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            data += "1 id5 1 1 1 1 1 0 1 0\n"
            f.write(data)
        with open("test_in_memory_dataset_masterpatch1_b.txt", "w") as f:
            data = "1 id6 1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 id6 1 1 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 id6 1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 id6 1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots_vars = []
        train_program = fluid.Program()
        startup_program = fluid.Program()
        with fluid.program_guard(train_program, startup_program):
            var1 = fluid.layers.data(name="slot1",
                                     shape=[1],
                                     dtype="int64",
                                     lod_level=0)
            var2 = fluid.layers.data(name="slot2",
                                     shape=[1],
                                     dtype="int64",
                                     lod_level=0)
            var3 = fluid.layers.data(name="slot3",
                                     shape=[1],
                                     dtype="float32",
                                     lod_level=0)
            var4 = fluid.layers.data(name="slot4",
                                     shape=[1],
                                     dtype="float32",
                                     lod_level=0)
            slots_vars = [var1, var2, var3, var4]

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=1,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset._init_distributed_settings(parse_ins_id=True)
        dataset.set_filelist([
            "test_in_memory_dataset_masterpatch1_a.txt",
            "test_in_memory_dataset_masterpatch1_b.txt"
        ])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup_program)

        for i in range(2):
            try:
                exe.train_from_dataset(train_program, dataset)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        dataset._set_merge_by_lineid(2)
        dataset.dataset.merge_by_lineid()

        os.remove("./test_in_memory_dataset_masterpatch1_a.txt")
        os.remove("./test_in_memory_dataset_masterpatch1_b.txt")

    def test_in_memory_dataset_run_2(self):
        """
        Testcase for InMemoryDataset from create to run.
        Uses CUDAPlace when compiled with CUDA, and float-type slots.
        """
        with open("test_in_memory_dataset_run_a.txt", "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open("test_in_memory_dataset_run_b.txt", "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="float32",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist([
            "test_in_memory_dataset_run_a.txt",
            "test_in_memory_dataset_run_b.txt"
        ])
        dataset.load_into_memory()
        dataset.local_shuffle()

        exe = fluid.Executor(fluid.CPUPlace(
        ) if not core.is_compiled_with_cuda() else fluid.CUDAPlace(0))
        exe.run(fluid.default_startup_program())

        for i in range(2):
            try:
                exe.train_from_dataset(fluid.default_main_program(), dataset)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=1)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=2)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=2)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=3)
                exe.train_from_dataset(fluid.default_main_program(),
                                       dataset,
                                       thread=4)
            except ImportError as e:
                pass
            except Exception as e:
                self.assertTrue(False)

        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        dataset._set_merge_by_lineid(2)
        dataset._set_parse_ins_id(False)
        dataset._set_fleet_send_sleep_seconds(2)
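        # preload_into_memory starts loading asynchronously (optionally with a
        # given thread number); wait_preload_done blocks until it finishes.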
        dataset.preload_into_memory()
        dataset.wait_preload_done()
        dataset.preload_into_memory(1)
        dataset.wait_preload_done()
        dataset.dataset.merge_by_lineid()
        dataset._set_merge_by_lineid(30)
        dataset._set_parse_ins_id(False)
        dataset.load_into_memory()
        dataset.dataset.merge_by_lineid()
        dataset.update_settings(batch_size=1,
                                thread_num=2,
                                input_type=1,
                                pipe_command="cat",
                                use_var=[],
                                fs_name="",
                                fs_ugi="",
                                download_cmd="cat",
                                merge_size=-1,
                                parse_ins_id=False,
                                parse_content=False,
                                fleet_send_batch_size=2,
                                fleet_send_sleep_seconds=2,
                                fea_eval=True)
        fleet_ptr = fluid.core.Fleet()
        fleet_ptr.set_client2client_config(1, 1, 1)
        fleet_ptr.get_cache_threshold(0)

        os.remove("./test_in_memory_dataset_run_a.txt")
        os.remove("./test_in_memory_dataset_run_b.txt")

    def test_queue_dataset_run(self):
        """
        Testcase for QueueDataset from create to run.
        """
        with open("test_queue_dataset_run_a.txt", "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open("test_queue_dataset_run_b.txt", "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="int64",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.QueueDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist(
            ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        dataset2 = paddle.distributed.QueueDataset()
        dataset2.init(batch_size=32,
                      thread_num=3,
                      pipe_command="cat",
                      use_var=slots_vars)
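        # dataset2 is given an empty filelist, so training from it should be a
        # no-op or raise; only an unexpected exception fails the test.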
        dataset2.set_filelist([])
        try:
            exe.train_from_dataset(fluid.default_main_program(), dataset2)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            self.assertTrue(False)

        if os.path.exists("./test_queue_dataset_run_a.txt"):
            os.remove("./test_queue_dataset_run_a.txt")
        if os.path.exists("./test_queue_dataset_run_b.txt"):
            os.remove("./test_queue_dataset_run_b.txt")

    def test_queue_dataset_run_2(self):
        """
        Testcase for QueueDataset from create to run.
        Uses CUDAPlace when compiled with CUDA, and float-type slots.
        """
        with open("test_queue_dataset_run_a.txt", "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open("test_queue_dataset_run_b.txt", "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1_f", "slot2_f", "slot3_f", "slot4_f"]
        slots_vars = []
        for slot in slots:
            var = fluid.layers.data(name=slot,
                                    shape=[1],
                                    dtype="float32",
                                    lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.QueueDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist(
            ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])

        exe = fluid.Executor(fluid.CPUPlace(
        ) if not core.is_compiled_with_cuda() else fluid.CUDAPlace(0))
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        if os.path.exists("./test_queue_dataset_run_a.txt"):
            os.remove("./test_queue_dataset_run_a.txt")
        if os.path.exists("./test_queue_dataset_run_b.txt"):
            os.remove("./test_queue_dataset_run_b.txt")

    def test_queue_dataset_run_3(self):
        """
        Testcase for InMemoryDataset with input_type=1 (batch input).
        Uses CUDAPlace when compiled with CUDA.
        """
        with open("test_queue_dataset_run_a.txt", "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
            data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
            data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
            data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
            f.write(data)
        with open("test_queue_dataset_run_b.txt", "w") as f:
            data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
            data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
            data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
            data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
            var = fluid.data(name=slot,
                             shape=[None, 1],
                             dtype="int64",
                             lod_level=1)
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset.init(batch_size=1,
                     thread_num=2,
                     input_type=1,
                     pipe_command="cat",
                     use_var=slots_vars)
        dataset.set_filelist(
            ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
        dataset.load_into_memory()

        exe = fluid.Executor(fluid.CPUPlace(
        ) if not core.is_compiled_with_cuda() else fluid.CUDAPlace(0))
        exe.run(fluid.default_startup_program())
        if self.use_data_loader:
            data_loader = fluid.io.DataLoader.from_dataset(
                dataset, fluid.cpu_places(), self.drop_last)
            for i in range(self.epoch_num):
                for data in data_loader():
                    exe.run(fluid.default_main_program(), feed=data)
        else:
            for i in range(self.epoch_num):
                try:
                    exe.train_from_dataset(fluid.default_main_program(),
                                           dataset)
                except Exception as e:
                    self.assertTrue(False)

        if os.path.exists("./test_queue_dataset_run_a.txt"):
            os.remove("./test_queue_dataset_run_a.txt")
        if os.path.exists("./test_queue_dataset_run_b.txt"):
            os.remove("./test_queue_dataset_run_b.txt")


class TestDatasetWithDataLoader(TestDataset):
    """
    TestCases for Dataset run through a DataLoader.
    """

    def setUp(self):
        """
        Enable the DataLoader path for all TestDataset cases.
        """
        self.use_data_loader = True
        self.epoch_num = 10
        self.drop_last = False


class TestDatasetWithFetchHandler(unittest.TestCase):
    """
    TestCases for Dataset used with a FetchHandler.
    """

    def net(self):
        """
        Build the small test network shared by the fetch-handler cases.
        """
        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        poolings = []
        for slot in slots:
            data = fluid.layers.data(name=slot,
                                     shape=[1],
                                     dtype="int64",
                                     lod_level=1)
            var = fluid.layers.cast(x=data, dtype='float32')
            pool = fluid.layers.sequence_pool(input=var, pool_type='AVERAGE')

            slots_vars.append(data)
            poolings.append(pool)

        concated = fluid.layers.concat(poolings, axis=1)
        fc = fluid.layers.fc(input=concated, act='tanh', size=32)
        return slots_vars, fc

    def get_dataset(self, inputs, files):
        """
        Build a QueueDataset over the given inputs and files.

        Args:
            inputs(list): feed variables of the dataset
            files(list): data files of the dataset
        """
        dataset = paddle.distributed.QueueDataset()
        dataset.init(batch_size=32,
                     thread_num=3,
                     pipe_command="cat",
                     use_var=inputs)
        dataset.set_filelist(files)
        return dataset

    def setUp(self):
        """
        Write the temporary data files used by these cases.
        """
        with open("test_queue_dataset_run_a.txt", "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open("test_queue_dataset_run_b.txt", "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

    def tearDown(self):
        """
        Remove the temporary data files.
        """
        os.remove("./test_queue_dataset_run_a.txt")
        os.remove("./test_queue_dataset_run_b.txt")

    def test_dataset_none(self):
        """
        Testcase for passing dataset=None to train_from_dataset.
        """
        slots_vars, out = self.net()
        files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        # test dataset->None
        try:
            exe.train_from_dataset(fluid.default_main_program(), None)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, cpt.get_exception_message(e))
        except Exception as e:
            self.assertTrue(False)

    def test_infer_from_dataset(self):
        """
        Testcase for infer_from_dataset.
        """
        slots_vars, out = self.net()
        files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        try:
            exe.infer_from_dataset(fluid.default_main_program(), dataset)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except Exception as e:
            self.assertTrue(False)

    def test_fetch_handler(self):
        """
        Testcase for train_from_dataset with a FetchHandler.
        """
        slots_vars, out = self.net()
        files = ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]
        dataset = self.get_dataset(slots_vars, files)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

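        # FetchHandler(out.name) periodically fetches the named variable on a
        # monitor thread while train_from_dataset runs; help() prints usage.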
        fh = fluid.executor.FetchHandler(out.name)
        fh.help()

        try:
            exe.train_from_dataset(program=fluid.default_main_program(),
                                   dataset=dataset,
                                   fetch_handler=fh)
        except ImportError as e:
            print("warning: we skip trainer_desc_pb2 import problem in windows")
        except RuntimeError as e:
            error_msg = "dataset is need and should be initialized"
            self.assertEqual(error_msg, cpt.get_exception_message(e))
        except Exception as e:
            self.assertTrue(False)


class TestDataset2(unittest.TestCase):
    """  TestCases for Dataset. """

    def setUp(self):
        """  TestCases for Dataset. """
        self.use_data_loader = False
        self.epoch_num = 10
        self.drop_last = False

    def test_dataset_fleet(self):
        """
        Testcase for InMemoryDataset from create to run.
        """

        self.skipTest("parameter server will add pslib UT later")

        with open("test_in_memory_dataset2_run_a.txt", "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open("test_in_memory_dataset2_run_b.txt", "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(\
                    name=slot, shape=[1], dtype="float32", lod_level=1)
                slots_vars.append(var)
            fake_cost = \
                fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1])
            fake_cost = fluid.layers.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(adam)
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.InMemoryDataset()

            dataset.init(batch_size=32,
                         thread_num=3,
                         pipe_command="cat",
                         use_var=slots_vars)
            dataset.set_filelist([
                "test_in_memory_dataset2_run_a.txt",
                "test_in_memory_dataset2_run_b.txt"
            ])
            dataset.load_into_memory()
            fleet._opt_info = None
            fleet._fleet_ptr = None

        os.remove("./test_in_memory_dataset2_run_a.txt")
        os.remove("./test_in_memory_dataset2_run_b.txt")

    def test_dataset_fleet2(self):
        """
        Testcase for InMemoryDataset from create to run.
        """
        with open("test_in_memory_dataset2_run2_a.txt", "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open("test_in_memory_dataset2_run2_b.txt", "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(\
                    name=slot, shape=[1], dtype="float32", lod_level=1)
                slots_vars.append(var)
            fake_cost = \
                fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1])
            fake_cost = fluid.layers.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(adam,
                                                   strategy={
                                                       "fs_uri":
                                                       "fs_uri_xxx",
                                                       "fs_user":
                                                       "fs_user_xxx",
                                                       "fs_passwd":
                                                       "fs_passwd_xxx",
                                                       "fs_hadoop_bin":
                                                       "fs_hadoop_bin_xxx"
                                                   })
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(batch_size=32,
                         thread_num=3,
                         pipe_command="cat",
                         use_var=slots_vars)
            dataset.set_filelist([
                "test_in_memory_dataset2_run2_a.txt",
                "test_in_memory_dataset2_run2_b.txt"
            ])
            dataset.load_into_memory()
            try:
                dataset.global_shuffle(fleet)
            except:
                print("warning: catch expected error")
            fleet._opt_info = None
            fleet._fleet_ptr = None
            dataset = paddle.distributed.InMemoryDataset()
            dataset.init(fs_name="", fs_ugi="")
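            # The calls below exercise error paths on a bare or mis-configured
            # dataset; each one is expected to raise and is merely logged.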
            d = paddle.distributed.fleet.DatasetBase()
            try:
                dataset._set_feed_type("MultiSlotInMemoryDataFeed")
            except:
                print("warning: catch expected error")
            dataset.thread_num = 0
            try:
                dataset._prepare_to_run()
            except:
                print("warning: catch expected error")
            try:
                dataset.preprocess_instance()
            except:
                print("warning: catch expected error")
            try:
                dataset.set_current_phase(1)
            except:
                print("warning: catch expected error")
            try:
                dataset.postprocess_instance()
            except:
                print("warning: catch expected error")
            dataset._set_fleet_send_batch_size(1024)
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            #dataset.get_pv_data_size()
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()
            dataset = paddle.distributed.QueueDataset()
            try:
                dataset.local_shuffle()
            except:
                print("warning: catch expected error")
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            dataset = paddle.distributed.fleet.FileInstantDataset()
            try:
                dataset.local_shuffle()
            except:
                print("warning: catch expected error")
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")

        os.remove("./test_in_memory_dataset2_run2_a.txt")
        os.remove("./test_in_memory_dataset2_run2_b.txt")

    def test_bosps_dataset_fleet2(self):
        """
        Testcase for BoxPSDataset from create to run.
        """
        with open("test_in_memory_dataset2_run2_a.txt", "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open("test_in_memory_dataset2_run2_b.txt", "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        train_program = fluid.Program()
        startup_program = fluid.Program()
        scope = fluid.Scope()
        from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
        with fluid.program_guard(train_program, startup_program):
            slots = ["slot1_ff", "slot2_ff", "slot3_ff", "slot4_ff"]
            slots_vars = []
            for slot in slots:
                var = fluid.layers.data(\
                    name=slot, shape=[1], dtype="float32", lod_level=1)
                slots_vars.append(var)
            fake_cost = \
                fluid.layers.elementwise_sub(slots_vars[0], slots_vars[-1])
            fake_cost = fluid.layers.mean(fake_cost)
        with fluid.scope_guard(scope):
            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            try:
                fleet.init()
            except ImportError as e:
                print("warning: no mpi4py")
            adam = fluid.optimizer.Adam(learning_rate=0.000005)
            try:
                adam = fleet.distributed_optimizer(adam,
                                                   strategy={
                                                       "fs_uri":
                                                       "fs_uri_xxx",
                                                       "fs_user":
                                                       "fs_user_xxx",
                                                       "fs_passwd":
                                                       "fs_passwd_xxx",
                                                       "fs_hadoop_bin":
                                                       "fs_hadoop_bin_xxx"
                                                   })
                adam.minimize([fake_cost], [scope])
            except AttributeError as e:
                print("warning: no mpi")
            except ImportError as e:
                print("warning: no mpi4py")
            exe.run(startup_program)
            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(batch_size=32,
                         thread_num=3,
                         pipe_command="cat",
                         use_var=slots_vars)
            dataset.set_filelist([
                "test_in_memory_dataset2_run2_a.txt",
                "test_in_memory_dataset2_run2_b.txt"
            ])
            dataset.load_into_memory()
            try:
                dataset.global_shuffle(fleet)
            except:
                print("warning: catch expected error")
            fleet._opt_info = None
            fleet._fleet_ptr = None
            dataset = paddle.distributed.fleet.BoxPSDataset()
            dataset.init(rank_offset="",
                         pv_batch_size=1,
                         fs_name="",
                         fs_ugi="",
                         data_feed_type="MultiSlotInMemoryDataFeed",
                         parse_logkey=True,
                         merge_by_sid=True,
                         enable_pv_merge=True)
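            # parse_logkey/merge_by_sid/enable_pv_merge configure BoxPS
            # pv (page-view) instance merging, exercised below through the
            # preprocess/postprocess_instance error paths.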
            d = paddle.distributed.fleet.DatasetBase()
            try:
                dataset._set_feed_type("MultiSlotInMemoryDataFeed")
            except:
                print("warning: catch expected error")
            dataset.thread_num = 0
            try:
                dataset._prepare_to_run()
            except:
                print("warning: catch expected error")
            dataset._set_parse_logkey(True)
            dataset._set_merge_by_sid(True)
            dataset._set_enable_pv_merge(True)
            try:
                dataset.preprocess_instance()
            except:
                print("warning: catch expected error")
            try:
                dataset.set_current_phase(1)
            except:
                print("warning: catch expected error")
            try:
                dataset.postprocess_instance()
            except:
                print("warning: catch expected error")
            dataset._set_fleet_send_batch_size(1024)
            try:
                dataset.global_shuffle()
            except:
                print("warning: catch expected error")
            #dataset.get_pv_data_size()
            dataset.get_memory_data_size()
            dataset.get_shuffle_data_size()


if __name__ == '__main__':
    unittest.main()