提交 4d9c7897 编写于 作者: X Xin Zhang

Primary to mirror failover

- detect primary goes down
- flip the role to m/p/d/n and p/m/u/n (role/prefer/status/mode) in
  gp_segment_configuration
- send promotion message to mirror to promote it

Author: Xin Zhang <xzhang@pivotal.io>
Author: Jacob Champion <pchampion@pivotal.io>
Author: Asim R P <apraveen@pivotal.io>
上级 c5fe4b59
......@@ -81,12 +81,6 @@
extern uint32 bootstrap_data_checksum_version;
/* File path names (all relative to $PGDATA) */
#define RECOVERY_COMMAND_FILE "recovery.conf"
#define RECOVERY_COMMAND_DONE "recovery.done"
#define PROMOTE_SIGNAL_FILE "promote"
/* User-settable parameters */
int CheckPointSegments = 3;
int XLOGbuffers = 8;
......@@ -6428,6 +6422,13 @@ XLogProcessCheckpointRecord(XLogRecord *rec, XLogRecPtr loc)
}
}
DBState
GetCurrentDBState(void)
{
Assert(ControlFile);
return ControlFile->state;
}
static void
UpdateCatalogForStandbyPromotion(void)
{
......
......@@ -389,7 +389,8 @@ CdbComponentDatabases *readCdbComponentInfoAndUpdateStatus(MemoryContext probeCo
}
static void
probeWalRepUpdateConfig(int16 dbid, int16 segindex, bool IsSegmentAlive, bool IsInSync)
probeWalRepUpdateConfig(int16 dbid, int16 segindex, char role,
bool IsSegmentAlive, bool IsInSync)
{
Assert(IsInSync ? IsSegmentAlive : true);
......@@ -411,8 +412,8 @@ probeWalRepUpdateConfig(int16 dbid, int16 segindex, bool IsSegmentAlive, bool Is
histvals[Anum_gp_configuration_history_dbid-1] =
Int16GetDatum(dbid);
snprintf(desc, sizeof(desc),
"FTS: update status and mode for dbid %d with contentid %d to %c and %c",
dbid, segindex,
"FTS: update role, status, and mode for dbid %d with contentid %d to %c, %c, and %c",
dbid, segindex, role,
IsSegmentAlive ? GP_SEGMENT_CONFIGURATION_STATUS_UP :
GP_SEGMENT_CONFIGURATION_STATUS_DOWN,
IsInSync ? GP_SEGMENT_CONFIGURATION_MODE_INSYNC :
......@@ -461,6 +462,9 @@ probeWalRepUpdateConfig(int16 dbid, int16 segindex, bool IsSegmentAlive, bool Is
RelationGetRelationName(configrel));
}
configvals[Anum_gp_segment_configuration_role-1] = CharGetDatum(role);
repls[Anum_gp_segment_configuration_role-1] = true;
configvals[Anum_gp_segment_configuration_status-1] =
CharGetDatum(IsSegmentAlive ? GP_SEGMENT_CONFIGURATION_STATUS_UP :
GP_SEGMENT_CONFIGURATION_STATUS_DOWN);
......@@ -520,11 +524,15 @@ probeWalRepPublishUpdate(CdbComponentDatabases *cdbs, fts_context *context)
bool IsInSync = response->result.isInSync;
/* If are in sync, then both have to be ALIVE */
Assert(IsInSync ? (IsPrimaryAlive && IsMirrorAlive) : true);
AssertImply(IsInSync, IsPrimaryAlive && IsMirrorAlive);
bool UpdatePrimary = (IsPrimaryAlive != SEGMENT_IS_ALIVE(primary));
bool UpdateMirror = (IsMirrorAlive != SEGMENT_IS_ALIVE(mirror));
/* Only swapped in promotion; by default, keep the current roles. */
char newPrimaryRole = GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY;
char newMirrorRole = GP_SEGMENT_CONFIGURATION_ROLE_MIRROR;
/*
* If probe response state is different from current state in
* configuration, update both primary and mirror.
......@@ -549,6 +557,35 @@ probeWalRepPublishUpdate(CdbComponentDatabases *cdbs, fts_context *context)
*/
Assert(UpdateMirror || !SEGMENT_IS_ALIVE(mirror));
}
else if (!IsPrimaryAlive && SEGMENT_IS_IN_SYNC(mirror))
{
/* Primary must have been recorded as in-sync before the probe. */
Assert(SEGMENT_IS_IN_SYNC(primary));
/* The primary is down; promote the mirror to primary. */
response->message = FTS_MSG_PROMOTE;
response->segment_db_info = mirror;
/*
* Flip the roles and mark the failed primary as down in FTS
* configuration before sending promote message. Dispatcher
* should no longer consider the failed primary for gang
* creation, FTS should no longer probe the failed primary.
*/
newPrimaryRole = GP_SEGMENT_CONFIGURATION_ROLE_MIRROR;
newMirrorRole = GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY;
}
else if (IsPrimaryAlive && response->result.isRoleMirror)
{
/* A promote message sent previously didn't make it to the mirror. */
Assert(!SEGMENT_IS_ALIVE(mirror));
Assert(SEGMENT_IS_NOT_INSYNC(mirror));
Assert(SEGMENT_IS_NOT_INSYNC(primary));
Assert(!response->result.isSyncRepEnabled);
Assert(!UpdateMirror);
Assert(!UpdatePrimary);
response->message = FTS_MSG_PROMOTE;
}
/*
* ----------------------------
......@@ -578,11 +615,13 @@ probeWalRepPublishUpdate(CdbComponentDatabases *cdbs, fts_context *context)
if (UpdatePrimary)
probeWalRepUpdateConfig(primary->dbid, primary->segindex,
IsPrimaryAlive, IsInSync);
newPrimaryRole, IsPrimaryAlive,
IsInSync);
if (UpdateMirror)
probeWalRepUpdateConfig(mirror->dbid, mirror->segindex,
IsMirrorAlive, IsInSync);
newMirrorRole, IsMirrorAlive,
IsInSync);
if (shutdown_requested)
{
......@@ -642,6 +681,12 @@ FtsWalRepInitProbeContext(CdbComponentDatabases *cdbs, fts_context *context)
primary->segindex,
primary->dbid);
/*
* If there is no mirror under this primary, no need to probe.
*/
if (!mirror)
continue;
/* primary in catalog will NEVER be marked down. */
Assert(FtsIsSegmentAlive(primary));
......@@ -657,6 +702,7 @@ FtsWalRepInitProbeContext(CdbComponentDatabases *cdbs, fts_context *context)
response->result.isInSync = false;
response->result.isSyncRepEnabled = false;
response->result.retryRequested = false;
response->result.isRoleMirror = false;
response->message = FTS_MSG_PROBE;
response->segment_db_info = primary;
......@@ -690,15 +736,19 @@ FtsWalRepSetupMessageContext(fts_context *context)
response->message = NULL;
response->isScheduled = true;
}
else
else if ((response->message == FTS_MSG_SYNCREP_OFF)
|| (response->message == FTS_MSG_PROMOTE))
{
Assert(strcmp(response->message, FTS_MSG_SYNCREP_OFF) == 0);
response->isScheduled = false;
response->result.isPrimaryAlive = false;
response->result.isInSync = false;
response->result.isSyncRepEnabled = false;
message_segments = true;
}
else
{
Assert(false);
}
}
return message_segments;
}
......
......@@ -16,6 +16,7 @@
#include "libpq/pqformat.h"
#include "libpq/libpq.h"
#include "postmaster/fts.h"
#include "postmaster/postmaster.h"
#include "utils/guc.h"
#include "replication/gp_replication.h"
......@@ -29,7 +30,7 @@ SendFtsResponse(FtsResponse *response, const char *messagetype)
BeginCommand(messagetype, DestRemote);
pq_beginmessage(&buf, 'T');
pq_sendint(&buf, Natts_fts_message_response, 2); /* 3 fields */
pq_sendint(&buf, Natts_fts_message_response, 2); /* # of columns */
pq_sendstring(&buf, "is_mirror_up");
pq_sendint(&buf, 0, 4); /* table oid */
......@@ -55,6 +56,14 @@ SendFtsResponse(FtsResponse *response, const char *messagetype)
pq_sendint(&buf, -1, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
pq_sendstring(&buf, "is_role_mirror");
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, Anum_fts_message_response_is_role_mirror, 2); /* attnum */
pq_sendint(&buf, BOOLOID, 4); /* type oid */
pq_sendint(&buf, 1, 2); /* typlen */
pq_sendint(&buf, -1, 4); /* typmod */
pq_sendint(&buf, 0, 2); /* format code */
pq_sendstring(&buf, "request_retry");
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, Anum_fts_message_response_request_retry, 2); /* attnum */
......@@ -79,6 +88,9 @@ SendFtsResponse(FtsResponse *response, const char *messagetype)
pq_sendint(&buf, response->IsSyncRepEnabled, 1);
pq_sendint(&buf, 1, 4); /* col4 len */
pq_sendint(&buf, response->IsRoleMirror, 1);
pq_sendint(&buf, 1, 4); /* col5 len */
pq_sendint(&buf, response->RequestRetry, 1);
pq_endmessage(&buf);
......@@ -89,7 +101,13 @@ SendFtsResponse(FtsResponse *response, const char *messagetype)
static void
HandleFtsWalRepProbe(void)
{
FtsResponse response;
FtsResponse response = {
false, /* IsMirrorUp */
false, /* IsInSync */
false, /* IsSyncRepEnabled */
false, /* IsRoleMirror */
false, /* RequestRetry */
};
GetMirrorStatus(&response);
......@@ -103,6 +121,13 @@ HandleFtsWalRepProbe(void)
/* Syncrep is enabled now, so respond accordingly. */
response.IsSyncRepEnabled = true;
}
else if (!response.IsMirrorUp && am_mirror)
{
Assert(!response.IsInSync);
Assert(!response.IsSyncRepEnabled);
response.IsRoleMirror = true;
elog(LOG, "received probe message while acting as mirror");
}
SendFtsResponse(&response, FTS_MSG_PROBE);
}
......@@ -110,7 +135,13 @@ HandleFtsWalRepProbe(void)
static void
HandleFtsWalRepSyncRepOff(void)
{
FtsResponse response;
FtsResponse response = {
false, /* IsMirrorUp */
false, /* IsInSync */
false, /* IsSyncRepEnabled */
false, /* IsRoleMirror */
false, /* RequestRetry */
};
ereport(LOG,
(errmsg("turning off synchronous wal replication due to FTS request")));
......@@ -120,6 +151,37 @@ HandleFtsWalRepSyncRepOff(void)
SendFtsResponse(&response, FTS_MSG_SYNCREP_OFF);
}
static void
HandleFtsWalRepPromote(void)
{
FtsResponse response = {
false, /* IsMirrorUp */
false, /* IsInSync */
false, /* IsSyncRepEnabled */
am_mirror, /* IsRoleMirror */
false, /* RequestRetry */
};
ereport(LOG,
(errmsg("promoting mirror to primary due to FTS request")));
/*
* FTS sends promote message to a mirror. The mirror may be undergoing
* promotion. Promote messages should therefore be handled in an
* idempotent way.
*/
DBState state = GetCurrentDBState();
if (state == DB_IN_STANDBY_MODE)
SignalPromote();
else
{
elog(LOG, "ignoring promote request, walreceiver not running,"
" DBState = %d", state);
}
SendFtsResponse(&response, FTS_MSG_PROMOTE);
}
void
HandleFtsMessage(const char* query_string)
{
......@@ -129,6 +191,9 @@ HandleFtsMessage(const char* query_string)
else if (strncmp(query_string, FTS_MSG_SYNCREP_OFF,
strlen(FTS_MSG_SYNCREP_OFF)) == 0)
HandleFtsWalRepSyncRepOff();
else if (strncmp(query_string, FTS_MSG_PROMOTE,
strlen(FTS_MSG_PROMOTE)) == 0)
HandleFtsWalRepPromote();
else
ereport(ERROR,
(errmsg("received unknown FTS query: %s", query_string)));
......
......@@ -109,16 +109,22 @@ probeRecordResponse(FtsConnectionInfo *ftsInfo, PGresult *result)
Assert (isSyncRepEnabled);
ftsInfo->result->isSyncRepEnabled = *isSyncRepEnabled;
int *isRoleMirror = (int *) PQgetvalue(result, 0,
Anum_fts_message_response_is_role_mirror);
Assert (isRoleMirror);
ftsInfo->result->isRoleMirror = *isRoleMirror;
int *retryRequested = (int *) PQgetvalue(result, 0,
Anum_fts_message_response_request_retry);
Assert (retryRequested);
ftsInfo->result->retryRequested = *retryRequested;
write_log("FTS: segment (content=%d, dbid=%d, role=%c) reported isMirrorUp %d, isInSync %d, isSyncRepEnabled %d and retryRequested %d to the prober.",
write_log("FTS: segment (content=%d, dbid=%d, role=%c) reported isMirrorUp %d, isInSync %d, isSyncRepEnabled %d, isRoleMirror %d, and retryRequested %d to the prober.",
ftsInfo->segmentId, ftsInfo->dbId, ftsInfo->role,
ftsInfo->result->isMirrorAlive,
ftsInfo->result->isInSync,
ftsInfo->result->isSyncRepEnabled,
ftsInfo->result->isRoleMirror,
ftsInfo->result->retryRequested);
}
......@@ -344,7 +350,8 @@ messageWalRepSegmentFromThread(void *arg)
/* now let's probe the primary. */
probe_response_per_segment *response = &context->responses[response_index];
Assert(SEGMENT_IS_ACTIVE_PRIMARY(response->segment_db_info));
AssertImply(strcmp(response->message, FTS_MSG_PROMOTE) != 0,
SEGMENT_IS_ACTIVE_PRIMARY(response->segment_db_info));
messageWalRepSegment(response);
}
......
......@@ -13,7 +13,9 @@ ftsmessagehandler.t: \
$(MOCK_DIR)/backend/lib/stringinfo_mock.o \
$(MOCK_DIR)/backend/libpq/pqformat_mock.o \
$(MOCK_DIR)/backend/libpq/pqcomm_mock.o \
$(MOCK_DIR)/backend/tcop/dest_mock.o
$(MOCK_DIR)/backend/tcop/dest_mock.o \
$(MOCK_DIR)/backend/postmaster/postmaster_mock.o \
$(MOCK_DIR)/backend/access/transam/xlog_mock.o
ftsprobe.t: \
$(MOCK_DIR)/backend/utils/error/elog_mock.o \
......
......@@ -150,6 +150,7 @@ static void
probeWalRepUpdateConfig_will_be_called_with(
int16 dbid,
int16 segindex,
char role,
char status,
char mode)
{
......@@ -174,11 +175,11 @@ probeWalRepUpdateConfig_will_be_called_with(
Int16GetDatum(dbid);
snprintf(desc, sizeof(desc),
"FTS: update status and mode for dbid %d with contentid %d to %c and %c",
dbid, segindex,
status,
mode
);
"FTS: update role, status, and mode for dbid %d with contentid %d to %c, %c, and %c",
dbid, segindex, role,
status,
mode
);
histvals[Anum_gp_configuration_history_desc - 1] =
CStringGetTextDatum(desc);
......@@ -225,6 +226,16 @@ probeWalRepUpdateConfig_will_be_called_with(
will_be_called_count(systable_beginscan, 1);
static HeapTupleData config_tuple;
typedef struct {
HeapTupleHeaderData header;
FormData_gp_segment_configuration data;
} gp_segment_configuration_tuple;
static gp_segment_configuration_tuple tuple;
config_tuple.t_data = &tuple;
tuple.data.role = role;
expect_any(systable_getnext, sysscan);
will_return(systable_getnext, &config_tuple);
......@@ -260,6 +271,8 @@ ExpectedPrimaryAndMirrorConfiguration(CdbComponentDatabases *cdbs,
char primaryStatus,
char mirrorStatus,
char mode,
char newPrimaryRole,
char newMirrorRole,
bool willUpdatePrimary,
bool willUpdateMirror)
{
......@@ -272,6 +285,7 @@ ExpectedPrimaryAndMirrorConfiguration(CdbComponentDatabases *cdbs,
probeWalRepUpdateConfig_will_be_called_with(
primary->dbid,
primary->segindex,
newPrimaryRole,
primaryStatus,
mode);
}
......@@ -284,6 +298,7 @@ ExpectedPrimaryAndMirrorConfiguration(CdbComponentDatabases *cdbs,
probeWalRepUpdateConfig_will_be_called_with(
mirror->dbid,
mirror->segindex,
newMirrorRole,
mirrorStatus,
mode);
}
......@@ -459,6 +474,8 @@ test_PrimayUpMirrorUpNotInSync_to_PrimaryUpMirrorDownNotInSync(void **state)
/* primary status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mirror status */ GP_SEGMENT_CONFIGURATION_STATUS_DOWN,
/* mode */ GP_SEGMENT_CONFIGURATION_MODE_NOTINSYNC,
/* newPrimaryRole */ GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY,
/* newMirrorRole */ GP_SEGMENT_CONFIGURATION_ROLE_MIRROR,
/* willUpdatePrimary */ false,
/* willUpdateMirror */ true);
......@@ -514,6 +531,8 @@ test_PrimaryUpMirrorDownNotInSync_to_PrimayUpMirrorUpNotInSync(void **state)
/* primary status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mirror status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mode */ GP_SEGMENT_CONFIGURATION_MODE_NOTINSYNC,
/* newPrimaryRole */ GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY,
/* newMirrorRole */ GP_SEGMENT_CONFIGURATION_ROLE_MIRROR,
/* willUpdatePrimary */ false,
/* willUpdateMirror */ true);
......@@ -577,6 +596,8 @@ test_probeWalRepPublishUpdate_multiple_segments(void **state)
/* primary status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mirror status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mode */ GP_SEGMENT_CONFIGURATION_MODE_NOTINSYNC,
/* newPrimaryRole */ GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY,
/* newMirrorRole */ GP_SEGMENT_CONFIGURATION_ROLE_MIRROR,
/* willUpdatePrimary */ false,
/* willUpdateMirror */ true);
......@@ -590,6 +611,8 @@ test_probeWalRepPublishUpdate_multiple_segments(void **state)
/* primary status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mirror status */ GP_SEGMENT_CONFIGURATION_STATUS_DOWN,
/* mode */ GP_SEGMENT_CONFIGURATION_MODE_NOTINSYNC,
/* newPrimaryRole */ GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY,
/* newMirrorRole */ GP_SEGMENT_CONFIGURATION_ROLE_MIRROR,
/* willUpdatePrimary */ false,
/* willUpdateMirror */ true);
/* Fourth segment will not change status */
......@@ -632,6 +655,8 @@ test_PrimayUpMirrorUpSync_to_PrimaryUpMirrorUpNotInSync(void **state)
/* primary status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mirror status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mode */ GP_SEGMENT_CONFIGURATION_MODE_NOTINSYNC,
/* newPrimaryRole */ GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY,
/* newMirrorRole */ GP_SEGMENT_CONFIGURATION_ROLE_MIRROR,
/* willUpdatePrimary */ true,
/* willUpdateMirror */ true);
......@@ -682,6 +707,8 @@ test_PrimayUpMirrorUpSync_to_PrimaryUpMirrorDownNotInSync(void **state)
/* primary status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mirror status */ GP_SEGMENT_CONFIGURATION_STATUS_DOWN,
/* mode */ GP_SEGMENT_CONFIGURATION_MODE_NOTINSYNC,
/* newPrimaryRole */ GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY,
/* newMirrorRole */ GP_SEGMENT_CONFIGURATION_ROLE_MIRROR,
/* willUpdatePrimary */ true,
/* willUpdateMirror */ true);
......@@ -697,11 +724,11 @@ test_PrimayUpMirrorUpSync_to_PrimaryUpMirrorDownNotInSync(void **state)
}
/*
* 1 segment, is_updated is true, because primary will be marked down and
* both will be marked not in sync
* 1 segment, is_updated is true, because FTS found primary goes down and
* both will be marked not in sync, then FTS promote mirror
*/
void
test_PrimayUpMirrorUpSync_to_PrimaryDown(void **state)
test_PrimayUpMirrorUpSync_to_PrimaryDown_to_MirrorPromote(void **state)
{
fts_context context;
CdbComponentDatabases *cdb_component_dbs;
......@@ -725,13 +752,23 @@ test_PrimayUpMirrorUpSync_to_PrimaryDown(void **state)
/* primary status */ GP_SEGMENT_CONFIGURATION_STATUS_DOWN,
/* mirror status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mode */ GP_SEGMENT_CONFIGURATION_MODE_NOTINSYNC,
/* newPrimaryRole */ GP_SEGMENT_CONFIGURATION_ROLE_MIRROR,
/* newMirrorRole */ GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY,
/* willUpdatePrimary */ true,
/* willUpdateMirror */ true);
bool is_updated = probeWalRepPublishUpdate(cdb_component_dbs, &context);
assert_true(is_updated);
assert_false(FtsWalRepSetupMessageContext(&context));
/* expect to be true, since we need to send PROMOTE message to the mirror now. */
assert_true(FtsWalRepSetupMessageContext(&context));
assert_string_equal(context.responses[0].message, FTS_MSG_PROMOTE);
CdbComponentDatabaseInfo *mirror =
GetSegmentFromCdbComponentDatabases(
cdb_component_dbs, 0, GP_SEGMENT_CONFIGURATION_ROLE_MIRROR);
assert_int_equal(context.responses[0].segment_db_info->dbid, mirror->dbid);
pfree(cdb_component_dbs);
}
......@@ -764,6 +801,8 @@ test_PrimayUpMirrorUpNotInSync_to_PrimayUpMirrorUpSync(void **state)
/* primary status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mirror status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mode */ GP_SEGMENT_CONFIGURATION_MODE_INSYNC,
/* newPrimaryRole */ GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY,
/* newMirrorRole */ GP_SEGMENT_CONFIGURATION_ROLE_MIRROR,
/* willUpdatePrimary */ true,
/* willUpdateMirror */ true);
......@@ -811,6 +850,8 @@ test_PrimaryUpMirrorDownNotInSync_to_PrimayUpMirrorUpSync(void **state)
/* primary status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mirror status */ GP_SEGMENT_CONFIGURATION_STATUS_UP,
/* mode */ GP_SEGMENT_CONFIGURATION_MODE_INSYNC,
/* newPrimaryRole */ GP_SEGMENT_CONFIGURATION_ROLE_PRIMARY,
/* newMirrorRole */ GP_SEGMENT_CONFIGURATION_ROLE_MIRROR,
/* willUpdatePrimary */ true,
/* willUpdateMirror */ true);
......@@ -908,7 +949,7 @@ main(int argc, char *argv[])
unit_test(test_PrimayUpMirrorUpSync_to_PrimaryUpMirrorUpNotInSync),
unit_test(test_PrimayUpMirrorUpSync_to_PrimaryUpMirrorDownNotInSync),
unit_test(test_PrimayUpMirrorUpSync_to_PrimaryDown),
unit_test(test_PrimayUpMirrorUpSync_to_PrimaryDown_to_MirrorPromote),
unit_test(test_PrimayUpMirrorUpNotInSync_to_PrimayUpMirrorUpSync),
unit_test(test_PrimayUpMirrorUpNotInSync_to_PrimayUpMirrorUpNotInSync),
......
......@@ -5,22 +5,13 @@
#include "postgres.h"
#define Assert(condition) if (!condition) AssertFailed()
bool is_assert_failed = false;
void AssertFailed()
{
is_assert_failed = true;
}
/* Actual function body */
#include "../ftsmessagehandler.c"
static void
mockSendFtsResponse(const char *messagetype)
expectSendFtsResponse(const char *expectedMessageType, const FtsResponse *expectedResponse)
{
expect_value(BeginCommand, commandTag, messagetype);
expect_value(BeginCommand, commandTag, expectedMessageType);
expect_value(BeginCommand, dest, DestRemote);
will_be_called(BeginCommand);
......@@ -44,27 +35,47 @@ mockSendFtsResponse(const char *messagetype)
will_be_called(pq_endmessage);
expect_any_count(pq_sendint, buf, -1);
expect_any_count(pq_sendint, i, -1);
expect_any_count(pq_sendint, b, -1);
/* verify the schema */
expect_value(pq_sendint, i, Natts_fts_message_response);
expect_any_count(pq_sendint, i, Natts_fts_message_response * 6 /* calls_per_column_for_schema */);
/* verify the data */
expect_value(pq_sendint, i, Natts_fts_message_response);
expect_value(pq_sendint, i, 1);
expect_value(pq_sendint, i, expectedResponse->IsMirrorUp);
expect_value(pq_sendint, i, 1);
expect_value(pq_sendint, i, expectedResponse->IsInSync);
expect_value(pq_sendint, i, 1);
expect_value(pq_sendint, i, expectedResponse->IsSyncRepEnabled);
expect_value(pq_sendint, i, 1);
expect_value(pq_sendint, i, expectedResponse->IsRoleMirror);
expect_value(pq_sendint, i, 1);
expect_value(pq_sendint, i, expectedResponse->RequestRetry);
will_be_called_count(pq_sendint, -1);
expect_any_count(pq_sendstring, buf, -1);
expect_any_count(pq_sendstring, str, -1);
will_be_called_count(pq_sendstring, -1);
will_be_called_count(pq_sendstring, Natts_fts_message_response);
expect_value(EndCommand, commandTag, messagetype);
expect_value(EndCommand, commandTag, expectedMessageType);
expect_value(EndCommand, dest, DestRemote);
will_be_called(EndCommand);
will_be_called(pq_flush);
}
void
test_HandleFtsWalRepProbe(void **state)
test_HandleFtsWalRepProbePrimary(void **state)
{
FtsResponse mockresponse;
mockresponse.IsMirrorUp = true;
mockresponse.IsInSync = true;
mockresponse.IsSyncRepEnabled = false;
mockresponse.IsRoleMirror = false;
mockresponse.RequestRetry = false;
expect_any(GetMirrorStatus, response);
will_assign_memory(GetMirrorStatus, response, &mockresponse, sizeof(FtsResponse));
......@@ -72,7 +83,9 @@ test_HandleFtsWalRepProbe(void **state)
will_be_called(SetSyncStandbysDefined);
mockSendFtsResponse(FTS_MSG_PROBE);
/* SyncRep should be enabled as soon as we found mirror is up. */
mockresponse.IsSyncRepEnabled = true;
expectSendFtsResponse(FTS_MSG_PROBE, &mockresponse);
HandleFtsWalRepProbe();
}
......@@ -83,7 +96,10 @@ test_HandleFtsWalRepSyncRepOff(void **state)
FtsResponse mockresponse;
mockresponse.IsMirrorUp = false;
mockresponse.IsInSync = false;
mockresponse.IsSyncRepEnabled = true;
mockresponse.RequestRetry = false;
/* unblock primary if FTS requests it */
mockresponse.IsSyncRepEnabled = false;
mockresponse.RequestRetry = false;
expect_any(GetMirrorStatus, response);
will_assign_memory(GetMirrorStatus, response, &mockresponse, sizeof(FtsResponse));
......@@ -91,19 +107,86 @@ test_HandleFtsWalRepSyncRepOff(void **state)
will_be_called(UnsetSyncStandbysDefined);
mockSendFtsResponse(FTS_MSG_SYNCREP_OFF);
/* since this function doesn't have any logic, the test just verified the message type */
expectSendFtsResponse(FTS_MSG_SYNCREP_OFF, &mockresponse);
HandleFtsWalRepSyncRepOff();
}
void
test_HandleFtsWalRepProbeMirror(void **state)
{
FtsResponse mockresponse;
mockresponse.IsMirrorUp = false;
mockresponse.IsInSync = false;
mockresponse.IsSyncRepEnabled = false;
mockresponse.IsRoleMirror = false;
mockresponse.RequestRetry = false;
expect_any(GetMirrorStatus, response);
will_assign_memory(GetMirrorStatus, response, &mockresponse, sizeof(FtsResponse));
will_be_called(GetMirrorStatus);
/* expect the IsRoleMirror changed to reflect the global variable */
am_mirror = true;
mockresponse.IsRoleMirror = true;
expectSendFtsResponse(FTS_MSG_PROBE, &mockresponse);
HandleFtsWalRepProbe();
}
void
test_HandleFtsWalRepPromoteMirror(void **state)
{
am_mirror = true;
will_return(GetCurrentDBState, DB_IN_STANDBY_MODE);
will_be_called(SignalPromote);
FtsResponse mockresponse;
mockresponse.IsMirrorUp = false;
mockresponse.IsInSync = false;
mockresponse.IsSyncRepEnabled = false;
mockresponse.IsRoleMirror = am_mirror;
mockresponse.RequestRetry = false;
/* expect SignalPromote() */
expectSendFtsResponse(FTS_MSG_PROMOTE, &mockresponse);
HandleFtsWalRepPromote();
}
void
test_HandleFtsWalRepPromotePrimary(void **state)
{
am_mirror = false;
will_return(GetCurrentDBState, DB_IN_PRODUCTION);
FtsResponse mockresponse;
mockresponse.IsMirrorUp = false;
mockresponse.IsInSync = false;
mockresponse.IsSyncRepEnabled = false;
mockresponse.IsRoleMirror = false;
mockresponse.RequestRetry = false;
/* expect no SignalPromote() */
expectSendFtsResponse(FTS_MSG_PROMOTE, &mockresponse);
HandleFtsWalRepPromote();
}
int
main(int argc, char* argv[])
{
cmockery_parse_arguments(argc, argv);
const UnitTest tests[] = {
unit_test(test_HandleFtsWalRepProbe),
unit_test(test_HandleFtsWalRepSyncRepOff)
unit_test(test_HandleFtsWalRepProbePrimary),
unit_test(test_HandleFtsWalRepSyncRepOff),
unit_test(test_HandleFtsWalRepProbeMirror),
unit_test(test_HandleFtsWalRepPromoteMirror),
unit_test(test_HandleFtsWalRepPromotePrimary)
};
return run_tests(tests);
}
......@@ -6,6 +6,8 @@
#include <poll.h>
static int poll_expected_return_value;
static const char true_value = 1;
static const char false_value = 0;
#define poll poll_mock
......@@ -102,12 +104,12 @@ static void PQclear_will_be_called()
will_be_called(PQclear);
}
static void PQgetvalue_will_return(int attnum, bool value)
static void PQgetvalue_will_return(int attnum, bool *value)
{
expect_any(PQgetvalue, res);
expect_value(PQgetvalue, tup_num, 0);
expect_value(PQgetvalue, field_num, attnum);
will_return(PQgetvalue, &value);
will_return(PQgetvalue, value);
}
/*
......@@ -215,9 +217,10 @@ static void ftsReceive_request_retry_setup()
expect_any(PQntuples, res);
will_return(PQntuples, FTS_MESSAGE_RESPONSE_NTUPLES);
PQgetvalue_will_return(Anum_fts_message_response_is_mirror_up, true);
PQgetvalue_will_return(Anum_fts_message_response_is_in_sync, true);
PQgetvalue_will_return(Anum_fts_message_response_is_syncrep_enabled, true);
PQgetvalue_will_return(Anum_fts_message_response_is_mirror_up, &true_value);
PQgetvalue_will_return(Anum_fts_message_response_is_in_sync, &true_value);
PQgetvalue_will_return(Anum_fts_message_response_is_syncrep_enabled, &true_value);
PQgetvalue_will_return(Anum_fts_message_response_is_role_mirror, &true_value);
}
void
......@@ -235,7 +238,7 @@ test_ftsReceive_when_primary_request_retry_true(void **state)
ftsReceive_request_retry_setup();
write_log_will_be_called();
write_log_will_be_called();
PQgetvalue_will_return(Anum_fts_message_response_request_retry, true);
PQgetvalue_will_return(Anum_fts_message_response_request_retry, &true_value);
/* TEST */
bool actual_return_value = ftsReceive(&info);
......@@ -256,7 +259,7 @@ test_ftsReceive_when_primary_request_retry_false(void **state)
ftsReceive_request_retry_setup();
write_log_will_be_called();
PQgetvalue_will_return(Anum_fts_message_response_request_retry, false);
PQgetvalue_will_return(Anum_fts_message_response_request_retry, &false_value);
/* TEST */
bool actual_return_value = ftsReceive(&info);
......
......@@ -154,6 +154,8 @@ void SeqServerMain(int argc, char *argv[]);
void FtsProbeMain(int argc, char *argv[]);
#endif
bool am_mirror = false;
/*
* List of active backends (or child processes anyway; we don't actually
......@@ -2462,6 +2464,23 @@ initMasks(fd_set *rmask)
return maxsock + 1;
}
/*
* XXX check to see if we're a mirror. And if we are: (1) Assume that we
* are running as super user. (2) No data pages need to be accessed by this
* backend - no snapshot / transaction needed.
*
* The recovery.conf file is renamed to recovery.done at the end of xlog
* replay. Normal backends can be created thereafter.
*/
bool
IsRoleMirror(void)
{
struct stat stat_buf;
return (stat(RECOVERY_COMMAND_FILE, &stat_buf) == 0);
}
/*
* Read a client's startup packet and do something according to it.
......@@ -2660,8 +2679,13 @@ retry1:
{
if (strcmp(valptr, GPCONN_TYPE_FTS) == 0)
{
if (GpIdentity.segindex == MASTER_CONTENT_ID)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("cannot handle FTS connection on master")));
elog(LOG, "handling FTS connection");
am_ftshandler = true;
am_mirror = IsRoleMirror();
}
else
ereport(FATAL,
......@@ -2777,6 +2801,8 @@ retry1:
switch (port->canAcceptConnections)
{
case CAC_STARTUP:
if (am_ftshandler && am_mirror)
break;
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errSendAlert(false),
......@@ -6906,6 +6932,20 @@ sigusr1_handler(SIGNAL_ARGS)
errno = save_errno;
}
/*
* GPDB_90_MERGE_FIXME: This function should be removed once hot
* standby can and will be enabled for mirrors.
*/
void SignalPromote(void)
{
FILE *fd;
if ((fd = fopen(PROMOTE_SIGNAL_FILE, "w")))
{
fclose(fd);
signal_child(StartupPID, SIGUSR2);
}
}
/*
* Dummy signal handler
......
......@@ -12,6 +12,7 @@
*/
#include "replication/gp_replication.h"
#include "replication/walreceiver.h"
#include "replication/walsender_private.h"
#include "utils/builtins.h"
......@@ -26,7 +27,8 @@
* Check the WalSndCtl to obtain if mirror is up or down, if the wal sender is
* in streaming, and if synchronous replication is enabled or not.
*/
void GetMirrorStatus(FtsResponse *response)
void
GetMirrorStatus(FtsResponse *response)
{
response->IsMirrorUp = false;
response->IsInSync = false;
......
......@@ -34,6 +34,8 @@
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
#include "postmaster/fts.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
......@@ -582,7 +584,8 @@ void
InitializeSessionUserIdStandalone(void)
{
/* This function should only be called in a single-user backend. */
AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || am_startup);
AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || am_startup
|| (am_ftshandler && am_mirror));
/* call only once */
AssertState(!OidIsValid(AuthenticatedUserId));
......
......@@ -718,8 +718,13 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
* Start a new transaction here before first access to db, and get a
* snapshot. We don't have a use for the snapshot itself, but we're
* interested in the secondary effect that it sets RecentGlobalXmin.
*
* Skip these steps if we are responding to a FTS message on mirror.
* Mirror operates in standby mode and is not ready to start a
* transaction or create a snapshot. Neither are they required to
* respond to a FTS message.
*/
if (!bootstrap)
if (!bootstrap && !(am_ftshandler && am_mirror))
{
StartTransactionCommand();
(void) GetTransactionSnapshot();
......@@ -747,6 +752,17 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
errhint("You should immediately run CREATE USER \"%s\" CREATEUSER;.",
username)));
}
else if (am_ftshandler && am_mirror)
{
/*
* A mirror must receive and act upon FTS messages. Performing proper
* authentication involves reading pg_authid. Heap access is not
* possible on mirror, which is in standby mode.
*/
FakeClientAuthentication(MyProcPort);
InitializeSessionUserIdStandalone();
am_superuser = true;
}
else
{
/* normal multiuser case */
......@@ -803,7 +819,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
/*
* We don't have replication role, which existed in postgres.
*/
if (!superuser())
if (!am_superuser)
ereport(FATAL,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser role to start walsender")));
......@@ -823,7 +839,8 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
pgstat_bestart();
/* close the transaction we started above */
CommitTransactionCommand();
if (!(am_ftshandler && am_mirror))
CommitTransactionCommand();
return;
}
......@@ -990,7 +1007,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
* process_startup_options parses the GUC.
*/
if (gp_maintenance_mode && Gp_role == GP_ROLE_DISPATCH &&
!(superuser() && gp_maintenance_conn))
!(am_superuser && gp_maintenance_conn))
ereport(FATAL,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("maintenance mode: connected by superuser only"),
......
......@@ -217,6 +217,10 @@ typedef struct CheckpointStatsData
extern CheckpointStatsData CheckpointStats;
/* File path names (all relative to $PGDATA) */
#define RECOVERY_COMMAND_FILE "recovery.conf"
#define RECOVERY_COMMAND_DONE "recovery.done"
#define PROMOTE_SIGNAL_FILE "promote"
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern XLogRecPtr XLogInsert_OverrideXid(RmgrId rmid, uint8 info, XLogRecData *rdata, TransactionId overrideXid);
......@@ -302,6 +306,8 @@ extern TimeLineID GetRecoveryTargetTLI(void);
extern bool CheckPromoteSignal(bool do_unlink);
extern void WakeupRecovery(void);
extern bool IsStandbyMode(void);
extern DBState GetCurrentDBState(void);
extern bool IsRoleMirror(void);
/*
* Starting/stopping a base backup
......
......@@ -23,12 +23,14 @@
/* Queries for FTS messages */
#define FTS_MSG_PROBE "PROBE"
#define FTS_MSG_SYNCREP_OFF "SYNCREP_OFF"
#define FTS_MSG_PROMOTE "PROMOTE"
#define Natts_fts_message_response 4
#define Natts_fts_message_response 5
#define Anum_fts_message_response_is_mirror_up 0
#define Anum_fts_message_response_is_in_sync 1
#define Anum_fts_message_response_is_syncrep_enabled 2
#define Anum_fts_message_response_request_retry 3
#define Anum_fts_message_response_is_role_mirror 3
#define Anum_fts_message_response_request_retry 4
#define FTS_MESSAGE_RESPONSE_NTUPLES 1
......@@ -39,6 +41,7 @@ typedef struct
bool isMirrorAlive;
bool isInSync;
bool isSyncRepEnabled;
bool isRoleMirror;
bool retryRequested;
} probe_result;
......@@ -61,10 +64,12 @@ typedef struct FtsResponse
bool IsMirrorUp;
bool IsInSync;
bool IsSyncRepEnabled;
bool IsRoleMirror;
bool RequestRetry;
} FtsResponse;
extern bool am_ftshandler;
extern bool am_mirror;
/*
* ENUMS
......
......@@ -56,6 +56,9 @@ extern void ClosePostmasterPorts(bool am_syslogger);
extern int MaxLivePostmasterChildren(void);
extern bool AreWeAMirror;
extern void SignalPromote(void);
#ifdef EXEC_BACKEND
extern pid_t postmaster_forkexec(int argc, char *argv[]);
extern int SubPostmasterMain(int argc, char *argv[]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册