提交 8f0a0fa5 编写于 作者: J Jesse Zhang 提交者: David Kimura

Ensure failover is complete before bringing up the mirror (#4963)

To "rebalance" a primary-mirror pair, gprecoverseg -r performs the following steps:

1.  bring down the acting primary
2.  issue a query that triggers the failover
3.  bring up the mirror (gprecoverseg -F)

Currently these 3 steps are happening in close succession. However, there is a chance that between step 2 and step 3, the mirror promotion happens slower than we expect. The implicit assumption here is that the acting mirror has finished transitioning to the primary role before step 3 is performed.

This patch adds a retry in "sort of step 2, definitely before step 3", to ensure a good state before we can bring up the mirror.

Co-authored-by: Jesse Zhang sbjesse@gmail.com
Co-authored-by: David Kimura dkimura@pivotal.io
上级 ac74f825
......@@ -6,6 +6,10 @@ from gppylib.commands.gp import GpSegStopCmd, GpRecoverseg
from gppylib.commands import base
from gppylib import gplog
from gppylib.operations.segment_reconfigurer import SegmentReconfigurer
MIRROR_PROMOTION_TIMEOUT=30
class ReconfigDetectionSQLQueryCommand(base.SQLCommand):
"""A distributed query that will cause the system to detect
......@@ -70,22 +74,9 @@ class GpSegmentRebalanceOperation:
self.logger.info("gprecoverseg will continue with a partial rebalance.")
pool.empty_completed_items()
# issue a distributed query to make sure we pick up the fault
# that we just caused by shutting down segments
conn = None
try:
self.logger.info("Triggering segment reconfiguration")
dburl = dbconn.DbURL()
conn = dbconn.connect(dburl)
cmd = ReconfigDetectionSQLQueryCommand(conn)
pool.addCommand(cmd)
pool.wait_and_printdots(1, False)
except Exception:
# This exception is expected
pass
finally:
if conn:
conn.close()
segment_reconfigurer = SegmentReconfigurer(logger=self.logger,
worker_pool=pool, timeout=MIRROR_PROMOTION_TIMEOUT)
segment_reconfigurer.reconfigure()
# Final step is to issue a recoverseg operation to resync segments
self.logger.info("Starting segment synchronization")
......
import time
from gppylib.commands import base
from gppylib.db import dbconn
import pygresql.pg
FTS_PROBE_QUERY = 'SELECT gp_request_fts_probe_scan()'
class SegmentReconfigurer:
def __init__(self, logger, worker_pool, timeout):
self.logger = logger
self.pool = worker_pool
self.timeout = timeout
def _trigger_fts_probe(self, dburl):
conn = pygresql.pg.connect(dburl.pgdb,
dburl.pghost,
dburl.pgport,
None,
dburl.pguser,
dburl.pgpass,
)
conn.query(FTS_PROBE_QUERY)
conn.close()
def reconfigure(self):
# issue a distributed query to make sure we pick up the fault
# that we just caused by shutting down segments
self.logger.info("Triggering segment reconfiguration")
dburl = dbconn.DbURL()
self._trigger_fts_probe(dburl)
start_time = time.time()
while True:
try:
# this issues a BEGIN
# so the primaries'd better be up
conn = dbconn.connect(dburl)
except Exception as e:
now = time.time()
if now < start_time + self.timeout:
continue
else:
raise RuntimeError("Mirror promotion did not complete in {0} seconds.".format(self.timeout))
else:
conn.close()
break
import datetime
import time
from gppylib.operations.segment_reconfigurer import SegmentReconfigurer, FTS_PROBE_QUERY
from gppylib.test.unit.gp_unittest import GpTestCase
from pygresql import pgdb
import mock
from mock import Mock, patch, call, MagicMock
import contextlib
import pygresql.pg
class MyDbUrl:
pass
class SegmentReconfiguerTestCase(GpTestCase):
db = 'database'
host = 'mdw'
port = 15432
user = 'postgres'
passwd = 'passwd'
timeout = 30
def setUp(self):
self.conn = Mock(name='conn')
self.logger = Mock()
self.worker_pool = Mock()
self.db_url = db_url = MyDbUrl()
db_url.pgdb = self.db
db_url.pghost = self.host
db_url.pgport = self.port
db_url.pguser = self.user
db_url.pgpass = self.passwd
self.connect = MagicMock()
cm = contextlib.nested(
patch('gppylib.db.dbconn.connect', new=self.connect),
patch('gppylib.db.dbconn.DbURL', return_value=self.db_url),
patch('pygresql.pg.connect'),
)
cm.__enter__()
self.cm = cm
def tearDown(self):
self.cm.__exit__(None, None, None)
def test_it_triggers_fts_probe(self):
reconfigurer = SegmentReconfigurer(logger=self.logger,
worker_pool=self.worker_pool, timeout=self.timeout)
reconfigurer.reconfigure()
pygresql.pg.connect.assert_has_calls([
call(self.db, self.host, self.port, None, self.user, self.passwd),
call().query(FTS_PROBE_QUERY),
call().close(),
]
)
def test_it_retries_the_connection(self):
self.connect.configure_mock(side_effect=[pgdb.DatabaseError, pgdb.DatabaseError, self.conn])
reconfigurer = SegmentReconfigurer(logger=self.logger,
worker_pool=self.worker_pool, timeout=self.timeout)
reconfigurer.reconfigure()
self.connect.assert_has_calls([call(self.db_url), call(self.db_url), call(self.db_url), ])
self.conn.close.assert_any_call()
@patch('time.time')
def test_it_gives_up_after_30_seconds(self, now_mock):
start_datetime = datetime.datetime(2018, 5, 9, 16, 0, 0)
start_time = time.mktime(start_datetime.timetuple())
now_mock.configure_mock(return_value=start_time)
def fail_for_half_a_minute():
new_time = start_time
for i in xrange(2):
# leap forward 15 seconds
new_time += self.timeout / 2
now_mock.configure_mock(return_value=new_time)
yield pgdb.DatabaseError
self.connect.configure_mock(side_effect=fail_for_half_a_minute())
reconfigurer = SegmentReconfigurer(logger=self.logger,
worker_pool=self.worker_pool, timeout=self.timeout)
with self.assertRaises(RuntimeError) as context:
reconfigurer.reconfigure()
self.assertEqual("Mirror promotion did not complete in {0} seconds.".format(self.timeout), context.exception.message)
self.connect.assert_has_calls([call(self.db_url), call(self.db_url), ])
self.conn.close.assert_has_calls([])
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册