xlog.c 87.2 KB
Newer Older
1
/*-------------------------------------------------------------------------
 *
 * xlog.c
 *		PostgreSQL transaction log manager
 *
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.66 2001/05/22 16:52:49 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
14

15 16
#include "postgres.h"

17
#include <fcntl.h>
T
Tom Lane 已提交
18
#include <signal.h>
19 20 21
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
V
Vadim B. Mikheev 已提交
22
#include <sys/time.h>
V
Vadim B. Mikheev 已提交
23 24
#include <sys/types.h>
#include <dirent.h>
25 26 27
#ifdef USE_LOCALE
#include <locale.h>
#endif
28

29
#include "access/transam.h"
30
#include "access/xact.h"
31
#include "catalog/catversion.h"
T
Tom Lane 已提交
32
#include "catalog/pg_control.h"
33 34 35 36
#include "storage/sinval.h"
#include "storage/proc.h"
#include "storage/spin.h"
#include "storage/s_lock.h"
37
#include "storage/bufpage.h"
V
Vadim B. Mikheev 已提交
38 39
#include "access/xlog.h"
#include "access/xlogutils.h"
40
#include "utils/builtins.h"
41
#include "utils/relcache.h"
V
WAL  
Vadim B. Mikheev 已提交
42 43
#include "miscadmin.h"

44

45 46 47
/*
 * This chunk of hackery attempts to determine which file sync methods
 * are available on the current platform, and to choose an appropriate
 * default method.  We assume that fsync() is always available, and that
 * configure determined whether fdatasync() is.
 *
 * Resulting macros:
 *	OPEN_SYNC_FLAG		 O_SYNC, else O_FSYNC, else left undefined
 *	OPEN_DATASYNC_FLAG	 O_DSYNC, but only if it exists and is distinct
 *						 from OPEN_SYNC_FLAG; else left undefined
 *	DEFAULT_SYNC_METHOD* the preferred default: open_datasync, then
 *						 fdatasync, then fsync
 */
#define SYNC_METHOD_FSYNC		0
#define SYNC_METHOD_FDATASYNC	1
#define SYNC_METHOD_OPEN		2		/* used for both O_SYNC and
										 * O_DSYNC */

#if defined(O_SYNC)
#define OPEN_SYNC_FLAG	   O_SYNC
#else
#if defined(O_FSYNC)
#define OPEN_SYNC_FLAG	  O_FSYNC
#endif
#endif

#if defined(OPEN_SYNC_FLAG)
#if defined(O_DSYNC) && (O_DSYNC != OPEN_SYNC_FLAG)
#define OPEN_DATASYNC_FLAG	  O_DSYNC
#endif
#endif

#if defined(OPEN_DATASYNC_FLAG)
#define DEFAULT_SYNC_METHOD_STR    "open_datasync"
#define DEFAULT_SYNC_METHOD		   SYNC_METHOD_OPEN
#define DEFAULT_SYNC_FLAGBIT	   OPEN_DATASYNC_FLAG
#else
#if defined(HAVE_FDATASYNC)
#define DEFAULT_SYNC_METHOD_STR   "fdatasync"
#define DEFAULT_SYNC_METHOD		  SYNC_METHOD_FDATASYNC
#define DEFAULT_SYNC_FLAGBIT	  0
#else
#define DEFAULT_SYNC_METHOD_STR   "fsync"
#define DEFAULT_SYNC_METHOD		  SYNC_METHOD_FSYNC
#define DEFAULT_SYNC_FLAGBIT	  0
#endif
#endif


87
/*
 * Lock-wait limits.  Judging by the "minutes" annotations, both values
 * are expressed in microseconds; both products fit in a 32-bit int.
 */

/* Max time to wait to acquire XLog activity locks */
#define XLOG_LOCK_TIMEOUT			(5*60*1000000)		/* 5 minutes */
/* Max time to wait to acquire checkpoint lock */
#define CHECKPOINT_LOCK_TIMEOUT		(20*60*1000000)		/* 20 minutes */
91

T
Tom Lane 已提交
92 93
/* User-settable parameters */
int			CheckPointSegments = 3;
V
Vadim B. Mikheev 已提交
94
int			XLOGbuffers = 8;
B
Bruce Momjian 已提交
95 96
int			XLOGfiles = 0;		/* how many files to pre-allocate during
								 * ckpt */
T
Tom Lane 已提交
97
int			XLOG_DEBUG = 0;
98 99
char	   *XLOG_sync_method = NULL;
const char	XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
B
Bruce Momjian 已提交
100 101
char		XLOG_archive_dir[MAXPGPATH];		/* null string means
												 * delete 'em */
T
Tom Lane 已提交
102

103 104 105 106
/* these are derived from XLOG_sync_method by assign_xlog_sync_method */
static int	sync_method = DEFAULT_SYNC_METHOD;
static int	open_sync_bit = DEFAULT_SYNC_FLAGBIT;

T
Tom Lane 已提交
107 108
#define MinXLOGbuffers	4

109 110
#define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)

T
Tom Lane 已提交
111 112 113 114 115

/*
 * ThisStartUpID will be same in all backends --- it identifies current
 * instance of the database system.
 */
V
WAL  
Vadim B. Mikheev 已提交
116 117
StartUpID	ThisStartUpID = 0;

T
Tom Lane 已提交
118 119
/* Are we doing recovery by reading XLOG? */
bool		InRecovery = false;
120

T
Tom Lane 已提交
121 122 123 124 125 126 127 128 129
/*
 * MyLastRecPtr points to the start of the last XLOG record inserted by the
 * current transaction.  If MyLastRecPtr.xrecoff == 0, then we are not in
 * a transaction or the transaction has not yet made any loggable changes.
 *
 * Note that XLOG records inserted outside transaction control are not
 * reflected into MyLastRecPtr.
 */
XLogRecPtr	MyLastRecPtr = {0, 0};
V
Vadim B. Mikheev 已提交
130

T
Tom Lane 已提交
131 132 133 134 135 136
/*
 * ProcLastRecPtr points to the start of the last XLOG record inserted by the
 * current backend.  It is updated for all inserts, transaction-controlled
 * or not.
 */
static XLogRecPtr ProcLastRecPtr = {0, 0};
137

T
Tom Lane 已提交
138 139 140
/*
 * RedoRecPtr is this backend's local copy of the REDO record pointer
 * (which is almost but not quite the same as a pointer to the most recent
B
Bruce Momjian 已提交
141
 * CHECKPOINT record).	We update this from the shared-memory copy,
T
Tom Lane 已提交
142 143 144 145
 * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
 * hold the Insert spinlock).  See XLogInsert for details.
 */
static XLogRecPtr RedoRecPtr;
146

T
Tom Lane 已提交
147 148
/* This lock must be held to read/update control file or create new log file */
SPINLOCK	ControlFileLockId;
149

T
Tom Lane 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
/*----------
 * Shared-memory data structures for XLOG control
 *
 * LogwrtRqst indicates a byte position that we need to write and/or fsync
 * the log up to (all records before that point must be written or fsynced).
 * LogwrtResult indicates the byte positions we have already written/fsynced.
 * These structs are identical but are declared separately to indicate their
 * slightly different functions.
 *
 * We do a lot of pushups to minimize the amount of access to spinlocked
 * shared memory values.  There are actually three shared-memory copies of
 * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
 *		XLogCtl->LogwrtResult is protected by info_lck
 *		XLogCtl->Write.LogwrtResult is protected by logwrt_lck
 *		XLogCtl->Insert.LogwrtResult is protected by insert_lck
 * One must hold the associated spinlock to read or write any of these, but
 * of course no spinlock is needed to read/write the unshared LogwrtResult.
 *
 * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
 * right", since both are updated by a write or flush operation before
B
Bruce Momjian 已提交
170
 * it releases logwrt_lck.	The point of keeping XLogCtl->Write.LogwrtResult
T
Tom Lane 已提交
171 172 173 174
 * is that it can be examined/modified by code that already holds logwrt_lck
 * without needing to grab info_lck as well.
 *
 * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
B
Bruce Momjian 已提交
175
 * but is updated when convenient.	Again, it exists for the convenience of
T
Tom Lane 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189
 * code that is already holding insert_lck but not the other locks.
 *
 * The unshared LogwrtResult may lag behind any or all of these, and again
 * is updated when convenient.
 *
 * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
 * (protected by info_lck), but we don't need to cache any copies of it.
 *
 * Note that this all works because the request and result positions can only
 * advance forward, never back up, and so we can easily determine which of two
 * values is "more up to date".
 *----------
 */
typedef struct XLogwrtRqst
190
{
T
Tom Lane 已提交
191 192
	XLogRecPtr	Write;			/* last byte + 1 to write out */
	XLogRecPtr	Flush;			/* last byte + 1 to flush */
B
Bruce Momjian 已提交
193
}			XLogwrtRqst;
194

T
Tom Lane 已提交
195
typedef struct XLogwrtResult
196
{
T
Tom Lane 已提交
197 198
	XLogRecPtr	Write;			/* last byte + 1 written out */
	XLogRecPtr	Flush;			/* last byte + 1 flushed */
B
Bruce Momjian 已提交
199
}			XLogwrtResult;
200

T
Tom Lane 已提交
201 202 203
/*
 * Shared state data for XLogInsert.
 */
204 205
typedef struct XLogCtlInsert
{
B
Bruce Momjian 已提交
206 207 208 209 210 211
	XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
	XLogRecPtr	PrevRecord;		/* start of previously-inserted record */
	uint16		curridx;		/* current block index in cache */
	XLogPageHeader currpage;	/* points to header of block in cache */
	char	   *currpos;		/* current insertion point in cache */
	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
212 213
} XLogCtlInsert;

T
Tom Lane 已提交
214 215 216
/*
 * Shared state data for XLogWrite/XLogFlush.
 */
217 218
typedef struct XLogCtlWrite
{
B
Bruce Momjian 已提交
219 220
	XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
	uint16		curridx;		/* cache index of next block to write */
221 222
} XLogCtlWrite;

T
Tom Lane 已提交
223 224 225
/*
 * Total shared-memory state for XLOG.
 */
226 227
typedef struct XLogCtlData
{
T
Tom Lane 已提交
228
	/* Protected by insert_lck: */
B
Bruce Momjian 已提交
229
	XLogCtlInsert Insert;
T
Tom Lane 已提交
230
	/* Protected by info_lck: */
B
Bruce Momjian 已提交
231 232
	XLogwrtRqst LogwrtRqst;
	XLogwrtResult LogwrtResult;
T
Tom Lane 已提交
233
	/* Protected by logwrt_lck: */
B
Bruce Momjian 已提交
234 235
	XLogCtlWrite Write;

T
Tom Lane 已提交
236 237
	/*
	 * These values do not change after startup, although the pointed-to
B
Bruce Momjian 已提交
238
	 * pages and xlblocks values certainly do.	Permission to read/write
T
Tom Lane 已提交
239 240
	 * the pages and xlblocks values depends on insert_lck and logwrt_lck.
	 */
B
Bruce Momjian 已提交
241 242 243 244 245
	char	   *pages;			/* buffers for unwritten XLOG pages */
	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + BLCKSZ */
	uint32		XLogCacheByte;	/* # bytes in xlog buffers */
	uint32		XLogCacheBlck;	/* highest allocated xlog buffer index */
	StartUpID	ThisStartUpID;
T
Tom Lane 已提交
246 247

	/* This value is not protected by *any* spinlock... */
B
Bruce Momjian 已提交
248
	XLogRecPtr	RedoRecPtr;		/* see SetRedoRecPtr/GetRedoRecPtr */
T
Tom Lane 已提交
249

B
Bruce Momjian 已提交
250 251 252 253
	slock_t		insert_lck;		/* XLogInsert lock */
	slock_t		info_lck;		/* locks shared LogwrtRqst/LogwrtResult */
	slock_t		logwrt_lck;		/* XLogWrite/XLogFlush lock */
	slock_t		chkp_lck;		/* checkpoint lock */
254 255
} XLogCtlData;

256
static XLogCtlData *XLogCtl = NULL;
257

258
/*
T
Tom Lane 已提交
259
 * We maintain an image of pg_control in shared memory.
260
 */
261
static ControlFileData *ControlFile = NULL;
262

T
Tom Lane 已提交
263 264 265 266 267
/*
 * Macros for managing XLogInsert state.  In most cases, the calling routine
 * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
 * so these are passed as parameters instead of being fetched via XLogCtl.
 */

/*
 * Free space remaining in the current xlog page buffer.
 *
 * Insert is a pointer to the insert state; the result is BLCKSZ minus the
 * distance from the start of the current page to the insertion point.
 */
#define INSERT_FREESPACE(Insert)  \
	(BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))

/*
 * Construct XLogRecPtr value for current insertion point.
 *
 * xlblocks[curridx] holds the position just past the end of the page, so
 * the insertion point is that position minus the page's free space.
 */
#define INSERT_RECPTR(recptr,Insert,curridx)  \
	( \
	  (recptr).xlogid = XLogCtl->xlblocks[(curridx)].xlogid, \
	  (recptr).xrecoff = \
		XLogCtl->xlblocks[(curridx)].xrecoff - INSERT_FREESPACE(Insert) \
	)


/*
 * Step an xlogid/segment pair forward by one segment, rolling over into
 * the next log id once the last segment of the current one is passed.
 */
#define NextLogSeg(logId, logSeg)	\
	do { \
		if ((logSeg) < XLogSegsPerFile-1) \
			(logSeg)++; \
		else \
		{ \
			(logId)++; \
			(logSeg) = 0; \
		} \
	} while (0)

/*
 * Step an xlogid/segment pair back by one segment (the pair is assumed
 * not to be 0,0), borrowing from the log id when the segment is zero.
 */
#define PrevLogSeg(logId, logSeg)	\
	do { \
		if ((logSeg) == 0) \
		{ \
			(logId)--; \
			(logSeg) = XLogSegsPerFile-1; \
		} \
		else \
			(logSeg)--; \
	} while (0)
V
WAL  
Vadim B. Mikheev 已提交
305

T
Tom Lane 已提交
306 307 308 309
/*
 * Compute ID and segment from an XLogRecPtr.
 *
 * For XLByteToSeg, do the computation at face value.  For XLByteToPrevSeg,
 * a boundary byte is taken to be in the previous segment.  This is suitable
 * for deciding which segment to write given a pointer to a record end,
 * for example.
 *
 * logId and logSeg are output lvalues; xlrp is evaluated twice.
 */
#define XLByteToSeg(xlrp, logId, logSeg)	\
	( (logId) = (xlrp).xlogid, \
	  (logSeg) = (xlrp).xrecoff / XLogSegSize \
	)
#define XLByteToPrevSeg(xlrp, logId, logSeg)	\
	( (logId) = (xlrp).xlogid, \
	  (logSeg) = ((xlrp).xrecoff - 1) / XLogSegSize \
	)
322

323
/*
 * Is an XLogRecPtr within a particular XLOG segment?
 *
 * For XLByteInSeg, do the computation at face value.  For XLByteInPrevSeg,
 * a boundary byte is taken to be in the previous segment.
 */
#define XLByteInSeg(xlrp, logId, logSeg)	\
	((xlrp).xlogid == (logId) && \
	 (xlrp).xrecoff / XLogSegSize == (logSeg))

#define XLByteInPrevSeg(xlrp, logId, logSeg)	\
	((xlrp).xlogid == (logId) && \
	 ((xlrp).xrecoff - 1) / XLogSegSize == (logSeg))
336 337


338
/*
 * Build the path of an XLOG segment file into path (a buffer of at least
 * MAXPGPATH bytes): XLogDir, a separator, then the log id and segment
 * number, each as 8 upper-case hex digits.
 */
#define XLogFileName(path, log, seg)	\
			snprintf(path, MAXPGPATH, "%s%c%08X%08X",	\
					 XLogDir, SEP_CHAR, log, seg)
341

T
Tom Lane 已提交
342 343 344 345 346
/*
 * Step an XLOG buffer index backward or forward, wrapping around at the
 * ends of the range 0 .. XLogCtl->XLogCacheBlck.
 */
#define PrevBufIdx(idx)		\
		(((idx) != 0) ? ((idx) - 1) : XLogCtl->XLogCacheBlck)

#define NextBufIdx(idx)		\
		(((idx) != XLogCtl->XLogCacheBlck) ? ((idx) + 1) : 0)
347

348
/*
 * Can a record start at this offset?  True when the offset within its
 * page is at or past the page header (SizeOfXLogPHD) and at least
 * SizeOfXLogRecord bytes remain before the end of the page.
 */
#define XRecOffIsValid(xrecoff) \
		((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \
		(BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord)
351

T
Tom Lane 已提交
352 353 354 355 356 357
/*
 * _INTL_MAXLOGRECSZ: upper bound on the space one record can occupy:
 * the record header, the largest allowed rmgr payload, and for each of
 * the XLR_MAX_BKP_BLOCKS possible backup blocks a BkpBlock header plus
 * a full page image.
 */
#define _INTL_MAXLOGRECSZ	(SizeOfXLogRecord + MAXLOGRECSZ + \
							 XLR_MAX_BKP_BLOCKS * (BLCKSZ + sizeof(BkpBlock)))
358

359

T
Tom Lane 已提交
360
/* File path names */
B
Bruce Momjian 已提交
361 362
static char XLogDir[MAXPGPATH];
static char ControlFilePath[MAXPGPATH];
T
Tom Lane 已提交
363 364 365 366 367 368

/*
 * Private, possibly out-of-date copy of shared LogwrtResult.
 * See discussion above.
 */
static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
369

T
Tom Lane 已提交
370 371 372 373 374 375 376 377 378 379
/*
 * openLogFile is -1 or a kernel FD for an open log file segment.
 * When it's open, openLogOff is the current seek offset in the file.
 * openLogId/openLogSeg identify the segment.  These variables are only
 * used to write the XLOG, and so will normally refer to the active segment.
 */
static int	openLogFile = -1;
static uint32 openLogId = 0;
static uint32 openLogSeg = 0;
static uint32 openLogOff = 0;
380

T
Tom Lane 已提交
381 382 383 384 385 386
/*
 * These variables are used similarly to the ones above, but for reading
 * the XLOG.  Note, however, that readOff generally represents the offset
 * of the page just read, not the seek position of the FD itself, which
 * will be just past that page.
 */
387 388 389 390
static int	readFile = -1;
static uint32 readId = 0;
static uint32 readSeg = 0;
static uint32 readOff = 0;
B
Bruce Momjian 已提交
391

T
Tom Lane 已提交
392 393
/* Buffer for currently read page (BLCKSZ bytes) */
static char *readBuf = NULL;
B
Bruce Momjian 已提交
394

T
Tom Lane 已提交
395 396 397
/* State information for XLOG reading */
static XLogRecPtr ReadRecPtr;
static XLogRecPtr EndRecPtr;
398
static XLogRecord *nextRecord = NULL;
399
static StartUpID lastReadSUI;
400

V
WAL  
Vadim B. Mikheev 已提交
401 402
static bool InRedo = false;

T
Tom Lane 已提交
403 404 405

static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst);
B
Bruce Momjian 已提交
406 407
static int XLogFileInit(uint32 log, uint32 seg,
			 bool *use_existent, bool use_lock);
T
Tom Lane 已提交
408 409 410 411
static int	XLogFileOpen(uint32 log, uint32 seg, bool econt);
static void PreallocXlogFiles(XLogRecPtr endptr);
static void MoveOfflineLogs(uint32 log, uint32 seg);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
412
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
T
Tom Lane 已提交
413
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
B
Bruce Momjian 已提交
414 415
					 const char *whichChkpt,
					 char *buffer);
T
Tom Lane 已提交
416 417 418 419
static void WriteControlFile(void);
static void ReadControlFile(void);
static char *str_time(time_t tnow);
static void xlog_outrec(char *buf, XLogRecord *record);
420
static void issue_xlog_fsync(void);
T
Tom Lane 已提交
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437


/*
 * Insert an XLOG record having the specified RMID and info bytes,
 * with the body of the record being the data chunk(s) described by
 * the rdata list (see xlog.h for notes about rdata).
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out.  This implements the basic
 * WAL rule "write the log before the data".)
 *
 * NB: this routine feels free to scribble on the XLogRecData structs,
 * though not on the data they reference.  This is OK since the XLogRecData
 * structs are always just temporaries in the calling code.
 */
438
XLogRecPtr
439
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
440
{
B
Bruce Momjian 已提交
441 442
	XLogCtlInsert *Insert = &XLogCtl->Insert;
	XLogRecord *record;
T
Tom Lane 已提交
443
	XLogContRecord *contrecord;
B
Bruce Momjian 已提交
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
	XLogRecPtr	RecPtr;
	XLogRecPtr	WriteRqst;
	uint32		freespace;
	uint16		curridx;
	XLogRecData *rdt;
	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];
	bool		dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
	BkpBlock	dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
	XLogRecPtr	dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
	XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS];
	crc64		rdata_crc;
	uint32		len,
				write_len;
	unsigned	i;
	bool		do_logwrt;
	bool		updrqst;
	bool		no_tran = (rmid == RM_XLOG_ID) ? true : false;
V
Vadim B. Mikheev 已提交
461 462 463 464

	if (info & XLR_INFO_MASK)
	{
		if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
B
Bruce Momjian 已提交
465
			elog(STOP, "XLogInsert: invalid info mask %02X",
T
Tom Lane 已提交
466
				 (info & XLR_INFO_MASK));
V
Vadim B. Mikheev 已提交
467 468 469 470
		no_tran = true;
		info &= ~XLR_INFO_MASK;
	}

T
Tom Lane 已提交
471
	/*
B
Bruce Momjian 已提交
472 473
	 * In bootstrap mode, we don't actually log anything but XLOG
	 * resources; return a phony record pointer.
T
Tom Lane 已提交
474
	 */
V
Vadim B. Mikheev 已提交
475
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
V
WAL  
Vadim B. Mikheev 已提交
476 477
	{
		RecPtr.xlogid = 0;
B
Bruce Momjian 已提交
478
		RecPtr.xrecoff = SizeOfXLogPHD; /* start of 1st checkpoint record */
V
WAL  
Vadim B. Mikheev 已提交
479 480 481
		return (RecPtr);
	}

T
Tom Lane 已提交
482 483 484 485 486 487
	/*
	 * Here we scan the rdata list, determine which buffers must be backed
	 * up, and compute the CRC values for the data.  Note that the record
	 * header isn't added into the CRC yet since we don't know the final
	 * length or info bits quite yet.
	 *
B
Bruce Momjian 已提交
488 489 490 491 492 493 494 495 496 497 498
	 * We may have to loop back to here if a race condition is detected
	 * below. We could prevent the race by doing all this work while
	 * holding the insert spinlock, but it seems better to avoid doing CRC
	 * calculations while holding the lock.  This means we have to be
	 * careful about modifying the rdata list until we know we aren't
	 * going to loop back again.  The only change we allow ourselves to
	 * make earlier is to set rdt->data = NULL in list items we have
	 * decided we will have to back up the whole buffer for.  This is OK
	 * because we will certainly decide the same thing again for those
	 * items if we do it over; doing it here saves an extra pass over the
	 * list later.
T
Tom Lane 已提交
499
	 */
500
begin:;
T
Tom Lane 已提交
501 502 503 504 505 506
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
	{
		dtbuf[i] = InvalidBuffer;
		dtbuf_bkp[i] = false;
	}

507
	INIT_CRC64(rdata_crc);
T
Tom Lane 已提交
508
	len = 0;
B
Bruce Momjian 已提交
509
	for (rdt = rdata;;)
510 511 512
	{
		if (rdt->buffer == InvalidBuffer)
		{
T
Tom Lane 已提交
513
			/* Simple data, just include it */
514 515 516
			len += rdt->len;
			COMP_CRC64(rdata_crc, rdt->data, rdt->len);
		}
T
Tom Lane 已提交
517
		else
518
		{
T
Tom Lane 已提交
519 520
			/* Find info for buffer */
			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
521
			{
T
Tom Lane 已提交
522
				if (rdt->buffer == dtbuf[i])
523
				{
T
Tom Lane 已提交
524 525 526 527 528 529 530 531 532
					/* Buffer already referenced by earlier list item */
					if (dtbuf_bkp[i])
						rdt->data = NULL;
					else if (rdt->data)
					{
						len += rdt->len;
						COMP_CRC64(rdata_crc, rdt->data, rdt->len);
					}
					break;
533
				}
T
Tom Lane 已提交
534
				if (dtbuf[i] == InvalidBuffer)
535
				{
T
Tom Lane 已提交
536 537
					/* OK, put it in this slot */
					dtbuf[i] = rdt->buffer;
B
Bruce Momjian 已提交
538

T
Tom Lane 已提交
539 540 541
					/*
					 * XXX We assume page LSN is first data on page
					 */
B
Bruce Momjian 已提交
542
					dtbuf_lsn[i] = *((XLogRecPtr *) BufferGetBlock(rdt->buffer));
T
Tom Lane 已提交
543 544
					if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
					{
B
Bruce Momjian 已提交
545
						crc64		dtcrc;
T
Tom Lane 已提交
546 547 548 549 550 551 552 553 554 555

						dtbuf_bkp[i] = true;
						rdt->data = NULL;
						INIT_CRC64(dtcrc);
						COMP_CRC64(dtcrc,
								   BufferGetBlock(dtbuf[i]),
								   BLCKSZ);
						dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
						dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
						COMP_CRC64(dtcrc,
B
Bruce Momjian 已提交
556
								(char *) &(dtbuf_xlg[i]) + sizeof(crc64),
T
Tom Lane 已提交
557 558 559 560 561 562 563 564 565 566
								   sizeof(BkpBlock) - sizeof(crc64));
						FIN_CRC64(dtcrc);
						dtbuf_xlg[i].crc = dtcrc;
					}
					else if (rdt->data)
					{
						len += rdt->len;
						COMP_CRC64(rdata_crc, rdt->data, rdt->len);
					}
					break;
567 568
				}
			}
T
Tom Lane 已提交
569 570 571
			if (i >= XLR_MAX_BKP_BLOCKS)
				elog(STOP, "XLogInsert: can backup %d blocks at most",
					 XLR_MAX_BKP_BLOCKS);
572
		}
T
Tom Lane 已提交
573
		/* Break out of loop when rdt points to last list item */
574 575 576 577 578
		if (rdt->next == NULL)
			break;
		rdt = rdt->next;
	}

T
Tom Lane 已提交
579 580 581
	/*
	 * NOTE: the test for len == 0 here is somewhat fishy, since in theory
	 * all of the rmgr data might have been suppressed in favor of backup
B
Bruce Momjian 已提交
582
	 * blocks.	Currently, all callers of XLogInsert provide at least some
T
Tom Lane 已提交
583 584 585 586
	 * not-in-a-buffer data and so len == 0 should never happen, but that
	 * may not be true forever.  If you need to remove the len == 0 check,
	 * also remove the check for xl_len == 0 in ReadRecord, below.
	 */
587 588 589
	if (len == 0 || len > MAXLOGRECSZ)
		elog(STOP, "XLogInsert: invalid record len %u", len);

590
	START_CRIT_SECTION();
591

T
Tom Lane 已提交
592 593
	/* wait to obtain xlog insert lock */
	do_logwrt = true;
594

T
Tom Lane 已提交
595 596 597 598
	for (i = 0;;)
	{
		/* try to update LogwrtResult while waiting for insert lock */
		if (!TAS(&(XLogCtl->info_lck)))
599
		{
B
Bruce Momjian 已提交
600
			XLogwrtRqst LogwrtRqst;
601

T
Tom Lane 已提交
602 603 604 605 606
			LogwrtRqst = XLogCtl->LogwrtRqst;
			LogwrtResult = XLogCtl->LogwrtResult;
			S_UNLOCK(&(XLogCtl->info_lck));

			/*
B
Bruce Momjian 已提交
607 608 609
			 * If cache is half filled then try to acquire logwrt lock and
			 * do LOGWRT work, but only once per XLogInsert call. Ignore
			 * any fractional blocks in performing this check.
T
Tom Lane 已提交
610 611 612 613 614 615 616 617
			 */
			LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
			if (do_logwrt &&
				(LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
				 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
				  XLogCtl->XLogCacheByte / 2)))
			{
				if (!TAS(&(XLogCtl->logwrt_lck)))
618
				{
T
Tom Lane 已提交
619 620
					LogwrtResult = XLogCtl->Write.LogwrtResult;
					if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
621
					{
T
Tom Lane 已提交
622 623
						XLogWrite(LogwrtRqst);
						do_logwrt = false;
624
					}
T
Tom Lane 已提交
625
					S_UNLOCK(&(XLogCtl->logwrt_lck));
626 627 628
				}
			}
		}
T
Tom Lane 已提交
629 630 631
		if (!TAS(&(XLogCtl->insert_lck)))
			break;
		S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++, XLOG_LOCK_TIMEOUT);
632 633
	}

T
Tom Lane 已提交
634 635
	/*
	 * Check to see if my RedoRecPtr is out of date.  If so, may have to
B
Bruce Momjian 已提交
636 637 638
	 * go back and recompute everything.  This can only happen just after
	 * a checkpoint, so it's better to be slow in this case and fast
	 * otherwise.
T
Tom Lane 已提交
639 640
	 */
	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
641
	{
T
Tom Lane 已提交
642 643 644 645
		Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
		RedoRecPtr = Insert->RedoRecPtr;

		for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
646
		{
T
Tom Lane 已提交
647 648 649 650 651
			if (dtbuf[i] == InvalidBuffer)
				continue;
			if (dtbuf_bkp[i] == false &&
				XLByteLE(dtbuf_lsn[i], RedoRecPtr))
			{
B
Bruce Momjian 已提交
652

T
Tom Lane 已提交
653
				/*
B
Bruce Momjian 已提交
654 655
				 * Oops, this buffer now needs to be backed up, but we
				 * didn't think so above.  Start over.
T
Tom Lane 已提交
656 657 658 659 660
				 */
				S_UNLOCK(&(XLogCtl->insert_lck));
				END_CRIT_SECTION();
				goto begin;
			}
661 662 663
		}
	}

T
Tom Lane 已提交
664 665 666 667 668 669 670
	/*
	 * Make additional rdata list entries for the backup blocks, so that
	 * we don't need to special-case them in the write loop.  Note that we
	 * have now irrevocably changed the input rdata list.  At the exit of
	 * this loop, write_len includes the backup block data.
	 *
	 * Also set the appropriate info bits to show which buffers were backed
B
Bruce Momjian 已提交
671 672 673
	 * up.	The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th
	 * distinct buffer value (ignoring InvalidBuffer) appearing in the
	 * rdata list.
T
Tom Lane 已提交
674 675 676
	 */
	write_len = len;
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
677 678 679 680
	{
		if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))
			continue;

T
Tom Lane 已提交
681
		info |= XLR_SET_BKP_BLOCK(i);
682 683 684

		rdt->next = &(dtbuf_rdt[2 * i]);

B
Bruce Momjian 已提交
685
		dtbuf_rdt[2 * i].data = (char *) &(dtbuf_xlg[i]);
686
		dtbuf_rdt[2 * i].len = sizeof(BkpBlock);
T
Tom Lane 已提交
687
		write_len += sizeof(BkpBlock);
688 689 690

		rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);

B
Bruce Momjian 已提交
691
		dtbuf_rdt[2 * i + 1].data = (char *) BufferGetBlock(dtbuf[i]);
692
		dtbuf_rdt[2 * i + 1].len = BLCKSZ;
T
Tom Lane 已提交
693
		write_len += BLCKSZ;
694 695 696
		dtbuf_rdt[2 * i + 1].next = NULL;
	}

T
Tom Lane 已提交
697
	/* Insert record header */
698

T
Tom Lane 已提交
699 700
	updrqst = false;
	freespace = INSERT_FREESPACE(Insert);
701 702
	if (freespace < SizeOfXLogRecord)
	{
T
Tom Lane 已提交
703
		updrqst = AdvanceXLInsertBuffer();
704 705 706
		freespace = BLCKSZ - SizeOfXLogPHD;
	}

T
Tom Lane 已提交
707
	curridx = Insert->curridx;
708
	record = (XLogRecord *) Insert->currpos;
T
Tom Lane 已提交
709

710
	record->xl_prev = Insert->PrevRecord;
V
Vadim B. Mikheev 已提交
711
	if (no_tran)
712 713 714 715
	{
		record->xl_xact_prev.xlogid = 0;
		record->xl_xact_prev.xrecoff = 0;
	}
V
Vadim B. Mikheev 已提交
716 717 718
	else
		record->xl_xact_prev = MyLastRecPtr;

719
	record->xl_xid = GetCurrentTransactionId();
T
Tom Lane 已提交
720
	record->xl_len = len;		/* doesn't include backup blocks */
721
	record->xl_info = info;
722
	record->xl_rmid = rmid;
723

T
Tom Lane 已提交
724
	/* Now we can finish computing the main CRC */
B
Bruce Momjian 已提交
725
	COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64),
T
Tom Lane 已提交
726
			   SizeOfXLogRecord - sizeof(crc64));
727 728 729
	FIN_CRC64(rdata_crc);
	record->xl_crc = rdata_crc;

T
Tom Lane 已提交
730 731 732 733
	/* Compute record's XLOG location */
	INSERT_RECPTR(RecPtr, Insert, curridx);

	/* If first XLOG record of transaction, save it in PROC array */
V
Vadim B. Mikheev 已提交
734
	if (MyLastRecPtr.xrecoff == 0 && !no_tran)
735 736 737 738 739
	{
		SpinAcquire(SInvalLock);
		MyProc->logRec = RecPtr;
		SpinRelease(SInvalLock);
	}
V
WAL  
Vadim B. Mikheev 已提交
740 741 742

	if (XLOG_DEBUG)
	{
B
Bruce Momjian 已提交
743
		char		buf[8192];
V
WAL  
Vadim B. Mikheev 已提交
744 745 746

		sprintf(buf, "INSERT @ %u/%u: ", RecPtr.xlogid, RecPtr.xrecoff);
		xlog_outrec(buf, record);
747
		if (rdata->data != NULL)
V
WAL  
Vadim B. Mikheev 已提交
748 749
		{
			strcat(buf, " - ");
750
			RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data);
V
WAL  
Vadim B. Mikheev 已提交
751
		}
T
Tom Lane 已提交
752
		fprintf(stderr, "%s\n", buf);
V
WAL  
Vadim B. Mikheev 已提交
753 754
	}

T
Tom Lane 已提交
755 756 757 758 759 760
	/* Record begin of record in appropriate places */
	if (!no_tran)
		MyLastRecPtr = RecPtr;
	ProcLastRecPtr = RecPtr;
	Insert->PrevRecord = RecPtr;

761
	Insert->currpos += SizeOfXLogRecord;
T
Tom Lane 已提交
762
	freespace -= SizeOfXLogRecord;
763

T
Tom Lane 已提交
764 765 766 767
	/*
	 * Append the data, including backup blocks if any
	 */
	while (write_len)
768
	{
769 770 771 772
		while (rdata->data == NULL)
			rdata = rdata->next;

		if (freespace > 0)
773
		{
774 775 776 777 778
			if (rdata->len > freespace)
			{
				memcpy(Insert->currpos, rdata->data, freespace);
				rdata->data += freespace;
				rdata->len -= freespace;
T
Tom Lane 已提交
779
				write_len -= freespace;
780 781 782 783 784
			}
			else
			{
				memcpy(Insert->currpos, rdata->data, rdata->len);
				freespace -= rdata->len;
T
Tom Lane 已提交
785
				write_len -= rdata->len;
786 787 788 789
				Insert->currpos += rdata->len;
				rdata = rdata->next;
				continue;
			}
790 791
		}

792
		/* Use next buffer */
T
Tom Lane 已提交
793 794 795 796 797 798 799 800
		updrqst = AdvanceXLInsertBuffer();
		curridx = Insert->curridx;
		/* Insert cont-record header */
		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
		contrecord = (XLogContRecord *) Insert->currpos;
		contrecord->xl_rem_len = write_len;
		Insert->currpos += SizeOfXLogContRecord;
		freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
801
	}
802

T
Tom Lane 已提交
803 804
	/* Ensure next record will be properly aligned */
	Insert->currpos = (char *) Insert->currpage +
B
Bruce Momjian 已提交
805
		MAXALIGN(Insert->currpos - (char *) Insert->currpage);
T
Tom Lane 已提交
806
	freespace = INSERT_FREESPACE(Insert);
807

V
Vadim B. Mikheev 已提交
808
	/*
B
Bruce Momjian 已提交
809 810
	 * The recptr I return is the beginning of the *next* record. This
	 * will be stored as LSN for changed data pages...
V
Vadim B. Mikheev 已提交
811
	 */
T
Tom Lane 已提交
812
	INSERT_RECPTR(RecPtr, Insert, curridx);
V
Vadim B. Mikheev 已提交
813

T
Tom Lane 已提交
814
	/* Need to update shared LogwrtRqst if some block was filled up */
815
	if (freespace < SizeOfXLogRecord)
B
Bruce Momjian 已提交
816 817
		updrqst = true;			/* curridx is filled and available for
								 * writing out */
818 819
	else
		curridx = PrevBufIdx(curridx);
T
Tom Lane 已提交
820
	WriteRqst = XLogCtl->xlblocks[curridx];
821 822 823 824 825

	S_UNLOCK(&(XLogCtl->insert_lck));

	if (updrqst)
	{
826
		S_LOCK(&(XLogCtl->info_lck));
T
Tom Lane 已提交
827 828 829 830 831
		/* advance global request to include new block(s) */
		if (XLByteLT(XLogCtl->LogwrtRqst.Write, WriteRqst))
			XLogCtl->LogwrtRqst.Write = WriteRqst;
		/* update local result copy while I have the chance */
		LogwrtResult = XLogCtl->LogwrtResult;
832
		S_UNLOCK(&(XLogCtl->info_lck));
833 834
	}

835
	END_CRIT_SECTION();
836
	return (RecPtr);
837
}
838

T
Tom Lane 已提交
839 840 841 842 843 844 845 846 847 848 849 850 851
/*
 * Advance the Insert state to the next buffer page, writing out the next
 * buffer if it still contains unwritten data.
 *
 * The global LogwrtRqst.Write pointer needs to be advanced to include the
 * just-filled page.  If we can do this for free (without an extra spinlock),
 * we do so here.  Otherwise the caller must do it.  We return TRUE if the
 * request update still needs to be done, FALSE if we did it internally.
 *
 * Must be called with insert_lck held.
 */
static bool
AdvanceXLInsertBuffer(void)
852
{
T
Tom Lane 已提交
853 854 855 856 857 858
	XLogCtlInsert *Insert = &XLogCtl->Insert;
	XLogCtlWrite *Write = &XLogCtl->Write;
	uint16		nextidx = NextBufIdx(Insert->curridx);
	bool		update_needed = true;
	XLogRecPtr	OldPageRqstPtr;
	XLogwrtRqst WriteRqst;
859

T
Tom Lane 已提交
860 861 862
	/* Use Insert->LogwrtResult copy if it's more fresh */
	if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
		LogwrtResult = Insert->LogwrtResult;
V
WAL  
Vadim B. Mikheev 已提交
863

T
Tom Lane 已提交
864
	/*
B
Bruce Momjian 已提交
865 866 867
	 * Get ending-offset of the buffer page we need to replace (this may
	 * be zero if the buffer hasn't been used yet).  Fall through if it's
	 * already written out.
T
Tom Lane 已提交
868 869 870 871 872 873 874
	 */
	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
	if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
	{
		/* nope, got work to do... */
		unsigned	spins = 0;
		XLogRecPtr	FinishedPageRqstPtr;
875

T
Tom Lane 已提交
876
		FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
877

T
Tom Lane 已提交
878
		for (;;)
879
		{
T
Tom Lane 已提交
880 881
			/* While waiting, try to get info_lck and update LogwrtResult */
			if (!TAS(&(XLogCtl->info_lck)))
882
			{
T
Tom Lane 已提交
883 884
				if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr))
					XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr;
B
Bruce Momjian 已提交
885
				update_needed = false;	/* Did the shared-request update */
T
Tom Lane 已提交
886
				LogwrtResult = XLogCtl->LogwrtResult;
887 888
				S_UNLOCK(&(XLogCtl->info_lck));

T
Tom Lane 已提交
889
				if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
890
				{
T
Tom Lane 已提交
891 892 893
					/* OK, someone wrote it already */
					Insert->LogwrtResult = LogwrtResult;
					break;
894
				}
T
Tom Lane 已提交
895 896 897
			}

			/*
B
Bruce Momjian 已提交
898 899
			 * LogwrtResult lock is busy or we know the page is still
			 * dirty. Try to acquire logwrt lock and write full blocks.
T
Tom Lane 已提交
900 901 902 903 904
			 */
			if (!TAS(&(XLogCtl->logwrt_lck)))
			{
				LogwrtResult = Write->LogwrtResult;
				if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
905
				{
T
Tom Lane 已提交
906 907 908 909
					S_UNLOCK(&(XLogCtl->logwrt_lck));
					/* OK, someone wrote it already */
					Insert->LogwrtResult = LogwrtResult;
					break;
910
				}
B
Bruce Momjian 已提交
911

T
Tom Lane 已提交
912
				/*
B
Bruce Momjian 已提交
913 914
				 * Have to write buffers while holding insert lock. This
				 * is not good, so only write as much as we absolutely
T
Tom Lane 已提交
915 916 917 918 919 920 921 922
				 * must.
				 */
				WriteRqst.Write = OldPageRqstPtr;
				WriteRqst.Flush.xlogid = 0;
				WriteRqst.Flush.xrecoff = 0;
				XLogWrite(WriteRqst);
				S_UNLOCK(&(XLogCtl->logwrt_lck));
				Insert->LogwrtResult = LogwrtResult;
923 924
				break;
			}
T
Tom Lane 已提交
925
			S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT);
926 927 928
		}
	}

T
Tom Lane 已提交
929 930 931 932 933
	/*
	 * Now the next buffer slot is free and we can set it up to be the
	 * next output page.
	 */
	if (XLogCtl->xlblocks[Insert->curridx].xrecoff >= XLogFileSize)
934
	{
T
Tom Lane 已提交
935 936 937 938
		/* crossing a logid boundary */
		XLogCtl->xlblocks[nextidx].xlogid =
			XLogCtl->xlblocks[Insert->curridx].xlogid + 1;
		XLogCtl->xlblocks[nextidx].xrecoff = BLCKSZ;
939
	}
T
Tom Lane 已提交
940
	else
941
	{
T
Tom Lane 已提交
942 943 944 945
		XLogCtl->xlblocks[nextidx].xlogid =
			XLogCtl->xlblocks[Insert->curridx].xlogid;
		XLogCtl->xlblocks[nextidx].xrecoff =
			XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ;
946
	}
T
Tom Lane 已提交
947 948
	Insert->curridx = nextidx;
	Insert->currpage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
B
Bruce Momjian 已提交
949 950
	Insert->currpos = ((char *) Insert->currpage) + SizeOfXLogPHD;

T
Tom Lane 已提交
951
	/*
B
Bruce Momjian 已提交
952 953
	 * Be sure to re-zero the buffer so that bytes beyond what we've
	 * written will look like zeroes and not valid XLOG records...
T
Tom Lane 已提交
954
	 */
B
Bruce Momjian 已提交
955
	MemSet((char *) Insert->currpage, 0, BLCKSZ);
T
Tom Lane 已提交
956
	Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
B
Bruce Momjian 已提交
957
	/* Insert->currpage->xlp_info = 0; *//* done by memset */
958
	Insert->currpage->xlp_sui = ThisStartUpID;
T
Tom Lane 已提交
959 960

	return update_needed;
961 962
}

T
Tom Lane 已提交
963 964 965 966 967
/*
 * Write and/or fsync the log at least as far as WriteRqst indicates.
 *
 * Must be called with logwrt_lck held.
 */
968
static void
T
Tom Lane 已提交
969
XLogWrite(XLogwrtRqst WriteRqst)
970
{
971 972
	XLogCtlWrite *Write = &XLogCtl->Write;
	char	   *from;
T
Tom Lane 已提交
973
	bool		ispartialpage;
974
	bool		use_existent;
975

B
Bruce Momjian 已提交
976 977 978 979
	/*
	 * Update local LogwrtResult (caller probably did this already,
	 * but...)
	 */
T
Tom Lane 已提交
980 981 982
	LogwrtResult = Write->LogwrtResult;

	while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
983
	{
B
Bruce Momjian 已提交
984

985 986 987 988 989 990 991 992 993
		/*
		 * Make sure we're not ahead of the insert process.  This could
		 * happen if we're passed a bogus WriteRqst.Write that is past the
		 * end of the last page that's been initialized by
		 * AdvanceXLInsertBuffer.
		 */
		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx]))
			elog(STOP, "XLogWrite: write request is past end of log");

T
Tom Lane 已提交
994 995 996 997 998
		/* Advance LogwrtResult.Write to end of current buffer page */
		LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx];
		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);

		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
999
		{
B
Bruce Momjian 已提交
1000

T
Tom Lane 已提交
1001 1002 1003 1004
			/*
			 * Switch to new logfile segment.
			 */
			if (openLogFile >= 0)
1005
			{
T
Tom Lane 已提交
1006
				if (close(openLogFile) != 0)
1007
					elog(STOP, "close(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
1008 1009
						 openLogId, openLogSeg);
				openLogFile = -1;
1010
			}
T
Tom Lane 已提交
1011 1012
			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);

1013 1014 1015 1016
			/* create/use new log file */
			use_existent = true;
			openLogFile = XLogFileInit(openLogId, openLogSeg,
									   &use_existent, true);
T
Tom Lane 已提交
1017
			openLogOff = 0;
1018 1019 1020 1021 1022

			if (!use_existent)	/* there was no precreated file */
				elog(LOG, "XLogWrite: new log file created - "
					 "consider increasing WAL_FILES");

T
Tom Lane 已提交
1023
			/* update pg_control, unless someone else already did */
1024
			SpinAcquire(ControlFileLockId);
1025 1026 1027
			if (ControlFile->logId < openLogId ||
				(ControlFile->logId == openLogId &&
				 ControlFile->logSeg < openLogSeg + 1))
T
Tom Lane 已提交
1028 1029 1030 1031 1032
			{
				ControlFile->logId = openLogId;
				ControlFile->logSeg = openLogSeg + 1;
				ControlFile->time = time(NULL);
				UpdateControlFile();
B
Bruce Momjian 已提交
1033

1034
				/*
B
Bruce Momjian 已提交
1035 1036 1037 1038
				 * Signal postmaster to start a checkpoint if it's been
				 * too long since the last one.  (We look at local copy of
				 * RedoRecPtr which might be a little out of date, but
				 * should be close enough for this purpose.)
1039 1040 1041 1042 1043 1044 1045 1046 1047 1048
				 */
				if (IsUnderPostmaster &&
					(openLogId != RedoRecPtr.xlogid ||
					 openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
					 (uint32) CheckPointSegments))
				{
					if (XLOG_DEBUG)
						fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n");
					kill(getppid(), SIGUSR1);
				}
T
Tom Lane 已提交
1049
			}
1050 1051 1052
			SpinRelease(ControlFileLockId);
		}

T
Tom Lane 已提交
1053
		if (openLogFile < 0)
1054
		{
T
Tom Lane 已提交
1055 1056 1057
			XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
			openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
			openLogOff = 0;
1058 1059
		}

T
Tom Lane 已提交
1060 1061
		/* Need to seek in the file? */
		if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
1062
		{
T
Tom Lane 已提交
1063 1064
			openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
			if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
1065
				elog(STOP, "lseek(logfile %u seg %u off %u) failed: %m",
T
Tom Lane 已提交
1066
					 openLogId, openLogSeg, openLogOff);
1067 1068
		}

T
Tom Lane 已提交
1069 1070 1071
		/* OK to write the page */
		from = XLogCtl->pages + Write->curridx * BLCKSZ;
		if (write(openLogFile, from, BLCKSZ) != BLCKSZ)
1072
			elog(STOP, "write(logfile %u seg %u off %u) failed: %m",
T
Tom Lane 已提交
1073 1074
				 openLogId, openLogSeg, openLogOff);
		openLogOff += BLCKSZ;
1075

T
Tom Lane 已提交
1076 1077 1078
		/*
		 * If we just wrote the whole last page of a logfile segment,
		 * fsync the segment immediately.  This avoids having to go back
B
Bruce Momjian 已提交
1079 1080 1081
		 * and re-open prior segments when an fsync request comes along
		 * later. Doing it here ensures that one and only one backend will
		 * perform this fsync.
T
Tom Lane 已提交
1082 1083 1084
		 */
		if (openLogOff >= XLogSegSize && !ispartialpage)
		{
1085
			issue_xlog_fsync();
B
Bruce Momjian 已提交
1086
			LogwrtResult.Flush = LogwrtResult.Write;	/* end of current page */
T
Tom Lane 已提交
1087
		}
1088

T
Tom Lane 已提交
1089 1090 1091 1092 1093 1094 1095
		if (ispartialpage)
		{
			/* Only asked to write a partial page */
			LogwrtResult.Write = WriteRqst.Write;
			break;
		}
		Write->curridx = NextBufIdx(Write->curridx);
1096 1097
	}

T
Tom Lane 已提交
1098 1099 1100 1101 1102
	/*
	 * If asked to flush, do so
	 */
	if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
		XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1103
	{
B
Bruce Momjian 已提交
1104

T
Tom Lane 已提交
1105
		/*
B
Bruce Momjian 已提交
1106 1107 1108
		 * Could get here without iterating above loop, in which case we
		 * might have no open file or the wrong one.  However, we do not
		 * need to fsync more than one file.
T
Tom Lane 已提交
1109
		 */
1110
		if (sync_method != SYNC_METHOD_OPEN)
T
Tom Lane 已提交
1111
		{
1112
			if (openLogFile >= 0 &&
B
Bruce Momjian 已提交
1113
			 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126
			{
				if (close(openLogFile) != 0)
					elog(STOP, "close(logfile %u seg %u) failed: %m",
						 openLogId, openLogSeg);
				openLogFile = -1;
			}
			if (openLogFile < 0)
			{
				XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
				openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
				openLogOff = 0;
			}
			issue_xlog_fsync();
T
Tom Lane 已提交
1127 1128
		}
		LogwrtResult.Flush = LogwrtResult.Write;
1129 1130
	}

T
Tom Lane 已提交
1131 1132 1133
	/*
	 * Update shared-memory status
	 *
B
Bruce Momjian 已提交
1134 1135
	 * We make sure that the shared 'request' values do not fall behind the
	 * 'result' values.  This is not absolutely essential, but it saves
T
Tom Lane 已提交
1136 1137
	 * some code in a couple of places.
	 */
1138
	S_LOCK(&(XLogCtl->info_lck));
T
Tom Lane 已提交
1139 1140 1141 1142 1143
	XLogCtl->LogwrtResult = LogwrtResult;
	if (XLByteLT(XLogCtl->LogwrtRqst.Write, LogwrtResult.Write))
		XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
	if (XLByteLT(XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush))
		XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
1144 1145
	S_UNLOCK(&(XLogCtl->info_lck));

T
Tom Lane 已提交
1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186
	Write->LogwrtResult = LogwrtResult;
}

/*
 * Make sure all XLOG data up to and including 'record' is flushed to disk.
 *
 * Unlike XLogWrite, the caller does not hold logwrt_lck; we take it only
 * if the request turns out not to be satisfied already.  Does nothing
 * while InRedo is set.
 */
void
XLogFlush(XLogRecPtr record)
{
	XLogRecPtr	target;
	XLogwrtRqst rqst;
	unsigned	nspins = 0;

	if (XLOG_DEBUG)
	{
		fprintf(stderr, "XLogFlush%s%s: rqst %u/%u; wrt %u/%u; flsh %u/%u\n",
				(IsBootstrapProcessingMode()) ? "(bootstrap)" : "",
				(InRedo) ? "(redo)" : "",
				record.xlogid, record.xrecoff,
				LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
				LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
		fflush(stderr);
	}

	/* Flushing is disabled during REDO */
	if (InRedo)
		return;

	/* Nothing to do if our local copy already shows it flushed */
	if (XLByteLE(record, LogwrtResult.Flush))
		return;

	START_CRIT_SECTION();

	/*
	 * fsync is usually very expensive, so piggyback as much as possible on
	 * each one: any further data found in the xlog buffers is written and
	 * fsync'd as well, making the final LogwrtResult.Flush as large as
	 * possible and perhaps saving another fsync right afterwards.
	 *
	 * The write target starts at the caller's position and may grow below.
	 */
	target = record;

	while (true)
	{
		/* try to read the shared LogwrtResult and refresh local state */
		if (!TAS(&(XLogCtl->info_lck)))
		{
			if (XLByteLT(target, XLogCtl->LogwrtRqst.Write))
				target = XLogCtl->LogwrtRqst.Write;
			LogwrtResult = XLogCtl->LogwrtResult;
			S_UNLOCK(&(XLogCtl->info_lck));

			if (XLByteLE(record, LogwrtResult.Flush))
				break;			/* done already */
		}

		/* if more was inserted into the log cache, try to cover that too */
		if (!TAS(&(XLogCtl->insert_lck)))
		{
			XLogCtlInsert *Insert = &XLogCtl->Insert;
			uint32		freespace = INSERT_FREESPACE(Insert);

			/*
			 * Target the end of the current insert page, backed off by the
			 * unused space unless the page counts as full.
			 */
			target = XLogCtl->xlblocks[Insert->curridx];
			if (!(freespace < SizeOfXLogRecord))
				target.xrecoff -= freespace;
			S_UNLOCK(&(XLogCtl->insert_lck));
		}

		/* now try to get the logwrt lock */
		if (!TAS(&(XLogCtl->logwrt_lck)))
		{
			LogwrtResult = XLogCtl->Write.LogwrtResult;
			if (XLByteLE(record, LogwrtResult.Flush))
			{
				/* done already */
				S_UNLOCK(&(XLogCtl->logwrt_lck));
			}
			else
			{
				rqst.Write = target;
				rqst.Flush = record;
				XLogWrite(rqst);
				S_UNLOCK(&(XLogCtl->logwrt_lck));
				if (XLByteLT(LogwrtResult.Flush, record))
					elog(STOP, "XLogFlush: request is not satisfied");
			}
			break;
		}

		S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), nspins++, XLOG_LOCK_TIMEOUT);
	}

	END_CRIT_SECTION();
}

T
Tom Lane 已提交
1249 1250 1251
/*
 * Create a new XLOG file segment, or open a pre-existing one.
 *
1252 1253 1254
 * log, seg: identify segment to be created/opened.
 *
 * *use_existent: if TRUE, OK to use a pre-existing file (else, any
B
Bruce Momjian 已提交
1255
 * pre-existing file will be deleted).	On return, TRUE if a pre-existing
1256 1257 1258 1259 1260 1261
 * file was used.
 *
 * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
 * place.  This should be TRUE except during bootstrap log creation.  The
 * caller must *not* hold the spinlock at call.
 *
T
Tom Lane 已提交
1262 1263
 * Returns FD of opened file.
 */
1264
static int
1265 1266
XLogFileInit(uint32 log, uint32 seg,
			 bool *use_existent, bool use_lock)
1267
{
1268
	char		path[MAXPGPATH];
1269 1270
	char		tmppath[MAXPGPATH];
	char		targpath[MAXPGPATH];
1271
	char		zbuffer[BLCKSZ];
1272 1273
	uint32		targlog,
				targseg;
1274
	int			fd;
1275
	int			nbytes;
1276 1277

	XLogFileName(path, log, seg);
V
Vadim B. Mikheev 已提交
1278 1279

	/*
B
Bruce Momjian 已提交
1280 1281
	 * Try to use existent file (checkpoint maker may have created it
	 * already)
V
Vadim B. Mikheev 已提交
1282
	 */
1283
	if (*use_existent)
V
Vadim B. Mikheev 已提交
1284
	{
1285 1286
		fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
						   S_IRUSR | S_IWUSR);
V
Vadim B. Mikheev 已提交
1287 1288 1289
		if (fd < 0)
		{
			if (errno != ENOENT)
1290
				elog(STOP, "InitOpen(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
1291
					 log, seg);
V
Vadim B. Mikheev 已提交
1292 1293
		}
		else
B
Bruce Momjian 已提交
1294
			return (fd);
V
Vadim B. Mikheev 已提交
1295 1296
	}

1297
	/*
B
Bruce Momjian 已提交
1298 1299 1300 1301
	 * Initialize an empty (all zeroes) segment.  NOTE: it is possible
	 * that another process is doing the same thing.  If so, we will end
	 * up pre-creating an extra log segment.  That seems OK, and better
	 * than holding the spinlock throughout this lengthy process.
1302 1303 1304 1305 1306
	 */
	snprintf(tmppath, MAXPGPATH, "%s%cxlogtemp.%d",
			 XLogDir, SEP_CHAR, (int) getpid());

	unlink(tmppath);
1307

1308
	/* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1309
	fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
T
Tom Lane 已提交
1310
					   S_IRUSR | S_IWUSR);
1311
	if (fd < 0)
1312
		elog(STOP, "InitCreate(%s) failed: %m", tmppath);
1313

1314
	/*
B
Bruce Momjian 已提交
1315
	 * Zero-fill the file.	We have to do this the hard way to ensure that
1316 1317
	 * all the file space has really been allocated --- on platforms that
	 * allow "holes" in files, just seeking to the end doesn't allocate
B
Bruce Momjian 已提交
1318
	 * intermediate space.	This way, we know that we have all the space
1319
	 * and (after the fsync below) that all the indirect blocks are down
1320 1321
	 * on disk.  Therefore, fdatasync(2) or O_DSYNC will be sufficient to
	 * sync future writes to the log file.
1322 1323 1324 1325 1326
	 */
	MemSet(zbuffer, 0, sizeof(zbuffer));
	for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
	{
		if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
T
Tom Lane 已提交
1327
		{
B
Bruce Momjian 已提交
1328
			int			save_errno = errno;
T
Tom Lane 已提交
1329

B
Bruce Momjian 已提交
1330 1331 1332 1333
			/*
			 * If we fail to make the file, delete it to release disk
			 * space
			 */
1334
			unlink(tmppath);
T
Tom Lane 已提交
1335 1336
			errno = save_errno;

1337
			elog(STOP, "ZeroFill failed to create or write %s: %m", tmppath);
T
Tom Lane 已提交
1338
		}
1339
	}
1340

1341
	if (pg_fsync(fd) != 0)
1342
		elog(STOP, "fsync(%s) failed: %m", tmppath);
1343

V
Vadim B. Mikheev 已提交
1344
	close(fd);
T
Tom Lane 已提交
1345

1346
	/*
1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362
	 * Now move the segment into place with its final name.  We want to be
	 * sure that only one process does this at a time.
	 */
	if (use_lock)
		SpinAcquire(ControlFileLockId);

	/*
	 * If caller didn't want to use a pre-existing file, get rid of any
	 * pre-existing file.  Otherwise, cope with possibility that someone
	 * else has created the file while we were filling ours: if so, use
	 * ours to pre-create a future log segment.
	 */
	targlog = log;
	targseg = seg;
	strcpy(targpath, path);

B
Bruce Momjian 已提交
1363
	if (!*use_existent)
1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379
		unlink(targpath);
	else
	{
		while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
								   S_IRUSR | S_IWUSR)) >= 0)
		{
			close(fd);
			NextLogSeg(targlog, targseg);
			XLogFileName(targpath, targlog, targseg);
		}
	}

	/*
	 * Prefer link() to rename() here just to be really sure that we don't
	 * overwrite an existing logfile.  However, there shouldn't be one, so
	 * rename() is an acceptable substitute except for the truly paranoid.
1380
	 */
1381
#ifndef __BEOS__
1382
	if (link(tmppath, targpath) < 0)
1383
		elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
1384 1385
			 targlog, targseg);
	unlink(tmppath);
1386
#else
1387
	if (rename(tmppath, targpath) < 0)
T
Tom Lane 已提交
1388
		elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
1389
			 targlog, targseg);
1390
#endif
V
Vadim B. Mikheev 已提交
1391

1392 1393 1394 1395 1396 1397 1398
	if (use_lock)
		SpinRelease(ControlFileLockId);

	/* Set flag to tell caller there was no existent file */
	*use_existent = false;

	/* Now open original target segment (might not be file I just made) */
1399 1400
	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
					   S_IRUSR | S_IWUSR);
V
Vadim B. Mikheev 已提交
1401
	if (fd < 0)
1402
		elog(STOP, "InitReopen(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
1403
			 log, seg);
V
Vadim B. Mikheev 已提交
1404

1405
	return (fd);
1406 1407
}

T
Tom Lane 已提交
1408 1409 1410
/*
 * Open a pre-existing logfile segment.
 */
1411 1412 1413
static int
XLogFileOpen(uint32 log, uint32 seg, bool econt)
{
1414 1415
	char		path[MAXPGPATH];
	int			fd;
1416 1417 1418

	XLogFileName(path, log, seg);

1419 1420
	fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
					   S_IRUSR | S_IWUSR);
1421 1422 1423 1424
	if (fd < 0)
	{
		if (econt && errno == ENOENT)
		{
1425
			elog(LOG, "open(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
1426
				 log, seg);
1427 1428
			return (fd);
		}
1429
		elog(STOP, "open(logfile %u seg %u) failed: %m",
T
Tom Lane 已提交
1430
			 log, seg);
1431 1432
	}

1433
	return (fd);
1434 1435
}

V
Vadim B. Mikheev 已提交
1436
/*
 * Preallocate log files beyond the specified log endpoint, according to
 * the XLOGfiles user parameter.
 *
 * With XLOGfiles > 0, that many segments following the one containing
 * endptr are initialized.  Otherwise a single following segment is
 * initialized, and only if endptr lies in the last quarter of its segment.
 */
static void
PreallocXlogFiles(XLogRecPtr endptr)
{
	uint32		logId;
	uint32		logSeg;
	bool		use_existent;
	int			nsegs;
	int			fd;
	int			n;

	XLByteToPrevSeg(endptr, logId, logSeg);

	/* Decide how many segments to set up */
	if (XLOGfiles > 0)
		nsegs = XLOGfiles;
	else if ((endptr.xrecoff - 1) % XLogSegSize >=
			 (uint32) (0.75 * XLogSegSize))
		nsegs = 1;
	else
		nsegs = 0;

	for (n = 0; n < nsegs; n++)
	{
		NextLogSeg(logId, logSeg);
		/* an already existing segment is fine; it is simply reopened */
		use_existent = true;
		fd = XLogFileInit(logId, logSeg, &use_existent, true);
		close(fd);
	}
}

/*
 * Remove or move offline all log files older or equal to passed log/seg#
V
Vadim B. Mikheev 已提交
1472 1473
 */
static void
T
Tom Lane 已提交
1474
MoveOfflineLogs(uint32 log, uint32 seg)
V
Vadim B. Mikheev 已提交
1475
{
B
Bruce Momjian 已提交
1476 1477 1478 1479
	DIR		   *xldir;
	struct dirent *xlde;
	char		lastoff[32];
	char		path[MAXPGPATH];
V
Vadim B. Mikheev 已提交
1480

T
Tom Lane 已提交
1481
	Assert(XLOG_archive_dir[0] == 0);	/* ! implemented yet */
V
Vadim B. Mikheev 已提交
1482 1483 1484

	xldir = opendir(XLogDir);
	if (xldir == NULL)
1485
		elog(STOP, "MoveOfflineLogs: cannot open xlog dir: %m");
V
Vadim B. Mikheev 已提交
1486

T
Tom Lane 已提交
1487
	sprintf(lastoff, "%08X%08X", log, seg);
V
Vadim B. Mikheev 已提交
1488 1489 1490 1491

	errno = 0;
	while ((xlde = readdir(xldir)) != NULL)
	{
T
Tom Lane 已提交
1492 1493 1494
		if (strlen(xlde->d_name) == 16 &&
			strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
			strcmp(xlde->d_name, lastoff) <= 0)
V
Vadim B. Mikheev 已提交
1495
		{
B
Bruce Momjian 已提交
1496
			elog(LOG, "MoveOfflineLogs: %s %s", (XLOG_archive_dir[0]) ?
T
Tom Lane 已提交
1497
				 "archive" : "remove", xlde->d_name);
B
Bruce Momjian 已提交
1498
			sprintf(path, "%s%c%s", XLogDir, SEP_CHAR, xlde->d_name);
T
Tom Lane 已提交
1499 1500
			if (XLOG_archive_dir[0] == 0)
				unlink(path);
V
Vadim B. Mikheev 已提交
1501 1502 1503 1504
		}
		errno = 0;
	}
	if (errno)
1505
		elog(STOP, "MoveOfflineLogs: cannot read xlog dir: %m");
V
Vadim B. Mikheev 已提交
1506 1507 1508
	closedir(xldir);
}

T
Tom Lane 已提交
1509 1510 1511 1512 1513
/*
 * Restore the backup blocks present in an XLOG record, if any.
 *
 * We assume all of the record has been read into memory at *record.
 */
1514 1515 1516 1517 1518 1519 1520 1521 1522 1523
static void
RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
{
	Relation	reln;
	Buffer		buffer;
	Page		page;
	BkpBlock	bkpb;
	char	   *blk;
	int			i;

B
Bruce Momjian 已提交
1524
	blk = (char *) XLogRecGetData(record) + record->xl_len;
T
Tom Lane 已提交
1525
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1526
	{
T
Tom Lane 已提交
1527
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1528 1529
			continue;

B
Bruce Momjian 已提交
1530
		memcpy((char *) &bkpb, blk, sizeof(BkpBlock));
1531 1532 1533 1534 1535 1536 1537 1538 1539 1540
		blk += sizeof(BkpBlock);

		reln = XLogOpenRelation(true, record->xl_rmid, bkpb.node);

		if (reln)
		{
			buffer = XLogReadBuffer(true, reln, bkpb.block);
			if (BufferIsValid(buffer))
			{
				page = (Page) BufferGetPage(buffer);
B
Bruce Momjian 已提交
1541
				memcpy((char *) page, blk, BLCKSZ);
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551
				PageSetLSN(page, lsn);
				PageSetSUI(page, ThisStartUpID);
				UnlockAndWriteBuffer(buffer);
			}
		}

		blk += BLCKSZ;
	}
}

T
Tom Lane 已提交
1552 1553 1554 1555 1556 1557 1558
/*
 * CRC-check an XLOG record.  We do not believe the contents of an XLOG
 * record (other than to the minimal extent of computing the amount of
 * data to read in) until we've checked the CRCs.
 *
 * We assume all of the record has been read into memory at *record.
 */
1559 1560 1561 1562 1563 1564 1565 1566 1567
static bool
RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
{
	crc64		crc;
	crc64		cbuf;
	int			i;
	uint32		len = record->xl_len;
	char	   *blk;

T
Tom Lane 已提交
1568
	/* Check CRC of rmgr data and record header */
1569
	INIT_CRC64(crc);
T
Tom Lane 已提交
1570
	COMP_CRC64(crc, XLogRecGetData(record), len);
B
Bruce Momjian 已提交
1571
	COMP_CRC64(crc, (char *) record + sizeof(crc64),
T
Tom Lane 已提交
1572
			   SizeOfXLogRecord - sizeof(crc64));
1573 1574
	FIN_CRC64(crc);

T
Tom Lane 已提交
1575
	if (!EQ_CRC64(record->xl_crc, crc))
1576 1577
	{
		elog(emode, "ReadRecord: bad rmgr data CRC in record at %u/%u",
T
Tom Lane 已提交
1578
			 recptr.xlogid, recptr.xrecoff);
B
Bruce Momjian 已提交
1579
		return (false);
1580 1581
	}

T
Tom Lane 已提交
1582
	/* Check CRCs of backup blocks, if any */
B
Bruce Momjian 已提交
1583
	blk = (char *) XLogRecGetData(record) + len;
T
Tom Lane 已提交
1584
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1585
	{
T
Tom Lane 已提交
1586
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1587 1588 1589
			continue;

		INIT_CRC64(crc);
T
Tom Lane 已提交
1590 1591 1592
		COMP_CRC64(crc, blk + sizeof(BkpBlock), BLCKSZ);
		COMP_CRC64(crc, blk + sizeof(crc64),
				   sizeof(BkpBlock) - sizeof(crc64));
1593
		FIN_CRC64(crc);
B
Bruce Momjian 已提交
1594 1595
		memcpy((char *) &cbuf, blk, sizeof(crc64));		/* don't assume
														 * alignment */
1596

T
Tom Lane 已提交
1597
		if (!EQ_CRC64(cbuf, crc))
1598 1599
		{
			elog(emode, "ReadRecord: bad bkp block %d CRC in record at %u/%u",
T
Tom Lane 已提交
1600
				 i + 1, recptr.xlogid, recptr.xrecoff);
B
Bruce Momjian 已提交
1601
			return (false);
1602
		}
T
Tom Lane 已提交
1603
		blk += sizeof(BkpBlock) + BLCKSZ;
1604 1605
	}

B
Bruce Momjian 已提交
1606
	return (true);
1607 1608
}

T
Tom Lane 已提交
1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621
/*
 * Attempt to read an XLOG record.
 *
 * If RecPtr is not NULL, try to read a record at that position.  Otherwise
 * try to read a record just after the last one previously read.
 *
 * If no valid record is available, returns NULL, or fails if emode is STOP.
 * (emode must be either STOP or LOG.)
 *
 * buffer is a workspace at least _INTL_MAXLOGRECSZ bytes long.  It is needed
 * to reassemble a record that crosses block boundaries.  Note that on
 * successful return, the returned record pointer always points at buffer.
 */
1622
static XLogRecord *
T
Tom Lane 已提交
1623
ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer)
1624
{
1625 1626
	XLogRecord *record;
	XLogRecPtr	tmpRecPtr = EndRecPtr;
T
Tom Lane 已提交
1627 1628 1629 1630
	uint32		len,
				total_len;
	uint32		targetPageOff;
	unsigned	i;
1631
	bool		nextmode = false;
T
Tom Lane 已提交
1632 1633 1634

	if (readBuf == NULL)
	{
B
Bruce Momjian 已提交
1635

T
Tom Lane 已提交
1636 1637 1638
		/*
		 * First time through, permanently allocate readBuf.  We do it
		 * this way, rather than just making a static array, for two
B
Bruce Momjian 已提交
1639 1640 1641 1642
		 * reasons: (1) no need to waste the storage in most
		 * instantiations of the backend; (2) a static char array isn't
		 * guaranteed to have any particular alignment, whereas malloc()
		 * will provide MAXALIGN'd storage.
T
Tom Lane 已提交
1643 1644 1645 1646
		 */
		readBuf = (char *) malloc(BLCKSZ);
		Assert(readBuf != NULL);
	}
1647

T
Tom Lane 已提交
1648
	if (RecPtr == NULL)
1649
	{
1650
		RecPtr = &tmpRecPtr;
1651
		nextmode = true;
T
Tom Lane 已提交
1652
		/* fast case if next record is on same page */
1653 1654 1655 1656 1657
		if (nextRecord != NULL)
		{
			record = nextRecord;
			goto got_record;
		}
T
Tom Lane 已提交
1658
		/* align old recptr to next page */
1659 1660 1661 1662 1663 1664 1665 1666
		if (tmpRecPtr.xrecoff % BLCKSZ != 0)
			tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
		if (tmpRecPtr.xrecoff >= XLogFileSize)
		{
			(tmpRecPtr.xlogid)++;
			tmpRecPtr.xrecoff = 0;
		}
		tmpRecPtr.xrecoff += SizeOfXLogPHD;
1667
	}
1668
	else if (!XRecOffIsValid(RecPtr->xrecoff))
1669
		elog(STOP, "ReadRecord: invalid record offset at (%u, %u)",
1670
			 RecPtr->xlogid, RecPtr->xrecoff);
1671

T
Tom Lane 已提交
1672
	if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
1673
	{
1674 1675
		close(readFile);
		readFile = -1;
1676
	}
T
Tom Lane 已提交
1677
	XLByteToSeg(*RecPtr, readId, readSeg);
1678
	if (readFile < 0)
1679
	{
T
Tom Lane 已提交
1680
		readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1681 1682
		if (readFile < 0)
			goto next_record_is_invalid;
B
Bruce Momjian 已提交
1683
		readOff = (uint32) (-1);/* force read to occur below */
1684 1685
	}

T
Tom Lane 已提交
1686 1687
	targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / BLCKSZ) * BLCKSZ;
	if (readOff != targetPageOff)
1688
	{
T
Tom Lane 已提交
1689 1690 1691 1692
		readOff = targetPageOff;
		if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
		{
			elog(emode, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m",
1693
				 readId, readSeg, readOff);
T
Tom Lane 已提交
1694 1695
			goto next_record_is_invalid;
		}
1696
		if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
T
Tom Lane 已提交
1697 1698
		{
			elog(emode, "ReadRecord: read(logfile %u seg %u off %u) failed: %m",
1699
				 readId, readSeg, readOff);
T
Tom Lane 已提交
1700 1701
			goto next_record_is_invalid;
		}
1702
		if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, nextmode))
1703 1704
			goto next_record_is_invalid;
	}
T
Tom Lane 已提交
1705
	if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
1706 1707
		RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
	{
T
Tom Lane 已提交
1708
		elog(emode, "ReadRecord: contrecord is requested by (%u, %u)",
1709
			 RecPtr->xlogid, RecPtr->xrecoff);
1710 1711
		goto next_record_is_invalid;
	}
1712
	record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
1713 1714

got_record:;
B
Bruce Momjian 已提交
1715

T
Tom Lane 已提交
1716
	/*
B
Bruce Momjian 已提交
1717 1718
	 * Currently, xl_len == 0 must be bad data, but that might not be true
	 * forever.  See note in XLogInsert.
T
Tom Lane 已提交
1719
	 */
1720 1721 1722
	if (record->xl_len == 0)
	{
		elog(emode, "ReadRecord: record with zero len at (%u, %u)",
T
Tom Lane 已提交
1723
			 RecPtr->xlogid, RecPtr->xrecoff);
1724 1725
		goto next_record_is_invalid;
	}
B
Bruce Momjian 已提交
1726

T
Tom Lane 已提交
1727
	/*
B
Bruce Momjian 已提交
1728 1729
	 * Compute total length of record including any appended backup
	 * blocks.
T
Tom Lane 已提交
1730 1731 1732 1733 1734 1735 1736 1737
	 */
	total_len = SizeOfXLogRecord + record->xl_len;
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
	{
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
			continue;
		total_len += sizeof(BkpBlock) + BLCKSZ;
	}
B
Bruce Momjian 已提交
1738

T
Tom Lane 已提交
1739 1740 1741 1742 1743 1744
	/*
	 * Make sure it will fit in buffer (currently, it is mechanically
	 * impossible for this test to fail, but it seems like a good idea
	 * anyway).
	 */
	if (total_len > _INTL_MAXLOGRECSZ)
1745
	{
1746
		elog(emode, "ReadRecord: too long record len %u at (%u, %u)",
T
Tom Lane 已提交
1747
			 total_len, RecPtr->xlogid, RecPtr->xrecoff);
1748 1749 1750 1751
		goto next_record_is_invalid;
	}
	if (record->xl_rmid > RM_MAX_ID)
	{
T
Tom Lane 已提交
1752
		elog(emode, "ReadRecord: invalid resource manager id %u at (%u, %u)",
1753
			 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff);
1754 1755 1756
		goto next_record_is_invalid;
	}
	nextRecord = NULL;
T
Tom Lane 已提交
1757 1758
	len = BLCKSZ - RecPtr->xrecoff % BLCKSZ;
	if (total_len > len)
1759
	{
T
Tom Lane 已提交
1760 1761
		/* Need to reassemble record */
		XLogContRecord *contrecord;
B
Bruce Momjian 已提交
1762
		uint32		gotlen = len;
1763

T
Tom Lane 已提交
1764
		memcpy(buffer, record, len);
1765
		record = (XLogRecord *) buffer;
T
Tom Lane 已提交
1766
		buffer += len;
1767
		for (;;)
1768
		{
T
Tom Lane 已提交
1769 1770
			readOff += BLCKSZ;
			if (readOff >= XLogSegSize)
1771 1772
			{
				close(readFile);
T
Tom Lane 已提交
1773 1774 1775
				readFile = -1;
				NextLogSeg(readId, readSeg);
				readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1776 1777
				if (readFile < 0)
					goto next_record_is_invalid;
T
Tom Lane 已提交
1778
				readOff = 0;
1779 1780
			}
			if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
T
Tom Lane 已提交
1781 1782
			{
				elog(emode, "ReadRecord: read(logfile %u seg %u off %u) failed: %m",
1783
					 readId, readSeg, readOff);
T
Tom Lane 已提交
1784 1785
				goto next_record_is_invalid;
			}
1786
			if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, true))
1787
				goto next_record_is_invalid;
T
Tom Lane 已提交
1788
			if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
1789
			{
T
Tom Lane 已提交
1790
				elog(emode, "ReadRecord: there is no ContRecord flag in logfile %u seg %u off %u",
1791
					 readId, readSeg, readOff);
1792 1793
				goto next_record_is_invalid;
			}
T
Tom Lane 已提交
1794
			contrecord = (XLogContRecord *) ((char *) readBuf + SizeOfXLogPHD);
B
Bruce Momjian 已提交
1795
			if (contrecord->xl_rem_len == 0 ||
T
Tom Lane 已提交
1796
				total_len != (contrecord->xl_rem_len + gotlen))
1797
			{
T
Tom Lane 已提交
1798 1799
				elog(emode, "ReadRecord: invalid cont-record len %u in logfile %u seg %u off %u",
					 contrecord->xl_rem_len, readId, readSeg, readOff);
1800 1801
				goto next_record_is_invalid;
			}
T
Tom Lane 已提交
1802 1803
			len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
			if (contrecord->xl_rem_len > len)
1804
			{
B
Bruce Momjian 已提交
1805
				memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
T
Tom Lane 已提交
1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818
				gotlen += len;
				buffer += len;
				continue;
			}
			memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
				   contrecord->xl_rem_len);
			break;
		}
		if (!RecordIsValid(record, *RecPtr, emode))
			goto next_record_is_invalid;
		if (BLCKSZ - SizeOfXLogRecord >= SizeOfXLogPHD +
			SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len))
		{
B
Bruce Momjian 已提交
1819
			nextRecord = (XLogRecord *) ((char *) contrecord +
T
Tom Lane 已提交
1820 1821 1822 1823
				SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len));
		}
		EndRecPtr.xlogid = readId;
		EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
B
Bruce Momjian 已提交
1824
			SizeOfXLogPHD + SizeOfXLogContRecord +
T
Tom Lane 已提交
1825 1826 1827
			MAXALIGN(contrecord->xl_rem_len);
		ReadRecPtr = *RecPtr;
		return record;
1828 1829
	}

T
Tom Lane 已提交
1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840
	/* Record does not cross a page boundary */
	if (!RecordIsValid(record, *RecPtr, emode))
		goto next_record_is_invalid;
	if (BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % BLCKSZ +
		MAXALIGN(total_len))
		nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
	EndRecPtr.xlogid = RecPtr->xlogid;
	EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
	ReadRecPtr = *RecPtr;
	memcpy(buffer, record, total_len);
	return (XLogRecord *) buffer;
1841

T
Tom Lane 已提交
1842 1843 1844 1845 1846
next_record_is_invalid:;
	close(readFile);
	readFile = -1;
	nextRecord = NULL;
	return NULL;
1847 1848
}

1849 1850 1851 1852
/*
 * Sanity-check the page header of an xlog page that was just read.
 *
 * hdr		- header of the freshly read page
 * emode	- elog level used to report a bad header
 * checkSUI - true if the page's startup id should also be compared
 *			  against the one remembered from the previously read page
 *
 * Returns true if the header is acceptable, false otherwise.  On success
 * the page's startup id is remembered in lastReadSUI for the next call;
 * on failure lastReadSUI is left untouched.
 *
 * This helper exists only to keep ReadRecord free of duplicated checks;
 * it is not meant to be called from anywhere else.
 */
static bool
ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
{
	/* The page must carry the xlog page magic number */
	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
	{
		elog(emode, "ReadRecord: invalid magic number %04X in logfile %u seg %u off %u",
			 hdr->xlp_magic, readId, readSeg, readOff);
		return false;
	}

	/* No info bits outside the known flag set may be present */
	if (hdr->xlp_info & ~XLP_ALL_FLAGS)
	{
		elog(emode, "ReadRecord: invalid info bits %04X in logfile %u seg %u off %u",
			 hdr->xlp_info, readId, readSeg, readOff);
		return false;
	}

	/*
	 * A startup id that is smaller than the previous page's, or more than
	 * a few counts larger, is not believable.  In theory up to 512
	 * shutdown checkpoint records could sit on one 32K xlog page, so that
	 * is the largest jump that could legitimately occur.
	 *
	 * The comparison only makes sense when pages are being read in
	 * sequence, which is why the caller tells us whether to apply it.
	 */
	if (checkSUI &&
		(hdr->xlp_sui < lastReadSUI ||
		 hdr->xlp_sui > lastReadSUI + 512))
	{
		elog(emode, "ReadRecord: out-of-sequence SUI %u (after %u) in logfile %u seg %u off %u",
			 hdr->xlp_sui, lastReadSUI, readId, readSeg, readOff);
		return false;
	}

	/* Header accepted: remember its startup id for the next page */
	lastReadSUI = hdr->xlp_sui;
	return true;
}

1895 1896 1897 1898
/*
 * I/O routines for pg_control
 *
 * *ControlFile is a buffer in shared memory that holds an image of the
 * contents of pg_control.  WriteControlFile() initializes pg_control
 * given a preloaded buffer, ReadControlFile() loads the buffer from
 * the pg_control file (during postmaster or standalone-backend startup),
 * and UpdateControlFile() rewrites pg_control after we modify xlog state.
 *
 * For simplicity, WriteControlFile() initializes the fields of pg_control
 * that are related to checking backend/database compatibility, and
 * ReadControlFile() verifies they are correct.  We could split out the
 * I/O and compatibility-check functions, but there seems no need currently.
 */

void
XLOGPathInit(void)
{
	/* Init XLOG file paths */
1914 1915 1916
	snprintf(XLogDir, MAXPGPATH, "%s%cpg_xlog", DataDir, SEP_CHAR);
	snprintf(ControlFilePath, MAXPGPATH, "%s%cglobal%cpg_control",
			 DataDir, SEP_CHAR, SEP_CHAR);
1917 1918 1919 1920 1921 1922
}

static void
WriteControlFile(void)
{
	int			fd;
B
Bruce Momjian 已提交
1923 1924
	char		buffer[BLCKSZ]; /* need not be aligned */

1925 1926
#ifdef USE_LOCALE
	char	   *localeptr;
B
Bruce Momjian 已提交
1927

1928 1929 1930
#endif

	/*
T
Tom Lane 已提交
1931
	 * Initialize version and compatibility-check fields
1932
	 */
T
Tom Lane 已提交
1933 1934
	ControlFile->pg_control_version = PG_CONTROL_VERSION;
	ControlFile->catalog_version_no = CATALOG_VERSION_NO;
1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945
	ControlFile->blcksz = BLCKSZ;
	ControlFile->relseg_size = RELSEG_SIZE;
#ifdef USE_LOCALE
	localeptr = setlocale(LC_COLLATE, NULL);
	if (!localeptr)
		elog(STOP, "Invalid LC_COLLATE setting");
	StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
	localeptr = setlocale(LC_CTYPE, NULL);
	if (!localeptr)
		elog(STOP, "Invalid LC_CTYPE setting");
	StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
B
Bruce Momjian 已提交
1946

1947 1948
	/*
	 * Issue warning notice if initdb'ing in a locale that will not permit
B
Bruce Momjian 已提交
1949 1950
	 * LIKE index optimization.  This is not a clean place to do it, but I
	 * don't see a better place either...
1951 1952 1953 1954 1955
	 */
	if (!locale_is_like_safe())
		elog(NOTICE, "Initializing database with %s collation order."
			 "\n\tThis locale setting will prevent use of index optimization for"
			 "\n\tLIKE and regexp searches.  If you are concerned about speed of"
B
Bruce Momjian 已提交
1956
		  "\n\tsuch queries, you may wish to set LC_COLLATE to \"C\" and"
1957 1958 1959 1960 1961 1962 1963
			 "\n\tre-initdb.  For more information see the Administrator's Guide.",
			 ControlFile->lc_collate);
#else
	strcpy(ControlFile->lc_collate, "C");
	strcpy(ControlFile->lc_ctype, "C");
#endif

T
Tom Lane 已提交
1964 1965
	/* Contents are protected with a CRC */
	INIT_CRC64(ControlFile->crc);
B
Bruce Momjian 已提交
1966 1967
	COMP_CRC64(ControlFile->crc,
			   (char *) ControlFile + sizeof(crc64),
T
Tom Lane 已提交
1968 1969 1970
			   sizeof(ControlFileData) - sizeof(crc64));
	FIN_CRC64(ControlFile->crc);

1971
	/*
B
Bruce Momjian 已提交
1972 1973 1974 1975 1976
	 * We write out BLCKSZ bytes into pg_control, zero-padding the excess
	 * over sizeof(ControlFileData).  This reduces the odds of
	 * premature-EOF errors when reading pg_control.  We'll still fail
	 * when we check the contents of the file, but hopefully with a more
	 * specific error than "couldn't read pg_control".
1977 1978 1979
	 */
	if (sizeof(ControlFileData) > BLCKSZ)
		elog(STOP, "sizeof(ControlFileData) is too large ... fix xlog.c");
1980

1981 1982 1983
	memset(buffer, 0, BLCKSZ);
	memcpy(buffer, ControlFile, sizeof(ControlFileData));

1984 1985
	fd = BasicOpenFile(ControlFilePath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
					   S_IRUSR | S_IWUSR);
1986 1987 1988 1989 1990 1991 1992
	if (fd < 0)
		elog(STOP, "WriteControlFile failed to create control file (%s): %m",
			 ControlFilePath);

	if (write(fd, buffer, BLCKSZ) != BLCKSZ)
		elog(STOP, "WriteControlFile failed to write control file: %m");

1993
	if (pg_fsync(fd) != 0)
1994 1995 1996 1997 1998 1999 2000 2001
		elog(STOP, "WriteControlFile failed to fsync control file: %m");

	close(fd);
}

static void
ReadControlFile(void)
{
2002
	crc64		crc;
2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016
	int			fd;

	/*
	 * Read data...
	 */
	fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
	if (fd < 0)
		elog(STOP, "open(\"%s\") failed: %m", ControlFilePath);

	if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
		elog(STOP, "read(\"%s\") failed: %m", ControlFilePath);

	close(fd);

T
Tom Lane 已提交
2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027
	/*
	 * Check for expected pg_control format version.  If this is wrong,
	 * the CRC check will likely fail because we'll be checking the wrong
	 * number of bytes.  Complaining about wrong version will probably be
	 * more enlightening than complaining about wrong CRC.
	 */
	if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
		elog(STOP, "database was initialized with PG_CONTROL_VERSION %d,\n\tbut the backend was compiled with PG_CONTROL_VERSION %d.\n\tlooks like you need to initdb.",
			 ControlFile->pg_control_version, PG_CONTROL_VERSION);

	/* Now check the CRC. */
2028
	INIT_CRC64(crc);
B
Bruce Momjian 已提交
2029 2030
	COMP_CRC64(crc,
			   (char *) ControlFile + sizeof(crc64),
T
Tom Lane 已提交
2031
			   sizeof(ControlFileData) - sizeof(crc64));
2032 2033
	FIN_CRC64(crc);

T
Tom Lane 已提交
2034
	if (!EQ_CRC64(crc, ControlFile->crc))
2035 2036
		elog(STOP, "Invalid CRC in control file");

2037
	/*
B
Bruce Momjian 已提交
2038 2039
	 * Do compatibility checking immediately.  We do this here for 2
	 * reasons:
2040
	 *
B
Bruce Momjian 已提交
2041 2042
	 * (1) if the database isn't compatible with the backend executable, we
	 * want to abort before we can possibly do any damage;
2043 2044 2045
	 *
	 * (2) this code is executed in the postmaster, so the setlocale() will
	 * propagate to forked backends, which aren't going to read this file
B
Bruce Momjian 已提交
2046
	 * for themselves.	(These locale settings are considered critical
2047 2048
	 * compatibility items because they can affect sort order of indexes.)
	 */
T
Tom Lane 已提交
2049 2050 2051
	if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
		elog(STOP, "database was initialized with CATALOG_VERSION_NO %d,\n\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n\tlooks like you need to initdb.",
			 ControlFile->catalog_version_no, CATALOG_VERSION_NO);
2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072
	if (ControlFile->blcksz != BLCKSZ)
		elog(STOP, "database was initialized with BLCKSZ %d,\n\tbut the backend was compiled with BLCKSZ %d.\n\tlooks like you need to initdb.",
			 ControlFile->blcksz, BLCKSZ);
	if (ControlFile->relseg_size != RELSEG_SIZE)
		elog(STOP, "database was initialized with RELSEG_SIZE %d,\n\tbut the backend was compiled with RELSEG_SIZE %d.\n\tlooks like you need to initdb.",
			 ControlFile->relseg_size, RELSEG_SIZE);
#ifdef USE_LOCALE
	if (setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
		elog(STOP, "database was initialized with LC_COLLATE '%s',\n\twhich is not recognized by setlocale().\n\tlooks like you need to initdb.",
			 ControlFile->lc_collate);
	if (setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
		elog(STOP, "database was initialized with LC_CTYPE '%s',\n\twhich is not recognized by setlocale().\n\tlooks like you need to initdb.",
			 ControlFile->lc_ctype);
#else
	if (strcmp(ControlFile->lc_collate, "C") != 0 ||
		strcmp(ControlFile->lc_ctype, "C") != 0)
		elog(STOP, "database was initialized with LC_COLLATE '%s' and LC_CTYPE '%s',\n\tbut the backend was compiled without locale support.\n\tlooks like you need to initdb or recompile.",
			 ControlFile->lc_collate, ControlFile->lc_ctype);
#endif
}

2073
void
2074
UpdateControlFile(void)
2075
{
2076
	int			fd;
2077

2078
	INIT_CRC64(ControlFile->crc);
B
Bruce Momjian 已提交
2079 2080
	COMP_CRC64(ControlFile->crc,
			   (char *) ControlFile + sizeof(crc64),
T
Tom Lane 已提交
2081
			   sizeof(ControlFileData) - sizeof(crc64));
2082 2083
	FIN_CRC64(ControlFile->crc);

2084
	fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
2085
	if (fd < 0)
2086
		elog(STOP, "open(\"%s\") failed: %m", ControlFilePath);
2087

2088
	if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
2089
		elog(STOP, "write(cntlfile) failed: %m");
2090

2091
	if (pg_fsync(fd) != 0)
2092
		elog(STOP, "fsync(cntlfile) failed: %m");
2093 2094 2095 2096

	close(fd);
}

2097
/*
T
Tom Lane 已提交
2098
 * Initialization of shared memory for XLOG
2099 2100
 */

2101
int
2102
XLOGShmemSize(void)
2103 2104 2105 2106
{
	if (XLOGbuffers < MinXLOGbuffers)
		XLOGbuffers = MinXLOGbuffers;

T
Tom Lane 已提交
2107 2108 2109
	return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
		+ BLCKSZ * XLOGbuffers +
		MAXALIGN(sizeof(ControlFileData));
2110 2111 2112 2113 2114
}

void
XLOGShmemInit(void)
{
2115
	bool		found;
2116

2117
	/* this must agree with space requested by XLOGShmemSize() */
2118 2119 2120
	if (XLOGbuffers < MinXLOGbuffers)
		XLOGbuffers = MinXLOGbuffers;

2121
	XLogCtl = (XLogCtlData *)
T
Tom Lane 已提交
2122 2123 2124 2125 2126
		ShmemInitStruct("XLOG Ctl",
						MAXALIGN(sizeof(XLogCtlData) +
								 sizeof(XLogRecPtr) * XLOGbuffers)
						+ BLCKSZ * XLOGbuffers,
						&found);
2127
	Assert(!found);
2128 2129 2130 2131
	ControlFile = (ControlFileData *)
		ShmemInitStruct("Control File", sizeof(ControlFileData), &found);
	Assert(!found);

T
Tom Lane 已提交
2132
	memset(XLogCtl, 0, sizeof(XLogCtlData));
B
Bruce Momjian 已提交
2133

T
Tom Lane 已提交
2134 2135 2136 2137 2138 2139 2140 2141
	/*
	 * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be
	 * a multiple of the alignment for same, so no extra alignment padding
	 * is needed here.
	 */
	XLogCtl->xlblocks = (XLogRecPtr *)
		(((char *) XLogCtl) + sizeof(XLogCtlData));
	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
B
Bruce Momjian 已提交
2142

T
Tom Lane 已提交
2143
	/*
B
Bruce Momjian 已提交
2144 2145
	 * Here, on the other hand, we must MAXALIGN to ensure the page
	 * buffers have worst-case alignment.
T
Tom Lane 已提交
2146 2147 2148 2149 2150 2151 2152
	 */
	XLogCtl->pages =
		((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) +
									  sizeof(XLogRecPtr) * XLOGbuffers);
	memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);

	/*
B
Bruce Momjian 已提交
2153 2154
	 * Do basic initialization of XLogCtl shared data. (StartupXLOG will
	 * fill in additional info.)
T
Tom Lane 已提交
2155 2156 2157 2158 2159 2160 2161 2162 2163
	 */
	XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
	XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
	S_INIT_LOCK(&(XLogCtl->insert_lck));
	S_INIT_LOCK(&(XLogCtl->info_lck));
	S_INIT_LOCK(&(XLogCtl->logwrt_lck));
	S_INIT_LOCK(&(XLogCtl->chkp_lck));

2164 2165 2166 2167 2168 2169 2170
	/*
	 * If we are not in bootstrap mode, pg_control should already exist.
	 * Read and validate it immediately (see comments in ReadControlFile()
	 * for the reasons why).
	 */
	if (!IsBootstrapProcessingMode())
		ReadControlFile();
2171 2172 2173
}

/*
T
Tom Lane 已提交
2174 2175
 * This func must be called ONCE on system install.  It creates pg_control
 * and the initial XLOG segment.
2176 2177
 */
void
T
Tom Lane 已提交
2178
BootStrapXLOG(void)
2179
{
2180
	CheckPoint	checkPoint;
T
Tom Lane 已提交
2181 2182
	char	   *buffer;
	XLogPageHeader page;
2183
	XLogRecord *record;
B
Bruce Momjian 已提交
2184
	bool		use_existent;
2185
	crc64		crc;
2186

T
Tom Lane 已提交
2187 2188 2189 2190
	/* Use malloc() to ensure buffer is MAXALIGNED */
	buffer = (char *) malloc(BLCKSZ);
	page = (XLogPageHeader) buffer;

2191 2192 2193
	checkPoint.redo.xlogid = 0;
	checkPoint.redo.xrecoff = SizeOfXLogPHD;
	checkPoint.undo = checkPoint.redo;
T
Tom Lane 已提交
2194
	checkPoint.ThisStartUpID = 0;
2195
	checkPoint.nextXid = FirstTransactionId;
2196
	checkPoint.nextOid = BootstrapObjectIdData;
T
Tom Lane 已提交
2197
	checkPoint.time = time(NULL);
2198

2199 2200 2201 2202
	ShmemVariableCache->nextXid = checkPoint.nextXid;
	ShmemVariableCache->nextOid = checkPoint.nextOid;
	ShmemVariableCache->oidCount = 0;

2203 2204 2205
	memset(buffer, 0, BLCKSZ);
	page->xlp_magic = XLOG_PAGE_MAGIC;
	page->xlp_info = 0;
2206
	page->xlp_sui = checkPoint.ThisStartUpID;
2207 2208 2209
	record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
	record->xl_prev.xlogid = 0;
	record->xl_prev.xrecoff = 0;
2210 2211 2212
	record->xl_xact_prev = record->xl_prev;
	record->xl_xid = InvalidTransactionId;
	record->xl_len = sizeof(checkPoint);
T
Tom Lane 已提交
2213
	record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
2214
	record->xl_rmid = RM_XLOG_ID;
T
Tom Lane 已提交
2215
	memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
2216

2217
	INIT_CRC64(crc);
T
Tom Lane 已提交
2218
	COMP_CRC64(crc, &checkPoint, sizeof(checkPoint));
B
Bruce Momjian 已提交
2219
	COMP_CRC64(crc, (char *) record + sizeof(crc64),
T
Tom Lane 已提交
2220
			   SizeOfXLogRecord - sizeof(crc64));
2221 2222 2223
	FIN_CRC64(crc);
	record->xl_crc = crc;

2224 2225
	use_existent = false;
	openLogFile = XLogFileInit(0, 0, &use_existent, false);
2226

T
Tom Lane 已提交
2227
	if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
2228
		elog(STOP, "BootStrapXLOG failed to write logfile: %m");
2229

T
Tom Lane 已提交
2230
	if (pg_fsync(openLogFile) != 0)
2231
		elog(STOP, "BootStrapXLOG failed to fsync logfile: %m");
2232

T
Tom Lane 已提交
2233 2234
	close(openLogFile);
	openLogFile = -1;
2235

2236
	memset(ControlFile, 0, sizeof(ControlFileData));
T
Tom Lane 已提交
2237 2238 2239
	/* Initialize pg_control status fields */
	ControlFile->state = DB_SHUTDOWNED;
	ControlFile->time = checkPoint.time;
2240 2241 2242
	ControlFile->logId = 0;
	ControlFile->logSeg = 1;
	ControlFile->checkPoint = checkPoint.redo;
T
Tom Lane 已提交
2243
	ControlFile->checkPointCopy = checkPoint;
2244
	/* some additional ControlFile fields are set in WriteControlFile() */
2245

2246
	WriteControlFile();
2247 2248
}

2249
/*
 * Format a time_t as "YYYY-MM-DD HH:MM:SS TZ" in local time, for log
 * messages.
 *
 * The result lives in a static buffer that is overwritten by the next
 * call, so the caller must use or copy it before calling again.
 *
 * If the time cannot be converted (localtime() returns NULL) or the
 * formatted text does not fit, the placeholder "(unknown time)" is
 * returned instead, so the caller always gets a valid string.
 */
static char *
str_time(time_t tnow)
{
	/* generous room: the %Z zone name has no fixed maximum length */
	static char buf[128];
	struct tm  *tm = localtime(&tnow);

	/*
	 * strftime() returns 0 when the result does not fit, and leaves the
	 * buffer contents unspecified in that case.  The format always
	 * produces at least the 19-character timestamp, so 0 can only mean
	 * overflow here.
	 */
	if (tm == NULL ||
		strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S %Z", tm) == 0)
		strcpy(buf, "(unknown time)");

	return buf;
}

/*
T
Tom Lane 已提交
2262
 * This must be called ONCE during postmaster or standalone-backend startup
2263 2264
 */
void
T
Tom Lane 已提交
2265
StartupXLOG(void)
2266
{
2267 2268
	XLogCtlInsert *Insert;
	CheckPoint	checkPoint;
T
Tom Lane 已提交
2269
	bool		wasShutdown;
2270
	XLogRecPtr	RecPtr,
T
Tom Lane 已提交
2271 2272 2273
				LastRec,
				checkPointLoc,
				EndOfLog;
2274
	XLogRecord *record;
T
Tom Lane 已提交
2275
	char	   *buffer;
2276

T
Tom Lane 已提交
2277 2278
	/* Use malloc() to ensure record buffer is MAXALIGNED */
	buffer = (char *) malloc(_INTL_MAXLOGRECSZ);
2279

T
Tom Lane 已提交
2280
	CritSectionCount++;
2281 2282

	/*
2283 2284
	 * Read control file and check XLOG status looks valid.
	 *
B
Bruce Momjian 已提交
2285 2286
	 * Note: in most control paths, *ControlFile is already valid and we need
	 * not do ReadControlFile() here, but might as well do it to be sure.
2287
	 */
2288
	ReadControlFile();
2289

2290 2291 2292 2293
	if (ControlFile->logSeg == 0 ||
		ControlFile->time <= 0 ||
		ControlFile->state < DB_SHUTDOWNED ||
		ControlFile->state > DB_IN_PRODUCTION ||
2294
		!XRecOffIsValid(ControlFile->checkPoint.xrecoff))
2295
		elog(STOP, "control file context is broken");
2296 2297

	if (ControlFile->state == DB_SHUTDOWNED)
2298
		elog(LOG, "database system was shut down at %s",
2299
			 str_time(ControlFile->time));
2300
	else if (ControlFile->state == DB_SHUTDOWNING)
2301
		elog(LOG, "database system shutdown was interrupted at %s",
2302
			 str_time(ControlFile->time));
2303
	else if (ControlFile->state == DB_IN_RECOVERY)
2304
		elog(LOG, "database system was interrupted being in recovery at %s\n"
2305
			 "\tThis propably means that some data blocks are corrupted\n"
2306
			 "\tand you will have to use last backup for recovery.",
2307
			 str_time(ControlFile->time));
2308
	else if (ControlFile->state == DB_IN_PRODUCTION)
2309
		elog(LOG, "database system was interrupted at %s",
2310
			 str_time(ControlFile->time));
2311

T
Tom Lane 已提交
2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340
	/*
	 * Get the last valid checkpoint record.  If the latest one according
	 * to pg_control is broken, try the next-to-last one.
	 */
	record = ReadCheckpointRecord(ControlFile->checkPoint,
								  "primary", buffer);
	if (record != NULL)
	{
		checkPointLoc = ControlFile->checkPoint;
		elog(LOG, "CheckPoint record at (%u, %u)",
			 checkPointLoc.xlogid, checkPointLoc.xrecoff);
	}
	else
	{
		record = ReadCheckpointRecord(ControlFile->prevCheckPoint,
									  "secondary", buffer);
		if (record != NULL)
		{
			checkPointLoc = ControlFile->prevCheckPoint;
			elog(LOG, "Using previous CheckPoint record at (%u, %u)",
				 checkPointLoc.xlogid, checkPointLoc.xrecoff);
			InRecovery = true;	/* force recovery even if SHUTDOWNED */
		}
		else
			elog(STOP, "Unable to locate a valid CheckPoint record");
	}
	LastRec = RecPtr = checkPointLoc;
	memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
	wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
2341

V
Vadim B. Mikheev 已提交
2342
	elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u); Shutdown %s",
2343
		 checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
V
Vadim B. Mikheev 已提交
2344
		 checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
T
Tom Lane 已提交
2345
		 wasShutdown ? "TRUE" : "FALSE");
2346
	elog(LOG, "NextTransactionId: %u; NextOid: %u",
2347 2348
		 checkPoint.nextXid, checkPoint.nextOid);
	if (checkPoint.nextXid < FirstTransactionId ||
2349 2350 2351 2352 2353
		checkPoint.nextOid < BootstrapObjectIdData)
		elog(STOP, "Invalid NextTransactionId/NextOid");

	ShmemVariableCache->nextXid = checkPoint.nextXid;
	ShmemVariableCache->nextOid = checkPoint.nextOid;
2354
	ShmemVariableCache->oidCount = 0;
2355

V
WAL  
Vadim B. Mikheev 已提交
2356
	ThisStartUpID = checkPoint.ThisStartUpID;
B
Bruce Momjian 已提交
2357
	RedoRecPtr = XLogCtl->Insert.RedoRecPtr =
2358
		XLogCtl->RedoRecPtr = checkPoint.redo;
V
WAL  
Vadim B. Mikheev 已提交
2359

2360 2361 2362 2363 2364
	if (XLByteLT(RecPtr, checkPoint.redo))
		elog(STOP, "Invalid redo in checkPoint record");
	if (checkPoint.undo.xrecoff == 0)
		checkPoint.undo = RecPtr;

B
Bruce Momjian 已提交
2365
	if (XLByteLT(checkPoint.undo, RecPtr) ||
V
Vadim B. Mikheev 已提交
2366
		XLByteLT(checkPoint.redo, RecPtr))
2367
	{
T
Tom Lane 已提交
2368
		if (wasShutdown)
V
Vadim B. Mikheev 已提交
2369
			elog(STOP, "Invalid Redo/Undo record in shutdown checkpoint");
V
WAL  
Vadim B. Mikheev 已提交
2370
		InRecovery = true;
2371 2372
	}
	else if (ControlFile->state != DB_SHUTDOWNED)
V
WAL  
Vadim B. Mikheev 已提交
2373
		InRecovery = true;
2374

V
WAL  
Vadim B. Mikheev 已提交
2375 2376
	/* REDO */
	if (InRecovery)
2377
	{
2378 2379
		elog(LOG, "database system was not properly shut down; "
			 "automatic recovery in progress...");
2380 2381 2382 2383
		ControlFile->state = DB_IN_RECOVERY;
		ControlFile->time = time(NULL);
		UpdateControlFile();

V
Vadim B. Mikheev 已提交
2384
		XLogOpenLogRelation();	/* open pg_log */
V
WAL  
Vadim B. Mikheev 已提交
2385
		XLogInitRelationCache();
V
Vadim B. Mikheev 已提交
2386

2387 2388
		/* Is REDO required ? */
		if (XLByteLT(checkPoint.redo, RecPtr))
T
Tom Lane 已提交
2389
			record = ReadRecord(&(checkPoint.redo), STOP, buffer);
B
Bruce Momjian 已提交
2390 2391
		else
/* read past CheckPoint record */
T
Tom Lane 已提交
2392
			record = ReadRecord(NULL, LOG, buffer);
2393

T
Tom Lane 已提交
2394
		if (record != NULL)
2395
		{
V
WAL  
Vadim B. Mikheev 已提交
2396
			InRedo = true;
2397
			elog(LOG, "redo starts at (%u, %u)",
2398
				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2399 2400 2401 2402
			do
			{
				if (record->xl_xid >= ShmemVariableCache->nextXid)
					ShmemVariableCache->nextXid = record->xl_xid + 1;
V
WAL  
Vadim B. Mikheev 已提交
2403 2404
				if (XLOG_DEBUG)
				{
B
Bruce Momjian 已提交
2405
					char		buf[8192];
V
WAL  
Vadim B. Mikheev 已提交
2406

B
Bruce Momjian 已提交
2407 2408 2409
					sprintf(buf, "REDO @ %u/%u; LSN %u/%u: ",
							ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
							EndRecPtr.xlogid, EndRecPtr.xrecoff);
V
WAL  
Vadim B. Mikheev 已提交
2410 2411
					xlog_outrec(buf, record);
					strcat(buf, " - ");
B
Bruce Momjian 已提交
2412 2413
					RmgrTable[record->xl_rmid].rm_desc(buf,
								record->xl_info, XLogRecGetData(record));
T
Tom Lane 已提交
2414
					fprintf(stderr, "%s\n", buf);
V
WAL  
Vadim B. Mikheev 已提交
2415 2416
				}

T
Tom Lane 已提交
2417
				if (record->xl_info & XLR_BKP_BLOCK_MASK)
2418 2419
					RestoreBkpBlocks(record, EndRecPtr);

2420
				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
T
Tom Lane 已提交
2421 2422
				record = ReadRecord(NULL, LOG, buffer);
			} while (record != NULL);
2423
			elog(LOG, "redo done at (%u, %u)",
2424
				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2425
			LastRec = ReadRecPtr;
V
WAL  
Vadim B. Mikheev 已提交
2426
			InRedo = false;
2427 2428
		}
		else
2429
			elog(LOG, "redo is not required");
V
WAL  
Vadim B. Mikheev 已提交
2430 2431
	}

T
Tom Lane 已提交
2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442
	/*
	 * Init xlog buffer cache using the block containing the last valid
	 * record from the previous incarnation.
	 */
	record = ReadRecord(&LastRec, STOP, buffer);
	EndOfLog = EndRecPtr;
	XLByteToPrevSeg(EndOfLog, openLogId, openLogSeg);
	openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
	openLogOff = 0;
	ControlFile->logId = openLogId;
	ControlFile->logSeg = openLogSeg + 1;
V
WAL  
Vadim B. Mikheev 已提交
2443
	Insert = &XLogCtl->Insert;
2444
	Insert->PrevRecord = LastRec;
B
Bruce Momjian 已提交
2445 2446

	/*
2447 2448
	 * If the next record will go to the new page
	 * then initialize for that one.
T
Tom Lane 已提交
2449
	 */
2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469
	if ((BLCKSZ - EndOfLog.xrecoff % BLCKSZ) < SizeOfXLogRecord)
		EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ);
	if (EndOfLog.xrecoff % BLCKSZ == 0)
	{
		if (EndOfLog.xrecoff >= XLogFileSize)
		{
			XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid + 1;
			XLogCtl->xlblocks[0].xrecoff = BLCKSZ;
		}
		else
		{
			XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid;
			XLogCtl->xlblocks[0].xrecoff = EndOfLog.xrecoff + BLCKSZ;
		}
		Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
		Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
		if (InRecovery)
			Insert->currpage->xlp_sui = ThisStartUpID;
		else
			Insert->currpage->xlp_sui = ThisStartUpID + 1;
2470
		/* rest of buffer was zeroed in XLOGShmemInit */
2471 2472 2473 2474 2475 2476 2477 2478
	}
	else
	{
		XLogCtl->xlblocks[0].xlogid = openLogId;
		XLogCtl->xlblocks[0].xrecoff =
			((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
		/*
		 * Tricky point here: readBuf contains the *last* block that the
2479 2480
		 * LastRec record spans, not the one it starts in.  The last block
		 * is indeed the one we want to use.
2481 2482 2483 2484 2485 2486 2487 2488
		 */
		Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
		memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
		Insert->currpos = (char *) Insert->currpage +
			(EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
		/* Make sure rest of page is zero */
		memset(Insert->currpos, 0, INSERT_FREESPACE(Insert));
	}
V
WAL  
Vadim B. Mikheev 已提交
2489

T
Tom Lane 已提交
2490
	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
V
WAL  
Vadim B. Mikheev 已提交
2491

T
Tom Lane 已提交
2492 2493 2494
	XLogCtl->Write.LogwrtResult = LogwrtResult;
	Insert->LogwrtResult = LogwrtResult;
	XLogCtl->LogwrtResult = LogwrtResult;
V
WAL  
Vadim B. Mikheev 已提交
2495

T
Tom Lane 已提交
2496 2497
	XLogCtl->LogwrtRqst.Write = EndOfLog;
	XLogCtl->LogwrtRqst.Flush = EndOfLog;
2498

V
Vadim B. Mikheev 已提交
2499
#ifdef NOT_USED
V
WAL  
Vadim B. Mikheev 已提交
2500 2501 2502
	/* UNDO */
	if (InRecovery)
	{
2503 2504 2505
		RecPtr = ReadRecPtr;
		if (XLByteLT(checkPoint.undo, RecPtr))
		{
2506
			elog(LOG, "undo starts at (%u, %u)",
2507
				 RecPtr.xlogid, RecPtr.xrecoff);
2508 2509
			do
			{
T
Tom Lane 已提交
2510
				record = ReadRecord(&RecPtr, STOP, buffer);
2511
				if (TransactionIdIsValid(record->xl_xid) &&
2512
					!TransactionIdDidCommit(record->xl_xid))
V
misc  
Vadim B. Mikheev 已提交
2513
					RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record);
2514 2515
				RecPtr = record->xl_prev;
			} while (XLByteLE(checkPoint.undo, RecPtr));
2516
			elog(LOG, "undo done at (%u, %u)",
2517
				 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
2518 2519
		}
		else
2520
			elog(LOG, "undo is not required");
2521
	}
V
WAL  
Vadim B. Mikheev 已提交
2522
#endif
2523

V
WAL  
Vadim B. Mikheev 已提交
2524
	if (InRecovery)
2525
	{
B
Bruce Momjian 已提交
2526

T
Tom Lane 已提交
2527 2528 2529 2530 2531 2532 2533
		/*
		 * In case we had to use the secondary checkpoint, make sure that
		 * it will still be shown as the secondary checkpoint after this
		 * CreateCheckPoint operation; we don't want the broken primary
		 * checkpoint to become prevCheckPoint...
		 */
		ControlFile->checkPoint = checkPointLoc;
2534
		CreateCheckPoint(true);
V
WAL  
Vadim B. Mikheev 已提交
2535
		XLogCloseRelationCache();
2536
	}
2537

T
Tom Lane 已提交
2538 2539 2540 2541
	/*
	 * Preallocate additional log files, if wanted.
	 */
	PreallocXlogFiles(EndOfLog);
2542

V
WAL  
Vadim B. Mikheev 已提交
2543
	InRecovery = false;
2544 2545 2546 2547 2548

	ControlFile->state = DB_IN_PRODUCTION;
	ControlFile->time = time(NULL);
	UpdateControlFile();

V
WAL  
Vadim B. Mikheev 已提交
2549 2550 2551
	ThisStartUpID++;
	XLogCtl->ThisStartUpID = ThisStartUpID;

2552
	elog(LOG, "database system is in production state");
2553
	CritSectionCount--;
2554

T
Tom Lane 已提交
2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607
	/* Shut down readFile facility, free space */
	if (readFile >= 0)
	{
		close(readFile);
		readFile = -1;
	}
	if (readBuf)
	{
		free(readBuf);
		readBuf = NULL;
	}

	free(buffer);
}

/*
 * Subroutine to try to fetch and validate a prior checkpoint record.
 *
 * RecPtr	  - location of the candidate checkpoint record
 * whichChkpt - label ("primary"/"secondary") used only in log messages
 * buffer	  - caller-supplied work buffer handed on to ReadRecord
 *
 * Returns the record if it can be read and looks like a checkpoint
 * (XLOG resource manager, shutdown or online checkpoint info value,
 * data length equal to sizeof(CheckPoint)).  Otherwise a LOG message
 * naming the problem is emitted and NULL is returned.
 */
static XLogRecord *
ReadCheckpointRecord(XLogRecPtr RecPtr,
					 const char *whichChkpt,
					 char *buffer)
{
	XLogRecord *ckpt;

	/* Don't even try to read from an implausible location */
	if (!XRecOffIsValid(RecPtr.xrecoff))
	{
		elog(LOG, "Invalid %s checkPoint link in control file", whichChkpt);
		return NULL;
	}

	ckpt = ReadRecord(&RecPtr, LOG, buffer);

	/* The first check that fails determines the message */
	if (ckpt == NULL)
		elog(LOG, "Invalid %s checkPoint record", whichChkpt);
	else if (ckpt->xl_rmid != RM_XLOG_ID)
		elog(LOG, "Invalid RMID in %s checkPoint record", whichChkpt);
	else if (ckpt->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
			 ckpt->xl_info != XLOG_CHECKPOINT_ONLINE)
		elog(LOG, "Invalid xl_info in %s checkPoint record", whichChkpt);
	else if (ckpt->xl_len != sizeof(CheckPoint))
		elog(LOG, "Invalid length of %s checkPoint record", whichChkpt);
	else
		return ckpt;			/* passed every check */

	return NULL;
}

V
WAL  
Vadim B. Mikheev 已提交
2610
/*
T
Tom Lane 已提交
2611
 * Postmaster uses this to initialize ThisStartUpID & RedoRecPtr from
2612
 * XLogCtlData located in shmem after successful startup.
V
WAL  
Vadim B. Mikheev 已提交
2613 2614 2615 2616 2617
 */
void
SetThisStartUpID(void)
{
	ThisStartUpID = XLogCtl->ThisStartUpID;
2618 2619 2620 2621
	RedoRecPtr = XLogCtl->RedoRecPtr;
}

/*
 * CheckPoint process called by postmaster saves copy of new RedoRecPtr
 * in shmem (using SetRedoRecPtr).  When checkpointer completes, postmaster
 * calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that
 * subsequently-spawned backends will start out with a reasonably up-to-date
 * local RedoRecPtr.  Since these operations are not protected by any spinlock
 * and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these
 * routines at other times!
 *
 * Note: once spawned, a backend must update its local RedoRecPtr from
 * XLogCtl->Insert.RedoRecPtr while holding the insert spinlock.  This is
 * done in XLogInsert().
 */
void
SetRedoRecPtr(void)
{
	/* Publish this process's RedoRecPtr into shared memory */
	XLogCtl->RedoRecPtr = RedoRecPtr;
}

/*
 * Refresh this process's local RedoRecPtr from the copy held in XLogCtl.
 * No lock is taken around the copy.
 */
void
GetRedoRecPtr(void)
{
	RedoRecPtr = XLogCtl->RedoRecPtr;
}

2646
/*
T
Tom Lane 已提交
2647
 * This must be called ONCE during postmaster or standalone-backend shutdown
2648 2649
 */
void
T
Tom Lane 已提交
2650
ShutdownXLOG(void)
2651
{
2652
	elog(LOG, "shutting down");
2653

T
Tom Lane 已提交
2654 2655 2656
	/* suppress in-transaction check in CreateCheckPoint */
	MyLastRecPtr.xrecoff = 0;

2657
	CritSectionCount++;
V
Vadim B. Mikheev 已提交
2658
	CreateDummyCaches();
2659
	CreateCheckPoint(true);
2660
	CritSectionCount--;
2661

2662
	elog(LOG, "database system is shut down");
2663 2664
}

T
Tom Lane 已提交
2665 2666 2667
/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 */
2668 2669 2670
void
CreateCheckPoint(bool shutdown)
{
2671 2672 2673
	CheckPoint	checkPoint;
	XLogRecPtr	recptr;
	XLogCtlInsert *Insert = &XLogCtl->Insert;
B
Bruce Momjian 已提交
2674
	XLogRecData rdata;
2675
	uint32		freespace;
V
Vadim B. Mikheev 已提交
2676 2677
	uint32		_logId;
	uint32		_logSeg;
2678
	unsigned	spins = 0;
V
Vadim B. Mikheev 已提交
2679 2680 2681

	if (MyLastRecPtr.xrecoff != 0)
		elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block");
B
Bruce Momjian 已提交
2682

2683
	START_CRIT_SECTION();
2684 2685

	/* Grab lock, using larger than normal sleep between tries (1 sec) */
V
Vadim B. Mikheev 已提交
2686 2687
	while (TAS(&(XLogCtl->chkp_lck)))
	{
2688 2689
		S_LOCK_SLEEP_INTERVAL(&(XLogCtl->chkp_lck), spins++,
							  CHECKPOINT_LOCK_TIMEOUT, 1000000);
V
Vadim B. Mikheev 已提交
2690
	}
2691 2692 2693 2694 2695 2696 2697

	if (shutdown)
	{
		ControlFile->state = DB_SHUTDOWNING;
		ControlFile->time = time(NULL);
		UpdateControlFile();
	}
T
Tom Lane 已提交
2698 2699

	memset(&checkPoint, 0, sizeof(checkPoint));
V
WAL  
Vadim B. Mikheev 已提交
2700
	checkPoint.ThisStartUpID = ThisStartUpID;
T
Tom Lane 已提交
2701
	checkPoint.time = time(NULL);
2702

2703
	S_LOCK(&(XLogCtl->insert_lck));
T
Tom Lane 已提交
2704 2705 2706 2707

	/*
	 * If this isn't a shutdown, and we have not inserted any XLOG records
	 * since the start of the last checkpoint, skip the checkpoint.  The
B
Bruce Momjian 已提交
2708 2709 2710 2711 2712 2713
	 * idea here is to avoid inserting duplicate checkpoints when the
	 * system is idle.	That wastes log space, and more importantly it
	 * exposes us to possible loss of both current and previous checkpoint
	 * records if the machine crashes just as we're writing the update.
	 * (Perhaps it'd make even more sense to checkpoint only when the
	 * previous checkpoint record is in a different xlog page?)
T
Tom Lane 已提交
2714 2715
	 *
	 * We have to make two tests to determine that nothing has happened since
B
Bruce Momjian 已提交
2716 2717 2718
	 * the start of the last checkpoint: current insertion point must
	 * match the end of the last checkpoint record, and its redo pointer
	 * must point to itself.
T
Tom Lane 已提交
2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745
	 */
	if (!shutdown)
	{
		XLogRecPtr	curInsert;

		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
		if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
			curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
			MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
			ControlFile->checkPoint.xlogid ==
			ControlFile->checkPointCopy.redo.xlogid &&
			ControlFile->checkPoint.xrecoff ==
			ControlFile->checkPointCopy.redo.xrecoff)
		{
			S_UNLOCK(&(XLogCtl->insert_lck));
			S_UNLOCK(&(XLogCtl->chkp_lck));
			END_CRIT_SECTION();
			return;
		}
	}

	/*
	 * Compute new REDO record ptr = location of next XLOG record.
	 *
	 * NB: this is NOT necessarily where the checkpoint record itself will
	 * be, since other backends may insert more XLOG records while we're
	 * off doing the buffer flush work.  Those XLOG records are logically
B
Bruce Momjian 已提交
2746
	 * after the checkpoint, even though physically before it.	Got that?
T
Tom Lane 已提交
2747 2748
	 */
	freespace = INSERT_FREESPACE(Insert);
2749 2750
	if (freespace < SizeOfXLogRecord)
	{
T
Tom Lane 已提交
2751 2752
		(void) AdvanceXLInsertBuffer();
		/* OK to ignore update return flag, since we will do flush anyway */
2753 2754
		freespace = BLCKSZ - SizeOfXLogPHD;
	}
T
Tom Lane 已提交
2755
	INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
B
Bruce Momjian 已提交
2756

T
Tom Lane 已提交
2757 2758 2759 2760
	/*
	 * Here we update the shared RedoRecPtr for future XLogInsert calls;
	 * this must be done while holding the insert lock.
	 */
2761
	RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
B
Bruce Momjian 已提交
2762

T
Tom Lane 已提交
2763
	/*
B
Bruce Momjian 已提交
2764 2765 2766 2767
	 * Get UNDO record ptr - this is oldest of PROC->logRec values. We do
	 * this while holding insert lock to ensure that we won't miss any
	 * about-to-commit transactions (UNDO must include all xacts that have
	 * commits after REDO point).
T
Tom Lane 已提交
2768 2769 2770 2771 2772 2773 2774 2775 2776 2777
	 */
	checkPoint.undo = GetUndoRecPtr();

	if (shutdown && checkPoint.undo.xrecoff != 0)
		elog(STOP, "Active transaction while data base is shutting down");

	/*
	 * Now we can release insert lock, allowing other xacts to proceed
	 * even while we are flushing disk buffers.
	 */
2778 2779 2780 2781 2782
	S_UNLOCK(&(XLogCtl->insert_lck));

	SpinAcquire(XidGenLockId);
	checkPoint.nextXid = ShmemVariableCache->nextXid;
	SpinRelease(XidGenLockId);
T
Tom Lane 已提交
2783

2784 2785
	SpinAcquire(OidGenLockId);
	checkPoint.nextOid = ShmemVariableCache->nextOid;
2786 2787
	if (!shutdown)
		checkPoint.nextOid += ShmemVariableCache->oidCount;
2788 2789
	SpinRelease(OidGenLockId);

T
Tom Lane 已提交
2790
	/*
B
Bruce Momjian 已提交
2791 2792
	 * Having constructed the checkpoint record, ensure all shmem disk
	 * buffers are flushed to disk.
T
Tom Lane 已提交
2793
	 */
V
Vadim B. Mikheev 已提交
2794
	FlushBufferPool();
2795

T
Tom Lane 已提交
2796 2797 2798
	/*
	 * Now insert the checkpoint record into XLOG.
	 */
2799
	rdata.buffer = InvalidBuffer;
B
Bruce Momjian 已提交
2800
	rdata.data = (char *) (&checkPoint);
2801 2802 2803
	rdata.len = sizeof(checkPoint);
	rdata.next = NULL;

T
Tom Lane 已提交
2804 2805 2806 2807 2808 2809
	recptr = XLogInsert(RM_XLOG_ID,
						shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
						XLOG_CHECKPOINT_ONLINE,
						&rdata);

	XLogFlush(recptr);
2810

T
Tom Lane 已提交
2811 2812 2813 2814 2815
	/*
	 * We now have ProcLastRecPtr = start of actual checkpoint record,
	 * recptr = end of actual checkpoint record.
	 */
	if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
2816 2817
		elog(STOP, "XLog concurrent activity while data base is shutting down");

T
Tom Lane 已提交
2818
	/*
B
Bruce Momjian 已提交
2819 2820 2821
	 * Remember location of prior checkpoint's earliest info. Oldest item
	 * is redo or undo, whichever is older; but watch out for case that
	 * undo = 0.
T
Tom Lane 已提交
2822
	 */
B
Bruce Momjian 已提交
2823
	if (ControlFile->checkPointCopy.undo.xrecoff != 0 &&
T
Tom Lane 已提交
2824 2825 2826 2827 2828
		XLByteLT(ControlFile->checkPointCopy.undo,
				 ControlFile->checkPointCopy.redo))
		XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg);
	else
		XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
2829

T
Tom Lane 已提交
2830 2831 2832
	/*
	 * Update the control file.
	 */
2833 2834 2835
	SpinAcquire(ControlFileLockId);
	if (shutdown)
		ControlFile->state = DB_SHUTDOWNED;
T
Tom Lane 已提交
2836 2837 2838
	ControlFile->prevCheckPoint = ControlFile->checkPoint;
	ControlFile->checkPoint = ProcLastRecPtr;
	ControlFile->checkPointCopy = checkPoint;
2839 2840 2841 2842
	ControlFile->time = time(NULL);
	UpdateControlFile();
	SpinRelease(ControlFileLockId);

V
Vadim B. Mikheev 已提交
2843
	/*
T
Tom Lane 已提交
2844 2845
	 * Delete offline log files (those no longer needed even for previous
	 * checkpoint).
V
Vadim B. Mikheev 已提交
2846 2847 2848
	 */
	if (_logId || _logSeg)
	{
T
Tom Lane 已提交
2849 2850
		PrevLogSeg(_logId, _logSeg);
		MoveOfflineLogs(_logId, _logSeg);
V
Vadim B. Mikheev 已提交
2851 2852
	}

T
Tom Lane 已提交
2853 2854 2855 2856 2857 2858 2859 2860
	/*
	 * Make more log segments if needed.  (Do this after deleting offline
	 * log segments, to avoid having peak disk space usage higher than
	 * necessary.)
	 */
	if (!shutdown)
		PreallocXlogFiles(recptr);

V
Vadim B. Mikheev 已提交
2861 2862
	S_UNLOCK(&(XLogCtl->chkp_lck));

2863
	END_CRIT_SECTION();
2864
}
V
WAL  
Vadim B. Mikheev 已提交
2865

T
Tom Lane 已提交
2866 2867 2868
/*
 * Write a NEXTOID log record
 */
2869 2870 2871
void
XLogPutNextOid(Oid nextOid)
{
B
Bruce Momjian 已提交
2872
	XLogRecData rdata;
2873

2874
	rdata.buffer = InvalidBuffer;
B
Bruce Momjian 已提交
2875
	rdata.data = (char *) (&nextOid);
2876 2877 2878 2879
	rdata.len = sizeof(Oid);
	rdata.next = NULL;
	(void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
}
V
WAL  
Vadim B. Mikheev 已提交
2880

T
Tom Lane 已提交
2881 2882 2883
/*
 * XLOG resource manager's routines
 */
V
WAL  
Vadim B. Mikheev 已提交
2884 2885 2886
void
xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
B
Bruce Momjian 已提交
2887
	uint8		info = record->xl_info & ~XLR_INFO_MASK;
2888

2889
	if (info == XLOG_NEXTOID)
2890
	{
B
Bruce Momjian 已提交
2891
		Oid			nextOid;
2892 2893 2894

		memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
		if (ShmemVariableCache->nextOid < nextOid)
T
Tom Lane 已提交
2895
		{
2896
			ShmemVariableCache->nextOid = nextOid;
T
Tom Lane 已提交
2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914
			ShmemVariableCache->oidCount = 0;
		}
	}
	else if (info == XLOG_CHECKPOINT_SHUTDOWN)
	{
		CheckPoint	checkPoint;

		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
		/* In a SHUTDOWN checkpoint, believe the counters exactly */
		ShmemVariableCache->nextXid = checkPoint.nextXid;
		ShmemVariableCache->nextOid = checkPoint.nextOid;
		ShmemVariableCache->oidCount = 0;
	}
	else if (info == XLOG_CHECKPOINT_ONLINE)
	{
		CheckPoint	checkPoint;

		memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
2915
		/* In an ONLINE checkpoint, treat the counters like NEXTOID */
T
Tom Lane 已提交
2916 2917 2918 2919 2920 2921 2922
		if (ShmemVariableCache->nextXid < checkPoint.nextXid)
			ShmemVariableCache->nextXid = checkPoint.nextXid;
		if (ShmemVariableCache->nextOid < checkPoint.nextOid)
		{
			ShmemVariableCache->nextOid = checkPoint.nextOid;
			ShmemVariableCache->oidCount = 0;
		}
2923
	}
V
WAL  
Vadim B. Mikheev 已提交
2924
}
B
Bruce Momjian 已提交
2925

V
WAL  
Vadim B. Mikheev 已提交
2926 2927 2928 2929
void
xlog_undo(XLogRecPtr lsn, XLogRecord *record)
{
}
B
Bruce Momjian 已提交
2930

V
WAL  
Vadim B. Mikheev 已提交
2931
void
B
Bruce Momjian 已提交
2932
xlog_desc(char *buf, uint8 xl_info, char *rec)
V
WAL  
Vadim B. Mikheev 已提交
2933
{
B
Bruce Momjian 已提交
2934
	uint8		info = xl_info & ~XLR_INFO_MASK;
V
WAL  
Vadim B. Mikheev 已提交
2935

T
Tom Lane 已提交
2936 2937
	if (info == XLOG_CHECKPOINT_SHUTDOWN ||
		info == XLOG_CHECKPOINT_ONLINE)
V
WAL  
Vadim B. Mikheev 已提交
2938
	{
B
Bruce Momjian 已提交
2939 2940
		CheckPoint *checkpoint = (CheckPoint *) rec;

V
WAL  
Vadim B. Mikheev 已提交
2941
		sprintf(buf + strlen(buf), "checkpoint: redo %u/%u; undo %u/%u; "
B
Bruce Momjian 已提交
2942 2943 2944 2945 2946 2947
				"sui %u; xid %u; oid %u; %s",
				checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
				checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
				checkpoint->ThisStartUpID, checkpoint->nextXid,
				checkpoint->nextOid,
			 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
T
Tom Lane 已提交
2948
	}
2949 2950
	else if (info == XLOG_NEXTOID)
	{
B
Bruce Momjian 已提交
2951
		Oid			nextOid;
2952 2953 2954 2955

		memcpy(&nextOid, rec, sizeof(Oid));
		sprintf(buf + strlen(buf), "nextOid: %u", nextOid);
	}
V
WAL  
Vadim B. Mikheev 已提交
2956 2957 2958 2959 2960 2961 2962
	else
		strcat(buf, "UNKNOWN");
}

/*
 * Append a one-line summary of an XLOG record header to buf: previous and
 * transaction-previous record pointers, xid, the number of backup-block
 * flags set in xl_info (if any), and the resource manager's name.
 *
 * buf must already hold a NUL-terminated string; no length check is made.
 */
static void
xlog_outrec(char *buf, XLogRecord *record)
{
	int			bkpb = 0;
	int			i;

	sprintf(buf + strlen(buf), "prev %u/%u; xprev %u/%u; xid %u",
			record->xl_prev.xlogid, record->xl_prev.xrecoff,
			record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff,
			record->xl_xid);

	/* Count how many backup-block flag bits are set */
	for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
	{
		if (record->xl_info & (XLR_SET_BKP_BLOCK(i)))
			bkpb++;
	}

	if (bkpb)
		sprintf(buf + strlen(buf), "; bkpb %d", bkpb);

	sprintf(buf + strlen(buf), ": %s",
			RmgrTable[record->xl_rmid].rm_name);
}


/*
 * GUC support routines
 */

/*
 * Report whether "method" names a sync method usable in this build.
 *
 * The comparison is case-insensitive.  "fsync" is always accepted; the
 * other names are accepted only when the corresponding symbol
 * (HAVE_FDATASYNC, OPEN_SYNC_FLAG, OPEN_DATASYNC_FLAG) is defined.
 */
bool
check_xlog_sync_method(const char *method)
{
	if (strcasecmp(method, "fsync") == 0)
		return true;
#ifdef HAVE_FDATASYNC
	if (strcasecmp(method, "fdatasync") == 0)
		return true;
#endif
#ifdef OPEN_SYNC_FLAG
	if (strcasecmp(method, "open_sync") == 0)
		return true;
#endif
#ifdef OPEN_DATASYNC_FLAG
	if (strcasecmp(method, "open_datasync") == 0)
		return true;
#endif
	return false;
}

void
assign_xlog_sync_method(const char *method)
{
B
Bruce Momjian 已提交
3013 3014
	int			new_sync_method;
	int			new_sync_bit;
3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051

	if (strcasecmp(method, "fsync") == 0)
	{
		new_sync_method = SYNC_METHOD_FSYNC;
		new_sync_bit = 0;
	}
#ifdef HAVE_FDATASYNC
	else if (strcasecmp(method, "fdatasync") == 0)
	{
		new_sync_method = SYNC_METHOD_FDATASYNC;
		new_sync_bit = 0;
	}
#endif
#ifdef OPEN_SYNC_FLAG
	else if (strcasecmp(method, "open_sync") == 0)
	{
		new_sync_method = SYNC_METHOD_OPEN;
		new_sync_bit = OPEN_SYNC_FLAG;
	}
#endif
#ifdef OPEN_DATASYNC_FLAG
	else if (strcasecmp(method, "open_datasync") == 0)
	{
		new_sync_method = SYNC_METHOD_OPEN;
		new_sync_bit = OPEN_DATASYNC_FLAG;
	}
#endif
	else
	{
		/* Can't get here unless guc.c screwed up */
		elog(ERROR, "Bogus xlog sync method %s", method);
		new_sync_method = 0;	/* keep compiler quiet */
		new_sync_bit = 0;
	}

	if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
	{
B
Bruce Momjian 已提交
3052

3053
		/*
B
Bruce Momjian 已提交
3054 3055 3056 3057
		 * To ensure that no blocks escape unsynced, force an fsync on the
		 * currently open log segment (if any).  Also, if the open flag is
		 * changing, close the log file so it will be reopened (with new
		 * flag bit) at next use.
3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085
		 */
		if (openLogFile >= 0)
		{
			if (pg_fsync(openLogFile) != 0)
				elog(STOP, "fsync(logfile %u seg %u) failed: %m",
					 openLogId, openLogSeg);
			if (open_sync_bit != new_sync_bit)
			{
				if (close(openLogFile) != 0)
					elog(STOP, "close(logfile %u seg %u) failed: %m",
						 openLogId, openLogSeg);
				openLogFile = -1;
			}
		}
		sync_method = new_sync_method;
		open_sync_bit = new_sync_bit;
	}
}


/*
 * Issue appropriate kind of fsync (if any) on the current XLOG output file
 */
static void
issue_xlog_fsync(void)
{
	switch (sync_method)
	{
B
Bruce Momjian 已提交
3086
			case SYNC_METHOD_FSYNC:
3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105
			if (pg_fsync(openLogFile) != 0)
				elog(STOP, "fsync(logfile %u seg %u) failed: %m",
					 openLogId, openLogSeg);
			break;
#ifdef HAVE_FDATASYNC
		case SYNC_METHOD_FDATASYNC:
			if (pg_fdatasync(openLogFile) != 0)
				elog(STOP, "fdatasync(logfile %u seg %u) failed: %m",
					 openLogId, openLogSeg);
			break;
#endif
		case SYNC_METHOD_OPEN:
			/* write synced it already */
			break;
		default:
			elog(STOP, "bogus sync_method %d", sync_method);
			break;
	}
}