提交 26d4fe10 编写于 作者: A Ashwin Agrawal 提交者: Xin Zhang

Refactor the gpsegwalrep.py

Adding timestamp to better trace the command.

Adding ClusterConfiguration to improve the reuse, and remove
ColdStartMaster class.
Signed-off-by: NXin Zhang <xzhang@pivotal.io>
上级 efed2fcc
......@@ -30,6 +30,7 @@ import os
import sys
import subprocess
import threading
import datetime
from gppylib.db import dbconn
PRINT_LOCK = threading.Lock()
......@@ -40,7 +41,7 @@ def runcommands(commands, thread_name, command_finish, exit_on_error=True):
for command in commands:
try:
output.append('Running command... %s' % command)
output.append('%s: Running command... %s' % (datetime.datetime.now(), command))
with THREAD_LOCK:
output = output + subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).split('\n')
except subprocess.CalledProcessError, e:
......@@ -54,7 +55,7 @@ def runcommands(commands, thread_name, command_finish, exit_on_error=True):
sys.exit(e.returncode)
output.append(command_finish)
output.append('%s: %s' % (datetime.datetime.now(), command_finish))
with PRINT_LOCK:
for line in output:
print '%s: %s' % (thread_name, line)
......@@ -63,8 +64,9 @@ def runcommands(commands, thread_name, command_finish, exit_on_error=True):
class InitMirrors():
''' Initialize the WAL replication mirror segment '''
def __init__(self, segconfigs, hostname):
self.segconfigs = segconfigs
def __init__(self, cluster_config, hostname):
self.clusterconfig = cluster_config
self.segconfigs = cluster_config.get_seg_configs()
self.hostname = hostname
def initThread(self, segconfig, user):
......@@ -77,13 +79,15 @@ class InitMirrors():
commands.append("echo 'host replication %s samenet trust' >> %s/pg_hba.conf" % (user, primary_dir))
commands.append("pg_ctl -D %s reload" % primary_dir)
# 1. create base backup
commands.append("pg_basebackup -x -R -c fast -E ./pg_log -E ./db_dumps -E ./gpperfmon/data -E ./gpperfmon/logs -D %s -h %s -p %d" % (mirror_dir, self.hostname, primary_port))
commands.append("mkdir %s/pg_log; mkdir %s/pg_xlog/archive_status" % (mirror_dir, mirror_dir))
# 2. update catalog
catalog_update_query = "select pg_catalog.gp_add_segment_mirror(%d::int2, '%s', '%s', %d, -1, '{pg_system, %s}')" % (mirror_contentid, self.hostname, self.hostname, mirror_port, mirror_dir)
commands.append("PGOPTIONS=\"-c gp_session_role=utility\" psql postgres -c \"%s\"" % catalog_update_query)
thread_name = 'Mirror content %d' % mirror_contentid
command_finish = 'Initialized mirror at %s' % mirror_dir
runcommands(commands, thread_name, command_finish)
......@@ -102,7 +106,6 @@ class InitMirrors():
commands.append("gpstop -u");
runcommands(commands, "Main Mirror Init", "Notified primaries of mirror addition")
initThreads = []
for segconfig in self.segconfigs:
if segconfig[4] == 'p' and segconfig[1] != -1:
......@@ -116,12 +119,12 @@ class InitMirrors():
class StartInstances():
''' Start a greenplum segment '''
def __init__(self, segconfigs, host, segment_type='all', wait=False):
self.segconfigs = segconfigs
def __init__(self, cluster_config, host, segment_type='all', wait=False):
self.clusterconfig = cluster_config
self.segconfigs = cluster_config.get_seg_configs()
self.host = host
self.segment_type = segment_type
self.wait = wait
self.__num_contents = None
def startThread(self, segconfig):
commands = []
......@@ -130,14 +133,14 @@ class StartInstances():
contentid = segconfig[1]
segment_port = segconfig[2]
segment_dir = segconfig[3]
segment_role = self.getRole(contentid)
segment_role = StartInstances.getRole(contentid)
# Need to set the dbid to 0 on segments to prevent use in mmxlog records
if contentid != -1:
dbid = 0
opts = "-p %d --gp_dbid=%d --silent-mode=true -i -M %s --gp_contentid=%d --gp_num_contents_in_cluster=%d" % \
(segment_port, dbid, segment_role, contentid, self.num_contents)
(segment_port, dbid, segment_role, contentid, self.clusterconfig.get_num_contents())
# Arguments for the master. -x sets the dbid for the standby master. Hardcoded to 0 for now, but may need to be
# refactored when we start to focus on the standby master.
......@@ -163,19 +166,8 @@ class StartInstances():
command_finish = 'Started %s segment with content %d and port %d at %s' % (segment_label, contentid, segment_port, segment_dir)
runcommands(commands, thread_name, command_finish)
@property
def num_contents(self):
if self.__num_contents == None:
self.__num_contents = 0
for segconfig in self.segconfigs:
# Count primary segments
if segconfig[4] == 'p' and segconfig[1] != -1:
self.__num_contents += 1
return self.__num_contents
def getRole(self, contentid):
@staticmethod
def getRole(contentid):
if contentid == -1:
return 'master'
else:
......@@ -192,26 +184,12 @@ class StartInstances():
for thread in startThreads:
thread.join()
class ColdStartMaster(StartInstances):
''' Start the greenplum master segment'''
def __init__(self, host, port, master_directory, num_contents=3):
self.segconfigs = [[0]*5]
self.segconfigs[0][0] = 1 # dbid
self.segconfigs[0][1] = -1 # content
self.segconfigs[0][2] = port
self.segconfigs[0][3] = master_directory
self.segconfigs[0][4] = 'p' # preferred role
self.num_contents = num_contents
self.host = host
self.segment_type = 'p'
self.wait = True
class StopInstances():
''' Stop all segments'''
def __init__(self, segconfigs, segment_type='all'):
self.segconfigs = segconfigs
def __init__(self, cluster_config, segment_type='all'):
self.clusterconfig = cluster_config
self.segconfigs = cluster_config.get_seg_configs()
self.segment_type = segment_type
def stopThread(self, segconfig):
......@@ -258,8 +236,9 @@ class StopInstances():
class DestroyMirrors():
''' Destroy the WAL replication mirror segment '''
def __init__(self, segconfigs):
self.segconfigs = segconfigs
def __init__(self, cluster_config):
self.clusterconfig = cluster_config
self.segconfigs = cluster_config.get_seg_configs()
def destroyThread(self, segconfig):
commands = []
......@@ -298,18 +277,46 @@ class DestroyMirrors():
commands.append("gpstop -u");
runcommands(commands, "Main Mirror Destroy", "Notified primaries of mirror removal")
def getSegInfo(hostname, port, dbname):
query = "SELECT dbid, content, port, fselocation, preferred_role FROM gp_segment_configuration s, pg_filespace_entry f WHERE s.dbid = fsedbid"
dburl = dbconn.DbURL(hostname, port, dbname)
class ClusterConfiguration():
''' Cluster configuration '''
def __init__(self, hostname, port, dbname):
query = "SELECT dbid, content, port, fselocation, preferred_role FROM gp_segment_configuration s, pg_filespace_entry f WHERE s.dbid = fsedbid"
print '%s: fetching cluster configuration' % (datetime.datetime.now())
dburl = dbconn.DbURL(hostname, port, dbname)
print '%s: fetched cluster configuration' % (datetime.datetime.now())
try:
with dbconn.connect(dburl, utility=True) as conn:
self.seg_configs = dbconn.execSQL(conn, query).fetchall()
except Exception, e:
print e
sys.exit(1)
self.num_contents = 0;
for seg_config in self.seg_configs:
# Count primary segments
if seg_config[4] == 'p' and seg_config[1] != -1:
self.num_contents += 1
def get_num_contents(self):
return self.num_contents;
def get_seg_configs(self):
return self.seg_configs;
class ColdMasterClusterConfiguration(ClusterConfiguration):
try:
with dbconn.connect(dburl, utility=True) as conn:
segconfigs = dbconn.execSQL(conn, query).fetchall()
except Exception, e:
print e
sys.exit(1)
# this constructor is only used for ColdStartMaster
def __init__(self, port, master_directory):
self.seg_configs = [[0]*5]
self.seg_configs[0][0] = 1 # dbid
self.seg_configs[0][1] = -1 # content
self.seg_configs[0][2] = port
self.seg_configs[0][3] = master_directory
self.seg_configs[0][4] = 'p' # preferred role
return segconfigs
self.num_contents = 1 # need to be > 1 to avoid assert failure
def defargs():
parser = argparse.ArgumentParser(description='Initialize, start, stop, or destroy WAL replication mirror segments')
......@@ -331,22 +338,23 @@ if __name__ == "__main__":
# If we are starting the cluster, we need to start the master before we get the segment info
if args.operation == 'clusterstart':
ColdStartMaster(args.host, int(args.port), args.master_directory).run()
cold_master_cluster_config = ColdMasterClusterConfiguration(int(args.port), args.master_directory)
StartInstances(cold_master_cluster_config, args.host, segment_type='p', wait=True).run()
# Get information on all segments
segconfigs = getSegInfo(args.host, args.port, args.database)
cluster_config = ClusterConfiguration(args.host, args.port, args.database)
if args.operation == 'clusterstart':
StopInstances(segconfigs, 'master').run()
StopInstances(cold_master_cluster_config, 'master').run()
# Execute the chosen operation
if args.operation == 'init':
InitMirrors(segconfigs, args.host).run()
InitMirrors(cluster_config, args.host).run()
elif args.operation == 'clusterstart':
StartInstances(segconfigs, args.host).run()
StartInstances(cluster_config, args.host).run()
elif args.operation == 'start':
StartInstances(segconfigs, args.host, 'm').run()
StartInstances(cluster_config, args.host, segment_type='m').run()
elif args.operation == 'stop':
StopInstances(segconfigs, 'm').run()
StopInstances(cluster_config, segment_type='m').run()
elif args.operation == 'destroy':
DestroyMirrors(segconfigs).run()
DestroyMirrors(cluster_config).run()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册