From 76d8b14bceeb7f2292b617bb19c33dbcfd6dc8f6 Mon Sep 17 00:00:00 2001 From: Xin Pan Date: Thu, 3 May 2018 05:30:50 -0700 Subject: [PATCH] Add timeline support for distributed training --- benchmark/cluster/vgg16/vgg16_fluid.py | 28 ++++-- cmake/external/grpc.cmake | 2 +- paddle/fluid/operators/detail/send_recv.proto | 4 + .../operators/detail/sendrecvop_utils.cc | 8 ++ .../operators/detail/variable_response.cc | 22 ++++- paddle/fluid/operators/listen_and_serv_op.cc | 8 +- paddle/fluid/platform/profiler.cc | 35 ++++++-- paddle/fluid/platform/profiler.h | 8 ++ tools/timeline.py | 90 +++++++++++-------- 9 files changed, 149 insertions(+), 56 deletions(-) diff --git a/benchmark/cluster/vgg16/vgg16_fluid.py b/benchmark/cluster/vgg16/vgg16_fluid.py index 6c7d2c103..05b5f3977 100644 --- a/benchmark/cluster/vgg16/vgg16_fluid.py +++ b/benchmark/cluster/vgg16/vgg16_fluid.py @@ -80,6 +80,8 @@ parser.add_argument( type=str, default="", help="Comma-separated list of hostname:port pairs") +parser.add_argument( + "--profile", action='store_true', help="If set, profile a few steps.") # Flags for defining the tf.train.Server parser.add_argument( @@ -183,8 +185,8 @@ def main(): start_time = time.time() num_samples = 0 train_pass_acc.reset() - for batch_id, data in enumerate(train_reader()): - ts = time.time() + + def run_step(batch_id, data): img_data = np.array( map(lambda x: x[0].reshape(data_shape), data)).astype( "float32") @@ -196,14 +198,28 @@ def main(): feed={"pixel": img_data, "label": y_data}, fetch_list=[avg_cost, batch_acc, batch_size]) + return loss, acc, b_size + + if args.profile and args.task_index == 0: + # warmup. + for batch_id, data in enumerate(train_reader()): + if batch_id > 5: break + run_step(batch_id, data) + with profiler.profiler('All', 'total', '/tmp/profile_vgg'): + for batch_id, data in enumerate(train_reader()): + if batch_id > 5: break + run_step(batch_id, data) + + for batch_id, data in enumerate(train_reader()): + ts = time.time() + loss, acc, b_size = run_step(batch_id, data) iters += 1 num_samples += len(data) train_pass_acc.add(value=acc, weight=b_size) print( - "Task:%d Pass = %d, Iters = %d, Loss = %f, Accuracy = %f, " - "Speed = %.2f img/s " % (args.task_index, pass_id, iters, - loss, acc, - len(data) / (time.time() - ts)) + "Pass = %d, Iters = %d, Loss = %f, Accuracy = %f, " + "Speed = %.2f img/s" % (pass_id, iters, loss, acc, + len(data) / (time.time() - ts)) ) # The accuracy is the accumulation of batches, but not the current batch. pass_elapsed = time.time() - start_time diff --git a/cmake/external/grpc.cmake b/cmake/external/grpc.cmake index e90948782..ef520b128 100644 --- a/cmake/external/grpc.cmake +++ b/cmake/external/grpc.cmake @@ -33,7 +33,7 @@ ExternalProject_Add( extern_grpc DEPENDS protobuf zlib GIT_REPOSITORY "https://github.com/grpc/grpc.git" - GIT_TAG "v1.10.x" + GIT_TAG "v1.8.x" PREFIX ${GRPC_SOURCES_DIR} UPDATE_COMMAND "" CONFIGURE_COMMAND "" diff --git a/paddle/fluid/operators/detail/send_recv.proto b/paddle/fluid/operators/detail/send_recv.proto index 02bb2b9ce..fffa9ae7a 100644 --- a/paddle/fluid/operators/detail/send_recv.proto +++ b/paddle/fluid/operators/detail/send_recv.proto @@ -69,6 +69,10 @@ message VariableMessage { bytes rows = 9; // Look up table block execution output variable name. string out_varname = 10; + // If true, the ps server will start profiling, the ps + // server stops profiling and generates a profile to /tmp/profile_ps_* + // when profile switches from true to false. + bool profile = 11; } message VoidMessage {} diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index 766bcf1ac..d68cf467f 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -23,6 +23,7 @@ limitations under the License. */ #include "paddle/fluid/operators/detail/bytebuffer_stream.h" #include "paddle/fluid/operators/detail/proto_encoder_helper.h" #include "paddle/fluid/operators/detail/variable_response.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { @@ -45,6 +46,13 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var, void* payload = nullptr; size_t payload_size; ProtoEncodeHelper e(static_cast(buf), 1024); + // Note: normally the profiler is enabled in 1 trainer, hence only + // 1 trainer returns true for ShouldSendProfileState(). It tells PS + // servers the trainer's profiling state so that PS can follow the + // trainer. + if (platform::ShouldSendProfileState()) { + e.WriteBool(VarMsg::kProfileFieldNumber, platform::IsProfileEnabled()); + } e.WriteString(VarMsg::kVarnameFieldNumber, name); if (var->IsType()) { e.WriteUint64(VarMsg::kTypeFieldNumber, 0); diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/detail/variable_response.cc index fbef8d02a..335491e95 100644 --- a/paddle/fluid/operators/detail/variable_response.cc +++ b/paddle/fluid/operators/detail/variable_response.cc @@ -17,6 +17,7 @@ #include #include #include +#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/operators/detail/send_recv.pb.h" #include "paddle/fluid/operators/detail/sendrecvop_utils.h" @@ -427,7 +428,26 @@ int VariableResponse::Parse(Source* source) { meta_.set_out_varname(temp); break; } - + case sendrecv::VariableMessage::kProfileFieldNumber: { + bool profiling; + if (!input.ReadRaw(reinterpret_cast(&profiling), 1)) { + return tag; + } + meta_.set_profile(profiling); + int64_t lisner_id = platform::ListenerId(); + if (lisner_id <= 0) { + break; + } + if (profiling && !platform::IsProfileEnabled()) { + platform::EnableProfiler(platform::ProfilerState::kCPU); + } else if (!profiling && platform::IsProfileEnabled()) { + // TODO(panyx0718): Should we allow to customize file dir. + platform::DisableProfiler( + platform::EventSortingKey::kDefault, + string::Sprintf("/tmp/profile_ps_%lld", lisner_id)); + } + break; + } default: { // Unknown tag, return unknown error. return -1; diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 59b945115..470a567e8 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -18,6 +18,7 @@ limitations under the License. */ #include #include "paddle/fluid/operators/listen_and_serv_op.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { @@ -294,6 +295,8 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, void ListenAndServOp::RunImpl(const framework::Scope &scope, const platform::Place &dev_place) const { + // Mark this as PS that it should decide profiling by listening from trainer. + platform::SetProfileLisener(); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto &dev_ctx = *pool.Get(dev_place); framework::Scope &recv_scope = scope.NewScope(); @@ -328,9 +331,8 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, rpc_service_->WaitServerReady(); // Write to a file of server selected port for python use. - std::string file_path = - string::Sprintf("/tmp/paddle.%d.selected_port", - static_cast(::getpid())); + std::string file_path = string::Sprintf("/tmp/paddle.%d.selected_port", + static_cast(::getpid())); SavePort(file_path); if (sync_mode) { RunSyncLoop(&executor, program, &recv_scope, prefetch_block); diff --git a/paddle/fluid/platform/profiler.cc b/paddle/fluid/platform/profiler.cc index 412cdda28..ac16e4cd5 100644 --- a/paddle/fluid/platform/profiler.cc +++ b/paddle/fluid/platform/profiler.cc @@ -13,12 +13,15 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/platform/profiler.h" + #include #include #include #include +#include #include #include // NOLINT +#include #include #ifdef PADDLE_WITH_CUDA #include @@ -33,6 +36,9 @@ namespace platform { struct EventList; +static int64_t profiler_lister_id = 0; +static bool should_send_profile_state = false; + // The profiler state, the initial value is ProfilerState::kDisabled static ProfilerState g_state = ProfilerState::kDisabled; // The thread local event list only can be accessed by the specific thread @@ -219,13 +225,12 @@ void EnableProfiler(ProfilerState state) { PADDLE_ENFORCE(state != ProfilerState::kDisabled, "Can't enbale profling, since the input state is ", "ProfilerState::kDisabled"); - PADDLE_ENFORCE(g_state == ProfilerState::kDisabled, - "The profiling state should be disabled when calling ", - "EnableProfiler."); - g_state = state; - if (g_state == ProfilerState::kAll) { - GetDeviceTracer()->Enable(); + if (state == g_state) { + return; } + g_state = state; + should_send_profile_state = true; + GetDeviceTracer()->Enable(); #ifdef PADDLE_WITH_CUDA if (g_state == ProfilerState::kCUDA) { // Generate some dummy events first to reduce the startup overhead. @@ -435,8 +440,7 @@ void ParseEvents(const std::vector>& events, void DisableProfiler(EventSortingKey sorted_key, const std::string& profile_path) { - PADDLE_ENFORCE(g_state != ProfilerState::kDisabled, - "Can't disable profiling, since it's not starting."); + if (g_state == ProfilerState::kDisabled) return; // Mark the profiling stop. Mark("_stop_profiler_", nullptr); @@ -444,12 +448,25 @@ void DisableProfiler(EventSortingKey sorted_key, ParseEvents(all_events, sorted_key); ResetProfiler(); DeviceTracer* tracer = GetDeviceTracer(); - if (g_state == ProfilerState::kAll && tracer && tracer->IsEnabled()) { + if (tracer->IsEnabled()) { tracer->Disable(); tracer->GenProfile(profile_path); } g_state = ProfilerState::kDisabled; + should_send_profile_state = true; +} + +bool IsProfileEnabled() { return g_state != ProfilerState::kDisabled; } +bool ShouldSendProfileState() { return should_send_profile_state; } + +void SetProfileLisener() { + std::mt19937 rng; + rng.seed(std::random_device()()); + std::uniform_int_distribution dist6( + 1, std::numeric_limits::max()); + profiler_lister_id = dist6(rng); } +int64_t ListenerId() { return profiler_lister_id; } } // namespace platform } // namespace paddle diff --git a/paddle/fluid/platform/profiler.h b/paddle/fluid/platform/profiler.h index 428d9ebce..c8b8c258a 100644 --- a/paddle/fluid/platform/profiler.h +++ b/paddle/fluid/platform/profiler.h @@ -114,5 +114,13 @@ void ResetProfiler(); void DisableProfiler(EventSortingKey sorted_key, const std::string& profile_path); +// Test if the profiler is currently enabled. +bool IsProfileEnabled(); +// Whether the trainer should send profiling state to PS. +bool ShouldSendProfileState(); +// Mark current process as PS by assigning a lister id. +void SetProfileLisener(); +int64_t ListenerId(); + } // namespace platform } // namespace paddle diff --git a/tools/timeline.py b/tools/timeline.py index f4083c824..8cd6353d4 100644 --- a/tools/timeline.py +++ b/tools/timeline.py @@ -22,7 +22,11 @@ import paddle.fluid.proto.profiler.profiler_pb2 as profiler_pb2 parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( - '--profile_path', type=str, default='', help='Input profile file name.') + '--profile_path', + type=str, + default='', + help='Input profile file name. If there are multiple file, the format ' + 'should be trainer1=file1,trainer2=file2,ps=file3') parser.add_argument( '--timeline_path', type=str, default='', help='Output timeline file name.') args = parser.parse_args() @@ -108,8 +112,8 @@ class _ChromeTraceFormatter(object): class Timeline(object): - def __init__(self, profile_pb): - self._profile_pb = profile_pb + def __init__(self, profile_dict): + self._profile_dict = profile_dict self._pid = 0 self._devices = dict() self._chrome_trace = _ChromeTraceFormatter() @@ -120,35 +124,37 @@ class Timeline(object): return cur_pid def _allocate_pids(self): - for event in self._profile_pb.events: - if event.type == profiler_pb2.Event.CPU: - if (event.device_id, "CPU") not in self._devices: - pid = self._allocate_pid() - self._devices[(event.device_id, "CPU")] = pid - self._chrome_trace.emit_pid("cpu:block:%d" % - (event.device_id), pid) - elif event.type == profiler_pb2.Event.GPUKernel: - if (event.device_id, "GPUKernel") not in self._devices: - pid = self._allocate_pid() - self._devices[(event.device_id, "GPUKernel")] = pid - self._chrome_trace.emit_pid("gpu:%d" % (event.device_id), - pid) + for k, profile_pb in self._profile_dict.iteritems(): + for event in profile_pb.events: + if event.type == profiler_pb2.Event.CPU: + if (k, event.device_id, "CPU") not in self._devices: + pid = self._allocate_pid() + self._devices[(k, event.device_id, "CPU")] = pid + self._chrome_trace.emit_pid("%s:cpu:block:%d" % + (k, event.device_id), pid) + elif event.type == profiler_pb2.Event.GPUKernel: + if (k, event.device_id, "GPUKernel") not in self._devices: + pid = self._allocate_pid() + self._devices[(k, event.device_id, "GPUKernel")] = pid + self._chrome_trace.emit_pid("%s:gpu:%d" % + (k, event.device_id), pid) def _allocate_events(self): - for event in self._profile_pb.events: - if event.type == profiler_pb2.Event.CPU: - type = "CPU" - elif event.type == profiler_pb2.Event.GPUKernel: - type = "GPUKernel" - pid = self._devices[(event.device_id, type)] - args = {'name': event.name} - if event.memcopy.bytes > 0: - args = {'mem_bytes': event.memcopy.bytes} - # TODO(panyx0718): Chrome tracing only handles ms. However, some - # ops takes micro-seconds. Hence, we keep the ns here. - self._chrome_trace.emit_region( - event.start_ns, (event.end_ns - event.start_ns) / 1.0, pid, - event.sub_device_id, 'Op', event.name, args) + for k, profile_pb in self._profile_dict.iteritems(): + for event in profile_pb.events: + if event.type == profiler_pb2.Event.CPU: + type = "CPU" + elif event.type == profiler_pb2.Event.GPUKernel: + type = "GPUKernel" + pid = self._devices[(k, event.device_id, type)] + args = {'name': event.name} + if event.memcopy.bytes > 0: + args = {'mem_bytes': event.memcopy.bytes} + # TODO(panyx0718): Chrome tracing only handles ms. However, some + # ops takes micro-seconds. Hence, we keep the ns here. + self._chrome_trace.emit_region( + event.start_ns, (event.end_ns - event.start_ns) / 1.0, pid, + event.sub_device_id, 'Op', event.name, args) def generate_chrome_trace(self): self._allocate_pids() @@ -163,11 +169,23 @@ timeline_path = '/tmp/timeline' if args.timeline_path: timeline_path = args.timeline_path -with open(profile_path, 'r') as f: - profile_s = f.read() - profile_pb = profiler_pb2.Profile() - profile_pb.ParseFromString(profile_s) - -tl = Timeline(profile_pb) +profile_paths = profile_path.split(',') +profile_dict = dict() +if len(profile_path) == 1: + with open(profile_path, 'r') as f: + profile_s = f.read() + profile_pb = profiler_pb2.Profile() + profile_pb.ParseFromString(profile_s) + profile_dict['trainer'] = profile_pb +else: + for profile_path in profile_paths: + k, v = profile_path.split('=') + with open(v, 'r') as f: + profile_s = f.read() + profile_pb = profiler_pb2.Profile() + profile_pb.ParseFromString(profile_s) + profile_dict[k] = profile_pb + +tl = Timeline(profile_dict) with open(timeline_path, 'w') as f: f.write(tl.generate_chrome_trace()) -- GitLab