提交 d453a4aa 编写于 作者: A Ashwin Agrawal

FTS detects when primary is in recovery avoiding config change

Previous behavior when primary is in crash recovery FTS probe fails and hence
qqprimary is marked down. This change provides a recovery progress metric so that
FTS can detect progress. We added last replayed LSN number inside the error
message to determine recovery progress. This allows FTS to distinguish between
recovery in progress and recovery hang or rolling panics. Only when FTS detects
recovery is not making progress then FTS marks primary down.

For testing a new fault injector is added to allow simulation of recovery hang
and recovery in progress.

Just fyi...this reverts the reverted commit 7b7219a4.
Co-authored-by: NAshwin Agrawal <aagrawal@pivotal.io>
Co-authored-by: NDavid Kimura <dkimura@pivotal.io>
上级 2452a109
......@@ -11992,3 +11992,21 @@ WakeupRecovery(void)
{
SetLatch(&XLogCtl->recoveryWakeupLatch);
}
/*
* Report the last WAL replay location
*/
XLogRecPtr
last_xlog_replay_location()
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
Assert(xlogctl != NULL);
XLogRecPtr recptr;
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->recoveryLastRecPtr;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
}
......@@ -28,8 +28,10 @@
#include "cdb/cdbvars.h"
#include "postmaster/fts.h"
#include "postmaster/ftsprobe.h"
#include "postmaster/postmaster.h"
#include "utils/snapmgr.h"
static struct pollfd *PollFds;
static CdbComponentDatabaseInfo *
......@@ -195,6 +197,66 @@ ftsConnectStart(fts_segment_info *ftsInfo)
return true;
}
static void
checkIfFailedDueToRecoveryInProgress(fts_segment_info *ftsInfo)
{
if (strstr(PQerrorMessage(ftsInfo->conn), _(POSTMASTER_IN_RECOVERY_MSG)))
{
XLogRecPtr tmpptr;
char *ptr = strstr(PQerrorMessage(ftsInfo->conn),
_(POSTMASTER_IN_RECOVERY_DETAIL_MSG));
if ((ptr == NULL) ||
sscanf(ptr, POSTMASTER_IN_RECOVERY_DETAIL_MSG " %X/%X\n",
&tmpptr.xlogid, &tmpptr.xrecoff) != 2)
{
#ifdef USE_ASSERT_CHECKING
elog(ERROR,
#else
elog(LOG,
#endif
"invalid in-recovery message %s "
"(content=%d, dbid=%d) state=%d",
PQerrorMessage(ftsInfo->conn),
ftsInfo->primary_cdbinfo->segindex,
ftsInfo->primary_cdbinfo->dbid,
ftsInfo->state);
return;
}
/*
* If the xlog record returned from the primary is less than or
* equal to the xlog record we had saved from the last probe
* then we assume that recovery is not making progress. In the
* case of rolling panics on the primary the returned xlog
* location can be less than the recorded xlog location. In
* these cases of rolling panic or recovery hung we want to
* mark the primary as down.
*/
if (XLByteLE(tmpptr, ftsInfo->xlogrecptr))
{
elog(LOG, "FTS: detected segment is in recovery mode and not making progress (content=%d) "
"primary dbid=%d, mirror dbid=%d",
ftsInfo->primary_cdbinfo->segindex,
ftsInfo->primary_cdbinfo->dbid,
ftsInfo->mirror_cdbinfo->dbid);
}
else
{
ftsInfo->recovery_making_progress = true;
ftsInfo->xlogrecptr.xlogid = tmpptr.xlogid;
ftsInfo->xlogrecptr.xrecoff = tmpptr.xrecoff;
elogif(gp_log_fts >= GPVARS_VERBOSITY_VERBOSE, LOG,
"FTS: detected segment is in recovery mode replayed (%s) (content=%d) "
"primary dbid=%d, mirror dbid=%d",
XLogLocationToString(&tmpptr),
ftsInfo->primary_cdbinfo->segindex,
ftsInfo->mirror_cdbinfo->dbid,
ftsInfo->mirror_cdbinfo->dbid);
}
}
}
/*
* Start a libpq connection for each "per segment" object in context. If the
* connection is already started for an object, advance libpq state machine for
......@@ -227,6 +289,11 @@ ftsConnect(fts_context *context)
case FTS_PROBE_SEGMENT:
case FTS_SYNCREP_OFF_SEGMENT:
case FTS_PROMOTE_SEGMENT:
/*
* We always default to false. If connect fails due to recovery in progress
* this variable will be set based on LSN value in error message.
*/
ftsInfo->recovery_making_progress = false;
if (ftsInfo->conn == NULL)
{
AssertImply(ftsInfo->retry_count > 0,
......@@ -275,6 +342,7 @@ ftsConnect(fts_context *context)
case PGRES_POLLING_FAILED:
ftsInfo->state = nextFailedState(ftsInfo->state);
checkIfFailedDueToRecoveryInProgress(ftsInfo);
elog(LOG, "FTS: cannot establish libpq connection "
"(content=%d, dbid=%d): %s, retry_count=%d",
ftsInfo->primary_cdbinfo->segindex,
......@@ -695,6 +763,41 @@ ftsReceive(fts_context *context)
}
}
static void
retryForFtsFailed(fts_segment_info *ftsInfo, pg_time_t now)
{
if (ftsInfo->retry_count == gp_fts_probe_retries)
{
elog(LOG, "FTS max (%d) retries exhausted "
"(content=%d, dbid=%d) state=%d",
ftsInfo->retry_count,
ftsInfo->primary_cdbinfo->segindex,
ftsInfo->primary_cdbinfo->dbid, ftsInfo->state);
return;
}
ftsInfo->retry_count++;
if (ftsInfo->state == FTS_PROBE_SUCCESS ||
ftsInfo->state == FTS_PROBE_FAILED)
ftsInfo->state = FTS_PROBE_RETRY_WAIT;
else if (ftsInfo->state == FTS_SYNCREP_OFF_FAILED)
ftsInfo->state = FTS_SYNCREP_OFF_RETRY_WAIT;
else
ftsInfo->state = FTS_PROMOTE_RETRY_WAIT;
ftsInfo->retryStartTime = now;
elogif(gp_log_fts == GPVARS_VERBOSITY_DEBUG, LOG,
"FTS initialized retry start time to now "
"(content=%d, dbid=%d) state=%d",
ftsInfo->primary_cdbinfo->segindex,
ftsInfo->primary_cdbinfo->dbid, ftsInfo->state);
PQfinish(ftsInfo->conn);
ftsInfo->conn = NULL;
ftsInfo->poll_events = ftsInfo->poll_revents = 0;
/* Reset result before next attempt. */
memset(&ftsInfo->result, 0, sizeof(fts_result));
}
/*
* If retry attempts are available, transition the sgement to the start state
* corresponding to their failure state. If retries have exhausted, leave the
......@@ -724,36 +827,7 @@ processRetry(fts_context *context)
case FTS_PROBE_FAILED:
case FTS_SYNCREP_OFF_FAILED:
case FTS_PROMOTE_FAILED:
if (ftsInfo->retry_count == gp_fts_probe_retries)
{
elog(LOG, "FTS max (%d) retries exhausted "
"(content=%d, dbid=%d) state=%d",
ftsInfo->retry_count,
ftsInfo->primary_cdbinfo->segindex,
ftsInfo->primary_cdbinfo->dbid, ftsInfo->state);
}
else
{
ftsInfo->retry_count++;
if (ftsInfo->state == FTS_PROBE_SUCCESS ||
ftsInfo->state == FTS_PROBE_FAILED)
ftsInfo->state = FTS_PROBE_RETRY_WAIT;
else if (ftsInfo->state == FTS_SYNCREP_OFF_FAILED)
ftsInfo->state = FTS_SYNCREP_OFF_RETRY_WAIT;
else
ftsInfo->state = FTS_PROMOTE_RETRY_WAIT;
ftsInfo->retryStartTime = now;
elogif(gp_log_fts == GPVARS_VERBOSITY_DEBUG, LOG,
"FTS initialized retry start time to now "
"(content=%d, dbid=%d) state=%d",
ftsInfo->primary_cdbinfo->segindex,
ftsInfo->primary_cdbinfo->dbid, ftsInfo->state);
PQfinish(ftsInfo->conn);
ftsInfo->conn = NULL;
ftsInfo->poll_events = ftsInfo->poll_revents = 0;
/* Reset result before next attempt. */
memset(&ftsInfo->result, 0, sizeof(fts_result));
}
retryForFtsFailed(ftsInfo, now);
break;
case FTS_PROBE_RETRY_WAIT:
case FTS_SYNCREP_OFF_RETRY_WAIT:
......@@ -988,8 +1062,23 @@ processResponse(fts_context *context)
}
break;
case FTS_PROBE_FAILED:
/* Primary is down, see if mirror can be promoted. */
/* Primary is down */
/* If primary is in recovery, do not mark it down and promote mirror */
if (ftsInfo->recovery_making_progress)
{
Assert(strstr(PQerrorMessage(ftsInfo->conn), _(POSTMASTER_IN_RECOVERY_MSG)));
elogif(gp_log_fts >= GPVARS_VERBOSITY_VERBOSE, LOG,
"FTS: detected segment is in recovery mode and making "
"progress (content=%d) primary dbid=%d, mirror dbid=%d",
primary->segindex, primary->dbid, mirror->dbid);
ftsInfo->state = FTS_RESPONSE_PROCESSED;
break;
}
Assert(!IsPrimaryAlive);
/* See if mirror can be promoted. */
if (SEGMENT_IS_IN_SYNC(mirror))
{
/*
......@@ -1142,6 +1231,9 @@ FtsWalRepInitProbeContext(CdbComponentDatabases *cdbs, fts_context *context)
ftsInfo->result.isRoleMirror = false;
ftsInfo->result.dbid = primary->dbid;
ftsInfo->state = FTS_PROBE_SEGMENT;
ftsInfo->recovery_making_progress = false;
ftsInfo->xlogrecptr.xlogid = 0;
ftsInfo->xlogrecptr.xrecoff = 0;
ftsInfo->primary_cdbinfo = primary;
ftsInfo->mirror_cdbinfo = mirror;
......
......@@ -265,8 +265,9 @@ test_ftsConnect_one_failure_one_success(void **state)
PGconn *failure_pgconn = palloc(sizeof(PGconn));
failure_pgconn->status = CONNECTION_BAD;
will_return(PQconnectStart, failure_pgconn);
expect_value(PQerrorMessage, conn, failure_pgconn);
will_be_called(PQerrorMessage);
will_return(PQerrorMessage, "");
ftsConnect(&context);
......@@ -461,7 +462,7 @@ test_ftsReceive_when_fts_handler_FATAL(void **state)
will_return(PQconsumeInput, 0);
expect_value(PQerrorMessage, conn, ftsInfo->conn);
will_be_called(PQerrorMessage);
will_return(PQerrorMessage, "");
/*
* TEST
......
......@@ -81,7 +81,7 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <limits.h>
#include "access/xlog.h"
/* headers required for process affinity bindings */
#ifdef HAVE_NUMA_H
#define NUMA_VERSION1_COMPATIBILITY 1
......@@ -1900,6 +1900,7 @@ ProcessStartupPacket(Port *port, bool SSLdone)
ProtocolVersion proto;
MemoryContext oldcontext;
char *gpqeid = NULL;
XLogRecPtr recptr;
if (pq_getbytes((char *) &len, 4) == EOF)
{
......@@ -2076,6 +2077,43 @@ retry1:
errmsg("cannot handle FTS connection on master")));
am_ftshandler = true;
am_mirror = IsRoleMirror();
#ifdef FAULT_INJECTOR
if (FaultInjector_InjectFaultIfSet(
FTSConnStartupPacket,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */) == FaultInjectorTypeSkip)
{
/*
* If this fault is set to skip, report recovery is
* hung. Without this fault recovery is reported as
* progressing.
*/
if (FaultInjector_InjectFaultIfSet(
FTSRecoveryInProgress,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */) == FaultInjectorTypeSkip)
{
recptr = last_xlog_replay_location();
}
else
{
time_t counter = time(NULL);
recptr.xlogid = counter;
recptr.xrecoff = counter;
}
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errSendAlert(true),
errmsg(POSTMASTER_IN_RECOVERY_MSG),
errdetail(POSTMASTER_IN_RECOVERY_DETAIL_MSG " %s",
XLogLocationToString(&recptr))));
}
#endif
}
else
ereport(FATAL,
......@@ -2205,10 +2243,14 @@ retry1:
errmsg("the database system is shutting down")));
break;
case CAC_RECOVERY:
recptr = last_xlog_replay_location();
ereport(FATAL,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errSendAlert(true),
errmsg(POSTMASTER_IN_RECOVERY_MSG)));
errmsg(POSTMASTER_IN_RECOVERY_MSG),
errdetail(POSTMASTER_IN_RECOVERY_DETAIL_MSG " %s",
XLogLocationToString(&recptr))));
break;
case CAC_TOOMANY:
ereport(FATAL,
......@@ -2226,10 +2268,14 @@ retry1:
Assert(am_mirror);
break;
}
recptr = last_xlog_replay_location();
ereport(FATAL,
(errcode(ERRCODE_MIRROR_READY),
errSendAlert(true),
errmsg(POSTMASTER_IN_RECOVERY_MSG)));
errmsg(POSTMASTER_IN_RECOVERY_MSG),
errdetail(POSTMASTER_IN_RECOVERY_DETAIL_MSG " %s",
XLogLocationToString(&recptr))));
break;
case CAC_OK:
break;
......
......@@ -390,4 +390,7 @@ extern void do_pg_abort_backup(void);
extern bool
IsBkpBlockApplied(XLogRecord *record, uint8 block_id);
extern XLogRecPtr
last_xlog_replay_location(void);
#endif /* XLOG_H */
......@@ -13,6 +13,7 @@
*/
#ifndef FTSPROBE_H
#define FTSPROBE_H
#include "access/xlogdefs.h"
typedef struct
{
......@@ -81,6 +82,8 @@ typedef struct
int16 probe_errno; /* saved errno from the latest system call */
struct pg_conn *conn; /* libpq connection object */
int retry_count;
XLogRecPtr xlogrecptr;
bool recovery_making_progress;
} fts_segment_info;
typedef struct
......
......@@ -47,6 +47,7 @@ extern int postmaster_alive_fds[2];
#define POSTMASTER_IN_STARTUP_MSG "the database system is starting up"
#define POSTMASTER_IN_RECOVERY_MSG "the database system is in recovery mode"
#define POSTMASTER_IN_RECOVERY_DETAIL_MSG "last replayed record at"
extern const char *progname;
......
......@@ -224,6 +224,14 @@ FI_IDENT(SyncRepQueryCancel, "sync_rep_query_cancel")
FI_IDENT(DistributedLogAdvanceOldestXmin, "distributedlog_advance_oldest_xmin")
/* inject fault at initialization of wal sender */
FI_IDENT(InitializeWalSender, "initialize_wal_sender")
/* inject fault when fts connection is received on primary/mirror */
FI_IDENT(FTSConnStartupPacket, "fts_conn_startup_packet")
/*
* inject fault to report recovery is hung to FTS. This fault only works with
* FTSConnStartupPacket fault set to skip.
*/
FI_IDENT(FTSRecoveryInProgress, "fts_recovery_in_progress")
#endif
/*
......
-- Test to make sure FTS doesn't mark primary down if its recovering. Fault
-- 'fts_conn_startup_packet' is used to simulate the primary responding
-- in-recovery to FTS, primary is not actually going through crash-recovery in
-- test.
create extension if not exists gp_inject_fault;
select role, preferred_role, mode from gp_segment_configuration where content = 0;
role | preferred_role | mode
------+----------------+------
p | p | s
m | m | s
(2 rows)
select gp_inject_fault('fts_conn_startup_packet', 'skip', '', '', '', -1, 0, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
NOTICE: Success: (seg0 127.0.0.1:25432 pid=26540)
gp_inject_fault
-----------------
t
(1 row)
-- to make test deterministic and fast
-- start_ignore
\!gpconfig -c gp_fts_probe_retries -v 2 --masteronly
\!gpstop -u
-- end_ignore
show gp_fts_probe_retries;
gp_fts_probe_retries
----------------------
2
(1 row)
select gp_request_fts_probe_scan();
gp_request_fts_probe_scan
---------------------------
t
(1 row)
select gp_wait_until_triggered_fault('fts_conn_startup_packet', 3, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
NOTICE: Success: (seg0 127.0.0.1:25432 pid=26540)
gp_wait_until_triggered_fault
-------------------------------
t
(1 row)
select role, preferred_role, mode from gp_segment_configuration where content = 0;
role | preferred_role | mode
------+----------------+------
p | p | s
m | m | s
(2 rows)
-- test other scenario where recovery on primary is hung and hence FTS marks
-- primary down and promotes mirror. When 'fts_recovery_in_progress' is set to
-- skip it mimics the behavior of hung recovery on primary.
select gp_inject_fault('fts_recovery_in_progress', 'skip', '', '', '', -1, 0, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
NOTICE: Success: (seg0 127.0.0.1:25432 pid=26540)
gp_inject_fault
-----------------
t
(1 row)
-- We call gp_request_fts_probe_scan twice to guarantee that the scan happens
-- after the fts_recovery_in_progress fault has been injected. If periodic fts
-- probe is running when the first request scan is run it is possible to not
-- see the effect due to the fault.
select gp_request_fts_probe_scan();
gp_request_fts_probe_scan
---------------------------
t
(1 row)
select gp_request_fts_probe_scan();
gp_request_fts_probe_scan
---------------------------
t
(1 row)
select role, preferred_role, mode from gp_segment_configuration where content = 0;
role | preferred_role | mode
------+----------------+------
m | p | n
p | m | n
(2 rows)
-- The remaining steps are to bring back the cluster to original state.
-- start_ignore
\! gprecoverseg -aF
-- end_ignore
-- loop while segments come in sync
do $$
begin
for i in 1..120 loop
if (select count(*) = 0 from gp_segment_configuration where content = 0 and mode != 's') then
return;
end if;
perform gp_request_fts_probe_scan();
end loop;
end;
$$;
select role, preferred_role, mode from gp_segment_configuration where content = 0;
role | preferred_role | mode
------+----------------+------
p | m | s
m | p | s
(2 rows)
-- start_ignore
\! gprecoverseg -ar
-- end_ignore
-- loop while segments come in sync
do $$
begin
for i in 1..120 loop
if (select count(*) = 0 from gp_segment_configuration where content = 0 and mode != 's') then
return;
end if;
perform gp_request_fts_probe_scan();
end loop;
end;
$$;
select role, preferred_role, mode from gp_segment_configuration where content = 0;
role | preferred_role | mode
------+----------------+------
p | p | s
m | m | s
(2 rows)
-- start_ignore
\!gpconfig -r gp_fts_probe_retries --masteronly
\!gpstop -u
-- end_ignore
-- cleanup steps
select gp_inject_fault('fts_recovery_in_progress', 'reset', '', '', '', -1, 0, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
NOTICE: Success: (seg0 127.0.0.1:25432 pid=27127)
gp_inject_fault
-----------------
t
(1 row)
select gp_inject_fault('fts_conn_startup_packet', 'reset', '', '', '', -1, 0, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
NOTICE: Success: (seg0 127.0.0.1:25432 pid=27127)
gp_inject_fault
-----------------
t
(1 row)
......@@ -208,6 +208,9 @@ test: psql_gp_commands pg_resetxlog
# Check for shmem leak for instrumentation slots
test: instr_in_shmem_verify
# fts_recovery_in_progresss uses fault injectors to simulate FTS fault states,
# hence it should be run in isolation.
test: fts_recovery_in_progress
test: autovacuum-template0
# end of tests
-- Test to make sure FTS doesn't mark primary down if its recovering. Fault
-- 'fts_conn_startup_packet' is used to simulate the primary responding
-- in-recovery to FTS, primary is not actually going through crash-recovery in
-- test.
create extension if not exists gp_inject_fault;
select role, preferred_role, mode from gp_segment_configuration where content = 0;
select gp_inject_fault('fts_conn_startup_packet', 'skip', '', '', '', -1, 0, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
-- to make test deterministic and fast
-- start_ignore
\!gpconfig -c gp_fts_probe_retries -v 2 --masteronly
\!gpstop -u
-- end_ignore
show gp_fts_probe_retries;
select gp_request_fts_probe_scan();
select gp_wait_until_triggered_fault('fts_conn_startup_packet', 3, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
select role, preferred_role, mode from gp_segment_configuration where content = 0;
-- test other scenario where recovery on primary is hung and hence FTS marks
-- primary down and promotes mirror. When 'fts_recovery_in_progress' is set to
-- skip it mimics the behavior of hung recovery on primary.
select gp_inject_fault('fts_recovery_in_progress', 'skip', '', '', '', -1, 0, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
-- We call gp_request_fts_probe_scan twice to guarantee that the scan happens
-- after the fts_recovery_in_progress fault has been injected. If periodic fts
-- probe is running when the first request scan is run it is possible to not
-- see the effect due to the fault.
select gp_request_fts_probe_scan();
select gp_request_fts_probe_scan();
select role, preferred_role, mode from gp_segment_configuration where content = 0;
-- The remaining steps are to bring back the cluster to original state.
-- start_ignore
\! gprecoverseg -aF
-- end_ignore
-- loop while segments come in sync
do $$
begin
for i in 1..120 loop
if (select count(*) = 0 from gp_segment_configuration where content = 0 and mode != 's') then
return;
end if;
perform gp_request_fts_probe_scan();
end loop;
end;
$$;
select role, preferred_role, mode from gp_segment_configuration where content = 0;
-- start_ignore
\! gprecoverseg -ar
-- end_ignore
-- loop while segments come in sync
do $$
begin
for i in 1..120 loop
if (select count(*) = 0 from gp_segment_configuration where content = 0 and mode != 's') then
return;
end if;
perform gp_request_fts_probe_scan();
end loop;
end;
$$;
select role, preferred_role, mode from gp_segment_configuration where content = 0;
-- start_ignore
\!gpconfig -r gp_fts_probe_retries --masteronly
\!gpstop -u
-- end_ignore
-- cleanup steps
select gp_inject_fault('fts_recovery_in_progress', 'reset', '', '', '', -1, 0, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
select gp_inject_fault('fts_conn_startup_packet', 'reset', '', '', '', -1, 0, dbid)
from gp_segment_configuration where content = 0 and role = 'p';
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册