提交 06213b79 编写于 作者: D dongdaxiang

add hadoop helper function for distributed training

上级 49130f9b
...@@ -150,8 +150,13 @@ class AsyncExecutor(object): ...@@ -150,8 +150,13 @@ class AsyncExecutor(object):
data_feed.desc(), filelist, thread_num, data_feed.desc(), filelist, thread_num,
fetch_var_names, debug) fetch_var_names, debug)
def config_ps(self, dist_desc, host_sign_list, node_num, index): def config_distributed_nodes(self, dist_opt):
self.executor.config_pslib(dist_desc, host_sign_list, node_num, index) # get total rank
# get rank index
# get iplists
# get hadoop info
return
def start_server(self): def start_server(self):
self.executor.start_server() self.executor.start_server()
......
...@@ -21,7 +21,8 @@ def find_distributed_lookup_table_inputs(program, table_name): ...@@ -21,7 +21,8 @@ def find_distributed_lookup_table_inputs(program, table_name):
for op in program.global_block().ops: for op in program.global_block().ops:
if op.type == LOOKUP_TABLE_TYPE: if op.type == LOOKUP_TABLE_TYPE:
if table_name == op.input("W")[0]: if table_name == op.input("W")[0]:
inputs.extend([local_vars[name] for name in op.input("Ids")]) inputs.extend(
[local_vars[name] for name in op.input("Ids")])
return inputs return inputs
def find_distributed_lookup_table_outputs(program, table_name): def find_distributed_lookup_table_outputs(program, table_name):
...@@ -30,7 +31,8 @@ def find_distributed_lookup_table_outputs(program, table_name): ...@@ -30,7 +31,8 @@ def find_distributed_lookup_table_outputs(program, table_name):
for op in program.global_block().ops: for op in program.global_block().ops:
if op.type == LOOKUP_TABLE_TYPE: if op.type == LOOKUP_TABLE_TYPE:
if table_name == op.input("W")[0]: if table_name == op.input("W")[0]:
outputs.extend([local_vars[name] for name in op.output("Out")]) outputs.extend(
[local_vars[name] for name in op.output("Out")])
return outputs return outputs
def find_distributed_lookup_table(program): def find_distributed_lookup_table(program):
......
...@@ -8,30 +8,57 @@ from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_o ...@@ -8,30 +8,57 @@ from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_o
from google.protobuf import text_format from google.protobuf import text_format
class DownpourSGD(object): class DownpourSGD(object):
"""
Distributed optimizer of downpour stochastic gradient descent
Standard implementation of Google's Downpour SGD
in Large Scale Distributed Deep Networks
Args:
learning_rate (float): the learning rate used to update parameters. \
Can be a float value
Examples:
.. code-block:: python
downpour_sgd = fluid.distributed.DownpourSGD(learning_rate=0.2)
downpour_sgd.minimize(cost)
"""
def __init__(self, learning_rate=0.001, window=1): def __init__(self, learning_rate=0.001, window=1):
# todo(guru4elephant): if optimizer is not None, will warning here # todo(guru4elephant): add more optimizers here as argument
# todo(guru4elephant): make learning_rate as a variable
self.learning_rate_ = learning_rate self.learning_rate_ = learning_rate
self.window_ = window self.window_ = window
self.type = "downpour"
def minimize(self, loss, startup_program=None, def minimize(self, loss, startup_program=None,
parameter_list=None, no_grad_set=None): parameter_list=None, no_grad_set=None):
params_grads = sorted(append_backward(loss), key=lambda x:x[0].name) params_grads = sorted(append_backward(
loss, parameter_list, no_grad_set), key=lambda x:x[0].name)
table_name = find_distributed_lookup_table(loss.block.program) table_name = find_distributed_lookup_table(loss.block.program)
prefetch_slots = find_distributed_lookup_table_inputs( prefetch_slots = find_distributed_lookup_table_inputs(
loss.block.program, table_name) loss.block.program, table_name)
prefetch_slots_emb = find_distributed_lookup_table_outputs( prefetch_slots_emb = find_distributed_lookup_table_outputs(
loss.block.program, table_name) loss.block.program, table_name)
server = DownpourServer() server = DownpourServer()
# window is communication strategy
worker = DownpourWorker(self.window_) worker = DownpourWorker(self.window_)
server.add_sparse_table(0, self.learning_rate_, # Todo(guru4elephant): support multiple tables definitions
# currently support one big sparse table
sparse_table_index = 0
# currently merge all dense parameters into one dense table
dense_table_index = 1
server.add_sparse_table(sparse_table_index, self.learning_rate_,
prefetch_slots, prefetch_slots_emb) prefetch_slots, prefetch_slots_emb)
server.add_dense_table(1, self.learning_rate_, params_grads[0], params_grads[1]) server.add_dense_table(dense_table_index, self.learning_rate_,
worker.add_sparse_table(0, self.learning_rate_, params_grads[0], params_grads[1])
worker.add_sparse_table(sparse_table_index, self.learning_rate_,
prefetch_slots, prefetch_slots_emb) prefetch_slots, prefetch_slots_emb)
worker.add_dense_table(1, self.learning_rate_, params_grads[0], params_grads[1]) worker.add_dense_table(dense_table_index, self.learning_rate_,
params_grads[0], params_grads[1])
ps_param = pslib.PSParameter() ps_param = pslib.PSParameter()
ps_param.server_param.CopyFrom(server.get_desc()) ps_param.server_param.CopyFrom(server.get_desc())
#ps_param.worker_param.CopyFrom(worker.get_desc()) ps_param.worker_param.CopyFrom(worker.get_desc())
# Todo(guru4elephant): figure out how to support more sparse parameters
# currently only support lookup_table
worker_skipped_ops = ["lookup_table", "lookup_table_grad"] worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
ps_param_str = text_format.MessageToString(ps_param) ps_param_str = text_format.MessageToString(ps_param)
return [ps_param_str, worker_skipped_ops, text_format.MessageToString(worker.get_desc())] return [ps_param_str, worker_skipped_ops]
from mpi4py import MPI from mpi4py import MPI
class FileSystem(object):
def __init__(self, fs_type="afs",
uri="afs://tianqi.afs.baidu.com:9902",
user=None,
passwd=None,
hadoop_bin="",
afs_conf=None):
assert user not None
assert passwd not None
assert hadoop_bin not None
fs_client = pslib.FsClientParameter()
if fs_type == "afs":
fs_client.fs_type = pslib.FsApiType.AFS
else:
fs_client.fs_type = pslib.FsApiType.HDFS
fs_client.uri = uri
fs_client.user = user
fs_client.passwd = passwd
fs_client.buffer_size = 0
fs_client.afs_conf = afs_conf if not afs_conf else ""
class MPIHelper(object): class MPIHelper(object):
def __init__(self): def __init__(self):
self.comm = MPI.COMM_WORLD self.comm = MPI.COMM_WORLD
...@@ -18,3 +40,5 @@ class MPIHelper(object): ...@@ -18,3 +40,5 @@ class MPIHelper(object):
def get_hostname(self): def get_hostname(self):
import socket import socket
return socket.gethostname() return socket.gethostname()
...@@ -12,7 +12,6 @@ class Worker(object): ...@@ -12,7 +12,6 @@ class Worker(object):
class DownpourServer(Server): class DownpourServer(Server):
def __init__(self): def __init__(self):
#self.server_ = pslib.ServerParameter().downpour_server_param
self.server_ = pslib.ServerParameter() self.server_ = pslib.ServerParameter()
def add_sparse_table(self, table_id, learning_rate, def add_sparse_table(self, table_id, learning_rate,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册