提交 ae760e25 编写于 作者: H Heikki Linnakangas 提交者: Taylor Vesely

Remove primary_mirror_mode stuff.

Revert the state machine and other logic in postmaster.c the way it is in
upstream. Remove some GUCs related to mirrored and non-mirrored mode. Remove
the -M, -x and -y postmaster options, and change management scripts to not
pass those options.
上级 51dd2b53
......@@ -6,10 +6,10 @@
#include "access/heapam.h"
#include "access/genam.h"
#include "catalog/indexing.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbvars.h"
#include "libpq/ip.h"
#include "postmaster/postmaster.h"
#include "postmaster/primary_mirror_transition_client.h"
#include "utils/builtins.h"
#include "utils/faultinjector.h"
#include "utils/fmgroids.h"
......@@ -19,27 +19,91 @@ PG_MODULE_MAGIC;
extern Datum gp_inject_fault(PG_FUNCTION_ARGS);
static StringInfo transitionMsgErrors;
static void
transitionErrorLogFn(char *response)
static char *
processTransitionRequest_faultInject(char *faultName, char *type, char *ddlStatement, char *databaseName, char *tableName, int numOccurrences, int sleepTimeSeconds)
{
appendStringInfo(transitionMsgErrors, "%s\n", response);
}
StringInfo buf = makeStringInfo();
#ifdef FAULT_INJECTOR
FaultInjectorEntry_s faultInjectorEntry;
elog(DEBUG1, "FAULT INJECTED: Name %s Type %s, DDL %s, DB %s, Table %s, NumOccurrences %d SleepTime %d",
faultName, type, ddlStatement, databaseName, tableName, numOccurrences, sleepTimeSeconds );
strlcpy(faultInjectorEntry.faultName, faultName, sizeof(faultInjectorEntry.faultName));
faultInjectorEntry.faultInjectorIdentifier = FaultInjectorIdentifierStringToEnum(faultName);
if (faultInjectorEntry.faultInjectorIdentifier == FaultInjectorIdNotSpecified) {
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not recognize fault name")));
appendStringInfo(buf, "Failure: could not recognize fault name");
goto exit;
}
static void
transitionReceivedDataFn(char *response)
{
elog(NOTICE, "%s", response);
}
faultInjectorEntry.faultInjectorType = FaultInjectorTypeStringToEnum(type);
if (faultInjectorEntry.faultInjectorType == FaultInjectorTypeNotSpecified ||
faultInjectorEntry.faultInjectorType == FaultInjectorTypeMax) {
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not recognize fault type")));
static bool
checkForNeedToExitFn(void)
{
CHECK_FOR_INTERRUPTS();
return false;
appendStringInfo(buf, "Failure: could not recognize fault type");
goto exit;
}
faultInjectorEntry.sleepTime = sleepTimeSeconds;
if (sleepTimeSeconds < 0 || sleepTimeSeconds > 7200) {
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid sleep time, allowed range [0, 7200 sec]")));
appendStringInfo(buf, "Failure: invalid sleep time, allowed range [0, 7200 sec]");
goto exit;
}
faultInjectorEntry.ddlStatement = FaultInjectorDDLStringToEnum(ddlStatement);
if (faultInjectorEntry.ddlStatement == DDLMax) {
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("could not recognize DDL statement")));
appendStringInfo(buf, "Failure: could not recognize DDL statement");
goto exit;
}
snprintf(faultInjectorEntry.databaseName, sizeof(faultInjectorEntry.databaseName), "%s", databaseName);
snprintf(faultInjectorEntry.tableName, sizeof(faultInjectorEntry.tableName), "%s", tableName);
faultInjectorEntry.occurrence = numOccurrences;
if (numOccurrences > 1000)
{
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid occurrence number, allowed range [1, 1000]")));
appendStringInfo(buf, "Failure: invalid occurrence number, allowed range [1, 1000]");
goto exit;
}
if (FaultInjector_SetFaultInjection(&faultInjectorEntry) == STATUS_OK)
{
if (faultInjectorEntry.faultInjectorType == FaultInjectorTypeStatus)
appendStringInfo(buf, "%s", faultInjectorEntry.bufOutput);
else
appendStringInfo(buf, "Success:");
}
else
appendStringInfo(buf, "Failure: %s", faultInjectorEntry.bufOutput);
exit:
#else
appendStringInfo(buf, "Failure: Fault Injector not available");
#endif
return buf->data;
}
PG_FUNCTION_INFO_V1(gp_inject_fault);
Datum
gp_inject_fault(PG_FUNCTION_ARGS)
......@@ -52,102 +116,55 @@ gp_inject_fault(PG_FUNCTION_ARGS)
int numOccurrences = PG_GETARG_INT32(5);
int sleepTimeSeconds = PG_GETARG_INT32(6);
int dbid = PG_GETARG_INT32(7);
StringInfo faultmsg = makeStringInfo();
/* Fast path if injecting fault in our postmaster. */
if (GpIdentity.dbid == dbid)
{
appendStringInfo(faultmsg, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
faultName, type, ddlStatement, databaseName,
tableName, numOccurrences, sleepTimeSeconds);
int offset = 0;
char *response =
processTransitionRequest_faultInject(
faultmsg->data, &offset, faultmsg->len);
char *response;
response = processTransitionRequest_faultInject(
faultName, type, ddlStatement, databaseName,
tableName, numOccurrences, sleepTimeSeconds);
if (!response)
elog(ERROR, "failed to inject fault locally (dbid %d)", dbid);
if (strncmp(response, "Success:", strlen("Success:")) != 0)
elog(ERROR, "%s", response);
elog(NOTICE, "%s", response);
PG_RETURN_DATUM(true);
}
/* Obtain host and port of the requested dbid */
HeapTuple tuple;
Relation rel = heap_open(GpSegmentConfigRelationId, AccessShareLock);
ScanKeyData scankey;
SysScanDesc sscan;
ScanKeyInit(&scankey,
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum((int16) dbid));
sscan = systable_beginscan(rel, GpSegmentConfigDbidIndexId, true,
GetTransactionSnapshot(), 1, &scankey);
tuple = systable_getnext(sscan);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cannot find dbid %d", dbid);
bool isnull;
Datum datum = heap_getattr(tuple, Anum_gp_segment_configuration_hostname,
RelationGetDescr(rel), &isnull);
char *hostname;
if (!isnull)
hostname =
DatumGetCString(DirectFunctionCall1(textout, datum));
else
elog(ERROR, "hostname is null for dbid %d", dbid);
int port = DatumGetInt32(heap_getattr(tuple,
Anum_gp_segment_configuration_port,
RelationGetDescr(rel), &isnull));
systable_endscan(sscan);
heap_close(rel, NoLock);
struct addrinfo *addrList = NULL;
struct addrinfo hint;
int ret;
/* Initialize hint structure */
MemSet(&hint, 0, sizeof(hint));
hint.ai_socktype = SOCK_STREAM;
hint.ai_family = AF_UNSPEC;
char portStr[100];
if (snprintf(portStr, sizeof(portStr), "%d", port) >= sizeof(portStr))
elog(ERROR, "port number too long for dbid %d", dbid);
/* Use pg_getaddrinfo_all() to resolve the address */
ret = pg_getaddrinfo_all(hostname, portStr, &hint, &addrList);
if (ret || !addrList)
{
if (addrList)
pg_freeaddrinfo_all(hint.ai_family, addrList);
elog(ERROR, "could not translate host name \"%s\" to address: %s\n",
hostname, gai_strerror(ret));
}
PrimaryMirrorTransitionClientInfo client;
client.receivedDataCallbackFn = transitionReceivedDataFn;
client.errorLogFn = transitionErrorLogFn;
client.checkForNeedToExitFn = checkForNeedToExitFn;
transitionMsgErrors = makeStringInfo();
appendStringInfo(faultmsg, "%s\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
"faultInject", faultName, type, ddlStatement,
databaseName, tableName, numOccurrences,
sleepTimeSeconds);
if (sendTransitionMessage(&client, addrList, faultmsg->data, faultmsg->len,
1 /* retries */, 60 /* timeout */) !=
TRANS_ERRCODE_SUCCESS)
else if (Gp_role == GP_ROLE_DISPATCH)
{
pg_freeaddrinfo_all(hint.ai_family, addrList);
ereport(ERROR, (errmsg("failed to inject %s fault in dbid %d",
faultName, dbid),
errdetail("%s", transitionMsgErrors->data)));
/*
* Otherwise, relay the command to executor nodes.
*
* We'd only really need to dispatch it to the one that it's meant for,
* but for now, just send it everywhere. The other nodes will just
* ignore it.
*
* (Perhaps this function should be defined as EXECUTE ON SEGMENTS,
* instead of dispatching manually here? But then it wouldn't run on
* QD. There is no EXECUTE ON SEGMENTS AND MASTER options, at the
* moment...)
*
* NOTE: Because we use the normal dispatcher to send this query,
* if a fault has already been injected to the dispatcher code,
* this will trigger it. That means that if you wish to inject
* faults on both the dispatcher and an executor in the same test,
* you need to be careful with the order you inject the faults!
*/
char *sql;
sql = psprintf("select gp_inject_fault(%s, %s, %s, %s, %s, %d, %d, %d)",
quote_literal_internal(faultName),
quote_literal_internal(type),
quote_literal_internal(ddlStatement),
quote_literal_internal(databaseName),
quote_literal_internal(tableName),
numOccurrences,
sleepTimeSeconds,
dbid);
CdbDispatchCommand(sql, DF_CANCEL_ON_ERROR, NULL);
}
pg_freeaddrinfo_all(hint.ai_family, addrList);
PG_RETURN_DATUM(BoolGetDatum(true));
}
......@@ -145,14 +145,13 @@ class StartInstances():
contentid = segconfig.content
segment_port = segconfig.port
segment_dir = segconfig.datadir
segment_role = StartInstances.getRole(contentid)
# Need to set the dbid to 0 on segments to prevent use in mmxlog records
if contentid != GpSegmentConfiguration.MASTER_CONTENT_ID:
dbid = 0
opts = ("-p %d --gp_dbid=%d --silent-mode=true -i -M %s --gp_contentid=%d --gp_num_contents_in_cluster=%d" %
(segment_port, dbid, segment_role, contentid, self.clusterconfig.get_num_contents()))
opts = ("-p %d --gp_dbid=%d --silent-mode=true -i --gp_contentid=%d --gp_num_contents_in_cluster=%d" %
(segment_port, dbid, contentid, self.clusterconfig.get_num_contents()))
# Arguments for the master.
#
......
......@@ -1274,7 +1274,7 @@ CREATE_QD_DB () {
BUILD_PERFMON $GP_DIR
ERROR_CHK $? "create perfmon directories and configuration file" 1
LOG_MSG "[INFO]:-Starting the Master in admin mode" 1
export PGPORT=$GP_PORT;$PG_CTL -w -l $GP_DIR/pg_log/startup.log -D $GP_DIR -o "-i -p $GP_PORT -c gp_role=utility -M master --gp_dbid=1 --gp_contentid=-1 \
export PGPORT=$GP_PORT;$PG_CTL -w -l $GP_DIR/pg_log/startup.log -D $GP_DIR -o "-i -p $GP_PORT -c gp_role=utility --gp_dbid=1 --gp_contentid=-1 \
--gp_num_contents_in_cluster=0 -m" start >> /dev/null 2>&1
RET_TEXT="`$PG_CTL status -D $GP_DIR`"
RUNNING=`$ECHO $RET_TEXT|$EGREP -c "not running|neither"`
......
......@@ -148,11 +148,11 @@ class PgCtlBackendOptions(CmdArgs):
>>> str(PgCtlBackendOptions(5432, 1, 2))
'-p 5432 --gp_dbid=1 --gp_num_contents_in_cluster=2 --silent-mode=true'
>>> str(PgCtlBackendOptions(5432, 1, 2).set_master(False))
'-p 5432 --gp_dbid=1 --gp_num_contents_in_cluster=2 --silent-mode=true -i -M master --gp_contentid=-1'
'-p 5432 --gp_dbid=1 --gp_num_contents_in_cluster=2 --silent-mode=true -i --gp_contentid=-1'
>>> str(PgCtlBackendOptions(5432, 1, 2).set_master(True))
'-p 5432 --gp_dbid=1 --gp_num_contents_in_cluster=2 --silent-mode=true -i -M master --gp_contentid=-1 -E'
>>> str(PgCtlBackendOptions(5432, 1, 2).set_segment('mirror', 1))
'-p 5432 --gp_dbid=1 --gp_num_contents_in_cluster=2 --silent-mode=true -i -M mirror --gp_contentid=1'
'-p 5432 --gp_dbid=1 --gp_num_contents_in_cluster=2 --silent-mode=true -i --gp_contentid=-1 -E'
>>> str(PgCtlBackendOptions(5432, 1, 2).set_segment(1))
'-p 5432 --gp_dbid=1 --gp_num_contents_in_cluster=2 --silent-mode=true -i --gp_contentid=1'
>>> str(PgCtlBackendOptions(5432, 1, 2).set_special('upgrade'))
'-p 5432 --gp_dbid=1 --gp_num_contents_in_cluster=2 --silent-mode=true -U'
>>> str(PgCtlBackendOptions(5432, 1, 2).set_special('maintenance'))
......@@ -188,16 +188,15 @@ class PgCtlBackendOptions(CmdArgs):
"""
@param seqserver: start with seqserver?
"""
self.extend(["-i", "-M", "master", "--gp_contentid=-1"])
self.extend(["-i", "--gp_contentid=-1"])
if seqserver: self.append("-E")
return self
def set_segment(self, mode, content):
def set_segment(self, content):
"""
@param mode: mirroring mode
@param content: content id
"""
self.extend(["-i", "-M", "mirrorless", "--gp_contentid="+str(content)])
self.extend(["-i", "--gp_contentid="+str(content)])
return self
#
......@@ -367,7 +366,7 @@ class SegmentStart(Command):
# build backend options
b = PgCtlBackendOptions(port, dbid, numContentsInCluster)
b.set_segment(mirrormode, content)
b.set_segment(content)
b.set_utility(utilityMode)
b.set_special(specialMode)
......@@ -387,158 +386,6 @@ class SegmentStart(Command):
cmd=SegmentStart(name, gpdb, numContentsInCluster, era, mirrormode, utilityMode, ctxt=REMOTE, remoteHost=remoteHost)
cmd.run(validateAfter=True)
#-----------------------------------------------
class SendFilerepTransitionMessage(Command):
# see gpmirrortransition.c and primary_mirror_transition_client.h
TRANSITION_ERRCODE_SUCCESS = 0
TRANSITION_ERRCODE_ERROR_UNSPECIFIED = 1
TRANSITION_ERRCODE_ERROR_SERVER_DID_NOT_RETURN_DATA = 10
TRANSITION_ERRCODE_ERROR_PROTOCOL_VIOLATED = 11
TRANSITION_ERRCODE_ERROR_HOST_LOOKUP_FAILED = 12
TRANSITION_ERRCODE_ERROR_INVALID_ARGUMENT = 13
TRANSITION_ERRCODE_ERROR_READING_INPUT = 14
TRANSITION_ERRCODE_ERROR_SOCKET = 15
#
# note: this should be cleaned up -- there are two hosts involved,
# the host on which to run gp_primarymirror, AND the host to pass to gp_primarymirror -h
#
# Right now, it uses the same for both which is pretty wrong for anything but a local context.
#
def __init__(self, name, inputFile, port=None,ctxt=LOCAL, remoteHost=None, dataDir=None):
if not remoteHost:
remoteHost = "localhost"
self.cmdStr='$GPHOME/bin/gp_primarymirror -h %s -p %s -i %s' % (remoteHost,port,inputFile)
self.dataDir = dataDir
Command.__init__(self,name,self.cmdStr,ctxt,remoteHost)
@staticmethod
def local(name,inputFile,port=None,remoteHost=None):
cmd=SendFilerepTransitionMessage(name, inputFile, port, LOCAL, remoteHost)
cmd.run(validateAfter=True)
return cmd
@staticmethod
def buildTransitionMessageCommand(transitionData, dir, port):
dbData = transitionData["dbsByPort"][int(port)]
targetMode = dbData["targetMode"]
argsArr = []
argsArr.append(targetMode)
if targetMode == 'mirror' or targetMode == 'primary':
mode = dbData["mode"]
if mode == 'r' and dbData["fullResyncFlag"]:
# full resync requested, convert 'r' to 'f'
argsArr.append( 'f' )
else:
# otherwise, pass the mode through
argsArr.append( dbData["mode"])
argsArr.append( dbData["hostName"])
argsArr.append( "%d" % dbData["hostPort"])
argsArr.append( dbData["peerName"])
argsArr.append( "%d" % dbData["peerPort"])
argsArr.append( "%d" % dbData["peerPMPort"])
#
# write arguments to input file. We will leave this file around. It can be useful for debugging
#
inputFile = os.path.join( dir, "gp_pmtransition_args" )
writeLinesToFile(inputFile, argsArr)
return SendFilerepTransitionMessage("Changing seg at dir %s" % dir, inputFile, port=port, dataDir=dir)
class SendFilerepTransitionStatusMessage(Command):
def __init__(self, name, msg, dataDir=None, port=None,ctxt=LOCAL, remoteHost=None):
if not remoteHost:
remoteHost = "localhost"
self.cmdStr='$GPHOME/bin/gp_primarymirror -h %s -p %s' % (remoteHost,port)
self.dataDir = dataDir
logger.debug("Sending msg %s and cmdStr %s" % (msg, self.cmdStr))
Command.__init__(self, name, self.cmdStr, ctxt, remoteHost, stdin=msg)
def unpackSuccessLine(self):
"""
After run() has been called on this cmd, call this to find the "Success" data in the output
That line is returned if successful, otherwise None is returned
"""
res = self.get_results()
if res.rc != 0:
logger.warn("Error getting data stdout:\"%s\" stderr:\"%s\"" % \
(res.stdout.replace("\n", " "), res.stderr.replace("\n", " ")))
return None
else:
logger.info("Result: stdout:\"%s\" stderr:\"%s\"" % \
(res.stdout.replace("\n", " "), res.stderr.replace("\n", " ")))
line = res.stderr
if line.startswith("Success:"):
line = line[len("Success:"):]
return line
#-----------------------------------------------
class SendFilerepVerifyMessage(Command):
DEFAULT_IGNORE_FILES = [
'pg_internal.init', 'pgstat.stat', 'pga_hba.conf',
'pg_ident.conf', 'pg_fsm.cache', 'gp_dbid', 'gp_pmtransitions_args',
'gp_dump', 'postgresql.conf', 'postmaster.log', 'postmaster.opts',
'postmaser.pids', 'postgresql.conf.bak', 'core', 'wet_execute.tbl',
'recovery.done']
DEFAULT_IGNORE_DIRS = [
'pgsql_tmp', 'pg_xlog', 'pg_log', 'pg_stat_tmp', 'pg_changetracking', 'pg_verify', 'db_dumps', 'pg_utilitymodedtmredo', 'gpperfmon'
]
def __init__(self, name, host, port, token, full=None, verify_file=None, verify_dir=None,
abort=None, suspend=None, resume=None, ignore_dir=None, ignore_file=None,
results=None, results_level=None, ctxt=LOCAL, remoteHost=None):
"""
Sends gp_verify message to backend to either start or get results of a
mirror verification.
"""
self.host = host
self.port = port
msg_contents = ['gp_verify']
## The ordering of the following appends is critical. Do not rearrange without
## an associated change in gp_primarymirror
# full
msg_contents.append('true') if full else msg_contents.append('')
# verify_file
msg_contents.append(verify_file) if verify_file else msg_contents.append('')
# verify_dir
msg_contents.append(verify_dir) if verify_dir else msg_contents.append('')
# token
msg_contents.append(token)
# abort
msg_contents.append('true') if abort else msg_contents.append('')
# suspend
msg_contents.append('true') if suspend else msg_contents.append('')
# resume
msg_contents.append('true') if resume else msg_contents.append('')
# ignore_directory
ignore_dir_list = SendFilerepVerifyMessage.DEFAULT_IGNORE_DIRS + (ignore_dir.split(',') if ignore_dir else [])
msg_contents.append(','.join(ignore_dir_list))
# ignore_file
ignore_file_list = SendFilerepVerifyMessage.DEFAULT_IGNORE_FILES + (ignore_file.split(',') if ignore_file else [])
msg_contents.append(','.join(ignore_file_list))
# resultslevel
msg_contents.append(str(results_level)) if results_level else msg_contents.append('')
logger.debug("gp_verify message sent to %s:%s:\n%s" % (host, port, "\n".join(msg_contents)))
self.cmdStr='$GPHOME/bin/gp_primarymirror -h %s -p %s' % (host, port)
Command.__init__(self, name, self.cmdStr, ctxt, remoteHost, stdin="\n".join(msg_contents))
#-----------------------------------------------
class SegmentStop(Command):
def __init__(self, name, dataDir,mode='smart', nowait=False, ctxt=LOCAL,
......
......@@ -239,7 +239,7 @@ STOP_QE() {
START_QE() {
LOG_MSG "[INFO][$INST_COUNT]:-Starting Functioning instance on segment ${GP_HOSTADDRESS}"
PG_CTL_WAIT=$1
$TRUSTED_SHELL ${GP_HOSTADDRESS} "$EXPORT_LIB_PATH;export PGPORT=${GP_PORT}; $PG_CTL $PG_CTL_WAIT -l $GP_DIR/pg_log/startup.log -D $GP_DIR -o \"-i -p ${GP_PORT} -M mirrorless --gp_dbid=${GP_DBID} --gp_contentid=${GP_CONTENT} --gp_num_contents_in_cluster=${TOTAL_SEG}\" start" >> $LOG_FILE 2>&1
$TRUSTED_SHELL ${GP_HOSTADDRESS} "$EXPORT_LIB_PATH;export PGPORT=${GP_PORT}; $PG_CTL $PG_CTL_WAIT -l $GP_DIR/pg_log/startup.log -D $GP_DIR -o \"-i -p ${GP_PORT} --gp_dbid=${GP_DBID} --gp_contentid=${GP_CONTENT} --gp_num_contents_in_cluster=${TOTAL_SEG}\" start" >> $LOG_FILE 2>&1
RETVAL=$?
if [ $RETVAL -ne 0 ]; then
BACKOUT_COMMAND "$TRUSTED_SHELL $GP_HOSTADDRESS \"${EXPORT_LIB_PATH};export PGPORT=${GP_PORT}; $PG_CTL -w -D $GP_DIR -o \"-i -p ${GP_PORT}\" -m immediate stop\""
......
......@@ -58,7 +58,6 @@
#include "storage/shmem.h"
#include "miscadmin.h"
/*
* Define segment size. A page is the same BLCKSZ as is used everywhere
* else in Postgres. The segment size can be chosen somewhat arbitrarily;
......
......@@ -10593,7 +10593,6 @@ rm_redo_error_callback(void *arg)
pfree(buf.data);
}
#if 0 /* GPDB doesn't have online backup */
/*
* BackupInProgress: check if online backup mode is active
*
......@@ -10642,7 +10641,6 @@ CancelBackup(void)
BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
}
}
#endif
static char *
XLogLocationToBuffer(char *buffer, XLogRecPtr *loc, bool longFormat)
......
......@@ -355,12 +355,6 @@ AuxiliaryProcessMain(int argc, char *argv[])
case WalReceiverProcess:
statmsg = "wal receiver process";
break;
case FilerepProcess:
statmsg = "filerep process";
break;
case FilerepResetPeerProcess:
statmsg = "filerep reset peer process";
break;
default:
statmsg = "??? process";
break;
......@@ -401,13 +395,7 @@ AuxiliaryProcessMain(int argc, char *argv[])
* InitPostgres pushups, but there are a couple of things that need to get
* lit up even in an auxiliary process.
*/
/*
* FileRep Main process does not use LWLock and so PGPROC is not required
* to be initialized.
* It is temporary fix to handle that here as an exception.
*/
if (IsUnderPostmaster && (MyAuxProcType != FilerepProcess &&
MyAuxProcType != FilerepResetPeerProcess))
if (IsUnderPostmaster)
{
/*
* Create a PGPROC so we can use LWLocks. In the EXEC_BACKEND case,
......
......@@ -33,7 +33,6 @@
#include "executor/spi.h"
#include "postmaster/fts.h"
#include "postmaster/primary_mirror_mode.h"
#include "utils/faultinjection.h"
#include "utils/fmgroids.h"
......@@ -146,36 +145,6 @@ FtsNotifyProber(void)
}
}
/*
* Check if master needs to shut down
*/
bool
FtsMasterShutdownRequested()
{
return *ftsShutdownMaster;
}
/*
* Set flag indicating that master needs to shut down
*/
void
FtsRequestMasterShutdown()
{
#ifdef USE_ASSERT_CHECKING
Assert(!*ftsShutdownMaster);
PrimaryMirrorMode pm_mode;
getPrimaryMirrorStatusCodes(&pm_mode, NULL, NULL, NULL);
Assert(pm_mode == PMModeMaster);
#endif /* USE_ASSERT_CHECKING */
*ftsShutdownMaster = true;
}
/*
* Test-Connection: This is called from the threaded context inside the
* dispatcher: ONLY CALL THREADSAFE FUNCTIONS -- elog() is NOT threadsafe.
......
......@@ -940,24 +940,3 @@ CdbComponentDatabaseInfo *FtsGetPeerSegment(CdbComponentDatabases *cdbs,
return NULL;
}
/*
* Notify postmaster to shut down due to inconsistent segment state
*/
void FtsRequestPostmasterShutdown(CdbComponentDatabaseInfo *primary, CdbComponentDatabaseInfo *mirror)
{
FtsRequestMasterShutdown();
elog(FATAL, "FTS: detected invalid state for content=%d: "
"primary (dbid=%d, mode='%c', status='%c'), "
"mirror (dbid=%d, mode='%c', status='%c'), "
"shutting down master.",
primary->segindex,
primary->dbid,
primary->mode,
primary->status,
mirror->dbid,
mirror->mode,
mirror->status
);
}
......@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
OBJS = autovacuum.o bgwriter.o checkpointer.o fork_process.o seqserver.o pgarch.o pgstat.o \
postmaster.o primary_mirror_mode.o primary_mirror_transition_client.o syslogger.o \
postmaster.o syslogger.o \
perfmon.o backoff.o perfmon_segmentinfo.o \
sendalert.o alertseverity.o autostats.o walwriter.o
......
......@@ -44,7 +44,6 @@
#include "storage/pmsignal.h"
#include "utils/guc.h"
#include "utils/ps_status.h"
#include "postmaster/primary_mirror_mode.h"
/* ----------
* Timer definitions.
......
此差异已折叠。
此差异已折叠。
/**
* This code is used for clients that need to connect to the postmaster to send a so-called "transition" message
* Transition messages are special in that they do not require a full database backend to be executed; so they
* can run on a mirror segment that does not have a running database.
*/
#include "postmaster/primary_mirror_mode.h"
#include "postmaster/primary_mirror_transition_client.h"
#include "pgtime.h"
#include <unistd.h>
#include "libpq/pqcomm.h"
#include "libpq/ip.h"
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
/*
* These macros are needed to let error-handling code be portable between
* Unix and Windows. (ugh)
*
* macros taken from libpq-int.h
*/
#ifdef WIN32
#define SOCK_ERRNO (WSAGetLastError())
#define SOCK_STRERROR winsock_strerror
#define SOCK_ERRNO_SET(e) WSASetLastError(e)
#else
#define SOCK_ERRNO errno
#define SOCK_STRERROR pqStrerror
#define SOCK_ERRNO_SET(e) (errno = (e))
#endif
#define RESULT_BUFFER_SIZE 10000
#define ERROR_MESSAGE_SIZE 1000
#define SOCKET_ERROR_MESSAGE_SIZE 256
/*
* ConnectionInfo -- define the connection related info.
*/
typedef struct ConnectionInfo
{
struct addrinfo *addr;
int resultSock;
int resultStatus;
GpMonotonicTime startTime;
uint64 timeout_ms;
char resultBuffer[RESULT_BUFFER_SIZE];
int32 resultMsgSize;
char errMessage[ERROR_MESSAGE_SIZE];
char sockErrMessage[SOCKET_ERROR_MESSAGE_SIZE];
} ConnectionInfo;
static bool completedReceiving(ConnectionInfo *connInfo);
/*
* createSocket -- create the socket.
*
* Return true if the socket is created. Otherwise, return false.
*
* On success, the created socket is stored in resultSock in connInfo.
*
* The resultStatus in connInfo is set accordingly:
*
* TRANS_ERRCODE_ERROR_SOCKET on failure on socket().
* TRANS_ERRCODE_SUCCESS when a socket is created.
* TRANS_ERRCODE_INTERRUPT_REQUESTED if an interrupt is requested.
*/
static bool
createSocket(ConnectionInfo *connInfo,
PrimaryMirrorTransitionClientInfo *client)
{
struct addrinfo *currentAddr = connInfo->addr;
connInfo->resultStatus = TRANS_ERRCODE_SUCCESS;
connInfo->resultSock = -1;
while (currentAddr != NULL)
{
if (client->checkForNeedToExitFn())
{
connInfo->resultStatus = TRANS_ERRCODE_INTERRUPT_REQUESTED;
break;
}
connInfo->resultSock = socket(currentAddr->ai_family, SOCK_STREAM, 0);
if (connInfo->resultSock >= 0)
break;
if (currentAddr->ai_next == NULL)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"failed to create socket: %s (errno: %d)\n",
SOCK_STRERROR(SOCK_ERRNO, connInfo->sockErrMessage, sizeof(connInfo->sockErrMessage)),
SOCK_ERRNO);
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_SOCKET;
break;
}
currentAddr = currentAddr->ai_next;
}
connInfo->addr = currentAddr;
if (connInfo->resultStatus != TRANS_ERRCODE_SUCCESS)
{
return false;
}
return true;
}
/*
* createConnection -- create the connection to a given socket.
*
* Return true if the connection is created. Otherwise, return false.
*
* The resultStatus in connInfo is set accordingly:
*
* TRANS_ERRCODE_ERROR_SOCKET on failure on connect().
* TRANS_ERRCODE_SUCCESS when a socket is connected.
* TRANS_ERRCODE_INTERRUPT_REQUESTED if an interrupt is requested.
*/
static bool
createConnection(ConnectionInfo *connInfo,
PrimaryMirrorTransitionClientInfo *client)
{
if (connInfo->resultSock < 0 ||
connInfo->addr == NULL)
return false;
connInfo->resultStatus = TRANS_ERRCODE_SUCCESS;
int status = connect(connInfo->resultSock, connInfo->addr->ai_addr, connInfo->addr->ai_addrlen);
while (status == -1)
{
if (client->checkForNeedToExitFn())
{
connInfo->resultStatus = TRANS_ERRCODE_INTERRUPT_REQUESTED;
break;
}
if (SOCK_ERRNO != EINTR && SOCK_ERRNO != EAGAIN)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"failed to connect: %s (errno: %d)\n",
SOCK_STRERROR(SOCK_ERRNO, connInfo->sockErrMessage, sizeof(connInfo->sockErrMessage)),
SOCK_ERRNO);
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_SOCKET;
break;
}
status = connect(connInfo->resultSock, connInfo->addr->ai_addr, connInfo->addr->ai_addrlen);
}
return (connInfo->resultStatus == TRANS_ERRCODE_SUCCESS);
}
/*
* createConnectedSocket -- create a connected socket.
*/
static bool
createConnectedSocket(ConnectionInfo *connInfo,
PrimaryMirrorTransitionClientInfo *client)
{
struct addrinfo *currentAddr = connInfo->addr;
connInfo->resultStatus = TRANS_ERRCODE_SUCCESS;
while (currentAddr != NULL)
{
if (createSocket(connInfo, client) &&
createConnection(connInfo, client))
{
break;
}
if (connInfo->resultStatus == TRANS_ERRCODE_INTERRUPT_REQUESTED)
{
break;
}
if (connInfo->resultSock >= 0)
{
closesocket(connInfo->resultSock);
connInfo->resultSock = -1;
}
currentAddr = currentAddr->ai_next;
}
return (connInfo->resultStatus == TRANS_ERRCODE_SUCCESS);
}
/*
* getRemainingTimeout -- return the remaining timeout after subtracting
* the elapsed time since the start time.
*/
static uint64
getRemainingTimeout(GpMonotonicTime *startTime, uint64 timeout_ms)
{
uint64 elapsedTime_ms = gp_get_elapsed_ms(startTime);
if (elapsedTime_ms > timeout_ms)
return 0;
return timeout_ms - elapsedTime_ms;
}
/*
* sendFully -- send all dataLength of data to the target socket.
*
* This function returns true if the data is sent successfully.
* Otherwise, return false.
*
* The resultStatus is set accordingly:
* TRANS_ERRCODE_ERROR_SOCKET if the data can not all be sent.
* TRANS_ERRCODE_SUCCESS when the data is sent successfully.
* TRANS_ERRCODE_ERROR_TIMEOUT if the timeout is expired.
* TRANS_ERRCODE_INTERRUPT_REQUESTED if an interrupt is requested.
*/
static bool
sendFully(ConnectionInfo *connInfo,
PrimaryMirrorTransitionClientInfo *client,
char *data,
int dataLength)
{
if (connInfo->resultSock < 0)
return false;
connInfo->resultStatus = TRANS_ERRCODE_SUCCESS;
struct pollfd nfd;
nfd.fd = connInfo->resultSock;
nfd.events = POLLOUT;
int nfds = 1;
while (dataLength > 0)
{
if (client->checkForNeedToExitFn())
{
connInfo->resultStatus = TRANS_ERRCODE_INTERRUPT_REQUESTED;
break;
}
uint64 remainingTimeout = getRemainingTimeout(&connInfo->startTime, connInfo->timeout_ms);
int status = poll(&nfd, nfds, (int)remainingTimeout);
if (status == 0)
{
connInfo->resultStatus = TRANS_ERRCODE_ERROR_TIMEOUT;
break;
}
if (status == -1)
{
if (SOCK_ERRNO == EINTR || SOCK_ERRNO == EAGAIN)
{
continue;
}
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"failed to poll: %s (errno: %d)\n",
SOCK_STRERROR(SOCK_ERRNO, connInfo->sockErrMessage, sizeof(connInfo->sockErrMessage)),
SOCK_ERRNO);
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_SOCKET;
break;
}
int numSent = send(connInfo->resultSock, data, dataLength, 0 /* flags */);
if (numSent < 0)
{
if (SOCK_ERRNO == EINTR || SOCK_ERRNO == EAGAIN)
{
continue;
}
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"failed to send: %s (errno: %d)\n",
SOCK_STRERROR(SOCK_ERRNO, connInfo->sockErrMessage, sizeof(connInfo->sockErrMessage)),
SOCK_ERRNO);
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_SOCKET;
break;
}
else
{
data += numSent;
dataLength -= numSent;
}
}
return (connInfo->resultStatus == TRANS_ERRCODE_SUCCESS);
}
/*
* receiveMessage -- wait to receive results.
*
* This function returns true on success. Otherwise, return false.
*
* The resultStatus is set accordingly:
* TRANS_ERRCODE_SUCCESS when some data has been received.
* TRANS_ERRCODE_ERROR_TIMEOUT if the timeout is expired.
* TRANS_ERRCODE_INTERRUPT_REQUESTED if an interrupt is requested.
* TRANS_ERRCODE_ERROR_SERVER_DID_NOT_RETURN_DATA when no data is received.
* TRANS_ERRCODE_ERROR_PROTOCOL_VIOLATED if there are too much data to be received.
*/
static bool
receiveMessage(ConnectionInfo *connInfo,
PrimaryMirrorTransitionClientInfo *client)
{
if (connInfo->resultSock < 0)
{
return false;
}
connInfo->resultMsgSize = 0;
struct pollfd nfd;
nfd.fd = connInfo->resultSock;
nfd.events = POLLIN;
int nfds = 1;
int32 bufferSizeLeft = sizeof(connInfo->resultBuffer);
connInfo->resultStatus = TRANS_ERRCODE_SUCCESS;
while (bufferSizeLeft > 0)
{
if (client->checkForNeedToExitFn())
{
connInfo->resultStatus = TRANS_ERRCODE_INTERRUPT_REQUESTED;
break;
}
uint64 remainingTimeout = getRemainingTimeout(&connInfo->startTime, connInfo->timeout_ms);
int status = poll(&nfd, nfds, (int)remainingTimeout);
if (status == 0)
{
connInfo->resultStatus = TRANS_ERRCODE_ERROR_TIMEOUT;
break;
}
if (status < 0)
{
if (SOCK_ERRNO == EINTR || SOCK_ERRNO == EAGAIN)
{
continue;
}
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"failed to poll: %s (errno: %d)\n",
SOCK_STRERROR(SOCK_ERRNO, connInfo->sockErrMessage, sizeof(connInfo->sockErrMessage)),
SOCK_ERRNO);
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_SOCKET;
break;
}
int numRecv = recv(connInfo->resultSock,
connInfo->resultBuffer + connInfo->resultMsgSize,
bufferSizeLeft,
0 /* flags */);
if (numRecv == 0)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"peer shut down connection before response was fully received\n");
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_SOCKET;
break;
}
if (numRecv < 0)
{
if (SOCK_ERRNO == EINTR || SOCK_ERRNO == EAGAIN)
{
continue;
}
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"failed to receive response: %s (errno: %d)\n",
SOCK_STRERROR(SOCK_ERRNO, connInfo->sockErrMessage, sizeof(connInfo->sockErrMessage)),
SOCK_ERRNO);
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_SOCKET;
break;
}
connInfo->resultMsgSize += numRecv;
bufferSizeLeft -= numRecv;
/* check if response was fully received */
if (completedReceiving(connInfo))
{
break;
}
if (bufferSizeLeft == 0)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"failed: response from server is too large\n");
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_PROTOCOL_VIOLATED;
break;
}
}
return (connInfo->resultStatus == TRANS_ERRCODE_SUCCESS);
}
/*
* Check if segment response has been fully received
*/
static bool
completedReceiving(ConnectionInfo *connInfo)
{
/* check if message header was received */
if (connInfo->resultMsgSize < 4)
{
return false;
}
/*
* message header contains message size;
* check if received message size is equal or bigger than expected one
*/
int32 expectedMsgSize = htonl(*(int32*)connInfo->resultBuffer);
return (expectedMsgSize <= connInfo->resultMsgSize);
}
/*
* processResponse -- process the response from the server.
*/
static void
processResponse(ConnectionInfo *connInfo,
PrimaryMirrorTransitionClientInfo *client)
{
if (connInfo->resultMsgSize < 4)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"failed: server did not respond with enough data");
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_SERVER_DID_NOT_RETURN_DATA;
return;
}
int32 expectedMsgSize = htonl(*(int32*)connInfo->resultBuffer);
if (expectedMsgSize != connInfo->resultMsgSize)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"failed: sent %d bytes, but received %d bytes",
expectedMsgSize, connInfo->resultMsgSize);
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_PROTOCOL_VIOLATED;
return;
}
if (connInfo->resultMsgSize == sizeof(connInfo->resultBuffer))
{
connInfo->resultMsgSize--; /* Make space for trailing '\0' */
}
connInfo->resultBuffer[connInfo->resultMsgSize] = '\0';
char *resultMsg = connInfo->resultBuffer + sizeof(expectedMsgSize);
client->receivedDataCallbackFn(resultMsg);
/* Anything equal to success or STARTING with success: is considered success */
if (strcmp(resultMsg, "Success") != 0 &&
strncmp(resultMsg, "Success:", sizeof("Success:") - 1) != 0)
{
connInfo->resultStatus = TRANS_ERRCODE_ERROR_UNSPECIFIED;
client->errorLogFn(resultMsg);
}
}
/**
* Send the given data to the given address(es).
*
* addr may be a chain of addresses
*
* if data is NULL then we don't send any data...we just close immediately
*
* @return the error code
*/
int
sendTransitionMessage(PrimaryMirrorTransitionClientInfo *client,
struct addrinfo *addr,
void *data,
int dataLength,
int maxRetries,
int transitionTimeout)
{
int retry;
int save_errno = SOCK_ERRNO;
int retrySleepTimeSeconds = 1;
ConnectionInfo *connInfo = (ConnectionInfo *)malloc(sizeof(ConnectionInfo));
if (connInfo == NULL)
{
client->errorLogFn("Out of memory\n");
return TRANS_ERRCODE_ERROR_UNSPECIFIED;
}
memset(connInfo, 0, sizeof(ConnectionInfo));
connInfo->addr = addr;
/* convert the transition timeout to milliseconds */
connInfo->timeout_ms = transitionTimeout * 1000;
struct
{
uint32 packetlen;
PrimaryMirrorTransitionPacket payload;
} transitionPacket;
/* create the transition request packet */
transitionPacket.packetlen = htonl((uint32) sizeof(transitionPacket));
transitionPacket.payload.protocolCode = (MsgType) htonl(PRIMARY_MIRROR_TRANSITION_REQUEST_CODE);
transitionPacket.payload.dataLength = htonl(dataLength);
for (retry = 0; retry < maxRetries; retry++)
{
gp_set_monotonic_begin_time(&connInfo->startTime);
/* reset the address since it may be modified in createConnectedSocket. */
connInfo->addr = addr;
if (retry > 0)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage),
"Retrying no %d\n", retry);
client->errorLogFn(connInfo->errMessage);
}
if (createConnectedSocket(connInfo, client) &&
sendFully(connInfo, client, (char *)&transitionPacket, sizeof(transitionPacket)) &&
sendFully(connInfo, client, (char *)data, dataLength) &&
receiveMessage(connInfo, client))
{
/*
* Successfully send the transition request packet and receive the
* response from the server.
*/
processResponse(connInfo, client);
break;
}
if (connInfo->resultStatus == TRANS_ERRCODE_INTERRUPT_REQUESTED ||
connInfo->resultStatus == TRANS_ERRCODE_ERROR_PROTOCOL_VIOLATED ||
connInfo->resultStatus == TRANS_ERRCODE_ERROR_SERVER_DID_NOT_RETURN_DATA)
{
break;
}
if (connInfo->resultStatus == TRANS_ERRCODE_ERROR_TIMEOUT)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage), "failure: timeout\n");
client->errorLogFn(connInfo->errMessage);
}
if (connInfo->resultStatus != TRANS_ERRCODE_SUCCESS)
{
if (connInfo->resultSock >= 0)
{
closesocket(connInfo->resultSock);
connInfo->resultSock = -1;
}
sleep(retrySleepTimeSeconds);
}
}
if (connInfo->resultStatus == TRANS_ERRCODE_INTERRUPT_REQUESTED)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage), "failure: interrupted\n");
client->errorLogFn(connInfo->errMessage);
}
if (connInfo->resultStatus == TRANS_ERRCODE_ERROR_TIMEOUT)
{
snprintf(connInfo->errMessage, sizeof(connInfo->errMessage), "failure: timeout\n");
client->errorLogFn(connInfo->errMessage);
connInfo->resultStatus = TRANS_ERRCODE_ERROR_UNSPECIFIED;
}
if (connInfo->resultSock >= 0)
closesocket(connInfo->resultSock);
SOCK_ERRNO_SET(save_errno);
int resultStatus = connInfo->resultStatus;
free(connInfo);
return resultStatus;
}
......@@ -79,7 +79,6 @@
#include "utils/resowner.h"
#include "utils/workfile_mgr.h"
#include "utils/faultinjector.h"
#include "postmaster/primary_mirror_mode.h"
// Provide some indirection here in case we have problems with lseek and
// 64 bits on some platforms
......
......@@ -31,7 +31,6 @@
#include "postmaster/autovacuum.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
#include "postmaster/primary_mirror_mode.h"
#include "postmaster/seqserver.h"
#include "replication/walsender.h"
#include "replication/walreceiver.h"
......@@ -154,7 +153,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, SInvalShmemSize());
size = add_size(size, PMSignalShmemSize());
size = add_size(size, ProcSignalShmemSize());
size = add_size(size, primaryMirrorModeShmemSize());
//size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, FtsShmemSize());
size = add_size(size, tmShmemSize());
......@@ -243,8 +241,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
*/
InitShmemIndex();
primaryMirrorModeShmemInit();
/*
* Set up xlog, clog, and buffers
*/
......@@ -274,11 +270,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
*/
ResManagerShmemInit();
/*
* Set up process table
*/
if (!IsUnderPostmaster)
{
/* Set up process table */
InitProcGlobal(PostmasterGetMppLocalProcessCounter());
}
InitProcGlobal();
/* Initialize SessionState shared memory array */
SessionState_ShmemInit();
......
......@@ -176,7 +176,7 @@ ProcGlobalSemas(void)
* pointers must be propagated specially for EXEC_BACKEND operation.
*/
void
InitProcGlobal(int mppLocalProcessCounter)
InitProcGlobal(void)
{
PGPROC *procs;
int i;
......@@ -204,7 +204,7 @@ InitProcGlobal(int mppLocalProcessCounter)
ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
ProcGlobal->mppLocalProcessCounter = mppLocalProcessCounter;
ProcGlobal->mppLocalProcessCounter = 0;
/*
* Pre-create the PGPROC structures and create a semaphore for each.
......@@ -2077,18 +2077,6 @@ ResLockWaitCancel(void)
return;
}
bool ProcGetMppLocalProcessCounter(int *mppLocalProcessCounter)
{
Assert(mppLocalProcessCounter != NULL);
if (ProcGlobal == NULL)
return false;
*mppLocalProcessCounter = ProcGlobal->mppLocalProcessCounter;
return true;
}
bool ProcCanSetMppSessionId(void)
{
if (ProcGlobal == NULL || MyProc == NULL)
......
......@@ -100,7 +100,6 @@
#include "pgstat.h"
#include "executor/nodeFunctionscan.h"
#include "postmaster/primary_mirror_mode.h"
#include "utils/session_state.h"
#include "utils/vmem_tracker.h"
......
......@@ -26,7 +26,6 @@
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbfts.h"
#include "postmaster/primary_mirror_mode.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
......@@ -699,7 +698,7 @@ gp_add_master_standby(PG_FUNCTION_ARGS)
new.db.port = PG_GETARG_INT32(3);
add_segment_config_entry(GpIdentity.dbid, &new);
heap_close(gprel, NoLock);
PG_RETURN_INT16(new.db.dbid);
......@@ -840,13 +839,6 @@ gp_activate_standby(void)
catalog_activate_standby(olddbid, newdbid);
/*
* Tell postmaster to change dbid to the new one. This should come last
* after completing catalog change, as the new value will reside after
* transaction abort or PANIC.
*/
primaryMirrorSetNewDbid(newdbid);
/* done */
return true;
}
......
......@@ -37,7 +37,6 @@
#include "utils/faultinjector.h"
#include "utils/hsearch.h"
#include "miscadmin.h"
#include "postmaster/primary_mirror_mode.h"
#ifdef FAULT_INJECTOR
......@@ -434,15 +433,6 @@ FaultInjector_InjectFaultNameIfSet(
for (ii=0; ii < cnt; ii++)
{
pg_usleep(1000000L); // sleep for 1 sec (1 sec * 3600 = 1 hour)
SegmentState_e segmentState;
getFileRepRoleAndState(NULL, &segmentState, NULL, NULL, NULL);
if (segmentState == SegmentStateImmediateShutdown ||
segmentState == SegmentStateShutdown ||
IsFtsShudownRequested())
{
break;
}
}
break;
case FaultInjectorTypeDataCorruption:
......
......@@ -24,7 +24,6 @@
#include "nodes/print.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "postmaster/primary_mirror_mode.h"
#define WORKFILE_SET_MASK "XXXXXXXXXX"
......
......@@ -17,8 +17,7 @@ unittest-check:
$(MAKE) -C pg_dump/test check
DIRS = initdb pg_basebackup pg_ctl pg_dump \
psql scripts pg_config pg_controldata pg_resetxlog \
gpmirrortransition
psql scripts pg_config pg_controldata pg_resetxlog
ifeq ($(PORTNAME), win32)
DIRS+=pgevent
......
#-------------------------------------------------------------------------
#
# Makefile for src/bin/gpmirrortransition
#
# Portions Copyright (c) 2009 Greenplum Inc
# Portions Copyright (c) 2012-Present Pivotal Software, Inc.
#
# This Makefile was copied from the pg_dump makefile and modified accordingly
#
# IDENTIFICATION
# src/bin/gpmirrortransition/Makefile
#
#-------------------------------------------------------------------------
PGFILEDESC = "gp_primarymirror - inform a segment of a change in primary/mirror status"
subdir = src/bin/gpmirrortransition
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
# The frontend doesn't need everything that's in LIBS, some are backend only
LIBS := $(filter-out -lresolv, $(LIBS))
# This program isn't interactive, so doesn't need these
LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses -lcurl -lssl -lcrypto, $(LIBS))
# the use of tempnam in pg_backup_tar.c causes a warning when using newer versions of GCC
override CPPFLAGS := -Wno-deprecated-declarations -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
OBJS=gpmirrortransition.o $(WIN32RES)
EXTRA_OBJS = $(top_builddir)/src/backend/libpq/ip.o $(top_builddir)/src/backend/postmaster/primary_mirror_transition_client.o $(top_builddir)/src/timezone/gptime.o
all: submake-libpq submake-libpgport submake-backend gp_primarymirror
gp_primarymirror: gpmirrortransition.o $(OBJS) $(EXTRA_OBJS) $(libpq_builddir)/libpq.a
$(CC) $(CFLAGS) $(OBJS) $(EXTRA_OBJS) $(libpq_pgport) $(LDFLAGS) $(LIBS) -o $@$(X)
.PHONY: submake-backend
submake-backend:
$(MAKE) -C $(top_builddir)/src/backend/libpq ip.o
install: all installdirs
$(INSTALL_PROGRAM) gp_primarymirror$(X) '$(DESTDIR)$(bindir)'/gp_primarymirror$(X)
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f $(addprefix '$(DESTDIR)$(bindir)'/, gp_primarymirror$(X))
clean distclean maintainer-clean:
rm -f gp_primarymirror$(X) $(OBJS) gpmirrortransition.o
......@@ -55,7 +55,6 @@
#include "access/xlog_internal.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "postmaster/primary_mirror_mode.h"
extern int optind;
extern char *optarg;
......
......@@ -75,7 +75,7 @@ typedef struct
typedef enum CAC_state
{
CAC_OK, CAC_STARTUP, CAC_SHUTDOWN, CAC_RECOVERY, CAC_TOOMANY, CAC_MIRROR_OR_QUIESCENT,
CAC_OK, CAC_STARTUP, CAC_SHUTDOWN, CAC_RECOVERY, CAC_TOOMANY,
CAC_WAITBACKUP, CAC_MIRROR_READY
} CAC_state;
......
......@@ -210,19 +210,6 @@ typedef struct CancelRequestPacket
*/
#define NEGOTIATE_SSL_CODE PG_PROTOCOL(1234,5679)
/*
* Filerep Add a pre-startup message primary-mirror-transition-request,
* and a primary-mirror-transition-query
*/
#define PRIMARY_MIRROR_TRANSITION_REQUEST_CODE PG_PROTOCOL(1234,5680)
#define PRIMARY_MIRROR_TRANSITION_QUERY_CODE PG_PROTOCOL(1234,5681)
typedef struct PrimaryMirrorTransitionPacket
{
MsgType protocolCode;
PacketLen dataLength;
} PrimaryMirrorTransitionPacket;
/* the number of times trying to acquire the send mutex for the front
* end connection after detecting process is exitting */
#define PQ_BUSY_TEST_COUNT_IN_EXITING 5
......
......@@ -489,8 +489,6 @@ typedef enum
CheckpointerProcess,
WalWriterProcess,
WalReceiverProcess,
FilerepProcess,
FilerepResetPeerProcess
} AuxProcType;
extern AuxProcType MyAuxProcType; /* bootstrap.c */
......
......@@ -170,13 +170,6 @@ extern bool FtsIsActive(void);
extern void HandleFtsMessage(const char* query_string);
extern void FtsWalRepMessageSegments(fts_context *context);
/*
* Interface for requesting master to shut down
*/
extern void FtsRequestPostmasterShutdown(CdbComponentDatabaseInfo *primary, CdbComponentDatabaseInfo *mirror);
extern bool FtsMasterShutdownRequested(void);
extern void FtsRequestMasterShutdown(void);
/*
* If master has requested FTS to shutdown.
*/
......
......@@ -69,20 +69,5 @@ extern void ShmemBackendArrayAllocation(void);
/* CDB */
typedef int (PMSubStartCallback)(void);
extern bool GPIsSegmentDatabase(void);
extern bool GPAreFileReplicationStructuresRequired(void);
extern int PostmasterGetMppLocalProcessCounter(void);
extern void StartMasterOrPrimaryPostmasterProcesses(void);
extern void SignalShutdownFilerepProcess(void);
extern void SignalShutdownFilerepBackendProcesses(void);
extern bool IsFilerepBackendsDoneShutdown(void);
extern void NotifyProcessesOfFilerepStateChange(void);
extern void StartFilerepProcesses(void);
extern bool IsFilerepProcessRunning(void);
extern void SetFilerepPeerResetResult(bool success);
extern bool IsDatabaseInRunMode(void);
extern char *processTransitionRequest_faultInject(
void * inputBuf, int *offsetPtr, int length);
#endif /* _POSTMASTER_H */
此差异已折叠。
/*-------------------------------------------------------------------------
*
* primary_mirror_transition_client.h
* Exports from primary_mirror_transition_client.c.
*
* Portions Copyright (c) 2010, Greenplum inc
* Portions Copyright (c) 2012-Present Pivotal Software, Inc.
*
*
* IDENTIFICATION
* src/include/postmaster/primary_mirror_transition_client.h
*
*-------------------------------------------------------------------------
*/
#ifndef _PRIMARY_MIRROR_TRANSITION_CLIENT_H
#define _PRIMARY_MIRROR_TRANSITION_CLIENT_H
/** These codes can be returned by sendTransitionMessage. Note that some of these error codes are only used
* by the external client program for sending transition messages -- gpmirrortransition.c */
/* NOTE! These codes also exist in python code -- gp.py */
#define TRANS_ERRCODE_SUCCESS 0
#define TRANS_ERRCODE_ERROR_UNSPECIFIED 1
#define TRANS_ERRCODE_ERROR_SERVER_DID_NOT_RETURN_DATA 10
#define TRANS_ERRCODE_ERROR_PROTOCOL_VIOLATED 11
#define TRANS_ERRCODE_ERROR_HOST_LOOKUP_FAILED 12
#define TRANS_ERRCODE_ERROR_INVALID_ARGUMENT 13
#define TRANS_ERRCODE_ERROR_READING_INPUT 14
#define TRANS_ERRCODE_ERROR_SOCKET 15
/* this code is for when transitionCheckShouldExitFunction returns true and we exit because of that */
#define TRANS_ERRCODE_INTERRUPT_REQUESTED 16
/* NOTE! These codes above also exist in python code -- gp.py */
/*
* The timeout occurs when requesting a segment state transition.
* Only used in gpmirrortransition.c.
*/
#define TRANS_ERRCODE_ERROR_TIMEOUT 17
typedef void (*transitionErrorLoggingFunction)(char *str);
typedef void (*transitionReceivedDataFunction)(char *buf);
typedef bool (*transitionCheckShouldExitFunction)(void);
typedef struct
{
transitionErrorLoggingFunction errorLogFn;
/**
* This function will be called with the data received (regardless of whether it's a Success: result or not).
*
* The function will not be called if there is a socket or other error during the sending of the message.
*
* The buffer passed to the function should NOT be stored -- copy the data out if you need to keep it around
*/
transitionReceivedDataFunction receivedDataCallbackFn;
/**
* This function will be called before entering any system call to see if we should exit
* the transition attempt instead.
* If the function returns true, the transition will be exited as soon as possible
*/
transitionCheckShouldExitFunction checkForNeedToExitFn;
} PrimaryMirrorTransitionClientInfo;
extern int sendTransitionMessage(PrimaryMirrorTransitionClientInfo *client, struct addrinfo *addr,
void *data, int dataLength, int maxRetries, int retrySleepTimeSeconds);
#endif // _PRIMARY_MIRROR_TRANSITION_CLIENT_H
......@@ -33,19 +33,6 @@ typedef enum
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_FILEREP_STATE_CHANGE, /* filerep is reporting state change */
PMSIGNAL_PRIMARY_MIRROR_TRANSITION_RECEIVED, /* a primary mirror transition has been received by a backend */
PMSIGNAL_PRIMARY_MIRROR_ALL_BACKENDS_SHUTDOWN, /* filerep has shut down all backends */
/* a filerep subprocess crashed in a way that requires postmaster reset */
PMSIGNAL_POSTMASTER_RESET_FILEREP,
/* peer segment requested postmaster reset */
PMSIGNAL_POSTMASTER_RESET_BY_PEER,
PMSIGNAL_SEGCONFIG_CHANGE, /* segment configuration hs changed */
NUM_PMSIGNALS /* Must be last value of enum! */
} PMSignalReason;
......
......@@ -232,7 +232,7 @@ extern volatile bool cancel_from_timeout;
*/
extern int ProcGlobalSemas(void);
extern Size ProcGlobalShmemSize(void);
extern void InitProcGlobal(int mppLocalProcessCounter);
extern void InitProcGlobal(void);
extern void InitProcess(void);
extern void InitProcessPhase2(void);
extern void InitAuxiliaryProcess(void);
......@@ -257,7 +257,6 @@ extern bool DisableClientWaitTimeoutInterrupt(void);
extern int ResProcSleep(LOCKMODE lockmode, LOCALLOCK *locallock, void *incrementSet);
extern void ResLockWaitCancel(void);
extern bool ProcGetMppLocalProcessCounter(int *mppLocalProcessCounter);
extern bool ProcCanSetMppSessionId(void);
extern void ProcNewMppSessionId(int *newSessionId);
......
......@@ -6,7 +6,7 @@ CREATE
-- end_ignore
create or replace function pg_ctl(datadir text, command text, port int, contentid int) returns text as $$ import subprocess
cmd = 'pg_ctl -D %s ' % datadir if command in ('stop', 'restart'): cmd = cmd + '-w -m immediate %s' % command elif command == 'start': opts = '-p %d -\-gp_dbid=0 -\-silent-mode=true -i -M mirrorless -\-gp_contentid=%d -\-gp_num_contents_in_cluster=3' % (port, contentid) cmd = cmd + '-o "%s" start' % opts else: return 'Invalid command input'
cmd = 'pg_ctl -D %s ' % datadir if command in ('stop', 'restart'): cmd = cmd + '-w -m immediate %s' % command elif command == 'start': opts = '-p %d -\-gp_dbid=0 -\-silent-mode=true -i -\-gp_contentid=%d -\-gp_num_contents_in_cluster=3' % (port, contentid) cmd = cmd + '-o "%s" start' % opts else: return 'Invalid command input'
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).replace('.', '') $$ language plpythonu;
CREATE
......
......@@ -10,7 +10,7 @@ CREATE
-- end_ignore
create or replace function pg_ctl(datadir text, command text, port int, contentid int) returns text as $$ import subprocess
cmd = 'pg_ctl -D %s ' % datadir if command in ('stop', 'restart'): cmd = cmd + '-w -m immediate %s' % command elif command == 'start': opts = '-p %d -\-gp_dbid=0 -\-silent-mode=true -i -M mirrorless -\-gp_contentid=%d -\-gp_num_contents_in_cluster=3' % (port, contentid) cmd = cmd + '-o "%s" start' % opts else: return 'Invalid command input'
cmd = 'pg_ctl -D %s ' % datadir if command in ('stop', 'restart'): cmd = cmd + '-w -m immediate %s' % command elif command == 'start': opts = '-p %d -\-gp_dbid=0 -\-silent-mode=true -i -\-gp_contentid=%d -\-gp_num_contents_in_cluster=3' % (port, contentid) cmd = cmd + '-o "%s" start' % opts else: return 'Invalid command input'
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).replace('.', '') $$ language plpythonu;
CREATE
......
......@@ -4,7 +4,7 @@ CREATE
-- end_ignore
create or replace function pg_ctl(datadir text, command text, port int, contentid int) returns text as $$ import subprocess
cmd = 'pg_ctl -D %s ' % datadir if command in ('stop', 'restart'): cmd = cmd + '-w -m immediate %s' % command elif command == 'start': opts = '-p %d -\-gp_dbid=0 -\-silent-mode=true -i -M mirrorless -\-gp_contentid=%d -\-gp_num_contents_in_cluster=3' % (port, contentid) cmd = cmd + '-o "%s" start' % opts else: return 'Invalid command input'
cmd = 'pg_ctl -D %s ' % datadir if command in ('stop', 'restart'): cmd = cmd + '-w -m immediate %s' % command elif command == 'start': opts = '-p %d -\-gp_dbid=0 -\-silent-mode=true -i -\-gp_contentid=%d -\-gp_num_contents_in_cluster=3' % (port, contentid) cmd = cmd + '-o "%s" start' % opts else: return 'Invalid command input'
return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True).replace('.', '') $$ language plpythonu;
CREATE
......@@ -57,7 +57,7 @@ Mirror content 0: 2018-01-10 23:42:05.009465: Running command... mkdir /Users/a
Mirror content 0:
Mirror content 0: 2018-01-10 23:42:05.020184: Initialized mirror at /Users/aagrawal/workspace/gpdata/dbfast1/demoDataDir0
Segment primary content 0: 2018-01-10 23:42:05.020494: Running command... pg_ctl -D /Users/aagrawal/workspace/gpdata/dbfast1/demoDataDir0 -o '-p 25432 --gp_dbid=0 --silent-mode=true -i -M mirrorless --gp_contentid=0 --gp_num_contents_in_cluster=3' start
Segment primary content 0: 2018-01-10 23:42:05.020494: Running command... pg_ctl -D /Users/aagrawal/workspace/gpdata/dbfast1/demoDataDir0 -o '-p 25432 --gp_dbid=0 --silent-mode=true -i --gp_contentid=0 --gp_num_contents_in_cluster=3' start
Segment primary content 0: server starting
Segment primary content 0:
Segment primary content 0: 2018-01-10 23:42:05.048563: Running command... pg_ctl -D /Users/aagrawal/workspace/gpdata/dbfast1/demoDataDir0 status
......@@ -151,7 +151,7 @@ Mirror content 0: 2018-01-10 23:42:14.326265: Running command... mkdir /Users/a
Mirror content 0:
Mirror content 0: 2018-01-10 23:42:14.335604: Initialized mirror at /Users/aagrawal/workspace/gpdata/dbfast_mirror1/demoDataDir0
Segment mirror content 0: 2018-01-10 23:42:14.335990: Running command... pg_ctl -D /Users/aagrawal/workspace/gpdata/dbfast_mirror1/demoDataDir0 -o '-p 25435 --gp_dbid=0 --silent-mode=true -i -M mirrorless --gp_contentid=0 --gp_num_contents_in_cluster=3' start
Segment mirror content 0: 2018-01-10 23:42:14.335990: Running command... pg_ctl -D /Users/aagrawal/workspace/gpdata/dbfast_mirror1/demoDataDir0 -o '-p 25435 --gp_dbid=0 --silent-mode=true -i --gp_contentid=0 --gp_num_contents_in_cluster=3' start
Segment mirror content 0: server starting
Segment mirror content 0:
Segment mirror content 0: 2018-01-10 23:42:14.362829: Running command... pg_ctl -D /Users/aagrawal/workspace/gpdata/dbfast_mirror1/demoDataDir0 status
......
......@@ -12,7 +12,7 @@ returns text as $$
if command in ('stop', 'restart'):
cmd = cmd + '-w -m immediate %s' % command
elif command == 'start':
opts = '-p %d -\-gp_dbid=0 -\-silent-mode=true -i -M mirrorless -\-gp_contentid=%d -\-gp_num_contents_in_cluster=3' % (port, contentid)
opts = '-p %d -\-gp_dbid=0 -\-silent-mode=true -i -\-gp_contentid=%d -\-gp_num_contents_in_cluster=3' % (port, contentid)
cmd = cmd + '-o "%s" start' % opts
else:
return 'Invalid command input'
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册