提交 703b26e6 编写于 作者: P peizhilin

add profiler, parallel_executor back

上级 935387f3
...@@ -31,9 +31,7 @@ function(windows_symbolic TARGET) ...@@ -31,9 +31,7 @@ function(windows_symbolic TARGET)
endfunction() endfunction()
add_subdirectory(ir) add_subdirectory(ir)
if (NOT WIN32)
add_subdirectory(details) add_subdirectory(details)
endif (NOT WIN32)
# ddim lib # ddim lib
proto_library(framework_proto SRCS framework.proto) proto_library(framework_proto SRCS framework.proto)
...@@ -118,13 +116,8 @@ cc_test(op_proto_maker_test SRCS op_proto_maker_test.cc DEPS op_proto_maker) ...@@ -118,13 +116,8 @@ cc_test(op_proto_maker_test SRCS op_proto_maker_test.cc DEPS op_proto_maker)
cc_library(op_info SRCS op_info.cc DEPS attribute framework_proto) cc_library(op_info SRCS op_info.cc DEPS attribute framework_proto)
cc_library(shape_inference SRCS shape_inference.cc DEPS ddim attribute device_context) cc_library(shape_inference SRCS shape_inference.cc DEPS ddim attribute device_context)
if (NOT WIN32)
cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog
shape_inference data_transform lod_tensor profiler) shape_inference data_transform lod_tensor profiler)
else()
cc_library(operator SRCS operator.cc DEPS op_info device_context tensor scope glog
shape_inference data_transform lod_tensor)
endif(NOT WIN32)
cc_test(operator_test SRCS operator_test.cc DEPS operator op_registry device_context) cc_test(operator_test SRCS operator_test.cc DEPS operator op_registry device_context)
...@@ -179,12 +172,10 @@ else() ...@@ -179,12 +172,10 @@ else()
cc_test(test_naive_executor SRCS naive_executor_test.cc DEPS naive_executor elementwise_add_op) cc_test(test_naive_executor SRCS naive_executor_test.cc DEPS naive_executor elementwise_add_op)
endif() endif()
if (NOT WIN32)
cc_library(parallel_executor SRCS parallel_executor.cc DEPS cc_library(parallel_executor SRCS parallel_executor.cc DEPS
threaded_ssa_graph_executor scope_buffered_ssa_graph_executor threaded_ssa_graph_executor scope_buffered_ssa_graph_executor
graph build_strategy graph build_strategy
fast_threaded_ssa_graph_executor) fast_threaded_ssa_graph_executor)
endif() # NOT WIN32
cc_library(prune SRCS prune.cc DEPS framework_proto) cc_library(prune SRCS prune.cc DEPS framework_proto)
cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context) cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
......
...@@ -13,9 +13,9 @@ ...@@ -13,9 +13,9 @@
// limitations under the License. // limitations under the License.
#pragma once #pragma once
#include <ThreadPool.h>
#include <string> #include <string>
#include <vector> #include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/details/exception_holder.h" #include "paddle/fluid/framework/details/exception_holder.h"
#include "paddle/fluid/framework/details/execution_strategy.h" #include "paddle/fluid/framework/details/execution_strategy.h"
......
...@@ -17,7 +17,8 @@ ...@@ -17,7 +17,8 @@
#ifdef _WIN32 #ifdef _WIN32
#define posix_memalign_free _aligned_free #define posix_memalign_free _aligned_free
#define posix_memalign(p, a, s) (((*(p)) = _aligned_malloc((s), (a))), *(p) ? 0 : errno) #define posix_memalign(p, a, s) \
(((*(p)) = _aligned_malloc((s), (a))), *(p) ? 0 : errno)
#endif #endif
namespace paddle { namespace paddle {
......
if (NOT WIN32)
proto_library(profiler_proto SRCS profiler.proto DEPS framework_proto) proto_library(profiler_proto SRCS profiler.proto DEPS framework_proto)
py_proto_compile(profiler_py_proto SRCS profiler.proto) py_proto_compile(profiler_py_proto SRCS profiler.proto)
...@@ -6,11 +5,19 @@ add_custom_target(profiler_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch _ ...@@ -6,11 +5,19 @@ add_custom_target(profiler_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch _
add_dependencies(profiler_py_proto profiler_py_proto_init) add_dependencies(profiler_py_proto profiler_py_proto_init)
if (NOT WIN32)
add_custom_command(TARGET profiler_py_proto POST_BUILD add_custom_command(TARGET profiler_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/profiler COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/profiler
COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/profiler COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/profiler
COMMENT "Copy generated python proto into directory paddle/fluid/proto/profiler." COMMENT "Copy generated python proto into directory paddle/fluid/proto/profiler."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
else(NOT WIN32)
string(REPLACE "/" "\\" proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/profiler/")
add_custom_command(TARGET profiler_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/profiler
COMMAND copy /Y *.py ${proto_dstpath}
COMMENT "Copy generated python proto into directory paddle/fluid/proto/profiler."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif(NOT WIN32) endif(NOT WIN32)
if(WITH_GPU) if(WITH_GPU)
...@@ -60,12 +67,9 @@ cc_test(init_test SRCS init_test.cc DEPS device_context) ...@@ -60,12 +67,9 @@ cc_test(init_test SRCS init_test.cc DEPS device_context)
nv_test(cudnn_helper_test SRCS cudnn_helper_test.cc DEPS dynload_cuda) nv_test(cudnn_helper_test SRCS cudnn_helper_test.cc DEPS dynload_cuda)
nv_test(transform_test SRCS transform_test.cu DEPS memory place device_context) nv_test(transform_test SRCS transform_test.cu DEPS memory place device_context)
if (NOT WIN32)
cc_library(device_tracer SRCS device_tracer.cc DEPS boost profiler_proto framework_proto ${GPU_CTX_DEPS}) cc_library(device_tracer SRCS device_tracer.cc DEPS boost profiler_proto framework_proto ${GPU_CTX_DEPS})
cc_library(profiler SRCS profiler.cc DEPS device_context device_tracer) cc_library(profiler SRCS profiler.cc DEPS device_context device_tracer)
cc_test(profiler_test SRCS profiler_test.cc DEPS profiler) cc_test(profiler_test SRCS profiler_test.cc DEPS profiler)
endif(NOT WIN32)
nv_test(float16_gpu_test SRCS float16_test.cu DEPS lod_tensor) nv_test(float16_gpu_test SRCS float16_test.cu DEPS lod_tensor)
cc_test(float16_test SRCS float16_test.cc DEPS lod_tensor) cc_test(float16_test SRCS float16_test.cc DEPS lod_tensor)
......
...@@ -13,17 +13,11 @@ See the License for the specific language governing permissions and ...@@ -13,17 +13,11 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#pragma once #pragma once
#if !defined(_WIN32)
#include <sys/time.h>
#else
#include <windows.h>
#endif // !_WIN32
#include <time.h>
#include <chrono> // NOLINT #include <chrono> // NOLINT
#include <string> #include <string>
#include "paddle/fluid/platform/dynload/cupti.h" #include "paddle/fluid/platform/dynload/cupti.h"
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/platform/profiler.pb.h" #include "paddle/fluid/platform/profiler.pb.h"
namespace paddle { namespace paddle {
...@@ -32,15 +26,11 @@ namespace platform { ...@@ -32,15 +26,11 @@ namespace platform {
/////////////////////// ///////////////////////
// WARN: Under Development. Don't depend on it yet. // WARN: Under Development. Don't depend on it yet.
////////////////////// //////////////////////
#if !defined(_WIN32)
inline uint64_t PosixInNsec() { inline uint64_t PosixInNsec() {
struct timeval tv; struct timeval tv;
gettimeofday(&tv, nullptr); gettimeofday(&tv, nullptr);
return 1000 * (static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec); return 1000 * (static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec);
} }
#else
inline uint64_t PosixInNsec() { return static_cast<uint64_t>(0); }
#endif // !_WIN32
// DeviceTracer performs the following tasks: // DeviceTracer performs the following tasks:
// 1. Register cuda callbacks for various events: kernel, memcpy, etc. // 1. Register cuda callbacks for various events: kernel, memcpy, etc.
......
...@@ -134,7 +134,7 @@ struct EOFException : public std::exception { ...@@ -134,7 +134,7 @@ struct EOFException : public std::exception {
#define LIKELY(condition) __builtin_expect(static_cast<bool>(condition), 1) #define LIKELY(condition) __builtin_expect(static_cast<bool>(condition), 1)
#else #else
// there is no equivalent intrinsics in msvc. // there is no equivalent intrinsics in msvc.
#define LIKELY(condition) !(condition) #define LIKELY(condition) (condition)
#endif #endif
template <typename... Args> template <typename... Args>
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <cstdio> #include <cstdio>
#include <stdexcept> #include <stdexcept>
#include <time.h>
#include <memory> #include <memory>
#include <string> #include <string>
...@@ -27,6 +28,7 @@ ...@@ -27,6 +28,7 @@
#include <dlfcn.h> // dladdr #include <dlfcn.h> // dladdr
#include <execinfo.h> // backtrace #include <execinfo.h> // backtrace
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/time.h>
#include <algorithm> // std::accumulate #include <algorithm> // std::accumulate
#else #else
#include <io.h> // _popen, _pclose #include <io.h> // _popen, _pclose
...@@ -57,6 +59,25 @@ static void *dlopen(const char *filename, int flag) { ...@@ -57,6 +59,25 @@ static void *dlopen(const char *filename, int flag) {
return reinterpret_cast<void *>(hModule); return reinterpret_cast<void *>(hModule);
} }
static int gettimeofday(struct timeval *tp, void *tzp) {
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm.tm_isdst = -1;
clock = mktime(&tm);
tp->tv_sec = clock;
tp->tv_usec = wtm.wMilliseconds * 1000;
return (0);
}
#endif // !_WIN32 #endif // !_WIN32
static void ExecShellCommand(const std::string &cmd, std::string *message) { static void ExecShellCommand(const std::string &cmd, std::string *message) {
......
...@@ -13,8 +13,8 @@ See the License for the specific language governing permissions and ...@@ -13,8 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/port.h"
#include <sys/time.h>
#include <algorithm> #include <algorithm>
#include <iomanip> #include <iomanip>
#include <limits> #include <limits>
...@@ -438,10 +438,10 @@ void ParseEvents(const std::vector<std::vector<Event>>& events, ...@@ -438,10 +438,10 @@ void ParseEvents(const std::vector<std::vector<Event>>& events,
event_items[index].total_time += event_time; event_items[index].total_time += event_time;
// min time // min time
event_items[index].min_time = event_items[index].min_time =
std::min(event_time, event_items[index].min_time); (std::min)(event_time, event_items[index].min_time);
// max time // max time
event_items[index].max_time = event_items[index].max_time =
std::max(event_time, event_items[index].max_time); (std::max)(event_time, event_items[index].max_time);
} }
// remove the push marker from the list // remove the push marker from the list
......
...@@ -69,7 +69,6 @@ void PushEvent(const std::string& name, const DeviceContext* dev_ctx); ...@@ -69,7 +69,6 @@ void PushEvent(const std::string& name, const DeviceContext* dev_ctx);
void PopEvent(const std::string& name, const DeviceContext* dev_ctx); void PopEvent(const std::string& name, const DeviceContext* dev_ctx);
#if !defined(_WIN32)
struct RecordEvent { struct RecordEvent {
// dev_ctx can be set to nullptr if device is cpu. // dev_ctx can be set to nullptr if device is cpu.
RecordEvent(const std::string& name, const DeviceContext* dev_ctx); RecordEvent(const std::string& name, const DeviceContext* dev_ctx);
...@@ -106,15 +105,6 @@ struct RecordBlock { ...@@ -106,15 +105,6 @@ struct RecordBlock {
std::string name_; std::string name_;
uint64_t start_ns_; uint64_t start_ns_;
}; };
#else
// windows do not support profiler temporarily.
struct RecordEvent {
RecordEvent(const std::string& name, const DeviceContext* dev_ctx) {}
};
struct RecordBlock {
explicit RecordBlock(int block_id) {}
};
#endif
// Return the event list of all threads. Assumed the returned value calls // Return the event list of all threads. Assumed the returned value calls
// event_lists, event_lists[i][j] represents the j-th Event of i-th thread. // event_lists, event_lists[i][j] represents the j-th Event of i-th thread.
......
...@@ -45,16 +45,15 @@ class StreamCallbackManager { ...@@ -45,16 +45,15 @@ class StreamCallbackManager {
inline void AddCallback(Callback &&callback) const { inline void AddCallback(Callback &&callback) const {
auto *stream_callback_context = auto *stream_callback_context =
new StreamCallbackContext(this, std::forward<Callback>(callback)); new StreamCallbackContext(this, std::forward<Callback>(callback));
PADDLE_ENFORCE(
#if CUDA_VERSION >= 10000 #if CUDA_VERSION >= 10000
cudaLaunchHostFunc(stream_, StreamCallbackManager::StreamCallbackFunc, PADDLE_ENFORCE(cudaLaunchHostFunc(stream_,
stream_callback_context) StreamCallbackManager::StreamCallbackFunc,
stream_callback_context)); // NOLINT
#else #else
cudaStreamAddCallback(stream_, PADDLE_ENFORCE(cudaStreamAddCallback(
StreamCallbackManager::StreamCallbackFunc, stream_, StreamCallbackManager::StreamCallbackFunc,
stream_callback_context, 0) stream_callback_context, 0)); // NOLINT
#endif #endif
); // NOLINT
} }
void Wait() const { thread_pool_.reset(new ThreadPool(1)); } void Wait() const { thread_pool_.reset(new ThreadPool(1)); }
......
set(PYBIND_DEPS pybind python proto_desc memory executor prune feed_fetch_method pass_builder) set(PYBIND_DEPS pybind python proto_desc memory executor prune feed_fetch_method pass_builder parallel_executor profiler)
set(PYBIND_SRCS pybind.cc exception.cc protobuf.cc const_value.cc recordio.cc) set(PYBIND_SRCS pybind.cc exception.cc protobuf.cc const_value.cc recordio.cc)
if(NOT WIN32)
list(APPEND PYBIND_DEPS parallel_executor profiler)
endif(NOT WIN32)
if(WITH_PYTHON) if(WITH_PYTHON)
if(WITH_AMD_GPU) if(WITH_AMD_GPU)
hip_library(paddle_pybind SHARED hip_library(paddle_pybind SHARED
......
...@@ -36,9 +36,7 @@ limitations under the License. */ ...@@ -36,9 +36,7 @@ limitations under the License. */
#include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/lod_tensor_array.h" #include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/op_registry.h"
#ifndef _WIN32
#include "paddle/fluid/framework/parallel_executor.h" #include "paddle/fluid/framework/parallel_executor.h"
#endif
#include "paddle/fluid/framework/prune.h" #include "paddle/fluid/framework/prune.h"
#include "paddle/fluid/framework/reader.h" #include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/selected_rows.h"
...@@ -637,7 +635,6 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -637,7 +635,6 @@ All parameter, weight, gradient are variables in Paddle.
#endif #endif
#endif #endif
#ifndef _WIN32
py::enum_<platform::ProfilerState>(m, "ProfilerState", py::arithmetic()) py::enum_<platform::ProfilerState>(m, "ProfilerState", py::arithmetic())
.value("kDisabled", platform::ProfilerState::kDisabled) .value("kDisabled", platform::ProfilerState::kDisabled)
.value("kCPU", platform::ProfilerState::kCPU) .value("kCPU", platform::ProfilerState::kCPU)
...@@ -658,7 +655,6 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -658,7 +655,6 @@ All parameter, weight, gradient are variables in Paddle.
m.def("disable_profiler", platform::DisableProfiler); m.def("disable_profiler", platform::DisableProfiler);
m.def("is_profiler_enabled", platform::IsProfileEnabled); m.def("is_profiler_enabled", platform::IsProfileEnabled);
m.def("reset_profiler", platform::ResetProfiler); m.def("reset_profiler", platform::ResetProfiler);
#endif
py::class_<ir::Pass, std::shared_ptr<ir::Pass>> pass(m, "Pass"); py::class_<ir::Pass, std::shared_ptr<ir::Pass>> pass(m, "Pass");
pass.def(py::init()) pass.def(py::init())
...@@ -687,7 +683,6 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -687,7 +683,6 @@ All parameter, weight, gradient are variables in Paddle.
.def("remove_pass", .def("remove_pass",
[](ir::PassBuilder &self, size_t idx) { self.RemovePass(idx); }); [](ir::PassBuilder &self, size_t idx) { self.RemovePass(idx); });
#ifndef _WIN32
// -- python binds for parallel executor. // -- python binds for parallel executor.
py::class_<ParallelExecutor> pe(m, "ParallelExecutor"); py::class_<ParallelExecutor> pe(m, "ParallelExecutor");
py::class_<ExecutionStrategy> exec_strategy(pe, "ExecutionStrategy", R"DOC( py::class_<ExecutionStrategy> exec_strategy(pe, "ExecutionStrategy", R"DOC(
...@@ -913,7 +908,6 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -913,7 +908,6 @@ All parameter, weight, gradient are variables in Paddle.
pybind11::gil_scoped_release release; pybind11::gil_scoped_release release;
self.Run(fetch_tensors, fetched_var_name); self.Run(fetch_tensors, fetched_var_name);
}); });
#endif
BindRecordIOWriter(&m); BindRecordIOWriter(&m);
return m.ptr(); return m.ptr();
......
...@@ -47,8 +47,7 @@ from . import profiler ...@@ -47,8 +47,7 @@ from . import profiler
from . import unique_name from . import unique_name
from . import recordio_writer from . import recordio_writer
from . import parallel_executor from . import parallel_executor
if os.name != 'nt': from .parallel_executor import *
from .parallel_executor import *
from paddle.fluid.layers.math_op_patch import monkey_patch_variable from paddle.fluid.layers.math_op_patch import monkey_patch_variable
Tensor = LoDTensor Tensor = LoDTensor
......
...@@ -25,264 +25,263 @@ import os ...@@ -25,264 +25,263 @@ import os
__all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy'] __all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy']
if os.name != 'nt': ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy
ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy BuildStrategy = core.ParallelExecutor.BuildStrategy
BuildStrategy = core.ParallelExecutor.BuildStrategy
class ParallelExecutor(object): class ParallelExecutor(object):
"""
ParallelExecutor is designed for data parallelism, which focuses on distributing
the data across different nodes and every node operates on the data in parallel.
If you use ParallelExecutor to run the current program on GPU, the node means GPU
device, and ParallelExecutor will get the available GPU device automatically on
the current machine. If you use ParallelExecutor to run the current program on CPU,
the node means the CPU device, and you can specify the CPU device number by adding
'CPU_NUM' environment variable, for example 'CPU_NUM=4', if the environment variable
is not found, ParallelExecutor will call `multiprocessing.cpu_count` to get the number
of CPUs in the system.
Args:
use_cuda (bool): Whether to use CUDA or not.
loss_name (str): The loss name must set in training. Default None.
main_program (Program): The program that need to run, if not provided,
then default_main_program will be used. Default None.
share_vars_from(ParallelExecutor): If provide, it will share variables
from the specified ParallelExecutor. Default None.
exec_strategy(ExecutionStrategy): exec_strategy is used to control how to run
the program in ParallelExecutor, for example how many threads are used to
execute the program, how many iterations to clean up the temp variables
which is generated during execution. For more information, please refer
to fluid.ExecutionStrategy. Default None.
build_strategy(BuildStrategy): build_strategy is used to control how to
build the SSA Graph in ParallelExecutor by setting the property,
for example reduce_strategy, gradient_scale_strategy. For more information,
please refer to fluid.BuildStrategy. Default None.
num_trainers(int): If greater than 1, NCCL will be initialized with
multiple rank of nodes, each node should have same number of GPUs.
Distributed training will be enabled then. Default 1.
trainer_id(int): Must use together with num_trainers. trainer_id is the
"rank" of current node starts from 0. Default 0.
scope(Scope): scope to run with, default use fluid.global_scope().
Returns:
ParallelExecutor: The initialized ParallelExecutor object.
Raises:
TypeError: If share_vars_from is provided, but not ParallelExecutor object.
Examples:
.. code-block:: python
train_exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
test_exe = fluid.ParallelExecutor(use_cuda=True,
main_program=test_program,
share_vars_from=train_exe)
train_loss, = train_exe.run([loss.name], feed=feed_dict)
test_loss, = test_exe.run([loss.name], feed=feed_dict)
"""
def __init__(self,
use_cuda,
loss_name=None,
main_program=None,
share_vars_from=None,
exec_strategy=None,
build_strategy=None,
num_trainers=1,
trainer_id=0,
scope=None):
self._places = []
self._act_places = []
if use_cuda:
for i in six.moves.range(core.get_cuda_device_count()):
p = core.Place()
self._act_places.append(core.CUDAPlace(i))
p.set_place(self._act_places[-1])
self._places.append(p)
else:
cpu_num = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
for i in six.moves.range(cpu_num):
p = core.Place()
self._act_places.append(core.CPUPlace())
p.set_place(self._act_places[-1])
self._places.append(p)
assert self._places, "no place for execution"
if exec_strategy is None:
exec_strategy = ExecutionStrategy()
exec_strategy.use_cuda = use_cuda
if exec_strategy.num_threads == 0:
if use_cuda:
# Experiments on se-resnext shows that too many threads hurt
# performance. Worth tunning for other models in the future.
exec_strategy.num_threads = len(self._places) * 4
else:
cpu_num = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
exec_strategy.num_threads = cpu_num * 2
# Set 1 thread num under nccl2 distribute
# env to make sure all gpus run ops in same order.
if num_trainers > 1:
assert (use_cuda)
# FIXME(gongwb): avoid this set.
exec_strategy.num_threads = 1
if build_strategy is None:
build_strategy = BuildStrategy()
main = main_program
main = main if main else framework.default_main_program()
if scope == None:
scope = executor.global_scope()
if share_vars_from and not isinstance(share_vars_from,
ParallelExecutor):
raise TypeError("share_vars_from must be ParallelExecutor.")
local_scopes = share_vars_from.executor.local_scopes(
) if share_vars_from else []
self.persistable_vars = [
v.name for v in [
var for var in main.list_vars()
if var.persistable and var.type != core.VarDesc.VarType.RAW
]
]
self.executor = core.ParallelExecutor(
self._places,
set([
cpt.to_text(p.name)
for p in main.global_block().iter_parameters()
if not p.stop_gradient
]),
set(cpt.to_text(var) for var in self.persistable_vars), main.desc,
cpt.to_text(loss_name)
if loss_name else six.u(''), scope, local_scopes, exec_strategy,
build_strategy, num_trainers, trainer_id)
self.scope = scope
def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True):
""" """
ParallelExecutor is designed for data parallelism, which focuses on distributing Run a parallel executor with fetch_list.
the data across different nodes and every node operates on the data in parallel.
If you use ParallelExecutor to run the current program on GPU, the node means GPU The feed parameter can be a dict or a list. If feed is a dict, the
device, and ParallelExecutor will get the available GPU device automatically on feed data will be split into multiple devices. If feed is a list, we
the current machine. If you use ParallelExecutor to run the current program on CPU, assume the data has been splitted into multiple devices, the each
the node means the CPU device, and you can specify the CPU device number by adding element in the list will be copied to each device directly.
'CPU_NUM' environment variable, for example 'CPU_NUM=4', if the environment variable
is not found, ParallelExecutor will call `multiprocessing.cpu_count` to get the number For example, if the feed is a dict:
of CPUs in the system.
>>> exe = ParallelExecutor()
>>> # the image will be splitted into devices. If there is two devices
>>> # each device will process an image with shape (24, 1, 28, 28)
>>> exe.run(feed={'image': numpy.random.random(size=(48, 1, 28, 28))})
For example, if the feed is a list:
>>> exe = ParallelExecutor()
>>> # each device will process each element in the list.
>>> # the 1st device will process an image with shape (48, 1, 28, 28)
>>> # the 2nd device will process an image with shape (32, 1, 28, 28)
>>> #
>>> # you can use exe.device_count to get the device number.
>>> exe.run(feed=[{"image": numpy.random.random(size=(48, 1, 28, 28))},
>>> {"image": numpy.random.random(size=(32, 1, 28, 28))},
>>> ])
Args: Args:
use_cuda (bool): Whether to use CUDA or not. fetch_list(list): The fetched variable names
loss_name (str): The loss name must set in training. Default None. feed(list|dict|None): The feed variables. If the feed is a dict,
main_program (Program): The program that need to run, if not provided, tensors in that dict will be splitted into each devices. If
then default_main_program will be used. Default None. the feed is a list, each element of the list will be copied
share_vars_from(ParallelExecutor): If provide, it will share variables to each device. Default None.
from the specified ParallelExecutor. Default None. feed_dict: Alias for feed parameter, for backward compatibility.
exec_strategy(ExecutionStrategy): exec_strategy is used to control how to run This parameter has been deprecated. Default None.
the program in ParallelExecutor, for example how many threads are used to return_numpy(bool): Whether converts the fetched tensor to numpy.
execute the program, how many iterations to clean up the temp variables Default: True.
which is generated during execution. For more information, please refer
to fluid.ExecutionStrategy. Default None.
build_strategy(BuildStrategy): build_strategy is used to control how to
build the SSA Graph in ParallelExecutor by setting the property,
for example reduce_strategy, gradient_scale_strategy. For more information,
please refer to fluid.BuildStrategy. Default None.
num_trainers(int): If greater than 1, NCCL will be initialized with
multiple rank of nodes, each node should have same number of GPUs.
Distributed training will be enabled then. Default 1.
trainer_id(int): Must use together with num_trainers. trainer_id is the
"rank" of current node starts from 0. Default 0.
scope(Scope): scope to run with, default use fluid.global_scope().
Returns: Returns:
ParallelExecutor: The initialized ParallelExecutor object. List: The fetched result list.
Raises: Raises:
TypeError: If share_vars_from is provided, but not ParallelExecutor object. ValueError: If the feed is a list, but its length is not equal the
length of active places, or its element's is not dict.
NOTES:
1. If the feed's type is dict, the number of data that feeds to
ParallelExecutor must be bigger than active places. Otherwise,
it will throw exception from C++ side. Special attention should be
paid to check whether the last batch of the dataset is bigger
than active places.
2. If active places are more than one, the fetch results for each
variable is a list, and each element of this list is the variable of
respective active place.
Examples: Examples:
.. code-block:: python .. code-block:: python
train_exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name) pe = fluid.ParallelExecutor(use_cuda=use_cuda,
test_exe = fluid.ParallelExecutor(use_cuda=True, loss_name=avg_cost.name,
main_program=test_program, main_program=fluid.default_main_program())
share_vars_from=train_exe) loss = pe.run(feed=feeder.feed(cur_batch),
fetch_list=[avg_cost.name]))
train_loss, = train_exe.run([loss.name], feed=feed_dict)
test_loss, = test_exe.run([loss.name], feed=feed_dict)
""" """
if feed is None and feed_dict is not None:
def __init__(self, feed = feed_dict
use_cuda, print(
loss_name=None, "`feed_dict` is deprecated. Please use `feed=`",
main_program=None, file=sys.stderr)
share_vars_from=None,
exec_strategy=None, if isinstance(feed, dict):
build_strategy=None, feed_tensor_dict = dict()
num_trainers=1, for feed_name in feed:
trainer_id=0, feed_tensor = feed[feed_name]
scope=None): if not isinstance(feed_tensor, core.LoDTensor):
self._places = [] feed_tensor = core.LoDTensor()
self._act_places = [] # always set to CPU place, since the tensor need to be splitted
if use_cuda: # it is fast in CPU
for i in six.moves.range(core.get_cuda_device_count()): feed_tensor.set(feed[feed_name], core.CPUPlace())
p = core.Place() feed_tensor_dict[feed_name] = feed_tensor
self._act_places.append(core.CUDAPlace(i))
p.set_place(self._act_places[-1]) self.executor.feed_and_split_tensor_into_local_scopes(
self._places.append(p) feed_tensor_dict)
else: elif isinstance(feed, list) or isinstance(feed, tuple):
cpu_num = int( if len(feed) != len(self._act_places):
os.environ.get('CPU_NUM', multiprocessing.cpu_count())) raise ValueError(
for i in six.moves.range(cpu_num): "Feed a list of tensor, the list should be the same size as places"
p = core.Place() )
self._act_places.append(core.CPUPlace())
p.set_place(self._act_places[-1]) res = list()
self._places.append(p)
assert self._places, "no place for execution" for i, each in enumerate(feed):
if not isinstance(each, dict):
if exec_strategy is None: raise TypeError(
exec_strategy = ExecutionStrategy() "Each element of feed list should be a dict")
exec_strategy.use_cuda = use_cuda res_dict = dict()
for feed_name in each:
if exec_strategy.num_threads == 0: tensor = each[feed_name]
if use_cuda: if not isinstance(tensor, core.LoDTensor):
# Experiments on se-resnext shows that too many threads hurt tmp = core.LoDTensor()
# performance. Worth tunning for other models in the future. tmp.set(tensor, self._act_places[i])
exec_strategy.num_threads = len(self._places) * 4 tensor = tmp
else: res_dict[feed_name] = tensor
cpu_num = int( res.append(res_dict)
os.environ.get('CPU_NUM', multiprocessing.cpu_count())) self.executor.feed_tensors_into_local_scopes(res)
exec_strategy.num_threads = cpu_num * 2
fetch_var_name = '@FETCHED_VAR_NAME@'
# Set 1 thread num under nccl2 distribute self.executor.run(fetch_list, fetch_var_name)
# env to make sure all gpus run ops in same order. arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
if num_trainers > 1:
assert (use_cuda) if return_numpy:
# FIXME(gongwb): avoid this set. return executor.as_numpy(arr)
exec_strategy.num_threads = 1
return [arr[i] for i in range(len(arr))]
if build_strategy is None:
build_strategy = BuildStrategy() @property
def device_count(self):
main = main_program return len(self._act_places)
main = main if main else framework.default_main_program()
if scope == None:
scope = executor.global_scope()
if share_vars_from and not isinstance(share_vars_from,
ParallelExecutor):
raise TypeError("share_vars_from must be ParallelExecutor.")
local_scopes = share_vars_from.executor.local_scopes(
) if share_vars_from else []
self.persistable_vars = [
v.name for v in [
var for var in main.list_vars()
if var.persistable and var.type != core.VarDesc.VarType.RAW
]
]
self.executor = core.ParallelExecutor(
self._places,
set([
cpt.to_text(p.name)
for p in main.global_block().iter_parameters()
if not p.stop_gradient
]),
set(cpt.to_text(var)
for var in self.persistable_vars), main.desc,
cpt.to_text(loss_name)
if loss_name else six.u(''), scope, local_scopes, exec_strategy,
build_strategy, num_trainers, trainer_id)
self.scope = scope
def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True):
"""
Run a parallel executor with fetch_list.
The feed parameter can be a dict or a list. If feed is a dict, the
feed data will be split into multiple devices. If feed is a list, we
assume the data has been splitted into multiple devices, the each
element in the list will be copied to each device directly.
For example, if the feed is a dict:
>>> exe = ParallelExecutor()
>>> # the image will be splitted into devices. If there is two devices
>>> # each device will process an image with shape (24, 1, 28, 28)
>>> exe.run(feed={'image': numpy.random.random(size=(48, 1, 28, 28))})
For example, if the feed is a list:
>>> exe = ParallelExecutor()
>>> # each device will process each element in the list.
>>> # the 1st device will process an image with shape (48, 1, 28, 28)
>>> # the 2nd device will process an image with shape (32, 1, 28, 28)
>>> #
>>> # you can use exe.device_count to get the device number.
>>> exe.run(feed=[{"image": numpy.random.random(size=(48, 1, 28, 28))},
>>> {"image": numpy.random.random(size=(32, 1, 28, 28))},
>>> ])
Args:
fetch_list(list): The fetched variable names
feed(list|dict|None): The feed variables. If the feed is a dict,
tensors in that dict will be splitted into each devices. If
the feed is a list, each element of the list will be copied
to each device. Default None.
feed_dict: Alias for feed parameter, for backward compatibility.
This parameter has been deprecated. Default None.
return_numpy(bool): Whether converts the fetched tensor to numpy.
Default: True.
Returns:
List: The fetched result list.
Raises:
ValueError: If the feed is a list, but its length is not equal the
length of active places, or its element's is not dict.
NOTES:
1. If the feed's type is dict, the number of data that feeds to
ParallelExecutor must be bigger than active places. Otherwise,
it will throw exception from C++ side. Special attention should be
paid to check whether the last batch of the dataset is bigger
than active places.
2. If active places are more than one, the fetch results for each
variable is a list, and each element of this list is the variable of
respective active place.
Examples:
.. code-block:: python
pe = fluid.ParallelExecutor(use_cuda=use_cuda,
loss_name=avg_cost.name,
main_program=fluid.default_main_program())
loss = pe.run(feed=feeder.feed(cur_batch),
fetch_list=[avg_cost.name]))
"""
if feed is None and feed_dict is not None:
feed = feed_dict
print(
"`feed_dict` is deprecated. Please use `feed=`",
file=sys.stderr)
if isinstance(feed, dict):
feed_tensor_dict = dict()
for feed_name in feed:
feed_tensor = feed[feed_name]
if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor()
# always set to CPU place, since the tensor need to be splitted
# it is fast in CPU
feed_tensor.set(feed[feed_name], core.CPUPlace())
feed_tensor_dict[feed_name] = feed_tensor
self.executor.feed_and_split_tensor_into_local_scopes(
feed_tensor_dict)
elif isinstance(feed, list) or isinstance(feed, tuple):
if len(feed) != len(self._act_places):
raise ValueError(
"Feed a list of tensor, the list should be the same size as places"
)
res = list()
for i, each in enumerate(feed):
if not isinstance(each, dict):
raise TypeError(
"Each element of feed list should be a dict")
res_dict = dict()
for feed_name in each:
tensor = each[feed_name]
if not isinstance(tensor, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(tensor, self._act_places[i])
tensor = tmp
res_dict[feed_name] = tensor
res.append(res_dict)
self.executor.feed_tensors_into_local_scopes(res)
fetch_var_name = '@FETCHED_VAR_NAME@'
self.executor.run(fetch_list, fetch_var_name)
arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
if return_numpy:
return executor.as_numpy(arr)
return [arr[i] for i in range(len(arr))]
@property
def device_count(self):
return len(self._act_places)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册