/*-------------------------------------------------------------------------
 *
 * tuplestore.c
 *	  Generalized routines for temporary tuple storage.
 *
 * This module handles temporary storage of tuples for purposes such
 * as Materialize nodes, hashjoin batch files, etc.  It is essentially
 * a simplified version of tuplesort.c; it does no sorting of tuples
 * but can only store a sequence of tuples and regurgitate it later.
 * A temporary file is used to handle the data if it exceeds the
 * space limit specified by the caller.
 *
 * The (approximate) amount of memory allowed to the tuplestore is specified
 * in kilobytes by the caller.  We absorb tuples and simply store them in an
 * in-memory array as long as we haven't exceeded maxKBytes.  If we reach the
 * end of the input without exceeding maxKBytes, we just return tuples during
 * the read phase by scanning the tuple array sequentially.  If we do exceed
 * maxKBytes, we dump all the tuples into a temp file and then read from that
 * during the read phase.
 *
 * When the caller requests random access to the data, we write the temp file
 * in a format that allows either forward or backward scan.
 *
 *
 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplestore.c,v 1.9 2002/11/11 03:02:19 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "storage/buffile.h"
#include "utils/tuplestore.h"

/*
 * Possible states of a Tuplestore object.  These denote the states that
 * persist between calls of Tuplestore routines.
 *
 * Life cycle: TSS_INITIAL -> (memory limit exceeded) TSS_WRITEFILE, then
 * tuplestore_donestoring moves INITIAL -> READMEM or WRITEFILE -> READFILE.
 */
typedef enum
{
	TSS_INITIAL,				/* Loading tuples; still within memory
								 * limit */
	TSS_WRITEFILE,				/* Loading tuples; writing to temp file */
	TSS_READMEM,				/* Reading tuples; entirely in memory */
	TSS_READFILE				/* Reading tuples from temp file */
} TupStoreStatus;

/*
 * Private state of a Tuplestore operation.
 *
 * This is the full definition behind the Tuplestorestate type that the
 * rest of the backend handles only by pointer (the typedef presumably
 * lives in utils/tuplestore.h).
 */
struct Tuplestorestate
{
	TupStoreStatus status;		/* enumerated value as shown above */
	bool		randomAccess;	/* did caller request random access? */
	long		availMem;		/* remaining memory available, in bytes;
								 * may go negative (see LACKMEM) */
	BufFile    *myfile;			/* underlying file, or NULL if none */

	/*
	 * These function pointers decouple the routines that must know what
	 * kind of tuple we are handling from the routines that don't need to
	 * know it. They are set up by the tuplestore_begin_xxx routines.
	 *
	 * (Although tuplestore.c currently only supports heap tuples, I've
	 * copied this part of tuplesort.c so that extension to other kinds of
	 * objects will be easy if it's ever needed.)
	 *
	 * Function to copy a supplied input tuple into palloc'd space. (NB: we
	 * assume that a single pfree() is enough to release the tuple later,
	 * so the representation must be "flat" in one palloc chunk.)
	 * state->availMem must be decreased by the amount of space used.
	 */
	void	   *(*copytup) (Tuplestorestate *state, void *tup);

	/*
	 * Function to write a stored tuple onto tape.  The representation of
	 * the tuple on tape need not be the same as it is in memory;
	 * requirements on the tape representation are given below.  After
	 * writing the tuple, pfree() it, and increase state->availMem by the
	 * amount of memory space thereby released.
	 */
	void		(*writetup) (Tuplestorestate *state, void *tup);

	/*
	 * Function to read a stored tuple from tape back into memory. 'len'
	 * is the already-read length of the stored tuple.  Create and return
	 * a palloc'd copy, and decrease state->availMem by the amount of
	 * memory space consumed.
	 */
	void	   *(*readtup) (Tuplestorestate *state, unsigned int len);

	/*
	 * This array holds pointers to tuples in memory if we are in state
	 * INITIAL or READMEM.  In states WRITEFILE and READFILE it's not
	 * used.
	 */
	void	  **memtuples;		/* array of pointers to palloc'd tuples */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */

	/*
	 * These variables are used after completion of storing to keep track
	 * of the next tuple to return.  (In the tape case, the tape's current
	 * read position is also critical state.)
	 */
	int			current;		/* array index (only used if READMEM) */
	bool		eof_reached;	/* reached EOF (needed for cursors) */

	/* markpos_xxx holds marked position for mark and restore */
	int			markpos_file;	/* file# (only used if READFILE) */
	long		markpos_offset; /* saved "current", or offset in tape file */
	bool		markpos_eof;	/* saved "eof_reached" */
};

/*
 * Dispatch through the per-store tuple callbacks, and track the memory
 * budget.  availMem counts down as memory is used; LACKMEM becomes true
 * once the budget has been overrun.
 */
#define COPYTUP(state,tup)	((*(state)->copytup) ((state), (tup)))
#define WRITETUP(state,tup) ((*(state)->writetup) ((state), (tup)))
#define READTUP(state,len)	((*(state)->readtup) ((state), (len)))
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))

/*--------------------
 *
 * NOTES about on-tape representation of tuples:
 *
 * ("Tape" here means the single BufFile temp file, state->myfile.)
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero; an all-zero
 * unsigned int marks the end of the stored data, see markrunend).  The
 * remainder of the stored tuple may or may not match the in-memory
 * representation of the tuple --- any conversion needed is the job of the
 * writetup and readtup routines.
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of any per-store data kept in the
 * Tuplestorestate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should elog() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the maxKBytes limit,
 * plus the space used by the variable-size array memtuples.
 * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.
 *
 *--------------------
 */


static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
						int maxKBytes);
static void dumptuples(Tuplestorestate *state);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
static void markrunend(Tuplestorestate *state);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
static void *readtup_heap(Tuplestorestate *state, unsigned int len);


/*
 *		tuplestore_begin_xxx
 *
 * Initialize for a tuple store operation.
 *
 * After calling tuplestore_begin, the caller should call tuplestore_puttuple
 * zero or more times, then call tuplestore_donestoring when all the tuples
 * have been supplied.	After donestoring, retrieve the tuples in order
 * by calling tuplestore_gettuple until it returns NULL.  (If random
 * access was requested, rescan, markpos, and restorepos can also be called.)
 * Call tuplestore_end to terminate the operation and release memory/disk
 * space.
 */

/*
 * tuplestore_begin_common
 *
 * Allocate and initialize the type-independent part of a Tuplestorestate.
 * The struct is zero-filled first, so the tuple callbacks start out NULL
 * and must be installed by the tuplestore_begin_xxx caller.  The memory
 * budget starts at maxKBytes * 1024 bytes and is immediately charged for
 * the initial memtuples array.
 */
static Tuplestorestate *
tuplestore_begin_common(bool randomAccess, int maxKBytes)
{
	Tuplestorestate *state;

	state = (Tuplestorestate *) palloc(sizeof(Tuplestorestate));

	MemSet((char *) state, 0, sizeof(Tuplestorestate));

	state->status = TSS_INITIAL;
	state->randomAccess = randomAccess;
	state->availMem = maxKBytes * 1024L;
	state->myfile = NULL;

	state->memtupcount = 0;
	if (maxKBytes > 0)
		state->memtupsize = 1024;		/* initial guess */
	else
		state->memtupsize = 1;	/* won't really need any space */
	state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));

	/* the pointer array counts against the memory budget too */
	USEMEM(state, GetMemoryChunkSpace(state->memtuples));

	return state;
}

/*
 * tuplestore_begin_heap
 *
 * Create a tuplestore for HeapTuples: do the generic setup, then install
 * the HeapTuple-specific copy/write/read callbacks.
 */
Tuplestorestate *
tuplestore_begin_heap(bool randomAccess, int maxKBytes)
{
	Tuplestorestate *heapstore;

	heapstore = tuplestore_begin_common(randomAccess, maxKBytes);

	heapstore->readtup = readtup_heap;
	heapstore->writetup = writetup_heap;
	heapstore->copytup = copytup_heap;

	return heapstore;
}

/*
 * tuplestore_end
 *
 *	Release resources and clean up.
 *
 * Closes the temp file (if any), frees every tuple still held in the
 * in-memory array, the array itself, and finally the Tuplestorestate
 * struct that tuplestore_begin_common palloc'd.  Tuples previously handed
 * out by tuplestore_gettuple with *should_free = false are freed here as
 * well.  The caller must not use "state" after this returns.
 */
void
tuplestore_end(Tuplestorestate *state)
{
	int			i;

	if (state->myfile)
		BufFileClose(state->myfile);
	if (state->memtuples)
	{
		for (i = 0; i < state->memtupcount; i++)
			pfree(state->memtuples[i]);
		pfree(state->memtuples);
	}

	/* previously leaked until the memory context was reset */
	pfree(state);
}

/*
 * Accept one tuple while collecting input data.
 *
 * Note that the input tuple is always copied; the caller need not save it.
 *
 * In TSS_INITIAL the copy is appended to the in-memory array, doubling the
 * array when full.  When the memory budget is overrun, a temp file is
 * created, everything accumulated so far is dumped to it, and the store
 * switches to TSS_WRITEFILE; from then on each tuple is written straight
 * to the file.  Calling this in any other state is an error.
 */
void
tuplestore_puttuple(Tuplestorestate *state, void *tuple)
{
	/*
	 * Copy the tuple.  (Must do this even in WRITEFILE case, since
	 * WRITETUP frees the tuple it is given.)
	 */
	tuple = COPYTUP(state, tuple);

	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * Stash the tuple in the in-memory array.
			 */
			if (state->memtupcount >= state->memtupsize)
			{
				/* Grow the array as needed. */
				FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
				state->memtupsize *= 2;
				state->memtuples = (void **)
					repalloc(state->memtuples,
							 state->memtupsize * sizeof(void *));
				USEMEM(state, GetMemoryChunkSpace(state->memtuples));
			}
			state->memtuples[state->memtupcount++] = tuple;

			/*
			 * Done if we still fit in available memory.
			 */
			if (!LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.
			 */
			state->myfile = BufFileCreateTemp();
			state->status = TSS_WRITEFILE;
			dumptuples(state);
			break;
		case TSS_WRITEFILE:
			WRITETUP(state, tuple);
			break;
		default:
			elog(ERROR, "tuplestore_puttuple: invalid state");
			break;
	}
}

/*
 * All tuples have been provided; finish writing.
 *
 * Moves the store into its read phase.  TSS_INITIAL becomes TSS_READMEM,
 * positioned at the first array element.  TSS_WRITEFILE writes the zero
 * end-of-data word, rewinds the temp file, and becomes TSS_READFILE.  In
 * both cases the EOF flag and the saved mark are reset.
 */
void
tuplestore_donestoring(Tuplestorestate *state)
{
	switch (state->status)
	{
		case TSS_INITIAL:

			/*
			 * We were able to accumulate all the tuples within the
			 * allowed amount of memory.  Just set up to scan them.
			 */
			state->current = 0;
			state->eof_reached = false;
			state->markpos_offset = 0L;
			state->markpos_eof = false;
			state->status = TSS_READMEM;
			break;
		case TSS_WRITEFILE:

			/*
			 * Write the EOF marker.
			 */
			markrunend(state);

			/*
			 * Set up for reading from tape.
			 */
			if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
				elog(ERROR, "tuplestore_donestoring: seek(0) failed");
			state->eof_reached = false;
			state->markpos_file = 0;
			state->markpos_offset = 0L;
			state->markpos_eof = false;
			state->status = TSS_READFILE;
			break;
		default:
			elog(ERROR, "tuplestore_donestoring: invalid state");
			break;
	}
}

/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.  If should_free is set, the
 * caller must pfree the returned tuple when done with it.
 *
 * Valid only after tuplestore_donestoring.  Backward fetches require
 * randomAccess (asserted).  In READMEM the returned pointer is the store's
 * own copy (*should_free = false); in READFILE a freshly palloc'd tuple is
 * returned (*should_free = true).
 *
 * Backward file reads rely on the random-access record layout
 *		[len][tuple data][len]
 * where len = data length + sizeof(unsigned int); the file ends with a
 * single zero length word.
 */
void *
tuplestore_gettuple(Tuplestorestate *state, bool forward,
					bool *should_free)
{
	unsigned int tuplen;
	void	   *tup;

	switch (state->status)
	{
		case TSS_READMEM:
			Assert(forward || state->randomAccess);
			*should_free = false;
			if (forward)
			{
				if (state->current < state->memtupcount)
					return state->memtuples[state->current++];
				state->eof_reached = true;
				return NULL;
			}
			else
			{
				if (state->current <= 0)
					return NULL;

				/*
				 * if all tuples are fetched already then we return last
				 * tuple, else - tuple before last returned.
				 */
				if (state->eof_reached)
					state->eof_reached = false;
				else
				{
					state->current--;	/* last returned tuple */
					if (state->current <= 0)
						return NULL;
				}
				return state->memtuples[state->current - 1];
			}
			break;

		case TSS_READFILE:
			Assert(forward || state->randomAccess);
			*should_free = true;
			if (forward)
			{
				if (state->eof_reached)
					return NULL;
				if ((tuplen = getlen(state, true)) != 0)
				{
					tup = READTUP(state, tuplen);
					return tup;
				}
				else
				{
					state->eof_reached = true;
					return NULL;
				}
			}

			/*
			 * Backward.
			 *
			 * if all tuples are fetched already then we return last tuple,
			 * else - tuple before last returned.
			 */
			if (state->eof_reached)
			{
				/*
				 * Seek position is pointing just past the zero tuplen at
				 * the end of file; back up to fetch last tuple's ending
				 * length word.  If seek fails we must have a completely
				 * empty file.
				 */
				if (BufFileSeek(state->myfile, 0,
								-(long) (2 * sizeof(unsigned int)),
								SEEK_CUR) != 0)
					return NULL;
				state->eof_reached = false;
			}
			else
			{
				/*
				 * Back up and fetch previously-returned tuple's ending
				 * length word.  If seek fails, assume we are at start of
				 * file.
				 */
				if (BufFileSeek(state->myfile, 0,
								-(long) sizeof(unsigned int),
								SEEK_CUR) != 0)
					return NULL;
				tuplen = getlen(state, false);

				/*
				 * Back up to get ending length word of tuple before it.
				 */
				if (BufFileSeek(state->myfile, 0,
							 -(long) (tuplen + 2 * sizeof(unsigned int)),
								SEEK_CUR) != 0)
				{
					/*
					 * If that fails, presumably the prev tuple is the
					 * first in the file.  Back up so that it becomes next
					 * to read in forward direction (not obviously right,
					 * but that is what in-memory case does).
					 */
					if (BufFileSeek(state->myfile, 0,
								 -(long) (tuplen + sizeof(unsigned int)),
									SEEK_CUR) != 0)
						elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
					return NULL;
				}
			}

			tuplen = getlen(state, false);

			/*
			 * Now we have the length of the prior tuple, back up and read
			 * it. Note: READTUP expects we are positioned after the
			 * initial length word of the tuple, so back up to that point.
			 * (We are just past the trailing length word; backing up
			 * tuplen = data + one length word lands right after the
			 * leading length word.)
			 */
			if (BufFileSeek(state->myfile, 0,
							-(long) tuplen,
							SEEK_CUR) != 0)
				elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
			tup = READTUP(state, tuplen);
			return tup;

		default:
			elog(ERROR, "tuplestore_gettuple: invalid state");
			return NULL;		/* keep compiler quiet */
	}
}

/*
 * dumptuples - remove tuples from memory and write to tape
 *
 * Passes each element of memtuples[] to WRITETUP in array order, then marks
 * the array empty.  The array itself remains allocated; WRITETUP takes care
 * of releasing each individual tuple.
 */
static void
dumptuples(Tuplestorestate *state)
{
	int			idx = 0;

	while (idx < state->memtupcount)
	{
		WRITETUP(state, state->memtuples[idx]);
		idx++;
	}
	state->memtupcount = 0;
}

/*
 * tuplestore_rescan		- rewind and replay the scan
 *
 * Requires randomAccess.  Repositions the read cursor at the first tuple
 * (array index 0, or file offset 0) and clears the EOF flag and the saved
 * mark.  Only legal in a read state.
 */
void
tuplestore_rescan(Tuplestorestate *state)
{
	Assert(state->randomAccess);

	if (state->status == TSS_READMEM)
	{
		state->current = 0;
	}
	else if (state->status == TSS_READFILE)
	{
		if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
			elog(ERROR, "tuplestore_rescan: seek(0) failed");
		state->markpos_file = 0;
	}
	else
	{
		elog(ERROR, "tuplestore_rescan: invalid state");
		return;
	}

	/* reset shared by both read states */
	state->eof_reached = false;
	state->markpos_offset = 0L;
	state->markpos_eof = false;
}

/*
 * tuplestore_markpos	- saves current position in the tuple sequence
 *
 * Requires randomAccess.  In READMEM the array index is saved; in READFILE
 * the BufFile's (file#, offset) position is saved.  The EOF flag is saved
 * in both cases so tuplestore_restorepos can reinstate it.
 */
void
tuplestore_markpos(Tuplestorestate *state)
{
	Assert(state->randomAccess);

	if (state->status == TSS_READMEM)
		state->markpos_offset = state->current;
	else if (state->status == TSS_READFILE)
		BufFileTell(state->myfile,
					&state->markpos_file,
					&state->markpos_offset);
	else
	{
		elog(ERROR, "tuplestore_markpos: invalid state");
		return;
	}

	state->markpos_eof = state->eof_reached;
}

/*
 * tuplestore_restorepos - restores current position in tuple sequence to
 *						  last saved position
 *
 * Requires randomAccess.  Undoes the cursor movement since the last
 * tuplestore_markpos: resets the array index (READMEM) or seeks the temp
 * file back (READFILE), and restores the saved EOF flag.
 */
void
tuplestore_restorepos(Tuplestorestate *state)
{
	Assert(state->randomAccess);

	if (state->status == TSS_READMEM)
		state->current = (int) state->markpos_offset;
	else if (state->status == TSS_READFILE)
	{
		int			seekrc;

		seekrc = BufFileSeek(state->myfile,
							 state->markpos_file,
							 state->markpos_offset,
							 SEEK_SET);
		if (seekrc != 0)
			elog(ERROR, "tuplestore_restorepos failed");
	}
	else
	{
		elog(ERROR, "tuplestore_restorepos: invalid state");
		return;
	}

	state->eof_reached = state->markpos_eof;
}


/*
 * Tape interface routines
 */

/*
 * getlen - read the next length word from the temp file.
 *
 * A short read is always an error.  A zero word (the end-of-data marker
 * written by markrunend) is returned only when eofOK; otherwise it raises
 * an error.
 */
static unsigned int
getlen(Tuplestorestate *state, bool eofOK)
{
	unsigned int lenword;
	size_t		nread;

	nread = BufFileRead(state->myfile, (void *) &lenword, sizeof(lenword));
	if (nread != sizeof(lenword))
		elog(ERROR, "tuplestore: unexpected end of tape");
	if (!eofOK && lenword == 0)
		elog(ERROR, "tuplestore: unexpected end of data");
	return lenword;
}

/*
 * markrunend - append the zero length word that terminates the stored data.
 */
static void
markrunend(Tuplestorestate *state)
{
	unsigned int endmarker = 0;
	size_t		nwritten;

	nwritten = BufFileWrite(state->myfile, (void *) &endmarker,
							sizeof(endmarker));
	if (nwritten != sizeof(endmarker))
		elog(ERROR, "tuplestore: write failed");
}


/*
 * Routines specialized for HeapTuple case
 */

/*
 * copytup_heap - make a private copy of an incoming HeapTuple.
 *
 * The copy's actual chunk size is charged against the memory budget.  This
 * assumes heap_copytuple returns a single palloc chunk, as required by the
 * copytup contract (one pfree must release it).
 */
static void *
copytup_heap(Tuplestorestate *state, void *tup)
{
	HeapTuple	tuple = (HeapTuple) tup;

	tuple = heap_copytuple(tuple);
	USEMEM(state, GetMemoryChunkSpace(tuple));
	return (void *) tuple;
}

/*
 * writetup_heap - write a HeapTuple to the temp file and free it.
 *
 * We don't bother to write the HeapTupleData part of the tuple; only the
 * t_data payload is stored, preceded by a length word (payload length plus
 * sizeof(unsigned int)) and, when randomAccess, followed by a copy of that
 * word.  The tuple's memory is then returned to the budget and freed.
 */
static void
writetup_heap(Tuplestorestate *state, void *tup)
{
	HeapTuple	tuple = (HeapTuple) tup;
	unsigned int tuplen;

	tuplen = tuple->t_len + sizeof(tuplen);
	if (BufFileWrite(state->myfile, (void *) &tuplen,
					 sizeof(tuplen)) != sizeof(tuplen))
		elog(ERROR, "tuplestore: write failed");
	if (BufFileWrite(state->myfile, (void *) tuple->t_data,
					 tuple->t_len) != (size_t) tuple->t_len)
		elog(ERROR, "tuplestore: write failed");

	/* trailing length word lets tuplestore_gettuple scan backward */
	if (state->randomAccess)
	{
		if (BufFileWrite(state->myfile, (void *) &tuplen,
						 sizeof(tuplen)) != sizeof(tuplen))
			elog(ERROR, "tuplestore: write failed");
	}

	/* measure the chunk before freeing it */
	FREEMEM(state, GetMemoryChunkSpace(tuple));
	heap_freetuple(tuple);
}

/*
 * readtup_heap - read back a HeapTuple written by writetup_heap.
 *
 * 'len' is the already-consumed leading length word (payload length plus
 * sizeof(unsigned int)).  The header and payload are allocated as one
 * palloc chunk: the HeapTupleData header is rebuilt at the front and t_data
 * points just past it.  When randomAccess, the trailing length word is
 * consumed so the file is positioned at the next record.
 */
static void *
readtup_heap(Tuplestorestate *state, unsigned int len)
{
	unsigned int tuplen = len - sizeof(unsigned int) + HEAPTUPLESIZE;
	HeapTuple	tuple = (HeapTuple) palloc(tuplen);
	unsigned int trailer;

	USEMEM(state, GetMemoryChunkSpace(tuple));
	/* reconstruct the HeapTupleData portion */
	tuple->t_len = len - sizeof(unsigned int);
	ItemPointerSetInvalid(&(tuple->t_self));
	tuple->t_datamcxt = CurrentMemoryContext;
	tuple->t_data = (HeapTupleHeader) (((char *) tuple) + HEAPTUPLESIZE);
	/* read in the tuple proper */
	if (BufFileRead(state->myfile, (void *) tuple->t_data,
					tuple->t_len) != (size_t) tuple->t_len)
		elog(ERROR, "tuplestore: unexpected end of data");
	/* skip the trailing length word, if one was written */
	if (state->randomAccess)
	{
		if (BufFileRead(state->myfile, (void *) &trailer,
						sizeof(trailer)) != sizeof(trailer))
			elog(ERROR, "tuplestore: unexpected end of data");
	}
	return (void *) tuple;
}