提交 0e1b9a05 编写于 作者: A Ashwin Agrawal

Cleanup some FTS functions.

上级 d5fb628f
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "cdb/cdbtm.h" #include "cdb/cdbtm.h"
#include "cdb/cdbvars.h" #include "cdb/cdbvars.h"
#include "commands/copy.h" #include "commands/copy.h"
#include "storage/pmsignal.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "utils/faultinjector.h" #include "utils/faultinjector.h"
#include "utils/memutils.h" #include "utils/memutils.h"
...@@ -755,8 +756,12 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int *total_rows_completed) ...@@ -755,8 +756,12 @@ cdbCopyEndAndFetchRejectNum(CdbCopy *c, int *total_rows_completed)
if (failed_count > 0) if (failed_count > 0)
{ {
elog(LOG, "%s", c->err_msg.data); elog(LOG, "%s", c->err_msg.data);
elog(LOG, "COPY passes failed segment(s) information to FTS"); elog(LOG, "COPY signals FTS to probe segments");
FtsHandleNetFailure(failedSegDBs, failed_count); SendPostmasterSignal(PMSIGNAL_WAKEN_FTS);
DisconnectAndDestroyAllGangs(true);
ereport(ERROR,
(errmsg_internal("MPP detected %d segment failures, system is reconnected", failed_count),
errSendAlert(true)));
} }
pfree(results); pfree(results);
......
...@@ -133,16 +133,11 @@ FtsNotifyProber(void) ...@@ -133,16 +133,11 @@ FtsNotifyProber(void)
* dispatcher: ONLY CALL THREADSAFE FUNCTIONS -- elog() is NOT threadsafe. * dispatcher: ONLY CALL THREADSAFE FUNCTIONS -- elog() is NOT threadsafe.
*/ */
bool bool
FtsTestConnection(CdbComponentDatabaseInfo *failedDBInfo, bool fullScan) FtsIsSegmentUp(CdbComponentDatabaseInfo *dBInfo)
{ {
/* master is always reported as alive */ /* master is always reported as alive */
if (failedDBInfo->segindex == MASTER_SEGMENT_ID) if (dBInfo->segindex == MASTER_SEGMENT_ID)
{
return true; return true;
}
if (fullScan)
FtsNotifyProber();
/* /*
* If fullscan is not requested, caller is just trying to optimize on the * If fullscan is not requested, caller is just trying to optimize on the
...@@ -153,43 +148,10 @@ FtsTestConnection(CdbComponentDatabaseInfo *failedDBInfo, bool fullScan) ...@@ -153,43 +148,10 @@ FtsTestConnection(CdbComponentDatabaseInfo *failedDBInfo, bool fullScan)
* checking against uninitialized variable. * checking against uninitialized variable.
*/ */
return ftsProbeInfo->fts_statusVersion ? return ftsProbeInfo->fts_statusVersion ?
FTS_STATUS_IS_UP(ftsProbeInfo->fts_status[failedDBInfo->dbid]) : FTS_STATUS_IS_UP(ftsProbeInfo->fts_status[dBInfo->dbid]) :
true; true;
} }
/*
* Re-Configure the system: if someone has noticed that the status
* version has been updated, they call this to verify that they've got
* the right configuration.
*
* NOTE: This *always* destroys gangs. And also attempts to inform the
* fault-prober to do a full scan.
*/
void
FtsReConfigureMPP(bool create_new_gangs)
{
/* need to scan to pick up the latest view */
FtsNotifyProber();
ereport(LOG, (errmsg_internal("FTS: reconfiguration is in progress"),
errSendAlert(true)));
DisconnectAndDestroyAllGangs(true);
/* Caller should throw an error. */
return;
}
void
FtsHandleNetFailure(SegmentDatabaseDescriptor **segDB, int numOfFailed)
{
elog(LOG, "FtsHandleNetFailure: numOfFailed %d", numOfFailed);
FtsReConfigureMPP(true);
ereport(ERROR, (errmsg_internal("MPP detected %d segment failures, system is reconnected", numOfFailed),
errSendAlert(true)));
}
/* /*
* Check if any segment DB is down. * Check if any segment DB is down.
* *
...@@ -199,7 +161,6 @@ bool ...@@ -199,7 +161,6 @@ bool
FtsTestSegmentDBIsDown(SegmentDatabaseDescriptor *segdbDesc, int size) FtsTestSegmentDBIsDown(SegmentDatabaseDescriptor *segdbDesc, int size)
{ {
int i = 0; int i = 0;
bool forceRescan = true;
for (i = 0; i < size; i++) for (i = 0; i < size; i++)
{ {
...@@ -207,21 +168,17 @@ FtsTestSegmentDBIsDown(SegmentDatabaseDescriptor *segdbDesc, int size) ...@@ -207,21 +168,17 @@ FtsTestSegmentDBIsDown(SegmentDatabaseDescriptor *segdbDesc, int size)
elog(DEBUG2, "FtsTestSegmentDBIsDown: looking for real fault on segment dbid %d", segInfo->dbid); elog(DEBUG2, "FtsTestSegmentDBIsDown: looking for real fault on segment dbid %d", segInfo->dbid);
if (!FtsTestConnection(segInfo, forceRescan)) if (!FtsIsSegmentUp(segInfo))
{ {
ereport(LOG, (errmsg_internal("FTS: found fault with segment dbid %d. " ereport(LOG, (errmsg_internal("FTS: found fault with segment dbid %d. "
"Reconfiguration is in progress", segInfo->dbid))); "Reconfiguration is in progress", segInfo->dbid)));
return true; return true;
} }
/* only force the rescan on the first call. */
forceRescan = false;
} }
return false; return false;
} }
void void
FtsCondSetTxnReadOnly(bool *XactFlag) FtsCondSetTxnReadOnly(bool *XactFlag)
{ {
......
...@@ -766,8 +766,6 @@ signalQEs(CdbDispatchCmdAsync *pParms) ...@@ -766,8 +766,6 @@ signalQEs(CdbDispatchCmdAsync *pParms)
/* /*
* Check if any segment DB down is detected by FTS. * Check if any segment DB down is detected by FTS.
*
* Issue a FTS probe every 1 minute.
*/ */
static void static void
checkSegmentAlive(CdbDispatchCmdAsync *pParms) checkSegmentAlive(CdbDispatchCmdAsync *pParms)
...@@ -776,8 +774,7 @@ checkSegmentAlive(CdbDispatchCmdAsync *pParms) ...@@ -776,8 +774,7 @@ checkSegmentAlive(CdbDispatchCmdAsync *pParms)
bool forceScan = true; bool forceScan = true;
/* /*
* check the connection still valid, set 1 min time interval this may * check the connection still valid
* affect performance, should turn it off if required.
*/ */
for (i = 0; i < pParms->dispatchCount; i++) for (i = 0; i < pParms->dispatchCount; i++)
{ {
...@@ -799,7 +796,13 @@ checkSegmentAlive(CdbDispatchCmdAsync *pParms) ...@@ -799,7 +796,13 @@ checkSegmentAlive(CdbDispatchCmdAsync *pParms)
ELOG_DISPATCHER_DEBUG("FTS testing connection %d of %d (%s)", ELOG_DISPATCHER_DEBUG("FTS testing connection %d of %d (%s)",
i + 1, pParms->dispatchCount, segdbDesc->whoami); i + 1, pParms->dispatchCount, segdbDesc->whoami);
if (!FtsTestConnection(segdbDesc->segment_database_info, forceScan)) if (forceScan)
{
FtsNotifyProber();
forceScan = false;
}
if (!FtsIsSegmentUp(segdbDesc->segment_database_info))
{ {
char *msg = PQerrorMessage(segdbDesc->conn); char *msg = PQerrorMessage(segdbDesc->conn);
...@@ -815,8 +818,6 @@ checkSegmentAlive(CdbDispatchCmdAsync *pParms) ...@@ -815,8 +818,6 @@ checkSegmentAlive(CdbDispatchCmdAsync *pParms)
PQfinish(segdbDesc->conn); PQfinish(segdbDesc->conn);
segdbDesc->conn = NULL; segdbDesc->conn = NULL;
} }
forceScan = false;
} }
} }
......
...@@ -936,7 +936,7 @@ cdbdisp_checkSegmentDBAlive(DispatchCommandParms *pParms) ...@@ -936,7 +936,7 @@ cdbdisp_checkSegmentDBAlive(DispatchCommandParms *pParms)
WRITE_LOG_DISPATCHER_DEBUG("testing connection %d of %d %s stillRunning %d", WRITE_LOG_DISPATCHER_DEBUG("testing connection %d of %d %s stillRunning %d",
i + 1, pParms->db_count, segdbDesc->whoami, dispatchResult->stillRunning); i + 1, pParms->db_count, segdbDesc->whoami, dispatchResult->stillRunning);
if (!FtsTestConnection(segdbDesc->segment_database_info, false)) if (!FtsIsSegmentUp(segdbDesc->segment_database_info))
{ {
cdbdisp_appendMessage(dispatchResult, LOG, cdbdisp_appendMessage(dispatchResult, LOG,
"Lost connection to %s. FTS detected segment failures.", "Lost connection to %s. FTS detected segment failures.",
......
...@@ -537,7 +537,7 @@ buildGangDefinition(GangType type, int gang_id, int size, int content) ...@@ -537,7 +537,7 @@ buildGangDefinition(GangType type, int gang_id, int size, int content)
if (size != segCount) if (size != segCount)
{ {
FtsReConfigureMPP(false); DisconnectAndDestroyAllGangs(true);
elog(ERROR, "Not all primary segment instances are active and connected"); elog(ERROR, "Not all primary segment instances are active and connected");
} }
break; break;
...@@ -1317,7 +1317,7 @@ cleanupGang(Gang *gp) ...@@ -1317,7 +1317,7 @@ cleanupGang(Gang *gp)
return false; return false;
/* if segment is down, the gang can not be reused */ /* if segment is down, the gang can not be reused */
if (!FtsTestConnection(segdbDesc->segment_database_info, false)) if (!FtsIsSegmentUp(segdbDesc->segment_database_info))
return false; return false;
/* Note, we cancel all "still running" queries */ /* Note, we cancel all "still running" queries */
...@@ -1827,7 +1827,7 @@ GangOK(Gang *gp) ...@@ -1827,7 +1827,7 @@ GangOK(Gang *gp)
if (cdbconn_isBadConnection(segdbDesc)) if (cdbconn_isBadConnection(segdbDesc))
return false; return false;
if (!FtsTestConnection(segdbDesc->segment_database_info, false)) if (!FtsIsSegmentUp(segdbDesc->segment_database_info))
return false; return false;
} }
......
...@@ -297,6 +297,7 @@ create_gang_retry: ...@@ -297,6 +297,7 @@ create_gang_retry:
{ {
MemoryContextSwitchTo(GangContext); MemoryContextSwitchTo(GangContext);
FtsNotifyProber();
/* FTS shows some segment DBs are down */ /* FTS shows some segment DBs are down */
if (FtsTestSegmentDBIsDown(newGangDefinition->db_descriptors, size)) if (FtsTestSegmentDBIsDown(newGangDefinition->db_descriptors, size))
{ {
......
...@@ -223,6 +223,7 @@ create_gang_retry: ...@@ -223,6 +223,7 @@ create_gang_retry:
/* there are failed connections */ /* there are failed connections */
FtsNotifyProber();
/* FTS shows some segment DBs are down, destroy all gangs. */ /* FTS shows some segment DBs are down, destroy all gangs. */
if (FtsTestSegmentDBIsDown(newGangDefinition->db_descriptors, size)) if (FtsTestSegmentDBIsDown(newGangDefinition->db_descriptors, size))
{ {
......
...@@ -55,9 +55,7 @@ extern volatile FtsProbeInfo *ftsProbeInfo; ...@@ -55,9 +55,7 @@ extern volatile FtsProbeInfo *ftsProbeInfo;
extern int FtsShmemSize(void); extern int FtsShmemSize(void);
extern void FtsShmemInit(void); extern void FtsShmemInit(void);
extern bool FtsTestConnection(CdbComponentDatabaseInfo *db_to_test, bool full_scan); extern bool FtsIsSegmentUp(CdbComponentDatabaseInfo *dBInfo);
extern void FtsReConfigureMPP(bool create_new_gangs);
extern void FtsHandleNetFailure(SegmentDatabaseDescriptor **, int);
extern bool FtsTestSegmentDBIsDown(SegmentDatabaseDescriptor *, int); extern bool FtsTestSegmentDBIsDown(SegmentDatabaseDescriptor *, int);
extern bool verifyFtsSyncCount(void); extern bool verifyFtsSyncCount(void);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册