Commit 46dfa750 authored by Gang Xiong

refactor gang management code

1) add one new type of gang: singleton reader gang.
2) change interface of allocateGang.
3) handle exceptions during gang creation: segment down and segment reset.
4) clean up some dead code.
Parent 82fd418e
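As a rough illustration of item 1) in the commit message, the new singleton reader gang presumably becomes one more member of the GangType enum; the member names below are an assumption for illustration and are not taken from this diff.

/* Sketch (assumed names): a singleton reader gang is a 1-segment reader,
 * distinct from N-segment reader gangs and the primary writer gang. */
typedef enum GangType
{
	GANGTYPE_UNALLOCATED,      /* a root slice executed by the QD itself */
	GANGTYPE_ENTRYDB_READER,   /* a 1-gang with read access to the entry db */
	GANGTYPE_SINGLETON_READER, /* a 1-gang reading a single segment db (the new type) */
	GANGTYPE_PRIMARY_READER,   /* an N-gang that reads the segment dbs */
	GANGTYPE_PRIMARY_WRITER    /* the N-gang that can update the segment dbs */
} GangType;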
......@@ -4201,14 +4201,14 @@ AbortTransaction(void)
if (QueryCancelCleanup)
{
QueryCancelCleanup = false;
cleanupIdleReaderGangs();
disconnectAndDestroyIdleReaderGangs();
}
/* If memprot decides to kill process, make sure we destroy all processes
* so that all mem/resource will be freed
*/
if(elog_geterrcode() == ERRCODE_GP_MEMPROT_KILL)
disconnectAndDestroyAllGangs();
disconnectAndDestroyAllGangs(true);
}
/*
......
......@@ -25,9 +25,35 @@ extern int pq_putmessage(char msgtype, const char *s, size_t len);
int gp_segment_connect_timeout = 180;
static const char* transStatusToString(PGTransactionStatusType status)
{
const char *ret = "";
switch (status)
{
case PQTRANS_IDLE:
ret = "idle";
break;
case PQTRANS_ACTIVE:
ret = "active";
break;
case PQTRANS_INTRANS:
ret = "idle, within transaction";
break;
case PQTRANS_INERROR:
ret = "idle, within failed transaction";
break;
case PQTRANS_UNKNOWN:
ret = "unknown transaction status";
break;
default:
Assert(false);
}
return ret;
}
static void MPPnoticeReceiver(void * arg, const PGresult * res)
{
PQExpBufferData msgbuf;
PQExpBufferData msgbuf;
PGMessageField *pfield;
int elevel = INFO;
char * sqlstate = "00000";
......@@ -35,18 +61,17 @@ static void MPPnoticeReceiver(void * arg, const PGresult * res)
char * file = "";
char * line = NULL;
char * func = "";
char message[1024];
char message[1024];
char * detail = NULL;
char * hint = NULL;
char * context = NULL;
SegmentDatabaseDescriptor *segdbDesc = (SegmentDatabaseDescriptor *) arg;
SegmentDatabaseDescriptor *segdbDesc = (SegmentDatabaseDescriptor *) arg;
if (!res)
return;
strcpy(message,"missing error text");
for (pfield = res->errFields; pfield != NULL; pfield = pfield->next)
{
switch (pfield->code)
......@@ -110,10 +135,10 @@ static void MPPnoticeReceiver(void * arg, const PGresult * res)
break;
default:
break;
}
}
if (elevel < client_min_messages && elevel != INFO)
return;
......@@ -125,7 +150,7 @@ static void MPPnoticeReceiver(void * arg, const PGresult * res)
* same reason.
*/
initPQExpBuffer(&msgbuf);
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
{
......@@ -197,15 +222,15 @@ static void MPPnoticeReceiver(void * arg, const PGresult * res)
appendPQExpBufferChar(&msgbuf, PG_DIAG_SOURCE_FUNCTION);
appendBinaryPQExpBuffer(&msgbuf, func, strlen(func)+1);
}
}
else
{
appendPQExpBuffer(&msgbuf, "%s: ", severity);
appendBinaryPQExpBuffer(&msgbuf, message, strlen(message));
appendPQExpBufferChar(&msgbuf, '\n');
appendPQExpBufferChar(&msgbuf, '\0');
......@@ -214,68 +239,65 @@ static void MPPnoticeReceiver(void * arg, const PGresult * res)
appendPQExpBufferChar(&msgbuf, '\0'); /* terminator */
pq_putmessage('N', msgbuf.data, msgbuf.len);
termPQExpBuffer(&msgbuf);
pq_flush();
}
pq_flush();
}
/* Initialize a QE connection descriptor in storage provided by the caller. */
void
cdbconn_initSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc,
struct CdbComponentDatabaseInfo *cdbinfo)
void cdbconn_initSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc,
struct CdbComponentDatabaseInfo *cdbinfo)
{
MemSet(segdbDesc, 0, sizeof(*segdbDesc));
MemSet(segdbDesc, 0, sizeof(*segdbDesc));
/* Segment db info */
segdbDesc->segment_database_info = cdbinfo;
segdbDesc->segindex = cdbinfo->segindex;
//segdbDesc->dbname = MyProcPort->database_name ? strdup(MyProcPort->database_name) : NULL;
//segdbDesc->username = MyProcPort->user_name ? strdup(MyProcPort->user_name) : NULL;
/* Segment db info */
segdbDesc->segment_database_info = cdbinfo;
segdbDesc->segindex = cdbinfo->segindex;
/* Connection info */
segdbDesc->conn = NULL;
segdbDesc->motionListener = 0;
segdbDesc->whoami = NULL;
segdbDesc->myAgent = NULL;
/* Connection info, set in function cdbconn_doConnect*/
segdbDesc->conn = NULL;
segdbDesc->motionListener = 0;
segdbDesc->backendPid = 0;
segdbDesc->myAgent = NULL;
/* Connection error info */
segdbDesc->errcode = 0;
initPQExpBuffer(&segdbDesc->error_message);
} /* cdbconn_initSegmentDescriptor */
/*whoami*/
segdbDesc->whoami = NULL;
/* Connection error info */
segdbDesc->errcode = 0;
initPQExpBuffer(&segdbDesc->error_message);
}
/* Free all memory owned by this segment descriptor. */
void
cdbconn_termSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc)
/* Free memory of segment descriptor. */
void cdbconn_termSegmentDescriptor(SegmentDatabaseDescriptor *segdbDesc)
{
/* Free the error message buffer. */
segdbDesc->errcode = 0;
termPQExpBuffer(&segdbDesc->error_message);
/* Free connection info. */
if (segdbDesc->whoami)
{
free(segdbDesc->whoami);
segdbDesc->whoami = NULL;
}
} /* cdbconn_termSegmentDescriptor */
/* Connect to a QE as a client via libpq. */
bool /* returns true if connected */
cdbconn_doConnect(SegmentDatabaseDescriptor *segdbDesc,
const char *gpqeid,
const char *options)
/* Free the error message buffer. */
segdbDesc->errcode = 0;
termPQExpBuffer(&segdbDesc->error_message);
if(segdbDesc->whoami != NULL)
{
pfree(segdbDesc->whoami);
segdbDesc->whoami = NULL;
}
} /* cdbconn_termSegmentDescriptor */
/*
* Connect to a QE as a client via libpq.
* On failure, the error is recorded in segdbDesc->error_message.
*/
void cdbconn_doConnect(SegmentDatabaseDescriptor *segdbDesc, const char *gpqeid,
const char *options)
{
CdbComponentDatabaseInfo *q = segdbDesc->segment_database_info;
#define MAX_KEYWORDS 10
#define MAX_INT_STRING_LEN 20
CdbComponentDatabaseInfo *cdbinfo = segdbDesc->segment_database_info;
const char *keywords[MAX_KEYWORDS];
const char *values[MAX_KEYWORDS];
int nkeywords = 0;
char portstr[20];
char timeoutstr[20];
char portstr[MAX_INT_STRING_LEN];
char timeoutstr[MAX_INT_STRING_LEN];
int nkeywords = 0;
keywords[nkeywords] = "gpqeid";
values[nkeywords] = gpqeid;
......@@ -292,71 +314,44 @@ cdbconn_doConnect(SegmentDatabaseDescriptor *segdbDesc,
}
/*
* On the master, we must use UNIX domain sockets for security -- as it can
* be authenticated. See MPP-15802.
* For an entry DB connection, we make sure both "hostaddr" and "host" are empty strings;
* otherwise libpq falls back to environment variables and will not use the domain socket
* in connectDBStart.
*
* For other QE connections, we set "hostaddr". "host" is not used.
*/
if (!(q->segindex == MASTER_CONTENT_ID &&
GpIdentity.segindex == MASTER_CONTENT_ID))
if (segdbDesc->segindex == MASTER_CONTENT_ID &&
GpIdentity.segindex == MASTER_CONTENT_ID)
{
/*
* First we pick the cached hostip if we have it.
*
* If we don't have a cached hostip, we use the host->address,
* if we don't have that we fallback to host->hostname.
*/
if (q->hostip != NULL)
{
keywords[nkeywords] = "hostaddr";
values[nkeywords] = q->hostip;
nkeywords++;
}
else if (q->address != NULL)
{
if (isdigit(q->address[0]))
{
keywords[nkeywords] = "hostaddr";
values[nkeywords] = q->address;
nkeywords++;
}
else
{
keywords[nkeywords] = "host";
values[nkeywords] = q->address;
nkeywords++;
}
}
else if (q->hostname == NULL)
{
keywords[nkeywords] = "host";
values[nkeywords] = "";
nkeywords++;
}
else if (isdigit(q->hostname[0]))
{
keywords[nkeywords] = "hostaddr";
values[nkeywords] = q->hostname;
nkeywords++;
}
else
{
keywords[nkeywords] = "host";
values[nkeywords] = q->hostname;
nkeywords++;
}
keywords[nkeywords] = "hostaddr";
values[nkeywords] = "";
nkeywords++;
}
else
{
Assert(cdbinfo->hostip != NULL);
keywords[nkeywords] = "hostaddr";
values[nkeywords] = cdbinfo->hostip;
nkeywords++;
}
snprintf(portstr, sizeof(portstr), "%u", q->port);
keywords[nkeywords] = "host";
values[nkeywords] = "";
nkeywords++;
snprintf(portstr, sizeof(portstr), "%u", cdbinfo->port);
keywords[nkeywords] = "port";
values[nkeywords] = portstr;
nkeywords++;
if (MyProcPort->database_name)
if (MyProcPort->database_name)
{
keywords[nkeywords] = "dbname";
values[nkeywords] = MyProcPort->database_name;
nkeywords++;
}
Assert(MyProcPort->user_name);
keywords[nkeywords] = "user";
values[nkeywords] = MyProcPort->user_name;
nkeywords++;
......@@ -369,117 +364,179 @@ cdbconn_doConnect(SegmentDatabaseDescriptor *segdbDesc,
keywords[nkeywords] = NULL;
values[nkeywords] = NULL;
Assert (nkeywords < MAX_KEYWORDS);
Assert(nkeywords < MAX_KEYWORDS);
/*
* Call libpq to connect
*/
segdbDesc->conn = PQconnectdbParams(keywords, values, false);
/*
* Call libpq to connect
*/
segdbDesc->conn = PQconnectdbParams(keywords, values, false);
/*
* Check for connection failure.
*/
if (PQstatus(segdbDesc->conn) == CONNECTION_BAD)
{
if (!segdbDesc->errcode)
segdbDesc->errcode = ERRCODE_GP_INTERCONNECTION_ERROR;
appendPQExpBuffer(&segdbDesc->error_message,
"Master unable to connect to %s with options %s: %s\n",
segdbDesc->whoami, options, PQerrorMessage(segdbDesc->conn));
/* Don't use elog, it's not thread-safe */
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("%s\n", segdbDesc->error_message.data);
PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL;
}
/*
* Successfully connected.
*/
else
{
PQsetNoticeReceiver(segdbDesc->conn, &MPPnoticeReceiver, segdbDesc);
/* Command the QE to initialize its motion layer.
* Wait for it to respond giving us the TCP port number
* where it listens for connections from the gang below.
*/
segdbDesc->motionListener = PQgetQEdetail(segdbDesc->conn);
segdbDesc->backendPid = PQbackendPID(segdbDesc->conn);
if (segdbDesc->motionListener == -1)
{
segdbDesc->errcode = ERRCODE_GP_INTERNAL_ERROR;
appendPQExpBuffer(&segdbDesc->error_message,
"Internal error: No motion listener port for %s\n",
segdbDesc->whoami);
/* Build whoami string to identify the QE for use in messages. */
if(!cdbconn_setSliceIndex(segdbDesc, -1))
{
if (!segdbDesc->errcode)
segdbDesc->errcode = ERRCODE_GP_INTERCONNECTION_ERROR;
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("%s\n", segdbDesc->error_message.data);
/* Don't use elog, it's not thread-safe */
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("%s\n", segdbDesc->error_message.data);
PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL;
}
else
{
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("Connected to %s motionListener=%d with options: %s\n",
segdbDesc->whoami, segdbDesc->motionListener, options);
}
}
}
PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL;
}
/* Disconnect from QE */
void cdbconn_disconnect(SegmentDatabaseDescriptor *segdbDesc)
{
if (PQstatus(segdbDesc->conn) != CONNECTION_BAD)
{
PGTransactionStatusType status = PQtransactionStatus(segdbDesc->conn);
/*
* Check for connection failure.
*/
if (PQstatus(segdbDesc->conn) == CONNECTION_BAD)
{
if (!segdbDesc->errcode)
segdbDesc->errcode = ERRCODE_GP_INTERCONNECTION_ERROR;
appendPQExpBuffer(&segdbDesc->error_message,
"Master unable to connect to %s with options %s\n",
segdbDesc->whoami,
PQerrorMessage(segdbDesc->conn));
/* Don't use elog, it's not thread-safe */
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("%s\n", segdbDesc->error_message.data);
PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL;
}
/*
* Successfully connected.
*/
else
{
PQsetNoticeReceiver(segdbDesc->conn, &MPPnoticeReceiver, segdbDesc);
/* Command the QE to initialize its motion layer.
* Wait for it to respond giving us the TCP port number
* where it listens for connections from the gang below.
*/
segdbDesc->motionListener = PQgetQEdetail(segdbDesc->conn);
segdbDesc->backendPid = PQbackendPID(segdbDesc->conn);
/* Don't use elog, it's not thread-safe */
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
write_log("Connected to %s motionListener=%d\n",
segdbDesc->whoami,
segdbDesc->motionListener);
}
return segdbDesc->conn != NULL;
} /* cdbconn_doConnect */
/* Build text to identify this QE in error messages. */
bool
cdbconn_setSliceIndex(SegmentDatabaseDescriptor *segdbDesc,
int sliceIndex)
if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
elog(LOG, "Finishing connection with %s; %s", segdbDesc->whoami, transStatusToString(status));
elog((Debug_print_full_dtm ? LOG : (gp_log_gang >= GPVARS_VERBOSITY_DEBUG ? LOG : DEBUG5)),
"disconnectAndDestroyGang: got QEDistributedTransactionId = %u, QECommandId = %u, and QEDirty = %s",
segdbDesc->conn->QEWriter_DistributedTransactionId,
segdbDesc->conn->QEWriter_CommandId,
(segdbDesc->conn->QEWriter_Dirty ? "true" : "false"));
if (status == PQTRANS_ACTIVE)
{
char errbuf[256];
PGcancel *cn = PQgetCancel(segdbDesc->conn);
if (Debug_cancel_print || gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
elog(LOG, "Calling PQcancel for %s", segdbDesc->whoami);
if (PQcancel(cn, errbuf, 256) == 0)
elog(LOG, "Unable to cancel %s: %s", segdbDesc->whoami, errbuf);
PQfreeCancel(cn);
}
PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL;
}
}
/*
* Read result from connection and discard it.
*
* Retry at most N times.
*
* Return false if there are still leftover results.
*/
bool cdbconn_discardResults(SegmentDatabaseDescriptor *segdbDesc,
int retryCount)
{
CdbComponentDatabaseInfo *q = segdbDesc->segment_database_info;
PQExpBuffer scratchbuf = &segdbDesc->error_message;
int scratchoff = scratchbuf->len;
Assert(scratchbuf->len < 300000);
/* Format the identity of the segment db. */
if (q->segindex >= 0)
{
appendPQExpBuffer(scratchbuf, "seg%d", q->segindex);
/* Format the slice index. */
if (sliceIndex > 0)
appendPQExpBuffer(scratchbuf, " slice%d", sliceIndex);
}
else
appendPQExpBuffer(scratchbuf,
SEGMENT_IS_ACTIVE_PRIMARY(q) ? "entry db" : "mirror entry db");
/* Format the connection info. */
appendPQExpBuffer(scratchbuf, " %s:%d",
q->hostname, q->port);
/* If connected, format the QE's process id. */
if (segdbDesc->conn)
{
int pid = PQbackendPID(segdbDesc->conn);
if (pid)
appendPQExpBuffer(scratchbuf, " pid=%d", pid);
}
/* Store updated whoami text. */
if (segdbDesc->whoami != NULL)
free(segdbDesc->whoami);
segdbDesc->whoami = strdup(scratchbuf->data + scratchoff);
if(!segdbDesc->whoami)
{
appendPQExpBuffer(scratchbuf, " Error: Out of Memory");
return false;
}
/* Give back our scratch space at tail of error_message buffer. */
truncatePQExpBuffer(scratchbuf, scratchoff);
return true;
} /* cdbconn_setSliceIndex */
PGresult *pRes = NULL;
ExecStatusType stat;
int i = 0;
/* PQstatus() is smart enough to handle NULL */
while (NULL != (pRes = PQgetResult(segdbDesc->conn)))
{
stat = PQresultStatus(pRes);
PQclear(pRes);
elog(LOG, "(%s) Leftover result at freeGang time: %s %s", segdbDesc->whoami,
PQresStatus(stat),
PQerrorMessage(segdbDesc->conn));
if (stat == PGRES_FATAL_ERROR || stat == PGRES_BAD_RESPONSE)
return true;
if (i++ > retryCount)
return false;
}
return true;
}
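For illustration only, a caller recycling an idle QE connection might drain leftover results before reuse; the retry count and the destroy-on-failure fallback below are assumptions, not part of this commit.

/* Hypothetical usage sketch: drop stale results before caching the
 * connection for reuse; destroy it if leftovers persist or libpq
 * reports the connection as bad. */
if (!cdbconn_discardResults(segdbDesc, 20) || cdbconn_isBadConnection(segdbDesc))
	cdbconn_disconnect(segdbDesc);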
/* Return true if the connection is bad */
bool cdbconn_isBadConnection(SegmentDatabaseDescriptor *segdbDesc)
{
return PQstatus(segdbDesc->conn) == CONNECTION_BAD;
}
/* Reset error message buffer */
void cdbconn_resetQEErrorMessage(SegmentDatabaseDescriptor *segdbDesc)
{
segdbDesc->errcode = 0;
resetPQExpBuffer(&segdbDesc->error_message);
}
/*
* Build text to identify this QE in error messages.
* This function is not thread-safe; do not call it from a thread.
*/
void setQEIdentifier(SegmentDatabaseDescriptor *segdbDesc,
int sliceIndex, MemoryContext mcxt)
{
CdbComponentDatabaseInfo *cdbinfo = segdbDesc->segment_database_info;
MemoryContext oldContext = MemoryContextSwitchTo(mcxt);
StringInfo string = makeStringInfo();
/* Format the identity of the segment db. */
if (segdbDesc->segindex >= 0)
{
appendStringInfo(string, "seg%d", segdbDesc->segindex);
/* Format the slice index. */
if (sliceIndex > 0)
appendStringInfo(string, " slice%d", sliceIndex);
}
else
appendStringInfo(string, SEGMENT_IS_ACTIVE_PRIMARY(cdbinfo) ? "entry db" : "mirror entry db");
/* Format the connection info. */
appendStringInfo(string, " %s:%d", cdbinfo->hostip, cdbinfo->port);
/* If connected, format the QE's process id. */
if (segdbDesc->backendPid != 0)
appendStringInfo(string, " pid=%d", segdbDesc->backendPid);
segdbDesc->whoami = string->data;
pfree(string);
MemoryContextSwitchTo(oldContext);
}
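A brief usage sketch follows; the memory context name GangContext is an assumption for illustration and does not come from this diff.

/* Hypothetical sketch: build the whoami identifier right after the descriptor
 * is initialized and before any threaded connect work, since setQEIdentifier
 * allocates in the given memory context and must not run inside a thread. */
cdbconn_initSegmentDescriptor(segdbDesc, cdbinfo);
setQEIdentifier(segdbDesc, /* sliceIndex */ -1, GangContext); /* GangContext is assumed */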
......@@ -206,12 +206,12 @@ void
FtsReConfigureMPP(bool create_new_gangs)
{
/* need to scan to pick up the latest view */
detectFailedConnections();
FtsNotifyProber();
local_fts_statusVersion = ftsProbeInfo->fts_statusVersion;
ereport(LOG, (errmsg_internal("FTS: reconfiguration is in progress"),
errSendAlert(true)));
disconnectAndDestroyAllGangs();
disconnectAndDestroyAllGangs(true);
/* Caller should throw an error. */
return;
......@@ -229,110 +229,39 @@ FtsHandleNetFailure(SegmentDatabaseDescriptor ** segDB, int numOfFailed)
}
/*
* FtsHandleGangConnectionFailure is called by createGang during
* creating connections return true if error need to be thrown
* Check if any segment DB is down.
*
* returns true if any segment DB is down.
*/
bool
FtsHandleGangConnectionFailure(SegmentDatabaseDescriptor * segdbDesc, int size)
FtsTestSegmentDBIsDown(SegmentDatabaseDescriptor * segdbDesc, int size)
{
int i;
bool dtx_active;
bool reportError = false;
bool realFaultFound = false;
bool forceRescan=true;
for (i = 0; i < size; i++)
{
if (PQstatus(segdbDesc[i].conn) != CONNECTION_OK)
{
CdbComponentDatabaseInfo *segInfo = segdbDesc[i].segment_database_info;
elog(DEBUG2, "FtsHandleGangConnectionFailure: looking for real fault on segment dbid %d", segInfo->dbid);
if (!FtsTestConnection(segInfo, forceRescan))
{
elog(DEBUG2, "found fault with segment dbid %d", segInfo->dbid);
realFaultFound = true;
/* that at least one fault exists is enough, for now */
break;
}
forceRescan = false; /* only force the rescan on the first call. */
}
}
if (!realFaultFound)
{
/* If we successfully tested the gang and didn't notice a
* failure, our caller must've seen some kind of transient
* failure when the gang was originally constructed ... */
elog(DEBUG2, "FtsHandleGangConnectionFailure: no real fault found!");
return false;
}
if (!isFTSEnabled())
{
return false;
}
int i = 0;
bool forceRescan = true;
ereport(LOG, (errmsg_internal("FTS: reconfiguration is in progress")));
Assert(isFTSEnabled());
forceRescan = true;
for (i = 0; i < size; i++)
{
CdbComponentDatabaseInfo *segInfo = segdbDesc[i].segment_database_info;
if (PQstatus(segdbDesc[i].conn) != CONNECTION_OK)
elog(DEBUG2, "FtsTestSegmentDBIsDown: looking for real fault on segment dbid %d", segInfo->dbid);
if (!FtsTestConnection(segInfo, forceRescan))
{
if (!FtsTestConnection(segInfo, forceRescan))
{
ereport(WARNING, (errmsg_internal("FTS: found bad segment with dbid %d", segInfo->dbid),
errSendAlert(true)));
/* probe process has already marked segment down. */
}
forceRescan = false; /* only force rescan on first call. */
ereport(LOG, (errmsg_internal("FTS: found fault with segment dbid %d. "
"Reconfiguration is in progress", segInfo->dbid)));
return true;
}
}
if (gangsExist())
{
reportError = true;
disconnectAndDestroyAllGangs();
}
/*
* KLUDGE: Do not error out if we are attempting a DTM protocol retry
*/
if (DistributedTransactionContext == DTX_CONTEXT_QD_RETRY_PHASE_2)
{
return false;
}
/* is there a transaction active ? */
dtx_active = isCurrentDtxActive();
/* When the error is raised, it will abort the current DTM transaction */
if (dtx_active)
{
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"FtsHandleGangConnectionFailure found an active DTM transaction (returning true).");
return true;
/* only force the rescan on the first call. */
forceRescan = false;
}
/*
* error out if this sets read only flag, at this stage the read only
* transaction checking has passed, so error out, but do not error out if
* tm is in recovery
*/
if ((*ftsReadOnlyFlag && !isTMInRecovery()) || reportError)
return true;
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"FtsHandleGangConnectionFailure returning false.");
return false;
}
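As an illustration of how the renamed check might be called, the sketch below assumes a Gang struct exposing a db_descriptors array and a size field; those names are assumptions, not confirmed by this diff.

/* Hypothetical sketch: after connection failures during gang creation, ask
 * FTS whether a segment is really down before deciding how to react. */
if (FtsTestSegmentDBIsDown(gang->db_descriptors, gang->size))
	elog(ERROR, "failed to acquire resources on one or more segments");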
void
FtsCondSetTxnReadOnly(bool *XactFlag)
{
......@@ -365,3 +294,8 @@ isFtsReadOnlySet(void)
{
return *ftsReadOnlyFlag;
}
uint64 getFtsVersion(void)
{
return ftsProbeInfo->fts_statusVersion;
}
(This diff is collapsed.)
......@@ -158,7 +158,7 @@ static void performDtxProtocolCommitPrepared(const char *gid, bool raiseErrorIfN
static void performDtxProtocolAbortPrepared(const char *gid, bool raiseErrorIfNotFound);
static DistributedTransactionId determineSegmentMaxDistributedXid(void);
extern void resetSessionForPrimaryGangLoss(void);
extern void resetSessionForPrimaryGangLoss(bool resetSession);
extern void CheckForResetSession(void);
/**
......@@ -794,7 +794,7 @@ doNotifyingCommitPrepared(void)
* at the top in PostgresMain.
*/
elog(NOTICE, "Releasing segworker group to retry broadcast.");
disconnectAndDestroyAllGangs();
disconnectAndDestroyAllGangs(true);
/*
* This call will at a minimum change the session id so we will
......@@ -871,7 +871,7 @@ doNotifyingAbort(void)
* Reset the dispatch logic and disconnect from any segment that didn't respond to our abort.
*/
elog(NOTICE, "Releasing segworker groups to finish aborting the transaction.");
disconnectAndDestroyAllGangs();
disconnectAndDestroyAllGangs(true);
/*
* This call will at a minimum change the session id so we will
......@@ -930,7 +930,7 @@ doNotifyingAbort(void)
* Reset the dispatch logic (i.e. deallocate gang) so we can attempt a retry.
*/
elog(NOTICE, "Releasing segworker groups to retry broadcast.");
disconnectAndDestroyAllGangs();
disconnectAndDestroyAllGangs(true);
/*
* This call will at a minimum change the session id so we will
......@@ -1029,8 +1029,7 @@ doDtxPhase2Retry(void)
}
/*
* KLUDGE: FtsHandleGangConnectionFailure will need a special
* transaction context to tell it not to raise an ERROR...
* Todo: Maybe we don't need DTX_CONTEXT_QD_RETRY_PHASE_2 anymore.
*/
setDistributedTransactionContext(DTX_CONTEXT_QD_RETRY_PHASE_2);
elog(DTM_DEBUG5,
......@@ -1041,17 +1040,35 @@ doDtxPhase2Retry(void)
currentGxact->retryPhase2RecursionStop = true;
succeeded = doDispatchDtxProtocolCommand(dtxProtocolCommand, /* flags */ 0,
currentGxact->gid, currentGxact->gxid,
&badGangs, /* raiseError */ false,
&direct, NULL, 0);
/*
* We don't want doDispatchDtxProtocolCommand to raise an error, but it calls
* createGang to allocate a writer gang, which could fail and error out.
*
* We catch the error and log a FATAL error instead of a PANIC.
*/
PG_TRY();
{
succeeded = doDispatchDtxProtocolCommand(dtxProtocolCommand, /* flags */ 0,
currentGxact->gid, currentGxact->gxid,
&badGangs, /* raiseError */ false,
&direct, NULL, 0);
}
PG_CATCH();
{
succeeded = false;
}
PG_END_TRY();
if (!succeeded)
{
elog(FATAL, "A retry of the distributed transaction '%s Prepared' broadcast failed to one or more segments for gid = %s.",
prepareKind, currentGxact->gid);
}
elog(NOTICE, "Retry of the distributed transaction '%s Prepared' broadcast succeeded to the segments for gid = %s.",
prepareKind, currentGxact->gid);
else
{
elog(NOTICE, "Retry of the distributed transaction '%s Prepared' broadcast succeeded to the segments for gid = %s.",
prepareKind, currentGxact->gid);
}
/*
* Global locking order: ProcArrayLock then DTM lock since
......@@ -1241,7 +1258,7 @@ rollbackDtxTransaction(void)
* segments. What's left are possibily prepared transactions.
*/
elog(WARNING, "Releasing segworker groups since one or more segment connections failed. This will abort the transactions in the segments that did not get prepared.");
disconnectAndDestroyAllGangs();
disconnectAndDestroyAllGangs(true);
/*
* This call will at a minimum change the session id so we will
......@@ -1266,7 +1283,7 @@ rollbackDtxTransaction(void)
* segments.
*/
elog(NOTICE, "Releasing segworker groups to finish aborting the transaction.");
disconnectAndDestroyAllGangs();
disconnectAndDestroyAllGangs(true);
/*
* This call will at a minimum change the session id so we will
......@@ -1334,7 +1351,7 @@ rollbackDtxTransaction(void)
* segment instances. And, we will abort the transactions in the
* segments.
*/
disconnectAndDestroyAllGangs();
disconnectAndDestroyAllGangs(true);
/*
* This call will at a minimum change the session id so we will
......@@ -1561,11 +1578,11 @@ initTM(void)
PG_TRY();
{
/*
* detectFailedConnections could throw ERROR, so
* FtsNotifyProber could throw ERROR, so
* we should catch it if it happens.
*/
if (!first)
detectFailedConnections();
FtsNotifyProber();
initTM_recover_as_needed();
succeeded = true;
......@@ -3165,8 +3182,7 @@ cdbtm_performDeferredRecovery(void)
*shmDtmRecoveryDeferred = false;
elog(NOTICE, "Releasing segworker groups for deferred recovery.");
disconnectAndDestroyAllGangs();
resetSessionForPrimaryGangLoss();
disconnectAndDestroyAllGangs(true);
}
releaseTmLock();
CheckForResetSession();
......
......@@ -248,7 +248,11 @@ getCdbComponentInfo(bool DNSLookupAsError)
pRow->filerep_port = -1;
getAddressesForDBid(pRow, DNSLookupAsError ? ERROR : LOG);
pRow->hostip = pRow->hostaddrs[0];
/* We make sure we get a valid hostip here */
if(pRow->hostaddrs[0] == NULL)
elog(ERROR, "Cannot resolve network address for dbid=%d", dbid);
pRow->hostip = pstrdup(pRow->hostaddrs[0]);
}
/*
......@@ -440,6 +444,9 @@ freeCdbComponentDatabaseInfo(CdbComponentDatabaseInfo *cdi)
if (cdi->address != NULL)
pfree(cdi->address);
if (cdi->hostip != NULL)
pfree(cdi->hostip);
for (i=0; i < COMPONENT_DBS_MAX_ADDRS; i++)
{
if (cdi->hostaddrs[i] != NULL)
......@@ -490,7 +497,7 @@ cdb_cleanup(int code __attribute__((unused)) , Datum arg __attribute__((unused))
{
elog(DEBUG1, "Cleaning up Greenplum components...");
disconnectAndDestroyAllGangs();
disconnectAndDestroyAllGangs(true);
if (Gp_role == GP_ROLE_DISPATCH)
{
......@@ -1291,3 +1298,37 @@ getgpsegmentCount(void)
Assert(GpIdentity.numsegments > 0);
return GpIdentity.numsegments;
}
bool isSockAlive(int sock)
{
int ret;
char buf;
int i = 0;
for(i = 0; i < 10; i++)
{
#ifndef WIN32
ret = recv(sock, &buf, 1, MSG_PEEK | MSG_DONTWAIT);
#else
ret = recv(sock, &buf, 1, MSG_PEEK | MSG_PARTIAL);
#endif
if (ret == 0) /* socket has been closed. EOF */
return false;
if (ret > 0) /* data waiting on socket, it must be OK. */
return true;
if (ret == -1) /* error, or the call would block */
{
if (errno == EAGAIN || errno == EINPROGRESS)
return true; /* connection intact, no data available */
else if (errno == EINTR)
continue; /* interrupted by signal, retry at most 10 times */
else
return false;
}
}
return true;
}
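For illustration, a caller holding a cached QE connection might combine this socket probe with the libpq status check added above; PQsocket() is standard libpq, while the helper name below is hypothetical.

/* Hypothetical sketch: treat a cached connection as reusable only if libpq
 * still reports it usable and the underlying socket has not been closed. */
static bool
cachedConnectionStillAlive(SegmentDatabaseDescriptor *segdbDesc)
{
	if (cdbconn_isBadConnection(segdbDesc))
		return false;
	return isSockAlive(PQsocket(segdbDesc->conn));
}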
(This diff is collapsed.)
......@@ -161,8 +161,7 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
{
Assert(disp_direct->count == 1); /* currently we allow direct-to-one dispatch, only */
if (disp_direct->content[0] !=
segdbDesc->segment_database_info->segindex)
if (disp_direct->content[0] != segdbDesc->segindex)
continue;
}
......
......@@ -13,6 +13,11 @@ include $(top_builddir)/src/backend/mock.mk
cdbgang.t: \
$(MOCK_DIR)/backend/catalog/namespace_mock.o \
$(MOCK_DIR)/backend/storage/lmgr/proc_mock.o \
$(MOCK_DIR)/backend/access/transam/xact_mock.o \
$(MOCK_DIR)/backend/cdb/cdbutil_mock.o \
$(MOCK_DIR)/backend/cdb/cdbfts_mock.o \
$(MOCK_DIR)/backend/utils/misc/superuser_mock.o \
$(MOCK_DIR)/backend/gp_libpq_fe/fe-connect_mock.o \
$(MOCK_DIR)/backend/utils/mmgr/redzone_handler_mock.o
cdbfilerep.t: \
......
(12 more file diffs are collapsed and not shown.)