heapam.c 40.9 KB
Newer Older
1 2
/*-------------------------------------------------------------------------
 *
3
 * heapam.c
4
 *	  heap access method code
5 6 7 8 9
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
10
 *	  $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.54 1999/09/18 19:05:58 tgl Exp $
11 12 13
 *
 *
 * INTERFACE ROUTINES
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
 *		heapgettup		- fetch next heap tuple from a scan
 *		heap_open		- open a heap relation by relationId
 *		heap_openr		- open a heap relation by name
 *		heap_close		- close a heap relation
 *		heap_beginscan	- begin relation scan
 *		heap_rescan		- restart a relation scan
 *		heap_endscan	- end relation scan
 *		heap_getnext	- retrieve next tuple in scan
 *		heap_fetch		- retrieve tuple with tid
 *		heap_insert		- insert tuple into a relation
 *		heap_delete		- delete a tuple from a relation
 *		heap_replace	- replace a tuple in a relation with another tuple
 *		heap_markpos	- mark scan position
 *		heap_restrpos	- restore position to marked location
 *
29
 * NOTES
30 31 32
 *	  This file contains the heap_ routines which implement
 *	  the POSTGRES heap access method used for all POSTGRES
 *	  relations.
33 34
 *
 * OLD COMMENTS
35
 *		struct relscan hints:  (struct should be made AM independent?)
36
 *
37 38 39 40
 *		rs_ctid is the tid of the last tuple returned by getnext.
 *		rs_ptid and rs_ntid are the tids of the previous and next tuples
 *		returned by getnext, respectively.	NULL indicates an end of
 *		scan (either direction); NON indicates an unknown value.
41
 *
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
 *		possible combinations:
 *		rs_p	rs_c	rs_n			interpretation
 *		NULL	NULL	NULL			empty scan
 *		NULL	NULL	NON				at beginning of scan
 *		NULL	NULL	t1				at beginning of scan (with cached tid)
 *		NON		NULL	NULL			at end of scan
 *		t1		NULL	NULL			at end of scan (with cached tid)
 *		NULL	t1		NULL			just returned only tuple
 *		NULL	t1		NON				just returned first tuple
 *		NULL	t1		t2				returned first tuple (with cached tid)
 *		NON		t1		NULL			just returned last tuple
 *		t2		t1		NULL			returned last tuple (with cached tid)
 *		t1		t2		NON				in the middle of a forward scan
 *		NON		t2		t1				in the middle of a reverse scan
 *		ti		tj		tk				in the middle of a scan (w cached tid)
57
 *
58 59
 *		Here NULL is ...tup == NULL && ...buf == InvalidBuffer,
 *		and NON is ...tup == NULL && ...buf == UnknownBuffer.
60
 *
61 62 63
 *		Currently, the NONTID values are not cached with their actual
 *		values by getnext.	Values may be cached by markpos since it stores
 *		all three tids.
64
 *
65 66
 *		NOTE:  the calls to elog() must stop.  Should decide on an interface
 *		between the general and specific AM calls.
67
 *
68 69 70 71
 *		XXX probably do not need a free tuple routine for heaps.
 *		Huh?  Free tuple is not necessary for tuples returned by scans, but
 *		is necessary for tuples which are returned by
 *		RelationGetTupleByItemPointer. -hirohama
72 73 74 75
 *
 *-------------------------------------------------------------------------
 */

76
#include "postgres.h"
77

78 79
#include "access/heapam.h"
#include "access/hio.h"
B
Bruce Momjian 已提交
80
#include "access/valid.h"
81
#include "catalog/catalog.h"
B
Bruce Momjian 已提交
82 83
#include "miscadmin.h"
#include "storage/smgr.h"
84
#include "utils/builtins.h"
B
Bruce Momjian 已提交
85 86
#include "utils/inval.h"
#include "utils/relcache.h"
87

M
-Wall'd  
Marc G. Fournier 已提交
88

89
/* ----------------------------------------------------------------
90
 *						 heap support routines
91 92 93 94
 * ----------------------------------------------------------------
 */

/* ----------------
95
 *		initscan - scan code common to heap_beginscan and heap_rescan
96 97 98
 * ----------------
 */
static void
99
initscan(HeapScanDesc scan,
100 101 102 103
		 Relation relation,
		 int atend,
		 unsigned nkeys,
		 ScanKey key)
104
{
105 106 107 108 109 110
	if (!RelationGetNumberOfBlocks(relation))
	{
		/* ----------------
		 *	relation is empty
		 * ----------------
		 */
B
Bruce Momjian 已提交
111
		scan->rs_ntup.t_data = scan->rs_ctup.t_data =
112
		scan->rs_ptup.t_data = NULL;
113
		scan->rs_nbuf = scan->rs_cbuf = scan->rs_pbuf = InvalidBuffer;
114 115 116 117 118 119 120
	}
	else if (atend)
	{
		/* ----------------
		 *	reverse scan
		 * ----------------
		 */
121
		scan->rs_ntup.t_data = scan->rs_ctup.t_data = NULL;
122
		scan->rs_nbuf = scan->rs_cbuf = InvalidBuffer;
123
		scan->rs_ptup.t_data = NULL;
124
		scan->rs_pbuf = UnknownBuffer;
125 126 127 128 129 130 131
	}
	else
	{
		/* ----------------
		 *	forward scan
		 * ----------------
		 */
132
		scan->rs_ctup.t_data = scan->rs_ptup.t_data = NULL;
133
		scan->rs_cbuf = scan->rs_pbuf = InvalidBuffer;
134
		scan->rs_ntup.t_data = NULL;
135
		scan->rs_nbuf = UnknownBuffer;
136 137 138
	}							/* invalid too */

	/* we don't have a marked position... */
139 140 141 142
	ItemPointerSetInvalid(&(scan->rs_mptid));
	ItemPointerSetInvalid(&(scan->rs_mctid));
	ItemPointerSetInvalid(&(scan->rs_mntid));
	ItemPointerSetInvalid(&(scan->rs_mcd));
143

144
	/* ----------------
145
	 *	copy the scan key, if appropriate
146 147
	 * ----------------
	 */
148
	if (key != NULL)
149
		memmove(scan->rs_key, key, nkeys * sizeof(ScanKeyData));
150 151 152
}

/* ----------------
153
 *		unpinscan - code common to heap_rescan and heap_endscan
154 155 156
 * ----------------
 */
static void
157
unpinscan(HeapScanDesc scan)
158
{
159 160
	if (BufferIsValid(scan->rs_pbuf))
		ReleaseBuffer(scan->rs_pbuf);
161 162 163 164 165 166 167

	/* ------------------------------------
	 *	Scan will pin buffer one for each non-NULL tuple pointer
	 *	(ptup, ctup, ntup), so they have to be unpinned multiple
	 *	times.
	 * ------------------------------------
	 */
168 169
	if (BufferIsValid(scan->rs_cbuf))
		ReleaseBuffer(scan->rs_cbuf);
170

171 172
	if (BufferIsValid(scan->rs_nbuf))
		ReleaseBuffer(scan->rs_nbuf);
173 174 175
}

/* ------------------------------------------
 *		nextpage
 *
 *		figure out the next page to scan after the current page,
 *		taking into account possible adjustment of degrees of
 *		parallelism
 * ------------------------------------------
 */
static int
nextpage(int page, int dir)
{
	if (dir < 0)
		return page - 1;
	return page + 1;
}

/* ----------------
 *		heapgettup - fetch next heap tuple
 *
 *		routine used by heap_getnext() which does most of the
 *		real work in scanning tuples.
 *
 *		dir == 0 means "no movement": refetch the tuple addressed by
 *		tuple->t_self.  dir < 0 scans backward; dir > 0 scans forward.
 *		On success, tuple->t_data/t_len/t_self are filled in and *buffer
 *		references the containing page (not released here); at end of
 *		scan, tuple->t_data is set to NULL and *buffer to InvalidBuffer.
 *
 *		The scan routines handle their own buffer lock/unlocking, so
 *		there is no reason to request the buffer number unless you want
 *		to perform some other operation with the result, like pass it
 *		to another function.
 * ----------------
 */
static void
heapgettup(Relation relation,
		   HeapTuple tuple,
		   int dir,
		   Buffer *buffer,
		   Snapshot snapshot,
		   int nkeys,
		   ScanKey key)
{
	ItemId		lpp;
	Page		dp;
	int			page;
	int			pages;
	int			lines;
	OffsetNumber lineoff;
	int			linesleft;
	/* a NULL t_data means the caller has no current position */
	ItemPointer tid = (tuple->t_data == NULL) ?
	(ItemPointer) NULL : &(tuple->t_self);

	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_heapgettup);
	IncrHeapAccessStat(global_heapgettup);

	/* ----------------
	 *	debugging stuff
	 *
	 * check validity of arguments, here and for other functions too
	 * Note: no locking manipulations needed--this is a local function
	 * ----------------
	 */
#ifdef	HEAPDEBUGALL
	if (ItemPointerIsValid(tid))
	{
		elog(DEBUG, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)",
			 RelationGetRelationName(relation), tid, tid->ip_blkid,
			 tid->ip_posid, dir);
	}
	else
	{
		elog(DEBUG, "heapgettup(%s, tid=0x%x, dir=%d, ...)",
			 RelationGetRelationName(relation), tid, dir);
	}
	elog(DEBUG, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key);

	elog(DEBUG, "heapgettup: relation(%c)=`%s', %p",
		 relation->rd_rel->relkind, &relation->rd_rel->relname,
		 snapshot);
#endif	 /* !defined(HEAPDEBUGALL) */

	if (!ItemPointerIsValid(tid))
		Assert(!PointerIsValid(tid));

	/* ----------------
	 *	return null immediately if relation is empty
	 * ----------------
	 */
	if (!(pages = relation->rd_nblocks))
	{
		tuple->t_data = NULL;
		return;
	}

	/* ----------------
	 *	calculate next starting lineoff, given scan direction
	 * ----------------
	 */
	if (!dir)
	{
		/* ----------------
		 * ``no movement'' scan direction: refetch same tid
		 * ----------------
		 */
		/* assume it is a valid TID XXX */
		if (ItemPointerIsValid(tid) == false)
		{
			*buffer = InvalidBuffer;
			tuple->t_data = NULL;
			return;
		}
		*buffer = RelationGetBufferWithBuffer(relation,
										  ItemPointerGetBlockNumber(tid),
											  *buffer);

		if (!BufferIsValid(*buffer))
			elog(ERROR, "heapgettup: failed ReadBuffer");

		LockBuffer(*buffer, BUFFER_LOCK_SHARE);

		dp = (Page) BufferGetPage(*buffer);
		lineoff = ItemPointerGetOffsetNumber(tid);
		lpp = PageGetItemId(dp, lineoff);

		/* note: no snapshot check in the refetch case */
		tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
		tuple->t_len = ItemIdGetLength(lpp);
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		return;

	}
	else if (dir < 0)
	{
		/* ----------------
		 *	reverse scan direction
		 * ----------------
		 */
		if (ItemPointerIsValid(tid) == false)
			tid = NULL;
		if (tid == NULL)
		{
			page = pages - 1;	/* final page */
		}
		else
		{
			page = ItemPointerGetBlockNumber(tid);		/* current page */
		}
		if (page < 0)
		{
			*buffer = InvalidBuffer;
			tuple->t_data = NULL;
			return;
		}

		*buffer = RelationGetBufferWithBuffer(relation, page, *buffer);
		if (!BufferIsValid(*buffer))
			elog(ERROR, "heapgettup: failed ReadBuffer");

		LockBuffer(*buffer, BUFFER_LOCK_SHARE);

		dp = (Page) BufferGetPage(*buffer);
		lines = PageGetMaxOffsetNumber(dp);
		if (tid == NULL)
		{
			lineoff = lines;	/* final offnum */
		}
		else
		{
			lineoff =			/* previous offnum */
				OffsetNumberPrev(ItemPointerGetOffsetNumber(tid));
		}
		/* page and lineoff now reference the physically previous tid */

	}
	else
	{
		/* ----------------
		 *	forward scan direction
		 * ----------------
		 */
		if (ItemPointerIsValid(tid) == false)
		{
			page = 0;			/* first page */
			lineoff = FirstOffsetNumber;		/* first offnum */
		}
		else
		{
			page = ItemPointerGetBlockNumber(tid);		/* current page */
			lineoff =			/* next offnum */
				OffsetNumberNext(ItemPointerGetOffsetNumber(tid));
		}

		if (page >= pages)
		{
			*buffer = InvalidBuffer;
			tuple->t_data = NULL;
			return;
		}
		/* page and lineoff now reference the physically next tid */

		*buffer = RelationGetBufferWithBuffer(relation, page, *buffer);
		if (!BufferIsValid(*buffer))
			elog(ERROR, "heapgettup: failed ReadBuffer");

		LockBuffer(*buffer, BUFFER_LOCK_SHARE);

		dp = (Page) BufferGetPage(*buffer);
		lines = PageGetMaxOffsetNumber(dp);
	}

	/* 'dir' is now non-zero */

	/* ----------------
	 *	calculate line pointer and number of remaining items
	 *	to check on this page.
	 * ----------------
	 */
	lpp = PageGetItemId(dp, lineoff);
	if (dir < 0)
		linesleft = lineoff - 1;
	else
		linesleft = lines - lineoff;

	/* ----------------
	 *	advance the scan until we find a qualifying tuple or
	 *	run out of stuff to scan
	 * ----------------
	 */
	for (;;)
	{
		while (linesleft >= 0)
		{
			if (ItemIdIsUsed(lpp))
			{
				tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
				tuple->t_len = ItemIdGetLength(lpp);
				ItemPointerSet(&(tuple->t_self), page, lineoff);
				/* ----------------
				 *	if current tuple qualifies, return it.
				 *	(HeapTupleSatisfies resets t_data to NULL when the
				 *	tuple fails the snapshot/key checks.)
				 * ----------------
				 */
				HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp,
								   snapshot, nkeys, key);
				if (tuple->t_data != NULL)
				{
					LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
					return;
				}
			}

			/* ----------------
			 *	otherwise move to the next item on the page
			 * ----------------
			 */
			--linesleft;
			if (dir < 0)
			{
				--lpp;			/* move back in this page's ItemId array */
				--lineoff;
			}
			else
			{
				++lpp;			/* move forward in this page's ItemId
								 * array */
				++lineoff;
			}
		}

		/* ----------------
		 *	if we get here, it means we've exhausted the items on
		 *	this page and it's time to move to the next..
		 * ----------------
		 */
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		page = nextpage(page, dir);

		/* ----------------
		 *	return NULL if we've exhausted all the pages..
		 * ----------------
		 */
		if (page < 0 || page >= pages)
		{
			if (BufferIsValid(*buffer))
				ReleaseBuffer(*buffer);
			*buffer = InvalidBuffer;
			tuple->t_data = NULL;
			return;
		}

		*buffer = ReleaseAndReadBuffer(*buffer, relation, page);

		if (!BufferIsValid(*buffer))
			elog(ERROR, "heapgettup: failed ReadBuffer");
		LockBuffer(*buffer, BUFFER_LOCK_SHARE);
		dp = (Page) BufferGetPage(*buffer);
		lines = PageGetMaxOffsetNumber((Page) dp);
		linesleft = lines - 1;
		if (dir < 0)
		{
			lineoff = lines;
			lpp = PageGetItemId(dp, lines);
		}
		else
		{
			lineoff = FirstOffsetNumber;
			lpp = PageGetItemId(dp, FirstOffsetNumber);
		}
	}
}


/* ----------------------------------------------------------------
483
 *					 heap access method interface
484 485 486
 * ----------------------------------------------------------------
 */
/* ----------------
487
 *		heap_open - open a heap relation by relationId
488
 *
489 490 491 492 493
 *		If lockmode is "NoLock", no lock is obtained on the relation,
 *		and the caller must check for a NULL return value indicating
 *		that no such relation exists.
 *		Otherwise, an error is raised if the relation does not exist,
 *		and the specified kind of lock is obtained on the relation.
494 495 496
 * ----------------
 */
Relation
497
heap_open(Oid relationId, LOCKMODE lockmode)
498
{
499
	Relation	r;
500

501 502
	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

503 504 505 506 507 508 509
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_open);
	IncrHeapAccessStat(global_open);

510 511
	/* The relcache does all the real work... */
	r = RelationIdGetRelation(relationId);
512

513
	/* Under no circumstances will we return an index as a relation. */
514
	if (RelationIsValid(r) && r->rd_rel->relkind == RELKIND_INDEX)
515
		elog(ERROR, "%s is an index relation", r->rd_rel->relname.data);
516

517 518 519 520 521 522 523 524
	if (lockmode == NoLock)
		return r;				/* caller must check RelationIsValid! */

	if (! RelationIsValid(r))
		elog(ERROR, "Relation %u does not exist", relationId);

	LockRelation(r, lockmode);

525
	return r;
526 527 528
}

/* ----------------
529
 *		heap_openr - open a heap relation by name
530
 *
531 532 533 534 535
 *		If lockmode is "NoLock", no lock is obtained on the relation,
 *		and the caller must check for a NULL return value indicating
 *		that no such relation exists.
 *		Otherwise, an error is raised if the relation does not exist,
 *		and the specified kind of lock is obtained on the relation.
536 537 538
 * ----------------
 */
Relation
539
heap_openr(char *relationName, LOCKMODE lockmode)
540
{
541
	Relation	r;
542

543 544
	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

545 546 547 548 549 550 551
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_openr);
	IncrHeapAccessStat(global_openr);

552
	/* The relcache does all the real work... */
553 554
	r = RelationNameGetRelation(relationName);

555
	/* Under no circumstances will we return an index as a relation. */
556
	if (RelationIsValid(r) && r->rd_rel->relkind == RELKIND_INDEX)
557
		elog(ERROR, "%s is an index relation", r->rd_rel->relname.data);
558

559 560 561 562 563 564 565 566
	if (lockmode == NoLock)
		return r;				/* caller must check RelationIsValid! */

	if (! RelationIsValid(r))
		elog(ERROR, "Relation '%s' does not exist", relationName);

	LockRelation(r, lockmode);

567
	return r;
568 569 570
}

/* ----------------
571
 *		heap_close - close a heap relation
572
 *
573 574 575
 *		If lockmode is not "NoLock", we first release the specified lock.
 *		Note that it is often sensible to hold a lock beyond heap_close;
 *		in that case, the lock is released automatically at xact end.
576 577 578
 * ----------------
 */
void
579
heap_close(Relation relation, LOCKMODE lockmode)
580
{
581 582
	Assert(lockmode >= NoLock && lockmode < MAX_LOCKMODES);

583 584 585 586 587 588 589
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_close);
	IncrHeapAccessStat(global_close);

590 591 592 593
	if (lockmode != NoLock)
		UnlockRelation(relation, lockmode);

	/* The relcache does the real work... */
594
	RelationClose(relation);
595 596 597 598
}


/* ----------------
599
 *		heap_beginscan	- begin relation scan
600 601 602 603
 * ----------------
 */
HeapScanDesc
heap_beginscan(Relation relation,
604
			   int atend,
605
			   Snapshot snapshot,
606 607
			   unsigned nkeys,
			   ScanKey key)
608
{
609
	HeapScanDesc scan;
610 611 612 613 614 615 616 617 618 619 620

	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_beginscan);
	IncrHeapAccessStat(global_beginscan);

	/* ----------------
	 *	sanity checks
	 * ----------------
621
	 */
622
	if (! RelationIsValid(relation))
623
		elog(ERROR, "heap_beginscan: !RelationIsValid(relation)");
624 625 626 627 628 629 630

	/* ----------------
	 *	increment relation ref count while scanning relation
	 * ----------------
	 */
	RelationIncrementReferenceCount(relation);

631 632 633 634 635 636 637 638 639 640 641 642 643 644
	/* ----------------
	 *	Acquire AccessShareLock for the duration of the scan
	 *
	 *	Note: we could get an SI inval message here and consequently have
	 *	to rebuild the relcache entry.  The refcount increment above
	 *	ensures that we will rebuild it and not just flush it...
	 * ----------------
	 */
	LockRelation(relation, AccessShareLock);

	/* XXX someday assert SelfTimeQual if relkind == RELKIND_UNCATALOGED */
	if (relation->rd_rel->relkind == RELKIND_UNCATALOGED)
		snapshot = SnapshotSelf;

645 646 647 648
	/* ----------------
	 *	allocate and initialize scan descriptor
	 * ----------------
	 */
649
	scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));
650

B
Bruce Momjian 已提交
651
	relation->rd_nblocks = smgrnblocks(DEFAULT_SMGR, relation);
652
	scan->rs_rd = relation;
653 654

	if (nkeys)
655

656
		/*
657 658
		 * we do this here instead of in initscan() because heap_rescan
		 * also calls initscan() and we don't want to allocate memory
659 660
		 * again
		 */
661
		scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
662
	else
663
		scan->rs_key = NULL;
664

665
	initscan(scan, relation, atend, nkeys, key);
666

667 668 669
	scan->rs_atend = atend;
	scan->rs_snapshot = snapshot;
	scan->rs_nkeys = (short) nkeys;
670

671
	return scan;
672 673 674
}

/* ----------------
675
 *		heap_rescan		- restart a relation scan
676 677 678
 * ----------------
 */
void
679
heap_rescan(HeapScanDesc scan,
680 681
			bool scanFromEnd,
			ScanKey key)
682
{
683 684 685 686 687 688 689 690 691 692 693 694 695
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_rescan);
	IncrHeapAccessStat(global_rescan);

	/* Note: set relation level read lock is still set */

	/* ----------------
	 *	unpin scan buffers
	 * ----------------
	 */
696
	unpinscan(scan);
697 698 699 700 701

	/* ----------------
	 *	reinitialize scan descriptor
	 * ----------------
	 */
702 703
	initscan(scan, scan->rs_rd, scanFromEnd, scan->rs_nkeys, key);
	scan->rs_atend = (bool) scanFromEnd;
704 705 706
}

/* ----------------
707
 *		heap_endscan	- end relation scan
708
 *
709 710
 *		See how to integrate with index scans.
 *		Check handling if reldesc caching.
711 712 713
 * ----------------
 */
void
714
heap_endscan(HeapScanDesc scan)
715
{
716 717 718 719 720 721 722 723 724 725 726 727 728
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_endscan);
	IncrHeapAccessStat(global_endscan);

	/* Note: no locking manipulations needed */

	/* ----------------
	 *	unpin scan buffers
	 * ----------------
	 */
729
	unpinscan(scan);
730

731 732 733 734 735 736
	/* ----------------
	 *	Release AccessShareLock acquired by heap_beginscan()
	 * ----------------
	 */
	UnlockRelation(scan->rs_rd, AccessShareLock);

737 738 739 740
	/* ----------------
	 *	decrement relation reference count and free scan descriptor storage
	 * ----------------
	 */
741
	RelationDecrementReferenceCount(scan->rs_rd);
742

743
	pfree(scan);
744 745 746
}

/* ----------------
 *		heap_getnext	- retrieve next tuple in scan
 *
 *		Fix to work with index relations.
 *		We don't return the buffer anymore, but you can get it from the
 *		returned HeapTuple.
 *
 *		Advances the scan one tuple in the direction given by 'backw'
 *		(nonzero = backward), rotating the prev/current/next tuple and
 *		buffer slots and calling heapgettup() when the target tuple is
 *		not already cached.  Returns NULL at end of scan.
 * ----------------
 */

#ifdef HEAPDEBUGALL
#define HEAPDEBUG_1 \
elog(DEBUG, "heap_getnext([%s,nkeys=%d],backw=%d) called", \
	 scan->rs_rd->rd_rel->relname.data, scan->rs_nkeys, backw)

#define HEAPDEBUG_2 \
	 elog(DEBUG, "heap_getnext called with backw (no tracing yet)")

#define HEAPDEBUG_3 \
	 elog(DEBUG, "heap_getnext returns NULL at end")

#define HEAPDEBUG_4 \
	 elog(DEBUG, "heap_getnext valid buffer UNPIN'd")

#define HEAPDEBUG_5 \
	 elog(DEBUG, "heap_getnext next tuple was cached")

#define HEAPDEBUG_6 \
	 elog(DEBUG, "heap_getnext returning EOS")

#define HEAPDEBUG_7 \
	 elog(DEBUG, "heap_getnext returning tuple");
#else
#define HEAPDEBUG_1
#define HEAPDEBUG_2
#define HEAPDEBUG_3
#define HEAPDEBUG_4
#define HEAPDEBUG_5
#define HEAPDEBUG_6
#define HEAPDEBUG_7
#endif	 /* !defined(HEAPDEBUGALL) */


HeapTuple
heap_getnext(HeapScanDesc scandesc, int backw)
{
	HeapScanDesc scan = scandesc;

	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_getnext);
	IncrHeapAccessStat(global_getnext);

	/* Note: no locking manipulations needed */

	/* ----------------
	 *	argument checks
	 * ----------------
	 */
	if (scan == NULL)
		elog(ERROR, "heap_getnext: NULL relscan");

	/* ----------------
	 *	initialize return buffer to InvalidBuffer
	 * ----------------
	 */

	HEAPDEBUG_1;				/* heap_getnext( info ) */

	if (backw)
	{
		/* ----------------
		 *	handle reverse scan
		 * ----------------
		 */
		HEAPDEBUG_2;			/* heap_getnext called with backw */

		/* at start of scan (prev == current and pbuf invalid): done */
		if (scan->rs_ptup.t_data == scan->rs_ctup.t_data &&
			BufferIsInvalid(scan->rs_pbuf))
		{
			if (BufferIsValid(scan->rs_nbuf))
				ReleaseBuffer(scan->rs_nbuf);
			return NULL;
		}

		/*
		 * Copy the "current" tuple/buffer to "next". Pin/unpin the
		 * buffers accordingly
		 */
		if (scan->rs_nbuf != scan->rs_cbuf)
		{
			if (BufferIsValid(scan->rs_nbuf))
				ReleaseBuffer(scan->rs_nbuf);
			if (BufferIsValid(scan->rs_cbuf))
				IncrBufferRefCount(scan->rs_cbuf);
		}
		scan->rs_ntup = scan->rs_ctup;
		scan->rs_nbuf = scan->rs_cbuf;

		if (scan->rs_ptup.t_data != NULL)
		{
			/* the previous tuple is already cached: promote it */
			if (scan->rs_cbuf != scan->rs_pbuf)
			{
				if (BufferIsValid(scan->rs_cbuf))
					ReleaseBuffer(scan->rs_cbuf);
				if (BufferIsValid(scan->rs_pbuf))
					IncrBufferRefCount(scan->rs_pbuf);
			}
			scan->rs_ctup = scan->rs_ptup;
			scan->rs_cbuf = scan->rs_pbuf;
		}
		else
		{						/* NONTUP */

			/*
			 * Don't release scan->rs_cbuf at this point, because
			 * heapgettup doesn't increase PrivateRefCount if it is
			 * already set. On a backward scan, both rs_ctup and rs_ntup
			 * usually point to the same buffer page, so
			 * PrivateRefCount[rs_cbuf] should be 2 (or more, if for
			 * instance ctup is stored in a TupleTableSlot).  - 01/09/94
			 */

			heapgettup(scan->rs_rd,
					   &(scan->rs_ctup),
					   -1,
					   &(scan->rs_cbuf),
					   scan->rs_snapshot,
					   scan->rs_nkeys,
					   scan->rs_key);
		}

		/* heapgettup found nothing: clear all slots and report EOS */
		if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
		{
			if (BufferIsValid(scan->rs_pbuf))
				ReleaseBuffer(scan->rs_pbuf);
			scan->rs_ptup.t_data = NULL;
			scan->rs_pbuf = InvalidBuffer;
			if (BufferIsValid(scan->rs_nbuf))
				ReleaseBuffer(scan->rs_nbuf);
			scan->rs_ntup.t_data = NULL;
			scan->rs_nbuf = InvalidBuffer;
			return NULL;
		}

		/* the new "previous" tuple is not known yet */
		if (BufferIsValid(scan->rs_pbuf))
			ReleaseBuffer(scan->rs_pbuf);
		scan->rs_ptup.t_data = NULL;
		scan->rs_pbuf = UnknownBuffer;

	}
	else
	{
		/* ----------------
		 *	handle forward scan
		 * ----------------
		 */
		/* at end of scan (current == next and nbuf invalid): done */
		if (scan->rs_ctup.t_data == scan->rs_ntup.t_data &&
			BufferIsInvalid(scan->rs_nbuf))
		{
			if (BufferIsValid(scan->rs_pbuf))
				ReleaseBuffer(scan->rs_pbuf);
			HEAPDEBUG_3;		/* heap_getnext returns NULL at end */
			return NULL;
		}

		/*
		 * Copy the "current" tuple/buffer to "previous". Pin/unpin the
		 * buffers accordingly
		 */
		if (scan->rs_pbuf != scan->rs_cbuf)
		{
			if (BufferIsValid(scan->rs_pbuf))
				ReleaseBuffer(scan->rs_pbuf);
			if (BufferIsValid(scan->rs_cbuf))
				IncrBufferRefCount(scan->rs_cbuf);
		}
		scan->rs_ptup = scan->rs_ctup;
		scan->rs_pbuf = scan->rs_cbuf;

		if (scan->rs_ntup.t_data != NULL)
		{
			/* the next tuple is already cached: promote it */
			if (scan->rs_cbuf != scan->rs_nbuf)
			{
				if (BufferIsValid(scan->rs_cbuf))
					ReleaseBuffer(scan->rs_cbuf);
				if (BufferIsValid(scan->rs_nbuf))
					IncrBufferRefCount(scan->rs_nbuf);
			}
			scan->rs_ctup = scan->rs_ntup;
			scan->rs_cbuf = scan->rs_nbuf;
			HEAPDEBUG_5;		/* heap_getnext next tuple was cached */
		}
		else
		{						/* NONTUP */

			/*
			 * Don't release scan->rs_cbuf at this point, because
			 * heapgettup doesn't increase PrivateRefCount if it is
			 * already set. On a forward scan, both rs_ctup and rs_ptup
			 * usually point to the same buffer page, so
			 * PrivateRefCount[rs_cbuf] should be 2 (or more, if for
			 * instance ctup is stored in a TupleTableSlot).  - 01/09/93
			 */

			heapgettup(scan->rs_rd,
					   &(scan->rs_ctup),
					   1,
					   &scan->rs_cbuf,
					   scan->rs_snapshot,
					   scan->rs_nkeys,
					   scan->rs_key);
		}

		/* heapgettup found nothing: clear all slots and report EOS */
		if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf))
		{
			if (BufferIsValid(scan->rs_nbuf))
				ReleaseBuffer(scan->rs_nbuf);
			scan->rs_ntup.t_data = NULL;
			scan->rs_nbuf = InvalidBuffer;
			if (BufferIsValid(scan->rs_pbuf))
				ReleaseBuffer(scan->rs_pbuf);
			scan->rs_ptup.t_data = NULL;
			scan->rs_pbuf = InvalidBuffer;
			HEAPDEBUG_6;		/* heap_getnext returning EOS */
			return NULL;
		}

		/* the new "next" tuple is not known yet */
		if (BufferIsValid(scan->rs_nbuf))
			ReleaseBuffer(scan->rs_nbuf);
		scan->rs_ntup.t_data = NULL;
		scan->rs_nbuf = UnknownBuffer;
	}

	/* ----------------
	 *	if we get here it means we have a new current scan tuple, so
	 *	point to the proper return buffer and return the tuple.
	 * ----------------
	 */

	HEAPDEBUG_7;				/* heap_getnext returning tuple */

	return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup));
}

/* ----------------
 *		heap_fetch		- retrieve tuple with tid
 *
 *		Currently ignores LP_IVALID during processing!
 *
 *		On success, tuple->t_data/t_len are filled in and *userbuf is set
 *		to the pinned buffer holding the tuple.  If the tuple fails the
 *		snapshot check, tuple->t_data is set to NULL and the buffer is
 *		released here.
 *
 *		Because this is not part of a scan, there is no way to
 *		automatically lock/unlock the shared buffers.
 *		For this reason, we require that the user retrieve the buffer
 *		value, and they are required to BufferRelease() it when they
 *		are done.  If they want to make a copy of it before releasing it,
 *		they can call heap_copytuple().
 * ----------------
 */
void
heap_fetch(Relation relation,
		   Snapshot snapshot,
		   HeapTuple tuple,
		   Buffer *userbuf)
{
	ItemId		lp;
	Buffer		buffer;
	PageHeader	dp;
	ItemPointer tid = &(tuple->t_self);
	OffsetNumber offnum;

	AssertMacro(PointerIsValid(userbuf));		/* see comments above */

	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_fetch);
	IncrHeapAccessStat(global_fetch);

	/* ----------------
	 *	get the buffer from the relation descriptor
	 *	Note that this does a buffer pin.
	 * ----------------
	 */

	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));

	if (!BufferIsValid(buffer))
		elog(ERROR, "heap_fetch: %s relation: ReadBuffer(%lx) failed",
			 &relation->rd_rel->relname, (long) tid);

	LockBuffer(buffer, BUFFER_LOCK_SHARE);

	/* ----------------
	 *	get the item line pointer corresponding to the requested tid
	 * ----------------
	 */
	dp = (PageHeader) BufferGetPage(buffer);
	offnum = ItemPointerGetOffsetNumber(tid);
	lp = PageGetItemId(dp, offnum);

	/* ----------------
	 *	more sanity checks
	 * ----------------
	 */

	Assert(ItemIdIsUsed(lp));

	tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tuple->t_len = ItemIdGetLength(lp);

	/* ----------------
	 *	check time qualification of tid
	 *	(HeapTupleSatisfies resets t_data to NULL if the check fails)
	 * ----------------
	 */

	HeapTupleSatisfies(tuple, relation, buffer, dp,
					   snapshot, 0, (ScanKey) NULL);

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

	if (tuple->t_data == NULL)
	{
		/* tuple not visible under snapshot: drop the pin and bail */
		ReleaseBuffer(buffer);
		return;
	}

	/* ----------------
	 *	all checks passed, now either return a copy of the tuple
	 *	or pin the buffer page and return a pointer, depending on
	 *	whether caller gave us a valid buf.
	 * ----------------
	 */

	*userbuf = buffer;			/* user is required to ReleaseBuffer()
								 * this */

	return;
}

/* ----------------
 *		heap_insert		- insert tuple
 *
 *		Stamps the tuple header with the inserting transaction/command,
 *		assigns an OID if the caller did not, and appends the tuple to
 *		the relation.  Returns the tuple's OID.
 *
 *		The assignment of t_min (and thus the others) should be
 *		removed eventually.
 *
 *		Currently places the tuple onto the last page.	If there is no room,
 *		it is placed on new pages.	(Heap relations)
 *		Note that concurrent inserts during a scan will probably have
 *		unexpected results, though this will be fixed eventually.
 *
 *		Fix to work with indexes.
 * ----------------
 */
Oid
heap_insert(Relation relation, HeapTuple tup)
{
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_insert);
	IncrHeapAccessStat(global_insert);

	/* ----------------
	 *	If the object id of this tuple has already been assigned, trust
	 *	the caller.  There are a couple of ways this can happen.  At initial
	 *	db creation, the backend program sets oids for tuples.	When we
	 *	define an index, we set the oid.  Finally, in the future, we may
	 *	allow users to set their own object ids in order to support a
	 *	persistent object store (objects need to contain pointers to one
	 *	another).
	 * ----------------
	 */
	if (!OidIsValid(tup->t_data->t_oid))
	{
		tup->t_data->t_oid = newoid();
		LastOidProcessed = tup->t_data->t_oid;
	}
	else
		CheckMaxObjectId(tup->t_data->t_oid);

	/*
	 * Stamp the tuple as created by the current transaction/command:
	 * valid xmin/cmin, invalid xmax, and a clean transaction infomask.
	 */
	TransactionIdStore(GetCurrentTransactionId(), &(tup->t_data->t_xmin));
	tup->t_data->t_cmin = GetCurrentCommandId();
	StoreInvalidTransactionId(&(tup->t_data->t_xmax));
	tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
	tup->t_data->t_infomask |= HEAP_XMAX_INVALID;

	RelationPutHeapTupleAtEnd(relation, tup);

	/* keep catalog caches coherent when a system relation changes */
	if (IsSystemRelationName(RelationGetRelationName(relation)->data))
		RelationInvalidateHeapTuple(relation, tup);

	return tup->t_data->t_oid;
}

/*
 *	heap_delete		- delete a tuple
 *
 *	Returns HeapTupleMayBeUpdated on success.  On failure returns the
 *	HeapTupleSatisfiesUpdate result code (HeapTupleSelfUpdated or
 *	HeapTupleUpdated) and, if ctid is non-NULL, stores the blocking
 *	tuple's forwarding t_ctid into *ctid so the caller can chase the
 *	update chain.
 */
int
heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid)
{
	ItemId		lp;
	HeapTupleData tp;
	PageHeader	dp;
	Buffer		buffer;
	int			result;

	/* increment access statistics */
	IncrHeapAccessStat(local_delete);
	IncrHeapAccessStat(global_delete);

	Assert(ItemPointerIsValid(tid));

	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));

	if (!BufferIsValid(buffer))
		elog(ERROR, "heap_delete: failed ReadBuffer");

	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	dp = (PageHeader) BufferGetPage(buffer);
	lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
	tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tp.t_len = ItemIdGetLength(lp);
	tp.t_self = *tid;

l1:
	result = HeapTupleSatisfiesUpdate(&tp);

	if (result == HeapTupleInvisible)
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		elog(ERROR, "heap_delete: (am)invalid tid");
	}
	else if (result == HeapTupleBeingUpdated)
	{
		TransactionId xwait = tp.t_data->t_xmax;

		/* sleep until concurrent transaction ends */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		XactLockTableWait(xwait);

		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		if (TransactionIdDidAbort(xwait))
			goto l1;
		/*
		 * xwait is committed but if xwait had just marked
		 * the tuple for update then some other xaction could
		 * update this tuple before we got to this point.
		 */
		if (tp.t_data->t_xmax != xwait)
			goto l1;
		if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
		{
			/* set the commit hint bit so later visitors skip the xact lookup */
			tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
			SetBufferCommitInfoNeedsSave(buffer);
		}
		/* if tuple was marked for update but not updated... */
		if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
			result = HeapTupleMayBeUpdated;
		else
			result = HeapTupleUpdated;
	}
	if (result != HeapTupleMayBeUpdated)
	{
		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
		if (ctid != NULL)
			*ctid = tp.t_data->t_ctid;
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return result;
	}

	/* store transaction information of xact deleting the tuple */
	TransactionIdStore(GetCurrentTransactionId(), &(tp.t_data->t_xmax));
	tp.t_data->t_cmax = GetCurrentCommandId();
	tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
							 HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

	/* invalidate caches */
	RelationInvalidateHeapTuple(relation, &tp);

	WriteBuffer(buffer);

	return HeapTupleMayBeUpdated;
}

/*
 *	heap_replace	- replace a tuple
 *
 *	Replaces the tuple at otid with newtup.  Returns HeapTupleMayBeUpdated
 *	on success.  On failure returns the HeapTupleSatisfiesUpdate result
 *	code and, if ctid is non-NULL, stores the blocking tuple's forwarding
 *	t_ctid into *ctid.
 */
int
heap_replace(Relation relation, ItemPointer otid, HeapTuple newtup,
			 ItemPointer ctid)
{
	ItemId		lp;
	HeapTupleData oldtup;
	PageHeader	dp;
	Buffer		buffer;
	int			result;

	/* increment access statistics */
	IncrHeapAccessStat(local_replace);
	IncrHeapAccessStat(global_replace);

	Assert(ItemPointerIsValid(otid));

	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
	if (!BufferIsValid(buffer))
		elog(ERROR, "amreplace: failed ReadBuffer");
	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	dp = (PageHeader) BufferGetPage(buffer);
	lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid));

	oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
	oldtup.t_len = ItemIdGetLength(lp);
	oldtup.t_self = *otid;

l2:
	result = HeapTupleSatisfiesUpdate(&oldtup);

	if (result == HeapTupleInvisible)
	{
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		elog(ERROR, "heap_replace: (am)invalid tid");
	}
	else if (result == HeapTupleBeingUpdated)
	{
		TransactionId xwait = oldtup.t_data->t_xmax;

		/* sleep until concurrent transaction ends */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		XactLockTableWait(xwait);

		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
		if (TransactionIdDidAbort(xwait))
			goto l2;
		/*
		 * xwait is committed but if xwait had just marked
		 * the tuple for update then some other xaction could
		 * update this tuple before we got to this point.
		 */
		if (oldtup.t_data->t_xmax != xwait)
			goto l2;
		if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
		{
			/* set the commit hint bit so later visitors skip the xact lookup */
			oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
			SetBufferCommitInfoNeedsSave(buffer);
		}
		/* if tuple was marked for update but not updated... */
		if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
			result = HeapTupleMayBeUpdated;
		else
			result = HeapTupleUpdated;
	}
	if (result != HeapTupleMayBeUpdated)
	{
		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
		if (ctid != NULL)
			*ctid = oldtup.t_data->t_ctid;
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(buffer);
		return result;
	}

	/* XXX order problems if not atomic assignment ??? */
	/* stamp the new tuple version as created by this transaction/command */
	newtup->t_data->t_oid = oldtup.t_data->t_oid;
	TransactionIdStore(GetCurrentTransactionId(), &(newtup->t_data->t_xmin));
	newtup->t_data->t_cmin = GetCurrentCommandId();
	StoreInvalidTransactionId(&(newtup->t_data->t_xmax));
	newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
	newtup->t_data->t_infomask |= (HEAP_XMAX_INVALID | HEAP_UPDATED);

	/* logically delete old item */
	TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax));
	oldtup.t_data->t_cmax = GetCurrentCommandId();
	oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
							 HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);

	/* insert new item */
	if ((unsigned) MAXALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp))
		RelationPutHeapTuple(relation, buffer, newtup);
	else
	{

		/*
		 * New item won't fit on same page as old item, have to look for a
		 * new place to put it. Note that we have to unlock current buffer
		 * context - not good but RelationPutHeapTupleAtEnd uses extend
		 * lock.
		 */
		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
		RelationPutHeapTupleAtEnd(relation, newtup);
		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
	}

	/*
	 * New item in place, now record address of new tuple in t_ctid of old
	 * one.
	 */
	oldtup.t_data->t_ctid = newtup->t_self;

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

	/* invalidate caches */
	RelationInvalidateHeapTuple(relation, &oldtup);

	WriteBuffer(buffer);

	return HeapTupleMayBeUpdated;
}

/*
 *	heap_mark4update		- mark a tuple for update
 *
 *	On success returns HeapTupleMayBeUpdated with the tuple's page pinned
 *	in *buffer (caller must release it).  On failure returns the
 *	HeapTupleSatisfiesUpdate result code, with tuple->t_self advanced to
 *	the blocking tuple's t_ctid so the caller can chase the update chain;
 *	note the buffer is unlocked but still pinned in that case too.
 */
int
heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer)
{
	ItemPointer tid = &(tuple->t_self);
	ItemId		lp;
	PageHeader	dp;
	int			result;

	/* increment access statistics */
	IncrHeapAccessStat(local_mark4update);
	IncrHeapAccessStat(global_mark4update);

	*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));

	if (!BufferIsValid(*buffer))
		elog(ERROR, "heap_mark4update: failed ReadBuffer");

	LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);

	dp = (PageHeader) BufferGetPage(*buffer);
	lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
	tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
	tuple->t_len = ItemIdGetLength(lp);

l3:
	result = HeapTupleSatisfiesUpdate(tuple);

	if (result == HeapTupleInvisible)
	{
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		ReleaseBuffer(*buffer);
		elog(ERROR, "heap_mark4update: (am)invalid tid");
	}
	else if (result == HeapTupleBeingUpdated)
	{
		TransactionId xwait = tuple->t_data->t_xmax;

		/* sleep until concurrent transaction ends */
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		XactLockTableWait(xwait);

		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
		if (TransactionIdDidAbort(xwait))
			goto l3;
		/*
		 * xwait is committed but if xwait had just marked
		 * the tuple for update then some other xaction could
		 * update this tuple before we got to this point.
		 */
		if (tuple->t_data->t_xmax != xwait)
			goto l3;
		if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
		{
			/* set the commit hint bit so later visitors skip the xact lookup */
			tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
			SetBufferCommitInfoNeedsSave(*buffer);
		}
		/* if tuple was marked for update but not updated... */
		if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
			result = HeapTupleMayBeUpdated;
		else
			result = HeapTupleUpdated;
	}
	if (result != HeapTupleMayBeUpdated)
	{
		Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
		/* hand back the forwarding pointer for the caller to follow */
		tuple->t_self = tuple->t_data->t_ctid;
		LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
		return result;
	}

	/* store transaction information of xact marking the tuple */
	TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
	tuple->t_data->t_cmax = GetCurrentCommandId();
	tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
	tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;

	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);

	WriteNoReleaseBuffer(*buffer);

	return HeapTupleMayBeUpdated;
}

/* ----------------
 *		heap_markpos	- mark scan position
 *
 *		Saves the tids of the scan's previous, current, and next tuples
 *		(rs_mptid/rs_mctid/rs_mntid) so heap_restrpos can return here.
 *
 *		Note:
 *				Should only one mark be maintained per scan at one time.
 *		Check if this can be done generally--say calls to get the
 *		next/previous tuple and NEVER pass struct scandesc to the
 *		user AM's.  Now, the mark is sent to the executor for safekeeping.
 *		Probably can store this info into a GENERAL scan structure.
 *
 *		May be best to change this call to store the marked position
 *		(up to 2?) in the scan structure itself.
 *		Fix to use the proper caching structure.
 * ----------------
 */
void
heap_markpos(HeapScanDesc scan)
{

	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_markpos);
	IncrHeapAccessStat(global_markpos);

	/* Note: no locking manipulations needed */

	/*
	 * If the previous (or next) tuple has not been fetched yet, fetch it
	 * now so its position can be recorded in the mark below.
	 */
	if (scan->rs_ptup.t_data == NULL &&
		BufferIsUnknown(scan->rs_pbuf))
	{							/* == NONTUP */
		scan->rs_ptup = scan->rs_ctup;
		heapgettup(scan->rs_rd,
				   &(scan->rs_ptup),
				   -1,
				   &scan->rs_pbuf,
				   scan->rs_snapshot,
				   scan->rs_nkeys,
				   scan->rs_key);

	}
	else if (scan->rs_ntup.t_data == NULL &&
			 BufferIsUnknown(scan->rs_nbuf))
	{							/* == NONTUP */
		scan->rs_ntup = scan->rs_ctup;
		heapgettup(scan->rs_rd,
				   &(scan->rs_ntup),
				   1,
				   &scan->rs_nbuf,
				   scan->rs_snapshot,
				   scan->rs_nkeys,
				   scan->rs_key);
	}

	/* ----------------
	 * Should not unpin the buffer pages.  They may still be in use.
	 * ----------------
	 */
	if (scan->rs_ptup.t_data != NULL)
		scan->rs_mptid = scan->rs_ptup.t_self;
	else
		ItemPointerSetInvalid(&scan->rs_mptid);
	if (scan->rs_ctup.t_data != NULL)
		scan->rs_mctid = scan->rs_ctup.t_self;
	else
		ItemPointerSetInvalid(&scan->rs_mctid);
	if (scan->rs_ntup.t_data != NULL)
		scan->rs_mntid = scan->rs_ntup.t_self;
	else
		ItemPointerSetInvalid(&scan->rs_mntid);
}

/* ----------------
 *		heap_restrpos	- restore position to marked location
 *
 *		Note:  there are bad side effects here.  If we were past the end
 *		of a relation when heap_markpos is called, then if the relation is
 *		extended via insert, then the next call to heap_restrpos will
 *		cause the added tuples to be visible when the scan continues.
 *		Problems also arise if the TID's are rearranged!!!
 *
 *		Now pins buffer once for each valid tuple pointer (rs_ptup,
 *		rs_ctup, rs_ntup) referencing it.
 *		 - 01/13/94
 *
 * XXX	might be better to do direct access instead of
 *		using the generality of heapgettup().
 *
 * XXX It is very possible that when a scan is restored, that a tuple
 * XXX which previously qualified may fail for time range purposes, unless
 * XXX some form of locking exists (ie., portals currently can act funny.
 * ----------------
 */
void
heap_restrpos(HeapScanDesc scan)
{
	/* ----------------
	 *	increment access statistics
	 * ----------------
	 */
	IncrHeapAccessStat(local_restrpos);
	IncrHeapAccessStat(global_restrpos);

	/* XXX no amrestrpos checking that ammarkpos called */

	/* Note: no locking manipulations needed */

	unpinscan(scan);

	/* force heapgettup to pin buffer for each loaded tuple */
	scan->rs_pbuf = InvalidBuffer;
	scan->rs_cbuf = InvalidBuffer;
	scan->rs_nbuf = InvalidBuffer;

	/*
	 * Re-fetch each marked tuple by its saved tid (direction 0).
	 * NOTE(review): `false' is passed where a Snapshot is expected --
	 * presumably the snapshot is unused for direction-0 fetches, and the
	 * dummy non-NULL t_data (0x1) tells heapgettup that t_self is valid.
	 * Confirm both against heapgettup().
	 */
	if (!ItemPointerIsValid(&scan->rs_mptid))
		scan->rs_ptup.t_data = NULL;
	else
	{
		scan->rs_ptup.t_self = scan->rs_mptid;
		scan->rs_ptup.t_data = (HeapTupleHeader) 0x1;	/* for heapgettup */
		heapgettup(scan->rs_rd,
				   &(scan->rs_ptup),
				   0,
				   &(scan->rs_pbuf),
				   false,
				   0,
				   (ScanKey) NULL);
	}

	if (!ItemPointerIsValid(&scan->rs_mctid))
		scan->rs_ctup.t_data = NULL;
	else
	{
		scan->rs_ctup.t_self = scan->rs_mctid;
		scan->rs_ctup.t_data = (HeapTupleHeader) 0x1;	/* for heapgettup */
		heapgettup(scan->rs_rd,
				   &(scan->rs_ctup),
				   0,
				   &(scan->rs_cbuf),
				   false,
				   0,
				   (ScanKey) NULL);
	}

	if (!ItemPointerIsValid(&scan->rs_mntid))
		scan->rs_ntup.t_data = NULL;
	else
	{
		scan->rs_ntup.t_self = scan->rs_mntid;
		scan->rs_ntup.t_data = (HeapTupleHeader) 0x1;	/* for heapgettup */
		heapgettup(scan->rs_rd,
				   &(scan->rs_ntup),
				   0,
				   &scan->rs_nbuf,
				   false,
				   0,
				   (ScanKey) NULL);
	}
}