xlog.c 87.2 KB
Newer Older
1
/*-------------------------------------------------------------------------
2 3
 *
 * xlog.c
4
 *		PostgreSQL transaction log manager
5 6
 *
 *
7
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
B
Add:  
Bruce Momjian 已提交
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.60 2001/03/17 20:54:13 tgl Exp $
11 12 13
 *
 *-------------------------------------------------------------------------
 */
14

15 16
#include "postgres.h"

17
#include <fcntl.h>
T
Tom Lane 已提交
18
#include <signal.h>
19 20 21
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
V
Vadim B. Mikheev 已提交
22
#include <sys/time.h>
V
Vadim B. Mikheev 已提交
23 24
#include <sys/types.h>
#include <dirent.h>
25 26 27
#ifdef USE_LOCALE
#include <locale.h>
#endif
28

29
#include "access/transam.h"
30
#include "access/xact.h"
31
#include "catalog/catversion.h"
T
Tom Lane 已提交
32
#include "catalog/pg_control.h"
33 34 35 36
#include "storage/sinval.h"
#include "storage/proc.h"
#include "storage/spin.h"
#include "storage/s_lock.h"
37
#include "storage/bufpage.h"
V
Vadim B. Mikheev 已提交
38 39
#include "access/xlog.h"
#include "access/xlogutils.h"
40
#include "utils/builtins.h"
41
#include "utils/relcache.h"
V
WAL  
Vadim B. Mikheev 已提交
42 43
#include "miscadmin.h"

44

45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
/*
 * Work out which ways of forcing XLOG data to disk exist on this platform
 * and select the default one.  fsync() is assumed to exist everywhere;
 * configure reports (via HAVE_FDATASYNC) whether fdatasync() does.  The
 * open()-flag based methods depend on which O_xxx symbols are defined.
 */
#define SYNC_METHOD_FSYNC		0
#define SYNC_METHOD_FDATASYNC	1
#define SYNC_METHOD_OPEN		2 /* used for both O_SYNC and O_DSYNC */

/* Flag for fully synchronous open(): O_SYNC if we have it, else O_FSYNC */
#if defined(O_SYNC)
# define OPEN_SYNC_FLAG		O_SYNC
#elif defined(O_FSYNC)
# define OPEN_SYNC_FLAG		O_FSYNC
#endif

/* O_DSYNC is offered separately only if it differs from the flag above */
#if defined(OPEN_SYNC_FLAG)
# if defined(O_DSYNC) && (O_DSYNC != OPEN_SYNC_FLAG)
#  define OPEN_DATASYNC_FLAG	O_DSYNC
# endif
#endif

/* Default preference order: open_datasync, then fdatasync, then fsync */
#if defined(OPEN_DATASYNC_FLAG)
# define DEFAULT_SYNC_METHOD_STR	"open_datasync"
# define DEFAULT_SYNC_METHOD		SYNC_METHOD_OPEN
# define DEFAULT_SYNC_FLAGBIT		OPEN_DATASYNC_FLAG
#elif defined(HAVE_FDATASYNC)
# define DEFAULT_SYNC_METHOD_STR	"fdatasync"
# define DEFAULT_SYNC_METHOD		SYNC_METHOD_FDATASYNC
# define DEFAULT_SYNC_FLAGBIT		0
#else
# define DEFAULT_SYNC_METHOD_STR	"fsync"
# define DEFAULT_SYNC_METHOD		SYNC_METHOD_FSYNC
# define DEFAULT_SYNC_FLAGBIT		0
#endif


86 87 88
/* Max time to wait to acquire XLog activity locks */
#define XLOG_LOCK_TIMEOUT			(5*60*1000000) /* 5 minutes */
/* Max time to wait to acquire checkpoint lock */
89
#define CHECKPOINT_LOCK_TIMEOUT		(20*60*1000000) /* 20 minutes */
90

T
Tom Lane 已提交
91 92
/* User-settable parameters */
int			CheckPointSegments = 3;
V
Vadim B. Mikheev 已提交
93
int			XLOGbuffers = 8;
T
Tom Lane 已提交
94 95
int			XLOGfiles = 0;	/* how many files to pre-allocate during ckpt */
int			XLOG_DEBUG = 0;
96 97
char	   *XLOG_sync_method = NULL;
const char	XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
T
Tom Lane 已提交
98 99
char		XLOG_archive_dir[MAXPGPATH]; /* null string means delete 'em */

100 101 102 103
/* these are derived from XLOG_sync_method by assign_xlog_sync_method */
static int	sync_method = DEFAULT_SYNC_METHOD;
static int	open_sync_bit = DEFAULT_SYNC_FLAGBIT;

T
Tom Lane 已提交
104 105
#define MinXLOGbuffers	4

106 107
#define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)

T
Tom Lane 已提交
108 109 110 111 112

/*
 * ThisStartUpID will be same in all backends --- it identifies current
 * instance of the database system.
 */
V
WAL  
Vadim B. Mikheev 已提交
113 114
StartUpID	ThisStartUpID = 0;

T
Tom Lane 已提交
115 116
/* Are we doing recovery by reading XLOG? */
bool		InRecovery = false;
117

T
Tom Lane 已提交
118 119 120 121 122 123 124 125 126
/*
 * MyLastRecPtr points to the start of the last XLOG record inserted by the
 * current transaction.  If MyLastRecPtr.xrecoff == 0, then we are not in
 * a transaction or the transaction has not yet made any loggable changes.
 *
 * Note that XLOG records inserted outside transaction control are not
 * reflected into MyLastRecPtr.
 */
XLogRecPtr	MyLastRecPtr = {0, 0};
V
Vadim B. Mikheev 已提交
127

T
Tom Lane 已提交
128 129 130 131 132 133
/*
 * ProcLastRecPtr points to the start of the last XLOG record inserted by the
 * current backend.  It is updated for all inserts, transaction-controlled
 * or not.
 */
static XLogRecPtr ProcLastRecPtr = {0, 0};
134

T
Tom Lane 已提交
135 136 137 138 139 140 141 142
/*
 * RedoRecPtr is this backend's local copy of the REDO record pointer
 * (which is almost but not quite the same as a pointer to the most recent
 * CHECKPOINT record).  We update this from the shared-memory copy,
 * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
 * hold the Insert spinlock).  See XLogInsert for details.
 */
static XLogRecPtr RedoRecPtr;
143

T
Tom Lane 已提交
144 145
/* This lock must be held to read/update control file or create new log file */
SPINLOCK	ControlFileLockId;
146

T
Tom Lane 已提交
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
/*----------
 * Shared-memory data structures for XLOG control
 *
 * LogwrtRqst indicates a byte position that we need to write and/or fsync
 * the log up to (all records before that point must be written or fsynced).
 * LogwrtResult indicates the byte positions we have already written/fsynced.
 * These structs are identical but are declared separately to indicate their
 * slightly different functions.
 *
 * We do a lot of pushups to minimize the amount of access to spinlocked
 * shared memory values.  There are actually three shared-memory copies of
 * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
 *		XLogCtl->LogwrtResult is protected by info_lck
 *		XLogCtl->Write.LogwrtResult is protected by logwrt_lck
 *		XLogCtl->Insert.LogwrtResult is protected by insert_lck
 * One must hold the associated spinlock to read or write any of these, but
 * of course no spinlock is needed to read/write the unshared LogwrtResult.
 *
 * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
 * right", since both are updated by a write or flush operation before
 * it releases logwrt_lck.  The point of keeping XLogCtl->Write.LogwrtResult
 * is that it can be examined/modified by code that already holds logwrt_lck
 * without needing to grab info_lck as well.
 *
 * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
 * but is updated when convenient.  Again, it exists for the convenience of
 * code that is already holding insert_lck but not the other locks.
 *
 * The unshared LogwrtResult may lag behind any or all of these, and again
 * is updated when convenient.
 *
 * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
 * (protected by info_lck), but we don't need to cache any copies of it.
 *
 * Note that this all works because the request and result positions can only
 * advance forward, never back up, and so we can easily determine which of two
 * values is "more up to date".
 *----------
 */
typedef struct XLogwrtRqst
187
{
T
Tom Lane 已提交
188 189 190
	XLogRecPtr	Write;			/* last byte + 1 to write out */
	XLogRecPtr	Flush;			/* last byte + 1 to flush */
} XLogwrtRqst;
191

T
Tom Lane 已提交
192
typedef struct XLogwrtResult
193
{
T
Tom Lane 已提交
194 195 196
	XLogRecPtr	Write;			/* last byte + 1 written out */
	XLogRecPtr	Flush;			/* last byte + 1 flushed */
} XLogwrtResult;
197

T
Tom Lane 已提交
198 199 200
/*
 * Shared state data for XLogInsert.
 */
201 202
typedef struct XLogCtlInsert
{
T
Tom Lane 已提交
203 204
	XLogwrtResult	LogwrtResult;	/* a recent value of LogwrtResult */
	XLogRecPtr		PrevRecord;		/* start of previously-inserted record */
205
	uint16			curridx;		/* current block index in cache */
T
Tom Lane 已提交
206 207 208
	XLogPageHeader	currpage;		/* points to header of block in cache */
	char		   *currpos;		/* current insertion point in cache */
	XLogRecPtr		RedoRecPtr;		/* current redo point for insertions */
209 210
} XLogCtlInsert;

T
Tom Lane 已提交
211 212 213
/*
 * Shared state data for XLogWrite/XLogFlush.
 */
214 215
typedef struct XLogCtlWrite
{
T
Tom Lane 已提交
216 217
	XLogwrtResult	LogwrtResult;	/* current value of LogwrtResult */
	uint16			curridx;		/* cache index of next block to write */
218 219
} XLogCtlWrite;

T
Tom Lane 已提交
220 221 222
/*
 * Total shared-memory state for XLOG.
 */
223 224
typedef struct XLogCtlData
{
T
Tom Lane 已提交
225
	/* Protected by insert_lck: */
V
Vadim B. Mikheev 已提交
226
	XLogCtlInsert	Insert;
T
Tom Lane 已提交
227 228 229 230
	/* Protected by info_lck: */
	XLogwrtRqst		LogwrtRqst;
	XLogwrtResult	LogwrtResult;
	/* Protected by logwrt_lck: */
V
Vadim B. Mikheev 已提交
231
	XLogCtlWrite	Write;
T
Tom Lane 已提交
232 233 234 235 236 237
	/*
	 * These values do not change after startup, although the pointed-to
	 * pages and xlblocks values certainly do.  Permission to read/write
	 * the pages and xlblocks values depends on insert_lck and logwrt_lck.
	 */
	char		   *pages;			/* buffers for unwritten XLOG pages */
V
Vadim B. Mikheev 已提交
238
	XLogRecPtr	   *xlblocks;		/* 1st byte ptr-s + BLCKSZ */
T
Tom Lane 已提交
239 240
	uint32			XLogCacheByte;	/* # bytes in xlog buffers */
	uint32			XLogCacheBlck;	/* highest allocated xlog buffer index */
V
Vadim B. Mikheev 已提交
241
	StartUpID		ThisStartUpID;
T
Tom Lane 已提交
242 243 244 245 246 247 248

	/* This value is not protected by *any* spinlock... */
	XLogRecPtr		RedoRecPtr;		/* see SetRedoRecPtr/GetRedoRecPtr */

	slock_t			insert_lck;		/* XLogInsert lock */
	slock_t			info_lck;		/* locks shared LogwrtRqst/LogwrtResult */
	slock_t			logwrt_lck;		/* XLogWrite/XLogFlush lock */
V
Vadim B. Mikheev 已提交
249
	slock_t			chkp_lck;		/* checkpoint lock */
250 251
} XLogCtlData;

252
static XLogCtlData *XLogCtl = NULL;
253

254
/*
T
Tom Lane 已提交
255
 * We maintain an image of pg_control in shared memory.
256
 */
257
static ControlFileData *ControlFile = NULL;
258

T
Tom Lane 已提交
259 260 261 262 263
/*
 * Macros for managing XLogInsert state.  In most cases, the calling routine
 * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
 * so these are passed as parameters instead of being fetched via XLogCtl.
 */
264

T
Tom Lane 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
/*
 * Number of bytes still unused in the current xlog page buffer: the page
 * size less the distance of the insertion point from the page start.
 */
#define INSERT_FREESPACE(ins)  \
	(BLCKSZ - ((ins)->currpos - (char *) (ins)->currpage))

/*
 * Fill in 'dst' with the XLOG position of the current insertion point.
 * xlblocks[idx] is the position of the end of the page, so backing off by
 * the page's remaining free space yields the insertion point.
 */
#define INSERT_RECPTR(dst,ins,idx)  \
	( \
	  (dst).xlogid = XLogCtl->xlblocks[idx].xlogid, \
	  (dst).xrecoff = XLogCtl->xlblocks[idx].xrecoff - INSERT_FREESPACE(ins) \
	)


/*
 * Step an xlogid/segment pair forward by one segment, carrying into the
 * log id once the last segment of a log file has been passed.
 */
#define NextLogSeg(logId, logSeg)	\
	do { \
		if ((logSeg) < XLogSegsPerFile-1) \
			(logSeg)++; \
		else \
		{ \
			(logId)++; \
			(logSeg) = 0; \
		} \
	} while (0)

/*
 * Step an xlogid/segment pair back by one segment, borrowing from the
 * log id when the segment is already zero (caller must not pass 0,0).
 */
#define PrevLogSeg(logId, logSeg)	\
	do { \
		if ((logSeg) == 0) \
		{ \
			(logId)--; \
			(logSeg) = XLogSegsPerFile-1; \
		} \
		else \
			(logSeg)--; \
	} while (0)
V
WAL  
Vadim B. Mikheev 已提交
301

T
Tom Lane 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
/*
 * Split an XLogRecPtr into log file ID and segment number.
 *
 * XLByteToSeg uses the byte offset as it stands.  XLByteToPrevSeg treats a
 * pointer lying exactly on a segment boundary as belonging to the segment
 * that ends there, which is what is wanted when the pointer marks the end
 * of a record and we must decide which segment to write.
 *
 * NOTE(review): XLByteToPrevSeg subtracts 1 from xrecoff, so it presumably
 * relies on xrecoff being nonzero --- confirm against callers.
 */
#define XLByteToSeg(ptr, id, seg)	\
	( (id) = (ptr).xlogid, \
	  (seg) = (ptr).xrecoff / XLogSegSize \
	)
#define XLByteToPrevSeg(ptr, id, seg)	\
	( (id) = (ptr).xlogid, \
	  (seg) = ((ptr).xrecoff - 1) / XLogSegSize \
	)
318

319
/*
 * Is an XLogRecPtr within a particular XLOG segment?
 *
 * For XLByteInSeg, do the computation at face value.  For XLByteInPrevSeg,
 * a boundary byte is taken to be in the previous segment.
 */
#define XLByteInSeg(xlrp, logId, logSeg)	\
	((xlrp).xlogid == (logId) && \
	 (xlrp).xrecoff / XLogSegSize == (logSeg))

#define XLByteInPrevSeg(xlrp, logId, logSeg)	\
	((xlrp).xlogid == (logId) && \
	 ((xlrp).xrecoff - 1) / XLogSegSize == (logSeg))
332 333


334
/*
 * Build the path of an XLOG segment file into 'path' (a buffer of at least
 * MAXPGPATH bytes): XLogDir, a separator, then the log id and segment
 * number as two 8-digit uppercase hex fields.
 */
#define XLogFileName(path, log, seg)	\
			snprintf(path, MAXPGPATH, "%s%c%08X%08X",	\
					 XLogDir, SEP_CHAR, log, seg)
337

T
Tom Lane 已提交
338 339 340 341 342
/*
 * Step an xlog buffer index backward / forward, wrapping around between 0
 * and XLogCtl->XLogCacheBlck (the highest allocated buffer index).
 */
#define PrevBufIdx(idx)		\
		(((idx) != 0) ? ((idx) - 1) : XLogCtl->XLogCacheBlck)

#define NextBufIdx(idx)		\
		(((idx) != XLogCtl->XLogCacheBlck) ? ((idx) + 1) : 0)
343

344
/*
 * Can a record start at this byte offset?  The position within its page
 * must lie at or after the page header, and the page must have room left
 * for at least a record header.
 */
#define XRecOffIsValid(xrecoff) \
		((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \
		(BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord)
347

T
Tom Lane 已提交
348 349 350 351 352 353
/*
 * _INTL_MAXLOGRECSZ: upper bound on the space one record can occupy ---
 * the record header, the largest permitted rmgr payload, and for every
 * possible backup block its BkpBlock header plus a full page image.
 */
#define _INTL_MAXLOGRECSZ	\
	( \
	  SizeOfXLogRecord + MAXLOGRECSZ + \
	  XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ) \
	)
354

355

T
Tom Lane 已提交
356 357 358 359 360 361 362 363 364
/*
 * File path names.  XLogDir is the directory prefix used by XLogFileName
 * when building segment file paths.
 * NOTE(review): ControlFilePath is presumably the path of pg_control ---
 * confirm where it is filled in.
 */
static char		XLogDir[MAXPGPATH];
static char		ControlFilePath[MAXPGPATH];

/*
 * Private, possibly out-of-date copy of shared LogwrtResult.
 * See discussion above.  Being private to this backend, it can be read
 * and written without holding any spinlock.
 */
static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
365

T
Tom Lane 已提交
366 367 368 369 370 371 372 373 374 375
/*
 * State of the log segment file currently open for writing.
 *
 * openLogFile is -1 or a kernel FD for an open log file segment.
 * When it's open, openLogOff is the current seek offset in the file.
 * openLogId/openLogSeg identify the segment.  These variables are only
 * used to write the XLOG, and so will normally refer to the active segment.
 */
static int	openLogFile = -1;
static uint32 openLogId = 0;
static uint32 openLogSeg = 0;
static uint32 openLogOff = 0;
376

T
Tom Lane 已提交
377 378 379 380 381 382
/*
 * These variables are used similarly to the ones above, but for reading
 * the XLOG.  Note, however, that readOff generally represents the offset
 * of the page just read, not the seek position of the FD itself, which
 * will be just past that page.
 */
383 384 385 386
static int	readFile = -1;
static uint32 readId = 0;
static uint32 readSeg = 0;
static uint32 readOff = 0;
T
Tom Lane 已提交
387 388 389 390 391
/* Buffer for currently read page (BLCKSZ bytes) */
static char *readBuf = NULL;
/* State information for XLOG reading */
static XLogRecPtr ReadRecPtr;
static XLogRecPtr EndRecPtr;
392
static XLogRecord *nextRecord = NULL;
393
static StartUpID lastReadSUI;
394

V
WAL  
Vadim B. Mikheev 已提交
395 396
static bool InRedo = false;

T
Tom Lane 已提交
397 398 399

static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst);
400 401
static int	XLogFileInit(uint32 log, uint32 seg,
						 bool *use_existent, bool use_lock);
T
Tom Lane 已提交
402 403 404 405
static int	XLogFileOpen(uint32 log, uint32 seg, bool econt);
static void PreallocXlogFiles(XLogRecPtr endptr);
static void MoveOfflineLogs(uint32 log, uint32 seg);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
406
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
T
Tom Lane 已提交
407 408 409 410 411 412 413
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
										const char *whichChkpt,
										char *buffer);
static void WriteControlFile(void);
static void ReadControlFile(void);
static char *str_time(time_t tnow);
static void xlog_outrec(char *buf, XLogRecord *record);
414
static void issue_xlog_fsync(void);
T
Tom Lane 已提交
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431


/*
 * Insert an XLOG record having the specified RMID and info bytes,
 * with the body of the record being the data chunk(s) described by
 * the rdata list (see xlog.h for notes about rdata).
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 *
 * NB: this routine feels free to scribble on the XLogRecData structs,
 * though not on the data they reference.  This is OK since the XLogRecData
 * structs are always just temporaries in the calling code.
 */
432
XLogRecPtr
433
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
434
{
V
Vadim B. Mikheev 已提交
435 436
	XLogCtlInsert  *Insert = &XLogCtl->Insert;
	XLogRecord	   *record;
T
Tom Lane 已提交
437
	XLogContRecord *contrecord;
V
Vadim B. Mikheev 已提交
438
	XLogRecPtr		RecPtr;
T
Tom Lane 已提交
439
	XLogRecPtr		WriteRqst;
440
	uint32			freespace;
V
Vadim B. Mikheev 已提交
441
	uint16			curridx;
442
	XLogRecData	   *rdt;
T
Tom Lane 已提交
443 444 445 446 447 448 449 450
	Buffer			dtbuf[XLR_MAX_BKP_BLOCKS];
	bool			dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
	BkpBlock		dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
	XLogRecPtr		dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
	XLogRecData		dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS];
	crc64			rdata_crc;
	uint32			len,
					write_len;
451
	unsigned		i;
T
Tom Lane 已提交
452 453
	bool			do_logwrt;
	bool			updrqst;
V
Vadim B. Mikheev 已提交
454 455 456 457 458 459
	bool			no_tran = (rmid == RM_XLOG_ID) ? true : false;

	if (info & XLR_INFO_MASK)
	{
		if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
			elog(STOP, "XLogInsert: invalid info mask %02X", 
T
Tom Lane 已提交
460
				 (info & XLR_INFO_MASK));
V
Vadim B. Mikheev 已提交
461 462 463 464
		no_tran = true;
		info &= ~XLR_INFO_MASK;
	}

T
Tom Lane 已提交
465 466 467 468
	/*
	 * In bootstrap mode, we don't actually log anything but XLOG resources;
	 * return a phony record pointer.
	 */
V
Vadim B. Mikheev 已提交
469
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
V
WAL  
Vadim B. Mikheev 已提交
470 471 472 473 474 475
	{
		RecPtr.xlogid = 0;
		RecPtr.xrecoff = SizeOfXLogPHD;	/* start of 1st checkpoint record */
		return (RecPtr);
	}

T
Tom Lane 已提交
476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
	/*
	 * Here we scan the rdata list, determine which buffers must be backed
	 * up, and compute the CRC values for the data.  Note that the record
	 * header isn't added into the CRC yet since we don't know the final
	 * length or info bits quite yet.
	 *
	 * We may have to loop back to here if a race condition is detected below.
	 * We could prevent the race by doing all this work while holding the
	 * insert spinlock, but it seems better to avoid doing CRC calculations
	 * while holding the lock.  This means we have to be careful about
	 * modifying the rdata list until we know we aren't going to loop back
	 * again.  The only change we allow ourselves to make earlier is to set
	 * rdt->data = NULL in list items we have decided we will have to back
	 * up the whole buffer for.  This is OK because we will certainly decide
	 * the same thing again for those items if we do it over; doing it here
	 * saves an extra pass over the list later.
	 */
493
begin:;
T
Tom Lane 已提交
494 495 496 497 498 499
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
	{
		dtbuf[i] = InvalidBuffer;
		dtbuf_bkp[i] = false;
	}

500
	INIT_CRC64(rdata_crc);
T
Tom Lane 已提交
501 502
	len = 0;
	for (rdt = rdata; ; )
503 504 505
	{
		if (rdt->buffer == InvalidBuffer)
		{
T
Tom Lane 已提交
506
			/* Simple data, just include it */
507 508 509
			len += rdt->len;
			COMP_CRC64(rdata_crc, rdt->data, rdt->len);
		}
T
Tom Lane 已提交
510
		else
511
		{
T
Tom Lane 已提交
512 513
			/* Find info for buffer */
			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
514
			{
T
Tom Lane 已提交
515
				if (rdt->buffer == dtbuf[i])
516
				{
T
Tom Lane 已提交
517 518 519 520 521 522 523 524 525
					/* Buffer already referenced by earlier list item */
					if (dtbuf_bkp[i])
						rdt->data = NULL;
					else if (rdt->data)
					{
						len += rdt->len;
						COMP_CRC64(rdata_crc, rdt->data, rdt->len);
					}
					break;
526
				}
T
Tom Lane 已提交
527
				if (dtbuf[i] == InvalidBuffer)
528
				{
T
Tom Lane 已提交
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558
					/* OK, put it in this slot */
					dtbuf[i] = rdt->buffer;
					/*
					 * XXX We assume page LSN is first data on page
					 */
					dtbuf_lsn[i] = *((XLogRecPtr*)BufferGetBlock(rdt->buffer));
					if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
					{
						crc64	dtcrc;

						dtbuf_bkp[i] = true;
						rdt->data = NULL;
						INIT_CRC64(dtcrc);
						COMP_CRC64(dtcrc,
								   BufferGetBlock(dtbuf[i]),
								   BLCKSZ);
						dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
						dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
						COMP_CRC64(dtcrc,
								   (char*) &(dtbuf_xlg[i]) + sizeof(crc64),
								   sizeof(BkpBlock) - sizeof(crc64));
						FIN_CRC64(dtcrc);
						dtbuf_xlg[i].crc = dtcrc;
					}
					else if (rdt->data)
					{
						len += rdt->len;
						COMP_CRC64(rdata_crc, rdt->data, rdt->len);
					}
					break;
559 560
				}
			}
T
Tom Lane 已提交
561 562 563
			if (i >= XLR_MAX_BKP_BLOCKS)
				elog(STOP, "XLogInsert: can backup %d blocks at most",
					 XLR_MAX_BKP_BLOCKS);
564
		}
T
Tom Lane 已提交
565
		/* Break out of loop when rdt points to last list item */
566 567 568 569 570
		if (rdt->next == NULL)
			break;
		rdt = rdt->next;
	}

T
Tom Lane 已提交
571 572 573 574 575 576 577 578
	/*
	 * NOTE: the test for len == 0 here is somewhat fishy, since in theory
	 * all of the rmgr data might have been suppressed in favor of backup
	 * blocks.  Currently, all callers of XLogInsert provide at least some
	 * not-in-a-buffer data and so len == 0 should never happen, but that
	 * may not be true forever.  If you need to remove the len == 0 check,
	 * also remove the check for xl_len == 0 in ReadRecord, below.
	 */
579 580 581
	if (len == 0 || len > MAXLOGRECSZ)
		elog(STOP, "XLogInsert: invalid record len %u", len);

582
	START_CRIT_SECTION();
583

T
Tom Lane 已提交
584 585
	/* wait to obtain xlog insert lock */
	do_logwrt = true;
586

T
Tom Lane 已提交
587 588 589 590
	for (i = 0;;)
	{
		/* try to update LogwrtResult while waiting for insert lock */
		if (!TAS(&(XLogCtl->info_lck)))
591
		{
T
Tom Lane 已提交
592
			XLogwrtRqst	LogwrtRqst;
593

T
Tom Lane 已提交
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
			LogwrtRqst = XLogCtl->LogwrtRqst;
			LogwrtResult = XLogCtl->LogwrtResult;
			S_UNLOCK(&(XLogCtl->info_lck));

			/*
			 * If cache is half filled then try to acquire logwrt lock
			 * and do LOGWRT work, but only once per XLogInsert call.
			 * Ignore any fractional blocks in performing this check.
			 */
			LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
			if (do_logwrt &&
				(LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
				 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
				  XLogCtl->XLogCacheByte / 2)))
			{
				if (!TAS(&(XLogCtl->logwrt_lck)))
610
				{
T
Tom Lane 已提交
611 612
					LogwrtResult = XLogCtl->Write.LogwrtResult;
					if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
613
					{
T
Tom Lane 已提交
614 615
						XLogWrite(LogwrtRqst);
						do_logwrt = false;
616
					}
T
Tom Lane 已提交
617
					S_UNLOCK(&(XLogCtl->logwrt_lck));
618 619 620
				}
			}
		}
T
Tom Lane 已提交
621 622 623
		if (!TAS(&(XLogCtl->insert_lck)))
			break;
		S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++, XLOG_LOCK_TIMEOUT);
624 625
	}

T
Tom Lane 已提交
626 627 628 629 630 631
	/*
	 * Check to see if my RedoRecPtr is out of date.  If so, may have to
	 * go back and recompute everything.  This can only happen just after a
	 * checkpoint, so it's better to be slow in this case and fast otherwise.
	 */
	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
632
	{
T
Tom Lane 已提交
633 634 635 636
		Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
		RedoRecPtr = Insert->RedoRecPtr;

		for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
637
		{
T
Tom Lane 已提交
638 639 640 641 642 643 644 645 646 647 648 649 650
			if (dtbuf[i] == InvalidBuffer)
				continue;
			if (dtbuf_bkp[i] == false &&
				XLByteLE(dtbuf_lsn[i], RedoRecPtr))
			{
				/*
				 * Oops, this buffer now needs to be backed up, but we didn't
				 * think so above.  Start over.
				 */
				S_UNLOCK(&(XLogCtl->insert_lck));
				END_CRIT_SECTION();
				goto begin;
			}
651 652 653
		}
	}

T
Tom Lane 已提交
654 655 656 657 658 659 660 661 662 663 664 665
	/*
	 * Make additional rdata list entries for the backup blocks, so that
	 * we don't need to special-case them in the write loop.  Note that we
	 * have now irrevocably changed the input rdata list.  At the exit of
	 * this loop, write_len includes the backup block data.
	 *
	 * Also set the appropriate info bits to show which buffers were backed
	 * up.  The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
	 * buffer value (ignoring InvalidBuffer) appearing in the rdata list.
	 */
	write_len = len;
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
666 667 668 669
	{
		if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))
			continue;

T
Tom Lane 已提交
670
		info |= XLR_SET_BKP_BLOCK(i);
671 672 673

		rdt->next = &(dtbuf_rdt[2 * i]);

T
Tom Lane 已提交
674
		dtbuf_rdt[2 * i].data = (char*) &(dtbuf_xlg[i]);
675
		dtbuf_rdt[2 * i].len = sizeof(BkpBlock);
T
Tom Lane 已提交
676
		write_len += sizeof(BkpBlock);
677 678 679

		rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);

T
Tom Lane 已提交
680
		dtbuf_rdt[2 * i + 1].data = (char*) BufferGetBlock(dtbuf[i]);
681
		dtbuf_rdt[2 * i + 1].len = BLCKSZ;
T
Tom Lane 已提交
682
		write_len += BLCKSZ;
683 684 685
		dtbuf_rdt[2 * i + 1].next = NULL;
	}

T
Tom Lane 已提交
686
	/* Insert record header */
687

T
Tom Lane 已提交
688 689
	updrqst = false;
	freespace = INSERT_FREESPACE(Insert);
690 691
	if (freespace < SizeOfXLogRecord)
	{
T
Tom Lane 已提交
692
		updrqst = AdvanceXLInsertBuffer();
693 694 695
		freespace = BLCKSZ - SizeOfXLogPHD;
	}

T
Tom Lane 已提交
696
	curridx = Insert->curridx;
697
	record = (XLogRecord *) Insert->currpos;
T
Tom Lane 已提交
698

699
	record->xl_prev = Insert->PrevRecord;
V
Vadim B. Mikheev 已提交
700
	if (no_tran)
701 702 703 704
	{
		record->xl_xact_prev.xlogid = 0;
		record->xl_xact_prev.xrecoff = 0;
	}
V
Vadim B. Mikheev 已提交
705 706 707
	else
		record->xl_xact_prev = MyLastRecPtr;

708
	record->xl_xid = GetCurrentTransactionId();
T
Tom Lane 已提交
709
	record->xl_len = len;		/* doesn't include backup blocks */
710
	record->xl_info = info;
711
	record->xl_rmid = rmid;
712

T
Tom Lane 已提交
713 714 715
	/* Now we can finish computing the main CRC */
	COMP_CRC64(rdata_crc, (char*) record + sizeof(crc64),
			   SizeOfXLogRecord - sizeof(crc64));
716 717 718
	FIN_CRC64(rdata_crc);
	record->xl_crc = rdata_crc;

T
Tom Lane 已提交
719 720 721 722
	/* Compute record's XLOG location */
	INSERT_RECPTR(RecPtr, Insert, curridx);

	/* If first XLOG record of transaction, save it in PROC array */
V
Vadim B. Mikheev 已提交
723
	if (MyLastRecPtr.xrecoff == 0 && !no_tran)
724 725 726 727 728
	{
		SpinAcquire(SInvalLock);
		MyProc->logRec = RecPtr;
		SpinRelease(SInvalLock);
	}
V
WAL  
Vadim B. Mikheev 已提交
729 730 731 732 733 734 735

	if (XLOG_DEBUG)
	{
		char	buf[8192];

		sprintf(buf, "INSERT @ %u/%u: ", RecPtr.xlogid, RecPtr.xrecoff);
		xlog_outrec(buf, record);
736
		if (rdata->data != NULL)
V
WAL  
Vadim B. Mikheev 已提交
737 738
		{
			strcat(buf, " - ");
739
			RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data);
V
WAL  
Vadim B. Mikheev 已提交
740
		}
T
Tom Lane 已提交
741
		fprintf(stderr, "%s\n", buf);
V
WAL  
Vadim B. Mikheev 已提交
742 743
	}

T
Tom Lane 已提交
744 745 746 747 748 749
	/* Record begin of record in appropriate places */
	if (!no_tran)
		MyLastRecPtr = RecPtr;
	ProcLastRecPtr = RecPtr;
	Insert->PrevRecord = RecPtr;

750
	Insert->currpos += SizeOfXLogRecord;
T
Tom Lane 已提交
751
	freespace -= SizeOfXLogRecord;
752

T
Tom Lane 已提交
753 754 755 756
	/*
	 * Append the data, including backup blocks if any
	 */
	while (write_len)
757
	{
758 759 760 761
		while (rdata->data == NULL)
			rdata = rdata->next;

		if (freespace > 0)
762
		{
763 764 765 766 767
			if (rdata->len > freespace)
			{
				memcpy(Insert->currpos, rdata->data, freespace);
				rdata->data += freespace;
				rdata->len -= freespace;
T
Tom Lane 已提交
768
				write_len -= freespace;
769 770 771 772 773
			}
			else
			{
				memcpy(Insert->currpos, rdata->data, rdata->len);
				freespace -= rdata->len;
T
Tom Lane 已提交
774
				write_len -= rdata->len;
775 776 777 778
				Insert->currpos += rdata->len;
				rdata = rdata->next;
				continue;
			}
779 780
		}

781
		/* Use next buffer */
T
Tom Lane 已提交
782 783 784 785 786 787 788 789
		updrqst = AdvanceXLInsertBuffer();
		curridx = Insert->curridx;
		/* Insert cont-record header */
		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
		contrecord = (XLogContRecord *) Insert->currpos;
		contrecord->xl_rem_len = write_len;
		Insert->currpos += SizeOfXLogContRecord;
		freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
790
	}
791

T
Tom Lane 已提交
792 793 794 795
	/* Ensure next record will be properly aligned */
	Insert->currpos = (char *) Insert->currpage +
			MAXALIGN(Insert->currpos - (char *) Insert->currpage);
	freespace = INSERT_FREESPACE(Insert);
796

V
Vadim B. Mikheev 已提交
797
	/*
T
Tom Lane 已提交
798 799
	 * The recptr I return is the beginning of the *next* record.
	 * This will be stored as LSN for changed data pages...
V
Vadim B. Mikheev 已提交
800
	 */
T
Tom Lane 已提交
801
	INSERT_RECPTR(RecPtr, Insert, curridx);
V
Vadim B. Mikheev 已提交
802

T
Tom Lane 已提交
803
	/* Need to update shared LogwrtRqst if some block was filled up */
804
	if (freespace < SizeOfXLogRecord)
805
		updrqst = true;	/* curridx is filled and available for writing out */
806 807
	else
		curridx = PrevBufIdx(curridx);
T
Tom Lane 已提交
808
	WriteRqst = XLogCtl->xlblocks[curridx];
809 810 811 812 813

	S_UNLOCK(&(XLogCtl->insert_lck));

	if (updrqst)
	{
814
		S_LOCK(&(XLogCtl->info_lck));
T
Tom Lane 已提交
815 816 817 818 819
		/* advance global request to include new block(s) */
		if (XLByteLT(XLogCtl->LogwrtRqst.Write, WriteRqst))
			XLogCtl->LogwrtRqst.Write = WriteRqst;
		/* update local result copy while I have the chance */
		LogwrtResult = XLogCtl->LogwrtResult;
820
		S_UNLOCK(&(XLogCtl->info_lck));
821 822
	}

823
	END_CRIT_SECTION();
824
	return (RecPtr);
825
}
826

T
Tom Lane 已提交
827 828 829 830 831 832 833 834 835 836 837 838 839
/*
 * Advance the Insert state to the next buffer page, writing out the next
 * buffer if it still contains unwritten data.
 *
 * The global LogwrtRqst.Write pointer needs to be advanced to include the
 * just-filled page.  If we can do this for free (without an extra spinlock),
 * we do so here.  Otherwise the caller must do it.  We return TRUE if the
 * request update still needs to be done, FALSE if we did it internally.
 *
 * Must be called with insert_lck held.
 */
static bool
AdvanceXLInsertBuffer(void)
840
{
T
Tom Lane 已提交
841 842 843 844 845 846
	XLogCtlInsert *Insert = &XLogCtl->Insert;
	XLogCtlWrite *Write = &XLogCtl->Write;
	uint16		nextidx = NextBufIdx(Insert->curridx);
	bool		update_needed = true;
	XLogRecPtr	OldPageRqstPtr;
	XLogwrtRqst WriteRqst;
847

T
Tom Lane 已提交
848 849 850
	/* Use Insert->LogwrtResult copy if it's more fresh */
	if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
		LogwrtResult = Insert->LogwrtResult;
V
WAL  
Vadim B. Mikheev 已提交
851

T
Tom Lane 已提交
852 853 854 855 856 857 858 859 860 861 862
	/*
	 * Get ending-offset of the buffer page we need to replace (this may be
	 * zero if the buffer hasn't been used yet).  Fall through if it's already
	 * written out.
	 */
	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
	if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
	{
		/* nope, got work to do... */
		unsigned	spins = 0;
		XLogRecPtr	FinishedPageRqstPtr;
863

T
Tom Lane 已提交
864
		FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
865

T
Tom Lane 已提交
866
		for (;;)
867
		{
T
Tom Lane 已提交
868 869
			/* While waiting, try to get info_lck and update LogwrtResult */
			if (!TAS(&(XLogCtl->info_lck)))
870
			{
T
Tom Lane 已提交
871 872 873 874
				if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr))
					XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr;
				update_needed = false; /* Did the shared-request update */
				LogwrtResult = XLogCtl->LogwrtResult;
875 876
				S_UNLOCK(&(XLogCtl->info_lck));

T
Tom Lane 已提交
877
				if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
878
				{
T
Tom Lane 已提交
879 880 881
					/* OK, someone wrote it already */
					Insert->LogwrtResult = LogwrtResult;
					break;
882
				}
T
Tom Lane 已提交
883 884 885 886 887 888 889 890 891 892
			}

			/*
			 * LogwrtResult lock is busy or we know the page is still dirty.
			 * Try to acquire logwrt lock and write full blocks.
			 */
			if (!TAS(&(XLogCtl->logwrt_lck)))
			{
				LogwrtResult = Write->LogwrtResult;
				if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
893
				{
T
Tom Lane 已提交
894 895 896 897
					S_UNLOCK(&(XLogCtl->logwrt_lck));
					/* OK, someone wrote it already */
					Insert->LogwrtResult = LogwrtResult;
					break;
898
				}
T
Tom Lane 已提交
899 900 901 902 903 904 905 906 907 908 909
				/*
				 * Have to write buffers while holding insert lock.
				 * This is not good, so only write as much as we absolutely
				 * must.
				 */
				WriteRqst.Write = OldPageRqstPtr;
				WriteRqst.Flush.xlogid = 0;
				WriteRqst.Flush.xrecoff = 0;
				XLogWrite(WriteRqst);
				S_UNLOCK(&(XLogCtl->logwrt_lck));
				Insert->LogwrtResult = LogwrtResult;
910 911
				break;
			}
T
Tom Lane 已提交
912
			S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT);
913 914 915
		}
	}

T
Tom Lane 已提交
916 917 918 919 920
	/*
	 * Now the next buffer slot is free and we can set it up to be the
	 * next output page.
	 */
	if (XLogCtl->xlblocks[Insert->curridx].xrecoff >= XLogFileSize)
921
	{
T
Tom Lane 已提交
922 923 924 925
		/* crossing a logid boundary */
		XLogCtl->xlblocks[nextidx].xlogid =
			XLogCtl->xlblocks[Insert->curridx].xlogid + 1;
		XLogCtl->xlblocks[nextidx].xrecoff = BLCKSZ;
926
	}
T
Tom Lane 已提交
927
	else
928
	{
T
Tom Lane 已提交
929 930 931 932
		XLogCtl->xlblocks[nextidx].xlogid =
			XLogCtl->xlblocks[Insert->curridx].xlogid;
		XLogCtl->xlblocks[nextidx].xrecoff =
			XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ;
933
	}
T
Tom Lane 已提交
934 935 936 937 938 939 940 941 942 943
	Insert->curridx = nextidx;
	Insert->currpage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
	Insert->currpos = ((char*) Insert->currpage) + SizeOfXLogPHD;
	/*
	 * Be sure to re-zero the buffer so that bytes beyond what we've written
	 * will look like zeroes and not valid XLOG records...
	 */
	MemSet((char*) Insert->currpage, 0, BLCKSZ);
	Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
	/* Insert->currpage->xlp_info = 0; */	/* done by memset */
944
	Insert->currpage->xlp_sui = ThisStartUpID;
T
Tom Lane 已提交
945 946

	return update_needed;
947 948
}

T
Tom Lane 已提交
949 950 951 952 953
/*
 * Write and/or fsync the log at least as far as WriteRqst indicates.
 *
 * Must be called with logwrt_lck held.
 */
954
static void
T
Tom Lane 已提交
955
XLogWrite(XLogwrtRqst WriteRqst)
956
{
957 958
	XLogCtlWrite *Write = &XLogCtl->Write;
	char	   *from;
T
Tom Lane 已提交
959
	bool		ispartialpage;
960
	bool		use_existent;
961

T
Tom Lane 已提交
962 963 964 965
	/* Update local LogwrtResult (caller probably did this already, but...) */
	LogwrtResult = Write->LogwrtResult;

	while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
966
	{
967 968 969 970 971 972 973 974 975
		/*
		 * Make sure we're not ahead of the insert process.  This could
		 * happen if we're passed a bogus WriteRqst.Write that is past the
		 * end of the last page that's been initialized by
		 * AdvanceXLInsertBuffer.
		 */
		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx]))
			elog(STOP, "XLogWrite: write request is past end of log");

T
Tom Lane 已提交
976 977 978 979 980
		/* Advance LogwrtResult.Write to end of current buffer page */
		LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx];
		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);

		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
981
		{
T
Tom Lane 已提交
982 983 984 985
			/*
			 * Switch to new logfile segment.
			 */
			if (openLogFile >= 0)
986
			{
T
Tom Lane 已提交
987
				if (close(openLogFile) != 0)
988
					elog(STOP, "close(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
989 990
						 openLogId, openLogSeg);
				openLogFile = -1;
991
			}
T
Tom Lane 已提交
992 993
			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);

994 995 996 997
			/* create/use new log file */
			use_existent = true;
			openLogFile = XLogFileInit(openLogId, openLogSeg,
									   &use_existent, true);
T
Tom Lane 已提交
998
			openLogOff = 0;
999 1000 1001 1002 1003

			if (!use_existent)	/* there was no precreated file */
				elog(LOG, "XLogWrite: new log file created - "
					 "consider increasing WAL_FILES");

T
Tom Lane 已提交
1004
			/* update pg_control, unless someone else already did */
1005
			SpinAcquire(ControlFileLockId);
T
Tom Lane 已提交
1006 1007 1008 1009 1010 1011 1012
			if (ControlFile->logId != openLogId ||
				ControlFile->logSeg != openLogSeg + 1)
			{
				ControlFile->logId = openLogId;
				ControlFile->logSeg = openLogSeg + 1;
				ControlFile->time = time(NULL);
				UpdateControlFile();
1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027
				/*
				 * Signal postmaster to start a checkpoint if it's been too
				 * long since the last one.  (We look at local copy of
				 * RedoRecPtr which might be a little out of date, but should
				 * be close enough for this purpose.)
				 */
				if (IsUnderPostmaster &&
					(openLogId != RedoRecPtr.xlogid ||
					 openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
					 (uint32) CheckPointSegments))
				{
					if (XLOG_DEBUG)
						fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n");
					kill(getppid(), SIGUSR1);
				}
T
Tom Lane 已提交
1028
			}
1029 1030 1031
			SpinRelease(ControlFileLockId);
		}

T
Tom Lane 已提交
1032
		if (openLogFile < 0)
1033
		{
T
Tom Lane 已提交
1034 1035 1036
			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
			openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
			openLogOff = 0;
1037 1038
		}

T
Tom Lane 已提交
1039 1040
		/* Need to seek in the file? */
		if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
1041
		{
T
Tom Lane 已提交
1042 1043
			openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
			if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
1044
				elog(STOP, "lseek(logfile %u seg %u off %u) failed: %m",
T
Tom Lane 已提交
1045
					 openLogId, openLogSeg, openLogOff);
1046 1047
		}

T
Tom Lane 已提交
1048 1049 1050
		/* OK to write the page */
		from = XLogCtl->pages + Write->curridx * BLCKSZ;
		if (write(openLogFile, from, BLCKSZ) != BLCKSZ)
1051
			elog(STOP, "write(logfile %u seg %u off %u) failed: %m",
T
Tom Lane 已提交
1052 1053
				 openLogId, openLogSeg, openLogOff);
		openLogOff += BLCKSZ;
1054

T
Tom Lane 已提交
1055 1056 1057 1058 1059 1060 1061 1062 1063
		/*
		 * If we just wrote the whole last page of a logfile segment,
		 * fsync the segment immediately.  This avoids having to go back
		 * and re-open prior segments when an fsync request comes along later.
		 * Doing it here ensures that one and only one backend will perform
		 * this fsync.
		 */
		if (openLogOff >= XLogSegSize && !ispartialpage)
		{
1064
			issue_xlog_fsync();
T
Tom Lane 已提交
1065 1066
			LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */
		}
1067

T
Tom Lane 已提交
1068 1069 1070 1071 1072 1073 1074
		if (ispartialpage)
		{
			/* Only asked to write a partial page */
			LogwrtResult.Write = WriteRqst.Write;
			break;
		}
		Write->curridx = NextBufIdx(Write->curridx);
1075 1076
	}

T
Tom Lane 已提交
1077 1078 1079 1080 1081
	/*
	 * If asked to flush, do so
	 */
	if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
		XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1082
	{
T
Tom Lane 已提交
1083 1084 1085 1086 1087
		/*
		 * Could get here without iterating above loop, in which case
		 * we might have no open file or the wrong one.  However, we do
		 * not need to fsync more than one file.
		 */
1088
		if (sync_method != SYNC_METHOD_OPEN)
T
Tom Lane 已提交
1089
		{
1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104
			if (openLogFile >= 0 &&
				!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
			{
				if (close(openLogFile) != 0)
					elog(STOP, "close(logfile %u seg %u) failed: %m",
						 openLogId, openLogSeg);
				openLogFile = -1;
			}
			if (openLogFile < 0)
			{
				XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
				openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
				openLogOff = 0;
			}
			issue_xlog_fsync();
T
Tom Lane 已提交
1105 1106
		}
		LogwrtResult.Flush = LogwrtResult.Write;
1107 1108
	}

T
Tom Lane 已提交
1109 1110 1111 1112 1113 1114 1115
	/*
	 * Update shared-memory status
	 *
	 * We make sure that the shared 'request' values do not fall behind
	 * the 'result' values.  This is not absolutely essential, but it saves
	 * some code in a couple of places.
	 */
1116
	S_LOCK(&(XLogCtl->info_lck));
T
Tom Lane 已提交
1117 1118 1119 1120 1121
	XLogCtl->LogwrtResult = LogwrtResult;
	if (XLByteLT(XLogCtl->LogwrtRqst.Write, LogwrtResult.Write))
		XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
	if (XLByteLT(XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush))
		XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
1122 1123
	S_UNLOCK(&(XLogCtl->info_lck));

T
Tom Lane 已提交
1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225
	Write->LogwrtResult = LogwrtResult;
}

/*
 * Ensure that all XLOG data through the given position is flushed to disk.
 *
 * record: the log position that must be on disk before we return.
 *
 * Does nothing during REDO, or when the backend-local LogwrtResult already
 * shows the position as flushed.  Otherwise runs inside a critical section
 * and fails with elog(STOP) if the request cannot be satisfied.
 *
 * NOTE: this differs from XLogWrite mainly in that the logwrt_lck is not
 * already held, and we try to avoid acquiring it if possible.
 */
void
XLogFlush(XLogRecPtr record)
{
	XLogRecPtr	WriteRqstPtr;
	XLogwrtRqst WriteRqst;
	unsigned	spins = 0;

	if (XLOG_DEBUG)
	{
		fprintf(stderr, "XLogFlush%s%s: rqst %u/%u; wrt %u/%u; flsh %u/%u\n",
				(IsBootstrapProcessingMode()) ? "(bootstrap)" : "",
				(InRedo) ? "(redo)" : "",
				record.xlogid, record.xrecoff,
				LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
				LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
		fflush(stderr);
	}

	/* Disabled during REDO */
	if (InRedo)
		return;

	/* Quick exit if already known flushed */
	if (XLByteLE(record, LogwrtResult.Flush))
		return;

	START_CRIT_SECTION();

	/*
	 * Since fsync is usually a horribly expensive operation, we try to
	 * piggyback as much data as we can on each fsync: if we see any more
	 * data entered into the xlog buffer, we'll write and fsync that too,
	 * so that the final value of LogwrtResult.Flush is as large as possible.
	 * This gives us some chance of avoiding another fsync immediately after.
	 */

	/* initialize to given target; may increase below */
	WriteRqstPtr = record;

	for (;;)
	{
		/* try to read LogwrtResult and update local state */
		if (!TAS(&(XLogCtl->info_lck)))
		{
			if (XLByteLT(WriteRqstPtr, XLogCtl->LogwrtRqst.Write))
				WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
			LogwrtResult = XLogCtl->LogwrtResult;
			S_UNLOCK(&(XLogCtl->info_lck));
			if (XLByteLE(record, LogwrtResult.Flush))
			{
				/* Done already */
				break;
			}
		}
		/* if something was added to log cache then try to flush this too */
		if (!TAS(&(XLogCtl->insert_lck)))
		{
			XLogCtlInsert *Insert = &XLogCtl->Insert;
			uint32		freespace = INSERT_FREESPACE(Insert);

			/*
			 * Start from the end of the current insert page.  If the page
			 * is full (no room for another record header) that is the
			 * target; otherwise back up over the unused space to reach the
			 * current insert position.
			 */
			WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
			if (freespace >= SizeOfXLogRecord)
				WriteRqstPtr.xrecoff -= freespace;
			S_UNLOCK(&(XLogCtl->insert_lck));
		}
		/* now try to get the logwrt lock */
		if (!TAS(&(XLogCtl->logwrt_lck)))
		{
			LogwrtResult = XLogCtl->Write.LogwrtResult;
			if (XLByteLE(record, LogwrtResult.Flush))
			{
				/* Done already */
				S_UNLOCK(&(XLogCtl->logwrt_lck));
				break;
			}
			WriteRqst.Write = WriteRqstPtr;
			WriteRqst.Flush = record;
			XLogWrite(WriteRqst);
			S_UNLOCK(&(XLogCtl->logwrt_lck));
			if (XLByteLT(LogwrtResult.Flush, record))
				elog(STOP, "XLogFlush: request is not satisfied");
			break;
		}
		S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT);
	}

	END_CRIT_SECTION();
}

T
Tom Lane 已提交
1228 1229 1230
/*
 * Create a new XLOG file segment, or open a pre-existing one.
 *
1231 1232 1233 1234 1235 1236 1237 1238 1239 1240
 * log, seg: identify segment to be created/opened.
 *
 * *use_existent: if TRUE, OK to use a pre-existing file (else, any
 * pre-existing file will be deleted).  On return, TRUE if a pre-existing
 * file was used.
 *
 * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
 * place.  This should be TRUE except during bootstrap log creation.  The
 * caller must *not* hold the spinlock at call.
 *
T
Tom Lane 已提交
1241 1242
 * Returns FD of opened file.
 */
1243
static int
1244 1245
XLogFileInit(uint32 log, uint32 seg,
			 bool *use_existent, bool use_lock)
1246
{
1247
	char		path[MAXPGPATH];
1248 1249
	char		tmppath[MAXPGPATH];
	char		targpath[MAXPGPATH];
1250
	char		zbuffer[BLCKSZ];
1251 1252
	uint32		targlog,
				targseg;
1253
	int			fd;
1254
	int			nbytes;
1255 1256

	XLogFileName(path, log, seg);
V
Vadim B. Mikheev 已提交
1257 1258

	/*
T
Tom Lane 已提交
1259
	 * Try to use existent file (checkpoint maker may have created it already)
V
Vadim B. Mikheev 已提交
1260
	 */
1261
	if (*use_existent)
V
Vadim B. Mikheev 已提交
1262
	{
1263 1264
		fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
						   S_IRUSR | S_IWUSR);
V
Vadim B. Mikheev 已提交
1265 1266 1267
		if (fd < 0)
		{
			if (errno != ENOENT)
1268
				elog(STOP, "InitOpen(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
1269
					 log, seg);
V
Vadim B. Mikheev 已提交
1270 1271 1272 1273 1274
		}
		else
			return(fd);
	}

1275 1276 1277 1278 1279 1280 1281 1282 1283 1284
	/*
	 * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
	 * another process is doing the same thing.  If so, we will end up
	 * pre-creating an extra log segment.  That seems OK, and better than
	 * holding the spinlock throughout this lengthy process.
	 */
	snprintf(tmppath, MAXPGPATH, "%s%cxlogtemp.%d",
			 XLogDir, SEP_CHAR, (int) getpid());

	unlink(tmppath);
1285

1286
	/* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1287
	fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
T
Tom Lane 已提交
1288
					   S_IRUSR | S_IWUSR);
1289
	if (fd < 0)
1290
		elog(STOP, "InitCreate(%s) failed: %m", tmppath);
1291

1292 1293 1294 1295 1296 1297
	/*
	 * Zero-fill the file.  We have to do this the hard way to ensure that
	 * all the file space has really been allocated --- on platforms that
	 * allow "holes" in files, just seeking to the end doesn't allocate
	 * intermediate space.  This way, we know that we have all the space
	 * and (after the fsync below) that all the indirect blocks are down
1298 1299
	 * on disk.  Therefore, fdatasync(2) or O_DSYNC will be sufficient to
	 * sync future writes to the log file.
1300 1301 1302 1303 1304
	 */
	MemSet(zbuffer, 0, sizeof(zbuffer));
	for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
	{
		if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
T
Tom Lane 已提交
1305 1306 1307 1308
		{
			int		save_errno = errno;

			/* If we fail to make the file, delete it to release disk space */
1309
			unlink(tmppath);
T
Tom Lane 已提交
1310 1311
			errno = save_errno;

1312
			elog(STOP, "ZeroFill(%s) failed: %m", tmppath);
T
Tom Lane 已提交
1313
		}
1314
	}
1315

1316
	if (pg_fsync(fd) != 0)
1317
		elog(STOP, "fsync(%s) failed: %m", tmppath);
1318

V
Vadim B. Mikheev 已提交
1319
	close(fd);
T
Tom Lane 已提交
1320

1321
	/*
1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356
	 * Now move the segment into place with its final name.  We want to be
	 * sure that only one process does this at a time.
	 */
	if (use_lock)
		SpinAcquire(ControlFileLockId);

	/*
	 * If caller didn't want to use a pre-existing file, get rid of any
	 * pre-existing file.  Otherwise, cope with possibility that someone
	 * else has created the file while we were filling ours: if so, use
	 * ours to pre-create a future log segment.
	 */
	targlog = log;
	targseg = seg;
	strcpy(targpath, path);

	if (! *use_existent)
	{
		unlink(targpath);
	}
	else
	{
		while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
								   S_IRUSR | S_IWUSR)) >= 0)
		{
			close(fd);
			NextLogSeg(targlog, targseg);
			XLogFileName(targpath, targlog, targseg);
		}
	}

	/*
	 * Prefer link() to rename() here just to be really sure that we don't
	 * overwrite an existing logfile.  However, there shouldn't be one, so
	 * rename() is an acceptable substitute except for the truly paranoid.
1357
	 */
1358
#ifndef __BEOS__
1359
	if (link(tmppath, targpath) < 0)
1360
		elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
1361 1362
			 targlog, targseg);
	unlink(tmppath);
1363
#else
1364
	if (rename(tmppath, targpath) < 0)
T
Tom Lane 已提交
1365
		elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
1366
			 targlog, targseg);
1367
#endif
V
Vadim B. Mikheev 已提交
1368

1369 1370 1371 1372 1373 1374 1375
	if (use_lock)
		SpinRelease(ControlFileLockId);

	/* Set flag to tell caller there was no existent file */
	*use_existent = false;

	/* Now open original target segment (might not be file I just made) */
1376 1377
	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
					   S_IRUSR | S_IWUSR);
V
Vadim B. Mikheev 已提交
1378
	if (fd < 0)
1379
		elog(STOP, "InitReopen(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
1380
			 log, seg);
V
Vadim B. Mikheev 已提交
1381

1382
	return (fd);
1383 1384
}

T
Tom Lane 已提交
1385 1386 1387
/*
 * Open a pre-existing logfile segment.
 */
1388 1389 1390
static int
XLogFileOpen(uint32 log, uint32 seg, bool econt)
{
1391 1392
	char		path[MAXPGPATH];
	int			fd;
1393 1394 1395

	XLogFileName(path, log, seg);

1396 1397
	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
					   S_IRUSR | S_IWUSR);
1398 1399 1400 1401
	if (fd < 0)
	{
		if (econt && errno == ENOENT)
		{
1402
			elog(LOG, "open(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
1403
				 log, seg);
1404 1405
			return (fd);
		}
1406
		elog(STOP, "open(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
1407
			 log, seg);
1408 1409
	}

1410
	return (fd);
1411 1412
}

V
Vadim B. Mikheev 已提交
1413
/*
 * Preallocate log files beyond the specified log endpoint, according to
 * the XLOGfiles user parameter.
 *
 * If XLOGfiles > 0, the XLOGfiles segments following the one that contains
 * endptr are created (or left alone if they already exist).  Otherwise a
 * single following segment is created, but only once endptr is at least
 * three quarters of the way through its segment.
 */
static void
PreallocXlogFiles(XLogRecPtr endptr)
{
	uint32		_logId;
	uint32		_logSeg;
	int			lf;
	bool		use_existent;
	int			i;

	XLByteToPrevSeg(endptr, _logId, _logSeg);
	if (XLOGfiles > 0)
	{
		for (i = 1; i <= XLOGfiles; i++)
		{
			NextLogSeg(_logId, _logSeg);
			use_existent = true;
			lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
			/* we only wanted the file to exist; no need to keep it open */
			close(lf);
		}
	}
	else if ((endptr.xrecoff - 1) % XLogSegSize >=
			 (uint32) (0.75 * XLogSegSize))
	{
		NextLogSeg(_logId, _logSeg);
		use_existent = true;
		lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
		close(lf);
	}
}

/*
 * Remove or move offline all log files older or equal to passed log/seg#
V
Vadim B. Mikheev 已提交
1449 1450
 */
static void
T
Tom Lane 已提交
1451
MoveOfflineLogs(uint32 log, uint32 seg)
V
Vadim B. Mikheev 已提交
1452 1453 1454 1455 1456 1457
{
	DIR			   *xldir;
	struct dirent  *xlde;
	char			lastoff[32];
	char			path[MAXPGPATH];

T
Tom Lane 已提交
1458
	Assert(XLOG_archive_dir[0] == 0);	/* ! implemented yet */
V
Vadim B. Mikheev 已提交
1459 1460 1461

	xldir = opendir(XLogDir);
	if (xldir == NULL)
1462
		elog(STOP, "MoveOfflineLogs: cannot open xlog dir: %m");
V
Vadim B. Mikheev 已提交
1463

T
Tom Lane 已提交
1464
	sprintf(lastoff, "%08X%08X", log, seg);
V
Vadim B. Mikheev 已提交
1465 1466 1467 1468

	errno = 0;
	while ((xlde = readdir(xldir)) != NULL)
	{
T
Tom Lane 已提交
1469 1470 1471
		if (strlen(xlde->d_name) == 16 &&
			strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
			strcmp(xlde->d_name, lastoff) <= 0)
V
Vadim B. Mikheev 已提交
1472
		{
T
Tom Lane 已提交
1473 1474 1475 1476 1477
			elog(LOG, "MoveOfflineLogs: %s %s", (XLOG_archive_dir[0]) ? 
				 "archive" : "remove", xlde->d_name);
			sprintf(path, "%s%c%s",	XLogDir, SEP_CHAR, xlde->d_name);
			if (XLOG_archive_dir[0] == 0)
				unlink(path);
V
Vadim B. Mikheev 已提交
1478 1479 1480 1481
		}
		errno = 0;
	}
	if (errno)
1482
		elog(STOP, "MoveOfflineLogs: cannot read xlog dir: %m");
V
Vadim B. Mikheev 已提交
1483 1484 1485
	closedir(xldir);
}

T
Tom Lane 已提交
1486 1487 1488 1489 1490
/*
 * Restore the backup blocks present in an XLOG record, if any.
 *
 * We assume all of the record has been read into memory at *record.
 */
1491 1492 1493 1494 1495 1496 1497 1498 1499 1500
static void
RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
{
	Relation	reln;
	Buffer		buffer;
	Page		page;
	BkpBlock	bkpb;
	char	   *blk;
	int			i;

T
Tom Lane 已提交
1501 1502
	blk = (char*)XLogRecGetData(record) + record->xl_len;
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1503
	{
T
Tom Lane 已提交
1504
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528
			continue;

		memcpy((char*)&bkpb, blk, sizeof(BkpBlock));
		blk += sizeof(BkpBlock);

		reln = XLogOpenRelation(true, record->xl_rmid, bkpb.node);

		if (reln)
		{
			buffer = XLogReadBuffer(true, reln, bkpb.block);
			if (BufferIsValid(buffer))
			{
				page = (Page) BufferGetPage(buffer);
				memcpy((char*)page, blk, BLCKSZ);
				PageSetLSN(page, lsn);
				PageSetSUI(page, ThisStartUpID);
				UnlockAndWriteBuffer(buffer);
			}
		}

		blk += BLCKSZ;
	}
}

T
Tom Lane 已提交
1529 1530 1531 1532 1533 1534 1535
/*
 * CRC-check an XLOG record.  We do not believe the contents of an XLOG
 * record (other than to the minimal extent of computing the amount of
 * data to read in) until we've checked the CRCs.
 *
 * We assume all of the record has been read into memory at *record.
 */
1536 1537 1538 1539 1540 1541 1542 1543 1544
static bool
RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
{
	crc64		crc;
	crc64		cbuf;
	int			i;
	uint32		len = record->xl_len;
	char	   *blk;

T
Tom Lane 已提交
1545
	/* Check CRC of rmgr data and record header */
1546
	INIT_CRC64(crc);
T
Tom Lane 已提交
1547 1548 1549
	COMP_CRC64(crc, XLogRecGetData(record), len);
	COMP_CRC64(crc, (char*) record + sizeof(crc64),
			   SizeOfXLogRecord - sizeof(crc64));
1550 1551
	FIN_CRC64(crc);

T
Tom Lane 已提交
1552
	if (!EQ_CRC64(record->xl_crc, crc))
1553 1554
	{
		elog(emode, "ReadRecord: bad rmgr data CRC in record at %u/%u",
T
Tom Lane 已提交
1555
			 recptr.xlogid, recptr.xrecoff);
1556 1557 1558
		return(false);
	}

T
Tom Lane 已提交
1559 1560 1561
	/* Check CRCs of backup blocks, if any */
	blk = (char*)XLogRecGetData(record) + len;
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1562
	{
T
Tom Lane 已提交
1563
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1564 1565 1566
			continue;

		INIT_CRC64(crc);
T
Tom Lane 已提交
1567 1568 1569
		COMP_CRC64(crc, blk + sizeof(BkpBlock), BLCKSZ);
		COMP_CRC64(crc, blk + sizeof(crc64),
				   sizeof(BkpBlock) - sizeof(crc64));
1570
		FIN_CRC64(crc);
T
Tom Lane 已提交
1571
		memcpy((char*)&cbuf, blk, sizeof(crc64)); /* don't assume alignment */
1572

T
Tom Lane 已提交
1573
		if (!EQ_CRC64(cbuf, crc))
1574 1575
		{
			elog(emode, "ReadRecord: bad bkp block %d CRC in record at %u/%u",
T
Tom Lane 已提交
1576
				 i + 1, recptr.xlogid, recptr.xrecoff);
1577 1578
			return(false);
		}
T
Tom Lane 已提交
1579
		blk += sizeof(BkpBlock) + BLCKSZ;
1580 1581 1582 1583 1584
	}

	return(true);
}

T
Tom Lane 已提交
1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597
/*
 * Attempt to read an XLOG record.
 *
 * If RecPtr is not NULL, try to read a record at that position.  Otherwise
 * try to read a record just after the last one previously read.
 *
 * If no valid record is available, returns NULL, or fails if emode is STOP.
 * (emode must be either STOP or LOG.)
 *
 * buffer is a workspace at least _INTL_MAXLOGRECSZ bytes long.  It is needed
 * to reassemble a record that crosses block boundaries.  Note that on
 * successful return, the returned record pointer always points at buffer.
 */
1598
static XLogRecord *
T
Tom Lane 已提交
1599
ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer)
1600
{
1601 1602
	XLogRecord *record;
	XLogRecPtr	tmpRecPtr = EndRecPtr;
T
Tom Lane 已提交
1603 1604 1605 1606
	uint32		len,
				total_len;
	uint32		targetPageOff;
	unsigned	i;
1607
	bool		nextmode = false;
T
Tom Lane 已提交
1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621

	if (readBuf == NULL)
	{
		/*
		 * First time through, permanently allocate readBuf.  We do it
		 * this way, rather than just making a static array, for two
		 * reasons: (1) no need to waste the storage in most instantiations
		 * of the backend; (2) a static char array isn't guaranteed to
		 * have any particular alignment, whereas malloc() will provide
		 * MAXALIGN'd storage.
		 */
		readBuf = (char *) malloc(BLCKSZ);
		Assert(readBuf != NULL);
	}
1622

T
Tom Lane 已提交
1623
	if (RecPtr == NULL)
1624
	{
1625
		RecPtr = &tmpRecPtr;
1626
		nextmode = true;
T
Tom Lane 已提交
1627
		/* fast case if next record is on same page */
1628 1629 1630 1631 1632
		if (nextRecord != NULL)
		{
			record = nextRecord;
			goto got_record;
		}
T
Tom Lane 已提交
1633
		/* align old recptr to next page */
1634 1635 1636 1637 1638 1639 1640 1641
		if (tmpRecPtr.xrecoff % BLCKSZ != 0)
			tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
		if (tmpRecPtr.xrecoff >= XLogFileSize)
		{
			(tmpRecPtr.xlogid)++;
			tmpRecPtr.xrecoff = 0;
		}
		tmpRecPtr.xrecoff += SizeOfXLogPHD;
1642
	}
1643
	else if (!XRecOffIsValid(RecPtr->xrecoff))
1644
		elog(STOP, "ReadRecord: invalid record offset at (%u, %u)",
1645
			 RecPtr->xlogid, RecPtr->xrecoff);
1646

T
Tom Lane 已提交
1647
	if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
1648
	{
1649 1650
		close(readFile);
		readFile = -1;
1651
	}
T
Tom Lane 已提交
1652
	XLByteToSeg(*RecPtr, readId, readSeg);
1653
	if (readFile < 0)
1654
	{
T
Tom Lane 已提交
1655
		readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1656 1657
		if (readFile < 0)
			goto next_record_is_invalid;
T
Tom Lane 已提交
1658
		readOff = (uint32) (-1); /* force read to occur below */
1659 1660
	}

T
Tom Lane 已提交
1661 1662
	targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / BLCKSZ) * BLCKSZ;
	if (readOff != targetPageOff)
1663
	{
T
Tom Lane 已提交
1664 1665 1666 1667
		readOff = targetPageOff;
		if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
		{
			elog(emode, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m",
1668
				 readId, readSeg, readOff);
T
Tom Lane 已提交
1669 1670
			goto next_record_is_invalid;
		}
1671
		if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
T
Tom Lane 已提交
1672 1673
		{
			elog(emode, "ReadRecord: read(logfile %u seg %u off %u) failed: %m",
1674
				 readId, readSeg, readOff);
T
Tom Lane 已提交
1675 1676
			goto next_record_is_invalid;
		}
1677
		if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, nextmode))
1678 1679
			goto next_record_is_invalid;
	}
T
Tom Lane 已提交
1680
	if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
1681 1682
		RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
	{
T
Tom Lane 已提交
1683
		elog(emode, "ReadRecord: contrecord is requested by (%u, %u)",
1684
			 RecPtr->xlogid, RecPtr->xrecoff);
1685 1686
		goto next_record_is_invalid;
	}
1687
	record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
1688 1689

got_record:;
T
Tom Lane 已提交
1690 1691 1692 1693
	/*
	 * Currently, xl_len == 0 must be bad data, but that might not be
	 * true forever.  See note in XLogInsert.
	 */
1694 1695 1696
	if (record->xl_len == 0)
	{
		elog(emode, "ReadRecord: record with zero len at (%u, %u)",
T
Tom Lane 已提交
1697
			 RecPtr->xlogid, RecPtr->xrecoff);
1698 1699
		goto next_record_is_invalid;
	}
T
Tom Lane 已提交
1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715
	/*
	 * Compute total length of record including any appended backup blocks.
	 */
	total_len = SizeOfXLogRecord + record->xl_len;
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
	{
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
			continue;
		total_len += sizeof(BkpBlock) + BLCKSZ;
	}
	/*
	 * Make sure it will fit in buffer (currently, it is mechanically
	 * impossible for this test to fail, but it seems like a good idea
	 * anyway).
	 */
	if (total_len > _INTL_MAXLOGRECSZ)
1716
	{
1717
		elog(emode, "ReadRecord: too long record len %u at (%u, %u)",
T
Tom Lane 已提交
1718
			 total_len, RecPtr->xlogid, RecPtr->xrecoff);
1719 1720 1721 1722
		goto next_record_is_invalid;
	}
	if (record->xl_rmid > RM_MAX_ID)
	{
T
Tom Lane 已提交
1723
		elog(emode, "ReadRecord: invalid resource manager id %u at (%u, %u)",
1724
			 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff);
1725 1726 1727
		goto next_record_is_invalid;
	}
	nextRecord = NULL;
T
Tom Lane 已提交
1728 1729
	len = BLCKSZ - RecPtr->xrecoff % BLCKSZ;
	if (total_len > len)
1730
	{
T
Tom Lane 已提交
1731 1732
		/* Need to reassemble record */
		XLogContRecord *contrecord;
1733
		uint32			gotlen = len;
1734

T
Tom Lane 已提交
1735
		memcpy(buffer, record, len);
1736
		record = (XLogRecord *) buffer;
T
Tom Lane 已提交
1737
		buffer += len;
1738
		for (;;)
1739
		{
T
Tom Lane 已提交
1740 1741
			readOff += BLCKSZ;
			if (readOff >= XLogSegSize)
1742 1743
			{
				close(readFile);
T
Tom Lane 已提交
1744 1745 1746
				readFile = -1;
				NextLogSeg(readId, readSeg);
				readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1747 1748
				if (readFile < 0)
					goto next_record_is_invalid;
T
Tom Lane 已提交
1749
				readOff = 0;
1750 1751
			}
			if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
T
Tom Lane 已提交
1752 1753
			{
				elog(emode, "ReadRecord: read(logfile %u seg %u off %u) failed: %m",
1754
					 readId, readSeg, readOff);
T
Tom Lane 已提交
1755 1756
				goto next_record_is_invalid;
			}
1757
			if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, true))
1758
				goto next_record_is_invalid;
T
Tom Lane 已提交
1759
			if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
1760
			{
T
Tom Lane 已提交
1761
				elog(emode, "ReadRecord: there is no ContRecord flag in logfile %u seg %u off %u",
1762
					 readId, readSeg, readOff);
1763 1764
				goto next_record_is_invalid;
			}
T
Tom Lane 已提交
1765 1766 1767
			contrecord = (XLogContRecord *) ((char *) readBuf + SizeOfXLogPHD);
			if (contrecord->xl_rem_len == 0 || 
				total_len != (contrecord->xl_rem_len + gotlen))
1768
			{
T
Tom Lane 已提交
1769 1770
				elog(emode, "ReadRecord: invalid cont-record len %u in logfile %u seg %u off %u",
					 contrecord->xl_rem_len, readId, readSeg, readOff);
1771 1772
				goto next_record_is_invalid;
			}
T
Tom Lane 已提交
1773 1774
			len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
			if (contrecord->xl_rem_len > len)
1775
			{
T
Tom Lane 已提交
1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798
				memcpy(buffer, (char *)contrecord + SizeOfXLogContRecord, len);
				gotlen += len;
				buffer += len;
				continue;
			}
			memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
				   contrecord->xl_rem_len);
			break;
		}
		if (!RecordIsValid(record, *RecPtr, emode))
			goto next_record_is_invalid;
		if (BLCKSZ - SizeOfXLogRecord >= SizeOfXLogPHD +
			SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len))
		{
			nextRecord = (XLogRecord *) ((char *) contrecord + 
				SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len));
		}
		EndRecPtr.xlogid = readId;
		EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
			SizeOfXLogPHD + SizeOfXLogContRecord + 
			MAXALIGN(contrecord->xl_rem_len);
		ReadRecPtr = *RecPtr;
		return record;
1799 1800
	}

T
Tom Lane 已提交
1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811
	/* Record does not cross a page boundary */
	if (!RecordIsValid(record, *RecPtr, emode))
		goto next_record_is_invalid;
	if (BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % BLCKSZ +
		MAXALIGN(total_len))
		nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
	EndRecPtr.xlogid = RecPtr->xlogid;
	EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
	ReadRecPtr = *RecPtr;
	memcpy(buffer, record, total_len);
	return (XLogRecord *) buffer;
1812

T
Tom Lane 已提交
1813 1814 1815 1816 1817
next_record_is_invalid:;
	close(readFile);
	readFile = -1;
	nextRecord = NULL;
	return NULL;
1818 1819
}

1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863
/*
 * Sanity-check the xlog page header of a page that was just read in.
 *
 * Returns true when the header passes every test.  Otherwise the problem
 * is reported at level emode and false is returned.  A page that passes
 * also has its SUI remembered in lastReadSUI.
 *
 * This exists only to keep ReadRecord free of duplicated code; nothing
 * else is expected to call it.
 */
static bool
ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
{
	bool		valid = false;

	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
	{
		elog(emode, "ReadRecord: invalid magic number %04X in logfile %u seg %u off %u",
			 hdr->xlp_magic, readId, readSeg, readOff);
	}
	else if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
	{
		elog(emode, "ReadRecord: invalid info bits %04X in logfile %u seg %u off %u",
			 hdr->xlp_info, readId, readSeg, readOff);
	}
	else if (checkSUI &&
			 (hdr->xlp_sui < lastReadSUI ||
			  hdr->xlp_sui > lastReadSUI + 512))
	{
		/*
		 * A SUI smaller than the previous page's, or more than a few
		 * counts larger, is not believable.  In theory up to 512 shutdown
		 * checkpoint records fit on one 32K xlog page, so that is the
		 * largest legitimate jump.
		 *
		 * The comparison only makes sense when pages are read strictly in
		 * sequence, which is why ReadRecord tells us whether to apply it.
		 */
		elog(emode, "ReadRecord: out-of-sequence SUI %u (after %u) in logfile %u seg %u off %u",
			 hdr->xlp_sui, lastReadSUI, readId, readSeg, readOff);
	}
	else
	{
		/* Header accepted: remember its SUI for the next page's check */
		lastReadSUI = hdr->xlp_sui;
		valid = true;
	}

	return valid;
}

1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882
/*
 * I/O routines for pg_control
 *
 * *ControlFile is a buffer in shared memory that holds an image of the
 * contents of pg_control.  WriteControlFile() initializes pg_control
 * given a preloaded buffer, ReadControlFile() loads the buffer from
 * the pg_control file (during postmaster or standalone-backend startup),
 * and UpdateControlFile() rewrites pg_control after we modify xlog state.
 *
 * For simplicity, WriteControlFile() initializes the fields of pg_control
 * that are related to checking backend/database compatibility, and
 * ReadControlFile() verifies they are correct.  We could split out the
 * I/O and compatibility-check functions, but there seems no need currently.
 */

void
XLOGPathInit(void)
{
	/* Init XLOG file paths */
1883 1884 1885
	snprintf(XLogDir, MAXPGPATH, "%s%cpg_xlog", DataDir, SEP_CHAR);
	snprintf(ControlFilePath, MAXPGPATH, "%s%cglobal%cpg_control",
			 DataDir, SEP_CHAR, SEP_CHAR);
1886 1887 1888 1889 1890 1891
}

static void
WriteControlFile(void)
{
	int			fd;
T
Tom Lane 已提交
1892
	char		buffer[BLCKSZ];	/* need not be aligned */
1893 1894 1895 1896 1897
#ifdef USE_LOCALE
	char	   *localeptr;
#endif

	/*
T
Tom Lane 已提交
1898
	 * Initialize version and compatibility-check fields
1899
	 */
T
Tom Lane 已提交
1900 1901
	ControlFile->pg_control_version = PG_CONTROL_VERSION;
	ControlFile->catalog_version_no = CATALOG_VERSION_NO;
1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929
	ControlFile->blcksz = BLCKSZ;
	ControlFile->relseg_size = RELSEG_SIZE;
#ifdef USE_LOCALE
	localeptr = setlocale(LC_COLLATE, NULL);
	if (!localeptr)
		elog(STOP, "Invalid LC_COLLATE setting");
	StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
	localeptr = setlocale(LC_CTYPE, NULL);
	if (!localeptr)
		elog(STOP, "Invalid LC_CTYPE setting");
	StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
	/*
	 * Issue warning notice if initdb'ing in a locale that will not permit
	 * LIKE index optimization.  This is not a clean place to do it, but
	 * I don't see a better place either...
	 */
	if (!locale_is_like_safe())
		elog(NOTICE, "Initializing database with %s collation order."
			 "\n\tThis locale setting will prevent use of index optimization for"
			 "\n\tLIKE and regexp searches.  If you are concerned about speed of"
			 "\n\tsuch queries, you may wish to set LC_COLLATE to \"C\" and"
			 "\n\tre-initdb.  For more information see the Administrator's Guide.",
			 ControlFile->lc_collate);
#else
	strcpy(ControlFile->lc_collate, "C");
	strcpy(ControlFile->lc_ctype, "C");
#endif

T
Tom Lane 已提交
1930 1931 1932 1933 1934 1935 1936
	/* Contents are protected with a CRC */
	INIT_CRC64(ControlFile->crc);
	COMP_CRC64(ControlFile->crc, 
			   (char*) ControlFile + sizeof(crc64),
			   sizeof(ControlFileData) - sizeof(crc64));
	FIN_CRC64(ControlFile->crc);

1937 1938 1939 1940 1941 1942 1943 1944 1945
	/*
	 * We write out BLCKSZ bytes into pg_control, zero-padding the
	 * excess over sizeof(ControlFileData).  This reduces the odds
	 * of premature-EOF errors when reading pg_control.  We'll still
	 * fail when we check the contents of the file, but hopefully with
	 * a more specific error than "couldn't read pg_control".
	 */
	if (sizeof(ControlFileData) > BLCKSZ)
		elog(STOP, "sizeof(ControlFileData) is too large ... fix xlog.c");
1946

1947 1948 1949
	memset(buffer, 0, BLCKSZ);
	memcpy(buffer, ControlFile, sizeof(ControlFileData));

1950 1951
	fd = BasicOpenFile(ControlFilePath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
					   S_IRUSR | S_IWUSR);
1952 1953 1954 1955 1956 1957 1958
	if (fd < 0)
		elog(STOP, "WriteControlFile failed to create control file (%s): %m",
			 ControlFilePath);

	if (write(fd, buffer, BLCKSZ) != BLCKSZ)
		elog(STOP, "WriteControlFile failed to write control file: %m");

1959
	if (pg_fsync(fd) != 0)
1960 1961 1962 1963 1964 1965 1966 1967
		elog(STOP, "WriteControlFile failed to fsync control file: %m");

	close(fd);
}

static void
ReadControlFile(void)
{
1968
	crc64		crc;
1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982
	int			fd;

	/*
	 * Read data...
	 */
	fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
	if (fd < 0)
		elog(STOP, "open(\"%s\") failed: %m", ControlFilePath);

	if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
		elog(STOP, "read(\"%s\") failed: %m", ControlFilePath);

	close(fd);

T
Tom Lane 已提交
1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993
	/*
	 * Check for expected pg_control format version.  If this is wrong,
	 * the CRC check will likely fail because we'll be checking the wrong
	 * number of bytes.  Complaining about wrong version will probably be
	 * more enlightening than complaining about wrong CRC.
	 */
	if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
		elog(STOP, "database was initialized with PG_CONTROL_VERSION %d,\n\tbut the backend was compiled with PG_CONTROL_VERSION %d.\n\tlooks like you need to initdb.",
			 ControlFile->pg_control_version, PG_CONTROL_VERSION);

	/* Now check the CRC. */
1994 1995
	INIT_CRC64(crc);
	COMP_CRC64(crc, 
T
Tom Lane 已提交
1996 1997
			   (char*) ControlFile + sizeof(crc64),
			   sizeof(ControlFileData) - sizeof(crc64));
1998 1999
	FIN_CRC64(crc);

T
Tom Lane 已提交
2000
	if (!EQ_CRC64(crc, ControlFile->crc))
2001 2002
		elog(STOP, "Invalid CRC in control file");

2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013
	/*
	 * Do compatibility checking immediately.  We do this here for 2 reasons:
	 *
	 * (1) if the database isn't compatible with the backend executable,
	 * we want to abort before we can possibly do any damage;
	 *
	 * (2) this code is executed in the postmaster, so the setlocale() will
	 * propagate to forked backends, which aren't going to read this file
	 * for themselves.  (These locale settings are considered critical
	 * compatibility items because they can affect sort order of indexes.)
	 */
T
Tom Lane 已提交
2014 2015 2016
	if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
		elog(STOP, "database was initialized with CATALOG_VERSION_NO %d,\n\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n\tlooks like you need to initdb.",
			 ControlFile->catalog_version_no, CATALOG_VERSION_NO);
2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037
	if (ControlFile->blcksz != BLCKSZ)
		elog(STOP, "database was initialized with BLCKSZ %d,\n\tbut the backend was compiled with BLCKSZ %d.\n\tlooks like you need to initdb.",
			 ControlFile->blcksz, BLCKSZ);
	if (ControlFile->relseg_size != RELSEG_SIZE)
		elog(STOP, "database was initialized with RELSEG_SIZE %d,\n\tbut the backend was compiled with RELSEG_SIZE %d.\n\tlooks like you need to initdb.",
			 ControlFile->relseg_size, RELSEG_SIZE);
#ifdef USE_LOCALE
	if (setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
		elog(STOP, "database was initialized with LC_COLLATE '%s',\n\twhich is not recognized by setlocale().\n\tlooks like you need to initdb.",
			 ControlFile->lc_collate);
	if (setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
		elog(STOP, "database was initialized with LC_CTYPE '%s',\n\twhich is not recognized by setlocale().\n\tlooks like you need to initdb.",
			 ControlFile->lc_ctype);
#else
	if (strcmp(ControlFile->lc_collate, "C") != 0 ||
		strcmp(ControlFile->lc_ctype, "C") != 0)
		elog(STOP, "database was initialized with LC_COLLATE '%s' and LC_CTYPE '%s',\n\tbut the backend was compiled without locale support.\n\tlooks like you need to initdb or recompile.",
			 ControlFile->lc_collate, ControlFile->lc_ctype);
#endif
}

2038
void
2039
UpdateControlFile(void)
2040
{
2041
	int			fd;
2042

2043 2044
	INIT_CRC64(ControlFile->crc);
	COMP_CRC64(ControlFile->crc, 
T
Tom Lane 已提交
2045 2046
			   (char*) ControlFile + sizeof(crc64),
			   sizeof(ControlFileData) - sizeof(crc64));
2047 2048
	FIN_CRC64(ControlFile->crc);

2049
	fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
2050
	if (fd < 0)
2051
		elog(STOP, "open(\"%s\") failed: %m", ControlFilePath);
2052

2053
	if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
2054
		elog(STOP, "write(cntlfile) failed: %m");
2055

2056
	if (pg_fsync(fd) != 0)
2057
		elog(STOP, "fsync(cntlfile) failed: %m");
2058 2059 2060 2061

	close(fd);
}

2062
/*
T
Tom Lane 已提交
2063
 * Initialization of shared memory for XLOG
2064 2065
 */

2066
int
2067
XLOGShmemSize(void)
2068 2069 2070 2071
{
	if (XLOGbuffers < MinXLOGbuffers)
		XLOGbuffers = MinXLOGbuffers;

T
Tom Lane 已提交
2072 2073 2074
	return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
		+ BLCKSZ * XLOGbuffers +
		MAXALIGN(sizeof(ControlFileData));
2075 2076 2077 2078 2079
}

void
XLOGShmemInit(void)
{
2080
	bool		found;
2081

2082
	/* this must agree with space requested by XLOGShmemSize() */
2083 2084 2085
	if (XLOGbuffers < MinXLOGbuffers)
		XLOGbuffers = MinXLOGbuffers;

2086
	XLogCtl = (XLogCtlData *)
T
Tom Lane 已提交
2087 2088 2089 2090 2091
		ShmemInitStruct("XLOG Ctl",
						MAXALIGN(sizeof(XLogCtlData) +
								 sizeof(XLogRecPtr) * XLOGbuffers)
						+ BLCKSZ * XLOGbuffers,
						&found);
2092
	Assert(!found);
2093 2094 2095 2096
	ControlFile = (ControlFileData *)
		ShmemInitStruct("Control File", sizeof(ControlFileData), &found);
	Assert(!found);

T
Tom Lane 已提交
2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126
	memset(XLogCtl, 0, sizeof(XLogCtlData));
	/*
	 * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be
	 * a multiple of the alignment for same, so no extra alignment padding
	 * is needed here.
	 */
	XLogCtl->xlblocks = (XLogRecPtr *)
		(((char *) XLogCtl) + sizeof(XLogCtlData));
	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
	/*
	 * Here, on the other hand, we must MAXALIGN to ensure the page buffers
	 * have worst-case alignment.
	 */
	XLogCtl->pages =
		((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) +
									  sizeof(XLogRecPtr) * XLOGbuffers);
	memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);

	/*
	 * Do basic initialization of XLogCtl shared data.
	 * (StartupXLOG will fill in additional info.)
	 */
	XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
	XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
	S_INIT_LOCK(&(XLogCtl->insert_lck));
	S_INIT_LOCK(&(XLogCtl->info_lck));
	S_INIT_LOCK(&(XLogCtl->logwrt_lck));
	S_INIT_LOCK(&(XLogCtl->chkp_lck));

2127 2128 2129 2130 2131 2132 2133
	/*
	 * If we are not in bootstrap mode, pg_control should already exist.
	 * Read and validate it immediately (see comments in ReadControlFile()
	 * for the reasons why).
	 */
	if (!IsBootstrapProcessingMode())
		ReadControlFile();
2134 2135 2136
}

/*
T
Tom Lane 已提交
2137 2138
 * This func must be called ONCE on system install.  It creates pg_control
 * and the initial XLOG segment.
2139 2140
 */
void
T
Tom Lane 已提交
2141
BootStrapXLOG(void)
2142
{
2143
	CheckPoint	checkPoint;
T
Tom Lane 已提交
2144 2145
	char	   *buffer;
	XLogPageHeader page;
2146
	XLogRecord *record;
2147
	bool        use_existent;
2148
	crc64		crc;
2149

T
Tom Lane 已提交
2150 2151 2152 2153
	/* Use malloc() to ensure buffer is MAXALIGNED */
	buffer = (char *) malloc(BLCKSZ);
	page = (XLogPageHeader) buffer;

2154 2155 2156
	checkPoint.redo.xlogid = 0;
	checkPoint.redo.xrecoff = SizeOfXLogPHD;
	checkPoint.undo = checkPoint.redo;
T
Tom Lane 已提交
2157
	checkPoint.ThisStartUpID = 0;
2158
	checkPoint.nextXid = FirstTransactionId;
2159
	checkPoint.nextOid = BootstrapObjectIdData;
T
Tom Lane 已提交
2160
	checkPoint.time = time(NULL);
2161

2162
	ShmemVariableCache->nextXid = checkPoint.nextXid;
T
Tom Lane 已提交
2163
	ShmemVariableCache->xidCount = 0;
2164 2165 2166
	ShmemVariableCache->nextOid = checkPoint.nextOid;
	ShmemVariableCache->oidCount = 0;

2167 2168 2169
	memset(buffer, 0, BLCKSZ);
	page->xlp_magic = XLOG_PAGE_MAGIC;
	page->xlp_info = 0;
2170
	page->xlp_sui = checkPoint.ThisStartUpID;
2171 2172 2173
	record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
	record->xl_prev.xlogid = 0;
	record->xl_prev.xrecoff = 0;
2174 2175 2176
	record->xl_xact_prev = record->xl_prev;
	record->xl_xid = InvalidTransactionId;
	record->xl_len = sizeof(checkPoint);
T
Tom Lane 已提交
2177
	record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
2178
	record->xl_rmid = RM_XLOG_ID;
T
Tom Lane 已提交
2179
	memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
2180

2181
	INIT_CRC64(crc);
T
Tom Lane 已提交
2182 2183 2184
	COMP_CRC64(crc, &checkPoint, sizeof(checkPoint));
	COMP_CRC64(crc, (char*) record + sizeof(crc64),
			   SizeOfXLogRecord - sizeof(crc64));
2185 2186 2187
	FIN_CRC64(crc);
	record->xl_crc = crc;

2188 2189
	use_existent = false;
	openLogFile = XLogFileInit(0, 0, &use_existent, false);
2190

T
Tom Lane 已提交
2191
	if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
2192
		elog(STOP, "BootStrapXLOG failed to write logfile: %m");
2193

T
Tom Lane 已提交
2194
	if (pg_fsync(openLogFile) != 0)
2195
		elog(STOP, "BootStrapXLOG failed to fsync logfile: %m");
2196

T
Tom Lane 已提交
2197 2198
	close(openLogFile);
	openLogFile = -1;
2199

2200
	memset(ControlFile, 0, sizeof(ControlFileData));
T
Tom Lane 已提交
2201 2202 2203
	/* Initialize pg_control status fields */
	ControlFile->state = DB_SHUTDOWNED;
	ControlFile->time = checkPoint.time;
2204 2205 2206
	ControlFile->logId = 0;
	ControlFile->logSeg = 1;
	ControlFile->checkPoint = checkPoint.redo;
T
Tom Lane 已提交
2207
	ControlFile->checkPointCopy = checkPoint;
2208
	/* some additional ControlFile fields are set in WriteControlFile() */
2209

2210
	WriteControlFile();
2211 2212
}

2213
/*
 * Format a time_t as "YYYY-MM-DD HH:MM:SS TZ" in the local time zone.
 *
 * Returns a pointer to a static buffer that is overwritten by the next
 * call, so the result must be used (or copied) before calling again.
 *
 * If the time cannot be converted or the formatted text does not fit,
 * the raw timestamp value is printed instead, so that the caller always
 * gets a valid string.
 */
static char *
str_time(time_t tnow)
{
	static char buf[128];
	struct tm  *tm = localtime(&tnow);

	/*
	 * localtime() may return NULL, and strftime() returns 0 (leaving the
	 * buffer contents indeterminate) when the result does not fit.
	 */
	if (tm == NULL ||
		strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S %Z", tm) == 0)
		snprintf(buf, sizeof(buf), "%ld", (long) tnow);

	return buf;
}

/*
T
Tom Lane 已提交
2226
 * This must be called ONCE during postmaster or standalone-backend startup
2227 2228
 */
void
T
Tom Lane 已提交
2229
StartupXLOG(void)
2230
{
2231 2232
	XLogCtlInsert *Insert;
	CheckPoint	checkPoint;
T
Tom Lane 已提交
2233
	bool		wasShutdown;
2234
	XLogRecPtr	RecPtr,
T
Tom Lane 已提交
2235 2236 2237
				LastRec,
				checkPointLoc,
				EndOfLog;
2238
	XLogRecord *record;
T
Tom Lane 已提交
2239
	char	   *buffer;
2240

T
Tom Lane 已提交
2241 2242
	/* Use malloc() to ensure record buffer is MAXALIGNED */
	buffer = (char *) malloc(_INTL_MAXLOGRECSZ);
2243

T
Tom Lane 已提交
2244
	CritSectionCount++;
2245 2246

	/*
2247 2248 2249 2250
	 * Read control file and check XLOG status looks valid.
	 *
	 * Note: in most control paths, *ControlFile is already valid and we
	 * need not do ReadControlFile() here, but might as well do it to be sure.
2251
	 */
2252
	ReadControlFile();
2253

2254 2255 2256 2257
	if (ControlFile->logSeg == 0 ||
		ControlFile->time <= 0 ||
		ControlFile->state < DB_SHUTDOWNED ||
		ControlFile->state > DB_IN_PRODUCTION ||
2258
		!XRecOffIsValid(ControlFile->checkPoint.xrecoff))
2259
		elog(STOP, "control file context is broken");
2260 2261

	if (ControlFile->state == DB_SHUTDOWNED)
2262
		elog(LOG, "database system was shut down at %s",
2263
			 str_time(ControlFile->time));
2264
	else if (ControlFile->state == DB_SHUTDOWNING)
2265
		elog(LOG, "database system shutdown was interrupted at %s",
2266
			 str_time(ControlFile->time));
2267
	else if (ControlFile->state == DB_IN_RECOVERY)
2268
		elog(LOG, "database system was interrupted being in recovery at %s\n"
2269
			 "\tThis propably means that some data blocks are corrupted\n"
2270
			 "\tand you will have to use last backup for recovery.",
2271
			 str_time(ControlFile->time));
2272
	else if (ControlFile->state == DB_IN_PRODUCTION)
2273
		elog(LOG, "database system was interrupted at %s",
2274
			 str_time(ControlFile->time));
2275

T
Tom Lane 已提交
2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306
	/*
	 * Get the last valid checkpoint record.  If the latest one according
	 * to pg_control is broken, try the next-to-last one.
	 */
	record = ReadCheckpointRecord(ControlFile->checkPoint,
								  "primary", buffer);
	if (record != NULL)
	{
		checkPointLoc = ControlFile->checkPoint;
		elog(LOG, "CheckPoint record at (%u, %u)",
			 checkPointLoc.xlogid, checkPointLoc.xrecoff);
	}
	else
	{
		record = ReadCheckpointRecord(ControlFile->prevCheckPoint,
									  "secondary", buffer);
		if (record != NULL)
		{
			checkPointLoc = ControlFile->prevCheckPoint;
			elog(LOG, "Using previous CheckPoint record at (%u, %u)",
				 checkPointLoc.xlogid, checkPointLoc.xrecoff);
			InRecovery = true;	/* force recovery even if SHUTDOWNED */
		}
		else
		{
			elog(STOP, "Unable to locate a valid CheckPoint record");
		}
	}
	LastRec = RecPtr = checkPointLoc;
	memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
	wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
2307

V
Vadim B. Mikheev 已提交
2308
	elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u); Shutdown %s",
2309
		 checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
V
Vadim B. Mikheev 已提交
2310
		 checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
T
Tom Lane 已提交
2311
		 wasShutdown ? "TRUE" : "FALSE");
2312
	elog(LOG, "NextTransactionId: %u; NextOid: %u",
2313 2314
		 checkPoint.nextXid, checkPoint.nextOid);
	if (checkPoint.nextXid < FirstTransactionId ||
2315 2316 2317 2318
		checkPoint.nextOid < BootstrapObjectIdData)
		elog(STOP, "Invalid NextTransactionId/NextOid");

	ShmemVariableCache->nextXid = checkPoint.nextXid;
T
Tom Lane 已提交
2319
	ShmemVariableCache->xidCount = 0;
2320
	ShmemVariableCache->nextOid = checkPoint.nextOid;
2321
	ShmemVariableCache->oidCount = 0;
2322

V
WAL  
Vadim B. Mikheev 已提交
2323
	ThisStartUpID = checkPoint.ThisStartUpID;
2324 2325
	RedoRecPtr = XLogCtl->Insert.RedoRecPtr = 
		XLogCtl->RedoRecPtr = checkPoint.redo;
V
WAL  
Vadim B. Mikheev 已提交
2326

2327 2328 2329 2330 2331
	if (XLByteLT(RecPtr, checkPoint.redo))
		elog(STOP, "Invalid redo in checkPoint record");
	if (checkPoint.undo.xrecoff == 0)
		checkPoint.undo = RecPtr;

V
Vadim B. Mikheev 已提交
2332 2333
	if (XLByteLT(checkPoint.undo, RecPtr) || 
		XLByteLT(checkPoint.redo, RecPtr))
2334
	{
T
Tom Lane 已提交
2335
		if (wasShutdown)
V
Vadim B. Mikheev 已提交
2336
			elog(STOP, "Invalid Redo/Undo record in shutdown checkpoint");
V
WAL  
Vadim B. Mikheev 已提交
2337
		InRecovery = true;
2338 2339
	}
	else if (ControlFile->state != DB_SHUTDOWNED)
V
Vadim B. Mikheev 已提交
2340
	{
V
WAL  
Vadim B. Mikheev 已提交
2341
		InRecovery = true;
V
Vadim B. Mikheev 已提交
2342
	}
2343

V
WAL  
Vadim B. Mikheev 已提交
2344 2345
	/* REDO */
	if (InRecovery)
2346
	{
2347 2348
		elog(LOG, "database system was not properly shut down; "
			 "automatic recovery in progress...");
2349 2350 2351 2352
		ControlFile->state = DB_IN_RECOVERY;
		ControlFile->time = time(NULL);
		UpdateControlFile();

V
Vadim B. Mikheev 已提交
2353
		XLogOpenLogRelation();	/* open pg_log */
V
WAL  
Vadim B. Mikheev 已提交
2354
		XLogInitRelationCache();
V
Vadim B. Mikheev 已提交
2355

2356 2357
		/* Is REDO required ? */
		if (XLByteLT(checkPoint.redo, RecPtr))
T
Tom Lane 已提交
2358
			record = ReadRecord(&(checkPoint.redo), STOP, buffer);
2359
		else	/* read past CheckPoint record */
T
Tom Lane 已提交
2360
			record = ReadRecord(NULL, LOG, buffer);
2361

T
Tom Lane 已提交
2362
		if (record != NULL)
2363
		{
V
WAL  
Vadim B. Mikheev 已提交
2364
			InRedo = true;
2365
			elog(LOG, "redo starts at (%u, %u)",
2366
				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2367 2368 2369
			do
			{
				if (record->xl_xid >= ShmemVariableCache->nextXid)
T
Tom Lane 已提交
2370 2371
				{
					/* This probably shouldn't happen... */
2372
					ShmemVariableCache->nextXid = record->xl_xid + 1;
T
Tom Lane 已提交
2373 2374
					ShmemVariableCache->xidCount = 0;
				}
V
WAL  
Vadim B. Mikheev 已提交
2375 2376 2377 2378
				if (XLOG_DEBUG)
				{
					char	buf[8192];

2379 2380 2381
					sprintf(buf, "REDO @ %u/%u; LSN %u/%u: ", 
						ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
						EndRecPtr.xlogid, EndRecPtr.xrecoff);
V
WAL  
Vadim B. Mikheev 已提交
2382 2383 2384 2385
					xlog_outrec(buf, record);
					strcat(buf, " - ");
					RmgrTable[record->xl_rmid].rm_desc(buf, 
						record->xl_info, XLogRecGetData(record));
T
Tom Lane 已提交
2386
					fprintf(stderr, "%s\n", buf);
V
WAL  
Vadim B. Mikheev 已提交
2387 2388
				}

T
Tom Lane 已提交
2389
				if (record->xl_info & XLR_BKP_BLOCK_MASK)
2390 2391
					RestoreBkpBlocks(record, EndRecPtr);

2392
				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
T
Tom Lane 已提交
2393 2394
				record = ReadRecord(NULL, LOG, buffer);
			} while (record != NULL);
2395
			elog(LOG, "redo done at (%u, %u)",
2396
				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2397
			LastRec = ReadRecPtr;
V
WAL  
Vadim B. Mikheev 已提交
2398
			InRedo = false;
2399 2400
		}
		else
2401
			elog(LOG, "redo is not required");
V
WAL  
Vadim B. Mikheev 已提交
2402 2403
	}

T
Tom Lane 已提交
2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415
	/*
	 * Init xlog buffer cache using the block containing the last valid
	 * record from the previous incarnation.
	 */
	record = ReadRecord(&LastRec, STOP, buffer);
	EndOfLog = EndRecPtr;
	XLByteToPrevSeg(EndOfLog, openLogId, openLogSeg);
	openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
	openLogOff = 0;
	ControlFile->logId = openLogId;
	ControlFile->logSeg = openLogSeg + 1;
	XLogCtl->xlblocks[0].xlogid = openLogId;
V
WAL  
Vadim B. Mikheev 已提交
2416
	XLogCtl->xlblocks[0].xrecoff =
T
Tom Lane 已提交
2417
		((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
V
WAL  
Vadim B. Mikheev 已提交
2418
	Insert = &XLogCtl->Insert;
T
Tom Lane 已提交
2419 2420 2421 2422 2423 2424 2425 2426 2427
	/* Tricky point here: readBuf contains the *last* block that the LastRec
	 * record spans, not the one it starts in, which is what we want.
	 */
	Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
	memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
	Insert->currpos = (char *) Insert->currpage +
		(EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
	/* Make sure rest of page is zero */
	memset(Insert->currpos, 0, INSERT_FREESPACE(Insert));
V
WAL  
Vadim B. Mikheev 已提交
2428 2429
	Insert->PrevRecord = LastRec;

T
Tom Lane 已提交
2430
	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
V
WAL  
Vadim B. Mikheev 已提交
2431

T
Tom Lane 已提交
2432 2433 2434
	XLogCtl->Write.LogwrtResult = LogwrtResult;
	Insert->LogwrtResult = LogwrtResult;
	XLogCtl->LogwrtResult = LogwrtResult;
V
WAL  
Vadim B. Mikheev 已提交
2435

T
Tom Lane 已提交
2436 2437
	XLogCtl->LogwrtRqst.Write = EndOfLog;
	XLogCtl->LogwrtRqst.Flush = EndOfLog;
2438

V
Vadim B. Mikheev 已提交
2439
#ifdef NOT_USED
V
WAL  
Vadim B. Mikheev 已提交
2440 2441 2442
	/* UNDO */
	if (InRecovery)
	{
2443 2444 2445
		RecPtr = ReadRecPtr;
		if (XLByteLT(checkPoint.undo, RecPtr))
		{
2446
			elog(LOG, "undo starts at (%u, %u)",
2447
				 RecPtr.xlogid, RecPtr.xrecoff);
2448 2449
			do
			{
T
Tom Lane 已提交
2450
				record = ReadRecord(&RecPtr, STOP, buffer);
2451
				if (TransactionIdIsValid(record->xl_xid) &&
2452
					!TransactionIdDidCommit(record->xl_xid))
V
misc  
Vadim B. Mikheev 已提交
2453
					RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record);
2454 2455
				RecPtr = record->xl_prev;
			} while (XLByteLE(checkPoint.undo, RecPtr));
2456
			elog(LOG, "undo done at (%u, %u)",
2457
				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2458 2459
		}
		else
2460
			elog(LOG, "undo is not required");
2461
	}
V
WAL  
Vadim B. Mikheev 已提交
2462
#endif
2463

V
WAL  
Vadim B. Mikheev 已提交
2464
	if (InRecovery)
2465
	{
T
Tom Lane 已提交
2466 2467 2468 2469 2470 2471 2472
		/*
		 * In case we had to use the secondary checkpoint, make sure that
		 * it will still be shown as the secondary checkpoint after this
		 * CreateCheckPoint operation; we don't want the broken primary
		 * checkpoint to become prevCheckPoint...
		 */
		ControlFile->checkPoint = checkPointLoc;
2473
		CreateCheckPoint(true);
V
WAL  
Vadim B. Mikheev 已提交
2474
		XLogCloseRelationCache();
2475
	}
2476

T
Tom Lane 已提交
2477 2478 2479 2480
	/*
	 * Preallocate additional log files, if wanted.
	 */
	PreallocXlogFiles(EndOfLog);
2481

V
WAL  
Vadim B. Mikheev 已提交
2482
	InRecovery = false;
2483 2484 2485 2486 2487

	ControlFile->state = DB_IN_PRODUCTION;
	ControlFile->time = time(NULL);
	UpdateControlFile();

V
WAL  
Vadim B. Mikheev 已提交
2488 2489 2490
	ThisStartUpID++;
	XLogCtl->ThisStartUpID = ThisStartUpID;

2491
	elog(LOG, "database system is in production state");
2492
	CritSectionCount--;
2493

T
Tom Lane 已提交
2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546
	/* Shut down readFile facility, free space */
	if (readFile >= 0)
	{
		close(readFile);
		readFile = -1;
	}
	if (readBuf)
	{
		free(readBuf);
		readBuf = NULL;
	}

	free(buffer);
}

/*
 * Subroutine to try to fetch and validate a prior checkpoint record.
 *
 * RecPtr	  - location of the candidate record
 * whichChkpt - label ("primary"/"secondary") used only in log messages
 * buffer	  - work area handed through to ReadRecord
 *
 * Returns the record if it could be read and looks like a checkpoint
 * record, otherwise logs the reason at LOG level and returns NULL.
 */
static XLogRecord *
ReadCheckpointRecord(XLogRecPtr RecPtr,
					 const char *whichChkpt,
					 char *buffer)
{
	XLogRecord *rec;

	/* Don't even try to read from an implausible offset */
	if (!XRecOffIsValid(RecPtr.xrecoff))
	{
		elog(LOG, "Invalid %s checkPoint link in control file", whichChkpt);
		return NULL;
	}

	rec = ReadRecord(&RecPtr, LOG, buffer);

	if (rec == NULL)
	{
		elog(LOG, "Invalid %s checkPoint record", whichChkpt);
	}
	else if (rec->xl_rmid != RM_XLOG_ID)
	{
		elog(LOG, "Invalid RMID in %s checkPoint record", whichChkpt);
	}
	else if (!(rec->xl_info == XLOG_CHECKPOINT_SHUTDOWN ||
			   rec->xl_info == XLOG_CHECKPOINT_ONLINE))
	{
		elog(LOG, "Invalid xl_info in %s checkPoint record", whichChkpt);
	}
	else if (rec->xl_len != sizeof(CheckPoint))
	{
		elog(LOG, "Invalid length of %s checkPoint record", whichChkpt);
	}
	else
	{
		/* Passed every check */
		return rec;
	}

	return NULL;
}

V
WAL  
Vadim B. Mikheev 已提交
2549
/*
T
Tom Lane 已提交
2550
 * Postmaster uses this to initialize ThisStartUpID & RedoRecPtr from
2551
 * XLogCtlData located in shmem after successful startup.
V
WAL  
Vadim B. Mikheev 已提交
2552 2553 2554 2555 2556
 */
void
SetThisStartUpID(void)
{
	ThisStartUpID = XLogCtl->ThisStartUpID;
2557 2558 2559 2560
	RedoRecPtr = XLogCtl->RedoRecPtr;
}

/*
 * CheckPoint process called by postmaster saves copy of new RedoRecPtr
 * in shmem (using SetRedoRecPtr).  When checkpointer completes, postmaster
 * calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that
 * subsequently-spawned backends will start out with a reasonably up-to-date
 * local RedoRecPtr.  Since these operations are not protected by any spinlock
 * and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these
 * routines at other times!
 *
 * Note: once spawned, a backend must update its local RedoRecPtr from
 * XLogCtl->Insert.RedoRecPtr while holding the insert spinlock.  This is
 * done in XLogInsert().
 */
/* Copy this process's local RedoRecPtr into XLogCtl->RedoRecPtr (no locking) */
void
SetRedoRecPtr(void)
{
	XLogCtl->RedoRecPtr = RedoRecPtr;
}

/* Refresh this process's local RedoRecPtr from XLogCtl->RedoRecPtr (no locking) */
void
GetRedoRecPtr(void)
{
	RedoRecPtr = XLogCtl->RedoRecPtr;
}

2585
/*
T
Tom Lane 已提交
2586
 * This must be called ONCE during postmaster or standalone-backend shutdown
2587 2588
 */
void
T
Tom Lane 已提交
2589
ShutdownXLOG(void)
2590
{
2591
	elog(LOG, "shutting down");
2592

T
Tom Lane 已提交
2593 2594 2595
	/* suppress in-transaction check in CreateCheckPoint */
	MyLastRecPtr.xrecoff = 0;

2596
	CritSectionCount++;
V
Vadim B. Mikheev 已提交
2597
	CreateDummyCaches();
2598
	CreateCheckPoint(true);
2599
	CritSectionCount--;
2600

2601
	elog(LOG, "database system is shut down");
2602 2603
}

T
Tom Lane 已提交
2604 2605 2606
/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 */
2607 2608 2609
void
CreateCheckPoint(bool shutdown)
{
2610 2611 2612
	CheckPoint	checkPoint;
	XLogRecPtr	recptr;
	XLogCtlInsert *Insert = &XLogCtl->Insert;
2613
	XLogRecData	rdata;
2614
	uint32		freespace;
V
Vadim B. Mikheev 已提交
2615 2616
	uint32		_logId;
	uint32		_logSeg;
2617
	unsigned	spins = 0;
V
Vadim B. Mikheev 已提交
2618 2619 2620 2621

	if (MyLastRecPtr.xrecoff != 0)
		elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block");
 
2622
	START_CRIT_SECTION();
2623 2624

	/* Grab lock, using larger than normal sleep between tries (1 sec) */
V
Vadim B. Mikheev 已提交
2625 2626
	while (TAS(&(XLogCtl->chkp_lck)))
	{
2627 2628
		S_LOCK_SLEEP_INTERVAL(&(XLogCtl->chkp_lck), spins++,
							  CHECKPOINT_LOCK_TIMEOUT, 1000000);
V
Vadim B. Mikheev 已提交
2629
	}
2630 2631 2632 2633 2634 2635 2636

	if (shutdown)
	{
		ControlFile->state = DB_SHUTDOWNING;
		ControlFile->time = time(NULL);
		UpdateControlFile();
	}
T
Tom Lane 已提交
2637 2638

	memset(&checkPoint, 0, sizeof(checkPoint));
V
WAL  
Vadim B. Mikheev 已提交
2639
	checkPoint.ThisStartUpID = ThisStartUpID;
T
Tom Lane 已提交
2640
	checkPoint.time = time(NULL);
2641

2642
	S_LOCK(&(XLogCtl->insert_lck));
T
Tom Lane 已提交
2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687

	/*
	 * If this isn't a shutdown, and we have not inserted any XLOG records
	 * since the start of the last checkpoint, skip the checkpoint.  The
	 * idea here is to avoid inserting duplicate checkpoints when the system
	 * is idle.  That wastes log space, and more importantly it exposes us to
	 * possible loss of both current and previous checkpoint records if the
	 * machine crashes just as we're writing the update.  (Perhaps it'd make
	 * even more sense to checkpoint only when the previous checkpoint record
	 * is in a different xlog page?)
	 *
	 * We have to make two tests to determine that nothing has happened since
	 * the start of the last checkpoint: current insertion point must match
	 * the end of the last checkpoint record, and its redo pointer must point
	 * to itself.
	 */
	if (!shutdown)
	{
		XLogRecPtr	curInsert;

		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
		if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
			curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
			MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
			ControlFile->checkPoint.xlogid ==
			ControlFile->checkPointCopy.redo.xlogid &&
			ControlFile->checkPoint.xrecoff ==
			ControlFile->checkPointCopy.redo.xrecoff)
		{
			S_UNLOCK(&(XLogCtl->insert_lck));
			S_UNLOCK(&(XLogCtl->chkp_lck));
			END_CRIT_SECTION();
			return;
		}
	}

	/*
	 * Compute new REDO record ptr = location of next XLOG record.
	 *
	 * NB: this is NOT necessarily where the checkpoint record itself will
	 * be, since other backends may insert more XLOG records while we're
	 * off doing the buffer flush work.  Those XLOG records are logically
	 * after the checkpoint, even though physically before it.  Got that?
	 */
	freespace = INSERT_FREESPACE(Insert);
2688 2689
	if (freespace < SizeOfXLogRecord)
	{
T
Tom Lane 已提交
2690 2691
		(void) AdvanceXLInsertBuffer();
		/* OK to ignore update return flag, since we will do flush anyway */
2692 2693
		freespace = BLCKSZ - SizeOfXLogPHD;
	}
T
Tom Lane 已提交
2694 2695 2696 2697 2698
	INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
	/*
	 * Here we update the shared RedoRecPtr for future XLogInsert calls;
	 * this must be done while holding the insert lock.
	 */
2699
	RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
T
Tom Lane 已提交
2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714
	/*
	 * Get UNDO record ptr - this is oldest of PROC->logRec values.
	 * We do this while holding insert lock to ensure that we won't miss
	 * any about-to-commit transactions (UNDO must include all xacts that
	 * have commits after REDO point).
	 */
	checkPoint.undo = GetUndoRecPtr();

	if (shutdown && checkPoint.undo.xrecoff != 0)
		elog(STOP, "Active transaction while data base is shutting down");

	/*
	 * Now we can release insert lock, allowing other xacts to proceed
	 * even while we are flushing disk buffers.
	 */
2715 2716 2717 2718
	S_UNLOCK(&(XLogCtl->insert_lck));

	SpinAcquire(XidGenLockId);
	checkPoint.nextXid = ShmemVariableCache->nextXid;
T
Tom Lane 已提交
2719 2720
	if (!shutdown)
		checkPoint.nextXid += ShmemVariableCache->xidCount;
2721
	SpinRelease(XidGenLockId);
T
Tom Lane 已提交
2722

2723 2724
	SpinAcquire(OidGenLockId);
	checkPoint.nextOid = ShmemVariableCache->nextOid;
2725 2726
	if (!shutdown)
		checkPoint.nextOid += ShmemVariableCache->oidCount;
2727 2728
	SpinRelease(OidGenLockId);

T
Tom Lane 已提交
2729 2730 2731 2732
	/*
	 * Having constructed the checkpoint record, ensure all shmem disk buffers
	 * are flushed to disk.
	 */
V
Vadim B. Mikheev 已提交
2733
	FlushBufferPool();
2734

T
Tom Lane 已提交
2735 2736 2737
	/*
	 * Now insert the checkpoint record into XLOG.
	 */
2738 2739 2740 2741 2742
	rdata.buffer = InvalidBuffer;
	rdata.data = (char *)(&checkPoint);
	rdata.len = sizeof(checkPoint);
	rdata.next = NULL;

T
Tom Lane 已提交
2743 2744 2745 2746 2747 2748
	recptr = XLogInsert(RM_XLOG_ID,
						shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
						XLOG_CHECKPOINT_ONLINE,
						&rdata);

	XLogFlush(recptr);
2749

T
Tom Lane 已提交
2750 2751 2752 2753 2754
	/*
	 * We now have ProcLastRecPtr = start of actual checkpoint record,
	 * recptr = end of actual checkpoint record.
	 */
	if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
2755 2756
		elog(STOP, "XLog concurrent activity while data base is shutting down");

T
Tom Lane 已提交
2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767
	/*
	 * Remember location of prior checkpoint's earliest info.
	 * Oldest item is redo or undo, whichever is older; but watch out
	 * for case that undo = 0.
	 */
	if (ControlFile->checkPointCopy.undo.xrecoff != 0 && 
		XLByteLT(ControlFile->checkPointCopy.undo,
				 ControlFile->checkPointCopy.redo))
		XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg);
	else
		XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
2768

T
Tom Lane 已提交
2769 2770 2771
	/*
	 * Update the control file.
	 */
2772 2773 2774
	SpinAcquire(ControlFileLockId);
	if (shutdown)
		ControlFile->state = DB_SHUTDOWNED;
T
Tom Lane 已提交
2775 2776 2777
	ControlFile->prevCheckPoint = ControlFile->checkPoint;
	ControlFile->checkPoint = ProcLastRecPtr;
	ControlFile->checkPointCopy = checkPoint;
2778 2779 2780 2781
	ControlFile->time = time(NULL);
	UpdateControlFile();
	SpinRelease(ControlFileLockId);

V
Vadim B. Mikheev 已提交
2782
	/*
T
Tom Lane 已提交
2783 2784
	 * Delete offline log files (those no longer needed even for previous
	 * checkpoint).
V
Vadim B. Mikheev 已提交
2785 2786 2787
	 */
	if (_logId || _logSeg)
	{
T
Tom Lane 已提交
2788 2789
		PrevLogSeg(_logId, _logSeg);
		MoveOfflineLogs(_logId, _logSeg);
V
Vadim B. Mikheev 已提交
2790 2791
	}

T
Tom Lane 已提交
2792 2793 2794 2795 2796 2797 2798 2799
	/*
	 * Make more log segments if needed.  (Do this after deleting offline
	 * log segments, to avoid having peak disk space usage higher than
	 * necessary.)
	 */
	if (!shutdown)
		PreallocXlogFiles(recptr);

V
Vadim B. Mikheev 已提交
2800 2801
	S_UNLOCK(&(XLogCtl->chkp_lck));

2802
	END_CRIT_SECTION();
2803
}
V
WAL  
Vadim B. Mikheev 已提交
2804

T
Tom Lane 已提交
2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818
/*
 * Emit an XLOG_NEXTXID record whose payload is the given transaction ID.
 * The insert location returned by XLogInsert() is deliberately ignored.
 */
void
XLogPutNextXid(TransactionId nextXid)
{
	XLogRecData		xidData;

	/* Single-element chain with no associated buffer */
	xidData.next = NULL;
	xidData.buffer = InvalidBuffer;
	xidData.len = sizeof(nextXid);
	xidData.data = (char *) &nextXid;

	(void) XLogInsert(RM_XLOG_ID, XLOG_NEXTXID, &xidData);
}
2819

T
Tom Lane 已提交
2820 2821 2822
/*
 * Write a NEXTOID log record
 */
2823 2824 2825
void
XLogPutNextOid(Oid nextOid)
{
2826
	XLogRecData		rdata;
2827

2828 2829 2830 2831 2832 2833
	rdata.buffer = InvalidBuffer;
	rdata.data = (char *)(&nextOid);
	rdata.len = sizeof(Oid);
	rdata.next = NULL;
	(void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
}
V
WAL  
Vadim B. Mikheev 已提交
2834

T
Tom Lane 已提交
2835 2836 2837
/*
 * XLOG resource manager's routines
 */
V
WAL  
Vadim B. Mikheev 已提交
2838 2839 2840
/*
 * Redo routine for XLOG resource manager records.
 *
 * Restores the nextXid/nextOid counters in ShmemVariableCache from
 * NEXTXID, NEXTOID and checkpoint records.  Record types other than the
 * four handled below are silently ignored.  The lsn argument is unused.
 */
void
xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
	/* Strip the bits covered by XLR_INFO_MASK to get the record type */
	uint8	info = record->xl_info & ~XLR_INFO_MASK;

	if (info == XLOG_NEXTXID)
	{
		TransactionId		nextXid;

		memcpy(&nextXid, XLogRecGetData(record), sizeof(TransactionId));
		/* Only ever move the counter forward */
		if (ShmemVariableCache->nextXid < nextXid)
		{
			ShmemVariableCache->nextXid = nextXid;
			ShmemVariableCache->xidCount = 0;
		}
	}
	else if (info == XLOG_NEXTOID)
	{
		Oid		nextOid;

		memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
		/* Only ever move the counter forward */
		if (ShmemVariableCache->nextOid < nextOid)
		{
			ShmemVariableCache->nextOid = nextOid;
			ShmemVariableCache->oidCount = 0;
		}
	}
	else if (info == XLOG_CHECKPOINT_SHUTDOWN)
	{
		CheckPoint	checkPoint;

		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
		/* In a SHUTDOWN checkpoint, believe the counters exactly */
		ShmemVariableCache->nextXid = checkPoint.nextXid;
		ShmemVariableCache->xidCount = 0;
		ShmemVariableCache->nextOid = checkPoint.nextOid;
		ShmemVariableCache->oidCount = 0;
	}
	else if (info == XLOG_CHECKPOINT_ONLINE)
	{
		CheckPoint	checkPoint;

		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
		/* In an ONLINE checkpoint, treat the counters like NEXTXID/NEXTOID */
		if (ShmemVariableCache->nextXid < checkPoint.nextXid)
		{
			ShmemVariableCache->nextXid = checkPoint.nextXid;
			ShmemVariableCache->xidCount = 0;
		}
		if (ShmemVariableCache->nextOid < checkPoint.nextOid)
		{
			ShmemVariableCache->nextOid = checkPoint.nextOid;
			ShmemVariableCache->oidCount = 0;
		}
	}
}
 
/* Undo routine for XLOG resource manager records: intentionally does nothing */
void
xlog_undo(XLogRecPtr lsn, XLogRecord *record)
{
}
 
/*
 * Append a human-readable description of an XLOG-rmgr record to buf.
 *
 * buf must already hold a NUL-terminated string; text is appended at its
 * end with no bounds checking, so the caller must supply enough room.
 * xl_info is the record's info byte and rec points at the record payload.
 * Unrecognized record types append "UNKNOWN".
 */
void
xlog_desc(char *buf, uint8 xl_info, char* rec)
{
	/* Strip the bits covered by XLR_INFO_MASK to get the record type */
	uint8	info = xl_info & ~XLR_INFO_MASK;

	if (info == XLOG_CHECKPOINT_SHUTDOWN ||
		info == XLOG_CHECKPOINT_ONLINE)
	{
		CheckPoint	checkpoint;

		/*
		 * Copy the payload out rather than casting rec, matching the other
		 * branches here and xlog_redo(), so nothing depends on the
		 * alignment of rec.
		 */
		memcpy(&checkpoint, rec, sizeof(CheckPoint));
		sprintf(buf + strlen(buf), "checkpoint: redo %u/%u; undo %u/%u; "
				"sui %u; xid %u; oid %u; %s",
				checkpoint.redo.xlogid, checkpoint.redo.xrecoff,
				checkpoint.undo.xlogid, checkpoint.undo.xrecoff,
				checkpoint.ThisStartUpID, checkpoint.nextXid,
				checkpoint.nextOid,
				(info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
	}
	else if (info == XLOG_NEXTXID)
	{
		TransactionId		nextXid;

		memcpy(&nextXid, rec, sizeof(TransactionId));
		sprintf(buf + strlen(buf), "nextXid: %u", nextXid);
	}
	else if (info == XLOG_NEXTOID)
	{
		Oid		nextOid;

		memcpy(&nextOid, rec, sizeof(Oid));
		sprintf(buf + strlen(buf), "nextOid: %u", nextOid);
	}
	else
		strcat(buf, "UNKNOWN");
}

/*
 * Append a one-line summary of an XLOG record header to buf.
 *
 * Output is "prev X/Y; xprev X/Y; xid N", then "; bkpb N" when at least one
 * backup-block bit is set in xl_info, then ": " and the resource manager's
 * name.  buf must already hold a NUL-terminated string; text is appended
 * with no bounds checking.
 */
static void
xlog_outrec(char *buf, XLogRecord *record)
{
	int		bkpb;
	int		i;

	sprintf(buf + strlen(buf), "prev %u/%u; xprev %u/%u; xid %u",
		record->xl_prev.xlogid, record->xl_prev.xrecoff,
		record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff,
		record->xl_xid);

	/* Count how many backup-block flag bits are set in xl_info */
	for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++)
	{
		if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
			continue;
		bkpb++;
	}

	if (bkpb)
		sprintf(buf + strlen(buf), "; bkpb %d", bkpb);

	sprintf(buf + strlen(buf), ": %s",
		RmgrTable[record->xl_rmid].rm_name);
}
2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075


/*
 * GUC support routines
 */

/*
 * Report whether "method" names a sync method usable in this build.
 * Matching is case-insensitive; which names are accepted depends on the
 * HAVE_FDATASYNC / OPEN_SYNC_FLAG / OPEN_DATASYNC_FLAG compile-time symbols.
 */
bool
check_xlog_sync_method(const char *method)
{
	/* "fsync" is always accepted; the rest are compiled in conditionally */
	static const char *const known_methods[] = {
		"fsync",
#ifdef HAVE_FDATASYNC
		"fdatasync",
#endif
#ifdef OPEN_SYNC_FLAG
		"open_sync",
#endif
#ifdef OPEN_DATASYNC_FLAG
		"open_datasync",
#endif
	};
	const int	n_known = (int) (sizeof(known_methods) / sizeof(known_methods[0]));
	int			idx;

	for (idx = 0; idx < n_known; idx++)
	{
		if (strcasecmp(method, known_methods[idx]) == 0)
			return true;
	}
	return false;
}

void
assign_xlog_sync_method(const char *method)
{
	int		new_sync_method;
	int		new_sync_bit;

	if (strcasecmp(method, "fsync") == 0)
	{
		new_sync_method = SYNC_METHOD_FSYNC;
		new_sync_bit = 0;
	}
#ifdef HAVE_FDATASYNC
	else if (strcasecmp(method, "fdatasync") == 0)
	{
		new_sync_method = SYNC_METHOD_FDATASYNC;
		new_sync_bit = 0;
	}
#endif
#ifdef OPEN_SYNC_FLAG
	else if (strcasecmp(method, "open_sync") == 0)
	{
		new_sync_method = SYNC_METHOD_OPEN;
		new_sync_bit = OPEN_SYNC_FLAG;
	}
#endif
#ifdef OPEN_DATASYNC_FLAG
	else if (strcasecmp(method, "open_datasync") == 0)
	{
		new_sync_method = SYNC_METHOD_OPEN;
		new_sync_bit = OPEN_DATASYNC_FLAG;
	}
#endif
	else
	{
		/* Can't get here unless guc.c screwed up */
		elog(ERROR, "Bogus xlog sync method %s", method);
		new_sync_method = 0;	/* keep compiler quiet */
		new_sync_bit = 0;
	}

	if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
	{
		/*
		 * To ensure that no blocks escape unsynced, force an fsync on
		 * the currently open log segment (if any).  Also, if the open
		 * flag is changing, close the log file so it will be reopened
		 * (with new flag bit) at next use.
		 */
		if (openLogFile >= 0)
		{
			if (pg_fsync(openLogFile) != 0)
				elog(STOP, "fsync(logfile %u seg %u) failed: %m",
					 openLogId, openLogSeg);
			if (open_sync_bit != new_sync_bit)
			{
				if (close(openLogFile) != 0)
					elog(STOP, "close(logfile %u seg %u) failed: %m",
						 openLogId, openLogSeg);
				openLogFile = -1;
			}
		}
		sync_method = new_sync_method;
		open_sync_bit = new_sync_bit;
	}
}


/*
 * Sync the current XLOG output file using whichever method is configured.
 * Nothing is done for SYNC_METHOD_OPEN; an unrecognized sync_method value
 * raises elog(STOP).
 */
static void
issue_xlog_fsync(void)
{
	if (sync_method == SYNC_METHOD_OPEN)
	{
		/* write synced it already */
		return;
	}

	if (sync_method == SYNC_METHOD_FSYNC)
	{
		if (pg_fsync(openLogFile) != 0)
			elog(STOP, "fsync(logfile %u seg %u) failed: %m",
				 openLogId, openLogSeg);
		return;
	}

#ifdef HAVE_FDATASYNC
	if (sync_method == SYNC_METHOD_FDATASYNC)
	{
		if (pg_fdatasync(openLogFile) != 0)
			elog(STOP, "fdatasync(logfile %u seg %u) failed: %m",
				 openLogId, openLogSeg);
		return;
	}
#endif

	elog(STOP, "bogus sync_method %d", sync_method);
}