# private_helper_function.py
#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import time
import socket
from contextlib import closing
from six import string_types

# Empty on purpose: `from <this module> import *` exports no names.
__all__ = []


def wait_server_ready(endpoints):
    """
    Wait until parameter servers are ready, use connect_ex to detect
    port readiness.

    Every endpoint is probed with a TCP connect (2 second timeout per
    probe). If any probe fails, the endpoints that are not ready are
    reported on stderr and the whole list is probed again after 3 seconds.
    The function returns only once every endpoint accepted a connection in
    the same pass, so it blocks for as long as any endpoint stays
    unreachable.

    Args:
        endpoints (list|tuple): endpoints string list, like:
            ["127.0.0.1:8080", "127.0.0.1:8081"]

    Raises:
        AssertionError: if ``endpoints`` is a single ``str`` instead of a
            list/tuple of strings (checked with ``assert``).

    Examples:
        .. code-block:: python

             wait_server_ready(["127.0.0.1:8080", "127.0.0.1:8081"])
    """
    # A bare string would be iterated character by character below, so
    # reject it up front.
    assert not isinstance(endpoints, str), (
        "endpoints must be a list/tuple of 'ip:port' strings, not a str"
    )
    while True:
        not_ready_endpoints = []
        for ep in endpoints:
            ip_port = ep.split(":")
            # closing() guarantees the probe socket is released on every pass.
            with closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            ) as sock:
                sock.settimeout(2)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                # SO_REUSEPORT is not defined on every platform.
                if hasattr(socket, 'SO_REUSEPORT'):
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

                # connect_ex returns an error code instead of raising for
                # connection failures; 0 means the connect succeeded.
                result = sock.connect_ex((ip_port[0], int(ip_port[1])))
                if result != 0:
                    not_ready_endpoints.append(ep)

        if not not_ready_endpoints:
            break

        sys.stderr.write("server not ready, wait 3 sec to retry...\n")
        sys.stderr.write(
            "not ready endpoints:" + str(not_ready_endpoints) + "\n"
        )
        sys.stderr.flush()
        time.sleep(3)