未验证 提交 d5e40d1b 编写于 作者: D Dong Daxiang 提交者: GitHub

Paddle fleet distributed strategy (#25379)

* add paddle.fleet.DistributedStrategy for 2.0
上级 0954e907
...@@ -155,22 +155,31 @@ nv_test(op_registry_test SRCS op_registry_test.cc DEPS op_registry) ...@@ -155,22 +155,31 @@ nv_test(op_registry_test SRCS op_registry_test.cc DEPS op_registry)
if(WITH_PYTHON) if(WITH_PYTHON)
py_proto_compile(framework_py_proto SRCS framework.proto data_feed.proto) py_proto_compile(framework_py_proto SRCS framework.proto data_feed.proto)
py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto) py_proto_compile(trainer_py_proto SRCS trainer_desc.proto data_feed.proto)
py_proto_compile(distributed_strategy_py_proto SRCS distributed_strategy.proto)
#Generate an empty \ #Generate an empty \
#__init__.py to make framework_py_proto as a valid python module. #__init__.py to make framework_py_proto as a valid python module.
add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py) add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto) add_dependencies(framework_py_proto framework_py_proto_init trainer_py_proto distributed_strategy_py_proto)
if (NOT WIN32) if (NOT WIN32)
add_custom_command(TARGET framework_py_proto POST_BUILD add_custom_command(TARGET framework_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto
COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto/__init__.py
COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/ COMMAND cp *.py ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/
COMMAND cp distributed_strategy_*.py ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto
COMMENT "Copy generated python proto into directory paddle/fluid/proto." COMMENT "Copy generated python proto into directory paddle/fluid/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
else(NOT WIN32) else(NOT WIN32)
string(REPLACE "/" "\\" proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/") string(REPLACE "/" "\\" proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/")
string(REPLACE "/" "\\" fleet_proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fleet/proto/")
add_custom_command(TARGET framework_py_proto POST_BUILD add_custom_command(TARGET framework_py_proto POST_BUILD
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fluid/proto
COMMAND ${CMAKE_COMMAND} -E make_directory ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto
COMMAND ${CMAKE_COMMAND} -E touch ${PADDLE_BINARY_DIR}/python/paddle/fleet/proto/__init__.py
COMMAND copy /Y *.py ${proto_dstpath} COMMAND copy /Y *.py ${proto_dstpath}
COMMAND copy /Y distributed_strategy_*.py ${fleet_proto_dstpath}
COMMENT "Copy generated python proto into directory paddle/fluid/proto." COMMENT "Copy generated python proto into directory paddle/fluid/proto."
COMMENT "Copy generated python proto into directory paddle/fleet/proto."
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif(NOT WIN32) endif(NOT WIN32)
endif() endif()
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
package paddle.fleet;
enum Mode {
COLLECTIVE = 1;
PS = 2;
PIPELINE = 3;
HETER = 4; // support XPU and GPU computing server
}
message DistributedStrategy {
optional Mode mode = 1 [ default = COLLECTIVE ]; // just for serialization
// collective training strategy
optional bool amp = 2 [ default = false ];
optional int32 amp_loss_scaling = 3 [ default = 32768 ];
optional bool recompute = 4 [ default = false ];
repeated string recompute_checkpoints = 5;
optional bool localsgd = 6 [ default = false ];
optional int32 localsgd_k_step = 7 [ default = 4 ];
optional bool dgc = 8 [ default = false ];
optional bool hierachical_allreduce = 9 [ default = false ];
optional int32 nccl_comm_num = 10 [ default = 1 ];
optional bool gradient_merge = 11 [ default = false ];
optional int32 gradient_merge_k_step = 12 [ default = 1 ];
optional bool sequential_execution = 13 [ default = false ];
optional bool enable_backward_optimizer_op_deps = 14 [ default = true ];
optional bool lars = 15 [ default = false ];
optional bool lamb = 16 [ default = false ];
optional bool fuse_elewise_add_act_ops = 17 [ default = false ];
optional bool fuse_bn_act_ops = 18 [ default = false ];
optional bool enable_auto_fusion = 19 [ default = false ];
optional bool fuse_relu_depthwise_conv = 20 [ default = false ];
optional bool enable_inplace = 21 [ default = false ];
optional bool fuse_all_reduce_ops = 22 [ default = false ];
optional int32 num_iteration_per_drop_scope = 23 [ default = 1 ];
optional bool sync_batch_norm = 24 [ default = false ];
optional bool fuse_all_optimizer_ops = 25 [ default = false ];
// pipeline training
optional bool pipeline = 101 [ default = false ];
optional int32 pipeline_micro_batch = 102;
// parameter server training
optional bool sync = 201 [ default = false ];
optional bool async = 202 [ default = true ];
optional int32 async_k_step = 203 [ default = -1 ];
optional int32 max_merge_var_num = 204 [ default = 1 ];
optional int32 send_queue_size = 205 [ default = 16 ];
optional bool independent_recv_thread = 206 [ default = false ];
optional int32 min_send_grad_num_before_recv = 207 [ default = 1 ];
optional int32 thread_pool_size = 208 [ default = 1 ];
optional int32 send_wait_times = 209 [ default = 1 ];
optional bool runtime_split_send_recv = 210 [ default = false ];
optional bool use_thread_barrier = 211 [ default = false ];
// elastic deep learning strategies
optional bool elastic = 301 [ default = false ];
// auto parallel
optional bool auto = 401 [ default = false ];
}
message DistributedJobInfo {
optional int32 worker_num = 1;
optional int32 server_num = 2;
repeated string worker_ips = 3;
repeated string server_endpoints = 4;
optional string origin_startup = 5;
optional string origin_main = 6; // without backpropagation and optimization
optional string distributed_main = 7; // with backpropagation and optimization
optional string optimizer_name = 8; // optimizer name
optional DistributedStrategy strategy = 101;
}
...@@ -36,6 +36,7 @@ import paddle.distributed ...@@ -36,6 +36,7 @@ import paddle.distributed
import paddle.sysconfig import paddle.sysconfig
import paddle.tensor import paddle.tensor
import paddle.nn import paddle.nn
import paddle.fleet
import paddle.framework import paddle.framework
import paddle.imperative import paddle.imperative
import paddle.optimizer import paddle.optimizer
......
...@@ -13,16 +13,11 @@ ...@@ -13,16 +13,11 @@
# limitations under the License. # limitations under the License.
# TODO: define distributed api under this directory, # TODO: define distributed api under this directory,
# __all__ = ['metric', from .base.distributed_strategy import DistributedStrategy
# 'optimizer', #from .base.role_maker import PaddleCloudRoleMaker, UserDefinedRoleMaker
# 'RoleMaker', #from .base.fleet_base import Fleet
# 'dataset',
# ' DatasetFactory', #__all__ = [
# ' InMemoryDataset', # "DistributedStrategy", "PaddleCloudRoleMaker", "UserDefinedRoleMaker"
# ' QueueDataset', #]
# 'transpiler', __all__ = ['DistributedStrategy']
# ' DistributeTranspiler',
# ' DistributeTranspilerConfig',
# ' HashName',
# ' RoundRobin',
# 'collective']
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle.fleet.proto import distributed_strategy_pb2
from paddle.fluid.framework import Variable
class DistributedJobInfo(object):
"""
DistributedJobInfo will serialize all distributed training information
Just for inner use: 1) debug 2) replicate experiments
"""
def __init__(self):
self.job_info = distributed_strategy_pb2.DistributedJobInfo()
def _set_worker_num(self, worker_num):
self.job_info.worker_num = worker_num
def _set_server_num(self, server_num):
self.job_info.server_num = server_num
def _set_worker_ips(self, worker_ips):
self.job_info.worker_ips.extend(worker_ips)
def _set_server_endpoints(self, server_endpoints):
self.job_info.server_endpoints.extend(server_endpoints)
def _set_origin_startup(self, origin_startup_prog):
self.job_info.origin_startup = str(origin_startup_prog)
def _set_origin_main(self, origin_main_prog):
self.job_info.origin_main = str(origin_main_prog)
def _distributed_main(self, distributed_main_prog):
self.job_info.distributed_main = str(distributed_main_prog)
def _optimizer_name(self, optimizer_name):
self.job_info.optimizer_name = optimizer_name
def _set_distributed_strategy(self, dist_strategy):
self.job_info.strategy = dist_strategy
class DistributedStrategy(object):
def __init__(self):
self.strategy = distributed_strategy_pb2.DistributedStrategy()
@property
def amp(self):
return self.strategy.amp
@amp.setter
def amp(self, flag):
if isinstance(flag, bool):
self.strategy.amp = flag
else:
print("WARNING: amp should have value of bool type")
@property
def amp_loss_scaling(self):
return self.strategy.amp_loss_scaling
@amp_loss_scaling.setter
def amp_loss_scaling(self, value):
if isinstance(value, int):
self.strategy.amp_loss_scaling = value
else:
print("WARNING: amp_loss_scaling should have value of int type")
@property
def recompute(self):
return self.strategy.recompute
@recompute.setter
def recompute(self, flag):
if isinstance(flag, bool):
self.strategy.recompute = flag
else:
print("WARNING: recompute should have value of bool type")
@property
def recompute_checkpoints(self):
return self.strategy.recompute_checkpoints
@recompute_checkpoints.setter
def recompute_checkpoints(self, checkpoints):
if isinstance(checkpoints, list):
str_list = True
var_list = True
for item in checkpoints:
if not isinstance(item, str):
str_list = False
if not isinstance(item, Variable):
var_list = False
assert (str_list and var_list) == False
if str_list:
self.strategy.ClearField("recompute_checkpoints")
self.strategy.recompute_checkpoints.extend(checkpoints)
elif var_list:
names = [x.name for x in checkpoints]
self.strategy.ClearField("recompute_checkpoints")
self.strategy.recompute_checkpoints.extend(names)
else:
print(
"WARNING: recompute_checkpoints should have value of list[Variable] or list[name] type"
)
else:
print(
"WARNING: recompute_checkpoints should have value of list[Variable] or list[name] type"
)
@property
def pipeline(self):
return self.strategy.pipeline
@pipeline.setter
def pipeline(self, flag):
if isinstance(flag, bool):
self.strategy.pipeline = flag
else:
print("WARNING: pipeline should have value of bool type")
@property
def pipeline_micro_batch(self):
return self.strategy.pipeline_micro_batch
@pipeline_micro_batch.setter
def pipeline_micro_batch(self, value):
if isinstance(value, int):
self.strategy.pipeline_micro_batch = value
else:
print("WARNING: pipeline micro batch should have value of int type")
@property
def localsgd(self):
return self.strategy.localsgd
@localsgd.setter
def localsgd(self, flag):
if isinstance(flag, bool):
self.strategy.localsgd = flag
else:
print("WARNING: localsgd should have value of bool type")
@property
def localsgd_k_step(self):
return self.strategy.localsgd_k_step
@localsgd_k_step.setter
def localsgd_k_step(self, value):
if isinstance(value, int):
self.strategy.localsgd_k_step = value
else:
print("WARNING: localsgd_k_step should have value of int type")
@property
def dgc(self):
return self.strategy.dgc
@dgc.setter
def dgc(self, flag):
if isinstance(flag, bool):
self.strategy.dgc = flag
else:
print("WARNING: dgc should have value of bool type")
@property
def hierachical_allreduce(self):
return self.strategy.hierachical_allreduce
@hierachical_allreduce.setter
def hierachical_allreduce(self, flag):
if isinstance(flag, bool):
self.strategy.hierachical_allreduce = flag
else:
print(
"WARNING: hierachical_allreduce should have value of bool type")
@property
def nccl_comm_num(self):
return self.strategy.nccl_comm_num
@nccl_comm_num.setter
def nccl_comm_num(self, value):
if isinstance(value, int):
self.strategy.nccl_comm_num = value
else:
print("WARNING: nccl_comm_num should have value of int type")
@property
def gradient_merge(self):
return self.strategy.gradient_merge
@gradient_merge.setter
def gradient_merge(self, flag):
if isinstance(flag, bool):
self.strategy.gradient_merge = flag
else:
print("WARNING: gradient_merge should have value of bool type")
@property
def gradient_merge_k_step(self):
return self.strategy.gradient_merge_k_step
@gradient_merge_k_step.setter
def gradient_merge_k_step(self, value):
if isinstance(value, int):
self.strategy.gradient_merge_k_step = value
else:
print(
"WARNING: gradient_merge_k_step should have value of int type")
@property
def sequential_execution(self):
return self.strategy.sequential_execution
@sequential_execution.setter
def sequential_execution(self, flag):
if isinstance(flag, bool):
self.strategy.sequential_execution = flag
else:
print(
"WARNING: sequential_execution should have value of bool type")
@property
def lars(self):
return self.strategy.lars
@lars.setter
def lars(self, flag):
if isinstance(flag, bool):
self.strategy.lars = flag
else:
print("WARNING: lars should have value of bool type")
@property
def lamb(self):
return self.strategy.lamb
@lamb.setter
def lamb(self, flag):
if isinstance(flag, bool):
self.strategy.lamb = flag
else:
print("WARNING: lamb should have value of bool type")
@property
def fuse_elewise_add_act_ops(self):
return self.strategy.fuse_elewise_add_act_ops
@fuse_elewise_add_act_ops.setter
def fuse_elewise_add_act_ops(self, flag):
if isinstance(flag, bool):
self.strategy.fuse_elewise_add_act_ops = flag
else:
print(
"WARNING: fuse_elewise_add_act_ops should have value of bool type"
)
@property
def fuse_bn_act_ops(self):
return self.strategy.fuse_bn_act_ops
@fuse_bn_act_ops.setter
def fuse_bn_act_ops(self, flag):
if isinstance(flag, bool):
self.strategy.fuse_bn_act_ops = flag
else:
print("WARNING: fuse_bn_act_ops should have value of bool type")
@property
def enable_auto_fusion(self):
return self.strategy.enable_auto_fusion
@enable_auto_fusion.setter
def enable_auto_fusion(self, flag):
if isinstance(flag, bool):
self.strategy.enable_auto_fusion = flag
else:
print("WARNING: enable_auto_fusion should have value of bool type")
@property
def fuse_relu_depthwise_conv(self):
return self.strategy.fuse_relu_depthwise_conv
@fuse_relu_depthwise_conv.setter
def fuse_relu_depthwise_conv(self, flag):
if isinstance(flag, bool):
self.strategy.fuse_relu_depthwise_conv = flag
else:
print(
"WARNING: fuse_relu_depthwise_conv should have value of bool type"
)
@property
def enable_inplace(self):
return self.strategy.enable_inplace
@enable_inplace.setter
def enable_inplace(self, flag):
if isinstance(flag, bool):
self.strategy.enable_inplace = flag
else:
print("WARNING: enable_inplace should have value of bool type")
@property
def fuse_all_reduce_ops(self):
return self.strategy.fuse_all_reduce_ops
@fuse_all_reduce_ops.setter
def fuse_all_reduce_ops(self, flag):
if isinstance(flag, bool):
self.strategy.fuse_all_reduce_ops = flag
else:
print("WARNING: fuse_all_reduce_ops should have value of bool type")
@property
def num_iteration_per_drop_scope(self):
return self.strategy.num_iteration_per_drop_scope
@num_iteration_per_drop_scope.setter
def num_iteration_per_drop_scope(self, flag):
if isinstance(flag, int):
self.strategy.num_iteration_per_drop_scope = flag
else:
print(
"WARNING: num_iteration_per_drop_scope should have value of int type"
)
@property
def sync_batch_norm(self):
return self.strategy.sync_batch_norm
@sync_batch_norm.setter
def sync_batch_norm(self, flag):
if isinstance(flag, bool):
self.strategy.sync_batch_norm = flag
else:
print("WARNING: sync_batch_norm should have value of bool type")
@property
def fuse_all_optimizer_ops(self):
return self.strategy.fuse_all_optimizer_ops
@fuse_all_optimizer_ops.setter
def fuse_all_optimizer_ops(self, flag):
if isinstance(flag, bool):
self.strategy.fuse_all_optimizer_ops = flag
else:
print(
"WARNING: fuse_all_optimizer_ops should have value of bool type")
@property
def sync(self):
return self.strategy.sync
@sync.setter
def sync(self, flag):
if isinstance(flag, bool):
self.strategy.sync = flag
else:
print("WARNING: sync should have value of bool type")
@property
def async(self):
return self.strategy.async
@async.setter
def async(self, flag):
if isinstance(flag, bool):
self.strategy.async = flag
else:
print("WARNING: async should have value of bool type")
@property
def async_k_step(self):
return self.strategy.async_k_step
@async_k_step.setter
def async_k_step(self, value):
if isinstance(value, int):
self.strategy.async_k_step = value
else:
print("WARNING: async_k_step should have value of int type")
@property
def max_merge_var_num(self):
return self.strategy.max_merge_var_num
@max_merge_var_num.setter
def max_merge_var_num(self, value):
if isinstance(value, int):
self.strategy.max_merge_var_num = value
else:
print("WARNING: max_merge_var_num should have value of int type")
@property
def send_queue_size(self):
return self.strategy.send_queue_size
@send_queue_size.setter
def send_queue_size(self, value):
if isinstance(value, int):
self.strategy.send_queue_size = value
else:
print("WARNING: send_queue_size should have value of int type")
@property
def independent_recv_thread(self):
return self.strategy.independent_recv_thread
@independent_recv_thread.setter
def independent_recv_thread(self, value):
if isinstance(value, bool):
self.strategy.independent_recv_thread = value
else:
print(
"WARNING: independent_recv_thread should have value of int type")
@property
def min_send_grad_num_before_recv(self):
return self.strategy.min_send_grad_num_before_recv
@min_send_grad_num_before_recv.setter
def min_send_grad_num_before_recv(self, value):
if isinstance(value, int):
self.strategy.min_send_grad_num_before_recv = value
else:
print(
"WARNING: min_send_grad_num_before_recv should have value of int type"
)
@property
def thread_pool_size(self):
return self.strategy.thread_pool_size
@thread_pool_size.setter
def thread_pool_size(self, value):
if isinstance(value, int):
self.strategy.thread_pool_size = value
else:
print("WARNING:thread_pool_size should have value of int type")
@property
def send_wait_times(self):
return self.strategy.send_wait_times
@send_wait_times.setter
def send_wait_times(self, value):
if isinstance(value, int):
self.strategy.send_wait_times = value
else:
print("WARNING: send_wait_times should have value of int type")
@property
def runtime_split_send_recv(self):
return self.strategy.runtime_split_send_recv
@runtime_split_send_recv.setter
def runtime_split_send_recv(self, flag):
if isinstance(flag, bool):
self.strategy.runtime_split_send_recv = flag
else:
print("WARNING: runtime_split_send_recv should be bool type")
@property
def use_thread_barrier(self):
return self.strategy.use_thread_barrier
@use_thread_barrier.setter
def use_thread_barrier(self, flag):
if isinstance(flag, bool):
self.strategy.use_thread_barrier = flag
else:
print("WARNING: use_thread_barrier should be bool type")
@property
def enable_backward_optimizer_op_deps(self):
return self.strategy.enable_backward_optimizer_op_deps
@enable_backward_optimizer_op_deps.setter
def enable_backward_optimizer_op_deps(self, flag):
if isinstance(flag, bool):
self.strategy.enable_backward_optimizer_op_deps = flag
else:
print(
"WARNING: enable_backward_optimizer_op_deps should be bool type")
@property
def elastic(self):
return self.strategy.elastic
@elastic.setter
def elastic(self, flag):
if isinstance(flag, bool):
self.strategy.elastic = flag
else:
print("WARNING: elastic should have value of bool type")
@property
def auto(self):
return self.strategy.auto
@auto.setter
def auto(self, flag):
if isinstance(flag, bool):
self.strategy.auto = flag
else:
print("WARNING: auto should have value of bool type")
def __repr__(self):
return str(self.strategy)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from paddle.fleet import RoleMakerBase
from . import obj_creator
# __all__ = ['Fleet']
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from util_base import UtilBase
def _create_fleet_obj_from_role_maker(role_maker):
pass
def _create_fleet_util_from_role_maker(role_maker):
pass
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defination of Role Makers."""
# __all__ = ['RoleMakerBase', 'UserDefinedRoleMaker', 'PaddleCloudRoleMaker']
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Fleet Utils."""
"""distributed operations"""
"""basic collective operations in python"""
"""remote file system"""
# __all__ = ['UtilBase']
'''
class UtilBase(object):
def __init__(self, role_maker, fleet_obj):
self.role_maker = roke_maker
self.fleet_obj = fleet_obj
def set_file_system(self, fs_client):
self.fs_client = fs_client
def broadcast(self):
pass
def all_gather(self):
pass
def all_reduce(self):
pass
def reduce_scatter(self):
pass
def reduce(self):
pass
def get_file_shard(self, files):
pass
def feed_gen(self, batch_size, feed_vars_dims, feeded_vars_filelist):
pass
def save_program(program, output_dir):
pass
def load_program(input_dir):
pass
def load_var():
pass
def save_var():
pass
def print_on_rank(self):
pass
'''
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import os
class TestStrategyConfig(unittest.TestCase):
def test_amp(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.amp = True
self.assertEqual(strategy.amp, True)
strategy.amp = False
self.assertEqual(strategy.amp, False)
strategy.amp = "True"
self.assertEqual(strategy.amp, False)
def test_amp_loss_scaling(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.amp_loss_scaling = 32768
self.assertEqual(strategy.amp_loss_scaling, 32768)
strategy.amp_loss_scaling = 0.1
self.assertEqual(strategy.amp_loss_scaling, 32768)
def test_recompute(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.recompute = True
self.assertEqual(strategy.recompute, True)
strategy.recompute = False
self.assertEqual(strategy.recompute, False)
strategy.recompute = "True"
self.assertEqual(strategy.recompute, False)
def test_recompute_checkpoints(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.recompute_checkpoints = ["var1", "var2", "var3"]
self.assertEqual(len(strategy.recompute_checkpoints), 3)
import paddle.fluid as fluid
program = fluid.Program()
cur_block = program.current_block()
var1 = cur_block.create_var(name="var4", shape=[1, 1], dtype="int32")
var2 = cur_block.create_var(name="var5", shape=[1, 1], dtype="int32")
var3 = cur_block.create_var(name="var6", shape=[1, 1], dtype="int32")
strategy.recompute_checkpoints = [var1, var2, var3]
self.assertEqual(len(strategy.recompute_checkpoints), 3)
self.assertEqual(strategy.recompute_checkpoints[0], "var4")
strategy.recompute_checkpoints = [var1, "var2", var3]
self.assertEqual(strategy.recompute_checkpoints[1], "var5")
def test_pipeline(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.pipeline = True
self.assertEqual(strategy.pipeline, True)
strategy.pipeline = False
self.assertEqual(strategy.pipeline, False)
strategy.pipeline = "True"
self.assertEqual(strategy.pipeline, False)
def test_pipeline_micro_batch(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.pipeline_micro_batch = 1
self.assertEqual(strategy.pipeline_micro_batch, 1)
strategy.pipeline_micro_batch = 0.1
self.assertEqual(strategy.pipeline_micro_batch, 1)
def test_localsgd(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.localsgd = True
self.assertEqual(strategy.localsgd, True)
strategy.localsgd = False
self.assertEqual(strategy.localsgd, False)
strategy.localsgd = "True"
self.assertEqual(strategy.localsgd, False)
def test_localsgd_k_step(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.localsgd_k_step = 1
self.assertEqual(strategy.localsgd_k_step, 1)
strategy.localsgd_k_step = "2"
self.assertEqual(strategy.localsgd_k_step, 1)
def test_dgc(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.dgc = True
self.assertEqual(strategy.dgc, True)
strategy.dgc = False
self.assertEqual(strategy.dgc, False)
strategy.dgc = "True"
self.assertEqual(strategy.dgc, False)
def test_hierachical_allreduce(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.hierachical_allreduce = True
self.assertEqual(strategy.hierachical_allreduce, True)
strategy.hierachical_allreduce = False
self.assertEqual(strategy.hierachical_allreduce, False)
strategy.hierachical_allreduce = "True"
self.assertEqual(strategy.hierachical_allreduce, False)
def test_nccl_comm_num(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.nccl_comm_num = 1
self.assertEqual(strategy.nccl_comm_num, 1)
strategy.nccl_comm_num = "2"
self.assertEqual(strategy.nccl_comm_num, 1)
def test_gradient_merge(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.gradient_merge = True
self.assertEqual(strategy.gradient_merge, True)
strategy.gradient_merge = False
self.assertEqual(strategy.gradient_merge, False)
strategy.gradient_merge = "True"
self.assertEqual(strategy.gradient_merge, False)
def test_gradient_merge_k_step(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.gradient_merge_k_step = 1
self.assertEqual(strategy.gradient_merge_k_step, 1)
strategy.gradient_merge_k_step = "2"
self.assertEqual(strategy.gradient_merge_k_step, 1)
def test_sequential_execution(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.sequential_execution = True
self.assertEqual(strategy.sequential_execution, True)
strategy.sequential_execution = False
self.assertEqual(strategy.sequential_execution, False)
strategy.sequential_execution = "True"
self.assertEqual(strategy.sequential_execution, False)
def test_lars(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.lars = True
self.assertEqual(strategy.lars, True)
strategy.lars = False
self.assertEqual(strategy.lars, False)
strategy.lars = "True"
self.assertEqual(strategy.lars, False)
def test_lamb(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.lamb = True
self.assertEqual(strategy.lamb, True)
strategy.lamb = False
self.assertEqual(strategy.lamb, False)
strategy.lamb = "True"
self.assertEqual(strategy.lamb, False)
def test_fuse_elewise_add_act_ops(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.fuse_elewise_add_act_ops = True
self.assertEqual(strategy.fuse_elewise_add_act_ops, True)
strategy.fuse_elewise_add_act_ops = False
self.assertEqual(strategy.fuse_elewise_add_act_ops, False)
strategy.fuse_elewise_add_act_ops = "True"
self.assertEqual(strategy.fuse_elewise_add_act_ops, False)
def test_fuse_bn_act_ops(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.fuse_bn_act_ops = True
self.assertEqual(strategy.fuse_bn_act_ops, True)
strategy.fuse_bn_act_ops = False
self.assertEqual(strategy.fuse_bn_act_ops, False)
strategy.fuse_bn_act_ops = "True"
self.assertEqual(strategy.fuse_bn_act_ops, False)
def test_enable_auto_fusion(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.enable_auto_fusion = True
self.assertEqual(strategy.enable_auto_fusion, True)
strategy.enable_auto_fusion = False
self.assertEqual(strategy.enable_auto_fusion, False)
strategy.enable_auto_fusion = "True"
self.assertEqual(strategy.enable_auto_fusion, False)
def test_fuse_relu_depthwise_conv(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.fuse_relu_depthwise_conv = True
self.assertEqual(strategy.fuse_relu_depthwise_conv, True)
strategy.fuse_relu_depthwise_conv = False
self.assertEqual(strategy.fuse_relu_depthwise_conv, False)
strategy.fuse_relu_depthwise_conv = "True"
self.assertEqual(strategy.fuse_relu_depthwise_conv, False)
def test_enable_inplace(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.enable_inplace = True
self.assertEqual(strategy.enable_inplace, True)
strategy.enable_inplace = False
self.assertEqual(strategy.enable_inplace, False)
strategy.enable_inplace = "True"
self.assertEqual(strategy.enable_inplace, False)
def test_fuse_all_reduce_ops(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.fuse_all_reduce_ops = True
self.assertEqual(strategy.fuse_all_reduce_ops, True)
strategy.fuse_all_reduce_ops = False
self.assertEqual(strategy.fuse_all_reduce_ops, False)
strategy.fuse_all_reduce_ops = "True"
self.assertEqual(strategy.fuse_all_reduce_ops, False)
def test_num_iteration_per_drop_scope(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.num_iteration_per_drop_scope = 1
self.assertEqual(strategy.num_iteration_per_drop_scope, 1)
strategy.num_iteration_per_drop_scope = 0.1
self.assertEqual(strategy.num_iteration_per_drop_scope, 1)
def test_sync_batch_norm(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.sync_batch_norm = True
self.assertEqual(strategy.sync_batch_norm, True)
strategy.sync_batch_norm = False
self.assertEqual(strategy.sync_batch_norm, False)
strategy.sync_batch_norm = "True"
self.assertEqual(strategy.sync_batch_norm, False)
def test_fuse_all_optimizer_ops(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.fuse_all_optimizer_ops = True
self.assertEqual(strategy.fuse_all_optimizer_ops, True)
strategy.fuse_all_optimizer_ops = False
self.assertEqual(strategy.fuse_all_optimizer_ops, False)
strategy.fuse_all_optimizer_ops = "True"
self.assertEqual(strategy.fuse_all_optimizer_ops, False)
def test_sync(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.sync = True
self.assertEqual(strategy.sync, True)
strategy.sync = False
self.assertEqual(strategy.sync, False)
strategy.sync = "True"
self.assertEqual(strategy.sync, False)
def test_async(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.async = True
self.assertEqual(strategy.async, True)
strategy.async = False
self.assertEqual(strategy.async, False)
strategy.async = "True"
self.assertEqual(strategy.async, False)
def test_async_k_step(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.async_k_step = 10000
self.assertEqual(strategy.async_k_step, 10000)
strategy.async_k_step = 0.1
self.assertEqual(strategy.async_k_step, 10000)
def test_send_queue_size(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.send_queue_size = 10000
self.assertEqual(strategy.send_queue_size, 10000)
strategy.send_queue_size = 0.1
self.assertEqual(strategy.send_queue_size, 10000)
def test_independent_recv_thread(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.independent_recv_thread = True
self.assertEqual(strategy.independent_recv_thread, True)
strategy.independent_recv_thread = False
self.assertEqual(strategy.independent_recv_thread, False)
strategy.independent_recv_thread = "True"
self.assertEqual(strategy.independent_recv_thread, False)
def test_min_send_grad_num_before_recv(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.min_send_grad_num_before_recv = 10000
self.assertEqual(strategy.min_send_grad_num_before_recv, 10000)
strategy.min_send_grad_num_before_recv = 0.1
self.assertEqual(strategy.min_send_grad_num_before_recv, 10000)
def test_thread_pool_size(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.thread_pool_size = 10000
self.assertEqual(strategy.thread_pool_size, 10000)
strategy.thread_pool_size = 0.1
self.assertEqual(strategy.thread_pool_size, 10000)
def test_send_wait_times(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.send_wait_times = 10000
self.assertEqual(strategy.send_wait_times, 10000)
strategy.send_wait_times = 0.1
self.assertEqual(strategy.send_wait_times, 10000)
def test_runtime_split_send_recv(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.runtime_split_send_recv = True
self.assertEqual(strategy.runtime_split_send_recv, True)
strategy.runtime_split_send_recv = False
self.assertEqual(strategy.runtime_split_send_recv, False)
strategy.runtime_split_send_recv = "True"
self.assertEqual(strategy.runtime_split_send_recv, False)
def use_thread_barrier(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.thread_barrier = True
self.assertEqual(strategy.thread_barrier, True)
strategy.thread_barrier = False
self.assertEqual(strategy.thread_barrier, False)
strategy.thread_barrier = "True"
self.assertEqual(strategy.thread_barrier, False)
def test_enable_backward_optimizer_op_deps(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.enable_backward_optimizer_op_deps = True
self.assertEqual(strategy.enable_backward_optimizer_op_deps, True)
strategy.enable_backward_optimizer_op_deps = False
self.assertEqual(strategy.enable_backward_optimizer_op_deps, False)
strategy.enable_backward_optimizer_op_deps = "True"
self.assertEqual(strategy.enable_backward_optimizer_op_deps, False)
def test_elastic(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.elastic = True
self.assertEqual(strategy.elastic, True)
strategy.elastic = False
self.assertEqual(strategy.elastic, False)
strategy.elastic = "True"
self.assertEqual(strategy.elastic, False)
def test_auto(self):
strategy = paddle.fleet.DistributedStrategy()
strategy.auto = True
self.assertEqual(strategy.auto, True)
strategy.auto = False
self.assertEqual(strategy.auto, False)
strategy.auto = "True"
self.assertEqual(strategy.auto, False)
if __name__ == '__main__':
unittest.main()
...@@ -143,6 +143,13 @@ packages=['paddle', ...@@ -143,6 +143,13 @@ packages=['paddle',
'paddle.incubate', 'paddle.incubate',
'paddle.incubate.complex', 'paddle.incubate.complex',
'paddle.incubate.complex.tensor', 'paddle.incubate.complex.tensor',
'paddle.fleet',
'paddle.fleet.base',
'paddle.fleet.collective',
'paddle.fleet.dataset',
'paddle.fleet.metrics',
'paddle.fleet.parameter_server',
'paddle.fleet.proto',
'paddle.framework', 'paddle.framework',
'paddle.fluid', 'paddle.fluid',
'paddle.fluid.dygraph', 'paddle.fluid.dygraph',
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册