Commit 162f2d41 authored by peizhilin

Disable OpenBLAS multi-threading on Windows, since it is not supported there.

Adjust the Python scripts accordingly.

Parent d1429ac4
@@ -29,6 +29,12 @@ namespace platform {
 void SetNumThreads(int num_threads) {
 #ifdef PADDLE_USE_OPENBLAS
+// windows has no support for openblas multi-thread
+#ifdef _WIN32
+  if (num_threads > 1) {
+    num_threads = 1;
+  }
+#endif
   int real_num_threads = num_threads > 1 ? num_threads : 1;
   openblas_set_num_threads(real_num_threads);
 #elif defined(PADDLE_WITH_MKLML)
......
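In plain terms, the hunk above clamps the requested thread count to 1 before it reaches `openblas_set_num_threads` when building for Windows; the next hunk then drops the equivalent clamp from `InitDevices`, so the rule lives in one place. A minimal Python sketch of the same rule, for illustration only (the function name is made up; the real logic is the C++ `#ifdef _WIN32` branch above):

```python
import platform


def effective_openblas_threads(requested):
    """Illustrative only: mirror the Windows clamp done in SetNumThreads."""
    if platform.system() == 'Windows' and requested > 1:
        return 1  # assumed equivalent of the #ifdef _WIN32 branch
    return max(requested, 1)  # matches `num_threads > 1 ? num_threads : 1`


print(effective_openblas_threads(8))  # -> 1 on Windows, 8 elsewhere
```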
@@ -113,13 +113,6 @@ void InitDevices(bool init_p2p, const std::vector<int> devices) {
   places.emplace_back(platform::CPUPlace());
   platform::DeviceContextPool::Init(places);
-// windows has no support for openblas multi-thread
-#ifdef _WIN32
-  if (FLAGS_paddle_num_threads > 1) {
-    FLAGS_paddle_num_threads = 1;
-  }
-#endif
 #ifndef PADDLE_WITH_MKLDNN
   platform::SetNumThreads(FLAGS_paddle_num_threads);
 #endif
......
@@ -47,7 +47,8 @@ from . import profiler
 from . import unique_name
 from . import recordio_writer
 from . import parallel_executor
-from .parallel_executor import *
+if os.name != 'nt':
+    from .parallel_executor import *
 from paddle.fluid.layers.math_op_patch import monkey_patch_variable
 Tensor = LoDTensor
......
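Since the wildcard import above is now skipped when `os.name == 'nt'`, `fluid.ParallelExecutor` (and the two strategy classes) are simply not exported on Windows. A hedged sketch of how calling code could stay portable; the helper name and the fallback to the single-device `Executor` are illustrative choices, not part of this commit:

```python
import os

import paddle.fluid as fluid


def make_executor(use_cuda=False):
    # Hypothetical helper: with this change ParallelExecutor is not exported
    # on Windows, so fall back to the plain single-device Executor there.
    if os.name != 'nt' and hasattr(fluid, 'ParallelExecutor'):
        return fluid.ParallelExecutor(use_cuda=use_cuda)
    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    return fluid.Executor(place)
```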
@@ -15,15 +15,13 @@
 from __future__ import print_function
 import contextlib
-import os
 from .. import core
 from .. import executor
 from .. import framework
 from .. import io
-if os.name != 'nt':
-    from .. import parallel_executor
+from .. import parallel_executor
 from .. import unique_name
 from .trainer import check_and_get_place
......
@@ -28,8 +28,7 @@ from .. import framework
 from .. import io
 # optimizer is same as the parameter of Trainer.__init__. Rename it to opt_module
 from .. import optimizer as opt_module
-if os.name != 'nt':
-    from .. import parallel_executor
+from .. import parallel_executor
 from ..transpiler import distribute_transpiler
 __all__ = [
......
@@ -25,263 +25,264 @@ import os

 __all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy']

-ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy
-BuildStrategy = core.ParallelExecutor.BuildStrategy
-
-class ParallelExecutor(object):
+if os.name != 'nt':
+    ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy
+    BuildStrategy = core.ParallelExecutor.BuildStrategy
+
+    class ParallelExecutor(object):
+        """
+        ParallelExecutor is designed for data parallelism, which focuses on distributing
+        the data across different nodes and every node operates on the data in parallel.
+        If you use ParallelExecutor to run the current program on GPU, the node means GPU
+        device, and ParallelExecutor will get the available GPU device automatically on
+        the current machine. If you use ParallelExecutor to run the current program on CPU,
+        the node means the CPU device, and you can specify the CPU device number by adding
+        'CPU_NUM' environment variable, for example 'CPU_NUM=4', if the environment variable
+        is not found, ParallelExecutor will call `multiprocessing.cpu_count` to get the number
+        of CPUs in the system.
+
+        Args:
+            use_cuda (bool): Whether to use CUDA or not.
+            loss_name (str): The loss name must set in training. Default None.
+            main_program (Program): The program that need to run, if not provided,
+                then default_main_program will be used. Default None.
+            share_vars_from(ParallelExecutor): If provide, it will share variables
+                from the specified ParallelExecutor. Default None.
+            exec_strategy(ExecutionStrategy): exec_strategy is used to control how to run
+                the program in ParallelExecutor, for example how many threads are used to
+                execute the program, how many iterations to clean up the temp variables
+                which is generated during execution. For more information, please refer
+                to fluid.ExecutionStrategy. Default None.
+            build_strategy(BuildStrategy): build_strategy is used to control how to
+                build the SSA Graph in ParallelExecutor by setting the property,
+                for example reduce_strategy, gradient_scale_strategy. For more information,
+                please refer to fluid.BuildStrategy. Default None.
+            num_trainers(int): If greater than 1, NCCL will be initialized with
+                multiple rank of nodes, each node should have same number of GPUs.
+                Distributed training will be enabled then. Default 1.
+            trainer_id(int): Must use together with num_trainers. trainer_id is the
+                "rank" of current node starts from 0. Default 0.
+            scope(Scope): scope to run with, default use fluid.global_scope().
+
+        Returns:
+            ParallelExecutor: The initialized ParallelExecutor object.
+
+        Raises:
+            TypeError: If share_vars_from is provided, but not ParallelExecutor object.
+
+        Examples:
+            .. code-block:: python
+
+              train_exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
+              test_exe = fluid.ParallelExecutor(use_cuda=True,
+                                                main_program=test_program,
+                                                share_vars_from=train_exe)
+
+              train_loss, = train_exe.run([loss.name], feed=feed_dict)
+              test_loss, = test_exe.run([loss.name], feed=feed_dict)
+        """
+
+        def __init__(self,
+                     use_cuda,
+                     loss_name=None,
+                     main_program=None,
+                     share_vars_from=None,
+                     exec_strategy=None,
+                     build_strategy=None,
+                     num_trainers=1,
+                     trainer_id=0,
+                     scope=None):
+            self._places = []
+            self._act_places = []
+            if use_cuda:
+                for i in six.moves.range(core.get_cuda_device_count()):
+                    p = core.Place()
+                    self._act_places.append(core.CUDAPlace(i))
+                    p.set_place(self._act_places[-1])
+                    self._places.append(p)
+            else:
+                cpu_num = int(
+                    os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
+                for i in six.moves.range(cpu_num):
+                    p = core.Place()
+                    self._act_places.append(core.CPUPlace())
+                    p.set_place(self._act_places[-1])
+                    self._places.append(p)
+            assert self._places, "no place for execution"
+
+            if exec_strategy is None:
+                exec_strategy = ExecutionStrategy()
+            exec_strategy.use_cuda = use_cuda
+
+            if exec_strategy.num_threads == 0:
+                if use_cuda:
+                    # Experiments on se-resnext shows that too many threads hurt
+                    # performance. Worth tunning for other models in the future.
+                    exec_strategy.num_threads = len(self._places) * 4
+                else:
+                    cpu_num = int(
+                        os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
+                    exec_strategy.num_threads = cpu_num * 2
+
+            # Set 1 thread num under nccl2 distribute
+            # env to make sure all gpus run ops in same order.
+            if num_trainers > 1:
+                assert (use_cuda)
+                # FIXME(gongwb): avoid this set.
+                exec_strategy.num_threads = 1
+
+            if build_strategy is None:
+                build_strategy = BuildStrategy()
+
+            main = main_program
+            main = main if main else framework.default_main_program()
+            if scope == None:
+                scope = executor.global_scope()
+
+            if share_vars_from and not isinstance(share_vars_from,
+                                                  ParallelExecutor):
+                raise TypeError("share_vars_from must be ParallelExecutor.")
+
+            local_scopes = share_vars_from.executor.local_scopes(
+            ) if share_vars_from else []
+
+            self.persistable_vars = [
+                v.name for v in [
+                    var for var in main.list_vars()
+                    if var.persistable and var.type != core.VarDesc.VarType.RAW
+                ]
+            ]
+
+            self.executor = core.ParallelExecutor(
+                self._places,
+                set([
+                    cpt.to_text(p.name)
+                    for p in main.global_block().iter_parameters()
+                    if not p.stop_gradient
+                ]),
+                set(cpt.to_text(var) for var in self.persistable_vars), main.desc,
+                cpt.to_text(loss_name)
+                if loss_name else six.u(''), scope, local_scopes, exec_strategy,
+                build_strategy, num_trainers, trainer_id)
+            self.scope = scope
+
+        def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True):
+            """
+            Run a parallel executor with fetch_list.
+
+            The feed parameter can be a dict or a list. If feed is a dict, the
+            feed data will be split into multiple devices. If feed is a list, we
+            assume the data has been splitted into multiple devices, the each
+            element in the list will be copied to each device directly.
+            For example, if the feed is a dict:
+
+            >>> exe = ParallelExecutor()
+            >>> # the image will be splitted into devices. If there is two devices
+            >>> # each device will process an image with shape (24, 1, 28, 28)
+            >>> exe.run(feed={'image': numpy.random.random(size=(48, 1, 28, 28))})
+
+            For example, if the feed is a list:
+
+            >>> exe = ParallelExecutor()
+            >>> # each device will process each element in the list.
+            >>> # the 1st device will process an image with shape (48, 1, 28, 28)
+            >>> # the 2nd device will process an image with shape (32, 1, 28, 28)
+            >>> #
+            >>> # you can use exe.device_count to get the device number.
+            >>> exe.run(feed=[{"image": numpy.random.random(size=(48, 1, 28, 28))},
+            >>>               {"image": numpy.random.random(size=(32, 1, 28, 28))},
+            >>>              ])
+
+            Args:
+                fetch_list(list): The fetched variable names
+                feed(list|dict|None): The feed variables. If the feed is a dict,
+                    tensors in that dict will be splitted into each devices. If
+                    the feed is a list, each element of the list will be copied
+                    to each device. Default None.
+                feed_dict: Alias for feed parameter, for backward compatibility.
+                    This parameter has been deprecated. Default None.
+                return_numpy(bool): Whether converts the fetched tensor to numpy.
+                    Default: True.
+
+            Returns:
+                List: The fetched result list.
+
+            Raises:
+                ValueError: If the feed is a list, but its length is not equal the
+                    length of active places, or its element's is not dict.
+
+            NOTES:
+                1. If the feed's type is dict, the number of data that feeds to
+                   ParallelExecutor must be bigger than active places. Otherwise,
+                   it will throw exception from C++ side. Special attention should be
+                   paid to check whether the last batch of the dataset is bigger
+                   than active places.
+                2. If active places are more than one, the fetch results for each
+                   variable is a list, and each element of this list is the variable of
+                   respective active place.
+
+            Examples:
+                .. code-block:: python
+
+                    pe = fluid.ParallelExecutor(use_cuda=use_cuda,
+                                                loss_name=avg_cost.name,
+                                                main_program=fluid.default_main_program())
+                    loss = pe.run(feed=feeder.feed(cur_batch),
+                                  fetch_list=[avg_cost.name]))
+            """
+            if feed is None and feed_dict is not None:
+                feed = feed_dict
+                print(
+                    "`feed_dict` is deprecated. Please use `feed=`",
+                    file=sys.stderr)
+
+            if isinstance(feed, dict):
+                feed_tensor_dict = dict()
+                for feed_name in feed:
+                    feed_tensor = feed[feed_name]
+                    if not isinstance(feed_tensor, core.LoDTensor):
+                        feed_tensor = core.LoDTensor()
+                        # always set to CPU place, since the tensor need to be splitted
+                        # it is fast in CPU
+                        feed_tensor.set(feed[feed_name], core.CPUPlace())
+                    feed_tensor_dict[feed_name] = feed_tensor
+
+                self.executor.feed_and_split_tensor_into_local_scopes(
+                    feed_tensor_dict)
+            elif isinstance(feed, list) or isinstance(feed, tuple):
+                if len(feed) != len(self._act_places):
+                    raise ValueError(
+                        "Feed a list of tensor, the list should be the same size as places"
+                    )
+
+                res = list()
+
+                for i, each in enumerate(feed):
+                    if not isinstance(each, dict):
+                        raise TypeError(
+                            "Each element of feed list should be a dict")
+                    res_dict = dict()
+                    for feed_name in each:
+                        tensor = each[feed_name]
+                        if not isinstance(tensor, core.LoDTensor):
+                            tmp = core.LoDTensor()
+                            tmp.set(tensor, self._act_places[i])
+                            tensor = tmp
+                        res_dict[feed_name] = tensor
+                    res.append(res_dict)
+                self.executor.feed_tensors_into_local_scopes(res)
+
+            fetch_var_name = '@FETCHED_VAR_NAME@'
+            self.executor.run(fetch_list, fetch_var_name)
+            arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
+
+            if return_numpy:
+                return executor.as_numpy(arr)
+
+            return [arr[i] for i in range(len(arr))]
+
+        @property
+        def device_count(self):
+            return len(self._act_places)
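As the constructor above shows, the number of CPU places comes from the `CPU_NUM` environment variable, with `multiprocessing.cpu_count()` as the fallback. A standalone sketch of just that lookup, for illustration:

```python
import multiprocessing
import os

# Same lookup as in ParallelExecutor.__init__: CPU_NUM wins when set,
# otherwise the machine's core count is used.
cpu_num = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
print("ParallelExecutor would create %d CPU place(s)" % cpu_num)
```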