Unverified · Commit 89f2c652 authored by sneaxiy and committed by GitHub

remove paddle.fluid.distributed (#49517)

Parent 5defefd6
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .node import DownpourServer
from .node import DownpourWorker
from ..backward import append_backward
import ps_pb2 as pslib
from paddle.fluid.distribute_lookup_table import (
    find_distributed_lookup_table,
    find_distributed_lookup_table_inputs,
    find_distributed_lookup_table_outputs,
)
from google.protobuf import text_format
class DownpourSGD:
r"""
    Distributed optimizer of downpour stochastic gradient descent,
    a standard implementation of Google's Downpour SGD described in
    "Large Scale Distributed Deep Networks".
Args:
learning_rate (float): the learning rate used to update parameters. \
Can be a float value
Examples:
.. code-block:: python
            downpour_sgd = fluid.distributed.DownpourSGD(learning_rate=0.2)
            downpour_sgd.minimize([cost])
"""
def __init__(self, learning_rate=0.001, window=1):
# todo(guru4elephant): add more optimizers here as argument
# todo(guru4elephant): make learning_rate as a variable
self.learning_rate_ = learning_rate
self.window_ = window
self.type = "downpour"
self.data_norm_name = [
".batch_size",
".batch_square_sum",
".batch_sum",
".batch_size@GRAD",
".batch_square_sum@GRAD",
".batch_sum@GRAD",
]
def minimize(
self,
losses,
startup_program=None,
parameter_list=None,
no_grad_set=None,
):
"""
        DownpourSGD is a distributed optimizer, so users call minimize() to
        generate both the backward operators and the optimization operators
        for the given losses.
Args:
            losses(list of Variable): loss variables defined by the user
startup_program(Program): startup program that defined by user
parameter_list(str list): parameter names defined by users
no_grad_set(set): a set of variables that is defined by users
so that these variables do not need gradient computation
        Returns:
            (None, param_grads_list): param_grads_list holds the sorted
            (param, grad) pairs produced by append_backward for each loss;
            the fleet configuration itself is attached to each loss's
            program as program._fleet_opt.
"""
if not isinstance(losses, list):
            raise ValueError('losses should be a list, e.g. [model.cost]')
table_name = find_distributed_lookup_table(losses[0].block.program)
prefetch_slots = find_distributed_lookup_table_inputs(
losses[0].block.program, table_name
)
prefetch_slots_emb = find_distributed_lookup_table_outputs(
losses[0].block.program, table_name
)
ps_param = pslib.PSParameter()
server = DownpourServer()
worker = DownpourWorker(self.window_)
sparse_table_index = 0
server.add_sparse_table(
sparse_table_index,
self.learning_rate_,
prefetch_slots,
prefetch_slots_emb,
)
worker.add_sparse_table(
sparse_table_index,
self.learning_rate_,
prefetch_slots,
prefetch_slots_emb,
)
dense_table_index = 1
program_configs = []
param_grads_list = []
for loss_index in range(len(losses)):
program_config = ps_param.trainer_param.program_config.add()
program_config.program_id = str(
id(losses[loss_index].block.program)
)
program_config.pull_sparse_table_id.extend([sparse_table_index])
program_config.push_sparse_table_id.extend([sparse_table_index])
params_grads = sorted(
append_backward(
losses[loss_index], parameter_list, no_grad_set
),
key=lambda x: x[0].name,
)
param_grads_list.append(params_grads)
params = []
grads = []
data_norm_params = []
data_norm_grads = []
for i in params_grads:
is_data_norm_data = False
for data_norm_name in self.data_norm_name:
if i[0].name.endswith(data_norm_name):
is_data_norm_data = True
data_norm_params.append(i[0])
if not is_data_norm_data:
params.append(i[0])
for i in params_grads:
is_data_norm_data = False
for data_norm_grad in self.data_norm_name:
if i[0].name.endswith(data_norm_grad):
is_data_norm_data = True
data_norm_grads.append(i[1])
if not is_data_norm_data:
grads.append(i[1])
server.add_dense_table(
dense_table_index, self.learning_rate_, params, grads
)
worker.add_dense_table(
dense_table_index, self.learning_rate_, params, grads
)
program_config.pull_dense_table_id.extend([dense_table_index])
program_config.push_dense_table_id.extend([dense_table_index])
if len(data_norm_params) != 0 and len(data_norm_grads) != 0:
dense_table_index += 1
server.add_data_norm_table(
dense_table_index,
self.learning_rate_,
data_norm_params,
data_norm_grads,
)
worker.add_dense_table(
dense_table_index,
self.learning_rate_,
data_norm_params,
data_norm_grads,
)
program_config.pull_dense_table_id.extend([dense_table_index])
program_config.push_dense_table_id.extend([dense_table_index])
dense_table_index += 1
program_configs.append(program_config)
ps_param.server_param.CopyFrom(server.get_desc())
ps_param.trainer_param.CopyFrom(worker.get_desc())
for program_config in program_configs:
ps_param.trainer_param.program_config.extend([program_config])
# Todo(guru4elephant): figure out how to support more sparse parameters
# currently only support lookup_table
worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
ps_param.trainer_param.skip_op.extend(worker_skipped_ops)
# all fleet operations should be defined in operators in the future
# we want to return an object here containing:
# 1) worker execution strategy
# 2) pserver execution strategy
# 3) fleet configurations
# 4) skipped operators in runtime
# 5) distributed optimization
opt_info = {}
opt_info["trainer"] = "DistMultiTrainer"
opt_info["device_worker"] = "DownpourSGD"
opt_info["optimizer"] = "DownpourSGD"
opt_info["fleet_desc"] = ps_param
opt_info["worker_skipped_ops"] = worker_skipped_ops
for loss in losses:
loss.block.program._fleet_opt = opt_info
return None, param_grads_list
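A minimal usage sketch for the optimizer above, assuming an already-built fluid
program whose loss is `model.cost` and whose embedding layers go through the
distributed lookup table; `model` is a placeholder here, and minimize() must
receive the losses as a list:

downpour_sgd = DownpourSGD(learning_rate=0.2, window=1)
_, param_grads_list = downpour_sgd.minimize([model.cost])
# minimize() attaches the fleet configuration to the program itself:
opt_info = model.cost.block.program._fleet_opt
print(opt_info["trainer"], opt_info["worker_skipped_ops"])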
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
import sys
from .. import core
from . import ps_instance
from google.protobuf import text_format
__all__ = ['Fleet']
class Fleet:
""" """
def __init__(self):
self.instance_ = ps_instance.PaddlePSInstance()
self.fleet_ = core.FleetWrapper()
def stop(self):
self.instance_.barrier_worker()
        if self.instance_.is_first_worker():
self.fleet_.stop_server()
self.instance_.barrier_worker()
self.instance_.barrier_all()
        self.instance_.finalize()
def init_pserver(self, opt_info):
if "fleet_desc" in opt_info:
self.dist_desc_str_ = text_format.MessageToString(
opt_info["fleet_desc"]
)
self.dist_desc_ = opt_info["fleet_desc"]
else:
print(
"You should run distributed optimization to get opt_info first"
)
sys.exit(-1)
self.fleet_.init_server(self.dist_desc_str_)
ip = self.fleet_.start_server()
self.instance_.set_ip(ip)
        self.instance_.barrier_all()
        ips = self.instance_.gather_ips()
        self.fleet_.gather_servers(ips, self.instance_.get_node_cnt())
self.instance_.barrier_all()
def init_worker(self, opt_info):
if "fleet_desc" in opt_info:
self.dist_desc_str_ = text_format.MessageToString(
opt_info["fleet_desc"]
)
self.dist_desc_ = opt_info["fleet_desc"]
else:
print(
"You should run distributed optimization to get opt_info first"
)
sys.exit(-1)
self.instance_.barrier_all()
        ips = self.instance_.gather_ips()
self.fleet_.init_worker(
self.dist_desc_str_,
ips,
self.instance_.get_node_cnt(),
            self.instance_._rankid,
)
        self.instance_.barrier_worker()
def init_pserver_model(self):
if self.instance_.is_first_worker():
self.fleet_.init_model()
self.instance_.barrier_worker()
def save_pserver_model(self, save_path):
self.fleet_.save_model(save_path)
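A rough sketch of how both halves of Fleet would be driven from the opt_info
produced by DownpourSGD.minimize; role selection relies on the PaddlePSInstance
held in fleet.instance_, and the training loop itself is elided:

fleet = Fleet()
opt_info = cost.block.program._fleet_opt  # set by DownpourSGD.minimize

if fleet.instance_.is_server():
    fleet.init_pserver(opt_info)    # start a parameter server and gather peer ips
else:
    fleet.init_worker(opt_info)     # connect this trainer to all servers
    fleet.init_pserver_model()      # the first worker pushes the initial model
    # ... run the training loop here ...
    fleet.save_pserver_model("./model_dir")
    fleet.stop()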
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class FileSystem:
"""
A file system that support hadoop client desc.
Args:
fs_type (string): fs_type, for example is "afs"
user (string): hadoop param
passwd (string): hadoop param
hadoop bin (string): hadoop param
Examples:
fs = FileSystm()
"""
def __init__(
self,
fs_type="afs",
uri="afs://xx",
user=None,
passwd=None,
hadoop_bin="",
):
assert user is not None
assert passwd is not None
assert hadoop_bin is not None
import ps_pb2 as pslib
self.fs_client = pslib.FsClientParameter()
self.fs_client.uri = uri
self.fs_client.user = user
self.fs_client.passwd = passwd
# self.fs_client.buffer_size = 0
self.fs_client.hadoop_bin = hadoop_bin
# self.fs_client.afs_conf = afs_conf if not afs_conf else ""
def get_desc(self):
"""
get hadoop desc.
"""
return self.fs_client
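A hedged construction example for FileSystem; the AFS uri, credentials and
hadoop binary path below are placeholders:

fs = FileSystem(
    fs_type="afs",
    uri="afs://example-cluster:9902",     # placeholder uri
    user="some_user",                     # placeholder credentials
    passwd="some_passwd",
    hadoop_bin="/opt/hadoop/bin/hadoop",  # placeholder binary path
)
fs_desc = fs.get_desc()  # a pslib.FsClientParameter proto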
class MPIHelper:
"""
    MPIHelper is a thin wrapper around mpi4py that supports get_rank, get_size, etc.
Args:
No params
Examples:
mh = MPIHelper()
mh.get_ip()
"""
def __init__(self):
from mpi4py import MPI
self.comm = MPI.COMM_WORLD
self.MPI = MPI
def get_rank(self):
return self.comm.Get_rank()
def get_size(self):
return self.comm.Get_size()
def get_ip(self):
import socket
local_ip = socket.gethostbyname(socket.gethostname())
return local_ip
def get_hostname(self):
import socket
return socket.gethostname()
def finalize(self):
self.MPI.Finalize()
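A small sketch of how MPIHelper is meant to be used; it only makes sense when
the script is started under an MPI launcher, and the printed values differ per rank:

mh = MPIHelper()
print("rank %d of %d on %s (%s)"
      % (mh.get_rank(), mh.get_size(), mh.get_hostname(), mh.get_ip()))
mh.finalize()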
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
import ps_pb2 as pslib
# NOTE: in Python 3, reduce is no longer a builtin and must be imported from functools
from functools import reduce
class Server:
"""
A Server basic class.
"""
def __init__(self):
pass
class Worker:
"""
A Worker basic class.
"""
def __init__(self):
pass
class DownpourServer(Server):
"""
DownpourServer class is used to generate server program_desc
Args:
        server: a pslib.ServerParameter() built and held by this class
Examples:
server = DownpourServer()
"""
def __init__(self):
self.server_ = pslib.ServerParameter()
self.server_.downpour_server_param.service_param.start_server_port = 0
self.server_.downpour_server_param.service_param.server_class = (
"DownpourBrpcPsServer"
)
self.server_.downpour_server_param.service_param.client_class = (
"DownpourBrpcPsClient"
)
self.server_.downpour_server_param.service_param.service_class = (
"DownpourPsService"
)
self.server_.downpour_server_param.service_param.server_thread_num = 12
def add_sparse_table(
self, table_id, learning_rate, slot_key_vars, slot_value_var
):
r"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
            slot_key_vars(list): slot key input variables
            slot_value_var(list): slot embedding output variables
Returns:
return None
"""
table = self.server_.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = "DownpourSparseTable"
table.type = pslib.PS_SPARSE_TABLE
table.accessor.accessor_class = "DownpourFeatureValueAccessor"
table.accessor.sparse_sgd_param.learning_rate = learning_rate
table.accessor.sparse_sgd_param.initial_g2sum = 3
table.accessor.sparse_sgd_param.initial_range = 1e-4
table.accessor.sparse_sgd_param.weight_bounds.extend([-10, 10])
table.accessor.embedx_dim = 8
table.accessor.embedx_threshold = 5
table.accessor.fea_dim = 11
table.accessor.downpour_accessor_param.nonclk_coeff = 0.1
table.accessor.downpour_accessor_param.click_coeff = 2
table.accessor.downpour_accessor_param.base_threshold = 0.2
table.accessor.downpour_accessor_param.delta_threshold = 0.15
table.accessor.downpour_accessor_param.delta_keep_days = 31
table.accessor.downpour_accessor_param.show_click_decay_rate = 0.999
table.accessor.downpour_accessor_param.delete_threshold = 0.8
def add_dense_table(self, table_id, learning_rate, param_var, grad_var):
r"""
Args:
            table_id(int): id of the dense params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
            param_var(list): all dense parameter variables
            grad_var(list): all dense gradient variables
Returns:
return None
"""
table = self.server_.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = "DownpourDenseTable"
table.type = pslib.PS_DENSE_TABLE
table.accessor.accessor_class = "DownpourDenseValueAccessor"
table.accessor.dense_sgd_param.name = "adam"
table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
table.accessor.dense_sgd_param.adam.avg_decay_rate = 0.999993
table.accessor.dense_sgd_param.adam.ada_decay_rate = 0.9999
table.accessor.dense_sgd_param.adam.ada_epsilon = 1e-8
table.accessor.dense_sgd_param.adam.mom_decay_rate = 0.99
table.accessor.dense_sgd_param.naive.learning_rate = 0.0002
fea_dim = 0
for param in filter(
lambda x: x.name.find("embedding") == -1, param_var
):
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
table.accessor.fea_dim = fea_dim
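        # Worked example: a non-embedding parameter of shape [1024, 64]
        # contributes reduce(lambda x, y: x * y, [1024, 64], 1) == 65536
        # to fea_dim; parameters whose name contains "embedding" are
        # filtered out above.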
def get_desc(self):
"""
Return downpour server program_desc
"""
return self.server_
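A sketch of building a server description by hand, assuming prefetch_slots,
prefetch_slots_emb, params and grads have already been collected from the
program (this is what DownpourSGD.minimize does internally):

from google.protobuf import text_format

server = DownpourServer()
server.add_sparse_table(0, 0.001, prefetch_slots, prefetch_slots_emb)
server.add_dense_table(1, 0.001, params, grads)
server_desc = server.get_desc()  # a pslib.ServerParameter proto
print(text_format.MessageToString(server_desc))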
class DownpourWorker(Worker):
"""
DownpourWorker class is used to generate worker program_desc
Args:
window (int): push params frequency
        worker: a pslib.DownpourTrainerParameter built and held by this class
Examples:
worker = DownpourWorker(1)
"""
def __init__(self, window):
self.window = window
self.worker_ = pslib.DownpourTrainerParameter()
def add_sparse_table(
self, table_id, learning_rate, slot_key_vars, slot_value_vars
):
r"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
            slot_key_vars(list): slot key input variables
            slot_value_var(list): slot embedding output variables
Returns:
return None
"""
table = self.worker_.sparse_table.add()
table.table_id = table_id
table.slot_key.extend([var.name for var in slot_key_vars])
table.slot_value.extend([var.name for var in slot_value_vars])
table.slot_gradient.extend(
[var.name + "@GRAD" for var in slot_value_vars]
)
def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars):
r"""
Args:
            table_id(int): id of the dense params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
            param_var(list): all dense parameter variables
            grad_var(list): all dense gradient variables
Returns:
return None
"""
table = self.worker_.dense_table.add()
table.table_id = table_id
table.dense_variable_name.extend(
filter(
lambda x: x.find("embedding") == -1,
[p.name for p in param_vars],
)
)
table.dense_gradient_variable_name.extend(
filter(
lambda x: x.find("embedding") == -1, [g.name for g in grad_vars]
)
)
def get_desc(self):
"""
Return downpour worker program_desc
"""
return self.worker_
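The matching worker-side sketch, under the same assumptions about the collected
variables; the sparse and dense table ids must line up with the server
description above:

worker = DownpourWorker(window=1)
worker.add_sparse_table(0, 0.001, prefetch_slots, prefetch_slots_emb)
worker.add_dense_table(1, 0.001, params, grads)
trainer_desc = worker.get_desc()  # a pslib.DownpourTrainerParameter proto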
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from .helper import MPIHelper
class PaddlePSInstance:
"""
PaddlePSInstance class is used to generate A instance of server or worker
Args:
server_worker_mode: is a value 0 or 1, default is 1
proc_per_node: process per node, default is 2
Examples:
instance = PaddlePSInstance(1, 2)
"""
def __init__(self, server_worker_mode=1, proc_per_node=2):
self.dh = MPIHelper()
self._rankid = self.dh.get_rank()
self._server_worker_mode = server_worker_mode
self._proc_per_node = proc_per_node
self._nodes = self.dh.get_size()
self._ip = 0
        self._worker_num = self._nodes * self._proc_per_node // 2
        self._server_num = self._nodes * self._proc_per_node // 2
self._total_server_worker = self._worker_num + self._server_num
self._node_type = None # IDLE=-1, WORKER=1, SERVER=0
self._set_nodetype()
self._comm = None
self._split_comm()
def _set_nodetype(self):
if self._server_worker_mode == 0:
if self._rankid < self._server_num:
self._node_type = 1
elif self._rankid < self._total_server_worker:
self._node_type = 0
else:
self._node_type = -1
elif self._server_worker_mode == 1:
if self._rankid < self._total_server_worker:
if 0 == self._rankid % self._proc_per_node % 2:
self._node_type = 0
else:
self._node_type = 1
else:
self._node_type = -1
else:
self._node_type = -1
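        # Worked example for mode 1 with proc_per_node=2 and 2 MPI nodes
        # (4 ranks in total): rank 0 -> 0 % 2 % 2 == 0 -> server,
        # rank 1 -> 1 -> worker, rank 2 -> 0 -> server, rank 3 -> 1 -> worker;
        # i.e. even local ranks become servers and odd local ranks workers.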
def _split_comm(self):
if self.is_server():
self._comm = self.dh.comm.Split(self._node_type)
elif self.is_worker():
self._comm = self.dh.comm.Split(self._node_type)
pass
def get_worker_id(self):
"""
Return worker index
"""
        if self._server_worker_mode == 0:
            return self._rankid - self._server_num
        else:
            return self._rankid // self._proc_per_node
def get_server_id(self):
"""
Return server index
"""
        if self._server_worker_mode == 0:
            return self._rankid
        else:
            return self._rankid // self._proc_per_node
def is_worker(self):
"""
Return instance is worker or not
"""
return self._node_type == 1
def is_server(self):
"""
Return instance is server or not
"""
return self._node_type == 0
def is_first_worker(self):
"""
Return instance is first worker or not
"""
return self.is_worker() and 0 == self.get_worker_id()
def set_ip(self, ip):
"""
set server ip
"""
self._ip = ip
def gather_ips(self):
"""
Return all servers and workers ip through mpi allgather
"""
self._ips = self.dh.comm.allgather(self._ip)
return self._ips
def get_node_cnt(self):
"""
Return node cnt
"""
return self._nodes
def get_worker_num(self):
"""
Return worker num
"""
return self._worker_num
def get_server_num(self):
"""
Return server num
"""
return self._server_num
def barrier_all(self):
"""
barrier workers and servers
"""
self.dh.comm.barrier()
def barrier_worker(self):
"""
barrier workers
"""
if self.is_worker():
self._comm.barrier()
pass
def finalize(self):
"""
MPI finalize
"""
self.dh.finalize()
pass
if __name__ == "__main__":
instance = PaddlePSInstance(1, 2)
instance.barrier_all()
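A sketch of how a launcher script might use the instance to split roles;
run_server and run_worker are placeholders for the pserver and trainer entry
points (in this package that split is what Fleet.init_pserver and
Fleet.init_worker implement):

instance = PaddlePSInstance(server_worker_mode=1, proc_per_node=2)
if instance.is_server():
    run_server(instance)    # placeholder: start the parameter server here
elif instance.is_worker():
    run_worker(instance)    # placeholder: run the trainer loop here
instance.barrier_all()
instance.finalize()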
This diff is collapsed.
@@ -333,7 +333,6 @@ packages=['paddle',
'paddle.fluid.dygraph',
'paddle.fluid.proto',
'paddle.fluid.proto.profiler',
'paddle.fluid.distributed',
'paddle.fluid.layers',
'paddle.fluid.dataloader',
'paddle.fluid.contrib',
@@ -1232,7 +1232,6 @@ def get_setup_parameters():
'paddle.fluid.dygraph',
'paddle.fluid.proto',
'paddle.fluid.proto.profiler',
'paddle.fluid.distributed',
'paddle.fluid.layers',
'paddle.fluid.dataloader',
'paddle.fluid.contrib',