/*-------------------------------------------------------------------------
 *
 * tuplestore.c
 *	  Generalized routines for temporary tuple storage.
 *
 * This module handles temporary storage of tuples for purposes such
 * as Materialize nodes, hashjoin batch files, etc.  It is essentially
 * a dumbed-down version of tuplesort.c; it does no sorting of tuples
 * but can only store and regurgitate a sequence of tuples.  However,
 * because no sort is required, it is allowed to start reading the sequence
 * before it has all been written.  This is particularly useful for cursors,
 * because it allows random access within the already-scanned portion of
 * a query without having to process the underlying scan to completion.
 * A temporary file is used to handle the data if it exceeds the
 * space limit specified by the caller.
 *
 * The (approximate) amount of memory allowed to the tuplestore is specified
 * in kilobytes by the caller.  We absorb tuples and simply store them in an
 * in-memory array as long as we haven't exceeded maxKBytes.  If we do exceed
 * maxKBytes, we dump all the tuples into a temp file and then read from that
 * when needed.
 *
 * When the caller requests random access to the data, we write the temp file
 * in a format that allows either forward or backward scan.  Otherwise, only
 * forward scan is allowed.  But rewind and markpos/restorepos are allowed
 * in any case.
 *
 * Because we allow reading before writing is complete, there are two
 * interesting positions in the temp file: the current read position and
 * the current write position.  At any given instant, the temp file's seek
 * position corresponds to one of these, and the other one is remembered in
 * the Tuplestore's state.
 *
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.28 2006/06/27 02:51:39 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
#include "storage/buffile.h"
48
#include "utils/memutils.h"
49 50
#include "utils/tuplestore.h"

51

52
/*
 * Possible states of a Tuplestore object.  These are the states that
 * persist between calls of the Tuplestore routines.
 */
typedef enum
{
	TSS_INMEM,					/* all tuples are still held in memory */
	TSS_WRITEFILE,				/* temp file in use, positioned for writing */
	TSS_READFILE				/* temp file in use, positioned for reading */
} TupStoreStatus;

/*
 * Private state of a Tuplestore operation.
 */
struct Tuplestorestate
{
	TupStoreStatus status;		/* enumerated value as shown above */
	bool		randomAccess;	/* did caller request random access? */
70
	bool		interXact;		/* keep open through transactions? */
71 72 73 74
	long		availMem;		/* remaining memory available, in bytes */
	BufFile    *myfile;			/* underlying file, or NULL if none */

	/*
B
Bruce Momjian 已提交
75 76 77
	 * These function pointers decouple the routines that must know what kind
	 * of tuple we are handling from the routines that don't need to know it.
	 * They are set up by the tuplestore_begin_xxx routines.
78
	 *
B
Bruce Momjian 已提交
79 80 81
	 * (Although tuplestore.c currently only supports heap tuples, I've copied
	 * this part of tuplesort.c so that extension to other kinds of objects
	 * will be easy if it's ever needed.)
82
	 *
B
Bruce Momjian 已提交
83
	 * Function to copy a supplied input tuple into palloc'd space. (NB: we
B
Bruce Momjian 已提交
84 85 86
	 * assume that a single pfree() is enough to release the tuple later, so
	 * the representation must be "flat" in one palloc chunk.) state->availMem
	 * must be decreased by the amount of space used.
87 88 89 90
	 */
	void	   *(*copytup) (Tuplestorestate *state, void *tup);

	/*
B
Bruce Momjian 已提交
91 92 93 94 95
	 * Function to write a stored tuple onto tape.	The representation of the
	 * tuple on tape need not be the same as it is in memory; requirements on
	 * the tape representation are given below.  After writing the tuple,
	 * pfree() it, and increase state->availMem by the amount of memory space
	 * thereby released.
96 97 98 99
	 */
	void		(*writetup) (Tuplestorestate *state, void *tup);

	/*
B
Bruce Momjian 已提交
100 101 102 103
	 * Function to read a stored tuple from tape back into memory. 'len' is
	 * the already-read length of the stored tuple.  Create and return a
	 * palloc'd copy, and decrease state->availMem by the amount of memory
	 * space consumed.
104 105 106 107
	 */
	void	   *(*readtup) (Tuplestorestate *state, unsigned int len);

	/*
B
Bruce Momjian 已提交
108 109
	 * This array holds pointers to tuples in memory if we are in state INMEM.
	 * In states WRITEFILE and READFILE it's not used.
110 111 112 113 114 115
	 */
	void	  **memtuples;		/* array of pointers to palloc'd tuples */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */

	/*
116 117
	 * These variables are used to keep track of the current position.
	 *
118 119 120 121 122 123
	 * In state WRITEFILE, the current file seek position is the write point,
	 * and the read position is remembered in readpos_xxx; in state READFILE,
	 * the current file seek position is the read point, and the write
	 * position is remembered in writepos_xxx.	(The write position is the
	 * same as EOF, but since BufFileSeek doesn't currently implement
	 * SEEK_END, we have to remember it explicitly.)
124
	 *
125 126 127 128
	 * Special case: if we are in WRITEFILE state and eof_reached is true,
	 * then the read position is implicitly equal to the write position (and
	 * hence to the file seek position); this way we need not update the
	 * readpos_xxx variables on each write.
129
	 */
130 131 132 133 134
	bool		eof_reached;	/* read reached EOF (always valid) */
	int			current;		/* next array index (valid if INMEM) */
	int			readpos_file;	/* file# (valid if WRITEFILE and not eof) */
	long		readpos_offset; /* offset (valid if WRITEFILE and not eof) */
	int			writepos_file;	/* file# (valid if READFILE) */
B
Bruce Momjian 已提交
135
	long		writepos_offset;	/* offset (valid if READFILE) */
136 137

	/* markpos_xxx holds marked position for mark and restore */
B
Bruce Momjian 已提交
138
	int			markpos_current;	/* saved "current" */
139 140
	int			markpos_file;	/* saved "readpos_file" */
	long		markpos_offset; /* saved "readpos_offset" */
141 142 143
};

/* Dispatch through the per-tuple-kind function pointers stored in the state */
#define COPYTUP(state,tup)	((state)->copytup((state), (tup)))
#define WRITETUP(state,tup) ((state)->writetup((state), (tup)))
#define READTUP(state,len)	((state)->readtup((state), (len)))

/* Memory accounting against state->availMem (a byte count) */
#define LACKMEM(state)		(0 > (state)->availMem)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))

/*--------------------
 *
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero).
 * The remainder of the stored tuple
 * may or may not match the in-memory representation of the tuple ---
 * any conversion needed is the job of the writetup and readtup routines.
 *
 * If state->randomAccess is true, then the stored representation of the
 * tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * randomAccess is not true, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of the tuple description data
 * stored in the Tuplestorestate record, if needed.  They are also expected
 * to adjust state->availMem by the amount of memory space (not tape space!)
 * released or consumed.  There is no error return from either writetup
 * or readtup; they should ereport() on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the maxKBytes limit,
 * plus the space used by the variable-size array memtuples.
 * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.
 *
 *--------------------
 */


/* Prototypes for file-local routines */
static Tuplestorestate *tuplestore_begin_common(bool randomAccess,
												bool interXact,
												int maxKBytes);
static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
static void dumptuples(Tuplestorestate *state);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);

/* Heap-tuple implementations of the copytup/writetup/readtup hooks */
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
static void *readtup_heap(Tuplestorestate *state, unsigned int len);


/*
 *		tuplestore_begin_xxx
 *
 * Initialize for a tuple store operation.
 */
static Tuplestorestate *
212
tuplestore_begin_common(bool randomAccess, bool interXact, int maxKBytes)
213 214 215
{
	Tuplestorestate *state;

216
	state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
217

218
	state->status = TSS_INMEM;
219
	state->randomAccess = randomAccess;
220
	state->interXact = interXact;
221 222 223 224
	state->availMem = maxKBytes * 1024L;
	state->myfile = NULL;

	state->memtupcount = 0;
B
Bruce Momjian 已提交
225
	state->memtupsize = 1024;	/* initial guess */
226 227
	state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));

228 229
	USEMEM(state, GetMemoryChunkSpace(state->memtuples));

230 231 232
	state->eof_reached = false;
	state->current = 0;

233 234 235
	return state;
}

236 237 238 239 240 241 242 243 244 245
/*
 * tuplestore_begin_heap
 *
 * Create a new tuplestore; other types of tuple stores (other than
 * "heap" tuple stores, for heap tuples) are possible, but not presently
 * implemented.
 *
 * randomAccess: if true, both forward and backward accesses to the
 * tuple store are allowed.
 *
246
 * interXact: if true, the files used for on-disk storage persist beyond the
B
Bruce Momjian 已提交
247
 * end of the current transaction.	NOTE: It's the caller's responsibility to
248 249 250
 * create such a tuplestore in a memory context that will also survive
 * transaction boundaries, and to ensure the tuplestore is closed when it's
 * no longer wanted.
251 252
 *
 * maxKBytes: how much data to store in memory (any data beyond this
253
 * amount is paged to disk).  When in doubt, use work_mem.
254
 */
255
Tuplestorestate *
256
tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
257
{
258
	Tuplestorestate *state = tuplestore_begin_common(randomAccess,
259 260
													 interXact,
													 maxKBytes);
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289

	state->copytup = copytup_heap;
	state->writetup = writetup_heap;
	state->readtup = readtup_heap;

	return state;
}

/*
 * tuplestore_end
 *
 *	Release resources and clean up.
 *
 * Closes the temp file if one was created, frees every tuple still held
 * in memory together with the pointer array, and finally frees the state
 * struct itself.  The caller must not touch 'state' after this returns.
 */
void
tuplestore_end(Tuplestorestate *state)
{
	int			i;

	if (state->myfile)
		BufFileClose(state->myfile);
	if (state->memtuples)
	{
		for (i = 0; i < state->memtupcount; i++)
			pfree(state->memtuples[i]);
		pfree(state->memtuples);
	}

	/*
	 * The state struct was palloc'd by tuplestore_begin_common; release it
	 * too so that it does not linger until its memory context goes away.
	 */
	pfree(state);
}

/*
 * tuplestore_ateof
 *
 * Report whether the read position has reached end-of-data, i.e. return
 * the state's current eof_reached flag.
 */
bool
tuplestore_ateof(Tuplestorestate *state)
{
	bool		at_eof = state->eof_reached;

	return at_eof;
}

/*
 * Accept one tuple and append it to the tuplestore.
302 303
 *
 * Note that the input tuple is always copied; the caller need not save it.
304 305 306 307
 *
 * If the read status is currently "AT EOF" then it remains so (the read
 * pointer advances along with the write pointer); otherwise the read
 * pointer is unchanged.  This is for the convenience of nodeMaterial.c.
308 309 310
 *
 * tuplestore_puttupleslot() is a convenience routine to collect data from
 * a TupleTableSlot without an extra copy operation.
311 312
 */
void
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
tuplestore_puttupleslot(Tuplestorestate *state,
						TupleTableSlot *slot)
{
	MinimalTuple tuple;

	/*
	 * Form a MinimalTuple in working memory
	 */
	tuple = ExecCopySlotMinimalTuple(slot);
	USEMEM(state, GetMemoryChunkSpace(tuple));

	tuplestore_puttuple_common(state, (void *) tuple);
}

/*
 * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
 * deprecated, but not worth getting rid of in view of the number of callers.
 * (Consider adding something that takes a tupdesc+values/nulls arrays so
 * that we can use heap_form_minimal_tuple() and avoid a copy step.)
 */
void
tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
{
	void	   *copy;

	/* The tuple must be copied, even in the WRITEFILE case */
	copy = COPYTUP(state, tuple);

	tuplestore_puttuple_common(state, copy);
}

/*
 * Shared guts of the puttuple routines: store an already-copied tuple,
 * either in the in-memory array or in the temp file, depending on the
 * current state.  'tuple' has already been charged against availMem.
 */
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
	switch (state->status)
	{
		case TSS_INMEM:

			/*
			 * Try to enlarge the array while one free slot still remains:
			 * if enlargement is not possible, the incoming tuple can still
			 * be stored, after which we switch to tape-based operation.
			 *
			 * See grow_memtuples() in tuplesort.c for the rationale behind
			 * the memory and allocation-size tests.
			 */
			if (state->memtupcount >= state->memtupsize - 1 &&
				state->availMem > (long) (state->memtupsize * sizeof(void *)) &&
				(Size) (state->memtupsize * 2) < MaxAllocSize / sizeof(void *))
			{
				FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
				state->memtupsize *= 2;
				state->memtuples = (void **)
					repalloc(state->memtuples,
							 state->memtupsize * sizeof(void *));
				USEMEM(state, GetMemoryChunkSpace(state->memtuples));
			}

			/* Append the tuple to the in-memory array */
			state->memtuples[state->memtupcount++] = tuple;

			/* A reader sitting at EOF stays at EOF */
			if (state->eof_reached)
				state->current = state->memtupcount;

			/*
			 * If we are out of array slots or over the memory budget, it is
			 * time to switch to tape-based operation; otherwise we're done.
			 */
			if (state->memtupcount >= state->memtupsize || LACKMEM(state))
			{
				state->myfile = BufFileCreateTemp(state->interXact);
				state->status = TSS_WRITEFILE;
				dumptuples(state);
			}
			break;

		case TSS_WRITEFILE:
			WRITETUP(state, tuple);
			break;

		case TSS_READFILE:

			/*
			 * Switch from reading to writing: remember the read position
			 * (unless the reader is at EOF, where it is implicit), then
			 * reposition the file at the saved write position.
			 */
			if (!state->eof_reached)
				BufFileTell(state->myfile,
							&state->readpos_file, &state->readpos_offset);
			if (BufFileSeek(state->myfile,
							state->writepos_file, state->writepos_offset,
							SEEK_SET) != 0)
				elog(ERROR, "seek to EOF failed");
			state->status = TSS_WRITEFILE;
			WRITETUP(state, tuple);
			break;

		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}

/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.	If should_free is set, the
 * caller must pfree the returned tuple when done with it.
 */
423
static void *
424
tuplestore_gettuple(Tuplestorestate *state, bool forward,
B
Bruce Momjian 已提交
425
					bool *should_free)
426 427 428 429
{
	unsigned int tuplen;
	void	   *tup;

430 431
	Assert(forward || state->randomAccess);

432 433
	switch (state->status)
	{
434
		case TSS_INMEM:
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
			*should_free = false;
			if (forward)
			{
				if (state->current < state->memtupcount)
					return state->memtuples[state->current++];
				state->eof_reached = true;
				return NULL;
			}
			else
			{
				if (state->current <= 0)
					return NULL;

				/*
				 * if all tuples are fetched already then we return last
				 * tuple, else - tuple before last returned.
				 */
				if (state->eof_reached)
					state->eof_reached = false;
				else
				{
					state->current--;	/* last returned tuple */
					if (state->current <= 0)
						return NULL;
				}
				return state->memtuples[state->current - 1];
			}
			break;

464 465 466 467
		case TSS_WRITEFILE:
			/* Skip state change if we'll just return NULL */
			if (state->eof_reached && forward)
				return NULL;
B
Bruce Momjian 已提交
468

469 470 471 472 473 474 475
			/*
			 * Switch from writing to reading.
			 */
			BufFileTell(state->myfile,
						&state->writepos_file, &state->writepos_offset);
			if (!state->eof_reached)
				if (BufFileSeek(state->myfile,
B
Bruce Momjian 已提交
476
								state->readpos_file, state->readpos_offset,
477
								SEEK_SET) != 0)
478
					elog(ERROR, "seek failed");
479 480 481
			state->status = TSS_READFILE;
			/* FALL THRU into READFILE case */

482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500
		case TSS_READFILE:
			*should_free = true;
			if (forward)
			{
				if ((tuplen = getlen(state, true)) != 0)
				{
					tup = READTUP(state, tuplen);
					return tup;
				}
				else
				{
					state->eof_reached = true;
					return NULL;
				}
			}

			/*
			 * Backward.
			 *
501 502
			 * if all tuples are fetched already then we return last tuple,
			 * else - tuple before last returned.
503
			 *
504 505
			 * Back up to fetch previously-returned tuple's ending length
			 * word. If seek fails, assume we are at start of file.
506
			 */
507 508 509 510 511
			if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
							SEEK_CUR) != 0)
				return NULL;
			tuplen = getlen(state, false);

512 513 514
			if (state->eof_reached)
			{
				state->eof_reached = false;
515
				/* We will return the tuple returned before returning NULL */
516 517 518 519 520 521 522
			}
			else
			{
				/*
				 * Back up to get ending length word of tuple before it.
				 */
				if (BufFileSeek(state->myfile, 0,
B
Bruce Momjian 已提交
523
								-(long) (tuplen + 2 * sizeof(unsigned int)),
524 525 526
								SEEK_CUR) != 0)
				{
					/*
B
Bruce Momjian 已提交
527 528 529 530
					 * If that fails, presumably the prev tuple is the first
					 * in the file.  Back up so that it becomes next to read
					 * in forward direction (not obviously right, but that is
					 * what in-memory case does).
531 532
					 */
					if (BufFileSeek(state->myfile, 0,
B
Bruce Momjian 已提交
533
									-(long) (tuplen + sizeof(unsigned int)),
534
									SEEK_CUR) != 0)
535
						elog(ERROR, "bogus tuple length in backward scan");
536 537
					return NULL;
				}
538
				tuplen = getlen(state, false);
539 540 541
			}

			/*
B
Bruce Momjian 已提交
542 543 544
			 * Now we have the length of the prior tuple, back up and read it.
			 * Note: READTUP expects we are positioned after the initial
			 * length word of the tuple, so back up to that point.
545 546
			 */
			if (BufFileSeek(state->myfile, 0,
B
Bruce Momjian 已提交
547
							-(long) tuplen,
548
							SEEK_CUR) != 0)
549
				elog(ERROR, "bogus tuple length in backward scan");
550 551 552 553
			tup = READTUP(state, tuplen);
			return tup;

		default:
554
			elog(ERROR, "invalid tuplestore state");
555 556 557 558
			return NULL;		/* keep compiler quiet */
	}
}

559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611
/*
 * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
 *
 * On success the fetched tuple is stored into 'slot' and TRUE is returned;
 * when there is no tuple to return, the slot is cleared and FALSE is
 * returned.
 */
bool
tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
						TupleTableSlot *slot)
{
	bool		should_free;
	MinimalTuple mtup;

	mtup = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);

	if (!mtup)
	{
		ExecClearTuple(slot);
		return false;
	}

	ExecStoreMinimalTuple(mtup, slot, should_free);
	return true;
}

/*
 * tuplestore_advance - exported function to adjust position without fetching
 *
 * Implemented by fetching a tuple and discarding it.  This could be
 * optimized to avoid palloc/pfree overhead, but for the moment it doesn't
 * seem worthwhile.  Returns true if a tuple was skipped over, false if
 * there was none in the requested direction.
 */
bool
tuplestore_advance(Tuplestorestate *state, bool forward)
{
	bool		should_free;
	void	   *skipped = tuplestore_gettuple(state, forward, &should_free);

	if (!skipped)
		return false;

	if (should_free)
		pfree(skipped);
	return true;
}

612 613
/*
 * dumptuples - remove tuples from memory and write to tape
 *
 * As a side effect, we must set readpos and markpos to the file positions
 * corresponding to "current" and "markpos_current"; otherwise, a dump would
 * lose the current read position.  The position checks are made before the
 * end-of-array test so that a position just past the last tuple is
 * captured as well.
 */
static void
dumptuples(Tuplestorestate *state)
{
	int			idx = 0;

	for (;;)
	{
		if (idx == state->current)
			BufFileTell(state->myfile,
						&state->readpos_file, &state->readpos_offset);
		if (idx == state->markpos_current)
			BufFileTell(state->myfile,
						&state->markpos_file, &state->markpos_offset);
		if (idx >= state->memtupcount)
			break;
		WRITETUP(state, state->memtuples[idx]);
		idx++;
	}

	/* the array is now logically empty */
	state->memtupcount = 0;
}

/*
 * tuplestore_rescan		- rewind and replay the scan
 *
 * Resets the read position to the start of the stored sequence and clears
 * the EOF flag.  In WRITEFILE state only the remembered read position is
 * reset; in READFILE state the file itself is repositioned.
 */
void
tuplestore_rescan(Tuplestorestate *state)
{
	if (state->status == TSS_INMEM)
	{
		state->eof_reached = false;
		state->current = 0;
	}
	else if (state->status == TSS_WRITEFILE)
	{
		state->eof_reached = false;
		state->readpos_file = 0;
		state->readpos_offset = 0L;
	}
	else if (state->status == TSS_READFILE)
	{
		state->eof_reached = false;
		if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
			elog(ERROR, "seek to start failed");
	}
	else
		elog(ERROR, "invalid tuplestore state");
}

/*
 * tuplestore_markpos	- saves current position in the tuple sequence
 *
 * In memory, the mark is the array index.  On file, the mark is a file
 * position: the file's own seek position when that coincides with the read
 * position (READFILE state, or WRITEFILE state with the reader at EOF),
 * otherwise the remembered readpos_xxx values.
 */
void
tuplestore_markpos(Tuplestorestate *state)
{
	if (state->status == TSS_INMEM)
	{
		state->markpos_current = state->current;
	}
	else if (state->status == TSS_READFILE ||
			 (state->status == TSS_WRITEFILE && state->eof_reached))
	{
		/* read position is the file's current seek position */
		BufFileTell(state->myfile,
					&state->markpos_file,
					&state->markpos_offset);
	}
	else if (state->status == TSS_WRITEFILE)
	{
		/* read position is the explicitly remembered one */
		state->markpos_file = state->readpos_file;
		state->markpos_offset = state->readpos_offset;
	}
	else
		elog(ERROR, "invalid tuplestore state");
}

/*
 * tuplestore_restorepos - restores current position in tuple sequence to
 *						  last saved position
 *
 * The EOF flag is cleared, and the read position is set back to the mark:
 * the array index in memory, the remembered read position in WRITEFILE
 * state, or the file's seek position in READFILE state.
 */
void
tuplestore_restorepos(Tuplestorestate *state)
{
	if (state->status == TSS_INMEM)
	{
		state->eof_reached = false;
		state->current = state->markpos_current;
	}
	else if (state->status == TSS_WRITEFILE)
	{
		state->eof_reached = false;
		state->readpos_file = state->markpos_file;
		state->readpos_offset = state->markpos_offset;
	}
	else if (state->status == TSS_READFILE)
	{
		state->eof_reached = false;
		if (BufFileSeek(state->myfile,
						state->markpos_file,
						state->markpos_offset,
						SEEK_SET) != 0)
			elog(ERROR, "tuplestore_restorepos failed");
	}
	else
		elog(ERROR, "invalid tuplestore state");
}


/*
 * Tape interface routines
 */

/*
 * getlen - read a tuple length word from the temp file
 *
 * Returns the length word on success.  A partial read is always an error.
 * A read of zero bytes (end of file) returns 0 if eofOK is true, and is an
 * error otherwise.
 */
static unsigned int
getlen(Tuplestorestate *state, bool eofOK)
{
	unsigned int len;
	size_t		got;

	got = BufFileRead(state->myfile, (void *) &len, sizeof(len));

	if (got != sizeof(len))
	{
		/* short read: either a truncated length word or a clean EOF */
		if (got != 0)
			elog(ERROR, "unexpected end of tape");
		if (!eofOK)
			elog(ERROR, "unexpected end of data");
		return 0;
	}

	return len;
}


/*
 * Routines specialized for HeapTuple case
 *
 * The stored form is actually a MinimalTuple, but for largely historical
 * reasons we allow COPYTUP to work from a HeapTuple.
 *
 * Since MinimalTuple already has length in its first word, we don't need
 * to write that separately.
 */

/*
 * copytup_heap - copytup hook for heap tuples
 *
 * Converts the given HeapTuple into a MinimalTuple and charges the space
 * of the new tuple against the memory budget.
 */
static void *
copytup_heap(Tuplestorestate *state, void *tup)
{
	MinimalTuple copy = minimal_tuple_from_heap_tuple((HeapTuple) tup);

	USEMEM(state, GetMemoryChunkSpace(copy));

	return (void *) copy;
}

/*
 * writetup_heap - writetup hook for heap tuples
 *
 * Writes the MinimalTuple (whose first word is its own length) to the temp
 * file, followed by a trailing copy of the length word when random access
 * was requested.  The in-memory tuple is then released and its space is
 * credited back to the memory budget.
 */
static void
writetup_heap(Tuplestorestate *state, void *tup)
{
	MinimalTuple mtup = (MinimalTuple) tup;
	unsigned int len = mtup->t_len;

	/* tuple body, including its leading length word */
	if (BufFileWrite(state->myfile, (void *) mtup, len) != (size_t) len)
		elog(ERROR, "write failed");

	/* trailing length word, needed only for backward scans */
	if (state->randomAccess)
	{
		if (BufFileWrite(state->myfile, (void *) &len,
						 sizeof(len)) != sizeof(len))
			elog(ERROR, "write failed");
	}

	/* measure the chunk before freeing it */
	FREEMEM(state, GetMemoryChunkSpace(mtup));
	heap_free_minimal_tuple(mtup);
}

/*
 * readtup_heap - readtup hook for heap tuples
 *
 * 'len' is the tuple's leading length word, which the caller has already
 * read from the file.  Allocates a tuple of that size, charges it against
 * the memory budget, fills in the length word and reads the rest of the
 * tuple from the file.  When random access was requested, the trailing
 * length word is read as well (and discarded) so that the file ends up
 * positioned at the start of the next tuple.
 */
static void *
readtup_heap(Tuplestorestate *state, unsigned int len)
{
	MinimalTuple mtup = (MinimalTuple) palloc(len);
	size_t		remaining = len - sizeof(int);
	unsigned int trailer;

	USEMEM(state, GetMemoryChunkSpace(mtup));

	/* the length word was consumed by the caller; restore it by hand */
	mtup->t_len = len;

	/* read in the tuple proper */
	if (BufFileRead(state->myfile, (void *) ((char *) mtup + sizeof(int)),
					remaining) != remaining)
		elog(ERROR, "unexpected end of data");

	/* skip over the trailing length word, if there is one */
	if (state->randomAccess)
	{
		if (BufFileRead(state->myfile, (void *) &trailer,
						sizeof(trailer)) != sizeof(trailer))
			elog(ERROR, "unexpected end of data");
	}

	return (void *) mtup;
}