提交 76d8b14b 编写于 作者: X Xin Pan

Add timeline support for distributed training

上级 0595f23e
...@@ -80,6 +80,8 @@ parser.add_argument( ...@@ -80,6 +80,8 @@ parser.add_argument(
type=str, type=str,
default="", default="",
help="Comma-separated list of hostname:port pairs") help="Comma-separated list of hostname:port pairs")
parser.add_argument(
"--profile", action='store_true', help="If set, profile a few steps.")
# Flags for defining the tf.train.Server # Flags for defining the tf.train.Server
parser.add_argument( parser.add_argument(
...@@ -183,8 +185,8 @@ def main(): ...@@ -183,8 +185,8 @@ def main():
start_time = time.time() start_time = time.time()
num_samples = 0 num_samples = 0
train_pass_acc.reset() train_pass_acc.reset()
for batch_id, data in enumerate(train_reader()):
ts = time.time() def run_step(batch_id, data):
img_data = np.array( img_data = np.array(
map(lambda x: x[0].reshape(data_shape), data)).astype( map(lambda x: x[0].reshape(data_shape), data)).astype(
"float32") "float32")
...@@ -196,14 +198,28 @@ def main(): ...@@ -196,14 +198,28 @@ def main():
feed={"pixel": img_data, feed={"pixel": img_data,
"label": y_data}, "label": y_data},
fetch_list=[avg_cost, batch_acc, batch_size]) fetch_list=[avg_cost, batch_acc, batch_size])
return loss, acc, b_size
if args.profile and args.task_index == 0:
# warmup.
for batch_id, data in enumerate(train_reader()):
if batch_id > 5: break
run_step(batch_id, data)
with profiler.profiler('All', 'total', '/tmp/profile_vgg'):
for batch_id, data in enumerate(train_reader()):
if batch_id > 5: break
run_step(batch_id, data)
for batch_id, data in enumerate(train_reader()):
ts = time.time()
loss, acc, b_size = run_step(batch_id, data)
iters += 1 iters += 1
num_samples += len(data) num_samples += len(data)
train_pass_acc.add(value=acc, weight=b_size) train_pass_acc.add(value=acc, weight=b_size)
print( print(
"Task:%d Pass = %d, Iters = %d, Loss = %f, Accuracy = %f, " "Pass = %d, Iters = %d, Loss = %f, Accuracy = %f, "
"Speed = %.2f img/s " % (args.task_index, pass_id, iters, "Speed = %.2f img/s" % (pass_id, iters, loss, acc,
loss, acc, len(data) / (time.time() - ts))
len(data) / (time.time() - ts))
) # The accuracy is the accumulation of batches, but not the current batch. ) # The accuracy is the accumulation of batches, but not the current batch.
pass_elapsed = time.time() - start_time pass_elapsed = time.time() - start_time
......
...@@ -33,7 +33,7 @@ ExternalProject_Add( ...@@ -33,7 +33,7 @@ ExternalProject_Add(
extern_grpc extern_grpc
DEPENDS protobuf zlib DEPENDS protobuf zlib
GIT_REPOSITORY "https://github.com/grpc/grpc.git" GIT_REPOSITORY "https://github.com/grpc/grpc.git"
GIT_TAG "v1.10.x" GIT_TAG "v1.8.x"
PREFIX ${GRPC_SOURCES_DIR} PREFIX ${GRPC_SOURCES_DIR}
UPDATE_COMMAND "" UPDATE_COMMAND ""
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
......
...@@ -69,6 +69,10 @@ message VariableMessage { ...@@ -69,6 +69,10 @@ message VariableMessage {
bytes rows = 9; bytes rows = 9;
// Look up table block execution output variable name. // Look up table block execution output variable name.
string out_varname = 10; string out_varname = 10;
// If true, the ps server starts profiling. When profile switches from
// true to false, the ps server stops profiling and writes a profile
// to /tmp/profile_ps_*.
bool profile = 11;
} }
message VoidMessage {} message VoidMessage {}
...@@ -23,6 +23,7 @@ limitations under the License. */ ...@@ -23,6 +23,7 @@ limitations under the License. */
#include "paddle/fluid/operators/detail/bytebuffer_stream.h" #include "paddle/fluid/operators/detail/bytebuffer_stream.h"
#include "paddle/fluid/operators/detail/proto_encoder_helper.h" #include "paddle/fluid/operators/detail/proto_encoder_helper.h"
#include "paddle/fluid/operators/detail/variable_response.h" #include "paddle/fluid/operators/detail/variable_response.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle { namespace paddle {
namespace operators { namespace operators {
...@@ -45,6 +46,13 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, ...@@ -45,6 +46,13 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
void* payload = nullptr; void* payload = nullptr;
size_t payload_size; size_t payload_size;
ProtoEncodeHelper e(static_cast<char*>(buf), 1024); ProtoEncodeHelper e(static_cast<char*>(buf), 1024);
// Note: normally the profiler is enabled in 1 trainer, hence only
// 1 trainer returns true for ShouldSendProfileState(). It tells PS
// servers the trainer's profiling state so that PS can follow the
// trainer.
if (platform::ShouldSendProfileState()) {
e.WriteBool(VarMsg::kProfileFieldNumber, platform::IsProfileEnabled());
}
e.WriteString(VarMsg::kVarnameFieldNumber, name); e.WriteString(VarMsg::kVarnameFieldNumber, name);
if (var->IsType<framework::LoDTensor>()) { if (var->IsType<framework::LoDTensor>()) {
e.WriteUint64(VarMsg::kTypeFieldNumber, 0); e.WriteUint64(VarMsg::kTypeFieldNumber, 0);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/operators/detail/send_recv.pb.h" #include "paddle/fluid/operators/detail/send_recv.pb.h"
#include "paddle/fluid/operators/detail/sendrecvop_utils.h" #include "paddle/fluid/operators/detail/sendrecvop_utils.h"
...@@ -427,7 +428,26 @@ int VariableResponse::Parse(Source* source) { ...@@ -427,7 +428,26 @@ int VariableResponse::Parse(Source* source) {
meta_.set_out_varname(temp); meta_.set_out_varname(temp);
break; break;
} }
case sendrecv::VariableMessage::kProfileFieldNumber: {
bool profiling;
if (!input.ReadRaw(reinterpret_cast<void*>(&profiling), 1)) {
return tag;
}
meta_.set_profile(profiling);
int64_t lisner_id = platform::ListenerId();
if (lisner_id <= 0) {
break;
}
if (profiling && !platform::IsProfileEnabled()) {
platform::EnableProfiler(platform::ProfilerState::kCPU);
} else if (!profiling && platform::IsProfileEnabled()) {
// TODO(panyx0718): Should we allow customizing the output file directory?
platform::DisableProfiler(
platform::EventSortingKey::kDefault,
string::Sprintf("/tmp/profile_ps_%lld", lisner_id));
}
break;
}
default: { default: {
// Unknown tag, return unknown error. // Unknown tag, return unknown error.
return -1; return -1;
......
...@@ -18,6 +18,7 @@ limitations under the License. */ ...@@ -18,6 +18,7 @@ limitations under the License. */
#include <vector> #include <vector>
#include "paddle/fluid/operators/listen_and_serv_op.h" #include "paddle/fluid/operators/listen_and_serv_op.h"
#include "paddle/fluid/platform/profiler.h"
namespace paddle { namespace paddle {
namespace operators { namespace operators {
...@@ -294,6 +295,8 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, ...@@ -294,6 +295,8 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor,
void ListenAndServOp::RunImpl(const framework::Scope &scope, void ListenAndServOp::RunImpl(const framework::Scope &scope,
const platform::Place &dev_place) const { const platform::Place &dev_place) const {
// Mark this process as a PS so that it decides when to profile by listening to the trainer.
platform::SetProfileLisener();
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(dev_place); auto &dev_ctx = *pool.Get(dev_place);
framework::Scope &recv_scope = scope.NewScope(); framework::Scope &recv_scope = scope.NewScope();
...@@ -328,9 +331,8 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, ...@@ -328,9 +331,8 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope,
rpc_service_->WaitServerReady(); rpc_service_->WaitServerReady();
// Write to a file of server selected port for python use. // Write to a file of server selected port for python use.
std::string file_path = std::string file_path = string::Sprintf("/tmp/paddle.%d.selected_port",
string::Sprintf("/tmp/paddle.%d.selected_port", static_cast<int>(::getpid()));
static_cast<int>(::getpid()));
SavePort(file_path); SavePort(file_path);
if (sync_mode) { if (sync_mode) {
RunSyncLoop(&executor, program, &recv_scope, prefetch_block); RunSyncLoop(&executor, program, &recv_scope, prefetch_block);
......
...@@ -13,12 +13,15 @@ See the License for the specific language governing permissions and ...@@ -13,12 +13,15 @@ See the License for the specific language governing permissions and
limitations under the License. */ limitations under the License. */
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
#include <sys/time.h> #include <sys/time.h>
#include <time.h> #include <time.h>
#include <algorithm> #include <algorithm>
#include <iomanip> #include <iomanip>
#include <limits>
#include <map> #include <map>
#include <mutex> // NOLINT #include <mutex> // NOLINT
#include <random>
#include <string> #include <string>
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
#include <cuda.h> #include <cuda.h>
...@@ -33,6 +36,9 @@ namespace platform { ...@@ -33,6 +36,9 @@ namespace platform {
struct EventList; struct EventList;
static int64_t profiler_lister_id = 0;
static bool should_send_profile_state = false;
// The profiler state, the initial value is ProfilerState::kDisabled // The profiler state, the initial value is ProfilerState::kDisabled
static ProfilerState g_state = ProfilerState::kDisabled; static ProfilerState g_state = ProfilerState::kDisabled;
// The thread local event list only can be accessed by the specific thread // The thread local event list only can be accessed by the specific thread
...@@ -219,13 +225,12 @@ void EnableProfiler(ProfilerState state) { ...@@ -219,13 +225,12 @@ void EnableProfiler(ProfilerState state) {
PADDLE_ENFORCE(state != ProfilerState::kDisabled, PADDLE_ENFORCE(state != ProfilerState::kDisabled,
"Can't enbale profling, since the input state is ", "Can't enbale profling, since the input state is ",
"ProfilerState::kDisabled"); "ProfilerState::kDisabled");
PADDLE_ENFORCE(g_state == ProfilerState::kDisabled, if (state == g_state) {
"The profiling state should be disabled when calling ", return;
"EnableProfiler.");
g_state = state;
if (g_state == ProfilerState::kAll) {
GetDeviceTracer()->Enable();
} }
g_state = state;
should_send_profile_state = true;
GetDeviceTracer()->Enable();
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
if (g_state == ProfilerState::kCUDA) { if (g_state == ProfilerState::kCUDA) {
// Generate some dummy events first to reduce the startup overhead. // Generate some dummy events first to reduce the startup overhead.
...@@ -435,8 +440,7 @@ void ParseEvents(const std::vector<std::vector<Event>>& events, ...@@ -435,8 +440,7 @@ void ParseEvents(const std::vector<std::vector<Event>>& events,
void DisableProfiler(EventSortingKey sorted_key, void DisableProfiler(EventSortingKey sorted_key,
const std::string& profile_path) { const std::string& profile_path) {
PADDLE_ENFORCE(g_state != ProfilerState::kDisabled, if (g_state == ProfilerState::kDisabled) return;
"Can't disable profiling, since it's not starting.");
// Mark the profiling stop. // Mark the profiling stop.
Mark("_stop_profiler_", nullptr); Mark("_stop_profiler_", nullptr);
...@@ -444,12 +448,25 @@ void DisableProfiler(EventSortingKey sorted_key, ...@@ -444,12 +448,25 @@ void DisableProfiler(EventSortingKey sorted_key,
ParseEvents(all_events, sorted_key); ParseEvents(all_events, sorted_key);
ResetProfiler(); ResetProfiler();
DeviceTracer* tracer = GetDeviceTracer(); DeviceTracer* tracer = GetDeviceTracer();
if (g_state == ProfilerState::kAll && tracer && tracer->IsEnabled()) { if (tracer->IsEnabled()) {
tracer->Disable(); tracer->Disable();
tracer->GenProfile(profile_path); tracer->GenProfile(profile_path);
} }
g_state = ProfilerState::kDisabled; g_state = ProfilerState::kDisabled;
should_send_profile_state = true;
}
bool IsProfileEnabled() { return g_state != ProfilerState::kDisabled; }
bool ShouldSendProfileState() { return should_send_profile_state; }
void SetProfileLisener() {
std::mt19937 rng;
rng.seed(std::random_device()());
std::uniform_int_distribution<std::mt19937::result_type> dist6(
1, std::numeric_limits<int64_t>::max());
profiler_lister_id = dist6(rng);
} }
int64_t ListenerId() { return profiler_lister_id; }
} // namespace platform } // namespace platform
} // namespace paddle } // namespace paddle
...@@ -114,5 +114,13 @@ void ResetProfiler(); ...@@ -114,5 +114,13 @@ void ResetProfiler();
void DisableProfiler(EventSortingKey sorted_key, void DisableProfiler(EventSortingKey sorted_key,
const std::string& profile_path); const std::string& profile_path);
// Test if the profiler is currently enabled.
bool IsProfileEnabled();
// Whether the trainer should send profiling state to PS.
bool ShouldSendProfileState();
// Mark the current process as a PS by assigning it a listener id.
void SetProfileLisener();
int64_t ListenerId();
} // namespace platform } // namespace platform
} // namespace paddle } // namespace paddle
...@@ -22,7 +22,11 @@ import paddle.fluid.proto.profiler.profiler_pb2 as profiler_pb2 ...@@ -22,7 +22,11 @@ import paddle.fluid.proto.profiler.profiler_pb2 as profiler_pb2
parser = argparse.ArgumentParser(description=__doc__) parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument( parser.add_argument(
'--profile_path', type=str, default='', help='Input profile file name.') '--profile_path',
type=str,
default='',
help='Input profile file name. If there are multiple file, the format '
'should be trainer1=file1,trainer2=file2,ps=file3')
parser.add_argument( parser.add_argument(
'--timeline_path', type=str, default='', help='Output timeline file name.') '--timeline_path', type=str, default='', help='Output timeline file name.')
args = parser.parse_args() args = parser.parse_args()
...@@ -108,8 +112,8 @@ class _ChromeTraceFormatter(object): ...@@ -108,8 +112,8 @@ class _ChromeTraceFormatter(object):
class Timeline(object): class Timeline(object):
def __init__(self, profile_pb): def __init__(self, profile_dict):
self._profile_pb = profile_pb self._profile_dict = profile_dict
self._pid = 0 self._pid = 0
self._devices = dict() self._devices = dict()
self._chrome_trace = _ChromeTraceFormatter() self._chrome_trace = _ChromeTraceFormatter()
...@@ -120,35 +124,37 @@ class Timeline(object): ...@@ -120,35 +124,37 @@ class Timeline(object):
return cur_pid return cur_pid
def _allocate_pids(self): def _allocate_pids(self):
for event in self._profile_pb.events: for k, profile_pb in self._profile_dict.iteritems():
if event.type == profiler_pb2.Event.CPU: for event in profile_pb.events:
if (event.device_id, "CPU") not in self._devices: if event.type == profiler_pb2.Event.CPU:
pid = self._allocate_pid() if (k, event.device_id, "CPU") not in self._devices:
self._devices[(event.device_id, "CPU")] = pid pid = self._allocate_pid()
self._chrome_trace.emit_pid("cpu:block:%d" % self._devices[(k, event.device_id, "CPU")] = pid
(event.device_id), pid) self._chrome_trace.emit_pid("%s:cpu:block:%d" %
elif event.type == profiler_pb2.Event.GPUKernel: (k, event.device_id), pid)
if (event.device_id, "GPUKernel") not in self._devices: elif event.type == profiler_pb2.Event.GPUKernel:
pid = self._allocate_pid() if (k, event.device_id, "GPUKernel") not in self._devices:
self._devices[(event.device_id, "GPUKernel")] = pid pid = self._allocate_pid()
self._chrome_trace.emit_pid("gpu:%d" % (event.device_id), self._devices[(k, event.device_id, "GPUKernel")] = pid
pid) self._chrome_trace.emit_pid("%s:gpu:%d" %
(k, event.device_id), pid)
def _allocate_events(self): def _allocate_events(self):
for event in self._profile_pb.events: for k, profile_pb in self._profile_dict.iteritems():
if event.type == profiler_pb2.Event.CPU: for event in profile_pb.events:
type = "CPU" if event.type == profiler_pb2.Event.CPU:
elif event.type == profiler_pb2.Event.GPUKernel: type = "CPU"
type = "GPUKernel" elif event.type == profiler_pb2.Event.GPUKernel:
pid = self._devices[(event.device_id, type)] type = "GPUKernel"
args = {'name': event.name} pid = self._devices[(k, event.device_id, type)]
if event.memcopy.bytes > 0: args = {'name': event.name}
args = {'mem_bytes': event.memcopy.bytes} if event.memcopy.bytes > 0:
# TODO(panyx0718): Chrome tracing only handles ms. However, some args = {'mem_bytes': event.memcopy.bytes}
# ops takes micro-seconds. Hence, we keep the ns here. # TODO(panyx0718): Chrome tracing only handles ms. However, some
self._chrome_trace.emit_region( # ops takes micro-seconds. Hence, we keep the ns here.
event.start_ns, (event.end_ns - event.start_ns) / 1.0, pid, self._chrome_trace.emit_region(
event.sub_device_id, 'Op', event.name, args) event.start_ns, (event.end_ns - event.start_ns) / 1.0, pid,
event.sub_device_id, 'Op', event.name, args)
def generate_chrome_trace(self): def generate_chrome_trace(self):
self._allocate_pids() self._allocate_pids()
...@@ -163,11 +169,23 @@ timeline_path = '/tmp/timeline' ...@@ -163,11 +169,23 @@ timeline_path = '/tmp/timeline'
if args.timeline_path: if args.timeline_path:
timeline_path = args.timeline_path timeline_path = args.timeline_path
with open(profile_path, 'r') as f: profile_paths = profile_path.split(',')
profile_s = f.read() profile_dict = dict()
profile_pb = profiler_pb2.Profile() if len(profile_path) == 1:
profile_pb.ParseFromString(profile_s) with open(profile_path, 'r') as f:
profile_s = f.read()
tl = Timeline(profile_pb) profile_pb = profiler_pb2.Profile()
profile_pb.ParseFromString(profile_s)
profile_dict['trainer'] = profile_pb
else:
for profile_path in profile_paths:
k, v = profile_path.split('=')
with open(v, 'r') as f:
profile_s = f.read()
profile_pb = profiler_pb2.Profile()
profile_pb.ParseFromString(profile_s)
profile_dict[k] = profile_pb
tl = Timeline(profile_dict)
with open(timeline_path, 'w') as f: with open(timeline_path, 'w') as f:
f.write(tl.generate_chrome_trace()) f.write(tl.generate_chrome_trace())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册