/*-------------------------------------------------------------------------
 *
 * nbtinsert.c
 *	  Item insertion in Lehman and Yao btrees for Postgres.
 *
 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/nbtree/nbtinsert.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/tqual.h"

typedef struct
{
	/* context data for _bt_checksplitloc */
	Size		newitemsz;		/* size of new item to be inserted */
	int			fillfactor;		/* needed when splitting rightmost page */
	bool		is_leaf;		/* T if splitting a leaf page */
	bool		is_rightmost;	/* T if splitting a rightmost page */
	OffsetNumber newitemoff;	/* where the new item is to be inserted */
	int			leftspace;		/* space available for items on left page */
	int			rightspace;		/* space available for items on right page */
	int			olddataitemstotal;		/* space taken by old items */

	bool		have_split;		/* found a valid split? */

	/* these fields valid only if have_split is true */
	bool		newitemonleft;	/* new item on left or right of best split */
	OffsetNumber firstright;	/* best split point */
	int			best_delta;		/* best size delta so far */
} FindSplitData;


static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);

static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
				 Relation heapRel, Buffer buf, OffsetNumber offset,
				 ScanKey itup_scankey,
				 IndexUniqueCheck checkUnique, bool *is_unique);
static void _bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup,
				  Relation heapRel);
static void _bt_insertonpg(Relation rel, Buffer buf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz,
		  IndexTuple newitem, bool newitemonleft);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
				 OffsetNumber newitemoff,
				 Size newitemsz,
				 bool *newitemonleft);
static void _bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright, bool newitemonleft,
				  int dataitemstoleft, Size firstoldonrightsz);
static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
			 OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);


/*
 *	_bt_doinsert() -- Handle insertion of a single index tuple in the tree.
 *
 *		This routine is called by the public interface routines, btbuild
 *		and btinsert.  By here, itup is filled in, including the TID.
 *
 *		If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
 *		will allow duplicates.  Otherwise (UNIQUE_CHECK_YES or
 *		UNIQUE_CHECK_EXISTING) it will throw an error for a duplicate.
 *		For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
 *		don't actually insert.
 *
 *		The result value is only significant for UNIQUE_CHECK_PARTIAL:
 *		it must be TRUE if the entry is known unique, else FALSE.
 *		(In the current implementation we'll also return TRUE after a
 *		successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
 *		that's just a coding artifact.)
 */
bool
_bt_doinsert(Relation rel, IndexTuple itup,
			 IndexUniqueCheck checkUnique, Relation heapRel)
{
	bool		is_unique = false;
	int			natts = rel->rd_rel->relnatts;
	ScanKey		itup_scankey;
	BTStack		stack;
	Buffer		buf;
	OffsetNumber offset;

	/* we need an insertion scan key to do our search, so build one */
	itup_scankey = _bt_mkscankey(rel, itup);

top:
	/* find the first page containing this key */
	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);

	offset = InvalidOffsetNumber;

	/* trade in our read lock for a write lock */
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
	LockBuffer(buf, BT_WRITE);

	/*
	 * If the page was split between the time that we surrendered our read
	 * lock and acquired our write lock, then this page may no longer be the
	 * right place for the key we want to insert.  In this case, we need to
	 * move right in the tree.  See Lehman and Yao for an excruciatingly
	 * precise description.
	 */
	buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);

	/*
	 * If we're not allowing duplicates, make sure the key isn't already in
	 * the index.
	 *
	 * NOTE: obviously, _bt_check_unique can only detect keys that are already
	 * in the index; so it cannot defend against concurrent insertions of the
	 * same key.  We protect against that by means of holding a write lock on
	 * the target page.  Any other would-be inserter of the same key must
	 * acquire a write lock on the same target page, so only one would-be
	 * inserter can be making the check at one time.  Furthermore, once we are
	 * past the check we hold write locks continuously until we have performed
	 * our insertion, so no later inserter can fail to see our insertion.
	 * (This requires some care in _bt_insertonpg.)
	 *
	 * If we must wait for another xact, we release the lock while waiting,
	 * and then must start over completely.
	 *
	 * For a partial uniqueness check, we don't wait for the other xact. Just
	 * let the tuple in and return false for possibly non-unique, or true for
	 * definitely unique.
	 */
	if (checkUnique != UNIQUE_CHECK_NO)
	{
		TransactionId xwait;

		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
								 checkUnique, &is_unique);

		if (TransactionIdIsValid(xwait))
		{
			/* Have to wait for the other guy ... */
			_bt_relbuf(rel, buf);
			XactLockTableWait(xwait);
			/* start over... */
			_bt_freestack(stack);
			goto top;
		}
	}

	if (checkUnique != UNIQUE_CHECK_EXISTING)
	{
		/*
		 * The only conflict predicate locking cares about for indexes is when
		 * an index tuple insert conflicts with an existing lock.  Since the
		 * actual location of the insert is hard to predict because of the
		 * random search used to prevent O(N^2) performance when there are
		 * many duplicate entries, we can just use the "first valid" page.
		 */
		CheckForSerializableConflictIn(rel, NULL, buf);
		/* do the insertion */
		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
		_bt_insertonpg(rel, buf, stack, itup, offset, false);
	}
	else
	{
		/* just release the buffer */
		_bt_relbuf(rel, buf);
	}

	/* be tidy */
	_bt_freestack(stack);
	_bt_freeskey(itup_scankey);

	return is_unique;
}

/*
 *	_bt_check_unique() -- Check for violation of unique index constraint
 *
 * offset points to the first possible item that could conflict. It can
 * also point to end-of-page, which means that the first tuple to check
 * is the first tuple on the next page.
 *
 * Returns InvalidTransactionId if there is no conflict, else an xact ID
 * we must wait for to see if it commits a conflicting tuple.  If an actual
 * conflict is detected, no return --- just ereport().
 *
 * However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
 * InvalidTransactionId because we don't want to wait.  In this case we
 * set *is_unique to false if there is a potential conflict, and the
 * core code must redo the uniqueness check later.
 */
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
				 Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
				 IndexUniqueCheck checkUnique, bool *is_unique)
{
	TupleDesc	itupdesc = RelationGetDescr(rel);
	int			natts = rel->rd_rel->relnatts;
	SnapshotData SnapshotDirty;
	OffsetNumber maxoff;
	Page		page;
	BTPageOpaque opaque;
	Buffer		nbuf = InvalidBuffer;
	bool		found = false;

	/* Assume unique until we find a duplicate */
	*is_unique = true;

	InitDirtySnapshot(SnapshotDirty);

	page = BufferGetPage(buf);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	maxoff = PageGetMaxOffsetNumber(page);

	/*
	 * Scan over all equal tuples, looking for live conflicts.
	 */
	for (;;)
	{
		ItemId		curitemid;
		IndexTuple	curitup;
		BlockNumber nblkno;

		/*
		 * make sure the offset points to an actual item before trying to
		 * examine it...
		 */
		if (offset <= maxoff)
		{
			curitemid = PageGetItemId(page, offset);

			/*
			 * We can skip items that are marked killed.
			 *
			 * Formerly, we applied _bt_isequal() before checking the kill
			 * flag, so as to fall out of the item loop as soon as possible.
			 * However, in the presence of heavy update activity an index may
			 * contain many killed items with the same key; running
			 * _bt_isequal() on each killed item gets expensive. Furthermore
			 * it is likely that the non-killed version of each key appears
			 * first, so that we didn't actually get to exit any sooner
			 * anyway. So now we just advance over killed items as quickly as
			 * we can. We only apply _bt_isequal() when we get to a non-killed
			 * item or the end of the page.
			 */
			if (!ItemIdIsDead(curitemid))
			{
				ItemPointerData htid;
				bool		all_dead;

				/*
				 * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this is
				 * how we handle NULLs - and so we must not use _bt_compare
				 * for the real uniqueness comparison, but only for
				 * ordering/finding items on pages. - vadim 03/24/97
				 */
				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
					break;		/* we're past all the equal tuples */

				/* okay, we gotta fetch the heap tuple ... */
				curitup = (IndexTuple) PageGetItem(page, curitemid);
				htid = curitup->t_tid;

				/*
				 * If we are doing a recheck, we expect to find the tuple we
				 * are rechecking.  It's not a duplicate, but we have to keep
				 * scanning.
				 */
				if (checkUnique == UNIQUE_CHECK_EXISTING &&
					ItemPointerCompare(&htid, &itup->t_tid) == 0)
				{
					found = true;
				}

				/*
				 * We check the whole HOT-chain to see if there is any tuple
				 * that satisfies SnapshotDirty.  This is necessary because we
				 * have just a single index entry for the entire chain.
				 */
				else if (heap_hot_search(&htid, heapRel, &SnapshotDirty,
										 &all_dead))
				{
					TransactionId xwait;

					/*
					 * It is a duplicate. If we are only doing a partial
					 * check, then don't bother checking if the tuple is being
					 * updated in another transaction. Just return the fact
					 * that it is a potential conflict and leave the full
					 * check till later.
					 */
					if (checkUnique == UNIQUE_CHECK_PARTIAL)
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						*is_unique = false;
						return InvalidTransactionId;
					}

					/*
					 * If this tuple is being updated by other transaction
					 * then we have to wait for its commit/abort.
					 */
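					/* an in-progress inserter shows up in xmin, an in-progress deleter in xmax */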
					xwait = (TransactionIdIsValid(SnapshotDirty.xmin)) ?
						SnapshotDirty.xmin : SnapshotDirty.xmax;

					if (TransactionIdIsValid(xwait))
					{
						if (nbuf != InvalidBuffer)
							_bt_relbuf(rel, nbuf);
						/* Tell _bt_doinsert to wait... */
						return xwait;
					}

					/*
					 * Otherwise we have a definite conflict.  But before
					 * complaining, look to see if the tuple we want to insert
					 * is itself now committed dead --- if so, don't complain.
					 * This is a waste of time in normal scenarios but we must
					 * do it to support CREATE INDEX CONCURRENTLY.
					 *
					 * We must follow HOT-chains here because during
					 * concurrent index build, we insert the root TID though
					 * the actual tuple may be somewhere in the HOT-chain.
					 * While following the chain we might not stop at the
					 * exact tuple which triggered the insert, but that's OK
					 * because if we find a live tuple anywhere in this chain,
					 * we have a unique key conflict.  The other live tuple is
					 * not part of this chain because it had a different index
					 * entry.
					 */
					htid = itup->t_tid;
					if (heap_hot_search(&htid, heapRel, SnapshotSelf, NULL))
					{
						/* Normal case --- it's still live */
					}
					else
					{
						/*
						 * It's been deleted, so no error, and no need to
						 * continue searching
						 */
						break;
					}

					/*
					 * This is a definite conflict.  Break the tuple down into
					 * datums and report the error.  But first, make sure we
					 * release the buffer locks we're holding ---
					 * BuildIndexValueDescription could make catalog accesses,
					 * which in the worst case might touch this same index and
					 * cause deadlocks.
					 */
					if (nbuf != InvalidBuffer)
						_bt_relbuf(rel, nbuf);
					_bt_relbuf(rel, buf);

					{
						Datum		values[INDEX_MAX_KEYS];
						bool		isnull[INDEX_MAX_KEYS];

						index_deform_tuple(itup, RelationGetDescr(rel),
										   values, isnull);
						ereport(ERROR,
								(errcode(ERRCODE_UNIQUE_VIOLATION),
								 errmsg("duplicate key value violates unique constraint \"%s\"",
										RelationGetRelationName(rel)),
								 errdetail("Key %s already exists.",
										   BuildIndexValueDescription(rel,
															values, isnull)),
								 errtableconstraint(heapRel,
											 RelationGetRelationName(rel))));
					}
				}
				else if (all_dead)
				{
					/*
					 * The conflicting tuple (or whole HOT chain) is dead to
					 * everyone, so we may as well mark the index entry
					 * killed.
					 */
					ItemIdMarkDead(curitemid);
					opaque->btpo_flags |= BTP_HAS_GARBAGE;

					/*
					 * Mark buffer with a dirty hint, since state is not
					 * crucial. Be sure to mark the proper buffer dirty.
					 */
					if (nbuf != InvalidBuffer)
						MarkBufferDirtyHint(nbuf, true);
					else
						MarkBufferDirtyHint(buf, true);
				}
			}
		}

		/*
		 * Advance to next tuple to continue checking.
		 */
		if (offset < maxoff)
			offset = OffsetNumberNext(offset);
		else
		{
			/* If scankey == hikey we gotta check the next page too */
			if (P_RIGHTMOST(opaque))
				break;
			if (!_bt_isequal(itupdesc, page, P_HIKEY,
							 natts, itup_scankey))
				break;
			/* Advance to next non-dead page --- there must be one */
			for (;;)
			{
				nblkno = opaque->btpo_next;
				nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
				page = BufferGetPage(nbuf);
				opaque = (BTPageOpaque) PageGetSpecialPointer(page);
				if (!P_IGNORE(opaque))
					break;
				if (P_RIGHTMOST(opaque))
					elog(ERROR, "fell off the end of index \"%s\"",
						 RelationGetRelationName(rel));
			}
			maxoff = PageGetMaxOffsetNumber(page);
			offset = P_FIRSTDATAKEY(opaque);
		}
	}

	/*
	 * If we are doing a recheck then we should have found the tuple we are
	 * checking.  Otherwise there's something very wrong --- probably, the
	 * index is on a non-immutable expression.
	 */
	if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to re-find tuple within index \"%s\"",
						RelationGetRelationName(rel)),
		 errhint("This may be because of a non-immutable index expression."),
				 errtableconstraint(heapRel,
									RelationGetRelationName(rel))));

	if (nbuf != InvalidBuffer)
		_bt_relbuf(rel, nbuf);

	return InvalidTransactionId;
}


/*
 *	_bt_findinsertloc() -- Finds an insert location for a tuple
 *
 *		If the new key is equal to one or more existing keys, we can
 *		legitimately place it anywhere in the series of equal keys --- in fact,
 *		if the new key is equal to the page's "high key" we can place it on
 *		the next page.  If it is equal to the high key, and there's not room
 *		to insert the new tuple on the current page without splitting, then
 *		we can move right hoping to find more free space and avoid a split.
 *		(We should not move right indefinitely, however, since that leads to
 *		O(N^2) insertion behavior in the presence of many equal keys.)
 *		Once we have chosen the page to put the key on, we'll insert it before
 *		any existing equal keys because of the way _bt_binsrch() works.
 *
 *		If there's not enough room on the page, we try to make room by
 *		removing any LP_DEAD tuples.
 *
 *		On entry, *buf and *offsetptr point to the first legal position
 *		where the new tuple could be inserted.  The caller should hold an
 *		exclusive lock on *buf.  *offsetptr can also be set to
 *		InvalidOffsetNumber, in which case the function will search for the
 *		right location within the page if needed.  On exit, they point to the
 *		chosen insert location.  If _bt_findinsertloc decides to move right,
 *		the lock and pin on the original page will be released and the new
 *		page returned to the caller is exclusively locked instead.
 *
 *		newtup is the new tuple we're inserting, and scankey is an insertion
 *		type scan key for it.
 */
static void
_bt_findinsertloc(Relation rel,
				  Buffer *bufptr,
				  OffsetNumber *offsetptr,
				  int keysz,
				  ScanKey scankey,
				  IndexTuple newtup,
				  Relation heapRel)
{
	Buffer		buf = *bufptr;
	Page		page = BufferGetPage(buf);
	Size		itemsz;
	BTPageOpaque lpageop;
	bool		movedright,
				vacuumed;
	OffsetNumber newitemoff;
	OffsetNumber firstlegaloff = *offsetptr;

	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*newtup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Check whether the item can fit on a btree page at all. (Eventually, we
	 * ought to try to apply TOAST methods if not.) We actually need to be
	 * able to fit three items on every page, so restrict any one item to 1/3
	 * the per-page available space. Note that at this point, itemsz doesn't
	 * include the ItemId.
	 *
	 * NOTE: if you change this, see also the similar code in _bt_buildadd().
	 */
	if (itemsz > BTMaxItemSize(page))
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
			errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
				   itemsz, BTMaxItemSize(page),
				   RelationGetRelationName(rel)),
		errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
				"Consider a function index of an MD5 hash of the value, "
				"or use full text indexing."),
				 errtableconstraint(heapRel,
									RelationGetRelationName(rel))));
	/*----------
	 * If we will need to split the page to put the item on this page,
	 * check whether we can put the tuple somewhere to the right,
	 * instead.  Keep scanning right until we
	 *		(a) find a page with enough free space,
	 *		(b) reach the last page where the tuple can legally go, or
	 *		(c) get tired of searching.
	 * (c) is not flippant; it is important because if there are many
	 * pages' worth of equal keys, it's better to split one of the early
	 * pages than to scan all the way to the end of the run of equal keys
	 * on every insert.  We implement "get tired" as a random choice,
	 * since stopping after scanning a fixed number of pages wouldn't work
	 * well (we'd never reach the right-hand side of previously split
	 * pages).  Currently the probability of moving right is set at 0.99,
	 * which may seem too high to change the behavior much, but it does an
	 * excellent job of preventing O(N^2) behavior with many equal keys.
	 *----------
	 */
	movedright = false;
	vacuumed = false;
	while (PageGetFreeSpace(page) < itemsz)
	{
		Buffer		rbuf;

		/*
		 * before considering moving right, see if we can obtain enough space
		 * by erasing LP_DEAD items
		 */
		if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
		{
			_bt_vacuum_one_page(rel, buf, heapRel);

			/*
			 * remember that we vacuumed this page, because that makes the
			 * hint supplied by the caller invalid
			 */
			vacuumed = true;

			if (PageGetFreeSpace(page) >= itemsz)
				break;			/* OK, now we have enough space */
		}

		/*
		 * nope, so check conditions (b) and (c) enumerated above
		 */
		if (P_RIGHTMOST(lpageop) ||
			_bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
			random() <= (MAX_RANDOM_VALUE / 100))
			break;

		/*
		 * step right to next non-dead page
		 *
		 * must write-lock that page before releasing write lock on current
		 * page; else someone else's _bt_check_unique scan could fail to see
		 * our insertion.  write locks on intermediate dead pages won't do
		 * because we don't know when they will get de-linked from the tree.
		 */
		rbuf = InvalidBuffer;

		for (;;)
		{
			BlockNumber rblkno = lpageop->btpo_next;

			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
			page = BufferGetPage(rbuf);
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			if (!P_IGNORE(lpageop))
				break;
			if (P_RIGHTMOST(lpageop))
				elog(ERROR, "fell off the end of index \"%s\"",
					 RelationGetRelationName(rel));
		}
		_bt_relbuf(rel, buf);
		buf = rbuf;
		movedright = true;
		vacuumed = false;
	}

	/*
	 * Now we are on the right page, so find the insert position. If we moved
	 * right at all, we know we should insert at the start of the page. If we
	 * didn't move right, we can use the firstlegaloff hint if the caller
	 * supplied one, unless we vacuumed the page which might have moved tuples
	 * around making the hint invalid. If we didn't move right or can't use
	 * the hint, find the position by searching.
	 */
	if (movedright)
		newitemoff = P_FIRSTDATAKEY(lpageop);
	else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
		newitemoff = firstlegaloff;
	else
		newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);

	*bufptr = buf;
	*offsetptr = newitemoff;
}

/*----------
 *	_bt_insertonpg() -- Insert a tuple on a particular page in the index.
 *
 *		This recursive procedure does the following things:
 *
 *			+  if necessary, splits the target page (making sure that the
 *			   split is equitable as far as post-insert free space goes).
 *			+  inserts the tuple.
 *			+  if the page was split, pops the parent stack, and finds the
 *			   right place to insert the new child pointer (by walking
 *			   right using information stored in the parent stack).
 *			+  invokes itself with the appropriate tuple for the right
 *			   child page on the parent.
 *			+  updates the metapage if a true root or fast root is split.
 *
 *		On entry, we must have the right buffer in which to do the
 *		insertion, and the buffer must be pinned and write-locked.	On return,
 *		we will have dropped both the pin and the lock on the buffer.
 *
 *		The locking interactions in this code are critical.  You should
 *		grok Lehman and Yao's paper before making any changes.  In addition,
 *		you need to understand how we disambiguate duplicate keys in this
 *		implementation, in order to be able to find our location using
 *		L&Y "move right" operations.  Since we may insert duplicate user
 *		keys, and since these dups may propagate up the tree, we use the
 *		'afteritem' parameter to position ourselves correctly for the
 *		insertion on internal pages.
 *----------
 */
static void
_bt_insertonpg(Relation rel,
			   Buffer buf,
			   BTStack stack,
			   IndexTuple itup,
			   OffsetNumber newitemoff,
			   bool split_only_page)
{
	Page		page;
	BTPageOpaque lpageop;
	OffsetNumber firstright = InvalidOffsetNumber;
	Size		itemsz;

	page = BufferGetPage(buf);
	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);

	itemsz = IndexTupleDSize(*itup);
	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
								 * need to be consistent */

	/*
	 * Do we need to split the page to fit the item on it?
	 *
	 * Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
	 * so this comparison is correct even though we appear to be accounting
	 * only for the item and not for its line pointer.
	 */
	if (PageGetFreeSpace(page) < itemsz)
	{
		bool		is_root = P_ISROOT(lpageop);
		bool		is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
		bool		newitemonleft;
		Buffer		rbuf;

		/* Choose the split point */
		firstright = _bt_findsplitloc(rel, page,
									  newitemoff, itemsz,
									  &newitemonleft);

		/* split the buffer into left and right halves */
		rbuf = _bt_split(rel, buf, firstright,
						 newitemoff, itemsz, itup, newitemonleft);
		PredicateLockPageSplit(rel,
							   BufferGetBlockNumber(buf),
							   BufferGetBlockNumber(rbuf));

		/*----------
		 * By here,
		 *
		 *		+  our target page has been split;
		 *		+  the original tuple has been inserted;
		 *		+  we have write locks on both the old (left half)
		 *		   and new (right half) buffers, after the split; and
		 *		+  we know the key we want to insert into the parent
		 *		   (it's the "high key" on the left child page).
		 *
		 * We're ready to do the parent insertion.  We need to hold onto the
		 * locks for the child pages until we locate the parent, but we can
		 * release them before doing the actual insertion (see Lehman and Yao
		 * for the reasoning).
		 *----------
		 */
		_bt_insert_parent(rel, buf, rbuf, stack, is_root, is_only);
	}
	else
	{
		Buffer		metabuf = InvalidBuffer;
		Page		metapg = NULL;
		BTMetaPageData *metad = NULL;
		OffsetNumber itup_off;
		BlockNumber itup_blkno;

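		/* remember the insertion position and block, for error reports and the XLOG record below */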
		itup_off = newitemoff;
		itup_blkno = BufferGetBlockNumber(buf);

		/*
		 * If we are doing this insert because we split a page that was the
		 * only one on its tree level, but was not the root, it may have been
		 * the "fast root".  We need to ensure that the fast root link points
		 * at or above the current page.  We can safely acquire a lock on the
		 * metapage here --- see comments for _bt_newroot().
		 */
		if (split_only_page)
		{
			Assert(!P_ISLEAF(lpageop));

			metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
			metapg = BufferGetPage(metabuf);
			metad = BTPageGetMeta(metapg);

			if (metad->btm_fastlevel >= lpageop->btpo.level)
			{
				/* no update wanted */
				_bt_relbuf(rel, metabuf);
				metabuf = InvalidBuffer;
			}
		}

		/* Do the update.  No ereport(ERROR) until changes are logged */
		START_CRIT_SECTION();

		if (!_bt_pgaddtup(page, itemsz, itup, newitemoff))
			elog(PANIC, "failed to add new item to block %u in index \"%s\"",
				 itup_blkno, RelationGetRelationName(rel));

		MarkBufferDirty(buf);

		if (BufferIsValid(metabuf))
		{
			metad->btm_fastroot = itup_blkno;
			metad->btm_fastlevel = lpageop->btpo.level;
			MarkBufferDirty(metabuf);
		}

		/* XLOG stuff */
		if (RelationNeedsWAL(rel))
		{
			xl_btree_insert xlrec;
			BlockNumber xldownlink;
			xl_btree_metadata xlmeta;
			uint8		xlinfo;
			XLogRecPtr	recptr;
			XLogRecData rdata[4];
			XLogRecData *nextrdata;
			IndexTupleData trunctuple;

			xlrec.target.node = rel->rd_node;
			ItemPointerSet(&(xlrec.target.tid), itup_blkno, itup_off);

			rdata[0].data = (char *) &xlrec;
			rdata[0].len = SizeOfBtreeInsert;
			rdata[0].buffer = InvalidBuffer;
			rdata[0].next = nextrdata = &(rdata[1]);

			if (P_ISLEAF(lpageop))
				xlinfo = XLOG_BTREE_INSERT_LEAF;
			else
			{
				xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
				Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);

				nextrdata->data = (char *) &xldownlink;
				nextrdata->len = sizeof(BlockNumber);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_UPPER;
			}

			if (BufferIsValid(metabuf))
			{
				xlmeta.root = metad->btm_root;
				xlmeta.level = metad->btm_level;
				xlmeta.fastroot = metad->btm_fastroot;
				xlmeta.fastlevel = metad->btm_fastlevel;

				nextrdata->data = (char *) &xlmeta;
				nextrdata->len = sizeof(xl_btree_metadata);
				nextrdata->buffer = InvalidBuffer;
				nextrdata->next = nextrdata + 1;
				nextrdata++;

				xlinfo = XLOG_BTREE_INSERT_META;
			}

			/* Read comments in _bt_pgaddtup */
			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
			{
			/* Read comments in _bt_pgaddtup */
			if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop))
			{
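				/*
				 * The first data key on an internal page is treated as minus
				 * infinity, so only the tuple header needs to be WAL-logged
				 * (see _bt_pgaddtup).
				 */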
				trunctuple = *itup;
				trunctuple.t_info = sizeof(IndexTupleData);
				nextrdata->data = (char *) &trunctuple;
				nextrdata->len = sizeof(IndexTupleData);
			}
			else
			{
				nextrdata->data = (char *) itup;
				nextrdata->len = IndexTupleDSize(*itup);
			}
			nextrdata->buffer = buf;
			nextrdata->buffer_std = true;
			nextrdata->next = NULL;

			recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

			if (BufferIsValid(metabuf))
			{
				PageSetLSN(metapg, recptr);
			}

			PageSetLSN(page, recptr);
		}

		END_CRIT_SECTION();

		/* release buffers */
		if (BufferIsValid(metabuf))
			_bt_relbuf(rel, metabuf);

		_bt_relbuf(rel, buf);
	}
}

/*
 *	_bt_split() -- split a page in the btree.
 *
 *		On entry, buf is the page to split, and is pinned and write-locked.
 *		firstright is the item index of the first item to be moved to the
 *		new right page.  newitemoff etc. tell us about the new item that
 *		must be inserted along with the data from the old page.
 *
 *		Returns the new right sibling of buf, pinned and write-locked.
 *		The pin and lock on buf are maintained.
 */
static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
		  bool newitemonleft)
{
	Buffer		rbuf;
	Page		origpage;
	Page		leftpage,
				rightpage;
	BlockNumber origpagenumber,
				rightpagenumber;
	BTPageOpaque ropaque,
				lopaque,
				oopaque;
	Buffer		sbuf = InvalidBuffer;
	Page		spage = NULL;
	BTPageOpaque sopaque = NULL;
	Size		itemsz;
	ItemId		itemid;
	IndexTuple	item;
	OffsetNumber leftoff,
				rightoff;
	OffsetNumber maxoff;
	OffsetNumber i;
	bool		isroot;

	/* Acquire a new page to split into */
	rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);

	/*
	 * origpage is the original page to be split.  leftpage is a temporary
	 * buffer that receives the left-sibling data, which will be copied back
	 * into origpage on success.  rightpage is the new page that receives the
	 * right-sibling data.  If we fail before reaching the critical section,
	 * origpage hasn't been modified and leftpage is only workspace. In
	 * principle we shouldn't need to worry about rightpage either, because it
	 * hasn't been linked into the btree page structure; but to avoid leaving
	 * possibly-confusing junk behind, we are careful to rewrite rightpage as
	 * zeroes before throwing any error.
	 */
	origpage = BufferGetPage(buf);
	leftpage = PageGetTempPage(origpage);
	rightpage = BufferGetPage(rbuf);

	origpagenumber = BufferGetBlockNumber(buf);
	rightpagenumber = BufferGetBlockNumber(rbuf);

	_bt_pageinit(leftpage, BufferGetPageSize(buf));
	/* rightpage was already initialized by _bt_getbuf */

	/*
	 * Copy the original page's LSN and TLI into leftpage, which will become
	 * the updated version of the page.  We need this because XLogInsert will
	 * examine these fields and possibly dump them in a page image.
	 */
	PageSetLSN(leftpage, PageGetLSN(origpage));

	/* init btree private data */
	oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
	lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
	ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);

	isroot = P_ISROOT(oopaque);

	/* if we're splitting this page, it won't be the root when we're done */
	/* also, clear the SPLIT_END and HAS_GARBAGE flags in both pages */
	lopaque->btpo_flags = oopaque->btpo_flags;
	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
	ropaque->btpo_flags = lopaque->btpo_flags;
	lopaque->btpo_prev = oopaque->btpo_prev;
	lopaque->btpo_next = rightpagenumber;
	ropaque->btpo_prev = origpagenumber;
	ropaque->btpo_next = oopaque->btpo_next;
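	/* both halves of the split stay on the same tree level as the original */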
	lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
	/* Since we already have write-lock on both pages, ok to read cycleid */
	lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
	ropaque->btpo_cycleid = lopaque->btpo_cycleid;

	/*
	 * If the page we're splitting is not the rightmost page at its level in
	 * the tree, then the first entry on the page is the high key for the
	 * page.  We need to copy that to the right half.  Otherwise (meaning the
	 * rightmost page case), all the items on the right half will be user
	 * data.
	 */
	rightoff = P_HIKEY;

	if (!P_RIGHTMOST(oopaque))
	{
		itemid = PageGetItemId(origpage, P_HIKEY);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
		if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
						false, false) == InvalidOffsetNumber)
		{
			memset(rightpage, 0, BufferGetPageSize(rbuf));
			elog(ERROR, "failed to add hikey to the right sibling"
				 " while splitting block %u of index \"%s\"",
				 origpagenumber, RelationGetRelationName(rel));
		}
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * The "high key" for the new left page will be the first key that's going
	 * to go into the new right page.  This might be either the existing data
	 * item at position firstright, or the incoming tuple.
	 */
	leftoff = P_HIKEY;
	if (!newitemonleft && newitemoff == firstright)
	{
		/* incoming tuple will become first on right page */
		itemsz = newitemsz;
		item = newitem;
	}
	else
	{
		/* existing item at firstright will become first on right page */
		itemid = PageGetItemId(origpage, firstright);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);
	}
	if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
					false, false) == InvalidOffsetNumber)
	{
		memset(rightpage, 0, BufferGetPageSize(rbuf));
		elog(ERROR, "failed to add hikey to the left sibling"
			 " while splitting block %u of index \"%s\"",
			 origpagenumber, RelationGetRelationName(rel));
	}
	leftoff = OffsetNumberNext(leftoff);

	/*
	 * Now transfer all the data items to the appropriate page.
	 *
	 * Note: we *must* insert at least the right page's items in item-number
	 * order, for the benefit of _bt_restore_page().
	 */
	maxoff = PageGetMaxOffsetNumber(origpage);

	for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i))
	{
		itemid = PageGetItemId(origpage, i);
		itemsz = ItemIdGetLength(itemid);
		item = (IndexTuple) PageGetItem(origpage, itemid);

		/* does new item belong before this one? */
		if (i == newitemoff)
		{
			if (newitemonleft)
			{
				if (!_bt_pgaddtup(leftpage, newitemsz, newitem, leftoff))
				{
					memset(rightpage, 0, BufferGetPageSize(rbuf));
					elog(ERROR, "failed to add new item to the left sibling"
						 " while splitting block %u of index \"%s\"",
						 origpagenumber, RelationGetRelationName(rel));
				}
				leftoff = OffsetNumberNext(leftoff);
			}
			else
			{
				if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
				{
					memset(rightpage, 0, BufferGetPageSize(rbuf));
					elog(ERROR, "failed to add new item to the right sibling"
						 " while splitting block %u of index \"%s\"",
						 origpagenumber, RelationGetRelationName(rel));
				}
				rightoff = OffsetNumberNext(rightoff);
			}
		}

		/* decide which page to put it on */
		if (i < firstright)
		{
			if (!_bt_pgaddtup(leftpage, itemsz, item, leftoff))
			{
				memset(rightpage, 0, BufferGetPageSize(rbuf));
				elog(ERROR, "failed to add old item to the left sibling"
					 " while splitting block %u of index \"%s\"",
					 origpagenumber, RelationGetRelationName(rel));
			}
			leftoff = OffsetNumberNext(leftoff);
		}
		else
		{
			if (!_bt_pgaddtup(rightpage, itemsz, item, rightoff))
			{
				memset(rightpage, 0, BufferGetPageSize(rbuf));
				elog(ERROR, "failed to add old item to the right sibling"
					 " while splitting block %u of index \"%s\"",
					 origpagenumber, RelationGetRelationName(rel));
			}
			rightoff = OffsetNumberNext(rightoff);
		}
	}

	/* cope with possibility that newitem goes at the end */
	if (i <= newitemoff)
	{
		/*
		 * Can't have newitemonleft here; that would imply we were told to put
		 * *everything* on the left page, which cannot fit (if it could, we'd
		 * not be splitting the page).
		 */
		Assert(!newitemonleft);
		if (!_bt_pgaddtup(rightpage, newitemsz, newitem, rightoff))
		{
			memset(rightpage, 0, BufferGetPageSize(rbuf));
			elog(ERROR, "failed to add new item to the right sibling"
				 " while splitting block %u of index \"%s\"",
				 origpagenumber, RelationGetRelationName(rel));
		}
		rightoff = OffsetNumberNext(rightoff);
	}

	/*
	 * We have to grab the right sibling (if any) and fix the prev pointer
	 * there. We are guaranteed that this is deadlock-free since no other
	 * writer will be holding a lock on that page and trying to move left, and
	 * all readers release locks on a page before trying to fetch its
	 * neighbors.
	 */

	if (!P_RIGHTMOST(oopaque))
	{
		sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
		spage = BufferGetPage(sbuf);
		sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
		if (sopaque->btpo_prev != origpagenumber)
		{
			memset(rightpage, 0, BufferGetPageSize(rbuf));
			elog(ERROR, "right sibling's left-link doesn't match: "
			   "block %u links to %u instead of expected %u in index \"%s\"",
				 oopaque->btpo_next, sopaque->btpo_prev, origpagenumber,
				 RelationGetRelationName(rel));
		}

		/*
		 * Check to see if we can set the SPLIT_END flag in the right-hand
		 * split page; this can save some I/O for vacuum since it need not
		 * proceed to the right sibling.  We can set the flag if the right
		 * sibling has a different cycleid: that means it could not be part of
		 * a group of pages that were all split off from the same ancestor
		 * page.  If you're confused, imagine that page A splits to A B and
		 * then again, yielding A C B, while vacuum is in progress.  Tuples
		 * originally in A could now be in either B or C, hence vacuum must
		 * examine both pages.  But if D, our right sibling, has a different
		 * cycleid then it could not contain any tuples that were in A when
		 * the vacuum started.
		 */
		if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
			ropaque->btpo_flags |= BTP_SPLIT_END;
	}

	/*
	 * Right sibling is locked, new siblings are prepared, but original page
	 * is not updated yet.
	 *
	 * NO EREPORT(ERROR) till right sibling is updated.  We can get away with
	 * not starting the critical section till here because we haven't been
	 * scribbling on the original page yet; see comments above.
	 */
	START_CRIT_SECTION();

	/*
	 * By here, the original data page has been split into two new halves, and
	 * these are correct.  The algorithm requires that the left page never
	 * move during a split, so we copy the new left page back on top of the
	 * original.  Note that this is not a waste of time, since we also require
	 * (in the page management code) that the center of a page always be
	 * clean, and the most efficient way to guarantee this is just to compact
	 * the data by reinserting it into a new left page.  (XXX the latter
	 * comment is probably obsolete; but in any case it's good to not scribble
	 * on the original page until we enter the critical section.)
	 *
	 * We need to do this before writing the WAL record, so that XLogInsert
	 * can WAL log an image of the page if necessary.
	 */
	PageRestoreTempPage(leftpage, origpage);
	/* leftpage, lopaque must not be used below here */

	MarkBufferDirty(buf);
	MarkBufferDirty(rbuf);

	if (!P_RIGHTMOST(ropaque))
	{
		sopaque->btpo_prev = rightpagenumber;
		MarkBufferDirty(sbuf);
	}

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_btree_split xlrec;
		uint8		xlinfo;
		XLogRecPtr	recptr;
		XLogRecData rdata[7];
		XLogRecData *lastrdata;

		xlrec.node = rel->rd_node;
		xlrec.leftsib = origpagenumber;
		xlrec.rightsib = rightpagenumber;
		xlrec.rnext = ropaque->btpo_next;
		xlrec.level = ropaque->btpo.level;
		xlrec.firstright = firstright;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeSplit;
		rdata[0].buffer = InvalidBuffer;

		lastrdata = &rdata[0];

		if (ropaque->btpo.level > 0)
		{
			/* Log downlink on non-leaf pages */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
			lastrdata->len = sizeof(BlockIdData);
			lastrdata->buffer = InvalidBuffer;

			/*
			 * We must also log the left page's high key, because the right
			 * page's leftmost key is suppressed on non-leaf levels.  Show it
			 * as belonging to the left page buffer, so that it is not stored
			 * if XLogInsert decides it needs a full-page image of the left
			 * page.
			 */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			itemid = PageGetItemId(origpage, P_HIKEY);
			item = (IndexTuple) PageGetItem(origpage, itemid);
			lastrdata->data = (char *) item;
			lastrdata->len = MAXALIGN(IndexTupleSize(item));
			lastrdata->buffer = buf;	/* backup block 1 */
			lastrdata->buffer_std = true;
		}

		/*
		 * Log the new item and its offset, if it was inserted on the left
		 * page. (If it was put on the right page, we don't need to explicitly
		 * WAL log it because it's included with all the other items on the
		 * right page.) Show the new item as belonging to the left page
		 * buffer, so that it is not stored if XLogInsert decides it needs a
		 * full-page image of the left page.  We store the offset anyway,
		 * though, to support archive compression of these records.
		 */
		if (newitemonleft)
		{
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = (char *) &newitemoff;
			lastrdata->len = sizeof(OffsetNumber);
			lastrdata->buffer = InvalidBuffer;

			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = (char *) newitem;
			lastrdata->len = MAXALIGN(newitemsz);
			lastrdata->buffer = buf;	/* backup block 1 */
			lastrdata->buffer_std = true;
		}
		else if (ropaque->btpo.level == 0)
		{
			/*
			 * Although we don't need to WAL-log the new item, we still need
			 * XLogInsert to consider storing a full-page image of the left
			 * page, so make an empty entry referencing that buffer. This also
			 * ensures that the left page is always backup block 1.
			 */
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = NULL;
			lastrdata->len = 0;
			lastrdata->buffer = buf;	/* backup block 1 */
			lastrdata->buffer_std = true;
		}

		/*
		 * Log the contents of the right page in the format understood by
		 * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
		 * because we're going to recreate the whole page anyway, so it should
		 * never be stored by XLogInsert.
		 *
		 * Direct access to page is not good but faster - we should implement
		 * some new func in page API.  Note we only store the tuples
		 * themselves, knowing that they were inserted in item-number order
		 * and so the item pointers can be reconstructed.  See comments for
		 * _bt_restore_page().
		 */
		lastrdata->next = lastrdata + 1;
		lastrdata++;

		lastrdata->data = (char *) rightpage +
			((PageHeader) rightpage)->pd_upper;
		lastrdata->len = ((PageHeader) rightpage)->pd_special -
			((PageHeader) rightpage)->pd_upper;
		lastrdata->buffer = InvalidBuffer;

		/* Log the right sibling, because we've changed its prev-pointer. */
		if (!P_RIGHTMOST(ropaque))
		{
			lastrdata->next = lastrdata + 1;
			lastrdata++;

			lastrdata->data = NULL;
			lastrdata->len = 0;
			lastrdata->buffer = sbuf;	/* backup block 2 */
			lastrdata->buffer_std = true;
		}

		lastrdata->next = NULL;

		if (isroot)
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L_ROOT : XLOG_BTREE_SPLIT_R_ROOT;
		else
			xlinfo = newitemonleft ? XLOG_BTREE_SPLIT_L : XLOG_BTREE_SPLIT_R;

		recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);

		PageSetLSN(origpage, recptr);
		PageSetLSN(rightpage, recptr);
		if (!P_RIGHTMOST(ropaque))
		{
			PageSetLSN(spage, recptr);
		}
	}

	END_CRIT_SECTION();

	/* release the old right sibling */
	if (!P_RIGHTMOST(ropaque))
		_bt_relbuf(rel, sbuf);

	/* split's done */
	return rbuf;
}

/*
 *	_bt_findsplitloc() -- find an appropriate place to split a page.
 *
 * The idea here is to equalize the free space that will be on each split
 * page, *after accounting for the inserted tuple*.  (If we fail to account
 * for it, we might find ourselves with too little room on the page that
 * it needs to go into!)
 *
 * If the page is the rightmost page on its level, we instead try to arrange
 * to leave the left split page fillfactor% full.  In this way, when we are
 * inserting successively increasing keys (consider sequences, timestamps,
 * etc) we will end up with a tree whose pages are about fillfactor% full,
 * instead of the 50% full result that we'd get without this special case.
 * This is the same as nbtsort.c produces for a newly-created tree.  Note
 * that leaf and nonleaf pages use different fillfactors.
 *
 * We are passed the intended insert position of the new tuple, expressed as
 * the offsetnumber of the tuple it must go in front of.  (This could be
 * maxoff+1 if the tuple is to go at the end.)
 *
 * We return the index of the first existing tuple that should go on the
 * righthand page, plus a boolean indicating whether the new tuple goes on
 * the left or right page.  The bool is necessary to disambiguate the case
 * where firstright == newitemoff.
 */
static OffsetNumber
_bt_findsplitloc(Relation rel,
				 Page page,
				 OffsetNumber newitemoff,
				 Size newitemsz,
				 bool *newitemonleft)
{
	BTPageOpaque opaque;
	OffsetNumber offnum;
	OffsetNumber maxoff;
	ItemId		itemid;
	FindSplitData state;
	int			leftspace,
				rightspace,
				goodenough,
				olddataitemstotal,
				olddataitemstoleft;
	bool		goodenoughfound;

	opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
	newitemsz += sizeof(ItemIdData);

	/* Total free space available on a btree page, after fixed overhead */
	leftspace = rightspace =
		PageGetPageSize(page) - SizeOfPageHeaderData -
		MAXALIGN(sizeof(BTPageOpaqueData));

	/* The right page will have the same high key as the old page */
	if (!P_RIGHTMOST(opaque))
	{
		itemid = PageGetItemId(page, P_HIKEY);
		rightspace -= (int) (MAXALIGN(ItemIdGetLength(itemid)) +
							 sizeof(ItemIdData));
	}

	/* Count up total space in data items without actually scanning 'em */
	olddataitemstotal = rightspace - (int) PageGetExactFreeSpace(page);

	state.newitemsz = newitemsz;
	state.is_leaf = P_ISLEAF(opaque);
	state.is_rightmost = P_RIGHTMOST(opaque);
	state.have_split = false;
	if (state.is_leaf)
		state.fillfactor = RelationGetFillFactor(rel,
												 BTREE_DEFAULT_FILLFACTOR);
	else
		state.fillfactor = BTREE_NONLEAF_FILLFACTOR;
	state.newitemonleft = false;	/* these just to keep compiler quiet */
	state.firstright = 0;
	state.best_delta = 0;
	state.leftspace = leftspace;
	state.rightspace = rightspace;
	state.olddataitemstotal = olddataitemstotal;
	state.newitemoff = newitemoff;

	/*
	 * Finding the best possible split would require checking all the possible
	 * split points, because of the high-key and left-key special cases.
	 * That's probably more work than it's worth; instead, stop as soon as we
	 * find a "good-enough" split, where good-enough is defined as an
	 * imbalance in free space of no more than pagesize/16 (arbitrary...) This
	 * should let us stop near the middle on most pages, instead of plowing to
	 * the end.
	 */
	goodenough = leftspace / 16;

	/*
	 * Scan through the data items and calculate space usage for a split at
	 * each possible position.
	 */
	olddataitemstoleft = 0;
	goodenoughfound = false;
	maxoff = PageGetMaxOffsetNumber(page);

	for (offnum = P_FIRSTDATAKEY(opaque);
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		Size		itemsz;

		itemid = PageGetItemId(page, offnum);
		itemsz = MAXALIGN(ItemIdGetLength(itemid)) + sizeof(ItemIdData);

		/*
		 * Will the new item go to left or right of split?
		 */
		if (offnum > newitemoff)
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);
		else if (offnum < newitemoff)
			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		else
		{
			/* need to try it both ways! */
			_bt_checksplitloc(&state, offnum, true,
							  olddataitemstoleft, itemsz);
			_bt_checksplitloc(&state, offnum, false,
							  olddataitemstoleft, itemsz);
		}

		/* Abort scan once we find a good-enough choice */
		if (state.have_split && state.best_delta <= goodenough)
		{
			goodenoughfound = true;
			break;
		}

		olddataitemstoleft += itemsz;
	}

	/*
	 * If the new item goes as the last item, check for splitting so that all
	 * the old items go to the left page and the new item goes to the right
	 * page.
	 */
	if (newitemoff > maxoff && !goodenoughfound)
		_bt_checksplitloc(&state, newitemoff, false, olddataitemstotal, 0);

	/*
	 * I believe it is not possible to fail to find a feasible split, but just
	 * in case ...
	 */
	if (!state.have_split)
		elog(ERROR, "could not find a feasible split point for index \"%s\"",
			 RelationGetRelationName(rel));

	*newitemonleft = state.newitemonleft;
	return state.firstright;
}

/*
 * Subroutine to analyze a particular possible split choice (ie, firstright
 * and newitemonleft settings), and record the best split so far in *state.
 *
 * firstoldonright is the offset of the first item on the original page
 * that goes to the right page, and firstoldonrightsz is the size of that
 * tuple. firstoldonright can be > max offset, which means that all the old
 * items go to the left page and only the new item goes to the right page.
 * In that case, firstoldonrightsz is not used.
 *
 * olddataitemstoleft is the total size of all old items to the left of
 * firstoldonright.
 */
static void
_bt_checksplitloc(FindSplitData *state,
				  OffsetNumber firstoldonright,
				  bool newitemonleft,
				  int olddataitemstoleft,
				  Size firstoldonrightsz)
{
	int			leftfree,
				rightfree;
	Size		firstrightitemsz;
	bool		newitemisfirstonright;

	/* Is the new item going to be the first item on the right page? */
	newitemisfirstonright = (firstoldonright == state->newitemoff
							 && !newitemonleft);

	if (newitemisfirstonright)
		firstrightitemsz = state->newitemsz;
	else
		firstrightitemsz = firstoldonrightsz;

	/* Account for all the old tuples */
	leftfree = state->leftspace - olddataitemstoleft;
	rightfree = state->rightspace -
		(state->olddataitemstotal - olddataitemstoleft);

	/*
	 * The first item on the right page becomes the high key of the left page;
	 * therefore it counts against left space as well as right space.
	 */
	leftfree -= firstrightitemsz;

	/* account for the new item */
	if (newitemonleft)
		leftfree -= (int) state->newitemsz;
	else
		rightfree -= (int) state->newitemsz;

	/*
	 * If we are not on the leaf level, we will be able to discard the key
	 * data from the first item that winds up on the right page.
	 */
	if (!state->is_leaf)
		rightfree += (int) firstrightitemsz -
			(int) (MAXALIGN(sizeof(IndexTupleData)) + sizeof(ItemIdData));

	/*
	 * If feasible split point, remember best delta.
	 */
	if (leftfree >= 0 && rightfree >= 0)
	{
		int			delta;

		if (state->is_rightmost)
		{
			/*
			 * If splitting a rightmost page, try to put (100-fillfactor)% of
			 * free space on left page.  See comments for _bt_findsplitloc.
			 */
			delta = (state->fillfactor * leftfree)
				- ((100 - state->fillfactor) * rightfree);
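			/*
			 * (E.g. with the default leaf fillfactor of 90, delta is zero
			 * when about 90% of the remaining free space goes to the right
			 * page, leaving the left page about 90% full.)
			 */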
		}
		else
		{
			/* Otherwise, aim for equal free space on both sides */
			delta = leftfree - rightfree;
		}

		if (delta < 0)
			delta = -delta;
		if (!state->have_split || delta < state->best_delta)
		{
			state->have_split = true;
			state->newitemonleft = newitemonleft;
			state->firstright = firstoldonright;
			state->best_delta = delta;
		}
	}
}

/*
 * _bt_insert_parent() -- Insert downlink into parent after a page split.
 *
 * On entry, buf and rbuf are the left and right split pages, which we
 * still hold write locks on per the L&Y algorithm.  We release the
 * write locks once we have write lock on the parent page.  (Any sooner,
 * and it'd be possible for some other process to try to split or delete
 * one of these pages, and get confused because it cannot find the downlink.)
 *
 * stack - stack showing how we got here.  May be NULL in cases that don't
 *			have to be efficient (concurrent ROOT split, WAL recovery)
 * is_root - we split the true root
 * is_only - we split a page alone on its level (might have been fast root)
 *
 * This is exported so it can be called by nbtxlog.c.
 */
void
_bt_insert_parent(Relation rel,
				  Buffer buf,
				  Buffer rbuf,
				  BTStack stack,
				  bool is_root,
				  bool is_only)
{
	/*
	 * Here we have to do something Lehman and Yao don't talk about: deal with
	 * a root split and construction of a new root.  If our stack is empty
	 * then we have just split a node on what had been the root level when we
	 * descended the tree.  If it was still the root then we perform a
	 * new-root construction.  If it *wasn't* the root anymore, search to find
	 * the next higher level that someone constructed meanwhile, and find the
	 * right place to insert as for the normal case.
	 *
	 * If we have to search for the parent level, we do so by re-descending
	 * from the root.  This is not super-efficient, but it's rare enough not
	 * to matter.  (This path is also taken when called from WAL recovery ---
	 * we have no stack in that case.)
	 */
	if (is_root)
	{
		Buffer		rootbuf;

		Assert(stack == NULL);
		Assert(is_only);
		/* create a new root node and update the metapage */
		rootbuf = _bt_newroot(rel, buf, rbuf);
		/* release the split buffers */
		_bt_relbuf(rel, rootbuf);
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);
	}
	else
	{
		BlockNumber bknum = BufferGetBlockNumber(buf);
		BlockNumber rbknum = BufferGetBlockNumber(rbuf);
		Page		page = BufferGetPage(buf);
		IndexTuple	new_item;
		BTStackData fakestack;
		IndexTuple	ritem;
		Buffer		pbuf;

		if (stack == NULL)
		{
			BTPageOpaque lpageop;

			if (!InRecovery)
				elog(DEBUG2, "concurrent ROOT page split");
			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
			/* Find the leftmost page at the next level up */
			pbuf = _bt_get_endpoint(rel, lpageop->btpo.level + 1, false);
			/* Set up a phony stack entry pointing there */
			stack = &fakestack;
			stack->bts_blkno = BufferGetBlockNumber(pbuf);
			stack->bts_offset = InvalidOffsetNumber;
			/* bts_btentry will be initialized below */
			stack->bts_parent = NULL;
			_bt_relbuf(rel, pbuf);
		}

		/* get high key from left page == lowest key on new right page */
		ritem = (IndexTuple) PageGetItem(page,
										 PageGetItemId(page, P_HIKEY));

		/* form an index tuple that points at the new right page */
		new_item = CopyIndexTuple(ritem);
		ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY);

		/*
		 * Find the parent buffer and get the parent page.
		 *
		 * Oops - if we were moved right then we need to change stack item! We
		 * want to find parent pointing to where we are, right ?	- vadim
		 * 05/27/97
		 */
		ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
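		/*
		 * (The downlink is re-found in the parent by matching this TID ---
		 * effectively the child's block number --- in _bt_getstackbuf below.)
		 */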

		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);

		/* Now we can unlock the children */
		_bt_relbuf(rel, rbuf);
		_bt_relbuf(rel, buf);

		/* Check for error only after writing children */
		if (pbuf == InvalidBuffer)
			elog(ERROR, "failed to re-find parent key in index \"%s\" for split pages %u/%u",
				 RelationGetRelationName(rel), bknum, rbknum);

		/* Recursively update the parent */
		_bt_insertonpg(rel, pbuf, stack->bts_parent,
					   new_item, stack->bts_offset + 1,
					   is_only);

		/* be tidy */
		pfree(new_item);
	}
}

/*
 *	_bt_getstackbuf() -- Walk back up the tree one step, and find the item
 *						 we last looked at in the parent.
 *
 *		This is possible because we save the downlink from the parent item,
 *		which is enough to uniquely identify it.  Insertions into the parent
 *		level could cause the item to move right; deletions could cause it
 *		to move left, but not left of the page we previously found it in.
 *
 *		Adjusts bts_blkno & bts_offset if changed.
 *
 *		Returns InvalidBuffer if item not found (should not happen).
 */
Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
	BlockNumber blkno;
	OffsetNumber start;

	blkno = stack->bts_blkno;
	start = stack->bts_offset;

	for (;;)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, access);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);

		if (!P_IGNORE(opaque))
		{
			OffsetNumber offnum,
						minoff,
						maxoff;
			ItemId		itemid;
			IndexTuple	item;

			minoff = P_FIRSTDATAKEY(opaque);
			maxoff = PageGetMaxOffsetNumber(page);

			/*
			 * start = InvalidOffsetNumber means "search the whole page". We
			 * need this test anyway due to possibility that page has a high
			 * key now when it didn't before.
			 */
			if (start < minoff)
				start = minoff;

			/*
			 * Need this check too, to guard against possibility that page
			 * split since we visited it originally.
			 */
			if (start > maxoff)
				start = OffsetNumberNext(maxoff);

			/*
			 * These loops will check every item on the page --- but in an
			 * order that's attuned to the probability of where it actually
			 * is.  Scan to the right first, then to the left.
			 */
			for (offnum = start;
				 offnum <= maxoff;
				 offnum = OffsetNumberNext(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}

			for (offnum = OffsetNumberPrev(start);
				 offnum >= minoff;
				 offnum = OffsetNumberPrev(offnum))
			{
				itemid = PageGetItemId(page, offnum);
				item = (IndexTuple) PageGetItem(page, itemid);
				if (BTEntrySame(item, &stack->bts_btentry))
				{
					/* Return accurate pointer to where link is now */
					stack->bts_blkno = blkno;
					stack->bts_offset = offnum;
					return buf;
				}
			}
		}

		/*
		 * The item we're looking for moved right at least one page.
		 */
		if (P_RIGHTMOST(opaque))
		{
			_bt_relbuf(rel, buf);
			return InvalidBuffer;
		}
		blkno = opaque->btpo_next;
		start = InvalidOffsetNumber;
		_bt_relbuf(rel, buf);
	}
}

/*
 *	_bt_newroot() -- Create a new root page for the index.
 *
 *		We've just split the old root page and need to create a new one.
 *		In order to do this, we add a new root page to the file, then lock
 *		the metadata page and update it.  This is guaranteed to be deadlock-
 *		free, because all readers release their locks on the metadata page
 *		before trying to lock the root, and all writers lock the root before
 *		trying to lock the metadata page.  We have a write lock on the old
 *		root page, so we have not introduced any cycles into the waits-for
 *		graph.
 *
 *		On entry, lbuf (the old root) and rbuf (its new peer) are write-
 *		locked.  On exit, a new root page exists with entries for the
 *		two new children, metapage is updated and unlocked/unpinned.
 *		The new root buffer is returned to caller which has to unlock/unpin
 *		lbuf, rbuf & rootbuf.
 */
static Buffer
_bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
{
	Buffer		rootbuf;
	Page		lpage,
				rootpage;
	BlockNumber lbkno,
				rbkno;
	BlockNumber rootblknum;
	BTPageOpaque rootopaque;
	ItemId		itemid;
	IndexTuple	item;
	Size		itemsz;
	IndexTuple	new_item;
	Buffer		metabuf;
	Page		metapg;
	BTMetaPageData *metad;

	lbkno = BufferGetBlockNumber(lbuf);
	rbkno = BufferGetBlockNumber(rbuf);
	lpage = BufferGetPage(lbuf);

	/* get a new root page */
	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
	rootpage = BufferGetPage(rootbuf);
	rootblknum = BufferGetBlockNumber(rootbuf);

	/* acquire lock on the metapage */
	metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
	metapg = BufferGetPage(metabuf);
	metad = BTPageGetMeta(metapg);

	/* NO EREPORT(ERROR) from here till newroot op is logged */
	START_CRIT_SECTION();

	/* set btree special data */
	rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
	rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
	rootopaque->btpo_flags = BTP_ROOT;
	rootopaque->btpo.level =
		((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo.level + 1;
	rootopaque->btpo_cycleid = 0;

	/* update metapage data */
	metad->btm_root = rootblknum;
	metad->btm_level = rootopaque->btpo.level;
	metad->btm_fastroot = rootblknum;
	metad->btm_fastlevel = rootopaque->btpo.level;
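	/* (the brand-new root also becomes the fast root from here on) */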

	/*
	 * Create downlink item for left page (old root).  Since this will be the
	 * first item in a non-leaf page, it implicitly has minus-infinity key
	 * value, so we need not store any actual key in it.
	 */
	itemsz = sizeof(IndexTupleData);
	new_item = (IndexTuple) palloc(itemsz);
	new_item->t_info = itemsz;
	ItemPointerSet(&(new_item->t_tid), lbkno, P_HIKEY);

	/*
	 * Insert the left page pointer into the new root page.  The root page is
	 * the rightmost page on its level so there is no "high key" in it; the
	 * two items will go into positions P_HIKEY and P_FIRSTKEY.
	 *
	 * Note: we *must* insert the two items in item-number order, for the
	 * benefit of _bt_restore_page().
	 */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add leftkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
	pfree(new_item);

	/*
	 * Create downlink item for right page.  The key for it is obtained from
	 * the "high key" position in the left page.
	 */
	itemid = PageGetItemId(lpage, P_HIKEY);
	itemsz = ItemIdGetLength(itemid);
	item = (IndexTuple) PageGetItem(lpage, itemid);
	new_item = CopyIndexTuple(item);
	ItemPointerSet(&(new_item->t_tid), rbkno, P_HIKEY);

	/*
	 * insert the right page pointer into the new root page.
	 */
	if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY,
					false, false) == InvalidOffsetNumber)
		elog(PANIC, "failed to add rightkey to new root page"
			 " while splitting block %u of index \"%s\"",
			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
	pfree(new_item);

	MarkBufferDirty(rootbuf);
	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_btree_newroot xlrec;
		XLogRecPtr	recptr;
		XLogRecData rdata[2];

		xlrec.node = rel->rd_node;
		xlrec.rootblk = rootblknum;
		xlrec.level = metad->btm_level;

		rdata[0].data = (char *) &xlrec;
		rdata[0].len = SizeOfBtreeNewroot;
		rdata[0].buffer = InvalidBuffer;
		rdata[0].next = &(rdata[1]);

		/*
		 * Direct access to page is not good but faster - we should implement
		 * some new func in page API.
		 */
		rdata[1].data = (char *) rootpage + ((PageHeader) rootpage)->pd_upper;
		rdata[1].len = ((PageHeader) rootpage)->pd_special -
			((PageHeader) rootpage)->pd_upper;
		rdata[1].buffer = InvalidBuffer;
		rdata[1].next = NULL;

		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);

		PageSetLSN(rootpage, recptr);
		PageSetLSN(metapg, recptr);
	}

	END_CRIT_SECTION();

	/* done with metapage */
	_bt_relbuf(rel, metabuf);

	return rootbuf;
}

/*
 *	_bt_pgaddtup() -- add a tuple to a particular page in the index.
 *
 *		This routine adds the tuple to the page as requested.  It does
 *		not affect pin/lock status, but you'd better have a write lock
 *		and pin on the target buffer!  Don't forget to write and release
 *		the buffer afterwards, either.
 *
 *		The main difference between this routine and a bare PageAddItem call
 *		is that this code knows that the leftmost index tuple on a non-leaf
 *		btree page doesn't need to have a key.  Therefore, it strips such
 *		tuples down to just the tuple header.  CAUTION: this works ONLY if
 *		we insert the tuples in order, so that the given itup_off does
 *		represent the final position of the tuple!
 */
static bool
_bt_pgaddtup(Page page,
			 Size itemsize,
			 IndexTuple itup,
			 OffsetNumber itup_off)
{
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	IndexTupleData trunctuple;

	if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque))
	{
		trunctuple = *itup;
		trunctuple.t_info = sizeof(IndexTupleData);
		itup = &trunctuple;
		itemsize = sizeof(IndexTupleData);
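		/*
		 * (Only the IndexTupleData header --- i.e. the downlink TID --- is
		 * kept; the key is implicitly minus infinity.)
		 */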
	}

	if (PageAddItem(page, (Item) itup, itemsize, itup_off,
					false, false) == InvalidOffsetNumber)
		return false;

	return true;
}

/*
 * _bt_isequal - used in _bt_doinsert in check for duplicates.
 *
 * This is very similar to _bt_compare, except for NULL handling.
 * Rule is simple: NOT_NULL is not equal to NULL, and NULL is not equal
 * to NULL either.
 */
static bool
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
			int keysz, ScanKey scankey)
{
	IndexTuple	itup;
	int			i;

	/* Better be comparing to a leaf item */
	Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));

	itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));

	for (i = 1; i <= keysz; i++)
	{
		AttrNumber	attno;
		Datum		datum;
		bool		isNull;
		int32		result;

		attno = scankey->sk_attno;
		Assert(attno == i);
		datum = index_getattr(itup, attno, itupdesc, &isNull);

		/* NULLs are never equal to anything */
		if (isNull || (scankey->sk_flags & SK_ISNULL))
			return false;

		result = DatumGetInt32(FunctionCall2Coll(&scankey->sk_func,
												 scankey->sk_collation,
												 datum,
												 scankey->sk_argument));
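		/*
		 * (sk_func is the btree three-way comparison support function here,
		 * so any nonzero result means the keys differ.)
		 */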

		if (result != 0)
			return false;

		scankey++;
	}

	/* if we get here, the keys are equal */
	return true;
}

/*
 * _bt_vacuum_one_page - vacuum just one index page.
 *
 * Try to remove LP_DEAD items from the given page.  The passed buffer
 * must be exclusive-locked, but unlike a real VACUUM, we don't need a
 * super-exclusive "cleanup" lock (see nbtree/README).
 */
static void
_bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
{
	OffsetNumber deletable[MaxOffsetNumber];
	int			ndeletable = 0;
	OffsetNumber offnum,
				minoff,
				maxoff;
	Page		page = BufferGetPage(buffer);
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/*
	 * Scan over all items to see which ones need to be deleted according to
	 * LP_DEAD flags.
	 */
	minoff = P_FIRSTDATAKEY(opaque);
	maxoff = PageGetMaxOffsetNumber(page);
	for (offnum = minoff;
		 offnum <= maxoff;
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemId = PageGetItemId(page, offnum);

		if (ItemIdIsDead(itemId))
			deletable[ndeletable++] = offnum;
	}

	if (ndeletable > 0)
		_bt_delitems_delete(rel, buffer, deletable, ndeletable, heapRel);
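	/* (_bt_delitems_delete marks the buffer dirty and WAL-logs the deletions) */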

	/*
	 * Note: if we didn't find any LP_DEAD items, then the page's
	 * BTP_HAS_GARBAGE hint bit is falsely set.  We do not bother expending a
	 * separate write to clear it, however.  We will clear it when we split
	 * the page.
	 */
}