# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import multiprocessing
import os
import unittest
import paddle.fluid as fluid
import paddle.fluid.core as core
import time
import numpy as np
import math
import sys

__all__ = ['TestParallelExecutorBase']


class TestParallelExecutorBase(unittest.TestCase):
    def check_network_convergence(self,
                                  method,
                                  use_cuda=True,
                                  memory_opt=True,
                                  iter=50,
                                  batch_size=None,
                                  allow_op_delay=False,
                                  feed_dict=None,
                                  seed=None,
                                  use_parallel_executor=True,
                                  use_reduce=False,
                                  fuse_elewise_add_act_ops=False,
                                  optimizer=fluid.optimizer.Adam,
                                  use_fast_executor=False,
                                  enable_sequential_execution=False):
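        """Build the network returned by `method` in fresh programs, run one
        warm-up step, `iter` training steps, and one final step, then return
        the first and last fetched loss values.

        Training aborts via sys.exit if either loss is NaN. When `batch_size`
        is given, it is treated as the per-device batch size and used to
        report aggregate throughput.
        """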
        def run_executor(exe, feed, fetch_list, program=None):
            if isinstance(exe, fluid.ParallelExecutor):
                res = exe.run(fetch_list=fetch_list, feed=feed)
            elif isinstance(exe, fluid.Executor):
                if program is None:
                    program = fluid.default_main_program()
                res = exe.run(program=program, feed=feed, fetch_list=fetch_list)
            else:
                raise ValueError('Unknown executor type')
            return res

        main = fluid.Program()
        startup = fluid.Program()
        startup.random_seed = 1  # Fix random seed
        main.random_seed = 1
        with fluid.program_guard(main, startup):
            if seed is not None:
                startup.random_seed = seed
                main.random_seed = seed

            loss = method(use_feed=feed_dict is not None)

            optimizer().minimize(loss)

            if memory_opt:
                fluid.memory_optimize(main)

            place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
            startup_exe = fluid.Executor(place)
            startup_exe.run(startup)
            exec_strategy = fluid.ExecutionStrategy()
            exec_strategy.allow_op_delay = allow_op_delay
            if use_fast_executor:
                exec_strategy.use_experimental_executor = True

            build_strategy = fluid.BuildStrategy()
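            # Reduce assigns each parameter's gradient aggregation to a single
            # device and broadcasts the result; AllReduce (the default)
            # aggregates every gradient on all devices.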
            build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
                if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce
            build_strategy.fuse_elewise_add_act_ops = fuse_elewise_add_act_ops
            build_strategy.enable_sequential_execution = enable_sequential_execution
            if use_cuda and core.is_compiled_with_cuda():
                build_strategy.remove_unnecessary_lock = True

            if use_parallel_executor:
                exe = fluid.ParallelExecutor(
                    use_cuda,
                    loss_name=loss.name,
                    exec_strategy=exec_strategy,
                    build_strategy=build_strategy)
            else:
                exe = fluid.Executor(place=place)

            if batch_size is not None:
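                # Scale the per-device batch size by the device count so the
                # throughput report below counts instances across all devices.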
                device_count = fluid.core.get_cuda_device_count() if use_cuda \
                    else int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
                batch_size *= device_count
            begin = time.time()
            first_loss, = run_executor(
                exe=exe, feed=feed_dict, fetch_list=[loss.name])

            for i in range(iter):
                run_executor(exe=exe, feed=feed_dict, fetch_list=[])

            last_loss, = run_executor(
                exe=exe, feed=feed_dict, fetch_list=[loss.name])
            end = time.time()

            if batch_size is not None:
                print("%.4f Instance per second" % (
                    (batch_size * iter + 2) / (end - begin)))

            avg_last_loss_val = np.array(last_loss).mean()
            avg_first_loss_val = np.array(first_loss).mean()
            if math.isnan(float(avg_last_loss_val)) or math.isnan(
                    float(avg_first_loss_val)):
                sys.exit("got NaN loss, training failed.")

            print(first_loss, last_loss)
            # self.assertGreater(first_loss[0], last_loss[0])
            return first_loss, last_loss
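

# A minimal usage sketch (an illustration, not part of the original file):
# a concrete test subclasses TestParallelExecutorBase and passes a
# network-building function that returns its loss; its `use_feed` argument
# tells it whether data arrives via `feed_dict`. `simple_fc_net` and
# `TestSimpleFCConvergence` below are hypothetical names:
#
#     def simple_fc_net(use_feed):
#         img = fluid.layers.data(name='image', shape=[784], dtype='float32')
#         label = fluid.layers.data(name='label', shape=[1], dtype='int64')
#         hidden = fluid.layers.fc(input=img, size=200, act='relu')
#         prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
#         loss = fluid.layers.cross_entropy(input=prediction, label=label)
#         return fluid.layers.mean(loss)
#
#     class TestSimpleFCConvergence(TestParallelExecutorBase):
#         def test_simple_fc(self):
#             img = np.random.random(size=(32, 784)).astype('float32')
#             label = np.random.randint(0, 10, size=(32, 1)).astype('int64')
#             self.check_network_convergence(
#                 simple_fc_net,
#                 feed_dict={'image': img, 'label': label},
#                 use_cuda=core.is_compiled_with_cuda())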