提交 6d80ce31 编写于 作者: A Ashwin Agrawal

Align transaction log manager (xlog.c and xlog.h) to upstream.

Many differences from upstream have accumulated over the years, along with
some confusing or redundant code, so it is better to make these files match
upstream.
上级 b55a0b71
......@@ -81,6 +81,13 @@
extern uint32 bootstrap_data_checksum_version;
/* File path names (all relative to $PGDATA) */
#define RECOVERY_COMMAND_FILE "recovery.conf"
#define RECOVERY_COMMAND_DONE "recovery.done"
#define PROMOTE_SIGNAL_FILE "promote"
#define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
/* User-settable parameters */
int CheckPointSegments = 3;
int wal_keep_segments = 0;
......@@ -91,8 +98,6 @@ char *XLogArchiveCommand = NULL;
bool EnableHotStandby = false;
bool fullPageWrites = true;
bool wal_log_hints = false;
char *wal_consistency_checking_string = NULL;
bool *wal_consistency_checking = NULL;
bool log_checkpoints = false;
int sync_method = DEFAULT_SYNC_METHOD;
int wal_level = WAL_LEVEL_MINIMAL;
......@@ -4933,10 +4938,6 @@ XLOGShmemSize(void)
* routine again below to compute the actual allocation size.
*/
/*
* Similarly, we also don't PgControlWatch for the above reasons, too.
*/
return size;
}
......@@ -5228,7 +5229,7 @@ readRecoveryCommandFile(void)
if (fd == NULL)
{
if (errno == ENOENT)
return; /* not there, so no recovery in standby mode */
return; /* not there, so no archive recovery */
ereport(FATAL,
(errcode_for_file_access(),
errmsg("could not open recovery command file \"%s\": %m",
......@@ -5486,25 +5487,6 @@ readRecoveryCommandFile(void)
FreeConfigVariables(head);
}
/*
 * renameRecoveryFile
 *
 * Move RECOVERY_COMMAND_FILE to RECOVERY_COMMAND_DONE and then reset the
 * mirror-ready flag. Takes no arguments and returns nothing; a failed rename
 * is reported by durable_rename() at FATAL level.
 */
static void
renameRecoveryFile(void)
{
	/*
	 * Rename the config file out of the way, so that we don't accidentally
	 * re-enter archive recovery mode in a subsequent crash.
	 *
	 * Any existing RECOVERY_COMMAND_DONE is unlinked first; the result of
	 * unlink() is not checked.
	 */
	unlink(RECOVERY_COMMAND_DONE);
	durable_rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE, FATAL);

	/*
	 * Response to FTS probes after this point will not indicate that we are a
	 * mirror because the am_mirror flag is set based on existence of
	 * RECOVERY_COMMAND_FILE. New libpq connections to the postmaster should
	 * no longer return CAC_MIRROR_READY as response because we are no longer a
	 * mirror.
	 */
	ResetMirrorReadyFlag();
}
/*
* Exit archive-recovery state
*/
......@@ -5573,7 +5555,24 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
/* Get rid of any remaining recovered timeline-history file, too */
snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
unlink(recoveryPath); /* ignore any error */
renameRecoveryFile();
/*
* Rename the config file out of the way, so that we don't accidentally
* re-enter archive recovery mode in a subsequent crash.
*/
unlink(RECOVERY_COMMAND_DONE);
durable_rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE, FATAL);
/*
* Response to FTS probes after this point will not indicate that we are a
* mirror because the am_mirror flag is set based on existence of
* RECOVERY_COMMAND_FILE. New libpq connections to the postmaster should
* no longer return CAC_MIRROR_READY as response because we are no longer a
* mirror.
*/
ResetMirrorReadyFlag();
ereport(LOG,
(errmsg("archive recovery complete")));
}
/*
......@@ -6111,48 +6110,6 @@ SetCurrentChunkStartTime(TimestampTz xtime)
SpinLockRelease(&xlogctl->info_lck);
}
/*
 * ApplyStartupRedo
 *
 * Replay a single WAL record by invoking the rm_redo routine of the
 * resource manager identified by record->xl_rmid.
 *
 * beginLoc, lsn: dereferenced and passed to rm_redo as its first two
 *		arguments.
 * record: the record to replay; also handed to rm_redo_error_callback as
 *		its argument.
 *
 * Before calling rm_redo this function pushes an error context callback,
 * advances ShmemVariableCache->nextXid past record->xl_xid if needed, and
 * stores EndRecPtr into xlogctl->replayEndRecPtr under info_lck. The error
 * context is popped again after rm_redo returns.
 */
static void
ApplyStartupRedo(
XLogRecPtr *beginLoc,
XLogRecPtr *lsn,
XLogRecord *record)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
ErrorContextCallback errcontext_cb;
/* Setup error traceback support for ereport() */
errcontext_cb.callback = rm_redo_error_callback;
errcontext_cb.arg = (void *) record;
errcontext_cb.previous = error_context_stack;
error_context_stack = &errcontext_cb;
/*
 * nextXid must be beyond record's xid
 *
 * NOTE(review): nextXid is modified here without acquiring any lock in
 * this function - confirm that nothing else can touch it at this point.
 */
if (TransactionIdFollowsOrEquals(record->xl_xid,
ShmemVariableCache->nextXid))
{
ShmemVariableCache->nextXid = record->xl_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
}
/*
* Update shared replayEndRecPtr before replaying this record,
* so that XLogFlush will update minRecoveryPoint correctly.
*
* NOTE(review): this stores EndRecPtr rather than *lsn; presumably the
* caller passes &EndRecPtr so both are the same value - confirm.
*/
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->replayEndRecPtr = EndRecPtr;
SpinLockRelease(&xlogctl->info_lck);
/* Hand the record to its resource manager's redo routine */
RmgrTable[record->xl_rmid].rm_redo(*beginLoc, *lsn, record);
/* Pop the error context stack */
error_context_stack = errcontext_cb.previous;
}
/*
* Process passed checkpoint record either during normal recovery or
* in standby mode.
......@@ -6457,10 +6414,16 @@ StartupXLOG(void)
str_time(ControlFile->time)),
errhint("This probably means that some data is corrupted and"
" you will have to use the last backup for recovery.")));
else if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY)
ereport(LOG,
(errmsg("database system was interrupted while in recovery at log time %s",
str_time(ControlFile->checkPointCopy.time)),
errhint("If this has occurred more than once some data might be corrupted"
" and you might need to choose an earlier recovery target.")));
else if (ControlFile->state == DB_IN_PRODUCTION)
ereport(LOG,
(errmsg("database system was interrupted; last known up at %s",
str_time(ControlFile->time))));
(errmsg("database system was interrupted; last known up at %s",
str_time(ControlFile->time))));
/* This is just to allow attaching to startup process with a debugger */
#ifdef XLOG_REPLAY_DELAY
......@@ -6574,11 +6537,11 @@ StartupXLOG(void)
InArchiveRecovery = true;
if (StandbyModeRequested)
StandbyMode = true;
/*
* When a backup_label file is present, we want to roll forward from
* the checkpoint it identifies, rather than using pg_control.
*/
record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
if (record != NULL)
{
......@@ -6867,7 +6830,8 @@ StartupXLOG(void)
/*
* Update pg_control to show that we are recovering and to show the
* selected checkpoint as the place we are starting from. We also mark
* pg_control with any minimum recovery stop point
* pg_control with any minimum recovery stop point obtained from a
* backup history file.
*/
dbstate_at_startup = ControlFile->state;
if (InArchiveRecovery)
......@@ -6883,14 +6847,11 @@ StartupXLOG(void)
"and has target timeline %u",
ControlFile->checkPointCopy.ThisTimeLineID,
recoveryTargetTLI)));
ControlFile->state = DB_IN_CRASH_RECOVERY;
}
ControlFile->prevCheckPoint = ControlFile->checkPoint;
ControlFile->checkPoint = checkPointLoc;
ControlFile->checkPointCopy = checkPoint;
if (InArchiveRecovery)
{
/* initialize minRecoveryPoint if not set yet */
......@@ -7167,7 +7128,7 @@ StartupXLOG(void)
(uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
xlog_outrec(&buf, record);
appendStringInfoString(&buf, " - ");
RmgrTable[record->xl_rmid].rm_desc(&buf, record);
RmgrTable[record->xl_rmid].rm_desc(&buf,record);
elog(LOG, "%s", buf.data);
pfree(buf.data);
}
......@@ -7224,6 +7185,21 @@ StartupXLOG(void)
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
/*
* ShmemVariableCache->nextXid must be beyond record's xid.
*
* We don't expect anyone else to modify nextXid, hence we
* don't need to hold a lock while examining it. We still
* acquire the lock to modify it, though.
*/
if (TransactionIdFollowsOrEquals(record->xl_xid,
ShmemVariableCache->nextXid))
{
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->nextXid = record->xl_xid;
TransactionIdAdvance(ShmemVariableCache->nextXid);
LWLockRelease(XidGenLock);
}
/*
* See if this record is a checkpoint, if yes then uncover it to
* find distributed committed Xacts.
......@@ -7300,7 +7276,8 @@ StartupXLOG(void)
TransactionIdIsValid(record->xl_xid))
RecordKnownAssignedTransactionIds(record->xl_xid);
ApplyStartupRedo(&ReadRecPtr, &EndRecPtr, record);
/* Now apply the WAL record itself */
RmgrTable[record->xl_rmid].rm_redo(ReadRecPtr, EndRecPtr, record);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
......@@ -7476,8 +7453,7 @@ StartupXLOG(void)
* crashes while an online backup is in progress. We must not treat
* that as an error, or the database will refuse to start up.
*/
// WALREP_FIXME: But we should probably do this check in standby mode, too
if (StandbyModeRequested || ControlFile->backupEndRequired)
if (ArchiveRecoveryRequested || ControlFile->backupEndRequired)
{
if (ControlFile->backupEndRequired)
ereport(FATAL,
......@@ -7486,7 +7462,7 @@ StartupXLOG(void)
else if (!XLogRecPtrIsInvalid(ControlFile->backupStartPoint))
ereport(FATAL,
(errmsg("WAL ends before end of online backup"),
errhint("Online backup should be complete, and all WAL up to that point must be available at recovery.")));
errhint("Online backup started with pg_start_backup() must be ended with pg_stop_backup(), and all WAL up to that point must be available at recovery.")));
else
ereport(FATAL,
(errmsg("WAL ends before consistent recovery point")));
......@@ -7712,6 +7688,12 @@ StartupXLOG(void)
*/
InRecovery = false;
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->state = DB_IN_PRODUCTION;
ControlFile->time = (pg_time_t) time(NULL);
UpdateControlFile();
LWLockRelease(ControlFileLock);
/* start the archive_timeout timer running */
XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
......@@ -7754,12 +7736,7 @@ StartupXLOG(void)
bool needToPromoteCatalog = (IS_QUERY_DISPATCHER() &&
ControlFile->state == DB_IN_ARCHIVE_RECOVERY);
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->state = DB_IN_PRODUCTION;
ControlFile->time = (pg_time_t) time(NULL);
UpdateControlFile();
ereport(LOG, (errmsg("database system is ready")));
LWLockRelease(ControlFileLock);
/*
* Shutdown the recovery environment. This must occur after
......@@ -7812,9 +7789,7 @@ StartupXLOG(void)
* primary state while the recovery is trying to stream.
*/
if (needToPromoteCatalog)
{
UpdateCatalogForStandbyPromotion();
}
/*
* If this was a fast promotion, request an (online) checkpoint now. This
......@@ -8645,11 +8620,11 @@ CreateCheckPoint(int flags)
if (shutdown)
{
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->state = DB_SHUTDOWNING;
ControlFile->time = (pg_time_t) time(NULL);
UpdateControlFile();
LWLockRelease(ControlFileLock);
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
ControlFile->state = DB_SHUTDOWNING;
ControlFile->time = (pg_time_t) time(NULL);
UpdateControlFile();
LWLockRelease(ControlFileLock);
}
/*
......@@ -8949,8 +8924,9 @@ CreateCheckPoint(int flags)
memcpy(&ptrd_oldest, ptrd_oldest_ptr, sizeof(ptrd_oldest));
recptr = XLogInsert(RM_XLOG_ID,
shutdown ? XLOG_CHECKPOINT_SHUTDOWN : XLOG_CHECKPOINT_ONLINE,
rdata);
shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
XLOG_CHECKPOINT_ONLINE,
rdata);
XLogFlush(recptr);
......@@ -8991,10 +8967,7 @@ CreateCheckPoint(int flags)
*/
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (shutdown)
{
ControlFile->state = DB_SHUTDOWNED;
}
ControlFile->state = DB_SHUTDOWNED;
ControlFile->prevCheckPoint = ControlFile->checkPoint;
ControlFile->checkPoint = ProcLastRecPtr;
ControlFile->checkPointCopy = checkPoint;
......@@ -9363,7 +9336,7 @@ CreateRestartPoint(int flags)
*/
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY &&
ControlFile->checkPointCopy.redo < lastCheckPoint.redo)
ControlFile->checkPointCopy.redo < lastCheckPoint.redo)
{
ControlFile->prevCheckPoint = ControlFile->checkPoint;
ControlFile->checkPoint = lastCheckPointRecPtr;
......@@ -9450,7 +9423,6 @@ CreateRestartPoint(int flags)
* on. Reset it now, to restore the normal state of affairs for
* debugging purposes.
*/
if (RecoveryInProgress())
ThisTimeLineID = 0;
}
......@@ -10058,7 +10030,7 @@ xlog_redo(XLogRecPtr beginLoc __attribute__((unused)), XLogRecPtr lsn __attribut
checkPoint.nextXid))
ShmemVariableCache->nextXid = checkPoint.nextXid;
LWLockRelease(XidGenLock);
/*
* We ignore the nextOid counter in an ONLINE checkpoint, preferring
* to track OID assignment through XLOG_NEXTOID records. The nextOid
......@@ -12244,3 +12216,32 @@ wait_for_mirror()
SyncRepWaitForLSN(tmpLogwrtResult.Flush);
}
/*
 * Check to see if we're a mirror, and if we are: (1) Assume that we are
 * running as superuser; (2) No data pages need to be accessed by this backend
 * - no snapshot / transaction needed.
 *
 * The recovery.conf file is renamed to recovery.done at the end of xlog
 * replay. Normal backends can be created thereafter.
 *
 * Returns true when stat() on RECOVERY_COMMAND_FILE succeeds and false
 * otherwise. Any stat() failure, not only a missing file, yields false.
 */
bool
IsRoleMirror(void)
{
	struct stat stat_buf;

	return (stat(RECOVERY_COMMAND_FILE, &stat_buf) == 0);
}
/*
 * SignalPromote
 *
 * Create (or truncate) PROMOTE_SIGNAL_FILE as an empty file and then send
 * SIGUSR1 to the postmaster. If the file cannot be opened, nothing is done
 * and no signal is sent.
 *
 * GPDB_90_MERGE_FIXME: This function should be removed once hot
 * standby can and will be enabled for mirrors.
 */
void
SignalPromote(void)
{
	FILE	   *promote_file = fopen(PROMOTE_SIGNAL_FILE, "w");

	/* Without the signal file there is nothing to tell the postmaster about */
	if (!promote_file)
		return;

	fclose(promote_file);
	kill(PostmasterPid, SIGUSR1);
}
......@@ -2171,21 +2171,6 @@ initMasks(fd_set *rmask)
return maxsock + 1;
}
/*
 * IsRoleMirror
 *
 * Report whether we are a mirror, judged by whether RECOVERY_COMMAND_FILE
 * can be stat()ed. If we are a mirror: (1) assume that we are running as
 * superuser; (2) no data pages need to be accessed by this backend, so no
 * snapshot / transaction is needed.
 *
 * The recovery.conf file is renamed to recovery.done at the end of xlog
 * replay. Normal backends can be created thereafter.
 */
static bool
IsRoleMirror(void)
{
	struct stat recovery_file_info;
	int			rc;

	rc = stat(RECOVERY_COMMAND_FILE, &recovery_file_info);

	return rc == 0;
}
/*
* Once the flag is reset, libpq connections (e.g. FTS probe requests) should
* not get CAC_MIRROR_READY response. This flag is needed during GPDB startup
......@@ -5779,20 +5764,6 @@ sigusr1_handler(SIGNAL_ARGS)
errno = save_errno;
}
/*
 * SignalPromote
 *
 * Open PROMOTE_SIGNAL_FILE for writing, close it again without writing
 * anything, and send SIGUSR1 to the postmaster. When the open fails the
 * function returns without signalling.
 *
 * GPDB_90_MERGE_FIXME: This function should be removed once hot
 * standby can and will be enabled for mirrors.
 */
void
SignalPromote(void)
{
	FILE	   *signal_file;

	signal_file = fopen(PROMOTE_SIGNAL_FILE, "w");
	if (signal_file == NULL)
		return;

	fclose(signal_file);
	kill(PostmasterPid, SIGUSR1);
}
/*
* SIGTERM or SIGQUIT while processing startup packet.
* Clean up and exit(1).
......
......@@ -66,20 +66,10 @@ typedef struct XLogRecord
#define XLogRecGetData(record) ((char*) (record) + SizeOfXLogRecord)
/*
* XLOG uses only low 4 bits of xl_info. High 4 bits may be used by rmgr.
* XLR_CHECK_CONSISTENCY bits can be passed by XLogInsert caller.
* XLOG uses only low 4 bits of xl_info. High 4 bits may be used by rmgr.
*/
#define XLR_INFO_MASK 0x0F
/*
* Enforces consistency checks of replayed WAL at recovery. If enabled,
* each record will log a full-page write for each block modified by the
* record and will reuse it afterwards for consistency checks. The caller
* of XLogInsert can use this value if necessary, but if
* wal_consistency_checking is enabled for a rmgr this is set unconditionally.
*/
#define XLR_CHECK_CONSISTENCY 0x02
/*
* If we backed up any disk blocks with the XLOG record, we use flag bits in
* xl_info to signal it. We support backup of up to 4 disk blocks per XLOG
......@@ -207,9 +197,6 @@ extern char *XLogArchiveCommand;
extern bool EnableHotStandby;
extern bool gp_keep_all_xlog;
extern bool *wal_consistency_checking;
extern char *wal_consistency_checking_string;
extern bool fullPageWrites;
extern bool wal_log_hints;
extern bool log_checkpoints;
......@@ -300,12 +287,6 @@ typedef struct CheckpointStatsData
extern CheckpointStatsData CheckpointStats;
/* File path names (all relative to $PGDATA) */
#define RECOVERY_COMMAND_FILE "recovery.conf"
#define RECOVERY_COMMAND_DONE "recovery.done"
#define PROMOTE_SIGNAL_FILE "promote"
#define FALLBACK_PROMOTE_SIGNAL_FILE "fallback_promote"
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern XLogRecPtr XLogInsert_OverrideXid(RmgrId rmid, uint8 info, XLogRecData *rdata, TransactionId overrideXid);
extern XLogRecPtr XLogLastInsertBeginLoc(void);
......@@ -334,7 +315,6 @@ extern void UnpackCheckPointRecord(struct XLogRecord *record, CheckpointExtended
extern void issue_xlog_fsync(int fd, XLogSegNo segno);
extern bool RecoveryInProgress(void);
extern bool HotStandbyActive(void);
extern bool HotStandbyActiveInReplay(void);
......@@ -357,8 +337,6 @@ extern Size XLOGShmemSize(void);
extern void XLOGShmemInit(void);
extern void BootStrapXLOG(void);
extern void StartupXLOG(void);
extern bool XLogStartupMultipleRecoveryPassesNeeded(void);
extern bool XLogStartupIntegrityCheckNeeded(void);
extern void ShutdownXLOG(int code, Datum arg);
extern void InitXLOGAccess(void);
extern void CreateCheckPoint(int flags);
......@@ -401,5 +379,8 @@ extern bool IsStandbyMode(void);
extern DBState GetCurrentDBState(void);
extern XLogRecPtr last_xlog_replay_location(void);
extern void wait_for_mirror(void);
extern bool IsRoleMirror(void);
extern void SignalPromote(void);
#endif /* XLOG_H */
......@@ -62,7 +62,6 @@ extern int MaxLivePostmasterChildren(void);
extern int GetNumShmemAttachedBgworkers(void);
extern bool PostmasterMarkPIDForWorkerNotify(int);
extern void SignalPromote(void);
extern void ResetMirrorReadyFlag(void);
#ifdef EXEC_BACKEND
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册