diff --git a/paddle/fluid/platform/cpu_helper.cc b/paddle/fluid/platform/cpu_helper.cc index 234a04b5c2eb5ee643e8a4e723b28331cd8e6ee0..4e52e8ff00ca505bff4eb77396b4989470ee3f57 100644 --- a/paddle/fluid/platform/cpu_helper.cc +++ b/paddle/fluid/platform/cpu_helper.cc @@ -29,6 +29,12 @@ namespace platform { void SetNumThreads(int num_threads) { #ifdef PADDLE_USE_OPENBLAS + // windows has no support for openblas multi-thread +#ifdef _WIN32 + if (num_threads > 1) { + num_threads = 1; + } +#endif int real_num_threads = num_threads > 1 ? num_threads : 1; openblas_set_num_threads(real_num_threads); #elif defined(PADDLE_WITH_MKLML) diff --git a/paddle/fluid/platform/init.cc b/paddle/fluid/platform/init.cc index 84d1b852cbe5a334ddfc27d404e879b178e341fe..69bbe8794d33635fcb0521f5554d83c0930499ca 100644 --- a/paddle/fluid/platform/init.cc +++ b/paddle/fluid/platform/init.cc @@ -113,13 +113,6 @@ void InitDevices(bool init_p2p, const std::vector devices) { places.emplace_back(platform::CPUPlace()); platform::DeviceContextPool::Init(places); -// windows has no support for openblas multi-thread -#ifdef _WIN32 - if (FLAGS_paddle_num_threads > 1) { - FLAGS_paddle_num_threads = 1; - } -#endif - #ifndef PADDLE_WITH_MKLDNN platform::SetNumThreads(FLAGS_paddle_num_threads); #endif diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 81299189160739e54f39348ad327ff2edd2ac0e0..dbe49c98bd150ba1a882977da5fa96f93451c33a 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -47,7 +47,8 @@ from . import profiler from . import unique_name from . import recordio_writer from . import parallel_executor -from .parallel_executor import * +if os.name != 'nt': + from .parallel_executor import * from paddle.fluid.layers.math_op_patch import monkey_patch_variable Tensor = LoDTensor diff --git a/python/paddle/fluid/contrib/inferencer.py b/python/paddle/fluid/contrib/inferencer.py index b966ae01d039d7e9510dae73ecadb97b494f68c2..b8d5f4ffeadca0a7b103682f175d50dc46fa258a 100644 --- a/python/paddle/fluid/contrib/inferencer.py +++ b/python/paddle/fluid/contrib/inferencer.py @@ -15,15 +15,13 @@ from __future__ import print_function import contextlib -import os from .. import core from .. import executor from .. import framework from .. import io -if os.name != 'nt': - from .. import parallel_executor +from .. import parallel_executor from .. import unique_name from .trainer import check_and_get_place diff --git a/python/paddle/fluid/contrib/trainer.py b/python/paddle/fluid/contrib/trainer.py index 096821a5ba690074ecbf023cf87fed7e206d023f..8569e486f91786b5562e84dcdccf6d91da0612cc 100644 --- a/python/paddle/fluid/contrib/trainer.py +++ b/python/paddle/fluid/contrib/trainer.py @@ -28,8 +28,7 @@ from .. import framework from .. import io # optimizer is same as the parameter of Trainer.__init__. Rename it to opt_module from .. import optimizer as opt_module -if os.name != 'nt': - from .. import parallel_executor +from .. import parallel_executor from ..transpiler import distribute_transpiler __all__ = [ diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 3f4dd5eb712e738bbee8f93c062375033b8ab2f6..33f6df67a42100c034a9c7eb7e34f80c82554f4a 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -25,263 +25,264 @@ import os __all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy'] -ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy -BuildStrategy = core.ParallelExecutor.BuildStrategy - - -class ParallelExecutor(object): - """ - ParallelExecutor is designed for data parallelism, which focuses on distributing - the data across different nodes and every node operates on the data in parallel. - If you use ParallelExecutor to run the current program on GPU, the node means GPU - device, and ParallelExecutor will get the available GPU device automatically on - the current machine. If you use ParallelExecutor to run the current program on CPU, - the node means the CPU device, and you can specify the CPU device number by adding - 'CPU_NUM' environment variable, for example 'CPU_NUM=4', if the environment variable - is not found, ParallelExecutor will call `multiprocessing.cpu_count` to get the number - of CPUs in the system. - - Args: - use_cuda (bool): Whether to use CUDA or not. - loss_name (str): The loss name must set in training. Default None. - main_program (Program): The program that need to run, if not provided, - then default_main_program will be used. Default None. - share_vars_from(ParallelExecutor): If provide, it will share variables - from the specified ParallelExecutor. Default None. - exec_strategy(ExecutionStrategy): exec_strategy is used to control how to run - the program in ParallelExecutor, for example how many threads are used to - execute the program, how many iterations to clean up the temp variables - which is generated during execution. For more information, please refer - to fluid.ExecutionStrategy. Default None. - build_strategy(BuildStrategy): build_strategy is used to control how to - build the SSA Graph in ParallelExecutor by setting the property, - for example reduce_strategy, gradient_scale_strategy. For more information, - please refer to fluid.BuildStrategy. Default None. - num_trainers(int): If greater than 1, NCCL will be initialized with - multiple rank of nodes, each node should have same number of GPUs. - Distributed training will be enabled then. Default 1. - trainer_id(int): Must use together with num_trainers. trainer_id is the - "rank" of current node starts from 0. Default 0. - scope(Scope): scope to run with, default use fluid.global_scope(). - - Returns: - ParallelExecutor: The initialized ParallelExecutor object. - - Raises: - TypeError: If share_vars_from is provided, but not ParallelExecutor object. - - Examples: - .. code-block:: python - - train_exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name) - test_exe = fluid.ParallelExecutor(use_cuda=True, - main_program=test_program, - share_vars_from=train_exe) - - train_loss, = train_exe.run([loss.name], feed=feed_dict) - test_loss, = test_exe.run([loss.name], feed=feed_dict) - """ - - def __init__(self, - use_cuda, - loss_name=None, - main_program=None, - share_vars_from=None, - exec_strategy=None, - build_strategy=None, - num_trainers=1, - trainer_id=0, - scope=None): - self._places = [] - self._act_places = [] - if use_cuda: - for i in six.moves.range(core.get_cuda_device_count()): - p = core.Place() - self._act_places.append(core.CUDAPlace(i)) - p.set_place(self._act_places[-1]) - self._places.append(p) - else: - cpu_num = int( - os.environ.get('CPU_NUM', multiprocessing.cpu_count())) - for i in six.moves.range(cpu_num): - p = core.Place() - self._act_places.append(core.CPUPlace()) - p.set_place(self._act_places[-1]) - self._places.append(p) - assert self._places, "no place for execution" - - if exec_strategy is None: - exec_strategy = ExecutionStrategy() - exec_strategy.use_cuda = use_cuda - - if exec_strategy.num_threads == 0: - if use_cuda: - # Experiments on se-resnext shows that too many threads hurt - # performance. Worth tunning for other models in the future. - exec_strategy.num_threads = len(self._places) * 4 - else: - cpu_num = int( - os.environ.get('CPU_NUM', multiprocessing.cpu_count())) - exec_strategy.num_threads = cpu_num * 2 - - # Set 1 thread num under nccl2 distribute - # env to make sure all gpus run ops in same order. - if num_trainers > 1: - assert (use_cuda) - # FIXME(gongwb): avoid this set. - exec_strategy.num_threads = 1 - - if build_strategy is None: - build_strategy = BuildStrategy() - - main = main_program - main = main if main else framework.default_main_program() - if scope == None: - scope = executor.global_scope() - - if share_vars_from and not isinstance(share_vars_from, - ParallelExecutor): - raise TypeError("share_vars_from must be ParallelExecutor.") - - local_scopes = share_vars_from.executor.local_scopes( - ) if share_vars_from else [] - - self.persistable_vars = [ - v.name for v in [ - var for var in main.list_vars() - if var.persistable and var.type != core.VarDesc.VarType.RAW - ] - ] - - self.executor = core.ParallelExecutor( - self._places, - set([ - cpt.to_text(p.name) - for p in main.global_block().iter_parameters() - if not p.stop_gradient - ]), - set(cpt.to_text(var) for var in self.persistable_vars), main.desc, - cpt.to_text(loss_name) - if loss_name else six.u(''), scope, local_scopes, exec_strategy, - build_strategy, num_trainers, trainer_id) - self.scope = scope - - def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True): - """ - Run a parallel executor with fetch_list. - - The feed parameter can be a dict or a list. If feed is a dict, the - feed data will be split into multiple devices. If feed is a list, we - assume the data has been splitted into multiple devices, the each - element in the list will be copied to each device directly. - - For example, if the feed is a dict: - - >>> exe = ParallelExecutor() - >>> # the image will be splitted into devices. If there is two devices - >>> # each device will process an image with shape (24, 1, 28, 28) - >>> exe.run(feed={'image': numpy.random.random(size=(48, 1, 28, 28))}) +if os.name != 'nt': + ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy + BuildStrategy = core.ParallelExecutor.BuildStrategy - For example, if the feed is a list: - >>> exe = ParallelExecutor() - >>> # each device will process each element in the list. - >>> # the 1st device will process an image with shape (48, 1, 28, 28) - >>> # the 2nd device will process an image with shape (32, 1, 28, 28) - >>> # - >>> # you can use exe.device_count to get the device number. - >>> exe.run(feed=[{"image": numpy.random.random(size=(48, 1, 28, 28))}, - >>> {"image": numpy.random.random(size=(32, 1, 28, 28))}, - >>> ]) + class ParallelExecutor(object): + """ + ParallelExecutor is designed for data parallelism, which focuses on distributing + the data across different nodes and every node operates on the data in parallel. + If you use ParallelExecutor to run the current program on GPU, the node means GPU + device, and ParallelExecutor will get the available GPU device automatically on + the current machine. If you use ParallelExecutor to run the current program on CPU, + the node means the CPU device, and you can specify the CPU device number by adding + 'CPU_NUM' environment variable, for example 'CPU_NUM=4', if the environment variable + is not found, ParallelExecutor will call `multiprocessing.cpu_count` to get the number + of CPUs in the system. Args: - fetch_list(list): The fetched variable names - feed(list|dict|None): The feed variables. If the feed is a dict, - tensors in that dict will be splitted into each devices. If - the feed is a list, each element of the list will be copied - to each device. Default None. - feed_dict: Alias for feed parameter, for backward compatibility. - This parameter has been deprecated. Default None. - return_numpy(bool): Whether converts the fetched tensor to numpy. - Default: True. + use_cuda (bool): Whether to use CUDA or not. + loss_name (str): The loss name must set in training. Default None. + main_program (Program): The program that need to run, if not provided, + then default_main_program will be used. Default None. + share_vars_from(ParallelExecutor): If provide, it will share variables + from the specified ParallelExecutor. Default None. + exec_strategy(ExecutionStrategy): exec_strategy is used to control how to run + the program in ParallelExecutor, for example how many threads are used to + execute the program, how many iterations to clean up the temp variables + which is generated during execution. For more information, please refer + to fluid.ExecutionStrategy. Default None. + build_strategy(BuildStrategy): build_strategy is used to control how to + build the SSA Graph in ParallelExecutor by setting the property, + for example reduce_strategy, gradient_scale_strategy. For more information, + please refer to fluid.BuildStrategy. Default None. + num_trainers(int): If greater than 1, NCCL will be initialized with + multiple rank of nodes, each node should have same number of GPUs. + Distributed training will be enabled then. Default 1. + trainer_id(int): Must use together with num_trainers. trainer_id is the + "rank" of current node starts from 0. Default 0. + scope(Scope): scope to run with, default use fluid.global_scope(). Returns: - List: The fetched result list. + ParallelExecutor: The initialized ParallelExecutor object. Raises: - ValueError: If the feed is a list, but its length is not equal the - length of active places, or its element's is not dict. - - NOTES: - 1. If the feed's type is dict, the number of data that feeds to - ParallelExecutor must be bigger than active places. Otherwise, - it will throw exception from C++ side. Special attention should be - paid to check whether the last batch of the dataset is bigger - than active places. - 2. If active places are more than one, the fetch results for each - variable is a list, and each element of this list is the variable of - respective active place. + TypeError: If share_vars_from is provided, but not ParallelExecutor object. Examples: .. code-block:: python - pe = fluid.ParallelExecutor(use_cuda=use_cuda, - loss_name=avg_cost.name, - main_program=fluid.default_main_program()) - loss = pe.run(feed=feeder.feed(cur_batch), - fetch_list=[avg_cost.name])) + train_exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name) + test_exe = fluid.ParallelExecutor(use_cuda=True, + main_program=test_program, + share_vars_from=train_exe) + + train_loss, = train_exe.run([loss.name], feed=feed_dict) + test_loss, = test_exe.run([loss.name], feed=feed_dict) """ - if feed is None and feed_dict is not None: - feed = feed_dict - print( - "`feed_dict` is deprecated. Please use `feed=`", - file=sys.stderr) - - if isinstance(feed, dict): - feed_tensor_dict = dict() - for feed_name in feed: - feed_tensor = feed[feed_name] - if not isinstance(feed_tensor, core.LoDTensor): - feed_tensor = core.LoDTensor() - # always set to CPU place, since the tensor need to be splitted - # it is fast in CPU - feed_tensor.set(feed[feed_name], core.CPUPlace()) - feed_tensor_dict[feed_name] = feed_tensor - - self.executor.feed_and_split_tensor_into_local_scopes( - feed_tensor_dict) - elif isinstance(feed, list) or isinstance(feed, tuple): - if len(feed) != len(self._act_places): - raise ValueError( - "Feed a list of tensor, the list should be the same size as places" - ) - - res = list() - - for i, each in enumerate(feed): - if not isinstance(each, dict): - raise TypeError( - "Each element of feed list should be a dict") - res_dict = dict() - for feed_name in each: - tensor = each[feed_name] - if not isinstance(tensor, core.LoDTensor): - tmp = core.LoDTensor() - tmp.set(tensor, self._act_places[i]) - tensor = tmp - res_dict[feed_name] = tensor - res.append(res_dict) - self.executor.feed_tensors_into_local_scopes(res) - - fetch_var_name = '@FETCHED_VAR_NAME@' - self.executor.run(fetch_list, fetch_var_name) - arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array() - - if return_numpy: - return executor.as_numpy(arr) - - return [arr[i] for i in range(len(arr))] - - @property - def device_count(self): - return len(self._act_places) + + def __init__(self, + use_cuda, + loss_name=None, + main_program=None, + share_vars_from=None, + exec_strategy=None, + build_strategy=None, + num_trainers=1, + trainer_id=0, + scope=None): + self._places = [] + self._act_places = [] + if use_cuda: + for i in six.moves.range(core.get_cuda_device_count()): + p = core.Place() + self._act_places.append(core.CUDAPlace(i)) + p.set_place(self._act_places[-1]) + self._places.append(p) + else: + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + for i in six.moves.range(cpu_num): + p = core.Place() + self._act_places.append(core.CPUPlace()) + p.set_place(self._act_places[-1]) + self._places.append(p) + assert self._places, "no place for execution" + + if exec_strategy is None: + exec_strategy = ExecutionStrategy() + exec_strategy.use_cuda = use_cuda + + if exec_strategy.num_threads == 0: + if use_cuda: + # Experiments on se-resnext shows that too many threads hurt + # performance. Worth tunning for other models in the future. + exec_strategy.num_threads = len(self._places) * 4 + else: + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + exec_strategy.num_threads = cpu_num * 2 + + # Set 1 thread num under nccl2 distribute + # env to make sure all gpus run ops in same order. + if num_trainers > 1: + assert (use_cuda) + # FIXME(gongwb): avoid this set. + exec_strategy.num_threads = 1 + + if build_strategy is None: + build_strategy = BuildStrategy() + + main = main_program + main = main if main else framework.default_main_program() + if scope == None: + scope = executor.global_scope() + + if share_vars_from and not isinstance(share_vars_from, + ParallelExecutor): + raise TypeError("share_vars_from must be ParallelExecutor.") + + local_scopes = share_vars_from.executor.local_scopes( + ) if share_vars_from else [] + + self.persistable_vars = [ + v.name for v in [ + var for var in main.list_vars() + if var.persistable and var.type != core.VarDesc.VarType.RAW + ] + ] + + self.executor = core.ParallelExecutor( + self._places, + set([ + cpt.to_text(p.name) + for p in main.global_block().iter_parameters() + if not p.stop_gradient + ]), + set(cpt.to_text(var) for var in self.persistable_vars), main.desc, + cpt.to_text(loss_name) + if loss_name else six.u(''), scope, local_scopes, exec_strategy, + build_strategy, num_trainers, trainer_id) + self.scope = scope + + def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True): + """ + Run a parallel executor with fetch_list. + + The feed parameter can be a dict or a list. If feed is a dict, the + feed data will be split into multiple devices. If feed is a list, we + assume the data has been splitted into multiple devices, the each + element in the list will be copied to each device directly. + + For example, if the feed is a dict: + + >>> exe = ParallelExecutor() + >>> # the image will be splitted into devices. If there is two devices + >>> # each device will process an image with shape (24, 1, 28, 28) + >>> exe.run(feed={'image': numpy.random.random(size=(48, 1, 28, 28))}) + + For example, if the feed is a list: + + >>> exe = ParallelExecutor() + >>> # each device will process each element in the list. + >>> # the 1st device will process an image with shape (48, 1, 28, 28) + >>> # the 2nd device will process an image with shape (32, 1, 28, 28) + >>> # + >>> # you can use exe.device_count to get the device number. + >>> exe.run(feed=[{"image": numpy.random.random(size=(48, 1, 28, 28))}, + >>> {"image": numpy.random.random(size=(32, 1, 28, 28))}, + >>> ]) + + Args: + fetch_list(list): The fetched variable names + feed(list|dict|None): The feed variables. If the feed is a dict, + tensors in that dict will be splitted into each devices. If + the feed is a list, each element of the list will be copied + to each device. Default None. + feed_dict: Alias for feed parameter, for backward compatibility. + This parameter has been deprecated. Default None. + return_numpy(bool): Whether converts the fetched tensor to numpy. + Default: True. + + Returns: + List: The fetched result list. + + Raises: + ValueError: If the feed is a list, but its length is not equal the + length of active places, or its element's is not dict. + + NOTES: + 1. If the feed's type is dict, the number of data that feeds to + ParallelExecutor must be bigger than active places. Otherwise, + it will throw exception from C++ side. Special attention should be + paid to check whether the last batch of the dataset is bigger + than active places. + 2. If active places are more than one, the fetch results for each + variable is a list, and each element of this list is the variable of + respective active place. + + Examples: + .. code-block:: python + + pe = fluid.ParallelExecutor(use_cuda=use_cuda, + loss_name=avg_cost.name, + main_program=fluid.default_main_program()) + loss = pe.run(feed=feeder.feed(cur_batch), + fetch_list=[avg_cost.name])) + """ + if feed is None and feed_dict is not None: + feed = feed_dict + print( + "`feed_dict` is deprecated. Please use `feed=`", + file=sys.stderr) + + if isinstance(feed, dict): + feed_tensor_dict = dict() + for feed_name in feed: + feed_tensor = feed[feed_name] + if not isinstance(feed_tensor, core.LoDTensor): + feed_tensor = core.LoDTensor() + # always set to CPU place, since the tensor need to be splitted + # it is fast in CPU + feed_tensor.set(feed[feed_name], core.CPUPlace()) + feed_tensor_dict[feed_name] = feed_tensor + + self.executor.feed_and_split_tensor_into_local_scopes( + feed_tensor_dict) + elif isinstance(feed, list) or isinstance(feed, tuple): + if len(feed) != len(self._act_places): + raise ValueError( + "Feed a list of tensor, the list should be the same size as places" + ) + + res = list() + + for i, each in enumerate(feed): + if not isinstance(each, dict): + raise TypeError( + "Each element of feed list should be a dict") + res_dict = dict() + for feed_name in each: + tensor = each[feed_name] + if not isinstance(tensor, core.LoDTensor): + tmp = core.LoDTensor() + tmp.set(tensor, self._act_places[i]) + tensor = tmp + res_dict[feed_name] = tensor + res.append(res_dict) + self.executor.feed_tensors_into_local_scopes(res) + + fetch_var_name = '@FETCHED_VAR_NAME@' + self.executor.run(fetch_list, fetch_var_name) + arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array() + + if return_numpy: + return executor.as_numpy(arr) + + return [arr[i] for i in range(len(arr))] + + @property + def device_count(self): + return len(self._act_places)