提交 cfec7e64 编写于 作者: J Jacob Champion

gppylib: remove race in WorkerPool progress test

Despite my best efforts to avoid races on overloaded test containers,
test_join_and_indicate_progress_prints_dots_until_pool_is_done() has
been failing fairly often. Replace the simple-but-flaky time-based test
with an implementation that serializes the components of the race.

(cherry picked from commit 9f5384bc)
上级 b2c5de61
import unittest import os
import StringIO import StringIO
import threading import threading
import time import time
import unittest
import mock import mock
...@@ -245,26 +246,52 @@ class WorkerPoolTest(unittest.TestCase): ...@@ -245,26 +246,52 @@ class WorkerPoolTest(unittest.TestCase):
self.assertEqual(stdout.getvalue(), '') self.assertEqual(stdout.getvalue(), '')
def test_join_and_indicate_progress_prints_dots_until_pool_is_done(self): def test_join_and_indicate_progress_prints_dots_until_pool_is_done(self):
# To avoid false negatives from the race conditions here, let's set up a
# situation where we'll print ten dots on average, and verify that there
# were at least five dots printed.
duration = 0.01
cmd = mock.Mock(spec=Command) cmd = mock.Mock(spec=Command)
def wait_for_duration():
time.sleep(duration)
cmd.run.side_effect = wait_for_duration
self.pool.addCommand(cmd)
stdout = StringIO.StringIO() # cmd.run() will block until this Event is set.
join_and_indicate_progress(self.pool, stdout, interval=(duration / 10)) event = threading.Event()
def wait_for_event():
event.wait()
cmd.run.side_effect = wait_for_event
# Open up a pipe and wrap each end in a file-like object.
read_end, write_end = os.pipe()
read_end = os.fdopen(read_end, 'r')
write_end = os.fdopen(write_end, 'w')
# Create a thread to perform join_and_indicate_progress().
def tmain():
join_and_indicate_progress(self.pool, write_end, interval=0.001)
write_end.close()
join_thread = threading.Thread(target=tmain)
try:
# Add the command, then join the WorkerPool.
self.pool.addCommand(cmd)
join_thread.start()
results = stdout.getvalue() # join_and_indicate_progress() is now writing to our pipe. Wait for
self.assertIn('.....', results) # a few dots...
self.assertTrue(results.endswith('\n')) for _ in range(3):
byte = read_end.read(1)
self.assertEqual(byte, '.')
# ...then stop the command.
event.set()
# Make sure the rest of the output consists of dots ending in a
# newline. (tmain() closes the write end of the pipe so that this
# read() will complete.)
remaining = read_end.read()
self.assertRegexpMatches(remaining, r'^[.]*\n$')
finally:
# Make sure that we unblock and join all threads, even on a test
# failure.
event.set()
join_thread.join()
def test_join_and_indicate_progress_flushes_every_dot(self): def test_join_and_indicate_progress_flushes_every_dot(self):
# Set up a test scenario like the progress test above.
duration = 0.005 duration = 0.005
cmd = mock.Mock(spec=Command) cmd = mock.Mock(spec=Command)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册