提交 2c1b9236 编写于 作者: A Ashwin Agrawal 提交者: Asim RP

Enhance gpsegwalrep.py to support full recovery.

Mostly re-used init-mirrors for this functionality.

Author: Ashwin Agrawal <aagrawal@pivotal.io>
Author: Taylor Vesely <tvesely@pivotal.io>
上级 e799809e
......@@ -70,29 +70,40 @@ def displaySegmentConfiguration():
class InitMirrors():
''' Initialize the WAL replication mirror segment '''
def __init__(self, cluster_config, hostname):
def __init__(self, cluster_config, hostname, init=True):
self.clusterconfig = cluster_config
self.segconfigs = cluster_config.get_seg_configs()
self.hostname = hostname
self.init = init
def initThread(self, segconfig, user):
commands = []
primary_port = segconfig.port
primary_dir = segconfig.fselocation
if self.init:
primary_port = segconfig.port
primary_dir = segconfig.fselocation
mirror_dir = cluster_config.get_pair_dir(segconfig)
mirror_port = cluster_config.get_pair_port(segconfig)
else:
primary_port = cluster_config.get_pair_port(segconfig)
primary_dir = cluster_config.get_pair_dir(segconfig)
mirror_dir = segconfig.fselocation
mirror_port = segconfig.port
mirror_contentid = segconfig.content
mirror_dir = segconfig.fselocation.replace('dbfast', 'dbfast_mirror')
mirror_port = primary_port + 10000
commands.append("echo 'host replication %s samenet trust' >> %s/pg_hba.conf" % (user, primary_dir))
commands.append("pg_ctl -D %s reload" % primary_dir)
if self.init:
commands.append("echo 'host replication %s samenet trust' >> %s/pg_hba.conf" % (user, primary_dir))
commands.append("pg_ctl -D %s reload" % primary_dir)
# 1. create base backup
commands.append("rm -rf %s" % mirror_dir);
commands.append("pg_basebackup -x -R -c fast -E ./pg_log -E ./db_dumps -E ./gpperfmon/data -E ./gpperfmon/logs -D %s -h %s -p %d" % (mirror_dir, self.hostname, primary_port))
commands.append("mkdir %s/pg_log; mkdir %s/pg_xlog/archive_status" % (mirror_dir, mirror_dir))
# 2. update catalog
catalog_update_query = "select pg_catalog.gp_add_segment_mirror(%d::int2, '%s', '%s', %d, -1, '{pg_system, %s}')" % (mirror_contentid, self.hostname, self.hostname, mirror_port, mirror_dir)
commands.append("PGOPTIONS=\"-c gp_session_role=utility\" psql postgres -c \"%s\"" % catalog_update_query)
if self.init:
# 2. update catalog
catalog_update_query = "select pg_catalog.gp_add_segment_mirror(%d::int2, '%s', '%s', %d, -1, '{pg_system, %s}')" % (mirror_contentid, self.hostname, self.hostname, mirror_port, mirror_dir)
commands.append("PGOPTIONS=\"-c gp_session_role=utility\" psql postgres -c \"%s\"" % catalog_update_query)
thread_name = 'Mirror content %d' % mirror_contentid
command_finish = 'Initialized mirror at %s' % mirror_dir
......@@ -104,8 +115,11 @@ class InitMirrors():
initThreads = []
for segconfig in self.segconfigs:
assert(segconfig.preferred_role == GpSegmentConfiguration.ROLE_PRIMARY)
assert(segconfig.content != GpSegmentConfiguration.MASTER_CONTENT_ID)
if self.init:
assert(segconfig.role == GpSegmentConfiguration.ROLE_PRIMARY)
else:
assert(segconfig.role == GpSegmentConfiguration.ROLE_MIRROR)
thread = threading.Thread(target=self.initThread, args=(segconfig, user))
thread.start()
initThreads.append(thread)
......@@ -280,11 +294,12 @@ class GpSegmentConfiguration():
IN_SYNC = 's'
MASTER_CONTENT_ID = -1
def __init__(self, dbid, content, port, fselocation, preferred_role, status, mode):
def __init__(self, dbid, content, port, fselocation, role, preferred_role, status, mode):
self.dbid = dbid
self.content = content
self.port = port
self.fselocation = fselocation
self.role = role
self.preferred_role = preferred_role
self.status = status
self.mode = mode
......@@ -299,6 +314,7 @@ class ClusterConfiguration():
self.role = role
self.status = status
self.include_master = include_master
self._all_seg_configs = None
self.refresh()
def get_num_contents(self):
......@@ -307,6 +323,26 @@ class ClusterConfiguration():
def get_seg_configs(self):
return self.seg_configs;
def get_pair_port(self, input_config):
for seg_config in self._all_seg_configs:
if (seg_config.content == input_config.content
and seg_config.role != input_config.role):
return seg_config.port
assert(input_config.role == GpSegmentConfiguration.ROLE_PRIMARY)
''' if not found then assume its mirror and hence return port at which mirror must be created '''
return input_config.port + 10000
def get_pair_dir(self, input_config):
for seg_config in self._all_seg_configs:
if (seg_config.content == input_config.content
and seg_config.role != input_config.role):
return seg_config.fselocation
assert(input_config.role == GpSegmentConfiguration.ROLE_PRIMARY)
''' if not found then assume its mirror and hence return location at which mirror must be created '''
return input_config.fselocation.replace('dbfast', 'dbfast_mirror')
def get_gp_segment_ids(self):
ids = []
for seg_config in self.seg_configs:
......@@ -314,19 +350,10 @@ class ClusterConfiguration():
return ','.join(ids)
def refresh(self):
query = ("SELECT dbid, content, port, fselocation, preferred_role, status, mode "
query = ("SELECT dbid, content, port, fselocation, role, preferred_role, status, mode "
"FROM gp_segment_configuration s, pg_filespace_entry f "
"WHERE s.dbid = fsedbid")
if self.status != "all":
query += " and s.status = '" + self.status + "'"
if self.role != "all":
query += " and s.role = '" + self.role + "'"
if not self.include_master:
query += " and s.content != " + str(GpSegmentConfiguration.MASTER_CONTENT_ID)
print '%s: fetching cluster configuration' % (datetime.datetime.now())
dburl = dbconn.DbURL(self.hostname, self.port, self.dbname)
print '%s: fetched cluster configuration' % (datetime.datetime.now())
......@@ -338,11 +365,27 @@ class ClusterConfiguration():
print e
sys.exit(1)
self._all_seg_configs = []
self.seg_configs = []
self.num_contents = 0
for result in resultsets:
seg_config = GpSegmentConfiguration(result[0], result[1], result[2], result[3], result[4], result[5], result[6])
self.seg_configs.append(seg_config)
seg_config = GpSegmentConfiguration(result[0], result[1], result[2], result[3], result[4], result[5], result[6], result[7])
self._all_seg_configs.append(seg_config)
append = True
if (self.status != "all"
and self.status != seg_config.status):
append = False
if (self.role != "all"
and self.role != seg_config.role):
append = False
if (not self.include_master
and seg_config.content == GpSegmentConfiguration.MASTER_CONTENT_ID):
append = False
if append:
self.seg_configs.append(seg_config)
# Count primary segments
if (seg_config.preferred_role == GpSegmentConfiguration.ROLE_PRIMARY
......@@ -368,6 +411,7 @@ class ColdMasterClusterConfiguration(ClusterConfiguration):
master_seg_config = GpSegmentConfiguration(1, GpSegmentConfiguration.MASTER_CONTENT_ID,
port, master_directory,
GpSegmentConfiguration.ROLE_PRIMARY,
GpSegmentConfiguration.ROLE_PRIMARY,
GpSegmentConfiguration.STATUS_DOWN,
GpSegmentConfiguration.NOT_IN_SYNC)
self.seg_configs.append(master_seg_config)
......@@ -384,7 +428,7 @@ def defargs():
help='Master port to get segment config information from')
parser.add_argument('--database', type=str, required=False, default='postgres',
help='Database name to get segment config information from')
parser.add_argument('operation', type=str, choices=['clusterstart', 'clusterstop', 'init', 'start', 'stop', 'destroy', 'recover'])
parser.add_argument('operation', type=str, choices=['clusterstart', 'clusterstop', 'init', 'start', 'stop', 'destroy', 'recover', 'recoverfull'])
return parser.parse_args()
......@@ -501,6 +545,14 @@ if __name__ == "__main__":
StopInstances(cluster_config).run()
sys.exit(1)
ForceFTSProbeScan(cluster_config, GpSegmentConfiguration.STATUS_UP, GpSegmentConfiguration.IN_SYNC)
elif args.operation == 'recoverfull':
cluster_config = ClusterConfiguration(args.host, args.port, args.database,
role=GpSegmentConfiguration.ROLE_MIRROR,
status=GpSegmentConfiguration.STATUS_DOWN)
if len(cluster_config.seg_configs) > 0:
InitMirrors(cluster_config, args.host, False).run()
StartInstances(cluster_config, args.host).run()
ForceFTSProbeScan(cluster_config, GpSegmentConfiguration.STATUS_UP, GpSegmentConfiguration.IN_SYNC)
elif args.operation == 'stop':
cluster_config = ClusterConfiguration(args.host, args.port, args.database,
role=GpSegmentConfiguration.ROLE_MIRROR,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册