提交 fdf8b611 编写于 作者: J Jamie McAtamney 提交者: David Krieger

add gpmovemirrors behave tests

We add gpmovemirrors tests and add them to the pipeline as well.
Using behave tags, some of the tests can run locally on a demo
cluster.  We also introduce Behave tag logic to choose which
tests to run.

Co-authored-by: Kalen Krempely <kkrempely@pivotal.io>
Co-authored-by: Mark Sliva <msliva@pivotal.io>
上级 e9388cdb
......@@ -77,6 +77,8 @@ groups:
- gpaddmirrors
- gpexpand
- gpcheck
- gpmovemirrors_concourse_cluster
- gpmovemirrors_demo_cluster
- gppkg
- check_centos
## --------------------------------------------------------------------
......@@ -196,6 +198,8 @@ groups:
- gpaddmirrors
- gpexpand
- gpcheck
- gpmovemirrors_concourse_cluster
- gpmovemirrors_demo_cluster
- gppkg
- check_centos
- compile_gpdb_centos6
......@@ -1790,6 +1794,16 @@ jobs:
BEHAVE_TAGS: gpcheck_as_gpadmin
GPCHECK_SETUP: true
- name: gpmovemirrors_demo_cluster
plan:
- aggregate: *gets_for_behave
- task: gpmovemirrors_demo_cluster
file: gpdb_src/concourse/tasks/behave_gpdb.yml
image: gpdb6-centos6-test
params:
BEHAVE_FLAGS: --tags=gpmovemirrors --tags=demo_cluster
GPCHECK_SETUP: false
- name: analyzedb
plan:
- aggregate: *gets_for_behave
......@@ -1878,6 +1892,42 @@ jobs:
ensure:
<<: *set_failed
- name: gpmovemirrors_concourse_cluster
plan:
- aggregate:
- get: gpdb_src
params:
submodules:
- gpMgmt/bin/pythonSrc/ext
passed: [gate_cli_start]
- get: gpdb_binary
resource: bin_gpdb_centos6
passed: [gate_cli_start]
trigger: true
- get: ccp_src
- get: ccp-image
- put: terraform
params:
<<: *ccp_default_params
vars:
<<: *ccp_default_vars
number_of_nodes: 4
- task: gen_cluster
file: ccp_src/ci/tasks/gen_cluster.yml
params:
<<: *ccp_gen_cluster_default_params
- task: gpinitsystem
file: ccp_src/ci/tasks/gpinitsystem.yml
- task: run_tests
file: gpdb_src/concourse/tasks/run_behave.yml
image: ccp-image
params:
BEHAVE_FLAGS: --tags=gpmovemirrors --tags=concourse_cluster
on_success:
<<: *ccp_destroy
ensure:
<<: *set_failed
- name: pg_upgrade
ensure:
<<: *set_failed
......@@ -2155,6 +2205,8 @@ jobs:
- gpaddmirrors
- gpexpand
- gpcheck
- gpmovemirrors_concourse_cluster
- gpmovemirrors_demo_cluster
- gppkg
- check_centos
- interconnect
......
......@@ -102,6 +102,8 @@ groups:
- gpaddmirrors
- gpexpand
- gpcheck
- gpmovemirrors_concourse_cluster
- gpmovemirrors_demo_cluster
- gppkg
- check_centos
{% endif %}
......@@ -280,6 +282,8 @@ groups:
- gpaddmirrors
- gpexpand
- gpcheck
- gpmovemirrors_concourse_cluster
- gpmovemirrors_demo_cluster
- gppkg
- check_centos
- compile_gpdb_centos6
......@@ -1915,6 +1919,16 @@ jobs:
BEHAVE_TAGS: gpcheck_as_gpadmin
GPCHECK_SETUP: true
- name: gpmovemirrors_demo_cluster
plan:
- aggregate: *gets_for_behave
- task: gpmovemirrors_demo_cluster
file: gpdb_src/concourse/tasks/behave_gpdb.yml
image: gpdb6-centos6-test
params:
BEHAVE_FLAGS: --tags=gpmovemirrors --tags=demo_cluster
GPCHECK_SETUP: false
{% for test_name in CLI_2_suites %}
- name: [[ test_name ]]
plan:
......@@ -1978,6 +1992,42 @@ jobs:
ensure:
<<: *set_failed
- name: gpmovemirrors_concourse_cluster
plan:
- aggregate:
- get: gpdb_src
params:
submodules:
- gpMgmt/bin/pythonSrc/ext
passed: [gate_cli_start]
- get: gpdb_binary
resource: bin_gpdb_centos6
passed: [gate_cli_start]
trigger: [[ test_trigger ]]
- get: ccp_src
- get: ccp-image
- put: terraform
params:
<<: *ccp_default_params
vars:
<<: *ccp_default_vars
number_of_nodes: 4
- task: gen_cluster
file: ccp_src/ci/tasks/gen_cluster.yml
params:
<<: *ccp_gen_cluster_default_params
- task: gpinitsystem
file: ccp_src/ci/tasks/gpinitsystem.yml
- task: run_tests
file: gpdb_src/concourse/tasks/run_behave.yml
image: ccp-image
params:
BEHAVE_FLAGS: --tags=gpmovemirrors --tags=concourse_cluster
on_success:
<<: *ccp_destroy
ensure:
<<: *set_failed
{% if "centos6" in os_types and "ICW" in test_sections %}
- name: pg_upgrade
ensure:
......@@ -2270,6 +2320,8 @@ jobs:
- gpaddmirrors
- gpexpand
- gpcheck
- gpmovemirrors_concourse_cluster
- gpmovemirrors_demo_cluster
- gppkg
- check_centos
- interconnect
......
......@@ -242,7 +242,7 @@ class Configuration:
raise Exception(
"Old mirror segment is not currently in a mirror role: address = %s, port = %s, segment data directory = %s"
% (oldAddress, str(oldPort), oldDataDirectory))
oldFilespaces = lookupFilespaces(oldGpdb)
oldFilespaces = {}
oldMirror = Mirror(address=oldAddress
, port=oldPort
, dataDirectory=oldDataDirectory
......
......@@ -21,3 +21,42 @@ integration tests.
# this will run all tests tagged with smoke
make -f Makefile.behave behave tags=smoke
# this will run all tests tagged with smoke AND mirrors
make -f Makefile.behave behave flags="--tags smoke --tags mirrors"
---------------------------NOTE ON BEHAVE 1.2.4 TAGS-------------------
WARNING: The tag boolean logic syntax is different in this version versus
the version documented online.
From gpdb/gpMgmt/bin/pythonSrc/ext/behave-1.2.4/docs/behave.rst:
| Tag Expression
| --------------
|
| Scenarios inherit tags declared on the Feature level. The simplest
| TAG_EXPRESSION is simply a tag::
|
| --tags @dev
|
| You may even leave off the "@" - behave doesn't mind.
|
| When a tag in a tag expression starts with a ~, this represents boolean NOT::
|
| --tags ~@dev
|
| A tag expression can have several tags separated by a comma, which represents
| logical OR::
|
| --tags @dev,@wip
|
| The --tags option can be specified several times, and this represents logical
| AND, for instance this represents the boolean expression
| "(@foo or not @bar) and @zap"::
|
| --tags @foo,~@bar --tags @zap.
|
| Beware that if you want to use several negative tags to exclude several tags
| you have to use logical AND::
|
| --tags ~@fixme --tags ~@buggy.
......@@ -4,12 +4,14 @@ import shutil
from test.behave_utils.utils import drop_database_if_exists, start_database_if_not_started,\
create_database, \
run_command, check_user_permissions, run_gpcommand
from steps.mirrors_mgmt_utils import MirrorMgmtContext
from gppylib.db import dbconn
def before_feature(context, feature):
def before_feature(context, feature):
# we should be able to run gpexpand without having a cluster initialized
if 'gpexpand' in feature.tags or 'gpaddmirrors' in feature.tags or 'gpstate' in feature.tags:
tags_to_skip = ['gpexpand', 'gpaddmirrors', 'gpstate', 'gpmovemirrors']
if set(context.feature.tags).intersection(tags_to_skip):
return
drop_database_if_exists(context, 'testdb')
......@@ -52,27 +54,36 @@ def before_feature(context, feature):
dbconn.execSQL(context.conn, 'insert into t3 values(1, 4)')
context.conn.commit()
def after_feature(context, feature):
if 'analyzedb' in feature.tags:
context.conn.close()
if 'minirepro' in feature.tags:
context.conn.close()
def before_scenario(context, scenario):
if 'gpexpand' in context.feature.tags or 'gpaddmirrors' in context.feature.tags or 'gpstate' in context.feature.tags:
if 'gpmovemirrors' in context.feature.tags:
context.mirror_context = MirrorMgmtContext()
tags_to_skip = ['gpexpand', 'gpaddmirrors', 'gpstate', 'gpmovemirrors']
if set(context.feature.tags).intersection(tags_to_skip):
return
if 'analyzedb' not in context.feature.tags:
start_database_if_not_started(context)
drop_database_if_exists(context, 'testdb')
def after_scenario(context, scenario):
if 'gpexpand' in context.feature.tags \
or 'gpaddmirrors' in context.feature.tags \
or 'gpinitstandby' in context.feature.tags \
or 'gpstate' in context.feature.tags:
tags_to_skip = ['gpexpand', 'gpaddmirrors', 'gpstate', 'gpinitstandby']
if set(context.feature.tags).intersection(tags_to_skip):
return
if 'gpmovemirrors' in context.feature.tags:
if 'temp_base_dir' in context:
shutil.rmtree(context.temp_base_dir)
if 'analyzedb' not in context.feature.tags:
start_database_if_not_started(context)
......
......@@ -37,7 +37,7 @@ Feature: Tests for gpaddmirrors
And the database is killed on hosts "mdw,sdw1,sdw2,sdw3"
And a cluster is created with no mirrors on "mdw" and "sdw1, sdw2, sdw3"
And gpaddmirrors adds mirrors in spread configuration
Then verify the database has mirrors in spread configuration
Then verify that mirror segments are in "spread" configuration
And the user runs "gpstop -aqM fast"
Scenario: gpaddmirrors with a default master data directory
......
@gpmovemirrors
Feature: Tests for gpmovemirrors
# @demo_cluster tests
# To run these tests locally, you should have a local demo cluster created, and
# started. This test will modify the original local demo cluster. This test
# will, as a side effect, destroy the current contents of /tmp/gpmovemirrors and
# replace it with data as used in this test.
#
# @concourse_cluster tests
# These tests require a cluster in a specified configuration, so are not
# expected to work locally.
@demo_cluster
Scenario: gpmovemirrors fails with totally malformed input file
Given a standard local demo cluster is running
And a gpmovemirrors directory under '/tmp/gpmovemirrors' with mode '0700' is created
And a 'malformed' gpmovemirrors file is created
When the user runs gpmovemirrors
Then gpmovemirrors should return a return code of 3
@demo_cluster
Scenario: gpmovemirrors fails with bad host in input file
Given a standard local demo cluster is running
And a gpmovemirrors directory under '/tmp/gpmovemirrors' with mode '0700' is created
And a 'badhost' gpmovemirrors file is created
When the user runs gpmovemirrors
Then gpmovemirrors should return a return code of 3
@demo_cluster
Scenario: gpmovemirrors fails with invalid option parameter
Given a standard local demo cluster is running
And a gpmovemirrors directory under '/tmp/gpmovemirrors' with mode '0700' is created
And a 'good' gpmovemirrors file is created
When the user runs gpmovemirrors with additional args "--invalid-option"
Then gpmovemirrors should return a return code of 2
@demo_cluster
Scenario: gpmovemirrors can change the location of mirrors within a single host
Given a standard local demo cluster is created
And a gpmovemirrors directory under '/tmp/gpmovemirrors' with mode '0700' is created
And a 'good' gpmovemirrors file is created
When the user runs gpmovemirrors
Then gpmovemirrors should return a return code of 0
And verify the database has mirrors
And all the segments are running
And the segments are synchronized
And verify that mirrors are recognized after a restart
@concourse_cluster
Scenario: gpmovemirrors can change from group mirroring to spread mirroring
Given verify that mirror segments are in "group" configuration
And a sample gpmovemirrors input file is created in "spread" configuration
When the user runs "gpmovemirrors --input=/tmp/gpmovemirrors_input_spread"
Then gpmovemirrors should return a return code of 0
# Verify that mirrors are functional in the new configuration
Then verify the database has mirrors
And all the segments are running
And the segments are synchronized
And verify that mirror segments are in "spread" configuration
And verify that mirrors are recognized after a restart
@concourse_cluster
Scenario: gpmovemirrors can change from spread mirroring to group mirroring
Given verify that mirror segments are in "spread" configuration
And a sample gpmovemirrors input file is created in "group" configuration
When the user runs "gpmovemirrors --input=/tmp/gpmovemirrors_input_group"
Then gpmovemirrors should return a return code of 0
# Verify that mirrors are functional in the new configuration
Then verify the database has mirrors
And all the segments are running
And the segments are synchronized
And verify that mirror segments are in "group" configuration
And verify that mirrors are recognized after a restart
And the user runs "gpstop -aqM fast"
......@@ -22,6 +22,7 @@ import psutil
from behave import given, when, then
from datetime import datetime, timedelta
from time import sleep
from os import path
from gppylib.gparray import GpArray, ROLE_PRIMARY, ROLE_MIRROR
from gppylib.commands.gp import SegmentStart, GpStandbyStart, MasterStop
......@@ -320,7 +321,6 @@ def impl(context, env_var):
del context.orig_env[env_var]
@given('the user runs "{command}"')
@when('the user runs "{command}"')
@then('the user runs "{command}"')
......@@ -2031,14 +2031,25 @@ def step_impl(context, abbreviated_timezone):
if context.startup_timezone != abbreviated_timezone:
raise Exception("Expected timezone in startup.log to be %s, but it was %s" % (abbreviated_timezone, context.startup_timezone))
@given("a working directory of the test as '{working_directory}' with mode '{mode}'")
def impl(context, working_directory, mode):
    # Recreate the scenario's working directory, applying the given octal
    # mode string (e.g. '0700') to the new directory.
    _create_working_directory(context, working_directory, mode)


@given("a working directory of the test as '{working_directory}'")
def impl(context, working_directory):
    # Recreate the scenario's working directory with the default mode.
    _create_working_directory(context, working_directory)
def _create_working_directory(context, working_directory, mode=''):
context.working_directory = working_directory
# Don't fail if directory already exists, which can occur for the first scenario
shutil.rmtree(context.working_directory, ignore_errors=True)
os.mkdir(context.working_directory)
if (mode != ''):
os.mkdir(context.working_directory, int(mode,8))
else:
os.mkdir(context.working_directory)
def _create_cluster(context, master_host, segment_host_list):
def _create_cluster(context, master_host, segment_host_list, with_mirrors=False, mirroring_configuration='group'):
if segment_host_list == "":
segment_host_list = []
else:
......@@ -2050,41 +2061,32 @@ def _create_cluster(context, master_host, segment_host_list):
with dbconn.connect(dbconn.DbURL(dbname='template1')) as conn:
curs = dbconn.execSQL(conn, "select count(*) from gp_segment_configuration where role='m';")
count = curs.fetchall()[0][0]
if count == 0:
if not with_mirrors and count == 0:
print "Skipping creating a new cluster since the cluster is primary only already."
return
elif with_mirrors and count > 0:
print "Skipping creating a new cluster since the cluster has mirrors already."
return
except:
pass
testcluster = TestCluster(hosts=[master_host]+segment_host_list, base_dir=context.working_directory)
testcluster.reset_cluster()
testcluster.create_cluster(with_mirrors=False)
context.gpexpand_mirrors_enabled = False
testcluster.create_cluster(with_mirrors=with_mirrors, mirroring_configuration=mirroring_configuration)
context.gpexpand_mirrors_enabled = with_mirrors
@then('a cluster is created with no mirrors on "{master_host}" and "{segment_host_list}"')
@given('a cluster is created with no mirrors on "{master_host}" and "{segment_host_list}"')
def impl(context, master_host, segment_host_list):
_create_cluster(context, master_host, segment_host_list)
_create_cluster(context, master_host, segment_host_list, with_mirrors=False)
@given('a cluster is created with mirrors on "{master_host}" and "{segment_host}"')
def impl(context, master_host, segment_host):
del os.environ['MASTER_DATA_DIRECTORY']
os.environ['MASTER_DATA_DIRECTORY'] = os.path.join(context.working_directory,
'data/master/gpseg-1')
try:
with dbconn.connect(dbconn.DbURL(dbname='template1')) as conn:
curs = dbconn.execSQL(conn, "select count(*) from gp_segment_configuration where role='m';")
count = curs.fetchall()[0][0]
if count > 0:
print "Skipping creating a new cluster since the cluster has mirrors already."
return
except:
pass
@given('a cluster is created with mirrors on "{master_host}" and "{segment_host_list}"')
def impl(context, master_host, segment_host_list):
_create_cluster(context, master_host, segment_host_list, with_mirrors=True, mirroring_configuration='group')
testcluster = TestCluster(hosts=[master_host,segment_host], base_dir=context.working_directory)
testcluster.reset_cluster()
testcluster.create_cluster(with_mirrors=True)
context.gpexpand_mirrors_enabled = True
@given('a cluster is created with "{mirroring_configuration}" segment mirroring on "{master_host}" and "{segment_host_list}"')
def impl(context, mirroring_configuration, master_host, segment_host_list):
    # Thin wrapper: mirroring_configuration ('group' or 'spread') is passed
    # straight through to the shared cluster-creation helper.
    _create_cluster(context, master_host, segment_host_list, with_mirrors=True, mirroring_configuration=mirroring_configuration)
@given('the user runs gpexpand interview to add {num_of_segments} new segment and {num_of_hosts} new host "{hostnames}"')
@when('the user runs gpexpand interview to add {num_of_segments} new segment and {num_of_hosts} new host "{hostnames}"')
......@@ -2379,13 +2381,17 @@ def impl(context, hostnames):
remoteHost=host)
cmd.run(validateAfter=True)
@given("a temporary directory under '{tmp_base_dir}' with mode '{mode}' is created")
@given('a temporary directory under "{tmp_base_dir}" to expand into')
def impl(context, tmp_base_dir):
def make_temp_dir(context,tmp_base_dir, mode=''):
if not tmp_base_dir:
raise Exception("tmp_base_dir cannot be empty")
if not os.path.exists(tmp_base_dir):
os.mkdir(tmp_base_dir)
context.temp_base_dir = tempfile.mkdtemp(dir=tmp_base_dir)
if mode:
os.chmod(path.normpath(path.join(tmp_base_dir, context.temp_base_dir)),
int(mode,8))
@given('the new host "{hostnames}" is ready to go')
def impl(context, hostnames):
......
from behave import given, when, then
from os import path
from behave import given, when, then
from test.behave_utils.utils import *
from mgmt_utils import *
def _get_mirror_count():
    """Return how many segments are currently registered as mirrors.

    Counts rows with role 'm' in gp_segment_configuration, connecting
    through the template1 database.
    """
    query = """SELECT count(*) FROM gp_segment_configuration WHERE role='m'"""
    with dbconn.connect(dbconn.DbURL(dbname='template1')) as conn:
        row = dbconn.execSQL(conn, query).fetchone()
    return row[0]
# This file contains steps for gpaddmirrors and gpmovemirrors tests
# This class is intended to store per-Scenario state that is built up over
# a series of steps.
class MirrorMgmtContext:
    """Per-scenario state shared by the mirror-management behave steps.

    Earlier steps in a scenario fill in the working directory and the input
    file name; later steps read the combined path via input_file_path().
    """

    def __init__(self):
        # Both attributes start unset; steps populate them as they run.
        self.working_directory = None
        self.input_file = None

    def input_file_path(self):
        """Return the normalized path to the scenario's input file.

        Raises an Exception naming whichever component has not been set.
        """
        if self.working_directory is None:
            raise Exception("working directory not set")
        if self.input_file is None:
            raise Exception("input file not set")
        combined = path.join(self.working_directory, self.input_file)
        return path.normpath(combined)
def _generate_input_config(spread=False):
    """Invoke gpaddmirrors to produce a mirror configuration file.

    Returns the path of the generated config file. Pass spread=True to add
    the -s flag (spread mirror layout) to the gpaddmirrors invocation.
    """
    datadir_file = _write_datadir_config()
    output_file = "/tmp/test_gpaddmirrors.config"
    cmd_str = 'gpaddmirrors -a -o %s -m %s' % (output_file, datadir_file)
    if spread:
        cmd_str += " -s"
    Command('generate mirror_config file', cmd_str).run(validateAfter=True)
    return output_file
def do_write(template, config_file_path):
    """Render *template* with the mirror data directory and write it out.

    The template's '{0}' placeholders are filled with the directory returned
    by make_data_directory_called('mirror').
    """
    mirror_dir = make_data_directory_called('mirror')
    rendered = template.format(mirror_dir)
    with open(config_file_path, 'w') as fp:
        fp.write(rendered)
......@@ -32,45 +41,36 @@ def do_write(template, config_file_path):
def _write_datadir_config():
    """Write a two-line gpaddmirrors data-directory config file.

    Both lines name the same mirror data directory; returns the config
    file's path.
    """
    config_path = '/tmp/gpaddmirrors_datadir_config'
    template = """
{0}
{0}
"""
    do_write(template, config_path)
    return config_path
def _write_datadir_config_for_three_mirrors():
    """Write a three-line gpaddmirrors data-directory config file.

    All three lines name the same mirror data directory; returns the config
    file's path.
    """
    config_path = '/tmp/gpaddmirrors_datadir_config'
    template = """
{0}
{0}
{0}
"""
    do_write(template, config_path)
    return config_path
def add_three_mirrors(context):
    """Generate a three-mirror config file and apply it via gpaddmirrors."""
    datadir_file = _write_datadir_config_for_three_mirrors()
    output_file = "/tmp/test_gpaddmirrors.config"
    generate = Command('generate mirror_config file',
                       'gpaddmirrors -o %s -m %s' % (output_file, datadir_file))
    generate.run(validateAfter=True)
    apply_cmd = Command('gpaddmirrors ', 'gpaddmirrors -a -i %s ' % output_file)
    apply_cmd.run(validateAfter=True)
def add_mirrors(context):
    """Generate a default mirror config file and apply it via gpaddmirrors.

    The generated config path is stashed on the context for later steps.
    """
    context.mirror_config = _generate_input_config()
    Command('gpaddmirrors ',
            'gpaddmirrors -a -i %s ' % context.mirror_config).run(validateAfter=True)
......@@ -84,6 +84,13 @@ def make_data_directory_called(data_directory_name):
return mirror_data_dir
def _get_mirror_count():
    # Count the segments registered as mirrors (role='m') by querying the
    # catalog through the template1 database.
    with dbconn.connect(dbconn.DbURL(dbname='template1')) as conn:
        sql = """SELECT count(*) FROM gp_segment_configuration WHERE role='m'"""
        count_row = dbconn.execSQL(conn, sql).fetchone()
        return count_row[0]
@then('verify the database has mirrors')
def impl(context):
if _get_mirror_count() == 0:
......@@ -112,7 +119,6 @@ def impl(context):
@given('gpaddmirrors adds mirrors in spread configuration')
def impl(context):
context.mirror_config = _generate_input_config(spread=True)
cmd = Command('gpaddmirrors ', 'gpaddmirrors -a -i %s ' % context.mirror_config)
cmd.run(validateAfter=True)
......@@ -126,71 +132,191 @@ def impl(context):
@then('mirror hostlist matches the one saved in context')
def impl(context):
gparray = GpArray.initFromCatalog(dbconn.DbURL())
old_contentId_to_host = {}
curr_contentId_to_host = {}
old_content_to_host = {}
curr_content_to_host = {}
# Map content IDs to hostnames for every mirror, for both the saved GpArray
# and the current one.
for (array, hostMap) in [(context.gparray, old_contentId_to_host), (gparray, curr_contentId_to_host)]:
for (array, hostMap) in [(context.gparray, old_content_to_host), (gparray, curr_content_to_host)]:
for host in array.get_hostlist(includeMaster=False):
for mirror in array.get_list_of_mirror_segments_on_host(host):
hostMap[mirror.getSegmentContentId()] = host
if len(curr_contentId_to_host) != len(old_contentId_to_host):
if len(curr_content_to_host) != len(old_content_to_host):
raise Exception("Number of mirrors doesn't match between old and new clusters")
for key in old_contentId_to_host.keys():
if curr_contentId_to_host[key] != old_contentId_to_host[key]:
raise Exception("Mirror host doesn't match for contentId %s (old host=%s) (new host=%s)"
% (key, old_contentId_to_host[key], curr_contentId_to_host[key]))
for key in old_content_to_host.keys():
if curr_content_to_host[key] != old_content_to_host[key]:
raise Exception("Mirror host doesn't match for content %s (old host=%s) (new host=%s)"
% (key, old_content_to_host[key], curr_content_to_host[key]))
@then('verify the database has mirrors in spread configuration')
def impl(context):
hostname_to_primary_contentId_list = {}
mirror_contentId_to_hostname = {}
@given('verify that mirror segments are in "{mirror_config}" configuration')
@then('verify that mirror segments are in "{mirror_config}" configuration')
def impl(context, mirror_config):
if mirror_config not in ["group", "spread"]:
raise Exception('"%s" is not a valid mirror configuration for this step; options are "group" and "spread".')
gparray = GpArray.initFromCatalog(dbconn.DbURL())
host_list = gparray.get_hostlist(includeMaster=False)
for host in host_list:
primaries = gparray.get_list_of_primary_segments_on_host(host)
primary_content_list = []
for primary in primaries:
primary_content_list.append(primary.getSegmentContentId())
hostname_to_primary_contentId_list[host] = primary_content_list
mirrors = gparray.get_list_of_mirror_segments_on_host(host)
for mirror in mirrors:
mirror_contentId_to_hostname[mirror.getSegmentContentId()] = host
# Verify that for each host its mirrors are correctly spread.
# That is for a given host, all of its primaries would have
# mirrors on separate hosts. Therefore, we go through the list of
# primaries on a host and compute the set of hosts which have the
# corresponding mirrors. If spreading is working, then the
# cardinality of the mirror_host_set should equal the number of
# primaries on the given host.
for hostname in host_list:
# For the primaries on a given host, put the hosts of the
# corresponding mirrors into this set (a set to eliminate
# duplicate hostnames in case two mirrors end up on the same
# host).
mirror_host_set = set()
primary_content_list = hostname_to_primary_contentId_list[hostname]
for contentId in primary_content_list:
mirror_host = mirror_contentId_to_hostname[contentId]
if mirror_host == hostname:
raise Exception('host %s has both primary and mirror for contentID %d' %
(hostname, contentId))
mirror_host_set.add(mirror_host)
num_primaries = len(primary_content_list)
num_mirror_hosts = len(mirror_host_set)
if num_primaries != num_mirror_hosts:
raise Exception('host %s has %d primaries spread on only %d hosts' %
(hostname, num_primaries, num_mirror_hosts))
primary_to_mirror_host_map = {}
primary_content_map = {}
# Create a map from each host to the hosts holding the mirrors of all the
# primaries on the original host, e.g. the primaries for contents 0 and 1
# are on sdw1, the mirror for content 0 is on sdw2, and the mirror for
# content 1 is on sdw4, then primary_content_map[sdw1] = [sdw2, sdw4]
for segmentPair in gparray.segmentPairs:
primary_host, mirror_host = segmentPair.get_hosts()
pair_content = segmentPair.primaryDB.content
# Regardless of mirror configuration, a primary should never be mirrored on the same host
if primary_host == mirror_host:
raise Exception('Host %s has both primary and mirror for content %d' % (primary_host, pair_content))
primary_content_map[primary_host] = pair_content
if primary_host not in primary_to_mirror_host_map:
primary_to_mirror_host_map[primary_host] = set()
primary_to_mirror_host_map[primary_host].add(mirror_host)
if mirror_config == "spread":
# In spread configuration, each primary on a given host has its mirror
# on a different host, and no host has both the primary and the mirror
# for a given segment. For this to work, the cluster must have N hosts,
# where N is 1 more than the number of segments on each host.
# Thus, if the list of mirror hosts for a given primary host consists
# of exactly the list of hosts in the cluster minus that host itself,
# the mirrors in the cluster are correctly spread.
for primary_host in primary_to_mirror_host_map:
mirror_host_set = primary_to_mirror_host_map[primary_host]
other_host_set = set(host_list)
other_host_set.remove(primary_host)
if other_host_set != mirror_host_set:
raise Exception('Expected primaries on %s to be mirrored to %s, but they are mirrored to %s' %
(primary_host, other_host_set, mirror_host_set))
elif mirror_config == "group":
# In group configuration, all primaries on a given host are mirrored to
# a single other host.
# Thus, if the list of mirror hosts for a given primary host consists
# of a single host, and that host is not the same as the primary host,
# the mirrors in the cluster are correctly grouped.
for primary_host in primary_to_mirror_host_map:
num_mirror_hosts = len(primary_to_mirror_host_map[primary_host])
if num_mirror_hosts != 1:
raise Exception('Expected primaries on %s to all be mirrored to the same host, but they are mirrored to %d different hosts' %
(primary_host, num_mirror_hosts))
@given("a gpmovemirrors directory under '{parent_dir}' with mode '{mode}' is created")
def impl(context, parent_dir, mode):
    # Create a fresh temp directory under parent_dir with the given octal
    # mode, then register it as the gpmovemirrors working directory so later
    # steps can place the input file there.
    make_temp_dir(context,parent_dir, mode)
    context.mirror_context.working_directory = context.temp_base_dir
@given("a '{file_type}' gpmovemirrors file is created")
def impl(context, file_type):
    """Write a gpmovemirrors input file of the requested flavor.

    'malformed' writes unparseable junk, 'badhost' describes a move to a
    nonexistent host, and 'good' describes a valid move of the first
    mirror into the scenario's working directory.
    """
    def triplet(host, port, datadir):
        # gpmovemirrors input entries are host:port:data_directory triplets.
        return '%s:%s:%s' % (host, port, datadir)

    mirror = GpArray.initFromCatalog(dbconn.DbURL()).getSegmentList()[0].mirrorDB
    host = mirror.getSegmentHostName()
    port = mirror.getSegmentPort()
    target_dir = context.mirror_context.working_directory
    old_entry = triplet(host, port, mirror.getSegmentDataDirectory())

    if file_type == 'malformed':
        contents = 'I really like coffee.'
    elif file_type == 'badhost':
        contents = '%s %s' % (old_entry, triplet('badhost', port, target_dir))
    elif file_type == 'good':
        contents = '%s %s' % (old_entry, triplet(host, port, target_dir))
    else:
        raise Exception('unknown gpmovemirrors file_type specified')

    context.mirror_context.input_file = "gpmovemirrors_%s.txt" % file_type
    with open(context.mirror_context.input_file_path(), 'w') as fd:
        fd.write(contents)
@when('the user runs gpmovemirrors')
def impl(context):
    # Delegate to the shared runner with no extra command-line arguments.
    run_gpmovemirrors(context)
@when('the user runs gpmovemirrors with additional args "{extra_args}"')
def run_gpmovemirrors(context, extra_args=''):
    """Run gpmovemirrors against the scenario's input file.

    extra_args, if given, are appended verbatim to the command line.
    Results are recorded on the context by run_gpcommand.
    """
    input_path = context.mirror_context.input_file_path()
    run_gpcommand(context, "gpmovemirrors --input=%s %s" % (input_path, extra_args))
@then('verify that mirrors are recognized after a restart')
def impl(context):
    # Restart the whole cluster and confirm every segment comes back up and
    # resynchronizes, which proves the moved mirrors were persisted in the
    # catalog and not just alive in the running instance.
    context.execute_steps( u'''
    When an FTS probe is triggered
    And the user runs "gpstop -a"
    And wait until the process "gpstop" goes down
    And the user runs "gpstart -a"
    And wait until the process "gpstart" goes down
    Then all the segments are running
    And the segments are synchronized''')
@given('a sample gpmovemirrors input file is created in "{mirror_config}" configuration')
def impl(context, mirror_config):
    """Write /tmp/gpmovemirrors_input_<config> describing the moves that
    convert the cluster's mirrors to the requested configuration.

    Port numbers and addresses are hardcoded to TestCluster values, assuming
    the 3-host, 2-segments-per-host cluster built by concourse's gpinitsystem
    task. Raises for any mirror_config other than "group" or "spread".
    """
    if mirror_config not in ["group", "spread"]:
        # Bug fix: the message previously lacked the % operand, so the literal
        # '%s' was raised instead of the offending value.
        raise Exception('"%s" is not a valid mirror configuration for this step; options are "group" and "spread".' % mirror_config)

    input_filename = "/tmp/gpmovemirrors_input_%s" % mirror_config
    line_template = "%s:%d:%s %s:%d:%s\n"

    # The mirrors for contents 0 and 3 are excluded from the maps below
    # because their placement is the same in either configuration.
    # The maps are {contentID: port} and {contentID: hostname}.
    # Group mirroring (TestCluster default): sdw1 mirrors to sdw2, sdw2 mirrors to sdw3, sdw3 mirrors to sdw1
    group_port_map = {0: 21000, 1: 21001, 2: 21000, 3: 21001, 4: 21000, 5: 21001}
    group_address_map = {0: "sdw2", 1: "sdw2", 2: "sdw3", 3: "sdw3", 4: "sdw1", 5: "sdw1"}
    # Spread mirroring: each host mirrors one primary to each of the other two hosts
    spread_port_map = {0: 21000, 1: 21000, 2: 21000, 3: 21001, 4: 21001, 5: 21001}
    spread_address_map = {0: "sdw2", 1: "sdw3", 2: "sdw1", 3: "sdw3", 4: "sdw1", 5: "sdw2"}

    # Look up each content's current mirror once, instead of re-scanning the
    # segment list for every content inside the loop below.
    gparray = GpArray.initFromCatalog(dbconn.DbURL())
    content_to_mirror = {}
    for segmentPair in gparray.getSegmentList():
        mirror = segmentPair.mirrorDB
        content_to_mirror[mirror.getSegmentContentId()] = mirror

    with open(input_filename, "w") as fd:
        for content in [1, 2, 4, 5]:
            # The "old" location is wherever the *other* configuration put
            # this mirror; the "new" location is the requested configuration.
            if mirror_config == "spread":
                old_port, old_address = group_port_map[content], group_address_map[content]
                new_port, new_address = spread_port_map[content], spread_address_map[content]
            else:
                old_port, old_address = spread_port_map[content], spread_address_map[content]
                new_port, new_address = group_port_map[content], group_address_map[content]
            old_directory = content_to_mirror[content].getSegmentDataDirectory()
            new_directory = '%s_moved' % old_directory
            fd.write(line_template % (old_address, old_port, old_directory,
                                      new_address, new_port, new_directory))
        fd.flush()
......@@ -12,7 +12,7 @@ from test.behave_utils.utils import (
)
from addmirrors_mgmt_utils import (add_three_mirrors)
from mirrors_mgmt_utils import (add_three_mirrors)
def assert_successful_command(context):
......
......@@ -127,8 +127,11 @@ class TestCluster:
def reset_cluster(self):
reset_hosts(self.hosts, test_base_dir = self.base_dir)
def create_cluster(self, with_mirrors=False):
def create_cluster(self, with_mirrors=False, mirroring_configuration='group'):
# Generate the config files to initialize the cluster
# todo: DATA_DIRECTORY and MIRROR_DATA_DIRECTORY should have only one directory, not 2 when specifying spread
if mirroring_configuration not in ['group', 'spread']:
raise Exception('Mirroring configuration must be group or spread')
self.mirror_enabled = with_mirrors
self._generate_gpinit_config_files()
assert os.path.exists(self.init_file)
......@@ -136,7 +139,8 @@ class TestCluster:
# run gpinitsystem
clean_env = 'unset MASTER_DATA_DIRECTORY; unset PGPORT;'
gpinitsystem_cmd = clean_env + 'gpinitsystem -a -c %s ' % (self.init_file)
segment_mirroring_option = '-S' if mirroring_configuration == 'spread' else ''
gpinitsystem_cmd = clean_env + 'gpinitsystem -a -c %s %s' % (self.init_file, segment_mirroring_option)
res = run_shell_command(gpinitsystem_cmd, 'run gpinitsystem', verbose=True)
# initsystem returns 1 for warnings and 2 for errors
if res['rc'] > 1:
......
......@@ -163,7 +163,7 @@ def check_database_is_running(context):
pgport = int(os.environ['PGPORT'])
running_status = chk_local_db_running(master_data_dir, pgport)
running_status = chk_local_db_running(os.environ.get('MASTER_DATA_DIRECTORY'), pgport)
gpdb_running = running_status[0] and running_status[1] and running_status[2] and running_status[3]
return gpdb_running
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册