From 26d4fe1080f01a87153d08b1c2de2173444a9ea6 Mon Sep 17 00:00:00 2001 From: Ashwin Agrawal Date: Thu, 21 Sep 2017 18:10:03 -0700 Subject: [PATCH] Refactor the gpsegwalrep.py Adding timestamp to better trace the command. Adding ClusterConfiguration to improve the reuse, and remove ColdStartMaster class. Signed-off-by: Xin Zhang --- gpAux/gpdemo/gpsegwalrep.py | 130 +++++++++++++++++++----------------- 1 file changed, 69 insertions(+), 61 deletions(-) diff --git a/gpAux/gpdemo/gpsegwalrep.py b/gpAux/gpdemo/gpsegwalrep.py index f140b8883a..e5522871e1 100755 --- a/gpAux/gpdemo/gpsegwalrep.py +++ b/gpAux/gpdemo/gpsegwalrep.py @@ -30,6 +30,7 @@ import os import sys import subprocess import threading +import datetime from gppylib.db import dbconn PRINT_LOCK = threading.Lock() @@ -40,7 +41,7 @@ def runcommands(commands, thread_name, command_finish, exit_on_error=True): for command in commands: try: - output.append('Running command... %s' % command) + output.append('%s: Running command... %s' % (datetime.datetime.now(), command)) with THREAD_LOCK: output = output + subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).split('\n') except subprocess.CalledProcessError, e: @@ -54,7 +55,7 @@ def runcommands(commands, thread_name, command_finish, exit_on_error=True): sys.exit(e.returncode) - output.append(command_finish) + output.append('%s: %s' % (datetime.datetime.now(), command_finish)) with PRINT_LOCK: for line in output: print '%s: %s' % (thread_name, line) @@ -63,8 +64,9 @@ def runcommands(commands, thread_name, command_finish, exit_on_error=True): class InitMirrors(): ''' Initialize the WAL replication mirror segment ''' - def __init__(self, segconfigs, hostname): - self.segconfigs = segconfigs + def __init__(self, cluster_config, hostname): + self.clusterconfig = cluster_config + self.segconfigs = cluster_config.get_seg_configs() self.hostname = hostname def initThread(self, segconfig, user): @@ -77,13 +79,15 @@ class InitMirrors(): commands.append("echo 'host replication %s samenet trust' >> %s/pg_hba.conf" % (user, primary_dir)) commands.append("pg_ctl -D %s reload" % primary_dir) + + # 1. create base backup commands.append("pg_basebackup -x -R -c fast -E ./pg_log -E ./db_dumps -E ./gpperfmon/data -E ./gpperfmon/logs -D %s -h %s -p %d" % (mirror_dir, self.hostname, primary_port)) commands.append("mkdir %s/pg_log; mkdir %s/pg_xlog/archive_status" % (mirror_dir, mirror_dir)) + # 2. update catalog catalog_update_query = "select pg_catalog.gp_add_segment_mirror(%d::int2, '%s', '%s', %d, -1, '{pg_system, %s}')" % (mirror_contentid, self.hostname, self.hostname, mirror_port, mirror_dir) commands.append("PGOPTIONS=\"-c gp_session_role=utility\" psql postgres -c \"%s\"" % catalog_update_query) - thread_name = 'Mirror content %d' % mirror_contentid command_finish = 'Initialized mirror at %s' % mirror_dir runcommands(commands, thread_name, command_finish) @@ -102,7 +106,6 @@ class InitMirrors(): commands.append("gpstop -u"); runcommands(commands, "Main Mirror Init", "Notified primaries of mirror addition") - initThreads = [] for segconfig in self.segconfigs: if segconfig[4] == 'p' and segconfig[1] != -1: @@ -116,12 +119,12 @@ class InitMirrors(): class StartInstances(): ''' Start a greenplum segment ''' - def __init__(self, segconfigs, host, segment_type='all', wait=False): - self.segconfigs = segconfigs + def __init__(self, cluster_config, host, segment_type='all', wait=False): + self.clusterconfig = cluster_config + self.segconfigs = cluster_config.get_seg_configs() self.host = host self.segment_type = segment_type self.wait = wait - self.__num_contents = None def startThread(self, segconfig): commands = [] @@ -130,14 +133,14 @@ class StartInstances(): contentid = segconfig[1] segment_port = segconfig[2] segment_dir = segconfig[3] - segment_role = self.getRole(contentid) + segment_role = StartInstances.getRole(contentid) # Need to set the dbid to 0 on segments to prevent use in mmxlog records if contentid != -1: dbid = 0 opts = "-p %d --gp_dbid=%d --silent-mode=true -i -M %s --gp_contentid=%d --gp_num_contents_in_cluster=%d" % \ - (segment_port, dbid, segment_role, contentid, self.num_contents) + (segment_port, dbid, segment_role, contentid, self.clusterconfig.get_num_contents()) # Arguments for the master. -x sets the dbid for the standby master. Hardcoded to 0 for now, but may need to be # refactored when we start to focus on the standby master. @@ -163,19 +166,8 @@ class StartInstances(): command_finish = 'Started %s segment with content %d and port %d at %s' % (segment_label, contentid, segment_port, segment_dir) runcommands(commands, thread_name, command_finish) - @property - def num_contents(self): - if self.__num_contents == None: - self.__num_contents = 0 - for segconfig in self.segconfigs: - # Count primary segments - if segconfig[4] == 'p' and segconfig[1] != -1: - self.__num_contents += 1 - - return self.__num_contents - - - def getRole(self, contentid): + @staticmethod + def getRole(contentid): if contentid == -1: return 'master' else: @@ -192,26 +184,12 @@ class StartInstances(): for thread in startThreads: thread.join() -class ColdStartMaster(StartInstances): - ''' Start the greenplum master segment''' - - def __init__(self, host, port, master_directory, num_contents=3): - self.segconfigs = [[0]*5] - self.segconfigs[0][0] = 1 # dbid - self.segconfigs[0][1] = -1 # content - self.segconfigs[0][2] = port - self.segconfigs[0][3] = master_directory - self.segconfigs[0][4] = 'p' # preferred role - self.num_contents = num_contents - self.host = host - self.segment_type = 'p' - self.wait = True - class StopInstances(): ''' Stop all segments''' - def __init__(self, segconfigs, segment_type='all'): - self.segconfigs = segconfigs + def __init__(self, cluster_config, segment_type='all'): + self.clusterconfig = cluster_config + self.segconfigs = cluster_config.get_seg_configs() self.segment_type = segment_type def stopThread(self, segconfig): @@ -258,8 +236,9 @@ class StopInstances(): class DestroyMirrors(): ''' Destroy the WAL replication mirror segment ''' - def __init__(self, segconfigs): - self.segconfigs = segconfigs + def __init__(self, cluster_config): + self.clusterconfig = cluster_config + self.segconfigs = cluster_config.get_seg_configs() def destroyThread(self, segconfig): commands = [] @@ -298,18 +277,46 @@ class DestroyMirrors(): commands.append("gpstop -u"); runcommands(commands, "Main Mirror Destroy", "Notified primaries of mirror removal") -def getSegInfo(hostname, port, dbname): - query = "SELECT dbid, content, port, fselocation, preferred_role FROM gp_segment_configuration s, pg_filespace_entry f WHERE s.dbid = fsedbid" - dburl = dbconn.DbURL(hostname, port, dbname) +class ClusterConfiguration(): + ''' Cluster configuration ''' + + def __init__(self, hostname, port, dbname): + query = "SELECT dbid, content, port, fselocation, preferred_role FROM gp_segment_configuration s, pg_filespace_entry f WHERE s.dbid = fsedbid" + print '%s: fetching cluster configuration' % (datetime.datetime.now()) + dburl = dbconn.DbURL(hostname, port, dbname) + print '%s: fetched cluster configuration' % (datetime.datetime.now()) + + try: + with dbconn.connect(dburl, utility=True) as conn: + self.seg_configs = dbconn.execSQL(conn, query).fetchall() + except Exception, e: + print e + sys.exit(1) + + self.num_contents = 0; + for seg_config in self.seg_configs: + # Count primary segments + if seg_config[4] == 'p' and seg_config[1] != -1: + self.num_contents += 1 + + def get_num_contents(self): + return self.num_contents; + + def get_seg_configs(self): + return self.seg_configs; + +class ColdMasterClusterConfiguration(ClusterConfiguration): - try: - with dbconn.connect(dburl, utility=True) as conn: - segconfigs = dbconn.execSQL(conn, query).fetchall() - except Exception, e: - print e - sys.exit(1) + # this constructor is only used for ColdStartMaster + def __init__(self, port, master_directory): + self.seg_configs = [[0]*5] + self.seg_configs[0][0] = 1 # dbid + self.seg_configs[0][1] = -1 # content + self.seg_configs[0][2] = port + self.seg_configs[0][3] = master_directory + self.seg_configs[0][4] = 'p' # preferred role - return segconfigs + self.num_contents = 1 # need to be > 1 to avoid assert failure def defargs(): parser = argparse.ArgumentParser(description='Initialize, start, stop, or destroy WAL replication mirror segments') @@ -331,22 +338,23 @@ if __name__ == "__main__": # If we are starting the cluster, we need to start the master before we get the segment info if args.operation == 'clusterstart': - ColdStartMaster(args.host, int(args.port), args.master_directory).run() + cold_master_cluster_config = ColdMasterClusterConfiguration(int(args.port), args.master_directory) + StartInstances(cold_master_cluster_config, args.host, segment_type='p', wait=True).run() # Get information on all segments - segconfigs = getSegInfo(args.host, args.port, args.database) + cluster_config = ClusterConfiguration(args.host, args.port, args.database) if args.operation == 'clusterstart': - StopInstances(segconfigs, 'master').run() + StopInstances(cold_master_cluster_config, 'master').run() # Execute the chosen operation if args.operation == 'init': - InitMirrors(segconfigs, args.host).run() + InitMirrors(cluster_config, args.host).run() elif args.operation == 'clusterstart': - StartInstances(segconfigs, args.host).run() + StartInstances(cluster_config, args.host).run() elif args.operation == 'start': - StartInstances(segconfigs, args.host, 'm').run() + StartInstances(cluster_config, args.host, segment_type='m').run() elif args.operation == 'stop': - StopInstances(segconfigs, 'm').run() + StopInstances(cluster_config, segment_type='m').run() elif args.operation == 'destroy': - DestroyMirrors(segconfigs).run() + DestroyMirrors(cluster_config).run() -- GitLab