未验证 提交 3ab3a7f3 编写于 作者: W Wu Yi 提交者: GitHub

Trainer auto wait pserver ports (#13341)

* trainer auto wait pserver port ready

* add file

* fix docstring

* add option to not wait

* update api spec

* clean

* fix test hang
上级 76222342
......@@ -59,7 +59,7 @@ paddle.fluid.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], vara
paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
paddle.fluid.InferenceTranspiler.__init__
paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,))
......@@ -346,7 +346,7 @@ paddle.fluid.transpiler.DistributeTranspiler.__init__ ArgSpec(args=['self', 'con
paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None)
paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None))
paddle.fluid.transpiler.InferenceTranspiler.__init__
paddle.fluid.transpiler.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,))
......
......@@ -62,7 +62,7 @@ class TranspilerTest(unittest.TestCase):
t = self._transpiler_instance(config)
trainer_main = t.get_trainer_program()
trainer_main = t.get_trainer_program(wait_port=False)
trainer_startup = fluid.default_startup_program()
assert (src.num_blocks == 1)
......
......@@ -16,3 +16,4 @@ from __future__ import print_function
from .program_utils import *
from .ufind import *
from .checkport import *
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import time
import socket
from contextlib import closing
def wait_server_ready(endpoints):
"""
Wait until parameter servers are ready, use connext_ex to detect
port readiness.
Args:
endpoints (list): endpoints string list, like:
["127.0.0.1:8080", "127.0.0.1:8081"]
Examples:
.. code-block:: python
wait_server_ready(["127.0.0.1:8080", "127.0.0.1:8081"])
"""
while True:
all_ok = True
for ep in endpoints:
ip_port = ep.split(":")
with closing(socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex((ip_port[0], int(ip_port[1])))
if result != 0:
all_ok = False
if not all_ok:
sys.stderr.write("pserver not ready, wait 3 sec to retry...\n")
sys.stderr.flush()
time.sleep(3)
else:
break
......@@ -381,7 +381,7 @@ class DistributeTranspiler(object):
pserver_endpoints)
self._split_table_grad_and_add_send_vars(program, pserver_endpoints)
def get_trainer_program(self):
def get_trainer_program(self, wait_port=True):
"""
Get transpiled trainer side program.
......@@ -393,6 +393,9 @@ class DistributeTranspiler(object):
delete_ops(self.origin_program.global_block(), self.optimize_ops)
self.origin_program.__str__()
if wait_port:
wait_server_ready(self.pserver_endpoints)
return self.origin_program
def _get_trainer_startup_program(self, recv_vars, eplist):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册