Unverified commit 90357195, authored by Zhenghua Lyu, committed by GitHub

Correctly handle tablespace for GPexpand

GPexpand should take tablespaces into consideration. This means:

  1. During the interview stage, if there are user-created
     tablespaces in the GreenplumDB cluster, we should generate a
     tablespace input file for GPexpand. Users can modify it to use
     a customized tablespace location on each new segment (since we
     support the create tablespace ... with (contentid=loc) syntax).

  2. During the template generation stage, we should pack the
     tablespace files generated by pg_basebackup into the template,
     and save the restore paths in a json config file that is also
     part of the template.

  3. During the stage of configuring a new segment, we should restore
     the tablespace files for primaries. For mirrors, we just invoke
     pg_basebackup, which already handles tablespaces.
An example of the whole process is shown below:

  * suppose there are tablespaces in the GreenplumDB cluster
  * tbs1 (oid1) -> location1; tbs2 (oid2) -> location2
  * and the new segments are dbid1 (primary) and dbid2 (mirror)

The tablespace input file will be:
```
tableSpaceNameOrders=tbs1:tbs2
tableSpaceOidOrders=oid1:oid2
dbid1:location1:location2
```

The template's pg_tblspc dir will be:
```
oid1
oid2
dumps
 |_ oid1
 |    |__ xxx_dbDummyID
 |
 |_ oid2
 |     |__ xxx_dbDummyID
newTableSpaceInfo.json
```
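
Concretely, the dict assembled from the tablespace input file, and serialized
as newTableSpaceInfo.json inside the template, would look as follows for this
example. This is a sketch using the placeholder names above (tbs1, oid1,
location1, dbid1); the authoritative type spec appears in the diff below.

```python
import json

# Hypothetical values, taken from the example above. Only primaries appear
# under "details"; mirrors are built by pg_basebackup, which already
# handles tablespaces.
newTableSpaceInfo = {
    "names": ["tbs1", "tbs2"],               # tablespace names, fixed order
    "oids": ["oid1", "oid2"],                # oids, same order as names
    "details": {
        "dbid1": ["location1", "location2"], # one location per oid, same order
    },
}

with open("newTableSpaceInfo.json", "w") as f:
    json.dump(newTableSpaceInfo, f)
```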
Parent 6ca08c65
@@ -9,7 +9,10 @@ from gppylib.mainUtils import getProgramName
 import copy
 import datetime
 import os
+import random
 import sys
+import json
+import shutil
 import signal
 import traceback
 from time import strftime, sleep
@@ -615,10 +618,11 @@ class SegmentTemplate:
         return consolidatedHosts

-    def build_segment_template(self):
+    def build_segment_template(self, newTableSpaceInfo=None):
         """Builds segment template tar file"""
         self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_STARTED', self.tempDir)
-        self._create_template()
+        # building the segment template should consider tablespace files
+        self._create_template(newTableSpaceInfo)
         self._fixup_template()
         self._tar_template()
         self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_DONE')
@@ -632,7 +636,7 @@ class SegmentTemplate:
         numNewSegments = len(self.gparray.getExpansionSegDbList())
         self.statusLogger.set_status('BUILD_SEGMENTS_DONE', numNewSegments)

-    def _create_template(self):
+    def _create_template(self, newTableSpaceInfo=None):
         """Creates the schema template that is used by new segments"""
         self.logger.info('Creating segment template')
@@ -642,17 +646,89 @@ class SegmentTemplate:
         self.conn.close()

+        # pg_basebackup in GreenplumDB needs a 'target_gp_dbid' parameter:
+        # it is used to create tablespace-related data dirs in the
+        # corresponding tablespace locations. Here we are using pg_basebackup
+        # to create a template for gpexpand, so we can just provide a dummy
+        # dbid to make sure the name differs from any dbid in the system.
+        dummyDBID = self._gen_DummyDBID()
         try:
             masterSeg = self.gparray.master
             cmd = PgBaseBackup(pgdata=self.tempDir,
                                host=masterSeg.getSegmentHostName(),
                                port=str(masterSeg.getSegmentPort()),
                                recovery_mode=False,
-                               target_gp_dbid=masterSeg.getSegmentDbId())
+                               target_gp_dbid=dummyDBID)
             cmd.run(validateAfter=True)
         except Exception, msg:
             raise SegmentTemplateError(msg)

+        # The new segments' tablespace info is loaded from the tablespace
+        # input file, if provided. If newTableSpaceInfo is None, there are
+        # no user-created tablespaces in the system and gpexpand does not
+        # need to consider tablespaces at all.
+        if newTableSpaceInfo:
+            self._handle_tablespace_template(dummyDBID, newTableSpaceInfo)
+
+    def _handle_tablespace_template(self, dummyDBID, newTableSpaceInfo):
+        """
+        If there are user-created tablespaces in the GreenplumDB cluster, we
+        have to pack them into the template. The logic here contains two
+        steps:
+          1. move the tablespace files generated by pg_basebackup into the
+             directory `tempDir/pg_tblspc/dumps`;
+          2. save the restore paths of the tablespace files on the new
+             segments in a json file `tempDir/pg_tblspc/newTableSpaceInfo.json`.
+
+        newTableSpaceInfo is a python dict; its type spec is:
+          newTableSpaceInfo :: {
+            "names"   : [ name::string ],
+            "oids"    : [ oid::string ],
+            "details" : {
+              dbid::string : [ location::string ]
+            }
+          }
+        newTableSpaceInfo["names"] and newTableSpaceInfo["oids"] are in the
+        same order; newTableSpaceInfo["details"][dbid] is a list of locations
+        in the same order as the oids.
+        """
+        master_tblspc_dir = self.gparray.master.getSegmentTableSpaceDirectory()
+        tbcspc_oids = os.listdir(master_tblspc_dir)
+        # tablespace_template_dir is the path where we store the tablespace
+        # files generated by pg_basebackup. Its directory structure is:
+        #   tablespace_template_dir
+        #    |__ oid1
+        #    |     |__ tablespace_file1_db_dumpdbid
+        #    |
+        #    |__ oid2
+        #    |     |__ tablespace_file2_db_dumpdbid
+        #    ...
+        tablespace_template_dir = os.path.join(self.tempDir,
+                                               "pg_tblspc",
+                                               "dumps")
+        os.mkdir(tablespace_template_dir)
+        for tbcspc_oid in tbcspc_oids:
+            symlink_path = os.path.join(master_tblspc_dir, tbcspc_oid)
+            target_path = os.readlink(symlink_path)
+            os.mkdir(os.path.join(tablespace_template_dir, tbcspc_oid))
+            # the destination directory name for copytree does not matter;
+            # the restore step just takes the single entry under dumps/<oid>/
+            shutil.copytree(target_path,
+                            os.path.join(tablespace_template_dir, tbcspc_oid, str(dummyDBID)))
+            shutil.rmtree(os.path.join(os.path.dirname(target_path),
+                                       str(dummyDBID)))
+
+        with open(os.path.join(self.tempDir,
+                               "pg_tblspc",
+                               "newTableSpaceInfo.json"), "w") as f:
+            json.dump(newTableSpaceInfo, f)
+
+    def _gen_DummyDBID(self):
+        """Generate a random int that is surely beyond the possible dbid range."""
+        return random.randint(40960, 81920)
+
     def _select_src_segment(self):
         """Gets a segment to use as a source for pg_hba.conf
            and postgresql.conf files"""
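
For readers skimming the diff above: the heart of the packing step reduces to
a few filesystem calls per tablespace. A distilled standalone sketch follows
(hypothetical function and parameter names; the authoritative logic is
`_handle_tablespace_template` above):

```python
import os
import shutil

def pack_one_tablespace(master_tblspc_dir, dumps_dir, oid, dummy_dbid):
    # resolve the master's pg_tblspc/<oid> symlink to the tablespace files
    target = os.readlink(os.path.join(master_tblspc_dir, oid))
    os.mkdir(os.path.join(dumps_dir, oid))
    # the destination name is irrelevant: the restore step simply takes
    # the single entry found under dumps/<oid>/
    shutil.copytree(target, os.path.join(dumps_dir, oid, str(dummy_dbid)))
    # drop the <dummy_dbid> directory that pg_basebackup created next to
    # the master's tablespace directory
    shutil.rmtree(os.path.join(os.path.dirname(target), str(dummy_dbid)))
```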
@@ -1005,6 +1081,45 @@ class gpexpand:
         return outputfile

+    def generate_tablespace_inputfile(self, filename):
+        master_tblspc_dir = self.gparray.master.getSegmentTableSpaceDirectory()
+        tblspc_oids = os.listdir(master_tblspc_dir)
+        if not tblspc_oids:
+            return None
+
+        tblspc_oid_names = self.get_tablespace_oid_names()
+        tblspc_info = {}
+        for oid in tblspc_oids:
+            # the symlink target is the dbid-specific subdirectory of the
+            # tablespace location, so dirname() yields the location itself
+            location = os.path.dirname(os.readlink(os.path.join(master_tblspc_dir,
+                                                                oid)))
+            tblspc_info[oid] = {"location": location,
+                                "name": tblspc_oid_names[int(oid)]}
+
+        with open(filename, 'w') as f:
+            names = ":".join([tblspc_info[oid]["name"]
+                              for oid in tblspc_oids])
+            oids = ":".join(tblspc_oids)
+            headline = "tableSpaceNameOrders={names}".format(names=names)
+            secline = "tableSpaceOidOrders={oids}".format(oids=oids)
+            print >>f, headline
+            print >>f, secline
+            for db in self.gparray.getExpansionSegDbList():
+                if db.isSegmentPrimary():
+                    tmpStr = "{dbid}:{locations}"
+                    locations = ":".join([tblspc_info[oid]["location"]
+                                          for oid in tblspc_oids])
+                    print >>f, tmpStr.format(dbid=db.getSegmentDbId(),
+                                             locations=locations)
+        return filename
+
+    def get_tablespace_oid_names(self):
+        sql = "select oid, spcname from pg_tablespace"
+        cursor = dbconn.execSQL(self.conn, sql)
+        return dict(cursor.fetchall())
+
     def addNewSegments(self, inputFileEntryList):
         for seg in inputFileEntryList:
             self.gparray.addExpansionSeg(content=int(seg.contentId)
@@ -1076,6 +1191,51 @@ class gpexpand:
         except IOError:
             raise ExpansionError('Input file %s not found' % self.options.filename)

+    def read_tablespace_file(self):
+        """
+        If there are user-created tablespaces in the GreenplumDB cluster,
+        return a python dict describing the new segments' tablespaces,
+        otherwise return None.
+
+        The returned dict's type spec is:
+          newTableSpaceInfo :: {
+            "names"   : [ name::string ],
+            "oids"    : [ oid::string ],
+            "details" : {
+              dbid::string : [ location::string ]
+            }
+          }
+        newTableSpaceInfo["names"] and newTableSpaceInfo["oids"] are in the
+        same order; newTableSpaceInfo["details"][dbid] is a list of locations
+        in the same order as the oids.
+        """
+        master_tblspc_dir = self.gparray.master.getSegmentTableSpaceDirectory()
+        if not os.listdir(master_tblspc_dir):
+            return None
+
+        if not self.options.filename:
+            raise ExpansionError('Missing tablespace input file')
+
+        tablespace_inputfile = self.options.filename + ".ts"
+        new_tblspc_info = {}
+        with open(tablespace_inputfile) as f:
+            headline = f.readline().strip()
+            secline = f.readline().strip()
+            tblspc_names = headline.split('=')[1].strip().split(':')
+            tblspc_oids = secline.split('=')[1].strip().split(':')
+            new_tblspc_info["names"] = tblspc_names
+            new_tblspc_info["oids"] = tblspc_oids
+            details = {}
+            for line in f:
+                l = line.strip().split(':')
+                dbid = l[0]
+                locations = l[1:]
+                details[dbid] = locations
+            new_tblspc_info["details"] = details
+        return new_tblspc_info
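
As a worked example, running logic equivalent to read_tablespace_file over the
sample input file from the commit message produces the dict below (a
standalone sketch; tbs1, oid1, location1 and dbid1 are the example
placeholders):

```python
sample = """tableSpaceNameOrders=tbs1:tbs2
tableSpaceOidOrders=oid1:oid2
dbid1:location1:location2"""

lines = sample.splitlines()
info = {
    "names": lines[0].split('=')[1].split(':'),  # ['tbs1', 'tbs2']
    "oids": lines[1].split('=')[1].split(':'),   # ['oid1', 'oid2']
    "details": {},
}
for line in lines[2:]:
    parts = line.strip().split(':')
    info["details"][parts[0]] = parts[1:]        # one location per oid

assert info["details"]["dbid1"] == ["location1", "location2"]
```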
def lock_catalog(self):
self.conn_catalog_lock = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
self.logger.info('Locking catalog')
@@ -1092,7 +1252,7 @@ class gpexpand:
         self.conn_catalog_lock = None
         self.logger.info('Unlocked catalog')

-    def add_segments(self):
+    def add_segments(self, newTableSpaceInfo):
         """Starts the process of adding the new segments to the array"""
         self.segTemplate = SegmentTemplate(logger=self.logger,
                                            statusLogger=self.statusLogger,
@@ -1105,7 +1265,7 @@ class gpexpand:
                                            segTarDir=self.options.tardir,
                                            batch_size=self.options.batch_size)
         try:
-            self.segTemplate.build_segment_template()
+            self.segTemplate.build_segment_template(newTableSpaceInfo)
             self.segTemplate.build_new_segments()
         except SegmentTemplateError, msg:
             raise ExpansionError(msg)
@@ -2168,10 +2328,16 @@ before continuing by re-running this utility and providing the input file. """
             outfile = _gp_expand.generate_inputfile()

             outFileStr = ""
             outFileStr = """\nInput configuration file was written to '%s'.""" % (outfile)
             print outFileStr
+
+            outfile_ts = _gp_expand.generate_tablespace_inputfile(outfile + ".ts")
+            if outfile_ts:
+                outFileTsStr = """Tablespace Input configuration file was written to '%s'.""" % (outfile_ts)
+                print outFileTsStr
+
             print
             print """Please review the file and make sure that it is correct then re-run
 with: gpexpand -i %s
 """ % (outfile)
@@ -2285,7 +2451,8 @@ def main(options, args, parser):
             _gp_expand.sync_packages()
             _gp_expand.start_prepare()
             _gp_expand.lock_catalog()
-            _gp_expand.add_segments()
+            newTableSpaceInfo = _gp_expand.read_tablespace_file()
+            _gp_expand.add_segments(newTableSpaceInfo)
             _gp_expand.update_original_segments()
             _gp_expand.cleanup_new_segments()
             _gp_expand.update_catalog()
@@ -329,6 +329,12 @@ class Segment:
         """
         return checkNotNone("dataDirectory", self.datadir)

+    def getSegmentTableSpaceDirectory(self):
+        """
+        Return the pg_tblspc location for the segment.
+        """
+        return checkNotNone("tblspcDirectory",
+                            os.path.join(self.datadir, "pg_tblspc"))

     # --------------------------------------------------------------------
     # setters
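
A small usage sketch for the new accessor (hypothetical helper function; per
the restore logic above, each oid-named symlink under pg_tblspc resolves to
the tablespace's dbid-specific directory):

```python
import os

def list_tablespace_links(seg):
    # seg is any gparray Segment; each entry under pg_tblspc is a symlink
    # named by tablespace oid, pointing at <location>/<dbid>.
    tblspc_dir = seg.getSegmentTableSpaceDirectory()
    return [(oid, os.readlink(os.path.join(tblspc_dir, oid)))
            for oid in os.listdir(tblspc_dir)]
```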
@@ -3,6 +3,9 @@
 import datetime
 import sys
 import os
+import re
+import json
+import shutil
 from optparse import Option, OptionGroup, OptionParser, OptionValueError, SUPPRESS_USAGE

 from gppylib.gpparseopts import OptParser, OptChecker
@@ -171,6 +174,42 @@ class ConfExpSegCmd(Command):
                     logger.error(extractTarCmd.get_results())
                     raise

+                # Restore pg_tblspc
+                tblspc_config_json = os.path.join(self.datadir,
+                                                  "pg_tblspc",
+                                                  "newTableSpaceInfo.json")
+                if os.path.exists(tblspc_config_json):
+                    # If the tablespace json config file exists, we have to
+                    # restore the tablespace files as described by it.
+                    #   json :: {
+                    #     "names"   : [ name::string ],
+                    #     "oids"    : [ oid::string ],
+                    #     "details" : {
+                    #       dbid::string : [ location::string ]
+                    #     }
+                    #   }
+                    # "names" and "oids" are in the same order;
+                    # "details"[dbid] is a list of locations in the same
+                    # order as the oids.
+                    with open(tblspc_config_json) as f:
+                        tblspc_config = json.load(f)
+                        oids = tblspc_config["oids"]
+                        names = tblspc_config["names"]
+                        locations = tblspc_config["details"][str(self.dbid)]
+                        dumps_dir = os.path.join(self.datadir, "pg_tblspc", "dumps")
+                        for oid, location in zip(oids, locations):
+                            fn = os.listdir(os.path.join(dumps_dir, oid))[0]
+                            shutil.move(os.path.join(dumps_dir, oid, fn),
+                                        os.path.join(location, str(self.dbid)))
+                            os.unlink(os.path.join(self.datadir, "pg_tblspc", oid))
+                            os.symlink(os.path.join(location, str(self.dbid)),
+                                       os.path.join(self.datadir, "pg_tblspc", oid))
+                    shutil.rmtree(dumps_dir)
+                    os.remove(tblspc_config_json)
+
                 # Create pg_log for new segment in case it doesn't exist.
                 try:
                     self.makeOrUpdatePathAsNeeded(os.path.join(self.datadir, "pg_log"))
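
Distilled to a standalone helper for a single tablespace, the restore step
above does the following (a sketch with hypothetical function and parameter
names; the real code drives this from the json config):

```python
import os
import shutil

def restore_one_tablespace(datadir, oid, location, dbid):
    dumps_dir = os.path.join(datadir, "pg_tblspc", "dumps")
    # the template packs exactly one directory under dumps/<oid>/
    packed = os.listdir(os.path.join(dumps_dir, oid))[0]
    # move it to its final home, <location>/<dbid>, ...
    shutil.move(os.path.join(dumps_dir, oid, packed),
                os.path.join(location, str(dbid)))
    # ... and re-point the pg_tblspc/<oid> symlink at it
    link = os.path.join(datadir, "pg_tblspc", oid)
    os.unlink(link)
    os.symlink(os.path.join(location, str(dbid)), link)
```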
@@ -155,9 +155,31 @@ Feature: expand the cluster by adding more segments
     And the cluster is setup for an expansion on hosts "mdw,sdw1,sdw2,sdw3"
     And the new host "sdw2,sdw3" is ready to go
     When the user runs gpexpand interview to add 1 new segment and 2 new host "sdw2,sdw3"
     Then the number of segments have been saved
     When the user runs gpexpand with the latest gpexpand_inputfile with additional parameters "--silent"
     Then verify that the cluster has 14 new segments

+  @gpexpand_mirrors
+  @gpexpand_host_and_segment
+  @gpexpand_standby
+  Scenario: expand a cluster with tablespace that has mirrors with one new host
+    Given the database is not running
+    And a working directory of the test as '/data/gpdata/gpexpand'
+    And a temporary directory under "/data/gpdata/gpexpand/expandedData" to expand into
+    And a cluster is created with mirrors on "mdw" and "sdw1"
+    And the user runs gpinitstandby with options " "
+    And database "gptest" exists
+    And a tablespace is created with data
+    And another tablespace is created with data
+    And there are no gpexpand_inputfiles
+    And the cluster is setup for an expansion on hosts "mdw,sdw1,sdw2,sdw3"
+    And the new host "sdw2,sdw3" is ready to go
+    When the user runs gpexpand interview to add 1 new segment and 2 new host "sdw2,sdw3"
+    Then the number of segments have been saved
+    When the user runs gpexpand with the latest gpexpand_inputfile with additional parameters "--silent"
+    Then verify that the cluster has 14 new segments
+    When the user runs gpexpand to redistribute
+    Then the tablespace is valid after gpexpand
+
   @gpexpand_verify_redistribution
   Scenario: Verify data is correctly redistributed after expansion
@@ -177,7 +199,7 @@ Feature: expand the cluster by adding more segments
     Then the number of segments have been saved
     When the user runs gpexpand with the latest gpexpand_inputfile with additional parameters "--silent"
     Then verify that the cluster has 3 new segments
-      And the user drops the named connection "default"
+    And the user drops the named connection "default"
     When the user runs gpexpand to redistribute
     Then distribution information from table "public.redistribute" with data in "gptest" is verified against saved data
 import pipes
 import shutil
 import tempfile

-from behave import given, when, then
+from behave import given, then
 from pygresql import pg

 from gppylib.db import dbconn
@@ -70,6 +69,33 @@ class Tablespace:
             raise Exception("Tablespace data is not identically distributed. Expected:\n%r\n but found:\n%r" % (
                 sorted(self.initial_data), sorted(data)))

+    def verify_for_gpexpand(self, hostname=None, port=0):
+        """
+        For gpexpand, we need to make sure that:
+          1. the data is the same after redistribution finishes
+          2. the table's numsegments is enlarged to the new cluster size
+        """
+        url = dbconn.DbURL(hostname=hostname, port=port, dbname=self.dbname)
+        with dbconn.connect(url) as conn:
+            db = pg.DB(conn)
+            data = db.query("SELECT gp_segment_id, i FROM tbl").getresult()
+            tbl_numsegments = dbconn.execSQLForSingleton(conn,
+                                                         "SELECT numsegments FROM gp_distribution_policy "
+                                                         "WHERE localoid = 'tbl'::regclass::oid")
+            num_segments = dbconn.execSQLForSingleton(conn,
+                                                      "SELECT COUNT(DISTINCT(content)) - 1 FROM gp_segment_configuration")
+
+        if tbl_numsegments != num_segments:
+            raise Exception("After gpexpand the numsegments for tablespace table 'tbl' %d does not match "
+                            "the number of segments in the cluster %d." % (tbl_numsegments, num_segments))
+
+        initial_data = [i for _, i in self.initial_data]
+        data_without_segid = [i for _, i in data]
+        if sorted(data_without_segid) != sorted(initial_data):
+            raise Exception("Tablespace data is not identically distributed after running gpexpand. "
+                            "Expected pre-gpexpand data:\n%r\n but found post-gpexpand data:\n%r" % (
+                                sorted(self.initial_data), sorted(data)))
+

 def _checkpoint_and_wait_for_replication_replay(db):
     """
@@ -171,3 +197,9 @@ def impl(context):
 @then('the other tablespace is valid')
 def impl(context):
     context.tablespaces["myspace"].verify()
+
+
+@then('the tablespace is valid after gpexpand')
+def impl(context):
+    for _, tbs in context.tablespaces.items():
+        tbs.verify_for_gpexpand()
@@ -65,7 +65,9 @@ class Gpexpand:
         return output, p1.wait()

     def initialize_segments(self, additional_params=''):
-        input_files = sorted(glob.glob('%s/gpexpand_inputfile*' % self.working_directory))
+        fns = filter(lambda fn: not fn.endswith(".ts"),
+                     glob.glob('%s/gpexpand_inputfile*' % self.working_directory))
+        input_files = sorted(fns)
         return run_gpcommand(self.context, "gpexpand -i %s %s" % (input_files[-1], additional_params))

     def get_redistribute_status(self):