Unverified commit f37bd035, authored by Qiao Longfei, committed by GitHub

Merge pull request #14153 from jacquesqiao/fix-pserver-crash-when-no-parameter

set an empty optimize block if pserver has no optimize block
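The crash fixed here happens when a parameter server receives no parameter to optimize, so the transpiled pserver program ends up with no optimize block at all. Below is a minimal, self-contained sketch of the guard this PR adds; build_optimize_blocks and the plain-list "blocks" are illustrative names only, not the real fluid Program/Block API shown in the diff.

# A minimal sketch of the fix (illustrative names, not the real fluid API):
# if an endpoint ends up with no optimize ops, give it one empty block so the
# server side always has at least one block to run.
def build_optimize_blocks(per_param_ops):
    optimize_blocks = [ops for ops in per_param_ops if ops]
    if len(optimize_blocks) == 0:
        # this pserver received no parameter to optimize
        optimize_blocks.append([])  # an empty block instead of no block at all
    return optimize_blocks

print(build_optimize_blocks([["sgd"]]))  # [['sgd']] -> pserver that owns the parameter
print(build_optimize_blocks([]))         # [[]]      -> pserver with nothing to optimize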
@@ -405,6 +405,31 @@ class TestL2DecayWithPiecewise(TranspilerTest):
            ["sum", "scale", "scale", "elementwise_add", "momentum"])
class TestEmptyPserverOptimizeBlocks(TranspilerTest):
    def net_conf(self):
        x = fluid.layers.data(name='x', shape=[1000], dtype='float32')
        # only one parameter
        y_predict = fluid.layers.fc(input=x,
                                    size=1000,
                                    act=None,
                                    param_attr=fluid.ParamAttr(name='fc_w'),
                                    bias_attr=False)
        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
        avg_cost = fluid.layers.mean(cost)
        sgd_optimizer = fluid.optimizer.SGD(learning_rate=1.0)
        sgd_optimizer.minimize(avg_cost)

    def transpiler_test_impl(self):
        config = fluid.DistributeTranspilerConfig()
        config.slice_var_up = False

        pserver, startup = self.get_pserver(ep=self.pserver2_ep, config=config)

        self.assertEqual(len(pserver.blocks), 2)
        self.assertEqual(len(pserver.blocks[1].ops), 0)
class TestDistLookupTableBase(TranspilerTest):
    def network_with_table(self, is_sparse, is_distributed):
        self.table_size = 1000
......
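Why the second pserver has nothing to optimize in the test above: with config.slice_var_up = False, whole variables are dispatched to endpoints (round-robin by default), so the single parameter fc_w lands on one endpoint and the other endpoint gets none. A rough, self-contained sketch of that placement, with illustrative names rather than the real PSDispatcher API:

# Illustrative round-robin placement of unsliced parameters over pserver endpoints.
def round_robin_dispatch(var_names, endpoints):
    placement = {ep: [] for ep in endpoints}
    for i, name in enumerate(var_names):
        placement[endpoints[i % len(endpoints)]].append(name)
    return placement

eps = ["127.0.0.1:6174", "127.0.0.1:6175"]
print(round_robin_dispatch(["fc_w"], eps))
# {'127.0.0.1:6174': ['fc_w'], '127.0.0.1:6175': []} -> the second pserver has no
# parameter, so its program only gets the empty optimize block added by this PR.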
@@ -55,6 +55,46 @@ def run_pserver(use_cuda, sync_mode, ip, port, trainers, trainer_id):
    exe.run(pserver_prog)
def run_pserver_with_empty_block(use_cuda, sync_mode, ip, port, trainers,
                                 trainer_id):
    x = fluid.layers.data(name='x', shape=[1], dtype='float32')
    y_predict = fluid.layers.fc(input=x, size=1, act=None, bias_attr=False)
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')

    # loss function
    cost = fluid.layers.square_error_cost(input=y_predict, label=y)
    avg_cost = fluid.layers.mean(cost)

    # optimizer
    sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
    sgd_optimizer.minimize(avg_cost)

    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    exe = fluid.Executor(place)

    ps1 = ip + ":" + str(int(port) + 1)
    ps2 = ip + ":" + port
    pserver_endpoints = ps1 + "," + ps2

    config = fluid.DistributeTranspilerConfig()
    config.slice_var_up = False
    t = fluid.DistributeTranspiler(config=config)
    t.transpile(
        trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers,
        sync_mode=sync_mode)
    pserver_prog = t.get_pserver_program(ps2)

    # pserver2 has no parameter
    assert (len(pserver_prog.blocks) == 2)
    assert (len(pserver_prog.blocks[1].ops) == 0)

    pserver_startup = t.get_startup_program(ps2, pserver_prog)
    exe.run(pserver_startup)
    exe.run(pserver_prog)
class TestListenAndServOp(OpTest):
    def setUp(self):
        self.ps_timeout = 5
@@ -63,9 +103,9 @@ class TestListenAndServOp(OpTest):
        self.trainers = 1
        self.trainer_id = 0
    def _start_pserver(self, use_cuda, sync_mode, pserver_func):
        p = Process(
            target=pserver_func,
            args=(use_cuda, sync_mode, self.ip, self.port, self.trainers,
                  self.trainer_id))
        p.daemon = True
@@ -92,7 +132,24 @@ class TestListenAndServOp(OpTest):
    def test_handle_signal_in_serv_op(self):
        # run pserver on CPU in sync mode
        p1 = self._start_pserver(False, True, run_pserver)
        self._wait_ps_ready(p1.pid)

        # raise SIGTERM to pserver
        os.kill(p1.pid, signal.SIGINT)
        p1.join()

        # run pserver on CPU in async mode
        p2 = self._start_pserver(False, False, run_pserver)
        self._wait_ps_ready(p2.pid)

        # raise SIGTERM to pserver
        os.kill(p2.pid, signal.SIGTERM)
        p2.join()

    def test_list_and_serv_run_empty_optimize_block(self):
        # run pserver on CPU in sync mode
        p1 = self._start_pserver(False, True, run_pserver_with_empty_block)
        self._wait_ps_ready(p1.pid)

        # raise SIGTERM to pserver
@@ -100,7 +157,7 @@ class TestListenAndServOp(OpTest):
        p1.join()

        # run pserver on CPU in async mode
        p2 = self._start_pserver(False, False, run_pserver_with_empty_block)
        self._wait_ps_ready(p2.pid)

        # raise SIGTERM to pserver
......
@@ -35,6 +35,7 @@ import sys
import numpy as np
import collections
import six
import logging

from .ps_dispatcher import RoundRobin, HashName, PSDispatcher
from .. import core, framework
@@ -767,6 +768,15 @@ in a single call.")
            prefetch_var_name_to_block_id.extend(
                lookup_table_var_name_to_block_id)
        if len(optimize_blocks) == 0:
            logging.warn("pserver [" + str(endpoint) +
                         "] has no optimize block!!")
            pre_block_idx = pserver_program.num_blocks - 1
            empty_block = pserver_program._create_block(pre_block_idx)
            optimize_blocks.append(empty_block)

        # In some cases a parameter server has no parameter to optimize (for
        # example, when slice_var_up is False and there are fewer parameters
        # than pservers), so we give it an empty optimize block to run.
        attrs = {
            "optimize_blocks": optimize_blocks,
            "endpoint": endpoint,
@@ -1280,7 +1290,6 @@ to transpile() call.")
        }
        outputs = {"ParamOut": [param_var]}
        # only support sgd now
        logging.warn(
            "distribute lookup table only support sgd optimizer, change it's optimizer to sgd instead of "
            + table_opt_op.type)
......