提交 9a8517fd 编写于 作者: T typhoonzero

daemonize the server process

上级 0e850c74
...@@ -136,17 +136,26 @@ class ListenAndServ(object): ...@@ -136,17 +136,26 @@ class ListenAndServ(object):
# simple recv mode, recv operators inputs. # simple recv mode, recv operators inputs.
for iname in op.input_names: for iname in op.input_names:
for in_var_name in op.input(iname): for in_var_name in op.input(iname):
params.append(parent_block.var(name)) params.append(parent_block.var(in_var_name))
grads.append(parent_block.var(name)) grads.append(parent_block.var(in_var_name))
return params, grads return params, grads
def parent_block(self):
    """Return the parent block of the program's current block.

    Looks up ``parent_idx`` on the current block of the helper's main
    program and resolves it to the corresponding block object. The
    current block is assumed to be nested (``parent_idx >= 0``).
    """
    program = self.helper.main_program
    idx = program.current_block().parent_idx
    assert idx >= 0
    return program.block(idx)
def complete_op(self): def complete_op(self):
main_program = self.helper.main_program main_program = self.helper.main_program
current_block = main_program.current_block() current_block = main_program.current_block()
parent_block = self.parent_block() parent_block = self.parent_block()
params, grads = self.get_params_and_grads() params, grads = self.get_params_and_grads()
param_names = [p.name for p in params]
grad_names = [g.name for g in grads]
parent_block.append_op( parent_block.append_op(
type='recv', type='recv',
inputs={}, inputs={},
...@@ -154,8 +163,8 @@ class ListenAndServ(object): ...@@ -154,8 +163,8 @@ class ListenAndServ(object):
attrs={ attrs={
'endpoint': self.endpoint, 'endpoint': self.endpoint,
'Fanin': self.fan_in, 'Fanin': self.fan_in,
'ParamList': params, 'ParamList': param_names,
'GradList': grads, 'GradList': grad_names,
'OptimizeBlock': current_block 'OptimizeBlock': current_block
}) })
...@@ -177,7 +186,7 @@ def Send(endpoints, send_vars, get_vars): ...@@ -177,7 +186,7 @@ def Send(endpoints, send_vars, get_vars):
assert (type(get_vars) == list) assert (type(get_vars) == list)
epmap = endpoints.split(",") epmap = endpoints.split(",")
endpoints = set(epmap) endpoints = list(set(epmap))
helper = LayerHelper("Send", **locals()) helper = LayerHelper("Send", **locals())
helper.append_op( helper.append_op(
......
...@@ -17,40 +17,60 @@ import unittest ...@@ -17,40 +17,60 @@ import unittest
import paddle.v2.fluid as fluid import paddle.v2.fluid as fluid
import paddle.v2.fluid.layers as layers import paddle.v2.fluid.layers as layers
import numpy import numpy
import threading from multiprocessing import Process
import os, sys
class TestRecvOp(unittest.TestCase): class TestRecvOp(unittest.TestCase):
def test_send(self): def test_send(self):
# Run init_serv in a thread # Run init_serv in a thread
place = fluid.CPUPlace() place = fluid.CPUPlace()
t = threading.Thread(target=self.init_serv, args=(place, )) p = Process(target=self.init_serv, args=(place, ))
t.start() p.daemon = True
p.start()
self.init_client(place) self.init_client(place)
t.join() # FIXME(typhoonzero): find a way to gracefully shutdown the server.
os.system("kill -9 %d" % p.pid)
p.join()
def init_serv(self, place): def init_serv(self, place):
main = fluid.Program() main = fluid.Program()
with fluid.program_guard(main): with fluid.program_guard(main):
x = layers.data(shape=[32, 32], dtype='float32', name='X') x = layers.data(
i = fluid.initializer.Constant(value=1.0) shape=[32, 32],
y = i(x, main.global_block()) dtype='float32',
serv = layers.ListenAndServ("127.0.0.1:6174") name="X",
append_batch_size=False)
fluid.initializer.Constant(value=1.0)(x, main.global_block())
serv = layers.ListenAndServ("127.0.0.1:6174", optimizer_mode=False)
with serv.do(): with serv.do():
layers.scale(input=y, scale=10.0) o = layers.scale(x=x, scale=10.0)
main.global_block().create_var(
name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape)
print main
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(main) exe.run(main)
def init_client(self, place): def init_client(self, place):
main = fluid.Program() main = fluid.Program()
with fluid.program_guard(main): with fluid.program_guard(main):
x = layers.data(shape=[32, 32], dtype='float32', name='X') x = layers.data(
i = fluid.initializer.Constant(value=1.0) shape=[32, 32],
i(x, main.global_block()) dtype='float32',
name='X',
append_batch_size=False)
fluid.initializer.Constant(value=1.0)(x, main.global_block())
layers.Send("127.0.0.1:6174", [x], [x]) layers.Send("127.0.0.1:6174", [x], [x])
print main
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(main) exe.run(main)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
# test = TestRecvOp()
# place = fluid.CPUPlace()
# if sys.argv[1] == "server":
# test.init_serv(place)
# else:
# test.init_client(place)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册