From 2c1b9236e06b06d3bd2db6c6e099450aaf2cab0d Mon Sep 17 00:00:00 2001 From: Ashwin Agrawal Date: Fri, 15 Dec 2017 17:36:26 -0800 Subject: [PATCH] Enhance gpsegwalrep.py to support full recovery. Mostly re-used init-mirrors for this functionality. Author: Ashwin Agrawal Author: Taylor Vesely --- gpAux/gpdemo/gpsegwalrep.py | 102 +++++++++++++++++++++++++++--------- 1 file changed, 77 insertions(+), 25 deletions(-) diff --git a/gpAux/gpdemo/gpsegwalrep.py b/gpAux/gpdemo/gpsegwalrep.py index ef2ea2e409..7a58b2c159 100755 --- a/gpAux/gpdemo/gpsegwalrep.py +++ b/gpAux/gpdemo/gpsegwalrep.py @@ -70,29 +70,40 @@ def displaySegmentConfiguration(): class InitMirrors(): ''' Initialize the WAL replication mirror segment ''' - def __init__(self, cluster_config, hostname): + def __init__(self, cluster_config, hostname, init=True): self.clusterconfig = cluster_config self.segconfigs = cluster_config.get_seg_configs() self.hostname = hostname + self.init = init def initThread(self, segconfig, user): commands = [] - primary_port = segconfig.port - primary_dir = segconfig.fselocation + if self.init: + primary_port = segconfig.port + primary_dir = segconfig.fselocation + mirror_dir = cluster_config.get_pair_dir(segconfig) + mirror_port = cluster_config.get_pair_port(segconfig) + else: + primary_port = cluster_config.get_pair_port(segconfig) + primary_dir = cluster_config.get_pair_dir(segconfig) + mirror_dir = segconfig.fselocation + mirror_port = segconfig.port + mirror_contentid = segconfig.content - mirror_dir = segconfig.fselocation.replace('dbfast', 'dbfast_mirror') - mirror_port = primary_port + 10000 - commands.append("echo 'host replication %s samenet trust' >> %s/pg_hba.conf" % (user, primary_dir)) - commands.append("pg_ctl -D %s reload" % primary_dir) + if self.init: + commands.append("echo 'host replication %s samenet trust' >> %s/pg_hba.conf" % (user, primary_dir)) + commands.append("pg_ctl -D %s reload" % primary_dir) # 1. create base backup + commands.append("rm -rf %s" % mirror_dir); commands.append("pg_basebackup -x -R -c fast -E ./pg_log -E ./db_dumps -E ./gpperfmon/data -E ./gpperfmon/logs -D %s -h %s -p %d" % (mirror_dir, self.hostname, primary_port)) commands.append("mkdir %s/pg_log; mkdir %s/pg_xlog/archive_status" % (mirror_dir, mirror_dir)) - # 2. update catalog - catalog_update_query = "select pg_catalog.gp_add_segment_mirror(%d::int2, '%s', '%s', %d, -1, '{pg_system, %s}')" % (mirror_contentid, self.hostname, self.hostname, mirror_port, mirror_dir) - commands.append("PGOPTIONS=\"-c gp_session_role=utility\" psql postgres -c \"%s\"" % catalog_update_query) + if self.init: + # 2. update catalog + catalog_update_query = "select pg_catalog.gp_add_segment_mirror(%d::int2, '%s', '%s', %d, -1, '{pg_system, %s}')" % (mirror_contentid, self.hostname, self.hostname, mirror_port, mirror_dir) + commands.append("PGOPTIONS=\"-c gp_session_role=utility\" psql postgres -c \"%s\"" % catalog_update_query) thread_name = 'Mirror content %d' % mirror_contentid command_finish = 'Initialized mirror at %s' % mirror_dir @@ -104,8 +115,11 @@ class InitMirrors(): initThreads = [] for segconfig in self.segconfigs: - assert(segconfig.preferred_role == GpSegmentConfiguration.ROLE_PRIMARY) assert(segconfig.content != GpSegmentConfiguration.MASTER_CONTENT_ID) + if self.init: + assert(segconfig.role == GpSegmentConfiguration.ROLE_PRIMARY) + else: + assert(segconfig.role == GpSegmentConfiguration.ROLE_MIRROR) thread = threading.Thread(target=self.initThread, args=(segconfig, user)) thread.start() initThreads.append(thread) @@ -280,11 +294,12 @@ class GpSegmentConfiguration(): IN_SYNC = 's' MASTER_CONTENT_ID = -1 - def __init__(self, dbid, content, port, fselocation, preferred_role, status, mode): + def __init__(self, dbid, content, port, fselocation, role, preferred_role, status, mode): self.dbid = dbid self.content = content self.port = port self.fselocation = fselocation + self.role = role self.preferred_role = preferred_role self.status = status self.mode = mode @@ -299,6 +314,7 @@ class ClusterConfiguration(): self.role = role self.status = status self.include_master = include_master + self._all_seg_configs = None self.refresh() def get_num_contents(self): @@ -307,6 +323,26 @@ class ClusterConfiguration(): def get_seg_configs(self): return self.seg_configs; + def get_pair_port(self, input_config): + for seg_config in self._all_seg_configs: + if (seg_config.content == input_config.content + and seg_config.role != input_config.role): + return seg_config.port + + assert(input_config.role == GpSegmentConfiguration.ROLE_PRIMARY) + ''' if not found then assume its mirror and hence return port at which mirror must be created ''' + return input_config.port + 10000 + + def get_pair_dir(self, input_config): + for seg_config in self._all_seg_configs: + if (seg_config.content == input_config.content + and seg_config.role != input_config.role): + return seg_config.fselocation + + assert(input_config.role == GpSegmentConfiguration.ROLE_PRIMARY) + ''' if not found then assume its mirror and hence return location at which mirror must be created ''' + return input_config.fselocation.replace('dbfast', 'dbfast_mirror') + def get_gp_segment_ids(self): ids = [] for seg_config in self.seg_configs: @@ -314,19 +350,10 @@ class ClusterConfiguration(): return ','.join(ids) def refresh(self): - query = ("SELECT dbid, content, port, fselocation, preferred_role, status, mode " + query = ("SELECT dbid, content, port, fselocation, role, preferred_role, status, mode " "FROM gp_segment_configuration s, pg_filespace_entry f " "WHERE s.dbid = fsedbid") - if self.status != "all": - query += " and s.status = '" + self.status + "'" - - if self.role != "all": - query += " and s.role = '" + self.role + "'" - - if not self.include_master: - query += " and s.content != " + str(GpSegmentConfiguration.MASTER_CONTENT_ID) - print '%s: fetching cluster configuration' % (datetime.datetime.now()) dburl = dbconn.DbURL(self.hostname, self.port, self.dbname) print '%s: fetched cluster configuration' % (datetime.datetime.now()) @@ -338,11 +365,27 @@ class ClusterConfiguration(): print e sys.exit(1) + self._all_seg_configs = [] self.seg_configs = [] self.num_contents = 0 for result in resultsets: - seg_config = GpSegmentConfiguration(result[0], result[1], result[2], result[3], result[4], result[5], result[6]) - self.seg_configs.append(seg_config) + seg_config = GpSegmentConfiguration(result[0], result[1], result[2], result[3], result[4], result[5], result[6], result[7]) + self._all_seg_configs.append(seg_config) + + append = True + if (self.status != "all" + and self.status != seg_config.status): + append = False + if (self.role != "all" + and self.role != seg_config.role): + append = False + + if (not self.include_master + and seg_config.content == GpSegmentConfiguration.MASTER_CONTENT_ID): + append = False + + if append: + self.seg_configs.append(seg_config) # Count primary segments if (seg_config.preferred_role == GpSegmentConfiguration.ROLE_PRIMARY @@ -368,6 +411,7 @@ class ColdMasterClusterConfiguration(ClusterConfiguration): master_seg_config = GpSegmentConfiguration(1, GpSegmentConfiguration.MASTER_CONTENT_ID, port, master_directory, GpSegmentConfiguration.ROLE_PRIMARY, + GpSegmentConfiguration.ROLE_PRIMARY, GpSegmentConfiguration.STATUS_DOWN, GpSegmentConfiguration.NOT_IN_SYNC) self.seg_configs.append(master_seg_config) @@ -384,7 +428,7 @@ def defargs(): help='Master port to get segment config information from') parser.add_argument('--database', type=str, required=False, default='postgres', help='Database name to get segment config information from') - parser.add_argument('operation', type=str, choices=['clusterstart', 'clusterstop', 'init', 'start', 'stop', 'destroy', 'recover']) + parser.add_argument('operation', type=str, choices=['clusterstart', 'clusterstop', 'init', 'start', 'stop', 'destroy', 'recover', 'recoverfull']) return parser.parse_args() @@ -501,6 +545,14 @@ if __name__ == "__main__": StopInstances(cluster_config).run() sys.exit(1) ForceFTSProbeScan(cluster_config, GpSegmentConfiguration.STATUS_UP, GpSegmentConfiguration.IN_SYNC) + elif args.operation == 'recoverfull': + cluster_config = ClusterConfiguration(args.host, args.port, args.database, + role=GpSegmentConfiguration.ROLE_MIRROR, + status=GpSegmentConfiguration.STATUS_DOWN) + if len(cluster_config.seg_configs) > 0: + InitMirrors(cluster_config, args.host, False).run() + StartInstances(cluster_config, args.host).run() + ForceFTSProbeScan(cluster_config, GpSegmentConfiguration.STATUS_UP, GpSegmentConfiguration.IN_SYNC) elif args.operation == 'stop': cluster_config = ClusterConfiguration(args.host, args.port, args.database, role=GpSegmentConfiguration.ROLE_MIRROR, -- GitLab