diff --git a/src/backend/fts/Makefile b/src/backend/fts/Makefile index c26603eca31c3511f47db6d7ab0b530d8f7e684e..51176399fe31d5e319885158093d605c677fad7a 100644 --- a/src/backend/fts/Makefile +++ b/src/backend/fts/Makefile @@ -11,6 +11,10 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(top_srcdir)/src/backend/gp_libpq_fe $(CPPFLAGS) -OBJS = fts.o ftsprobe.o ftsfilerep.o ftsprobehandler.o - +OBJS = fts.o ftsprobe.o +ifeq ($(enable_segwalrep), yes) +OBJS += ftsprobehandler.o +else +OBJS += ftsfilerep.o ftsprobefilerep.o +endif include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/fts/fts.c b/src/backend/fts/fts.c index 84654e1949e7c786b28908c5b4121b6c42e64e0b..c59d2e01764d49766a329857f8bfd3b8c7328b70 100644 --- a/src/backend/fts/fts.c +++ b/src/backend/fts/fts.c @@ -45,7 +45,7 @@ #include "utils/ps_status.h" #include "utils/relcache.h" #include "utils/snapmgr.h" -#include "utils/syscache.h" + #include "catalog/pg_authid.h" #include "catalog/pg_database.h" @@ -65,12 +65,13 @@ /* * CONSTANTS */ - /* maximum number of segments */ #define MAX_NUM_OF_SEGMENTS 32768 -/* buffer size for SQL command */ -#define SQL_CMD_BUF_SIZE 1024 +#ifndef USE_SEGWALREP +/* one byte of status for each segment */ +static uint8 scan_status[MAX_NUM_OF_SEGMENTS]; +#endif #define GpConfigHistoryRelName "gp_configuration_history" @@ -79,11 +80,6 @@ * STATIC VARIABLES */ -#ifndef USE_SEGWALREP -/* one byte of status for each segment */ -static uint8 scan_status[MAX_NUM_OF_SEGMENTS]; -#endif - static bool am_ftsprobe = false; static volatile bool shutdown_requested = false; @@ -107,32 +103,6 @@ NON_EXEC_STATIC void ftsMain(int argc, char *argv[]); static void FtsLoop(void); static void readCdbComponentInfoAndUpdateStatus(MemoryContext probeContext); -static bool probePublishUpdate(uint8 *scan_status); - -static uint32 getTransition(bool isPrimaryAlive, bool isMirrorAlive); - -static void -buildSegmentStateChange - ( - CdbComponentDatabaseInfo *segInfo, - FtsSegmentStatusChange *change, - uint8 statusNew - ) - ; - -static uint32 transition - ( - uint32 stateOld, - uint32 trans, - CdbComponentDatabaseInfo *primary, - CdbComponentDatabaseInfo *mirror, - FtsSegmentStatusChange *changesPrimary, - FtsSegmentStatusChange *changesMirror - ) - ; - -static void updateConfiguration(FtsSegmentStatusChange *changes, int changeEntries); -static bool probeUpdateConfig(FtsSegmentStatusChange *changes, int changeCount); /* * Main entry point for ftsprobe process. @@ -700,7 +670,7 @@ void FtsLoop() * Now we've completed the scan, update shared-memory. if we * change anything, we return true. */ - updated_probe_state = probePublishUpdate(scan_status); + updated_probe_state = probePublishUpdate(cdb_component_dbs, scan_status); #endif MemoryContextSwitchTo(oldContext); @@ -765,354 +735,6 @@ FtsIsActive(void) return (!ftsProbeInfo->fts_discardResults && !shutdown_requested); } -/* - * Build a set of changes, based on our current state, and the probe results. - */ -static bool -probePublishUpdate(uint8 *probe_results) -{ - bool update_found = false; - int i; - - /* preprocess probe results to decide what is the current segment state */ - FtsPreprocessProbeResultsFilerep(cdb_component_dbs, probe_results); - - for (i = 0; i < cdb_component_dbs->total_segment_dbs; i++) - { - CdbComponentDatabaseInfo *segInfo = &cdb_component_dbs->segment_db_info[i]; - - /* if we've gotten a pause or shutdown request, we ignore our probe results. */ - if (!FtsIsActive()) - { - return false; - } - - /* we check segments in pairs of primary-mirror */ - if (!SEGMENT_IS_ACTIVE_PRIMARY(segInfo)) - { - continue; - } - - CdbComponentDatabaseInfo *primary = segInfo; - CdbComponentDatabaseInfo *mirror = FtsGetPeerSegment(segInfo->segindex, segInfo->dbid); - - Assert(mirror != NULL); - - /* changes required for primary and mirror */ - FtsSegmentStatusChange changes[2]; - - uint32 stateOld = 0; - uint32 stateNew = 0; - - bool isPrimaryAlive = PROBE_IS_ALIVE(primary); - bool isMirrorAlive = PROBE_IS_ALIVE(mirror); - - /* get transition type */ - uint32 trans = getTransition(isPrimaryAlive, isMirrorAlive); - - if (gp_log_fts > GPVARS_VERBOSITY_VERBOSE) - { - elog(LOG, "FTS: primary found %s, mirror found %s, transition %d.", - (isPrimaryAlive ? "alive" : "dead"), (isMirrorAlive ? "alive" : "dead"), trans); - } - - if (trans == TRANS_D_D) - { - elog(LOG, "FTS: detected double failure for content=%d, primary (dbid=%d), mirror (dbid=%d).", - primary->segindex, primary->dbid, mirror->dbid); - } - - /* get current state */ - stateOld = FtsGetPairStateFilerep(primary, mirror); - - /* get new state */ - stateNew = transition(stateOld, trans, primary, mirror, &changes[0], &changes[1]); - - /* check if transition is required */ - if (stateNew != stateOld) - { - update_found = true; - updateConfiguration(changes, ARRAY_SIZE(changes)); - } - } - - if (gp_log_fts >= GPVARS_VERBOSITY_VERBOSE) - { - elog(LOG, "FTS: probe result processing is complete."); - } - - return update_found; -} - - -/* - * Build struct with segment changes - */ -static void -buildSegmentStateChange(CdbComponentDatabaseInfo *segInfo, FtsSegmentStatusChange *change, uint8 statusNew) -{ - change->dbid = segInfo->dbid; - change->segindex = segInfo->segindex; - change->oldStatus = ftsProbeInfo->fts_status[segInfo->dbid]; - change->newStatus = statusNew; -} - -/* - * get transition type - derived from probed primary/mirror state - */ -static uint32 -getTransition(bool isPrimaryAlive, bool isMirrorAlive) -{ - uint32 state = (isPrimaryAlive ? 2 : 0) + (isMirrorAlive ? 1 : 0); - - switch (state) - { - case (0): - /* primary and mirror dead */ - return TRANS_D_D; - case (1): - /* primary dead, mirror alive */ - return TRANS_D_U; - case (2): - /* primary alive, mirror dead */ - return TRANS_U_D; - case (3): - /* primary and mirror alive */ - return TRANS_U_U; - default: - Assert(!"Invalid transition for FTS state machine"); - return 0; - } -} - - -/* - * find new state for primary and mirror - */ -static uint32 -transition - ( - uint32 stateOld, - uint32 trans, - CdbComponentDatabaseInfo *primary, - CdbComponentDatabaseInfo *mirror, - FtsSegmentStatusChange *changesPrimary, - FtsSegmentStatusChange *changesMirror - ) -{ - Assert(IS_VALID_TRANSITION(trans)); - - /* reset changes */ - memset(changesPrimary, 0, sizeof(*changesPrimary)); - memset(changesMirror, 0, sizeof(*changesMirror)); - - uint32 stateNew = stateOld; - - /* in case of a double failure we don't do anything */ - if (trans == TRANS_D_D) - { - return stateOld; - } - - /* get new state for primary and mirror */ - stateNew = FtsTransitionFilerep(stateOld, trans); - - /* check if transition is required */ - if (stateNew != stateOld) - { - FtsSegmentPairState pairState; - memset(&pairState, 0, sizeof(pairState)); - pairState.primary = primary; - pairState.mirror = mirror; - pairState.stateNew = stateNew; - pairState.statePrimary = 0; - pairState.stateMirror = 0; - - if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG) - { - elog(LOG, "FTS: state machine transition from %d to %d.", stateOld, stateNew); - } - - FtsResolveStateFilerep(&pairState); - - buildSegmentStateChange(primary, changesPrimary, pairState.statePrimary); - buildSegmentStateChange(mirror, changesMirror, pairState.stateMirror); - - FtsDumpChanges(changesPrimary, 1); - FtsDumpChanges(changesMirror, 1); - } - - return stateNew; -} - - -/* - * Apply requested segment transitions - */ -static void -updateConfiguration(FtsSegmentStatusChange *changes, int changeEntries) -{ - Assert(changes != NULL); - - CdbComponentDatabaseInfo *entryDB = &cdb_component_dbs->entry_db_info[0]; - - if (entryDB->dbid != GpIdentity.dbid) - { - if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG) - { - elog(LOG, "FTS: advancing to second entry-db."); - } - entryDB = entryDB + 1; - } - - /* if we've gotten a pause or shutdown request, we ignore our probe results. */ - if (!FtsIsActive()) - { - return; - } - - /* update segment configuration */ - bool commit = probeUpdateConfig(changes, changeEntries); - - if (commit) - FtsFailoverFilerep(changes, changeEntries); - - if (gp_log_fts >= GPVARS_VERBOSITY_VERBOSE) - { - elog(LOG, "FTS: finished segment modifications."); - } -} - -/* - * update segment configuration in catalog and shared memory - */ -static bool -probeUpdateConfig(FtsSegmentStatusChange *changes, int changeCount) -{ - Relation configrel; - Relation histrel; - SysScanDesc sscan; - ScanKeyData scankey; - HeapTuple configtuple; - HeapTuple newtuple; - HeapTuple histtuple; - Datum configvals[Natts_gp_segment_configuration]; - bool confignulls[Natts_gp_segment_configuration] = { false }; - bool repls[Natts_gp_segment_configuration] = { false }; - Datum histvals[Natts_gp_configuration_history]; - bool histnulls[Natts_gp_configuration_history] = { false }; - bool valid; - bool primary; - bool changelogging; - int i; - char desc[SQL_CMD_BUF_SIZE]; - - /* - * Commit/abort transaction below will destroy - * CurrentResourceOwner. We need it for catalog reads. - */ - ResourceOwner save = CurrentResourceOwner; - StartTransactionCommand(); - GetTransactionSnapshot(); - elog(LOG, "probeUpdateConfig called for %d changes", changeCount); - - histrel = heap_open(GpConfigHistoryRelationId, - RowExclusiveLock); - configrel = heap_open(GpSegmentConfigRelationId, - RowExclusiveLock); - - for (i = 0; i < changeCount; i++) - { - FtsSegmentStatusChange *change = &changes[i]; - valid = (changes[i].newStatus & FTS_STATUS_ALIVE); - primary = (changes[i].newStatus & FTS_STATUS_PRIMARY); - changelogging = (changes[i].newStatus & FTS_STATUS_CHANGELOGGING); - - if (changelogging) - { - Assert(primary && valid); - } - - Assert((valid || !primary) && "Primary cannot be down"); - - /* - * Insert new tuple into gp_configuration_history catalog. - */ - histvals[Anum_gp_configuration_history_time-1] = - TimestampTzGetDatum(GetCurrentTimestamp()); - histvals[Anum_gp_configuration_history_dbid-1] = - Int16GetDatum(changes[i].dbid); - snprintf(desc, sizeof(desc), - "FTS: content %d fault marking status %s%s role %c", - change->segindex, valid ? "UP" : "DOWN", - (changelogging) ? " mode: change-tracking" : "", - primary ? GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY : GP_SEGMENT_CONFIGURATION_ROLE_MIRROR); - histvals[Anum_gp_configuration_history_desc-1] = - CStringGetTextDatum(desc); - - histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls); - simple_heap_insert(histrel, histtuple); - CatalogUpdateIndexes(histrel, histtuple); - - /* - * Find and update gp_segment_configuration tuple. - */ - ScanKeyInit(&scankey, - Anum_gp_segment_configuration_dbid, - BTEqualStrategyNumber, F_INT2EQ, - Int16GetDatum(changes[i].dbid)); - sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId, - true, SnapshotNow, 1, &scankey); - configtuple = systable_getnext(sscan); - if (!HeapTupleIsValid(configtuple)) - { - elog(ERROR, "FTS cannot find dbid=%d in %s", changes[i].dbid, - RelationGetRelationName(configrel)); - } - configvals[Anum_gp_segment_configuration_role-1] = - CharGetDatum(primary ? GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY : GP_SEGMENT_CONFIGURATION_ROLE_MIRROR); - repls[Anum_gp_segment_configuration_role-1] = true; - configvals[Anum_gp_segment_configuration_status-1] = - CharGetDatum(valid ? GP_SEGMENT_CONFIGURATION_STATUS_UP : GP_SEGMENT_CONFIGURATION_STATUS_DOWN); - repls[Anum_gp_segment_configuration_status-1] = true; - if (changelogging) - { - configvals[Anum_gp_segment_configuration_mode-1] = - CharGetDatum(GP_SEGMENT_CONFIGURATION_MODE_CHANGETRACKING); - } - repls[Anum_gp_segment_configuration_mode-1] = changelogging; - - newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel), - configvals, confignulls, repls); - simple_heap_update(configrel, &configtuple->t_self, newtuple); - CatalogUpdateIndexes(configrel, newtuple); - - systable_endscan(sscan); - pfree(newtuple); - /* - * Update shared memory - */ - ftsProbeInfo->fts_status[changes[i].dbid] = changes[i].newStatus; - } - heap_close(histrel, RowExclusiveLock); - heap_close(configrel, RowExclusiveLock); - - SIMPLE_FAULT_INJECTOR(FtsWaitForShutdown); - /* - * Do not block shutdown. We will always get a change to update - * gp_segment_configuration in subsequent probes upon database - * restart. - */ - if (shutdown_requested) - { - elog(LOG, "Shutdown in progress, ignoring FTS prober updates."); - return false; - } - CommitTransactionCommand(); - CurrentResourceOwner = save; - return true; -} - bool FtsIsSegmentAlive(CdbComponentDatabaseInfo *segInfo) { @@ -1156,144 +778,6 @@ FtsDumpChanges(FtsSegmentStatusChange *changes, int changeEntries) } } -/** - * Marks the given db as in-sync in the segment configuration. - */ -void -FtsMarkSegmentsInSync(CdbComponentDatabaseInfo *primary, CdbComponentDatabaseInfo *mirror) -{ - if (!FTS_STATUS_ISALIVE(primary->dbid, ftsProbeInfo->fts_status) || - !FTS_STATUS_ISALIVE(mirror->dbid, ftsProbeInfo->fts_status) || - !FTS_STATUS_ISPRIMARY(primary->dbid, ftsProbeInfo->fts_status) || - FTS_STATUS_ISPRIMARY(mirror->dbid, ftsProbeInfo->fts_status) || - FTS_STATUS_IS_SYNCED(primary->dbid, ftsProbeInfo->fts_status) || - FTS_STATUS_IS_SYNCED(mirror->dbid, ftsProbeInfo->fts_status) || - FTS_STATUS_IS_CHANGELOGGING(primary->dbid, ftsProbeInfo->fts_status) || - FTS_STATUS_IS_CHANGELOGGING(mirror->dbid, ftsProbeInfo->fts_status)) - { - FtsRequestPostmasterShutdown(primary, mirror); - } - - if (ftsProbeInfo->fts_pauseProbes) - { - return; - } - - uint8 segStatus=0; - Relation configrel; - Relation histrel; - ScanKeyData scankey; - SysScanDesc sscan; - HeapTuple configtuple; - HeapTuple newtuple; - HeapTuple histtuple; - Datum configvals[Natts_gp_segment_configuration]; - bool confignulls[Natts_gp_segment_configuration] = { false }; - bool repls[Natts_gp_segment_configuration] = { false }; - Datum histvals[Natts_gp_configuration_history]; - bool histnulls[Natts_gp_configuration_history] = { false }; - char *desc = "FTS: changed segment to insync from resync."; - /* - * Commit/abort transaction below will destroy - * CurrentResourceOwner. We need it for catalog reads. - */ - ResourceOwner save = CurrentResourceOwner; - StartTransactionCommand(); - GetTransactionSnapshot(); - - /* update primary */ - segStatus = ftsProbeInfo->fts_status[primary->dbid]; - segStatus |= FTS_STATUS_SYNCHRONIZED; - ftsProbeInfo->fts_status[primary->dbid] = segStatus; - - /* update mirror */ - segStatus = ftsProbeInfo->fts_status[mirror->dbid]; - segStatus |= FTS_STATUS_SYNCHRONIZED; - ftsProbeInfo->fts_status[mirror->dbid] = segStatus; - - histrel = heap_open(GpConfigHistoryRelationId, - RowExclusiveLock); - configrel = heap_open(GpSegmentConfigRelationId, - RowExclusiveLock); - - /* update gp_segment_configuration to insync */ - ScanKeyInit(&scankey, - Anum_gp_segment_configuration_dbid, - BTEqualStrategyNumber, F_INT2EQ, - Int16GetDatum(primary->dbid)); - sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId, - true, SnapshotNow, 1, &scankey); - configtuple = systable_getnext(sscan); - if (!HeapTupleIsValid(configtuple)) - { - elog(ERROR,"FTS cannot find dbid (%d, %d) in %s", primary->dbid, - mirror->dbid, RelationGetRelationName(configrel)); - } - configvals[Anum_gp_segment_configuration_mode-1] = CharGetDatum(GP_SEGMENT_CONFIGURATION_MODE_INSYNC); - repls[Anum_gp_segment_configuration_mode-1] = true; - newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel), - configvals, confignulls, repls); - simple_heap_update(configrel, &configtuple->t_self, newtuple); - CatalogUpdateIndexes(configrel, newtuple); - - systable_endscan(sscan); - - ScanKeyInit(&scankey, - Anum_gp_segment_configuration_dbid, - BTEqualStrategyNumber, F_INT2EQ, - Int16GetDatum(mirror->dbid)); - sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId, - true, SnapshotNow, 1, &scankey); - configtuple = systable_getnext(sscan); - if (!HeapTupleIsValid(configtuple)) - { - elog(ERROR,"FTS cannot find dbid (%d, %d) in %s", primary->dbid, - mirror->dbid, RelationGetRelationName(configrel)); - } - newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel), - configvals, confignulls, repls); - simple_heap_update(configrel, &configtuple->t_self, newtuple); - CatalogUpdateIndexes(configrel, newtuple); - - systable_endscan(sscan); - - /* update configuration history */ - histvals[Anum_gp_configuration_history_time-1] = - TimestampTzGetDatum(GetCurrentTimestamp()); - histvals[Anum_gp_configuration_history_dbid-1] = - Int16GetDatum(primary->dbid); - histvals[Anum_gp_configuration_history_desc-1] = - CStringGetTextDatum(desc); - histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls); - simple_heap_insert(histrel, histtuple); - CatalogUpdateIndexes(histrel, histtuple); - - histvals[Anum_gp_configuration_history_dbid-1] = - Int16GetDatum(mirror->dbid); - histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls); - simple_heap_insert(histrel, histtuple); - CatalogUpdateIndexes(histrel, histtuple); - ereport(LOG, - (errmsg("FTS: resynchronization of mirror (dbid=%d, content=%d) on %s:%d has completed.", - mirror->dbid, mirror->segindex, mirror->address, mirror->port ), - errSendAlert(true))); - - heap_close(histrel, RowExclusiveLock); - heap_close(configrel, RowExclusiveLock); - /* - * Do not block shutdown. We will always get a change to update - * gp_segment_configuration in subsequent probes upon database - * restart. - */ - if (shutdown_requested) - { - elog(LOG, "Shutdown in progress, ignoring FTS prober updates."); - return; - } - CommitTransactionCommand(); - CurrentResourceOwner = save; -} - /* * Get peer segment descriptor */ diff --git a/src/backend/fts/ftsfilerep.c b/src/backend/fts/ftsfilerep.c index 1b75c10fe72858963a386761fa90fb97df87a0a3..89394d907302e6483d0e796be0bbf346c2d8c914 100644 --- a/src/backend/fts/ftsfilerep.c +++ b/src/backend/fts/ftsfilerep.c @@ -16,15 +16,21 @@ */ #include "postgres.h" +#include "access/heapam.h" +#include "access/genam.h" +#include "catalog/gp_configuration_history.h" +#include "catalog/gp_segment_config.h" + +#include "cdb/ml_ipc.h" /* gettime_elapsed_ms */ +#include "executor/spi.h" #include "gp-libpq-fe.h" #include "gp-libpq-int.h" -#include "cdb/cdbfts.h" -#include "executor/spi.h" #include "postmaster/fts.h" #include "postmaster/primary_mirror_mode.h" - -#include "cdb/ml_ipc.h" /* gettime_elapsed_ms */ - +#include "utils/fmgroids.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" +#include "cdb/cdbfts.h" /* * CONSTANTS @@ -269,6 +275,143 @@ FtsResolveStateFilerep(FtsSegmentPairState *pairState) } } +/* + * Marks the given db as in-sync in the segment configuration. + */ +static void +FtsMarkSegmentsInSync(CdbComponentDatabaseInfo *primary, CdbComponentDatabaseInfo *mirror) +{ + if (!FTS_STATUS_ISALIVE(primary->dbid, ftsProbeInfo->fts_status) || + !FTS_STATUS_ISALIVE(mirror->dbid, ftsProbeInfo->fts_status) || + !FTS_STATUS_ISPRIMARY(primary->dbid, ftsProbeInfo->fts_status) || + FTS_STATUS_ISPRIMARY(mirror->dbid, ftsProbeInfo->fts_status) || + FTS_STATUS_IS_SYNCED(primary->dbid, ftsProbeInfo->fts_status) || + FTS_STATUS_IS_SYNCED(mirror->dbid, ftsProbeInfo->fts_status) || + FTS_STATUS_IS_CHANGELOGGING(primary->dbid, ftsProbeInfo->fts_status) || + FTS_STATUS_IS_CHANGELOGGING(mirror->dbid, ftsProbeInfo->fts_status)) + { + FtsRequestPostmasterShutdown(primary, mirror); + } + + if (ftsProbeInfo->fts_pauseProbes) + { + return; + } + + uint8 segStatus=0; + Relation configrel; + Relation histrel; + ScanKeyData scankey; + SysScanDesc sscan; + HeapTuple configtuple; + HeapTuple newtuple; + HeapTuple histtuple; + Datum configvals[Natts_gp_segment_configuration]; + bool confignulls[Natts_gp_segment_configuration] = { false }; + bool repls[Natts_gp_segment_configuration] = { false }; + Datum histvals[Natts_gp_configuration_history]; + bool histnulls[Natts_gp_configuration_history] = { false }; + char *desc = "FTS: changed segment to insync from resync."; + /* + * Commit/abort transaction below will destroy + * CurrentResourceOwner. We need it for catalog reads. + */ + ResourceOwner save = CurrentResourceOwner; + StartTransactionCommand(); + GetTransactionSnapshot(); + + /* update primary */ + segStatus = ftsProbeInfo->fts_status[primary->dbid]; + segStatus |= FTS_STATUS_SYNCHRONIZED; + ftsProbeInfo->fts_status[primary->dbid] = segStatus; + + /* update mirror */ + segStatus = ftsProbeInfo->fts_status[mirror->dbid]; + segStatus |= FTS_STATUS_SYNCHRONIZED; + ftsProbeInfo->fts_status[mirror->dbid] = segStatus; + + histrel = heap_open(GpConfigHistoryRelationId, + RowExclusiveLock); + configrel = heap_open(GpSegmentConfigRelationId, + RowExclusiveLock); + + /* update gp_segment_configuration to insync */ + ScanKeyInit(&scankey, + Anum_gp_segment_configuration_dbid, + BTEqualStrategyNumber, F_INT2EQ, + Int16GetDatum(primary->dbid)); + sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId, + true, SnapshotNow, 1, &scankey); + configtuple = systable_getnext(sscan); + if (!HeapTupleIsValid(configtuple)) + { + elog(ERROR,"FTS cannot find dbid (%d, %d) in %s", primary->dbid, + mirror->dbid, RelationGetRelationName(configrel)); + } + configvals[Anum_gp_segment_configuration_mode-1] = CharGetDatum(GP_SEGMENT_CONFIGURATION_MODE_INSYNC); + repls[Anum_gp_segment_configuration_mode-1] = true; + newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel), + configvals, confignulls, repls); + simple_heap_update(configrel, &configtuple->t_self, newtuple); + CatalogUpdateIndexes(configrel, newtuple); + + systable_endscan(sscan); + + ScanKeyInit(&scankey, + Anum_gp_segment_configuration_dbid, + BTEqualStrategyNumber, F_INT2EQ, + Int16GetDatum(mirror->dbid)); + sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId, + true, SnapshotNow, 1, &scankey); + configtuple = systable_getnext(sscan); + if (!HeapTupleIsValid(configtuple)) + { + elog(ERROR,"FTS cannot find dbid (%d, %d) in %s", primary->dbid, + mirror->dbid, RelationGetRelationName(configrel)); + } + newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel), + configvals, confignulls, repls); + simple_heap_update(configrel, &configtuple->t_self, newtuple); + CatalogUpdateIndexes(configrel, newtuple); + + systable_endscan(sscan); + + /* update configuration history */ + histvals[Anum_gp_configuration_history_time-1] = + TimestampTzGetDatum(GetCurrentTimestamp()); + histvals[Anum_gp_configuration_history_dbid-1] = + Int16GetDatum(primary->dbid); + histvals[Anum_gp_configuration_history_desc-1] = + CStringGetTextDatum(desc); + histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls); + simple_heap_insert(histrel, histtuple); + CatalogUpdateIndexes(histrel, histtuple); + + histvals[Anum_gp_configuration_history_dbid-1] = + Int16GetDatum(mirror->dbid); + histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls); + simple_heap_insert(histrel, histtuple); + CatalogUpdateIndexes(histrel, histtuple); + ereport(LOG, + (errmsg("FTS: resynchronization of mirror (dbid=%d, content=%d) on %s:%d has completed.", + mirror->dbid, mirror->segindex, mirror->address, mirror->port ), + errSendAlert(true))); + + heap_close(histrel, RowExclusiveLock); + heap_close(configrel, RowExclusiveLock); + /* + * Do not block shutdown. We will always get a change to update + * gp_segment_configuration in subsequent probes upon database + * restart. + */ + if (IsFtsShudownRequested()) + { + elog(LOG, "Shutdown in progress, ignoring FTS prober updates."); + return; + } + CommitTransactionCommand(); + CurrentResourceOwner = save; +} /* * pre-process probe results to take into account some special @@ -283,11 +426,11 @@ FtsPreprocessProbeResultsFilerep(CdbComponentDatabases *dbs, uint8 *probe_result { int i = 0; cdb_component_dbs = dbs; - Assert(cdb_component_dbs != NULL); + Assert(dbs != NULL); - for (i=0; i < cdb_component_dbs->total_segment_dbs; i++) + for (i=0; i < dbs->total_segment_dbs; i++) { - CdbComponentDatabaseInfo *segInfo = &cdb_component_dbs->segment_db_info[i]; + CdbComponentDatabaseInfo *segInfo = &dbs->segment_db_info[i]; CdbComponentDatabaseInfo *primary = NULL, *mirror = NULL; if (!SEGMENT_IS_ACTIVE_PRIMARY(segInfo)) @@ -568,6 +711,7 @@ getHostsByDbid(int dbid, char **hostname_p, int *host_port_p, int *host_filerep_ Assert(peer_name_p != NULL); Assert(peer_pm_port_p != NULL); Assert(peer_filerep_port_p != NULL); + Assert(cdb_component_dbs != NULL); found = false; @@ -622,5 +766,379 @@ getHostsByDbid(int dbid, char **hostname_p, int *host_port_p, int *host_filerep_ } } +#ifndef USE_SEGWALREP +static uint32 getTransition(bool isPrimaryAlive, bool isMirrorAlive); + +static void +buildSegmentStateChange + ( + CdbComponentDatabaseInfo *segInfo, + FtsSegmentStatusChange *change, + uint8 statusNew + ) + ; + +static uint32 transition + ( + uint32 stateOld, + uint32 trans, + CdbComponentDatabaseInfo *primary, + CdbComponentDatabaseInfo *mirror, + FtsSegmentStatusChange *changesPrimary, + FtsSegmentStatusChange *changesMirror + ) + ; + +static void updateConfiguration(FtsSegmentStatusChange *changes, int changeEntries); + +/* + * Build a set of changes, based on our current state, and the probe results. + */ +bool probePublishUpdate(CdbComponentDatabases *dbs, uint8 *probe_results) +{ + bool update_found = false; + int i; + + Assert(dbs != NULL); + cdb_component_dbs = dbs; + + /* preprocess probe results to decide what is the current segment state */ + FtsPreprocessProbeResultsFilerep(dbs, probe_results); + + for (i = 0; i < dbs->total_segment_dbs; i++) + { + CdbComponentDatabaseInfo *segInfo = &dbs->segment_db_info[i]; + + /* if we've gotten a pause or shutdown request, we ignore our probe results. */ + if (!FtsIsActive()) + { + return false; + } + + /* we check segments in pairs of primary-mirror */ + if (!SEGMENT_IS_ACTIVE_PRIMARY(segInfo)) + { + continue; + } + + CdbComponentDatabaseInfo *primary = segInfo; + CdbComponentDatabaseInfo *mirror = FtsGetPeerSegment(segInfo->segindex, segInfo->dbid); + + Assert(mirror != NULL); + + /* changes required for primary and mirror */ + FtsSegmentStatusChange changes[2]; + + uint32 stateOld = 0; + uint32 stateNew = 0; + + bool isPrimaryAlive = PROBE_IS_ALIVE(primary); + bool isMirrorAlive = PROBE_IS_ALIVE(mirror); + + /* get transition type */ + uint32 trans = getTransition(isPrimaryAlive, isMirrorAlive); + + if (gp_log_fts > GPVARS_VERBOSITY_VERBOSE) + { + elog(LOG, "FTS: primary found %s, mirror found %s, transition %d.", + (isPrimaryAlive ? "alive" : "dead"), (isMirrorAlive ? "alive" : "dead"), trans); + } + + if (trans == TRANS_D_D) + { + elog(LOG, "FTS: detected double failure for content=%d, primary (dbid=%d), mirror (dbid=%d).", + primary->segindex, primary->dbid, mirror->dbid); + } + + /* get current state */ + stateOld = FtsGetPairStateFilerep(primary, mirror); + + /* get new state */ + stateNew = transition(stateOld, trans, primary, mirror, &changes[0], &changes[1]); + + /* check if transition is required */ + if (stateNew != stateOld) + { + update_found = true; + updateConfiguration(changes, ARRAY_SIZE(changes)); + } + } + + if (gp_log_fts >= GPVARS_VERBOSITY_VERBOSE) + { + elog(LOG, "FTS: probe result processing is complete."); + } + + return update_found; +} + +/* + * Build struct with segment changes + */ +static void +buildSegmentStateChange(CdbComponentDatabaseInfo *segInfo, FtsSegmentStatusChange *change, uint8 statusNew) +{ + change->dbid = segInfo->dbid; + change->segindex = segInfo->segindex; + change->oldStatus = ftsProbeInfo->fts_status[segInfo->dbid]; + change->newStatus = statusNew; +} + +/* + * get transition type - derived from probed primary/mirror state + */ +static uint32 +getTransition(bool isPrimaryAlive, bool isMirrorAlive) +{ + uint32 state = (isPrimaryAlive ? 2 : 0) + (isMirrorAlive ? 1 : 0); + + switch (state) + { + case (0): + /* primary and mirror dead */ + return TRANS_D_D; + case (1): + /* primary dead, mirror alive */ + return TRANS_D_U; + case (2): + /* primary alive, mirror dead */ + return TRANS_U_D; + case (3): + /* primary and mirror alive */ + return TRANS_U_U; + default: + Assert(!"Invalid transition for FTS state machine"); + return 0; + } +} + +/* + * find new state for primary and mirror + */ +static uint32 +transition + ( + uint32 stateOld, + uint32 trans, + CdbComponentDatabaseInfo *primary, + CdbComponentDatabaseInfo *mirror, + FtsSegmentStatusChange *changesPrimary, + FtsSegmentStatusChange *changesMirror + ) +{ + Assert(IS_VALID_TRANSITION(trans)); + + /* reset changes */ + memset(changesPrimary, 0, sizeof(*changesPrimary)); + memset(changesMirror, 0, sizeof(*changesMirror)); + + uint32 stateNew = stateOld; + + /* in case of a double failure we don't do anything */ + if (trans == TRANS_D_D) + { + return stateOld; + } + + /* get new state for primary and mirror */ + stateNew = FtsTransitionFilerep(stateOld, trans); + + /* check if transition is required */ + if (stateNew != stateOld) + { + FtsSegmentPairState pairState; + memset(&pairState, 0, sizeof(pairState)); + pairState.primary = primary; + pairState.mirror = mirror; + pairState.stateNew = stateNew; + pairState.statePrimary = 0; + pairState.stateMirror = 0; + + if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG) + { + elog(LOG, "FTS: state machine transition from %d to %d.", stateOld, stateNew); + } + + FtsResolveStateFilerep(&pairState); + + buildSegmentStateChange(primary, changesPrimary, pairState.statePrimary); + buildSegmentStateChange(mirror, changesMirror, pairState.stateMirror); + + FtsDumpChanges(changesPrimary, 1); + FtsDumpChanges(changesMirror, 1); + } + + return stateNew; +} + +/* + * update segment configuration in catalog and shared memory + */ +static bool +probeUpdateConfig(FtsSegmentStatusChange *changes, int changeCount) +{ + Relation configrel; + Relation histrel; + SysScanDesc sscan; + ScanKeyData scankey; + HeapTuple configtuple; + HeapTuple newtuple; + HeapTuple histtuple; + Datum configvals[Natts_gp_segment_configuration]; + bool confignulls[Natts_gp_segment_configuration] = { false }; + bool repls[Natts_gp_segment_configuration] = { false }; + Datum histvals[Natts_gp_configuration_history]; + bool histnulls[Natts_gp_configuration_history] = { false }; + bool valid; + bool primary; + bool changelogging; + int i; + char desc[SQL_CMD_BUF_SIZE]; + + /* + * Commit/abort transaction below will destroy + * CurrentResourceOwner. We need it for catalog reads. + */ + ResourceOwner save = CurrentResourceOwner; + StartTransactionCommand(); + GetTransactionSnapshot(); + elog(LOG, "probeUpdateConfig called for %d changes", changeCount); + + histrel = heap_open(GpConfigHistoryRelationId, + RowExclusiveLock); + configrel = heap_open(GpSegmentConfigRelationId, + RowExclusiveLock); + + for (i = 0; i < changeCount; i++) + { + FtsSegmentStatusChange *change = &changes[i]; + valid = (changes[i].newStatus & FTS_STATUS_ALIVE); + primary = (changes[i].newStatus & FTS_STATUS_PRIMARY); + changelogging = (changes[i].newStatus & FTS_STATUS_CHANGELOGGING); + + if (changelogging) + { + Assert(primary && valid); + } + + Assert((valid || !primary) && "Primary cannot be down"); + + /* + * Insert new tuple into gp_configuration_history catalog. + */ + histvals[Anum_gp_configuration_history_time-1] = + TimestampTzGetDatum(GetCurrentTimestamp()); + histvals[Anum_gp_configuration_history_dbid-1] = + Int16GetDatum(changes[i].dbid); + snprintf(desc, sizeof(desc), + "FTS: content %d fault marking status %s%s role %c", + change->segindex, valid ? "UP" : "DOWN", + (changelogging) ? " mode: change-tracking" : "", + primary ? GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY : GP_SEGMENT_CONFIGURATION_ROLE_MIRROR); + histvals[Anum_gp_configuration_history_desc-1] = + CStringGetTextDatum(desc); + + histtuple = heap_form_tuple(RelationGetDescr(histrel), histvals, histnulls); + simple_heap_insert(histrel, histtuple); + CatalogUpdateIndexes(histrel, histtuple); + + /* + * Find and update gp_segment_configuration tuple. + */ + ScanKeyInit(&scankey, + Anum_gp_segment_configuration_dbid, + BTEqualStrategyNumber, F_INT2EQ, + Int16GetDatum(changes[i].dbid)); + sscan = systable_beginscan(configrel, GpSegmentConfigDbidIndexId, + true, SnapshotNow, 1, &scankey); + configtuple = systable_getnext(sscan); + if (!HeapTupleIsValid(configtuple)) + { + elog(ERROR, "FTS cannot find dbid=%d in %s", changes[i].dbid, + RelationGetRelationName(configrel)); + } + configvals[Anum_gp_segment_configuration_role-1] = + CharGetDatum(primary ? GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY : GP_SEGMENT_CONFIGURATION_ROLE_MIRROR); + repls[Anum_gp_segment_configuration_role-1] = true; + configvals[Anum_gp_segment_configuration_status-1] = + CharGetDatum(valid ? GP_SEGMENT_CONFIGURATION_STATUS_UP : GP_SEGMENT_CONFIGURATION_STATUS_DOWN); + repls[Anum_gp_segment_configuration_status-1] = true; + if (changelogging) + { + configvals[Anum_gp_segment_configuration_mode-1] = + CharGetDatum(GP_SEGMENT_CONFIGURATION_MODE_CHANGETRACKING); + } + repls[Anum_gp_segment_configuration_mode-1] = changelogging; + + newtuple = heap_modify_tuple(configtuple, RelationGetDescr(configrel), + configvals, confignulls, repls); + simple_heap_update(configrel, &configtuple->t_self, newtuple); + CatalogUpdateIndexes(configrel, newtuple); + + systable_endscan(sscan); + pfree(newtuple); + /* + * Update shared memory + */ + ftsProbeInfo->fts_status[changes[i].dbid] = changes[i].newStatus; + } + heap_close(histrel, RowExclusiveLock); + heap_close(configrel, RowExclusiveLock); + + SIMPLE_FAULT_INJECTOR(FtsWaitForShutdown); + /* + * Do not block shutdown. We will always get a change to update + * gp_segment_configuration in subsequent probes upon database + * restart. + */ + if (IsFtsShudownRequested()) + { + elog(LOG, "Shutdown in progress, ignoring FTS prober updates."); + return false; + } + CommitTransactionCommand(); + CurrentResourceOwner = save; + return true; +} + +/* + * Apply requested segment transitions + */ +static void +updateConfiguration(FtsSegmentStatusChange *changes, int changeEntries) +{ + Assert(changes != NULL); + Assert(cdb_component_dbs != NULL); + + CdbComponentDatabaseInfo *entryDB = &cdb_component_dbs->entry_db_info[0]; + + if (entryDB->dbid != GpIdentity.dbid) + { + if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG) + { + elog(LOG, "FTS: advancing to second entry-db."); + } + entryDB = entryDB + 1; + } + + /* if we've gotten a pause or shutdown request, we ignore our probe results. */ + if (!FtsIsActive()) + { + return; + } + + /* update segment configuration */ + bool commit = probeUpdateConfig(changes, changeEntries); + + if (commit) + FtsFailoverFilerep(changes, changeEntries); + + if (gp_log_fts >= GPVARS_VERBOSITY_VERBOSE) + { + elog(LOG, "FTS: finished segment modifications."); + } +} + +#endif /* EOF */ diff --git a/src/backend/fts/ftsprobe.c b/src/backend/fts/ftsprobe.c index b7cc28ac566299184e64eedce3868f9f3548ca36..65526099292f27a7bbb18bec26ee5537275d793e 100644 --- a/src/backend/fts/ftsprobe.c +++ b/src/backend/fts/ftsprobe.c @@ -20,18 +20,17 @@ #include #include #include - #include "gp-libpq-fe.h" #include "gp-libpq-int.h" + #include "cdb/cdbgang.h" /* gp_pthread_create */ #include "libpq/ip.h" #include "postmaster/fts.h" +#include "postmaster/ftsprobe.h" #include "executor/spi.h" #include "postmaster/primary_mirror_mode.h" -#include "cdb/ml_ipc.h" /* gettime_elapsed_ms */ - #ifdef HAVE_POLL_H #include #endif @@ -52,7 +51,7 @@ #else #define PROBE_RESPONSE_LEN (20) /* size of segment response message */ #endif -#define PROBE_ERR_MSG_LEN (256) /* length of error message for errno */ + /* * MACROS @@ -64,29 +63,6 @@ /* * STRUCTURES */ - -typedef struct threadWorkerInfo -{ - uint8 *scan_status; -} threadWorkerInfo; - - -typedef struct ProbeConnectionInfo -{ - int16 dbId; /* the dbid of the segment */ - int16 segmentId; /* content indicator: -1 for master, 0, ..., n-1 for segments */ - char role; /* primary ('p'), mirror ('m') */ - char mode; /* sync ('s'), resync ('r'), change-tracking ('c') */ - GpMonotonicTime startTime; /* probe start timestamp */ -#ifdef USE_SEGWALREP - probe_result *result; -#endif - char segmentStatus; /* probed segment status */ - int16 probe_errno; /* saved errno from the latest system call */ - char errmsg[PROBE_ERR_MSG_LEN]; /* message returned by strerror() */ - PGconn *conn; /* libpq connection object */ -} ProbeConnectionInfo; - typedef struct ProbeMsg { uint32 packetlen; @@ -97,23 +73,15 @@ typedef struct ProbeMsg * STATIC VARIABLES */ +#ifdef USE_SEGWALREP /* mutex used for pthread synchronization in parallel probing */ static pthread_mutex_t worker_thread_mutex = PTHREAD_MUTEX_INITIALIZER; - -/* struct holding segment configuration */ -static CdbComponentDatabases *cdb_component_dbs = NULL; - -/* one byte of status for each segment */ -static uint8 *scan_status; +#endif /* * FUNCTION PROTOTYPES */ -static void *probeSegmentFromThread(void *cdb_component_dbs); - -static char probeSegment(CdbComponentDatabaseInfo *dbInfo); - static bool probeConnect(CdbComponentDatabaseInfo *dbInfo, ProbeConnectionInfo *probeInfo); static bool probePollOut(ProbeConnectionInfo *probeInfo); static bool probeSend(ProbeConnectionInfo *probeInfo); @@ -151,91 +119,12 @@ static char *errmessage(ProbeConnectionInfo *probeInfo) return probeInfo->errmsg; } -/* - * probe segments to check if they are alive and if any failure has occurred - */ +#ifdef USE_SEGWALREP void -FtsProbeSegments(CdbComponentDatabases *dbs, uint8 *probeRes) -{ - int i; - - threadWorkerInfo worker_info; - int workers = gp_fts_probe_threadcount; - pthread_t *threads = NULL; - - cdb_component_dbs = dbs; - scan_status = probeRes; - if (cdb_component_dbs == NULL || scan_status == NULL) - { - elog(ERROR, "FTS: segment configuration has not been loaded to shared memory"); - } - - /* reset probe results */ - memset(scan_status, 0, cdb_component_dbs->total_segment_dbs * sizeof(scan_status[0])); - - /* figure out which segments to include in the scan. */ - for (i=0; i < cdb_component_dbs->total_segment_dbs; i++) - { - CdbComponentDatabaseInfo *segInfo = &cdb_component_dbs->segment_db_info[i]; - - if (FtsIsSegmentAlive(segInfo)) - { - /* mark segment for probing */ - scan_status[segInfo->dbid] = PROBE_SEGMENT; - } - else - { - /* consider segment dead */ - scan_status[segInfo->dbid] = PROBE_DEAD; - } - } - - worker_info.scan_status = scan_status; - - threads = (pthread_t *)palloc(workers * sizeof(pthread_t)); - for (i = 0; i < workers; i++) - { -#ifdef USE_ASSERT_CHECKING - int ret = -#endif /* USE_ASSERT_CHECKING */ - gp_pthread_create(&threads[i], probeSegmentFromThread, &worker_info, "probeSegments"); - - Assert(ret == 0 && "FTS: failed to create probing thread"); - } - - /* we have nothing left to do but wait */ - for (i = 0; i < workers; i++) - { -#ifdef USE_ASSERT_CHECKING - int ret = -#endif /* USE_ASSERT_CHECKING */ - pthread_join(threads[i], NULL); - - Assert(ret == 0 && "FTS: failed to join probing thread"); - } - - pfree(threads); - threads = NULL; - - /* if we're shutting down, just exit. */ - if (!FtsIsActive()) - return; - - if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG) - { - elog(LOG, "FTS: probe results for all segments:"); - for (i=0; i < cdb_component_dbs->total_segment_dbs; i++) - { - CdbComponentDatabaseInfo *segInfo = NULL; - - segInfo = &cdb_component_dbs->segment_db_info[i]; - - elog(LOG, "segment dbid %d status 0x%x.", segInfo->dbid, scan_status[segInfo->dbid]); - } - } -} - -static char probeSegmentHelper(CdbComponentDatabaseInfo *dbInfo, ProbeConnectionInfo probeInfo) +#else +char +#endif +probeSegmentHelper(CdbComponentDatabaseInfo *dbInfo, ProbeConnectionInfo probeInfo) { /* * probe segment: open socket -> connect -> send probe msg -> receive response; @@ -294,27 +183,6 @@ static char probeSegmentHelper(CdbComponentDatabaseInfo *dbInfo, ProbeConnection #endif } -/* - * This is called from several different threads: ONLY USE THREADSAFE FUNCTIONS INSIDE. - */ - -static char probeSegment(CdbComponentDatabaseInfo *dbInfo) -{ - Assert(dbInfo != NULL); - - /* setup probe descriptor */ - ProbeConnectionInfo probeInfo; - memset(&probeInfo, 0, sizeof(ProbeConnectionInfo)); - probeInfo.segmentId = dbInfo->segindex; - probeInfo.dbId = dbInfo->dbid; - probeInfo.role = dbInfo->role; - probeInfo.mode = dbInfo->mode; - - probeInfo.segmentStatus = PROBE_DEAD; - - return probeSegmentHelper(dbInfo, probeInfo); -} - #ifdef USE_SEGWALREP static void probeWalRepSegment(probe_response_per_segment *response) @@ -703,121 +571,4 @@ probeProcessResponse(ProbeConnectionInfo *probeInfo) return true; } -/* - * Function called by probing thread; - * iteratively picks next segment pair to probe until all segments are probed; - * probes primary; probes mirror if primary reports crash fault or does not respond - */ -static void * -probeSegmentFromThread(void *arg) -{ - threadWorkerInfo *worker_info; - int i; - - worker_info = (threadWorkerInfo *) arg; - - i = 0; - - for (;;) - { - CdbComponentDatabaseInfo *primary = NULL; - CdbComponentDatabaseInfo *mirror = NULL; - - char probe_result_primary = PROBE_DEAD; - char probe_result_mirror = PROBE_DEAD; - - /* - * find untested primary, mark primary and mirror "tested" and unlock. - */ - pthread_mutex_lock(&worker_thread_mutex); - for (; i < cdb_component_dbs->total_segment_dbs; i++) - { - primary = &cdb_component_dbs->segment_db_info[i]; - - /* check segments in pairs of primary-mirror */ - if (!SEGMENT_IS_ACTIVE_PRIMARY(primary)) - { - continue; - } - - if (PROBE_CHECK_FLAG(worker_info->scan_status[primary->dbid], PROBE_SEGMENT)) - { - /* prevent re-checking this pair */ - worker_info->scan_status[primary->dbid] &= ~PROBE_SEGMENT; - - mirror = FtsGetPeerSegment(primary->segindex, primary->dbid); - - /* check if mirror is marked for probing */ - if (mirror != NULL && - PROBE_CHECK_FLAG(worker_info->scan_status[mirror->dbid], PROBE_SEGMENT)) - { - worker_info->scan_status[mirror->dbid] &= ~PROBE_SEGMENT; - } - else - { - mirror = NULL; - } - - break; - } - } - pthread_mutex_unlock(&worker_thread_mutex); - - /* check if all segments were probed */ - if (i == cdb_component_dbs->total_segment_dbs || primary == NULL) - { - break; - } - - /* if we've gotten a pause or shutdown request, we ignore probe results. */ - if (!FtsIsActive()) - { - break; - } - - /* probe primary */ - probe_result_primary = probeSegment(primary); - Assert(!PROBE_CHECK_FLAG(probe_result_primary, PROBE_SEGMENT)); - - if ((probe_result_primary & PROBE_ALIVE) == 0 && gp_log_fts >= GPVARS_VERBOSITY_VERBOSE) - { - write_log("FTS: primary (content=%d, dbid=%d, status 0x%x) didn't respond to probe.", - primary->segindex, primary->dbid, probe_result_primary); - } - - if (mirror != NULL) - { - /* assume mirror is alive */ - probe_result_mirror = PROBE_ALIVE; - - /* probe mirror only if primary is dead or has a crash/network fault */ - if (!PROBE_CHECK_FLAG(probe_result_primary, PROBE_ALIVE) || - PROBE_CHECK_FLAG(probe_result_primary, PROBE_FAULT_CRASH) || - PROBE_CHECK_FLAG(probe_result_primary, PROBE_FAULT_NET)) - { - /* probe mirror */ - probe_result_mirror = probeSegment(mirror); - Assert(!PROBE_CHECK_FLAG(probe_result_mirror, PROBE_SEGMENT)); - - if ((probe_result_mirror & PROBE_ALIVE) == 0 && gp_log_fts >= GPVARS_VERBOSITY_VERBOSE) - { - write_log("FTS: mirror (content=%d, dbid=%d, status 0x%x) didn't respond to probe.", - mirror->segindex, mirror->dbid, probe_result_mirror); - } - } - } - - /* update results */ - pthread_mutex_lock(&worker_thread_mutex); - worker_info->scan_status[primary->dbid] = probe_result_primary; - if (mirror != NULL) - { - worker_info->scan_status[mirror->dbid] = probe_result_mirror; - } - pthread_mutex_unlock(&worker_thread_mutex); - } - - return NULL; -} - /* EOF */ diff --git a/src/backend/fts/ftsprobefilerep.c b/src/backend/fts/ftsprobefilerep.c new file mode 100644 index 0000000000000000000000000000000000000000..30cc91e5f6b612c965990d7afad1c5dd7aeea14c --- /dev/null +++ b/src/backend/fts/ftsprobefilerep.c @@ -0,0 +1,257 @@ +/*------------------------------------------------------------------------- + * + * ftsprobefilerep.c + * Implementation of segment probing interface for filerep + * + * Portions Copyright (c) 2012-Present Pivotal Software, Inc. + * + * + * IDENTIFICATION + * src/backend/fts/ftsprobefilerep.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include +#include "postmaster/fts.h" +#include "postmaster/ftsprobe.h" +#include "gp-libpq-fe.h" +#include "gp-libpq-int.h" + +typedef struct threadWorkerInfo +{ + uint8 *scan_status; +} threadWorkerInfo; + +/* struct holding segment configuration */ +static CdbComponentDatabases *cdb_component_dbs = NULL; + +/* one byte of status for each segment */ +static uint8 *scan_status; + +/* mutex used for pthread synchronization in parallel probing */ +static pthread_mutex_t worker_thread_mutex = PTHREAD_MUTEX_INITIALIZER; + +/* + * This is called from several different threads: ONLY USE THREADSAFE FUNCTIONS INSIDE. + */ +static char probeSegment(CdbComponentDatabaseInfo *dbInfo) +{ + Assert(dbInfo != NULL); + + /* setup probe descriptor */ + ProbeConnectionInfo probeInfo; + memset(&probeInfo, 0, sizeof(ProbeConnectionInfo)); + probeInfo.segmentId = dbInfo->segindex; + probeInfo.dbId = dbInfo->dbid; + probeInfo.role = dbInfo->role; + probeInfo.mode = dbInfo->mode; + + probeInfo.segmentStatus = PROBE_DEAD; + + return probeSegmentHelper(dbInfo, probeInfo); +} + +/* + * Function called by probing thread; + * iteratively picks next segment pair to probe until all segments are probed; + * probes primary; probes mirror if primary reports crash fault or does not respond + */ +static void * +probeSegmentFromThread(void *arg) +{ + threadWorkerInfo *worker_info; + int i; + + worker_info = (threadWorkerInfo *) arg; + Assert(cdb_component_dbs != NULL); + i = 0; + + for (;;) + { + CdbComponentDatabaseInfo *primary = NULL; + CdbComponentDatabaseInfo *mirror = NULL; + + char probe_result_primary = PROBE_DEAD; + char probe_result_mirror = PROBE_DEAD; + + /* + * find untested primary, mark primary and mirror "tested" and unlock. + */ + pthread_mutex_lock(&worker_thread_mutex); + for (; i < cdb_component_dbs->total_segment_dbs; i++) + { + primary = &cdb_component_dbs->segment_db_info[i]; + + /* check segments in pairs of primary-mirror */ + if (!SEGMENT_IS_ACTIVE_PRIMARY(primary)) + { + continue; + } + + if (PROBE_CHECK_FLAG(worker_info->scan_status[primary->dbid], PROBE_SEGMENT)) + { + /* prevent re-checking this pair */ + worker_info->scan_status[primary->dbid] &= ~PROBE_SEGMENT; + + mirror = FtsGetPeerSegment(primary->segindex, primary->dbid); + + /* check if mirror is marked for probing */ + if (mirror != NULL && + PROBE_CHECK_FLAG(worker_info->scan_status[mirror->dbid], PROBE_SEGMENT)) + { + worker_info->scan_status[mirror->dbid] &= ~PROBE_SEGMENT; + } + else + { + mirror = NULL; + } + + break; + } + } + pthread_mutex_unlock(&worker_thread_mutex); + + /* check if all segments were probed */ + if (i == cdb_component_dbs->total_segment_dbs || primary == NULL) + { + break; + } + + /* if we've gotten a pause or shutdown request, we ignore probe results. */ + if (!FtsIsActive()) + { + break; + } + + /* probe primary */ + probe_result_primary = probeSegment(primary); + Assert(!PROBE_CHECK_FLAG(probe_result_primary, PROBE_SEGMENT)); + + if ((probe_result_primary & PROBE_ALIVE) == 0 && gp_log_fts >= GPVARS_VERBOSITY_VERBOSE) + { + write_log("FTS: primary (content=%d, dbid=%d, status 0x%x) didn't respond to probe.", + primary->segindex, primary->dbid, probe_result_primary); + } + + if (mirror != NULL) + { + /* assume mirror is alive */ + probe_result_mirror = PROBE_ALIVE; + + /* probe mirror only if primary is dead or has a crash/network fault */ + if (!PROBE_CHECK_FLAG(probe_result_primary, PROBE_ALIVE) || + PROBE_CHECK_FLAG(probe_result_primary, PROBE_FAULT_CRASH) || + PROBE_CHECK_FLAG(probe_result_primary, PROBE_FAULT_NET)) + { + /* probe mirror */ + probe_result_mirror = probeSegment(mirror); + Assert(!PROBE_CHECK_FLAG(probe_result_mirror, PROBE_SEGMENT)); + + if ((probe_result_mirror & PROBE_ALIVE) == 0 && gp_log_fts >= GPVARS_VERBOSITY_VERBOSE) + { + write_log("FTS: mirror (content=%d, dbid=%d, status 0x%x) didn't respond to probe.", + mirror->segindex, mirror->dbid, probe_result_mirror); + } + } + } + + /* update results */ + pthread_mutex_lock(&worker_thread_mutex); + worker_info->scan_status[primary->dbid] = probe_result_primary; + if (mirror != NULL) + { + worker_info->scan_status[mirror->dbid] = probe_result_mirror; + } + pthread_mutex_unlock(&worker_thread_mutex); + } + + return NULL; +} + +/* + * probe segments to check if they are alive and if any failure has occurred + */ +void +FtsProbeSegments(CdbComponentDatabases *dbs, uint8 *probeRes) +{ + int i; + + threadWorkerInfo worker_info; + int workers = gp_fts_probe_threadcount; + pthread_t *threads = NULL; + + cdb_component_dbs = dbs; + scan_status = probeRes; + if (dbs == NULL || scan_status == NULL) + { + elog(ERROR, "FTS: segment configuration has not been loaded to shared memory"); + } + + /* reset probe results */ + memset(scan_status, 0, dbs->total_segment_dbs * sizeof(scan_status[0])); + + /* figure out which segments to include in the scan. */ + for (i=0; i < dbs->total_segment_dbs; i++) + { + CdbComponentDatabaseInfo *segInfo = &dbs->segment_db_info[i]; + + if (FtsIsSegmentAlive(segInfo)) + { + /* mark segment for probing */ + scan_status[segInfo->dbid] = PROBE_SEGMENT; + } + else + { + /* consider segment dead */ + scan_status[segInfo->dbid] = PROBE_DEAD; + } + } + + worker_info.scan_status = scan_status; + + threads = (pthread_t *)palloc(workers * sizeof(pthread_t)); + for (i = 0; i < workers; i++) + { +#ifdef USE_ASSERT_CHECKING + int ret = +#endif /* USE_ASSERT_CHECKING */ + gp_pthread_create(&threads[i], probeSegmentFromThread, &worker_info, "probeSegments"); + + Assert(ret == 0 && "FTS: failed to create probing thread"); + } + + /* we have nothing left to do but wait */ + for (i = 0; i < workers; i++) + { +#ifdef USE_ASSERT_CHECKING + int ret = +#endif /* USE_ASSERT_CHECKING */ + pthread_join(threads[i], NULL); + + Assert(ret == 0 && "FTS: failed to join probing thread"); + } + + pfree(threads); + threads = NULL; + + /* if we're shutting down, just exit. */ + if (!FtsIsActive()) + return; + + Assert(cdb_component_dbs != NULL); + + if (gp_log_fts >= GPVARS_VERBOSITY_DEBUG) + { + elog(LOG, "FTS: probe results for all segments:"); + for (i=0; i < cdb_component_dbs->total_segment_dbs; i++) + { + CdbComponentDatabaseInfo *segInfo = NULL; + + segInfo = &cdb_component_dbs->segment_db_info[i]; + + elog(LOG, "segment dbid %d status 0x%x.", segInfo->dbid, scan_status[segInfo->dbid]); + } + } +} + diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 1fbeead1b19bf9ad711c94814ecd0c7ec8278fd8..2c9db9884a72ed3d0ffd572d8f379d90571281fb 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -556,7 +556,9 @@ static int BackendStartup(Port *port); static int ProcessStartupPacket(Port *port, bool SSLdone); static void processCancelRequest(Port *port, void *pkt, MsgType code); static void processPrimaryMirrorTransitionRequest(Port *port, void *pkt); +#ifndef USE_SEGWALREP static void processPrimaryMirrorTransitionQuery(Port *port, void *pkt); +#endif static int initMasks(fd_set *rmask); static void report_fork_failure_to_client(Port *port, int errnum); static enum CAC_state canAcceptConnections(void); @@ -3577,6 +3579,7 @@ processPrimaryMirrorTransitionRequest(Port *port, void *pkt) } } +#ifndef USE_SEGWALREP static void sendPrimaryMirrorTransitionQuery(uint32 mode, uint32 segstate, uint32 datastate, uint32 faulttype) { @@ -3687,6 +3690,7 @@ processPrimaryMirrorTransitionQuery(Port *port, void *pkt) return; } +#endif /* * The client has sent a cancel request packet, not a normal diff --git a/src/include/postmaster/fts.h b/src/include/postmaster/fts.h index d3ceeed7433c555002f8a4926d87599a143eed66..fa487502953acca8f61da94874dc0cd4369aab13 100644 --- a/src/include/postmaster/fts.h +++ b/src/include/postmaster/fts.h @@ -94,6 +94,8 @@ enum probe_transition_e #define IS_VALID_TRANSITION(trans) \ (trans == TRANS_D_D || trans == TRANS_D_U || trans == TRANS_U_D || trans == TRANS_U_U) +/* buffer size for SQL command */ +#define SQL_CMD_BUF_SIZE 1024 /* * STRUCTURES @@ -129,16 +131,11 @@ extern int ftsprobe_start(void); */ extern void FtsProbeSegments(CdbComponentDatabases *dbs, uint8 *scan_status); -#ifdef USE_SEGWALREP -extern void FtsWalRepProbeSegments(probe_context *context); -#endif - /* * Interface for segment state checking */ extern bool FtsIsSegmentAlive(CdbComponentDatabaseInfo *segInfo); extern CdbComponentDatabaseInfo *FtsGetPeerSegment(int content, int dbid); -extern void FtsMarkSegmentsInSync(CdbComponentDatabaseInfo *primary, CdbComponentDatabaseInfo *mirror); extern void FtsDumpChanges(FtsSegmentStatusChange *changes, int changeEntries); /* @@ -151,7 +148,9 @@ extern bool FtsIsActive(void); * Interface for WALREP specific checking */ extern void HandleFtsWalRepProbe(void); -#endif +extern void FtsWalRepProbeSegments(probe_context *context); +#else +extern bool probePublishUpdate(CdbComponentDatabases *dbs, uint8 *probe_results); /* * Interface for FireRep-specific segment state machine and transitions @@ -162,7 +161,7 @@ extern void FtsResolveStateFilerep(FtsSegmentPairState *pairState); extern void FtsPreprocessProbeResultsFilerep(CdbComponentDatabases *dbs, uint8 *probe_results); extern void FtsFailoverFilerep(FtsSegmentStatusChange *changes, int changeCount); - +#endif /* * Interface for requesting master to shut down diff --git a/src/include/postmaster/ftsprobe.h b/src/include/postmaster/ftsprobe.h new file mode 100644 index 0000000000000000000000000000000000000000..31aa8ec1f51135c72a903524d4441c51bc7831e1 --- /dev/null +++ b/src/include/postmaster/ftsprobe.h @@ -0,0 +1,46 @@ +/*------------------------------------------------------------------------- + * + * ftsprobe.h + * Interface for fault tolerance service Sender. + * + * Portions Copyright (c) 2012-Present Pivotal Software, Inc. + * + * + * IDENTIFICATION + * src/include/postmaster/ftsprobe.h + * + *------------------------------------------------------------------------- + */ +#ifndef FTSPROBE_H +#define FTSPROBE_H + +#include "cdb/ml_ipc.h" /* gettime_elapsed_ms */ + +#define PROBE_ERR_MSG_LEN (256) /* length of error message for errno */ + +struct pg_conn; /* PGconn ... #include "gp-libpq-fe.h" */ +typedef struct ProbeConnectionInfo +{ + int16 dbId; /* the dbid of the segment */ + int16 segmentId; /* content indicator: -1 for master, 0, ..., n-1 for segments */ + char role; /* primary ('p'), mirror ('m') */ + char mode; /* sync ('s'), resync ('r'), change-tracking ('c') */ + GpMonotonicTime startTime; /* probe start timestamp */ +#ifdef USE_SEGWALREP + probe_result *result; +#endif + char segmentStatus; /* probed segment status */ + int16 probe_errno; /* saved errno from the latest system call */ + char errmsg[PROBE_ERR_MSG_LEN]; /* message returned by strerror() */ + struct pg_conn *conn; /* libpq connection object */ +} ProbeConnectionInfo; + +extern +#ifdef USE_SEGWALREP +void +#else +char +#endif +probeSegmentHelper(CdbComponentDatabaseInfo *dbInfo, ProbeConnectionInfo probeInfo); + +#endif