未验证 提交 5e61b04c 编写于 作者: F Frank Lin 提交者: GitHub

Fix UT: Try fixing test_the_one_ps and test_communicator_geo (#55263)

* extend timeout

* Rewrite test_communicator_geo

* black
上级 2dcb0ebf
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from test_communicator_geo import TestCommunicatorGeoEnd2End
import paddle
# Switch Paddle to static mode before anything else in this script runs.
paddle.enable_static()

# Path that the 'done' marker is written to further below; it is taken from
# the PIPE_FILE environment variable and is None when that variable is unset.
pipe_name = os.environ.get("PIPE_FILE")
class RunServer(TestCommunicatorGeoEnd2End):
    """Subclass of the geo communicator test case used to drive run_ut()."""

    def runTest(self):
        # Intentionally a no-op: this script uses the instance only for its
        # inherited run_ut(), not to execute a unittest test method.
        return None
# Select the parameter-server role before the test case object is created.
os.environ["TRAINING_ROLE"] = "PSERVER"
half_run_server = RunServer()

# Handshake: write 'done' to the path given in PIPE_FILE so whoever reads that
# path knows this process has reached this point.  When PIPE_FILE is unset or
# empty (e.g. the script was started by hand), skip the handshake instead of
# failing inside open() with a TypeError / FileNotFoundError before the
# server ever starts.
if pipe_name:
    with open(pipe_name, 'w') as pipe:
        pipe.write('done')

# Run the inherited end-to-end routine with the PSERVER role set above.
half_run_server.run_ut()
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
import os import os
import subprocess import subprocess
import sys import sys
import time import tempfile
import unittest import unittest
import numpy import numpy
...@@ -124,51 +124,22 @@ class TestCommunicatorGeoEnd2End(unittest.TestCase): ...@@ -124,51 +124,22 @@ class TestCommunicatorGeoEnd2End(unittest.TestCase):
self.run_pserver(role, strategy) self.run_pserver(role, strategy)
def test_communicator(self): def test_communicator(self):
run_server_cmd = """ temp_dir = tempfile.TemporaryDirectory()
pipe_name = os.path.join(temp_dir.name, 'mypipe')
import sys try:
import os os.mkfifo(pipe_name)
except OSError as oe:
import time print(f"Failed to create pipe: {oe}")
import threading
import subprocess
import unittest
import numpy
import paddle
import paddle.fluid as fluid
from paddle.distributed.communicator import Communicator
import paddle.incubate.distributed.fleet.role_maker as role_maker
from paddle.incubate.distributed.fleet.parameter_server.mode import DistributedMode
import paddle.distributed.fleet as fleet
from test_communicator_geo import TestCommunicatorGeoEnd2End
paddle.enable_static()
class RunServer(TestCommunicatorGeoEnd2End):
def runTest(self):
pass
os.environ["TRAINING_ROLE"] = "PSERVER"
half_run_server = RunServer()
half_run_server.run_ut()
"""
server_file = "run_server_for_communicator_geo.py"
with open(server_file, "w") as wb:
wb.write(run_server_cmd)
port = find_free_ports(1).pop() port = find_free_ports(1).pop()
os.environ["TRAINING_ROLE"] = "PSERVER" os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["PADDLE_PORT"] = str(port) os.environ["PADDLE_PORT"] = str(port)
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = f"127.0.0.1:{port}" os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = f"127.0.0.1:{port}"
os.environ["PIPE_FILE"] = pipe_name
_python = sys.executable _python = sys.executable
server_file = "run_server_for_communicator_geo.py"
ps_cmd = f"{_python} {server_file}" ps_cmd = f"{_python} {server_file}"
ps_proc = subprocess.Popen( ps_proc = subprocess.Popen(
...@@ -177,7 +148,8 @@ half_run_server.run_ut() ...@@ -177,7 +148,8 @@ half_run_server.run_ut()
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
) )
time.sleep(5) with open(pipe_name, 'r') as pipe:
start_command = pipe.read()
os.environ["TRAINING_ROLE"] = "TRAINER" os.environ["TRAINING_ROLE"] = "TRAINER"
...@@ -186,9 +158,6 @@ half_run_server.run_ut() ...@@ -186,9 +158,6 @@ half_run_server.run_ut()
ps_proc.wait() ps_proc.wait()
outs, errs = ps_proc.communicate() outs, errs = ps_proc.communicate()
if os.path.exists(server_file):
os.remove(server_file)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -11,7 +11,7 @@ endif() ...@@ -11,7 +11,7 @@ endif()
foreach(TEST_OP ${TEST_OPS}) foreach(TEST_OP ${TEST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP}) py_test_modules(${TEST_OP} MODULES ${TEST_OP})
list(APPEND TEST_OPS ${TEST_OP}) list(APPEND TEST_OPS ${TEST_OP})
set_tests_properties(${TEST_OP} PROPERTIES TIMEOUT 50) set_tests_properties(${TEST_OP} PROPERTIES TIMEOUT 120)
endforeach() endforeach()
if(WITH_HETERPS AND NOT WITH_PSLIB) if(WITH_HETERPS AND NOT WITH_PSLIB)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册