提交 05346eb2 编写于 作者: A Ashwin Agrawal

Move FTS filerep specific functions to separate file.

Also, fixes bunch of warnings introduced due to fts walrep work.
上级 be967e8c
......@@ -11,6 +11,10 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(top_srcdir)/src/backend/gp_libpq_fe $(CPPFLAGS)
OBJS = fts.o ftsprobe.o ftsfilerep.o ftsprobehandler.o
OBJS = fts.o ftsprobe.o
ifeq ($(enable_segwalrep), yes)
OBJS += ftsprobehandler.o
else
OBJS += ftsfilerep.o ftsprobefilerep.o
endif
include $(top_srcdir)/src/backend/common.mk
......@@ -45,7 +45,7 @@
#include "utils/ps_status.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_database.h"
......@@ -65,12 +65,13 @@
/*
* CONSTANTS
*/
/* maximum number of segments */
#define MAX_NUM_OF_SEGMENTS 32768
/* buffer size for SQL command */
#define SQL_CMD_BUF_SIZE 1024
#ifndef USE_SEGWALREP
/* one byte of status for each segment */
static uint8 scan_status[MAX_NUM_OF_SEGMENTS];
#endif
#define GpConfigHistoryRelName "gp_configuration_history"
......@@ -79,11 +80,6 @@
* STATIC VARIABLES
*/
#ifndef USE_SEGWALREP
/* one byte of status for each segment */
static uint8 scan_status[MAX_NUM_OF_SEGMENTS];
#endif
static bool am_ftsprobe = false;
static volatile bool shutdown_requested = false;
......@@ -107,32 +103,6 @@ NON_EXEC_STATIC void ftsMain(int argc, char *argv[]);
static void FtsLoop(void);
static void readCdbComponentInfoAndUpdateStatus(MemoryContext probeContext);
static bool probePublishUpdate(uint8 *scan_status);
static uint32 getTransition(bool isPrimaryAlive, bool isMirrorAlive);
static void
buildSegmentStateChange
(
CdbComponentDatabaseInfo *segInfo,
FtsSegmentStatusChange *change,
uint8 statusNew
)
;
static uint32 transition
(
uint32 stateOld,
uint32 trans,
CdbComponentDatabaseInfo *primary,
CdbComponentDatabaseInfo *mirror,
FtsSegmentStatusChange *changesPrimary,
FtsSegmentStatusChange *changesMirror
)
;
static void updateConfiguration(FtsSegmentStatusChange *changes, int changeEntries);
static bool probeUpdateConfig(FtsSegmentStatusChange *changes, int changeCount);
/*
* Main entry point for ftsprobe process.
......@@ -700,7 +670,7 @@ void FtsLoop()
* Now we've completed the scan, update shared-memory. if we
* change anything, we return true.
*/
updated_probe_state = probePublishUpdate(scan_status);
updated_probe_state = probePublishUpdate(cdb_component_dbs, scan_status);
#endif
MemoryContextSwitchTo(oldContext);
......@@ -765,354 +735,6 @@ FtsIsActive(void)
return (!ftsProbeInfo->fts_discardResults && !shutdown_requested);
}
/*
* Build a set of changes, based on our current state, and the probe results.
*/
static bool
probePublishUpdate(uint8 *probe_results)
{
bool update_found = false;
int i;
/* preprocess probe results to decide what is the current segment state */
FtsPreprocessProbeResultsFilerep(cdb_component_dbs, probe_results);
for (i = 0; i < cdb_component_dbs->total_segment_dbs; i++)
{
CdbComponentDatabaseInfo *segInfo = &cdb_component_dbs->segment_db_info[i];
/* if we've gotten a pause or shutdown request, we ignore our probe results. */
if (!FtsIsActive())
{
return false;
}
/* we check segments in pairs of primary-mirror */
if (!SEGMENT_IS_ACTIVE_PRIMARY(segInfo))
{
continue;
}
CdbComponentDatabaseInfo *primary = segInfo;
CdbComponentDatabaseInfo *mirror = FtsGetPeerSegment(segInfo->segindex, segInfo->dbid);
Assert(mirror != NULL);
/* changes required for primary and mirror */
FtsSegmentStatusChange changes[2];
uint32 stateOld = 0;
uint32 stateNew = 0;
bool isPrimaryAlive = PROBE_IS_ALIVE(primary);
bool isMirrorAlive = PROBE_IS_ALIVE(mirror);
/* get transition type */
uint32 trans = getTransition(isPrimaryAlive, isMirrorAlive);
if (gp_log_fts > GPVARS_VERBOSITY_VERBOSE)
{
elog(LOG, "FTS: primary found %s, mirror found %s, transition %d.",
(isPrimaryAlive ? "alive" : "dead"), (isMirrorAlive ? "alive" : "dead"), trans);
}
if (trans == TRANS_D_D)
{
elog(LOG, "FTS: detected double failure for content=%d, primary (dbid=%d), mirror (dbid=%d).",
primary->segindex, primary->dbid, mirror->dbid);
}
/* get current state */
stateOld = FtsGetPairStateFilerep(primary, mirror);
/* get new state */
stateNew = transition(stateOld, trans, primary, mirror, &changes[0], &changes[1]);
/* check if transition is required */
if (stateNew != stateOld)
{
update_found = true;
updateConfiguration(changes, ARRAY_SIZE(changes));
}
}
if (gp_log_fts >= GPVARS_VERBOSITY_VERBOSE)
{
elog(LOG, "FTS: probe result processing is complete.");
}
return update_found;
}
/*
* Build struct with segment changes
*/
static void
buildSegmentStateChange(CdbComponentDatabaseInfo *segInfo, FtsSegmentStatusChange *change, uint8 statusNew)
{
change->dbid = segInfo->dbid;
change->segindex = segInfo->segindex;
change->oldStatus = ftsProbeInfo->fts_status[segInfo->dbid];
change->newStatus = statusNew;
}
/*
* get transition type - derived from probed primary/mirror state
*/
static uint32
getTransition(bool isPrimaryAlive, bool isMirrorAlive)
{
uint32 state = (isPrimaryAlive ? 2 : 0) + (isMirrorAlive ? 1 : 0);
switch (state)
{
case (0):
/* primary and mirror dead */
return TRANS_D_D;
case (1):
/* primary dead, mirror alive */
return TRANS_D_U;
case (2):
/* primary alive, mirror dead */
return TRANS_U_D;
case (3):
/* primary and mirror alive */
return TRANS_U_U;
default:
Assert(!"Invalid transition for FTS state machine");
return 0;
}
}
/*
* find new state for primary and mirror
*/
static uint32
transition
(
uint32 stateOld,
uint32 trans,
CdbComponentDatabaseInfo *primary,
CdbComponentDatabaseInfo *mirror,
FtsSegmentStatusChange *changesPrimary,
FtsSegmentStatusChange *changesMirror
)
{
Assert(IS_VALID_TRANSITION(trans));
/* reset changes */
memset(changesPrimary, 0, sizeof(*changesPrimary));
memset(changesMirror, 0, sizeof(*changesMirror));
uint32 stateNew = stateOld;
/* in case of a double failure we don't do anything */
if (trans == TRANS_D_D)
{
return stateOld;
}
/* get new state for primary and mirror */
stateNew = FtsTransitionFilerep(stateOld, trans);
/* check if transition is required */
if (stateNew != stateOld)
{
FtsSegmentPairState pairState;
memset(&pairState, 0, sizeof(pairState));
pairState.primary = primary;
pairState.mirror = mirror;
pairState.stateNew = stateNew;
pairState.statePrimary = 0;
pairState.stateMirror = 0;
if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG)
{
elog(LOG, "FTS: state machine transition from %d to %d.", stateOld, stateNew);
}
FtsResolveStateFilerep(&pairState);
buildSegmentStateChange(primary, changesPrimary, pairState.statePrimary);
buildSegmentStateChange(mirror, changesMirror, pairState.stateMirror);
FtsDumpChanges(changesPrimary, 1);
FtsDumpChanges(changesMirror, 1);
}
return stateNew;
}
/*
* Apply requested segment transitions
*/
static void
updateConfiguration(FtsSegmentStatusChange *changes, int changeEntries)
{
Assert(changes != NULL);
CdbComponentDatabaseInfo *entryDB = &cdb_component_dbs->entry_db_info[0];
if (entryDB->dbid != GpIdentity.dbid)
{
if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG)
{
elog(LOG, "FTS: advancing to second entry-db.");
}
entryDB = entryDB + 1;
}
/* if we've gotten a pause or shutdown request, we ignore our probe results. */
if (!FtsIsActive())
{
return;
}
/* update segment configuration */
bool commit = probeUpdateConfig(changes, changeEntries);
if (commit)
FtsFailoverFilerep(changes, changeEntries);
if (gp_log_fts >= GPVARS_VERBOSITY_VERBOSE)
{
elog(LOG, "FTS: finished segment modifications.");
}
}
/*
* update segment configuration in catalog and shared memory
*/
static bool
probeUpdateConfig(FtsSegmentStatusChange *changes, int changeCount)
{
Relation configrel;
Relation histrel;
SysScanDesc sscan;
ScanKeyData scankey;
HeapTuple configtuple;
HeapTuple newtuple;
HeapTuple histtuple;
Datum configvals[Natts_gp_segment_configuration];
bool confignulls[Natts_gp_segment_configuration] = { false };
bool repls[Natts_gp_segment_configuration] = { false };
Datum histvals[Natts_gp_configuration_history];
bool histnulls[Natts_gp_configuration_history] = { false };
bool valid;
bool primary;
bool changelogging;
int i;
char desc[SQL_CMD_BUF_SIZE];
/*
* Commit/abort transaction below will destroy
* CurrentResourceOwner. We need it for catalog reads.
*/
ResourceOwner save = CurrentResourceOwner;
StartTransactionCommand();
GetTransactionSnapshot();
elog(LOG, "probeUpdateConfig called for %d changes", changeCount);
histrel = heap_open(GpConfigHistoryRelationId,
RowExclusiveLock);
configrel = heap_open(GpSegmentConfigRelationId,
RowExclusiveLock);
for (i = 0; i < changeCount; i++)
{
FtsSegmentStatusChange *change = &changes[i];
valid = (changes[i].newStatus & FTS_STATUS_ALIVE);
primary = (changes[i].newStatus & FTS_STATUS_PRIMARY);
changelogging = (changes[i].newStatus & FTS_STATUS_CHANGELOGGING);
if (changelogging)
{
Assert(primary && valid);
}
Assert((valid || !primary) && "Primary cannot be down");
/*
* Insert new tuple into gp_configuration_history catalog.
*/
histvals[Anum_gp_configuration_history_time-1] =
TimestampTzGetDatum(GetCurrentTimestamp());
histvals[Anum_gp_configuration_history_dbid-1] =
Int16GetDatum(changes[i].dbid);
snprintf(desc, sizeof(desc),
"FTS: content %d fault marking status %s%s role %c",
change->segindex, valid ? "UP" : "DOWN",
(changelogging) ? " mode: change-tracking" : "",
primary ? GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY : GP_SEGMENT_CONFIGURATION_ROLE_MIRROR);
histvals[Anum_gp_configuration_history_desc-1] =
CStringGetTextDatum(desc);
histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls);
simple_heap_insert(histrel, histtuple);
CatalogUpdateIndexes(histrel, histtuple);
/*
* Find and update gp_segment_configuration tuple.
*/
ScanKeyInit(&scankey,
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(changes[i].dbid));
sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId,
true, SnapshotNow, 1, &scankey);
configtuple = systable_getnext(sscan);
if (!HeapTupleIsValid(configtuple))
{
elog(ERROR, "FTS cannot find dbid=%d in %s", changes[i].dbid,
RelationGetRelationName(configrel));
}
configvals[Anum_gp_segment_configuration_role-1] =
CharGetDatum(primary ? GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY : GP_SEGMENT_CONFIGURATION_ROLE_MIRROR);
repls[Anum_gp_segment_configuration_role-1] = true;
configvals[Anum_gp_segment_configuration_status-1] =
CharGetDatum(valid ? GP_SEGMENT_CONFIGURATION_STATUS_UP : GP_SEGMENT_CONFIGURATION_STATUS_DOWN);
repls[Anum_gp_segment_configuration_status-1] = true;
if (changelogging)
{
configvals[Anum_gp_segment_configuration_mode-1] =
CharGetDatum(GP_SEGMENT_CONFIGURATION_MODE_CHANGETRACKING);
}
repls[Anum_gp_segment_configuration_mode-1] = changelogging;
newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel),
configvals, confignulls, repls);
simple_heap_update(configrel, &configtuple->t_self, newtuple);
CatalogUpdateIndexes(configrel, newtuple);
systable_endscan(sscan);
pfree(newtuple);
/*
* Update shared memory
*/
ftsProbeInfo->fts_status[changes[i].dbid] = changes[i].newStatus;
}
heap_close(histrel, RowExclusiveLock);
heap_close(configrel, RowExclusiveLock);
SIMPLE_FAULT_INJECTOR(FtsWaitForShutdown);
/*
* Do not block shutdown. We will always get a change to update
* gp_segment_configuration in subsequent probes upon database
* restart.
*/
if (shutdown_requested)
{
elog(LOG, "Shutdown in progress, ignoring FTS prober updates.");
return false;
}
CommitTransactionCommand();
CurrentResourceOwner = save;
return true;
}
bool
FtsIsSegmentAlive(CdbComponentDatabaseInfo *segInfo)
{
......@@ -1156,144 +778,6 @@ FtsDumpChanges(FtsSegmentStatusChange *changes, int changeEntries)
}
}
/**
* Marks the given db as in-sync in the segment configuration.
*/
void
FtsMarkSegmentsInSync(CdbComponentDatabaseInfo *primary, CdbComponentDatabaseInfo *mirror)
{
if (!FTS_STATUS_ISALIVE(primary->dbid, ftsProbeInfo->fts_status) ||
!FTS_STATUS_ISALIVE(mirror->dbid, ftsProbeInfo->fts_status) ||
!FTS_STATUS_ISPRIMARY(primary->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_ISPRIMARY(mirror->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_IS_SYNCED(primary->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_IS_SYNCED(mirror->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_IS_CHANGELOGGING(primary->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_IS_CHANGELOGGING(mirror->dbid, ftsProbeInfo->fts_status))
{
FtsRequestPostmasterShutdown(primary, mirror);
}
if (ftsProbeInfo->fts_pauseProbes)
{
return;
}
uint8 segStatus=0;
Relation configrel;
Relation histrel;
ScanKeyData scankey;
SysScanDesc sscan;
HeapTuple configtuple;
HeapTuple newtuple;
HeapTuple histtuple;
Datum configvals[Natts_gp_segment_configuration];
bool confignulls[Natts_gp_segment_configuration] = { false };
bool repls[Natts_gp_segment_configuration] = { false };
Datum histvals[Natts_gp_configuration_history];
bool histnulls[Natts_gp_configuration_history] = { false };
char *desc = "FTS: changed segment to insync from resync.";
/*
* Commit/abort transaction below will destroy
* CurrentResourceOwner. We need it for catalog reads.
*/
ResourceOwner save = CurrentResourceOwner;
StartTransactionCommand();
GetTransactionSnapshot();
/* update primary */
segStatus = ftsProbeInfo->fts_status[primary->dbid];
segStatus |= FTS_STATUS_SYNCHRONIZED;
ftsProbeInfo->fts_status[primary->dbid] = segStatus;
/* update mirror */
segStatus = ftsProbeInfo->fts_status[mirror->dbid];
segStatus |= FTS_STATUS_SYNCHRONIZED;
ftsProbeInfo->fts_status[mirror->dbid] = segStatus;
histrel = heap_open(GpConfigHistoryRelationId,
RowExclusiveLock);
configrel = heap_open(GpSegmentConfigRelationId,
RowExclusiveLock);
/* update gp_segment_configuration to insync */
ScanKeyInit(&scankey,
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(primary->dbid));
sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId,
true, SnapshotNow, 1, &scankey);
configtuple = systable_getnext(sscan);
if (!HeapTupleIsValid(configtuple))
{
elog(ERROR,"FTS cannot find dbid (%d, %d) in %s", primary->dbid,
mirror->dbid, RelationGetRelationName(configrel));
}
configvals[Anum_gp_segment_configuration_mode-1] = CharGetDatum(GP_SEGMENT_CONFIGURATION_MODE_INSYNC);
repls[Anum_gp_segment_configuration_mode-1] = true;
newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel),
configvals, confignulls, repls);
simple_heap_update(configrel, &configtuple->t_self, newtuple);
CatalogUpdateIndexes(configrel, newtuple);
systable_endscan(sscan);
ScanKeyInit(&scankey,
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(mirror->dbid));
sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId,
true, SnapshotNow, 1, &scankey);
configtuple = systable_getnext(sscan);
if (!HeapTupleIsValid(configtuple))
{
elog(ERROR,"FTS cannot find dbid (%d, %d) in %s", primary->dbid,
mirror->dbid, RelationGetRelationName(configrel));
}
newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel),
configvals, confignulls, repls);
simple_heap_update(configrel, &configtuple->t_self, newtuple);
CatalogUpdateIndexes(configrel, newtuple);
systable_endscan(sscan);
/* update configuration history */
histvals[Anum_gp_configuration_history_time-1] =
TimestampTzGetDatum(GetCurrentTimestamp());
histvals[Anum_gp_configuration_history_dbid-1] =
Int16GetDatum(primary->dbid);
histvals[Anum_gp_configuration_history_desc-1] =
CStringGetTextDatum(desc);
histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls);
simple_heap_insert(histrel, histtuple);
CatalogUpdateIndexes(histrel, histtuple);
histvals[Anum_gp_configuration_history_dbid-1] =
Int16GetDatum(mirror->dbid);
histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls);
simple_heap_insert(histrel, histtuple);
CatalogUpdateIndexes(histrel, histtuple);
ereport(LOG,
(errmsg("FTS: resynchronization of mirror (dbid=%d, content=%d) on %s:%d has completed.",
mirror->dbid, mirror->segindex, mirror->address, mirror->port ),
errSendAlert(true)));
heap_close(histrel, RowExclusiveLock);
heap_close(configrel, RowExclusiveLock);
/*
* Do not block shutdown. We will always get a change to update
* gp_segment_configuration in subsequent probes upon database
* restart.
*/
if (shutdown_requested)
{
elog(LOG, "Shutdown in progress, ignoring FTS prober updates.");
return;
}
CommitTransactionCommand();
CurrentResourceOwner = save;
}
/*
* Get peer segment descriptor
*/
......
......@@ -16,15 +16,21 @@
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/genam.h"
#include "catalog/gp_configuration_history.h"
#include "catalog/gp_segment_config.h"
#include "cdb/ml_ipc.h" /* gettime_elapsed_ms */
#include "executor/spi.h"
#include "gp-libpq-fe.h"
#include "gp-libpq-int.h"
#include "cdb/cdbfts.h"
#include "executor/spi.h"
#include "postmaster/fts.h"
#include "postmaster/primary_mirror_mode.h"
#include "cdb/ml_ipc.h" /* gettime_elapsed_ms */
#include "utils/fmgroids.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "cdb/cdbfts.h"
/*
* CONSTANTS
......@@ -269,6 +275,143 @@ FtsResolveStateFilerep(FtsSegmentPairState *pairState)
}
}
/*
* Marks the given db as in-sync in the segment configuration.
*/
static void
FtsMarkSegmentsInSync(CdbComponentDatabaseInfo *primary, CdbComponentDatabaseInfo *mirror)
{
if (!FTS_STATUS_ISALIVE(primary->dbid, ftsProbeInfo->fts_status) ||
!FTS_STATUS_ISALIVE(mirror->dbid, ftsProbeInfo->fts_status) ||
!FTS_STATUS_ISPRIMARY(primary->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_ISPRIMARY(mirror->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_IS_SYNCED(primary->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_IS_SYNCED(mirror->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_IS_CHANGELOGGING(primary->dbid, ftsProbeInfo->fts_status) ||
FTS_STATUS_IS_CHANGELOGGING(mirror->dbid, ftsProbeInfo->fts_status))
{
FtsRequestPostmasterShutdown(primary, mirror);
}
if (ftsProbeInfo->fts_pauseProbes)
{
return;
}
uint8 segStatus=0;
Relation configrel;
Relation histrel;
ScanKeyData scankey;
SysScanDesc sscan;
HeapTuple configtuple;
HeapTuple newtuple;
HeapTuple histtuple;
Datum configvals[Natts_gp_segment_configuration];
bool confignulls[Natts_gp_segment_configuration] = { false };
bool repls[Natts_gp_segment_configuration] = { false };
Datum histvals[Natts_gp_configuration_history];
bool histnulls[Natts_gp_configuration_history] = { false };
char *desc = "FTS: changed segment to insync from resync.";
/*
* Commit/abort transaction below will destroy
* CurrentResourceOwner. We need it for catalog reads.
*/
ResourceOwner save = CurrentResourceOwner;
StartTransactionCommand();
GetTransactionSnapshot();
/* update primary */
segStatus = ftsProbeInfo->fts_status[primary->dbid];
segStatus |= FTS_STATUS_SYNCHRONIZED;
ftsProbeInfo->fts_status[primary->dbid] = segStatus;
/* update mirror */
segStatus = ftsProbeInfo->fts_status[mirror->dbid];
segStatus |= FTS_STATUS_SYNCHRONIZED;
ftsProbeInfo->fts_status[mirror->dbid] = segStatus;
histrel = heap_open(GpConfigHistoryRelationId,
RowExclusiveLock);
configrel = heap_open(GpSegmentConfigRelationId,
RowExclusiveLock);
/* update gp_segment_configuration to insync */
ScanKeyInit(&scankey,
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(primary->dbid));
sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId,
true, SnapshotNow, 1, &scankey);
configtuple = systable_getnext(sscan);
if (!HeapTupleIsValid(configtuple))
{
elog(ERROR,"FTS cannot find dbid (%d, %d) in %s", primary->dbid,
mirror->dbid, RelationGetRelationName(configrel));
}
configvals[Anum_gp_segment_configuration_mode-1] = CharGetDatum(GP_SEGMENT_CONFIGURATION_MODE_INSYNC);
repls[Anum_gp_segment_configuration_mode-1] = true;
newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel),
configvals, confignulls, repls);
simple_heap_update(configrel, &configtuple->t_self, newtuple);
CatalogUpdateIndexes(configrel, newtuple);
systable_endscan(sscan);
ScanKeyInit(&scankey,
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(mirror->dbid));
sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId,
true, SnapshotNow, 1, &scankey);
configtuple = systable_getnext(sscan);
if (!HeapTupleIsValid(configtuple))
{
elog(ERROR,"FTS cannot find dbid (%d, %d) in %s", primary->dbid,
mirror->dbid, RelationGetRelationName(configrel));
}
newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel),
configvals, confignulls, repls);
simple_heap_update(configrel, &configtuple->t_self, newtuple);
CatalogUpdateIndexes(configrel, newtuple);
systable_endscan(sscan);
/* update configuration history */
histvals[Anum_gp_configuration_history_time-1] =
TimestampTzGetDatum(GetCurrentTimestamp());
histvals[Anum_gp_configuration_history_dbid-1] =
Int16GetDatum(primary->dbid);
histvals[Anum_gp_configuration_history_desc-1] =
CStringGetTextDatum(desc);
histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls);
simple_heap_insert(histrel, histtuple);
CatalogUpdateIndexes(histrel, histtuple);
histvals[Anum_gp_configuration_history_dbid-1] =
Int16GetDatum(mirror->dbid);
histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls);
simple_heap_insert(histrel, histtuple);
CatalogUpdateIndexes(histrel, histtuple);
ereport(LOG,
(errmsg("FTS: resynchronization of mirror (dbid=%d, content=%d) on %s:%d has completed.",
mirror->dbid, mirror->segindex, mirror->address, mirror->port ),
errSendAlert(true)));
heap_close(histrel, RowExclusiveLock);
heap_close(configrel, RowExclusiveLock);
/*
* Do not block shutdown. We will always get a change to update
* gp_segment_configuration in subsequent probes upon database
* restart.
*/
if (IsFtsShudownRequested())
{
elog(LOG, "Shutdown in progress, ignoring FTS prober updates.");
return;
}
CommitTransactionCommand();
CurrentResourceOwner = save;
}
/*
* pre-process probe results to take into account some special
......@@ -283,11 +426,11 @@ FtsPreprocessProbeResultsFilerep(CdbComponentDatabases *dbs, uint8 *probe_result
{
int i = 0;
cdb_component_dbs = dbs;
Assert(cdb_component_dbs != NULL);
Assert(dbs != NULL);
for (i=0; i < cdb_component_dbs->total_segment_dbs; i++)
for (i=0; i < dbs->total_segment_dbs; i++)
{
CdbComponentDatabaseInfo *segInfo = &cdb_component_dbs->segment_db_info[i];
CdbComponentDatabaseInfo *segInfo = &dbs->segment_db_info[i];
CdbComponentDatabaseInfo *primary = NULL, *mirror = NULL;
if (!SEGMENT_IS_ACTIVE_PRIMARY(segInfo))
......@@ -568,6 +711,7 @@ getHostsByDbid(int dbid, char **hostname_p, int *host_port_p, int *host_filerep_
Assert(peer_name_p != NULL);
Assert(peer_pm_port_p != NULL);
Assert(peer_filerep_port_p != NULL);
Assert(cdb_component_dbs != NULL);
found = false;
......@@ -622,5 +766,379 @@ getHostsByDbid(int dbid, char **hostname_p, int *host_port_p, int *host_filerep_
}
}
#ifndef USE_SEGWALREP
static uint32 getTransition(bool isPrimaryAlive, bool isMirrorAlive);
static void
buildSegmentStateChange
(
CdbComponentDatabaseInfo *segInfo,
FtsSegmentStatusChange *change,
uint8 statusNew
)
;
static uint32 transition
(
uint32 stateOld,
uint32 trans,
CdbComponentDatabaseInfo *primary,
CdbComponentDatabaseInfo *mirror,
FtsSegmentStatusChange *changesPrimary,
FtsSegmentStatusChange *changesMirror
)
;
static void updateConfiguration(FtsSegmentStatusChange *changes, int changeEntries);
/*
* Build a set of changes, based on our current state, and the probe results.
*/
bool probePublishUpdate(CdbComponentDatabases *dbs, uint8 *probe_results)
{
bool update_found = false;
int i;
Assert(dbs != NULL);
cdb_component_dbs = dbs;
/* preprocess probe results to decide what is the current segment state */
FtsPreprocessProbeResultsFilerep(dbs, probe_results);
for (i = 0; i < dbs->total_segment_dbs; i++)
{
CdbComponentDatabaseInfo *segInfo = &dbs->segment_db_info[i];
/* if we've gotten a pause or shutdown request, we ignore our probe results. */
if (!FtsIsActive())
{
return false;
}
/* we check segments in pairs of primary-mirror */
if (!SEGMENT_IS_ACTIVE_PRIMARY(segInfo))
{
continue;
}
CdbComponentDatabaseInfo *primary = segInfo;
CdbComponentDatabaseInfo *mirror = FtsGetPeerSegment(segInfo->segindex, segInfo->dbid);
Assert(mirror != NULL);
/* changes required for primary and mirror */
FtsSegmentStatusChange changes[2];
uint32 stateOld = 0;
uint32 stateNew = 0;
bool isPrimaryAlive = PROBE_IS_ALIVE(primary);
bool isMirrorAlive = PROBE_IS_ALIVE(mirror);
/* get transition type */
uint32 trans = getTransition(isPrimaryAlive, isMirrorAlive);
if (gp_log_fts > GPVARS_VERBOSITY_VERBOSE)
{
elog(LOG, "FTS: primary found %s, mirror found %s, transition %d.",
(isPrimaryAlive ? "alive" : "dead"), (isMirrorAlive ? "alive" : "dead"), trans);
}
if (trans == TRANS_D_D)
{
elog(LOG, "FTS: detected double failure for content=%d, primary (dbid=%d), mirror (dbid=%d).",
primary->segindex, primary->dbid, mirror->dbid);
}
/* get current state */
stateOld = FtsGetPairStateFilerep(primary, mirror);
/* get new state */
stateNew = transition(stateOld, trans, primary, mirror, &changes[0], &changes[1]);
/* check if transition is required */
if (stateNew != stateOld)
{
update_found = true;
updateConfiguration(changes, ARRAY_SIZE(changes));
}
}
if (gp_log_fts >= GPVARS_VERBOSITY_VERBOSE)
{
elog(LOG, "FTS: probe result processing is complete.");
}
return update_found;
}
/*
* Build struct with segment changes
*/
static void
buildSegmentStateChange(CdbComponentDatabaseInfo *segInfo, FtsSegmentStatusChange *change, uint8 statusNew)
{
change->dbid = segInfo->dbid;
change->segindex = segInfo->segindex;
change->oldStatus = ftsProbeInfo->fts_status[segInfo->dbid];
change->newStatus = statusNew;
}
/*
* get transition type - derived from probed primary/mirror state
*/
static uint32
getTransition(bool isPrimaryAlive, bool isMirrorAlive)
{
uint32 state = (isPrimaryAlive ? 2 : 0) + (isMirrorAlive ? 1 : 0);
switch (state)
{
case (0):
/* primary and mirror dead */
return TRANS_D_D;
case (1):
/* primary dead, mirror alive */
return TRANS_D_U;
case (2):
/* primary alive, mirror dead */
return TRANS_U_D;
case (3):
/* primary and mirror alive */
return TRANS_U_U;
default:
Assert(!"Invalid transition for FTS state machine");
return 0;
}
}
/*
* find new state for primary and mirror
*/
static uint32
transition
(
uint32 stateOld,
uint32 trans,
CdbComponentDatabaseInfo *primary,
CdbComponentDatabaseInfo *mirror,
FtsSegmentStatusChange *changesPrimary,
FtsSegmentStatusChange *changesMirror
)
{
Assert(IS_VALID_TRANSITION(trans));
/* reset changes */
memset(changesPrimary, 0, sizeof(*changesPrimary));
memset(changesMirror, 0, sizeof(*changesMirror));
uint32 stateNew = stateOld;
/* in case of a double failure we don't do anything */
if (trans == TRANS_D_D)
{
return stateOld;
}
/* get new state for primary and mirror */
stateNew = FtsTransitionFilerep(stateOld, trans);
/* check if transition is required */
if (stateNew != stateOld)
{
FtsSegmentPairState pairState;
memset(&pairState, 0, sizeof(pairState));
pairState.primary = primary;
pairState.mirror = mirror;
pairState.stateNew = stateNew;
pairState.statePrimary = 0;
pairState.stateMirror = 0;
if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG)
{
elog(LOG, "FTS: state machine transition from %d to %d.", stateOld, stateNew);
}
FtsResolveStateFilerep(&pairState);
buildSegmentStateChange(primary, changesPrimary, pairState.statePrimary);
buildSegmentStateChange(mirror, changesMirror, pairState.stateMirror);
FtsDumpChanges(changesPrimary, 1);
FtsDumpChanges(changesMirror, 1);
}
return stateNew;
}
/*
* update segment configuration in catalog and shared memory
*/
static bool
probeUpdateConfig(FtsSegmentStatusChange *changes, int changeCount)
{
Relation configrel;
Relation histrel;
SysScanDesc sscan;
ScanKeyData scankey;
HeapTuple configtuple;
HeapTuple newtuple;
HeapTuple histtuple;
Datum configvals[Natts_gp_segment_configuration];
bool confignulls[Natts_gp_segment_configuration] = { false };
bool repls[Natts_gp_segment_configuration] = { false };
Datum histvals[Natts_gp_configuration_history];
bool histnulls[Natts_gp_configuration_history] = { false };
bool valid;
bool primary;
bool changelogging;
int i;
char desc[SQL_CMD_BUF_SIZE];
/*
* Commit/abort transaction below will destroy
* CurrentResourceOwner. We need it for catalog reads.
*/
ResourceOwner save = CurrentResourceOwner;
StartTransactionCommand();
GetTransactionSnapshot();
elog(LOG, "probeUpdateConfig called for %d changes", changeCount);
histrel = heap_open(GpConfigHistoryRelationId,
RowExclusiveLock);
configrel = heap_open(GpSegmentConfigRelationId,
RowExclusiveLock);
for (i = 0; i < changeCount; i++)
{
FtsSegmentStatusChange *change = &changes[i];
valid = (changes[i].newStatus & FTS_STATUS_ALIVE);
primary = (changes[i].newStatus & FTS_STATUS_PRIMARY);
changelogging = (changes[i].newStatus & FTS_STATUS_CHANGELOGGING);
if (changelogging)
{
Assert(primary && valid);
}
Assert((valid || !primary) && "Primary cannot be down");
/*
* Insert new tuple into gp_configuration_history catalog.
*/
histvals[Anum_gp_configuration_history_time-1] =
TimestampTzGetDatum(GetCurrentTimestamp());
histvals[Anum_gp_configuration_history_dbid-1] =
Int16GetDatum(changes[i].dbid);
snprintf(desc, sizeof(desc),
"FTS: content %d fault marking status %s%s role %c",
change->segindex, valid ? "UP" : "DOWN",
(changelogging) ? " mode: change-tracking" : "",
primary ? GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY : GP_SEGMENT_CONFIGURATION_ROLE_MIRROR);
histvals[Anum_gp_configuration_history_desc-1] =
CStringGetTextDatum(desc);
histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls);
simple_heap_insert(histrel, histtuple);
CatalogUpdateIndexes(histrel, histtuple);
/*
* Find and update gp_segment_configuration tuple.
*/
ScanKeyInit(&scankey,
Anum_gp_segment_configuration_dbid,
BTEqualStrategyNumber, F_INT2EQ,
Int16GetDatum(changes[i].dbid));
sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId,
true, SnapshotNow, 1, &scankey);
configtuple = systable_getnext(sscan);
if (!HeapTupleIsValid(configtuple))
{
elog(ERROR, "FTS cannot find dbid=%d in %s", changes[i].dbid,
RelationGetRelationName(configrel));
}
configvals[Anum_gp_segment_configuration_role-1] =
CharGetDatum(primary ? GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY : GP_SEGMENT_CONFIGURATION_ROLE_MIRROR);
repls[Anum_gp_segment_configuration_role-1] = true;
configvals[Anum_gp_segment_configuration_status-1] =
CharGetDatum(valid ? GP_SEGMENT_CONFIGURATION_STATUS_UP : GP_SEGMENT_CONFIGURATION_STATUS_DOWN);
repls[Anum_gp_segment_configuration_status-1] = true;
if (changelogging)
{
configvals[Anum_gp_segment_configuration_mode-1] =
CharGetDatum(GP_SEGMENT_CONFIGURATION_MODE_CHANGETRACKING);
}
repls[Anum_gp_segment_configuration_mode-1] = changelogging;
newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel),
configvals, confignulls, repls);
simple_heap_update(configrel, &configtuple->t_self, newtuple);
CatalogUpdateIndexes(configrel, newtuple);
systable_endscan(sscan);
pfree(newtuple);
/*
* Update shared memory
*/
ftsProbeInfo->fts_status[changes[i].dbid] = changes[i].newStatus;
}
heap_close(histrel, RowExclusiveLock);
heap_close(configrel, RowExclusiveLock);
SIMPLE_FAULT_INJECTOR(FtsWaitForShutdown);
/*
* Do not block shutdown. We will always get a change to update
* gp_segment_configuration in subsequent probes upon database
* restart.
*/
if (IsFtsShudownRequested())
{
elog(LOG, "Shutdown in progress, ignoring FTS prober updates.");
return false;
}
CommitTransactionCommand();
CurrentResourceOwner = save;
return true;
}
/*
* Apply requested segment transitions
*/
static void
updateConfiguration(FtsSegmentStatusChange *changes, int changeEntries)
{
Assert(changes != NULL);
Assert(cdb_component_dbs != NULL);
CdbComponentDatabaseInfo *entryDB = &cdb_component_dbs->entry_db_info[0];
if (entryDB->dbid != GpIdentity.dbid)
{
if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG)
{
elog(LOG, "FTS: advancing to second entry-db.");
}
entryDB = entryDB + 1;
}
/* if we've gotten a pause or shutdown request, we ignore our probe results. */
if (!FtsIsActive())
{
return;
}
/* update segment configuration */
bool commit = probeUpdateConfig(changes, changeEntries);
if (commit)
FtsFailoverFilerep(changes, changeEntries);
if (gp_log_fts >= GPVARS_VERBOSITY_VERBOSE)
{
elog(LOG, "FTS: finished segment modifications.");
}
}
#endif
/* EOF */
......@@ -20,18 +20,17 @@
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "gp-libpq-fe.h"
#include "gp-libpq-int.h"
#include "cdb/cdbgang.h" /* gp_pthread_create */
#include "libpq/ip.h"
#include "postmaster/fts.h"
#include "postmaster/ftsprobe.h"
#include "executor/spi.h"
#include "postmaster/primary_mirror_mode.h"
#include "cdb/ml_ipc.h" /* gettime_elapsed_ms */
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
......@@ -52,7 +51,7 @@
#else
#define PROBE_RESPONSE_LEN (20) /* size of segment response message */
#endif
#define PROBE_ERR_MSG_LEN (256) /* length of error message for errno */
/*
* MACROS
......@@ -64,29 +63,6 @@
/*
* STRUCTURES
*/
typedef struct threadWorkerInfo
{
uint8 *scan_status;
} threadWorkerInfo;
typedef struct ProbeConnectionInfo
{
int16 dbId; /* the dbid of the segment */
int16 segmentId; /* content indicator: -1 for master, 0, ..., n-1 for segments */
char role; /* primary ('p'), mirror ('m') */
char mode; /* sync ('s'), resync ('r'), change-tracking ('c') */
GpMonotonicTime startTime; /* probe start timestamp */
#ifdef USE_SEGWALREP
probe_result *result;
#endif
char segmentStatus; /* probed segment status */
int16 probe_errno; /* saved errno from the latest system call */
char errmsg[PROBE_ERR_MSG_LEN]; /* message returned by strerror() */
PGconn *conn; /* libpq connection object */
} ProbeConnectionInfo;
typedef struct ProbeMsg
{
uint32 packetlen;
......@@ -97,23 +73,15 @@ typedef struct ProbeMsg
* STATIC VARIABLES
*/
#ifdef USE_SEGWALREP
/* mutex used for pthread synchronization in parallel probing */
static pthread_mutex_t worker_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
/* struct holding segment configuration */
static CdbComponentDatabases *cdb_component_dbs = NULL;
/* one byte of status for each segment */
static uint8 *scan_status;
#endif
/*
* FUNCTION PROTOTYPES
*/
static void *probeSegmentFromThread(void *cdb_component_dbs);
static char probeSegment(CdbComponentDatabaseInfo *dbInfo);
static bool probeConnect(CdbComponentDatabaseInfo *dbInfo, ProbeConnectionInfo *probeInfo);
static bool probePollOut(ProbeConnectionInfo *probeInfo);
static bool probeSend(ProbeConnectionInfo *probeInfo);
......@@ -151,91 +119,12 @@ static char *errmessage(ProbeConnectionInfo *probeInfo)
return probeInfo->errmsg;
}
/*
* probe segments to check if they are alive and if any failure has occurred
*/
#ifdef USE_SEGWALREP
void
FtsProbeSegments(CdbComponentDatabases *dbs, uint8 *probeRes)
{
int i;
threadWorkerInfo worker_info;
int workers = gp_fts_probe_threadcount;
pthread_t *threads = NULL;
cdb_component_dbs = dbs;
scan_status = probeRes;
if (cdb_component_dbs == NULL || scan_status == NULL)
{
elog(ERROR, "FTS: segment configuration has not been loaded to shared memory");
}
/* reset probe results */
memset(scan_status, 0, cdb_component_dbs->total_segment_dbs * sizeof(scan_status[0]));
/* figure out which segments to include in the scan. */
for (i=0; i < cdb_component_dbs->total_segment_dbs; i++)
{
CdbComponentDatabaseInfo *segInfo = &cdb_component_dbs->segment_db_info[i];
if (FtsIsSegmentAlive(segInfo))
{
/* mark segment for probing */
scan_status[segInfo->dbid] = PROBE_SEGMENT;
}
else
{
/* consider segment dead */
scan_status[segInfo->dbid] = PROBE_DEAD;
}
}
worker_info.scan_status = scan_status;
threads = (pthread_t *)palloc(workers * sizeof(pthread_t));
for (i = 0; i < workers; i++)
{
#ifdef USE_ASSERT_CHECKING
int ret =
#endif /* USE_ASSERT_CHECKING */
gp_pthread_create(&threads[i], probeSegmentFromThread, &worker_info, "probeSegments");
Assert(ret == 0 && "FTS: failed to create probing thread");
}
/* we have nothing left to do but wait */
for (i = 0; i < workers; i++)
{
#ifdef USE_ASSERT_CHECKING
int ret =
#endif /* USE_ASSERT_CHECKING */
pthread_join(threads[i], NULL);
Assert(ret == 0 && "FTS: failed to join probing thread");
}
pfree(threads);
threads = NULL;
/* if we're shutting down, just exit. */
if (!FtsIsActive())
return;
if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG)
{
elog(LOG, "FTS: probe results for all segments:");
for (i=0; i < cdb_component_dbs->total_segment_dbs; i++)
{
CdbComponentDatabaseInfo *segInfo = NULL;
segInfo = &cdb_component_dbs->segment_db_info[i];
elog(LOG, "segment dbid %d status 0x%x.", segInfo->dbid, scan_status[segInfo->dbid]);
}
}
}
static char probeSegmentHelper(CdbComponentDatabaseInfo *dbInfo, ProbeConnectionInfo probeInfo)
#else
char
#endif
probeSegmentHelper(CdbComponentDatabaseInfo *dbInfo, ProbeConnectionInfo probeInfo)
{
/*
* probe segment: open socket -> connect -> send probe msg -> receive response;
......@@ -294,27 +183,6 @@ static char probeSegmentHelper(CdbComponentDatabaseInfo *dbInfo, ProbeConnection
#endif
}
/*
* This is called from several different threads: ONLY USE THREADSAFE FUNCTIONS INSIDE.
*/
static char probeSegment(CdbComponentDatabaseInfo *dbInfo)
{
Assert(dbInfo != NULL);
/* setup probe descriptor */
ProbeConnectionInfo probeInfo;
memset(&probeInfo, 0, sizeof(ProbeConnectionInfo));
probeInfo.segmentId = dbInfo->segindex;
probeInfo.dbId = dbInfo->dbid;
probeInfo.role = dbInfo->role;
probeInfo.mode = dbInfo->mode;
probeInfo.segmentStatus = PROBE_DEAD;
return probeSegmentHelper(dbInfo, probeInfo);
}
#ifdef USE_SEGWALREP
static void
probeWalRepSegment(probe_response_per_segment *response)
......@@ -703,121 +571,4 @@ probeProcessResponse(ProbeConnectionInfo *probeInfo)
return true;
}
/*
* Function called by probing thread;
* iteratively picks next segment pair to probe until all segments are probed;
* probes primary; probes mirror if primary reports crash fault or does not respond
*/
static void *
probeSegmentFromThread(void *arg)
{
threadWorkerInfo *worker_info;
int i;
worker_info = (threadWorkerInfo *) arg;
i = 0;
for (;;)
{
CdbComponentDatabaseInfo *primary = NULL;
CdbComponentDatabaseInfo *mirror = NULL;
char probe_result_primary = PROBE_DEAD;
char probe_result_mirror = PROBE_DEAD;
/*
* find untested primary, mark primary and mirror "tested" and unlock.
*/
pthread_mutex_lock(&worker_thread_mutex);
for (; i < cdb_component_dbs->total_segment_dbs; i++)
{
primary = &cdb_component_dbs->segment_db_info[i];
/* check segments in pairs of primary-mirror */
if (!SEGMENT_IS_ACTIVE_PRIMARY(primary))
{
continue;
}
if (PROBE_CHECK_FLAG(worker_info->scan_status[primary->dbid], PROBE_SEGMENT))
{
/* prevent re-checking this pair */
worker_info->scan_status[primary->dbid] &= ~PROBE_SEGMENT;
mirror = FtsGetPeerSegment(primary->segindex, primary->dbid);
/* check if mirror is marked for probing */
if (mirror != NULL &&
PROBE_CHECK_FLAG(worker_info->scan_status[mirror->dbid], PROBE_SEGMENT))
{
worker_info->scan_status[mirror->dbid] &= ~PROBE_SEGMENT;
}
else
{
mirror = NULL;
}
break;
}
}
pthread_mutex_unlock(&worker_thread_mutex);
/* check if all segments were probed */
if (i == cdb_component_dbs->total_segment_dbs || primary == NULL)
{
break;
}
/* if we've gotten a pause or shutdown request, we ignore probe results. */
if (!FtsIsActive())
{
break;
}
/* probe primary */
probe_result_primary = probeSegment(primary);
Assert(!PROBE_CHECK_FLAG(probe_result_primary, PROBE_SEGMENT));
if ((probe_result_primary & PROBE_ALIVE) == 0 && gp_log_fts >= GPVARS_VERBOSITY_VERBOSE)
{
write_log("FTS: primary (content=%d, dbid=%d, status 0x%x) didn't respond to probe.",
primary->segindex, primary->dbid, probe_result_primary);
}
if (mirror != NULL)
{
/* assume mirror is alive */
probe_result_mirror = PROBE_ALIVE;
/* probe mirror only if primary is dead or has a crash/network fault */
if (!PROBE_CHECK_FLAG(probe_result_primary, PROBE_ALIVE) ||
PROBE_CHECK_FLAG(probe_result_primary, PROBE_FAULT_CRASH) ||
PROBE_CHECK_FLAG(probe_result_primary, PROBE_FAULT_NET))
{
/* probe mirror */
probe_result_mirror = probeSegment(mirror);
Assert(!PROBE_CHECK_FLAG(probe_result_mirror, PROBE_SEGMENT));
if ((probe_result_mirror & PROBE_ALIVE) == 0 && gp_log_fts >= GPVARS_VERBOSITY_VERBOSE)
{
write_log("FTS: mirror (content=%d, dbid=%d, status 0x%x) didn't respond to probe.",
mirror->segindex, mirror->dbid, probe_result_mirror);
}
}
}
/* update results */
pthread_mutex_lock(&worker_thread_mutex);
worker_info->scan_status[primary->dbid] = probe_result_primary;
if (mirror != NULL)
{
worker_info->scan_status[mirror->dbid] = probe_result_mirror;
}
pthread_mutex_unlock(&worker_thread_mutex);
}
return NULL;
}
/* EOF */
/*-------------------------------------------------------------------------
*
* ftsprobefilerep.c
* Implementation of segment probing interface for filerep
*
* Portions Copyright (c) 2012-Present Pivotal Software, Inc.
*
*
* IDENTIFICATION
* src/backend/fts/ftsprobefilerep.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <pthread.h>
#include "postmaster/fts.h"
#include "postmaster/ftsprobe.h"
#include "gp-libpq-fe.h"
#include "gp-libpq-int.h"
typedef struct threadWorkerInfo
{
uint8 *scan_status;
} threadWorkerInfo;
/* struct holding segment configuration */
static CdbComponentDatabases *cdb_component_dbs = NULL;
/* one byte of status for each segment */
static uint8 *scan_status;
/* mutex used for pthread synchronization in parallel probing */
static pthread_mutex_t worker_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
* This is called from several different threads: ONLY USE THREADSAFE FUNCTIONS INSIDE.
*/
static char probeSegment(CdbComponentDatabaseInfo *dbInfo)
{
Assert(dbInfo != NULL);
/* setup probe descriptor */
ProbeConnectionInfo probeInfo;
memset(&probeInfo, 0, sizeof(ProbeConnectionInfo));
probeInfo.segmentId = dbInfo->segindex;
probeInfo.dbId = dbInfo->dbid;
probeInfo.role = dbInfo->role;
probeInfo.mode = dbInfo->mode;
probeInfo.segmentStatus = PROBE_DEAD;
return probeSegmentHelper(dbInfo, probeInfo);
}
/*
* Function called by probing thread;
* iteratively picks next segment pair to probe until all segments are probed;
* probes primary; probes mirror if primary reports crash fault or does not respond
*/
static void *
probeSegmentFromThread(void *arg)
{
threadWorkerInfo *worker_info;
int i;
worker_info = (threadWorkerInfo *) arg;
Assert(cdb_component_dbs != NULL);
i = 0;
for (;;)
{
CdbComponentDatabaseInfo *primary = NULL;
CdbComponentDatabaseInfo *mirror = NULL;
char probe_result_primary = PROBE_DEAD;
char probe_result_mirror = PROBE_DEAD;
/*
* find untested primary, mark primary and mirror "tested" and unlock.
*/
pthread_mutex_lock(&worker_thread_mutex);
for (; i < cdb_component_dbs->total_segment_dbs; i++)
{
primary = &cdb_component_dbs->segment_db_info[i];
/* check segments in pairs of primary-mirror */
if (!SEGMENT_IS_ACTIVE_PRIMARY(primary))
{
continue;
}
if (PROBE_CHECK_FLAG(worker_info->scan_status[primary->dbid], PROBE_SEGMENT))
{
/* prevent re-checking this pair */
worker_info->scan_status[primary->dbid] &= ~PROBE_SEGMENT;
mirror = FtsGetPeerSegment(primary->segindex, primary->dbid);
/* check if mirror is marked for probing */
if (mirror != NULL &&
PROBE_CHECK_FLAG(worker_info->scan_status[mirror->dbid], PROBE_SEGMENT))
{
worker_info->scan_status[mirror->dbid] &= ~PROBE_SEGMENT;
}
else
{
mirror = NULL;
}
break;
}
}
pthread_mutex_unlock(&worker_thread_mutex);
/* check if all segments were probed */
if (i == cdb_component_dbs->total_segment_dbs || primary == NULL)
{
break;
}
/* if we've gotten a pause or shutdown request, we ignore probe results. */
if (!FtsIsActive())
{
break;
}
/* probe primary */
probe_result_primary = probeSegment(primary);
Assert(!PROBE_CHECK_FLAG(probe_result_primary, PROBE_SEGMENT));
if ((probe_result_primary & PROBE_ALIVE) == 0 && gp_log_fts >= GPVARS_VERBOSITY_VERBOSE)
{
write_log("FTS: primary (content=%d, dbid=%d, status 0x%x) didn't respond to probe.",
primary->segindex, primary->dbid, probe_result_primary);
}
if (mirror != NULL)
{
/* assume mirror is alive */
probe_result_mirror = PROBE_ALIVE;
/* probe mirror only if primary is dead or has a crash/network fault */
if (!PROBE_CHECK_FLAG(probe_result_primary, PROBE_ALIVE) ||
PROBE_CHECK_FLAG(probe_result_primary, PROBE_FAULT_CRASH) ||
PROBE_CHECK_FLAG(probe_result_primary, PROBE_FAULT_NET))
{
/* probe mirror */
probe_result_mirror = probeSegment(mirror);
Assert(!PROBE_CHECK_FLAG(probe_result_mirror, PROBE_SEGMENT));
if ((probe_result_mirror & PROBE_ALIVE) == 0 && gp_log_fts >= GPVARS_VERBOSITY_VERBOSE)
{
write_log("FTS: mirror (content=%d, dbid=%d, status 0x%x) didn't respond to probe.",
mirror->segindex, mirror->dbid, probe_result_mirror);
}
}
}
/* update results */
pthread_mutex_lock(&worker_thread_mutex);
worker_info->scan_status[primary->dbid] = probe_result_primary;
if (mirror != NULL)
{
worker_info->scan_status[mirror->dbid] = probe_result_mirror;
}
pthread_mutex_unlock(&worker_thread_mutex);
}
return NULL;
}
/*
* probe segments to check if they are alive and if any failure has occurred
*/
void
FtsProbeSegments(CdbComponentDatabases *dbs, uint8 *probeRes)
{
int i;
threadWorkerInfo worker_info;
int workers = gp_fts_probe_threadcount;
pthread_t *threads = NULL;
cdb_component_dbs = dbs;
scan_status = probeRes;
if (dbs == NULL || scan_status == NULL)
{
elog(ERROR, "FTS: segment configuration has not been loaded to shared memory");
}
/* reset probe results */
memset(scan_status, 0, dbs->total_segment_dbs * sizeof(scan_status[0]));
/* figure out which segments to include in the scan. */
for (i=0; i < dbs->total_segment_dbs; i++)
{
CdbComponentDatabaseInfo *segInfo = &dbs->segment_db_info[i];
if (FtsIsSegmentAlive(segInfo))
{
/* mark segment for probing */
scan_status[segInfo->dbid] = PROBE_SEGMENT;
}
else
{
/* consider segment dead */
scan_status[segInfo->dbid] = PROBE_DEAD;
}
}
worker_info.scan_status = scan_status;
threads = (pthread_t *)palloc(workers * sizeof(pthread_t));
for (i = 0; i < workers; i++)
{
#ifdef USE_ASSERT_CHECKING
int ret =
#endif /* USE_ASSERT_CHECKING */
gp_pthread_create(&threads[i], probeSegmentFromThread, &worker_info, "probeSegments");
Assert(ret == 0 && "FTS: failed to create probing thread");
}
/* we have nothing left to do but wait */
for (i = 0; i < workers; i++)
{
#ifdef USE_ASSERT_CHECKING
int ret =
#endif /* USE_ASSERT_CHECKING */
pthread_join(threads[i], NULL);
Assert(ret == 0 && "FTS: failed to join probing thread");
}
pfree(threads);
threads = NULL;
/* if we're shutting down, just exit. */
if (!FtsIsActive())
return;
Assert(cdb_component_dbs != NULL);
if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG)
{
elog(LOG, "FTS: probe results for all segments:");
for (i=0; i < cdb_component_dbs->total_segment_dbs; i++)
{
CdbComponentDatabaseInfo *segInfo = NULL;
segInfo = &cdb_component_dbs->segment_db_info[i];
elog(LOG, "segment dbid %d status 0x%x.", segInfo->dbid, scan_status[segInfo->dbid]);
}
}
}
......@@ -556,7 +556,9 @@ static int BackendStartup(Port *port);
static int ProcessStartupPacket(Port *port, bool SSLdone);
static void processCancelRequest(Port *port, void *pkt, MsgType code);
static void processPrimaryMirrorTransitionRequest(Port *port, void *pkt);
#ifndef USE_SEGWALREP
static void processPrimaryMirrorTransitionQuery(Port *port, void *pkt);
#endif
static int initMasks(fd_set *rmask);
static void report_fork_failure_to_client(Port *port, int errnum);
static enum CAC_state canAcceptConnections(void);
......@@ -3577,6 +3579,7 @@ processPrimaryMirrorTransitionRequest(Port *port, void *pkt)
}
}
#ifndef USE_SEGWALREP
static void
sendPrimaryMirrorTransitionQuery(uint32 mode, uint32 segstate, uint32 datastate, uint32 faulttype)
{
......@@ -3687,6 +3690,7 @@ processPrimaryMirrorTransitionQuery(Port *port, void *pkt)
return;
}
#endif
/*
* The client has sent a cancel request packet, not a normal
......
......@@ -94,6 +94,8 @@ enum probe_transition_e
#define IS_VALID_TRANSITION(trans) \
(trans == TRANS_D_D || trans == TRANS_D_U || trans == TRANS_U_D || trans == TRANS_U_U)
/* buffer size for SQL command */
#define SQL_CMD_BUF_SIZE 1024
/*
* STRUCTURES
......@@ -129,16 +131,11 @@ extern int ftsprobe_start(void);
*/
extern void FtsProbeSegments(CdbComponentDatabases *dbs, uint8 *scan_status);
#ifdef USE_SEGWALREP
extern void FtsWalRepProbeSegments(probe_context *context);
#endif
/*
* Interface for segment state checking
*/
extern bool FtsIsSegmentAlive(CdbComponentDatabaseInfo *segInfo);
extern CdbComponentDatabaseInfo *FtsGetPeerSegment(int content, int dbid);
extern void FtsMarkSegmentsInSync(CdbComponentDatabaseInfo *primary, CdbComponentDatabaseInfo *mirror);
extern void FtsDumpChanges(FtsSegmentStatusChange *changes, int changeEntries);
/*
......@@ -151,7 +148,9 @@ extern bool FtsIsActive(void);
* Interface for WALREP specific checking
*/
extern void HandleFtsWalRepProbe(void);
#endif
extern void FtsWalRepProbeSegments(probe_context *context);
#else
extern bool probePublishUpdate(CdbComponentDatabases *dbs, uint8 *probe_results);
/*
* Interface for FireRep-specific segment state machine and transitions
......@@ -162,7 +161,7 @@ extern void FtsResolveStateFilerep(FtsSegmentPairState *pairState);
extern void FtsPreprocessProbeResultsFilerep(CdbComponentDatabases *dbs, uint8 *probe_results);
extern void FtsFailoverFilerep(FtsSegmentStatusChange *changes, int changeCount);
#endif
/*
* Interface for requesting master to shut down
......
/*-------------------------------------------------------------------------
*
* ftsprobe.h
* Interface for fault tolerance service Sender.
*
* Portions Copyright (c) 2012-Present Pivotal Software, Inc.
*
*
* IDENTIFICATION
* src/include/postmaster/ftsprobe.h
*
*-------------------------------------------------------------------------
*/
#ifndef FTSPROBE_H
#define FTSPROBE_H
#include "cdb/ml_ipc.h" /* gettime_elapsed_ms */
#define PROBE_ERR_MSG_LEN (256) /* length of error message for errno */
struct pg_conn; /* PGconn ... #include "gp-libpq-fe.h" */
typedef struct ProbeConnectionInfo
{
int16 dbId; /* the dbid of the segment */
int16 segmentId; /* content indicator: -1 for master, 0, ..., n-1 for segments */
char role; /* primary ('p'), mirror ('m') */
char mode; /* sync ('s'), resync ('r'), change-tracking ('c') */
GpMonotonicTime startTime; /* probe start timestamp */
#ifdef USE_SEGWALREP
probe_result *result;
#endif
char segmentStatus; /* probed segment status */
int16 probe_errno; /* saved errno from the latest system call */
char errmsg[PROBE_ERR_MSG_LEN]; /* message returned by strerror() */
struct pg_conn *conn; /* libpq connection object */
} ProbeConnectionInfo;
extern
#ifdef USE_SEGWALREP
void
#else
char
#endif
probeSegmentHelper(CdbComponentDatabaseInfo *dbInfo, ProbeConnectionInfo probeInfo);
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册