diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index cfc1c2a5f5ffb218dc5bd801046ab8495bcff09d..e1cb683e1ecf12d507a954003a8fae6312b85324 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -155,22 +155,31 @@ nv_test(op_registry_test SRCS op_registry_test.cc DEPS op_registry) if(WITH_PYTHON) py_proto_compile(framework_py_proto SRCS framework.proto data_feed.proto) py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto) + py_proto_compile(distributed_strategy_py_proto SRCS distributed_strategy.proto) #Generate an empty \ #__init__.py to make framework_py_proto as a valid python module. add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) - add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto) + add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto) if (NOT WIN32) add_custom_command(TARGET framework_py_proto POST_BUILD COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto + COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto + COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto/__init__.py COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/ + COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto COMMENT "Copy generated python proto into directory paddle/fluid/proto." WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) else(NOT WIN32) string(REPLACE "/" "\\" proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/") + string(REPLACE "/" "\\" fleet_proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fleet/proto/") add_custom_command(TARGET framework_py_proto POST_BUILD COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto + COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto + COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto/__init__.py COMMAND copy /Y *.py ${proto_dstpath} + COMMAND copy /Y distributed_strategy_*.py ${fleet_proto_dstpath} COMMENT "Copy generated python proto into directory paddle/fluid/proto." + COMMENT "Copy generated python proto into directory paddle/fleet/proto." WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) endif(NOT WIN32) endif() diff --git a/paddle/fluid/framework/distributed_strategy.proto b/paddle/fluid/framework/distributed_strategy.proto new file mode 100644 index 0000000000000000000000000000000000000000..9bcd79cd34f07cb38ea28e1068bb6045cb82d27a --- /dev/null +++ b/paddle/fluid/framework/distributed_strategy.proto @@ -0,0 +1,87 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +syntax = "proto2"; +package paddle.fleet; + +enum Mode { + COLLECTIVE = 1; + PS = 2; + PIPELINE = 3; + HETER = 4; // support XPU and GPU computing server +} + +message DistributedStrategy { + optional Mode mode = 1 [ default = COLLECTIVE ]; // just for serialization + // collective training strategy + optional bool amp = 2 [ default = false ]; + optional int32 amp_loss_scaling = 3 [ default = 32768 ]; + optional bool recompute = 4 [ default = false ]; + repeated string recompute_checkpoints = 5; + optional bool localsgd = 6 [ default = false ]; + optional int32 localsgd_k_step = 7 [ default = 4 ]; + optional bool dgc = 8 [ default = false ]; + optional bool hierachical_allreduce = 9 [ default = false ]; + optional int32 nccl_comm_num = 10 [ default = 1 ]; + optional bool gradient_merge = 11 [ default = false ]; + optional int32 gradient_merge_k_step = 12 [ default = 1 ]; + optional bool sequential_execution = 13 [ default = false ]; + optional bool enable_backward_optimizer_op_deps = 14 [ default = true ]; + optional bool lars = 15 [ default = false ]; + optional bool lamb = 16 [ default = false ]; + optional bool fuse_elewise_add_act_ops = 17 [ default = false ]; + optional bool fuse_bn_act_ops = 18 [ default = false ]; + optional bool enable_auto_fusion = 19 [ default = false ]; + optional bool fuse_relu_depthwise_conv = 20 [ default = false ]; + optional bool enable_inplace = 21 [ default = false ]; + optional bool fuse_all_reduce_ops = 22 [ default = false ]; + optional int32 num_iteration_per_drop_scope = 23 [ default = 1 ]; + optional bool sync_batch_norm = 24 [ default = false ]; + optional bool fuse_all_optimizer_ops = 25 [ default = false ]; + + // pipeline training + optional bool pipeline = 101 [ default = false ]; + optional int32 pipeline_micro_batch = 102; + + // parameter server training + optional bool sync = 201 [ default = false ]; + optional bool async = 202 [ default = true ]; + optional int32 async_k_step = 203 [ default = -1 ]; + optional int32 max_merge_var_num = 204 [ default = 1 ]; + optional int32 send_queue_size = 205 [ default = 16 ]; + optional bool independent_recv_thread = 206 [ default = false ]; + optional int32 min_send_grad_num_before_recv = 207 [ default = 1 ]; + optional int32 thread_pool_size = 208 [ default = 1 ]; + optional int32 send_wait_times = 209 [ default = 1 ]; + optional bool runtime_split_send_recv = 210 [ default = false ]; + optional bool use_thread_barrier = 211 [ default = false ]; + + // elastic deep learning strategies + optional bool elastic = 301 [ default = false ]; + + // auto parallel + optional bool auto = 401 [ default = false ]; +} + +message DistributedJobInfo { + optional int32 worker_num = 1; + optional int32 server_num = 2; + repeated string worker_ips = 3; + repeated string server_endpoints = 4; + optional string origin_startup = 5; + optional string origin_main = 6; // without backpropagation and optimization + optional string distributed_main = 7; // with backpropagation and optimization + optional string optimizer_name = 8; // optimizer name + optional DistributedStrategy strategy = 101; +} diff --git a/python/paddle/__init__.py b/python/paddle/__init__.py index 9862ef8ac069233f7a436635fdd875e39ea0416f..a2096a01ccdd98f69839e1a1d129eddcd5e7c880 100644 --- a/python/paddle/__init__.py +++ b/python/paddle/__init__.py @@ -36,6 +36,7 @@ import paddle.distributed import paddle.sysconfig import paddle.tensor import paddle.nn +import paddle.fleet import paddle.framework import paddle.imperative import paddle.optimizer diff 
--git a/python/paddle/fleet/__init__.py b/python/paddle/fleet/__init__.py index 343a6ca9bd7dfd5dfb30caf77d1cb6bd10c1d090..a5a8d12ed440077714a59773e1c870848e9de229 100644 --- a/python/paddle/fleet/__init__.py +++ b/python/paddle/fleet/__init__.py @@ -13,16 +13,11 @@ # limitations under the License. # TODO: define distributed api under this directory, -# __all__ = ['metric', -# 'optimizer', -# 'RoleMaker', -# 'dataset', -# ' DatasetFactory', -# ' InMemoryDataset', -# ' QueueDataset', -# 'transpiler', -# ' DistributeTranspiler', -# ' DistributeTranspilerConfig', -# ' HashName', -# ' RoundRobin', -# 'collective'] +from .base.distributed_strategy import DistributedStrategy +#from .base.role_maker import PaddleCloudRoleMaker, UserDefinedRoleMaker +#from .base.fleet_base import Fleet + +#__all__ = [ +# "DistributedStrategy", "PaddleCloudRoleMaker", "UserDefinedRoleMaker" +#] +__all__ = ['DistributedStrategy'] diff --git a/python/paddle/fleet/base/__init__.py b/python/paddle/fleet/base/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..abf198b97e6e818e1fbe59006f98492640bcee54 --- /dev/null +++ b/python/paddle/fleet/base/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/python/paddle/fleet/base/distributed_strategy.py b/python/paddle/fleet/base/distributed_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..c72e3ca160dafc0ad2f166bca826f689a100a92d --- /dev/null +++ b/python/paddle/fleet/base/distributed_strategy.py @@ -0,0 +1,525 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from paddle.fleet.proto import distributed_strategy_pb2 +from paddle.fluid.framework import Variable + + +class DistributedJobInfo(object): + """ + DistributedJobInfo will serialize all distributed training information + Just for inner use: 1) debug 2) replicate experiments + """ + + def __init__(self): + self.job_info = distributed_strategy_pb2.DistributedJobInfo() + + def _set_worker_num(self, worker_num): + self.job_info.worker_num = worker_num + + def _set_server_num(self, server_num): + self.job_info.server_num = server_num + + def _set_worker_ips(self, worker_ips): + self.job_info.worker_ips.extend(worker_ips) + + def _set_server_endpoints(self, server_endpoints): + self.job_info.server_endpoints.extend(server_endpoints) + + def _set_origin_startup(self, origin_startup_prog): + self.job_info.origin_startup = str(origin_startup_prog) + + def _set_origin_main(self, origin_main_prog): + self.job_info.origin_main = str(origin_main_prog) + + def _distributed_main(self, distributed_main_prog): + self.job_info.distributed_main = str(distributed_main_prog) + + def _optimizer_name(self, optimizer_name): + self.job_info.optimizer_name = optimizer_name + + def _set_distributed_strategy(self, dist_strategy): + self.job_info.strategy = dist_strategy + + +class DistributedStrategy(object): + def __init__(self): + self.strategy = distributed_strategy_pb2.DistributedStrategy() + + @property + def amp(self): + return self.strategy.amp + + @amp.setter + def amp(self, flag): + if isinstance(flag, bool): + self.strategy.amp = flag + else: + print("WARNING: amp should have value of bool type") + + @property + def amp_loss_scaling(self): + return self.strategy.amp_loss_scaling + + @amp_loss_scaling.setter + def amp_loss_scaling(self, value): + if isinstance(value, int): + self.strategy.amp_loss_scaling = value + else: + print("WARNING: amp_loss_scaling should have value of int type") + + @property + def recompute(self): + return self.strategy.recompute + + @recompute.setter + def recompute(self, flag): + if isinstance(flag, bool): + self.strategy.recompute = flag + else: + print("WARNING: recompute should have value of bool type") + + @property + def recompute_checkpoints(self): + return self.strategy.recompute_checkpoints + + @recompute_checkpoints.setter + def recompute_checkpoints(self, checkpoints): + if isinstance(checkpoints, list): + str_list = True + var_list = True + for item in checkpoints: + if not isinstance(item, str): + str_list = False + if not isinstance(item, Variable): + var_list = False + + assert (str_list and var_list) == False + if str_list: + self.strategy.ClearField("recompute_checkpoints") + self.strategy.recompute_checkpoints.extend(checkpoints) + elif var_list: + names = [x.name for x in checkpoints] + self.strategy.ClearField("recompute_checkpoints") + self.strategy.recompute_checkpoints.extend(names) + else: + print( + "WARNING: recompute_checkpoints should have value of list[Variable] or list[name] type" + ) + else: + print( + "WARNING: recompute_checkpoints should have value of list[Variable] or list[name] type" + ) + + @property + def pipeline(self): + return self.strategy.pipeline + + @pipeline.setter + def pipeline(self, flag): + if isinstance(flag, bool): + self.strategy.pipeline = flag + else: + print("WARNING: pipeline should have value of bool type") + + @property + def pipeline_micro_batch(self): + return self.strategy.pipeline_micro_batch + + @pipeline_micro_batch.setter + def pipeline_micro_batch(self, value): + if isinstance(value, int): + 
self.strategy.pipeline_micro_batch = value + else: + print("WARNING: pipeline micro batch should have value of int type") + + @property + def localsgd(self): + return self.strategy.localsgd + + @localsgd.setter + def localsgd(self, flag): + if isinstance(flag, bool): + self.strategy.localsgd = flag + else: + print("WARNING: localsgd should have value of bool type") + + @property + def localsgd_k_step(self): + return self.strategy.localsgd_k_step + + @localsgd_k_step.setter + def localsgd_k_step(self, value): + if isinstance(value, int): + self.strategy.localsgd_k_step = value + else: + print("WARNING: localsgd_k_step should have value of int type") + + @property + def dgc(self): + return self.strategy.dgc + + @dgc.setter + def dgc(self, flag): + if isinstance(flag, bool): + self.strategy.dgc = flag + else: + print("WARNING: dgc should have value of bool type") + + @property + def hierachical_allreduce(self): + return self.strategy.hierachical_allreduce + + @hierachical_allreduce.setter + def hierachical_allreduce(self, flag): + if isinstance(flag, bool): + self.strategy.hierachical_allreduce = flag + else: + print( + "WARNING: hierachical_allreduce should have value of bool type") + + @property + def nccl_comm_num(self): + return self.strategy.nccl_comm_num + + @nccl_comm_num.setter + def nccl_comm_num(self, value): + if isinstance(value, int): + self.strategy.nccl_comm_num = value + else: + print("WARNING: nccl_comm_num should have value of int type") + + @property + def gradient_merge(self): + return self.strategy.gradient_merge + + @gradient_merge.setter + def gradient_merge(self, flag): + if isinstance(flag, bool): + self.strategy.gradient_merge = flag + else: + print("WARNING: gradient_merge should have value of bool type") + + @property + def gradient_merge_k_step(self): + return self.strategy.gradient_merge_k_step + + @gradient_merge_k_step.setter + def gradient_merge_k_step(self, value): + if isinstance(value, int): + self.strategy.gradient_merge_k_step = value + else: + print( + "WARNING: gradient_merge_k_step should have value of int type") + + @property + def sequential_execution(self): + return self.strategy.sequential_execution + + @sequential_execution.setter + def sequential_execution(self, flag): + if isinstance(flag, bool): + self.strategy.sequential_execution = flag + else: + print( + "WARNING: sequential_execution should have value of bool type") + + @property + def lars(self): + return self.strategy.lars + + @lars.setter + def lars(self, flag): + if isinstance(flag, bool): + self.strategy.lars = flag + else: + print("WARNING: lars should have value of bool type") + + @property + def lamb(self): + return self.strategy.lamb + + @lamb.setter + def lamb(self, flag): + if isinstance(flag, bool): + self.strategy.lamb = flag + else: + print("WARNING: lamb should have value of bool type") + + @property + def fuse_elewise_add_act_ops(self): + return self.strategy.fuse_elewise_add_act_ops + + @fuse_elewise_add_act_ops.setter + def fuse_elewise_add_act_ops(self, flag): + if isinstance(flag, bool): + self.strategy.fuse_elewise_add_act_ops = flag + else: + print( + "WARNING: fuse_elewise_add_act_ops should have value of bool type" + ) + + @property + def fuse_bn_act_ops(self): + return self.strategy.fuse_bn_act_ops + + @fuse_bn_act_ops.setter + def fuse_bn_act_ops(self, flag): + if isinstance(flag, bool): + self.strategy.fuse_bn_act_ops = flag + else: + print("WARNING: fuse_bn_act_ops should have value of bool type") + + @property + def enable_auto_fusion(self): + return 
self.strategy.enable_auto_fusion + + @enable_auto_fusion.setter + def enable_auto_fusion(self, flag): + if isinstance(flag, bool): + self.strategy.enable_auto_fusion = flag + else: + print("WARNING: enable_auto_fusion should have value of bool type") + + @property + def fuse_relu_depthwise_conv(self): + return self.strategy.fuse_relu_depthwise_conv + + @fuse_relu_depthwise_conv.setter + def fuse_relu_depthwise_conv(self, flag): + if isinstance(flag, bool): + self.strategy.fuse_relu_depthwise_conv = flag + else: + print( + "WARNING: fuse_relu_depthwise_conv should have value of bool type" + ) + + @property + def enable_inplace(self): + return self.strategy.enable_inplace + + @enable_inplace.setter + def enable_inplace(self, flag): + if isinstance(flag, bool): + self.strategy.enable_inplace = flag + else: + print("WARNING: enable_inplace should have value of bool type") + + @property + def fuse_all_reduce_ops(self): + return self.strategy.fuse_all_reduce_ops + + @fuse_all_reduce_ops.setter + def fuse_all_reduce_ops(self, flag): + if isinstance(flag, bool): + self.strategy.fuse_all_reduce_ops = flag + else: + print("WARNING: fuse_all_reduce_ops should have value of bool type") + + @property + def num_iteration_per_drop_scope(self): + return self.strategy.num_iteration_per_drop_scope + + @num_iteration_per_drop_scope.setter + def num_iteration_per_drop_scope(self, flag): + if isinstance(flag, int): + self.strategy.num_iteration_per_drop_scope = flag + else: + print( + "WARNING: num_iteration_per_drop_scope should have value of int type" + ) + + @property + def sync_batch_norm(self): + return self.strategy.sync_batch_norm + + @sync_batch_norm.setter + def sync_batch_norm(self, flag): + if isinstance(flag, bool): + self.strategy.sync_batch_norm = flag + else: + print("WARNING: sync_batch_norm should have value of bool type") + + @property + def fuse_all_optimizer_ops(self): + return self.strategy.fuse_all_optimizer_ops + + @fuse_all_optimizer_ops.setter + def fuse_all_optimizer_ops(self, flag): + if isinstance(flag, bool): + self.strategy.fuse_all_optimizer_ops = flag + else: + print( + "WARNING: fuse_all_optimizer_ops should have value of bool type") + + @property + def sync(self): + return self.strategy.sync + + @sync.setter + def sync(self, flag): + if isinstance(flag, bool): + self.strategy.sync = flag + else: + print("WARNING: sync should have value of bool type") + + @property + def async(self): + return self.strategy.async + + @async.setter + def async(self, flag): + if isinstance(flag, bool): + self.strategy.async = flag + else: + print("WARNING: async should have value of bool type") + + @property + def async_k_step(self): + return self.strategy.async_k_step + + @async_k_step.setter + def async_k_step(self, value): + if isinstance(value, int): + self.strategy.async_k_step = value + else: + print("WARNING: async_k_step should have value of int type") + + @property + def max_merge_var_num(self): + return self.strategy.max_merge_var_num + + @max_merge_var_num.setter + def max_merge_var_num(self, value): + if isinstance(value, int): + self.strategy.max_merge_var_num = value + else: + print("WARNING: max_merge_var_num should have value of int type") + + @property + def send_queue_size(self): + return self.strategy.send_queue_size + + @send_queue_size.setter + def send_queue_size(self, value): + if isinstance(value, int): + self.strategy.send_queue_size = value + else: + print("WARNING: send_queue_size should have value of int type") + + @property + def independent_recv_thread(self): + return 
self.strategy.independent_recv_thread + + @independent_recv_thread.setter + def independent_recv_thread(self, value): + if isinstance(value, bool): + self.strategy.independent_recv_thread = value + else: + print( + "WARNING: independent_recv_thread should have value of int type") + + @property + def min_send_grad_num_before_recv(self): + return self.strategy.min_send_grad_num_before_recv + + @min_send_grad_num_before_recv.setter + def min_send_grad_num_before_recv(self, value): + if isinstance(value, int): + self.strategy.min_send_grad_num_before_recv = value + else: + print( + "WARNING: min_send_grad_num_before_recv should have value of int type" + ) + + @property + def thread_pool_size(self): + return self.strategy.thread_pool_size + + @thread_pool_size.setter + def thread_pool_size(self, value): + if isinstance(value, int): + self.strategy.thread_pool_size = value + else: + print("WARNING:thread_pool_size should have value of int type") + + @property + def send_wait_times(self): + return self.strategy.send_wait_times + + @send_wait_times.setter + def send_wait_times(self, value): + if isinstance(value, int): + self.strategy.send_wait_times = value + else: + print("WARNING: send_wait_times should have value of int type") + + @property + def runtime_split_send_recv(self): + return self.strategy.runtime_split_send_recv + + @runtime_split_send_recv.setter + def runtime_split_send_recv(self, flag): + if isinstance(flag, bool): + self.strategy.runtime_split_send_recv = flag + else: + print("WARNING: runtime_split_send_recv should be bool type") + + @property + def use_thread_barrier(self): + return self.strategy.use_thread_barrier + + @use_thread_barrier.setter + def use_thread_barrier(self, flag): + if isinstance(flag, bool): + self.strategy.use_thread_barrier = flag + else: + print("WARNING: use_thread_barrier should be bool type") + + @property + def enable_backward_optimizer_op_deps(self): + return self.strategy.enable_backward_optimizer_op_deps + + @enable_backward_optimizer_op_deps.setter + def enable_backward_optimizer_op_deps(self, flag): + if isinstance(flag, bool): + self.strategy.enable_backward_optimizer_op_deps = flag + else: + print( + "WARNING: enable_backward_optimizer_op_deps should be bool type") + + @property + def elastic(self): + return self.strategy.elastic + + @elastic.setter + def elastic(self, flag): + if isinstance(flag, bool): + self.strategy.elastic = flag + else: + print("WARNING: elastic should have value of bool type") + + @property + def auto(self): + return self.strategy.auto + + @auto.setter + def auto(self, flag): + if isinstance(flag, bool): + self.strategy.auto = flag + else: + print("WARNING: auto should have value of bool type") + + def __repr__(self): + return str(self.strategy) diff --git a/python/paddle/fleet/base/fleet_base.py b/python/paddle/fleet/base/fleet_base.py new file mode 100644 index 0000000000000000000000000000000000000000..881044006479e074283c645c5247efa08c3b37b9 --- /dev/null +++ b/python/paddle/fleet/base/fleet_base.py @@ -0,0 +1,19 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +from paddle.fleet import RoleMakerBase +from . import obj_creator + +# __all__ = ['Fleet'] diff --git a/python/paddle/fleet/base/obj_creator.py b/python/paddle/fleet/base/obj_creator.py new file mode 100644 index 0000000000000000000000000000000000000000..15a403d79edcf7210863b624074827494684c38a --- /dev/null +++ b/python/paddle/fleet/base/obj_creator.py @@ -0,0 +1,23 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .util_base import UtilBase + + +def _create_fleet_obj_from_role_maker(role_maker): + pass + + +def _create_fleet_util_from_role_maker(role_maker): + pass diff --git a/python/paddle/fleet/base/role_maker.py b/python/paddle/fleet/base/role_maker.py new file mode 100644 index 0000000000000000000000000000000000000000..f6b5c8ac12e92dcbe6ca710f20d509cabaafac63 --- /dev/null +++ b/python/paddle/fleet/base/role_maker.py @@ -0,0 +1,16 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Definition of Role Makers.""" + +# __all__ = ['RoleMakerBase', 'UserDefinedRoleMaker', 'PaddleCloudRoleMaker'] diff --git a/python/paddle/fleet/base/util_base.py b/python/paddle/fleet/base/util_base.py new file mode 100644 index 0000000000000000000000000000000000000000..7654d0bcd9cd657ab79e9acf74b8fdfb72c489de --- /dev/null +++ b/python/paddle/fleet/base/util_base.py @@ -0,0 +1,64 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Fleet Utils.""" +"""distributed operations""" +"""basic collective operations in python""" +"""remote file system""" + +# __all__ = ['UtilBase'] +''' +class UtilBase(object): + def __init__(self, role_maker, fleet_obj): + self.role_maker = roke_maker + self.fleet_obj = fleet_obj + + def set_file_system(self, fs_client): + self.fs_client = fs_client + + def broadcast(self): + pass + + def all_gather(self): + pass + + def all_reduce(self): + pass + + def reduce_scatter(self): + pass + + def reduce(self): + pass + + def get_file_shard(self, files): + pass + + def feed_gen(self, batch_size, feed_vars_dims, feeded_vars_filelist): + pass + + def save_program(program, output_dir): + pass + + def load_program(input_dir): + pass + + def load_var(): + pass + + def save_var(): + pass + + def print_on_rank(self): + pass +''' diff --git a/python/paddle/fleet/collective/__init__.py b/python/paddle/fleet/collective/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..8647330f3290f3142cabca9a7e3fe162a9838dda --- /dev/null +++ b/python/paddle/fleet/collective/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and diff --git a/python/paddle/fleet/dataset/__init__.py b/python/paddle/fleet/dataset/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..8647330f3290f3142cabca9a7e3fe162a9838dda --- /dev/null +++ b/python/paddle/fleet/dataset/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and diff --git a/python/paddle/fleet/metrics/__init__.py b/python/paddle/fleet/metrics/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..abf198b97e6e818e1fbe59006f98492640bcee54 --- /dev/null +++ b/python/paddle/fleet/metrics/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/python/paddle/fleet/metrics/metric.py b/python/paddle/fleet/metrics/metric.py new file mode 100644 index 0000000000000000000000000000000000000000..847ddc47ac89114f2012bc6b9990a69abfe39fb3 --- /dev/null +++ b/python/paddle/fleet/metrics/metric.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/python/paddle/fleet/parameter_server/__init__.py b/python/paddle/fleet/parameter_server/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..847ddc47ac89114f2012bc6b9990a69abfe39fb3 --- /dev/null +++ b/python/paddle/fleet/parameter_server/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py new file mode 100644 index 0000000000000000000000000000000000000000..3ec220d841d98e41e7fdaaf3d3342389aebdf919 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_fleet_distributed_strategy.py @@ -0,0 +1,350 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +import paddle +import os + + +class TestStrategyConfig(unittest.TestCase): + def test_amp(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.amp = True + self.assertEqual(strategy.amp, True) + strategy.amp = False + self.assertEqual(strategy.amp, False) + strategy.amp = "True" + self.assertEqual(strategy.amp, False) + + def test_amp_loss_scaling(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.amp_loss_scaling = 32768 + self.assertEqual(strategy.amp_loss_scaling, 32768) + strategy.amp_loss_scaling = 0.1 + self.assertEqual(strategy.amp_loss_scaling, 32768) + + def test_recompute(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.recompute = True + self.assertEqual(strategy.recompute, True) + strategy.recompute = False + self.assertEqual(strategy.recompute, False) + strategy.recompute = "True" + self.assertEqual(strategy.recompute, False) + + def test_recompute_checkpoints(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.recompute_checkpoints = ["var1", "var2", "var3"] + self.assertEqual(len(strategy.recompute_checkpoints), 3) + import paddle.fluid as fluid + program = fluid.Program() + cur_block = program.current_block() + var1 = cur_block.create_var(name="var4", shape=[1, 1], dtype="int32") + var2 = cur_block.create_var(name="var5", shape=[1, 1], dtype="int32") + var3 = cur_block.create_var(name="var6", shape=[1, 1], dtype="int32") + strategy.recompute_checkpoints = [var1, var2, var3] + self.assertEqual(len(strategy.recompute_checkpoints), 3) + self.assertEqual(strategy.recompute_checkpoints[0], "var4") + strategy.recompute_checkpoints = [var1, "var2", var3] + self.assertEqual(strategy.recompute_checkpoints[1], "var5") + + def test_pipeline(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.pipeline = True + self.assertEqual(strategy.pipeline, True) + strategy.pipeline = False + self.assertEqual(strategy.pipeline, False) + strategy.pipeline = "True" + self.assertEqual(strategy.pipeline, False) + + def test_pipeline_micro_batch(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.pipeline_micro_batch = 1 + self.assertEqual(strategy.pipeline_micro_batch, 1) + strategy.pipeline_micro_batch = 0.1 + self.assertEqual(strategy.pipeline_micro_batch, 1) + + def test_localsgd(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.localsgd = True + self.assertEqual(strategy.localsgd, True) + strategy.localsgd = False + self.assertEqual(strategy.localsgd, False) + strategy.localsgd = "True" + self.assertEqual(strategy.localsgd, False) + + def test_localsgd_k_step(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.localsgd_k_step = 1 + self.assertEqual(strategy.localsgd_k_step, 1) + strategy.localsgd_k_step = "2" + self.assertEqual(strategy.localsgd_k_step, 1) + + def test_dgc(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.dgc = True + self.assertEqual(strategy.dgc, True) + strategy.dgc = False + self.assertEqual(strategy.dgc, False) + strategy.dgc = "True" + self.assertEqual(strategy.dgc, False) + + def test_hierachical_allreduce(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.hierachical_allreduce = True + self.assertEqual(strategy.hierachical_allreduce, True) + strategy.hierachical_allreduce = False + self.assertEqual(strategy.hierachical_allreduce, False) + strategy.hierachical_allreduce = "True" + self.assertEqual(strategy.hierachical_allreduce, False) + + def test_nccl_comm_num(self): + strategy = 
paddle.fleet.DistributedStrategy() + strategy.nccl_comm_num = 1 + self.assertEqual(strategy.nccl_comm_num, 1) + strategy.nccl_comm_num = "2" + self.assertEqual(strategy.nccl_comm_num, 1) + + def test_gradient_merge(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.gradient_merge = True + self.assertEqual(strategy.gradient_merge, True) + strategy.gradient_merge = False + self.assertEqual(strategy.gradient_merge, False) + strategy.gradient_merge = "True" + self.assertEqual(strategy.gradient_merge, False) + + def test_gradient_merge_k_step(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.gradient_merge_k_step = 1 + self.assertEqual(strategy.gradient_merge_k_step, 1) + strategy.gradient_merge_k_step = "2" + self.assertEqual(strategy.gradient_merge_k_step, 1) + + def test_sequential_execution(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.sequential_execution = True + self.assertEqual(strategy.sequential_execution, True) + strategy.sequential_execution = False + self.assertEqual(strategy.sequential_execution, False) + strategy.sequential_execution = "True" + self.assertEqual(strategy.sequential_execution, False) + + def test_lars(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.lars = True + self.assertEqual(strategy.lars, True) + strategy.lars = False + self.assertEqual(strategy.lars, False) + strategy.lars = "True" + self.assertEqual(strategy.lars, False) + + def test_lamb(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.lamb = True + self.assertEqual(strategy.lamb, True) + strategy.lamb = False + self.assertEqual(strategy.lamb, False) + strategy.lamb = "True" + self.assertEqual(strategy.lamb, False) + + def test_fuse_elewise_add_act_ops(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.fuse_elewise_add_act_ops = True + self.assertEqual(strategy.fuse_elewise_add_act_ops, True) + strategy.fuse_elewise_add_act_ops = False + self.assertEqual(strategy.fuse_elewise_add_act_ops, False) + strategy.fuse_elewise_add_act_ops = "True" + self.assertEqual(strategy.fuse_elewise_add_act_ops, False) + + def test_fuse_bn_act_ops(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.fuse_bn_act_ops = True + self.assertEqual(strategy.fuse_bn_act_ops, True) + strategy.fuse_bn_act_ops = False + self.assertEqual(strategy.fuse_bn_act_ops, False) + strategy.fuse_bn_act_ops = "True" + self.assertEqual(strategy.fuse_bn_act_ops, False) + + def test_enable_auto_fusion(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.enable_auto_fusion = True + self.assertEqual(strategy.enable_auto_fusion, True) + strategy.enable_auto_fusion = False + self.assertEqual(strategy.enable_auto_fusion, False) + strategy.enable_auto_fusion = "True" + self.assertEqual(strategy.enable_auto_fusion, False) + + def test_fuse_relu_depthwise_conv(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.fuse_relu_depthwise_conv = True + self.assertEqual(strategy.fuse_relu_depthwise_conv, True) + strategy.fuse_relu_depthwise_conv = False + self.assertEqual(strategy.fuse_relu_depthwise_conv, False) + strategy.fuse_relu_depthwise_conv = "True" + self.assertEqual(strategy.fuse_relu_depthwise_conv, False) + + def test_enable_inplace(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.enable_inplace = True + self.assertEqual(strategy.enable_inplace, True) + strategy.enable_inplace = False + self.assertEqual(strategy.enable_inplace, False) + strategy.enable_inplace = "True" + 
self.assertEqual(strategy.enable_inplace, False) + + def test_fuse_all_reduce_ops(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.fuse_all_reduce_ops = True + self.assertEqual(strategy.fuse_all_reduce_ops, True) + strategy.fuse_all_reduce_ops = False + self.assertEqual(strategy.fuse_all_reduce_ops, False) + strategy.fuse_all_reduce_ops = "True" + self.assertEqual(strategy.fuse_all_reduce_ops, False) + + def test_num_iteration_per_drop_scope(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.num_iteration_per_drop_scope = 1 + self.assertEqual(strategy.num_iteration_per_drop_scope, 1) + strategy.num_iteration_per_drop_scope = 0.1 + self.assertEqual(strategy.num_iteration_per_drop_scope, 1) + + def test_sync_batch_norm(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.sync_batch_norm = True + self.assertEqual(strategy.sync_batch_norm, True) + strategy.sync_batch_norm = False + self.assertEqual(strategy.sync_batch_norm, False) + strategy.sync_batch_norm = "True" + self.assertEqual(strategy.sync_batch_norm, False) + + def test_fuse_all_optimizer_ops(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.fuse_all_optimizer_ops = True + self.assertEqual(strategy.fuse_all_optimizer_ops, True) + strategy.fuse_all_optimizer_ops = False + self.assertEqual(strategy.fuse_all_optimizer_ops, False) + strategy.fuse_all_optimizer_ops = "True" + self.assertEqual(strategy.fuse_all_optimizer_ops, False) + + def test_sync(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.sync = True + self.assertEqual(strategy.sync, True) + strategy.sync = False + self.assertEqual(strategy.sync, False) + strategy.sync = "True" + self.assertEqual(strategy.sync, False) + + def test_async(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.async = True + self.assertEqual(strategy.async, True) + strategy.async = False + self.assertEqual(strategy.async, False) + strategy.async = "True" + self.assertEqual(strategy.async, False) + + def test_async_k_step(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.async_k_step = 10000 + self.assertEqual(strategy.async_k_step, 10000) + strategy.async_k_step = 0.1 + self.assertEqual(strategy.async_k_step, 10000) + + def test_send_queue_size(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.send_queue_size = 10000 + self.assertEqual(strategy.send_queue_size, 10000) + strategy.send_queue_size = 0.1 + self.assertEqual(strategy.send_queue_size, 10000) + + def test_independent_recv_thread(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.independent_recv_thread = True + self.assertEqual(strategy.independent_recv_thread, True) + strategy.independent_recv_thread = False + self.assertEqual(strategy.independent_recv_thread, False) + strategy.independent_recv_thread = "True" + self.assertEqual(strategy.independent_recv_thread, False) + + def test_min_send_grad_num_before_recv(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.min_send_grad_num_before_recv = 10000 + self.assertEqual(strategy.min_send_grad_num_before_recv, 10000) + strategy.min_send_grad_num_before_recv = 0.1 + self.assertEqual(strategy.min_send_grad_num_before_recv, 10000) + + def test_thread_pool_size(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.thread_pool_size = 10000 + self.assertEqual(strategy.thread_pool_size, 10000) + strategy.thread_pool_size = 0.1 + self.assertEqual(strategy.thread_pool_size, 10000) + + def test_send_wait_times(self): + strategy = 
paddle.fleet.DistributedStrategy() + strategy.send_wait_times = 10000 + self.assertEqual(strategy.send_wait_times, 10000) + strategy.send_wait_times = 0.1 + self.assertEqual(strategy.send_wait_times, 10000) + + def test_runtime_split_send_recv(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.runtime_split_send_recv = True + self.assertEqual(strategy.runtime_split_send_recv, True) + strategy.runtime_split_send_recv = False + self.assertEqual(strategy.runtime_split_send_recv, False) + strategy.runtime_split_send_recv = "True" + self.assertEqual(strategy.runtime_split_send_recv, False) + + def test_use_thread_barrier(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.use_thread_barrier = True + self.assertEqual(strategy.use_thread_barrier, True) + strategy.use_thread_barrier = False + self.assertEqual(strategy.use_thread_barrier, False) + strategy.use_thread_barrier = "True" + self.assertEqual(strategy.use_thread_barrier, False) + + def test_enable_backward_optimizer_op_deps(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.enable_backward_optimizer_op_deps = True + self.assertEqual(strategy.enable_backward_optimizer_op_deps, True) + strategy.enable_backward_optimizer_op_deps = False + self.assertEqual(strategy.enable_backward_optimizer_op_deps, False) + strategy.enable_backward_optimizer_op_deps = "True" + self.assertEqual(strategy.enable_backward_optimizer_op_deps, False) + + def test_elastic(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.elastic = True + self.assertEqual(strategy.elastic, True) + strategy.elastic = False + self.assertEqual(strategy.elastic, False) + strategy.elastic = "True" + self.assertEqual(strategy.elastic, False) + + def test_auto(self): + strategy = paddle.fleet.DistributedStrategy() + strategy.auto = True + self.assertEqual(strategy.auto, True) + strategy.auto = False + self.assertEqual(strategy.auto, False) + strategy.auto = "True" + self.assertEqual(strategy.auto, False) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/setup.py.in b/python/setup.py.in index 4fbfab17d58e292089522909aa11c1dd7b55ce9f..613a0bf2b74761c2f25b6696fea80cdc89866045 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -143,6 +143,13 @@ packages=['paddle', 'paddle.incubate', 'paddle.incubate.complex', 'paddle.incubate.complex.tensor', + 'paddle.fleet', + 'paddle.fleet.base', + 'paddle.fleet.collective', + 'paddle.fleet.dataset', + 'paddle.fleet.metrics', + 'paddle.fleet.parameter_server', + 'paddle.fleet.proto', 'paddle.framework', 'paddle.fluid', 'paddle.fluid.dygraph',
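With the setup.py.in packaging entries above, paddle.fleet.DistributedStrategy becomes importable from a build that includes this patch. The following usage sketch mirrors what the unit tests exercise; it assumes such a build, and the program/variable names (prog, block, fc_*.tmp_0) are purely illustrative.

    import paddle
    import paddle.fluid as fluid

    strategy = paddle.fleet.DistributedStrategy()
    strategy.amp = True                 # bool options ignore non-bool values
    strategy.amp_loss_scaling = 32768   # int option
    strategy.recompute = True

    # recompute_checkpoints accepts a list of variable names ...
    strategy.recompute_checkpoints = ["fc_0.tmp_0", "fc_1.tmp_0"]

    # ... or a list of Variables, which are stored by their .name
    # (mixing names and Variables in one list is rejected with a warning).
    prog = fluid.Program()
    block = prog.current_block()
    ckpt = block.create_var(name="fc_2.tmp_0", shape=[1, 1], dtype="int32")
    strategy.recompute_checkpoints = [ckpt]

    # __repr__ returns the protobuf text format of the backing message,
    # which is handy for logging the full configuration.
    print(strategy)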