tuplestore.c 27.5 KB
Newer Older
1 2 3 4 5 6 7 8
/*-------------------------------------------------------------------------
 *
 * tuplestore.c
 *	  Generalized routines for temporary tuple storage.
 *
 * This module handles temporary storage of tuples for purposes such
 * as Materialize nodes, hashjoin batch files, etc.  It is essentially
 * a dumbed-down version of tuplesort.c; it does no sorting of tuples
 * but can only store and regurgitate a sequence of tuples.  However,
 * because no sort is required, it is allowed to start reading the sequence
 * before it has all been written.  This is particularly useful for cursors,
 * because it allows random access within the already-scanned portion of
 * a query without having to process the underlying scan to completion.
 * A temporary file is used to handle the data if it exceeds the
 * space limit specified by the caller.
 *
 * The (approximate) amount of memory allowed to the tuplestore is specified
 * in kilobytes by the caller.  We absorb tuples and simply store them in an
 * in-memory array as long as we haven't exceeded maxKBytes.  If we do exceed
 * maxKBytes, we dump all the tuples into a temp file and then read from that
 * when needed.
 *
 * When the caller requests backward-scan capability, we write the temp file
 * in a format that allows either forward or backward scan.  Otherwise, only
 * forward scan is allowed.  Rewind and markpos/restorepos are normally allowed
 * but can be turned off via tuplestore_set_eflags; turning off both backward
 * scan and rewind enables truncation of the tuplestore at the mark point
 * (if any) for minimal memory usage.
 *
 * Because we allow reading before writing is complete, there are two
 * interesting positions in the temp file: the current read position and
 * the current write position.  At any given instant, the temp file's seek
 * position corresponds to one of these, and the other one is remembered in
 * the Tuplestore's state.
 *
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/utils/sort/tuplestore.c,v 1.36.2.1 2009/12/29 17:41:18 heikki Exp $
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/heapam.h"
49
#include "commands/tablespace.h"
50
#include "executor/executor.h"
51
#include "storage/buffile.h"
52
#include "utils/memutils.h"
53
#include "utils/resowner.h"
54 55
#include "utils/tuplestore.h"

/*
 * Possible states of a Tuplestore object.  These denote the states that
 * persist between calls of Tuplestore routines.
 *
 * A tuplestore starts in TSS_INMEM.  It moves to TSS_WRITEFILE once the
 * in-memory budget or array is exhausted, and thereafter toggles between
 * TSS_WRITEFILE and TSS_READFILE as callers alternate between appending
 * and fetching tuples.
 */
typedef enum
{
	TSS_INMEM,					/* Tuples still fit in memory */
	TSS_WRITEFILE,				/* Writing to temp file */
	TSS_READFILE				/* Reading from temp file */
} TupStoreStatus;

/*
 * Private state of a Tuplestore operation.
 */
struct Tuplestorestate
{
	TupStoreStatus status;		/* enumerated value as shown above */
74
	int			eflags;			/* capability flags */
75
	bool		interXact;		/* keep open through transactions? */
76 77
	long		availMem;		/* remaining memory available, in bytes */
	BufFile    *myfile;			/* underlying file, or NULL if none */
78 79
	MemoryContext context;		/* memory context for holding tuples */
	ResourceOwner resowner;		/* resowner for holding temp files */
80 81

	/*
B
Bruce Momjian 已提交
82 83 84
	 * These function pointers decouple the routines that must know what kind
	 * of tuple we are handling from the routines that don't need to know it.
	 * They are set up by the tuplestore_begin_xxx routines.
85
	 *
B
Bruce Momjian 已提交
86 87 88
	 * (Although tuplestore.c currently only supports heap tuples, I've copied
	 * this part of tuplesort.c so that extension to other kinds of objects
	 * will be easy if it's ever needed.)
89
	 *
B
Bruce Momjian 已提交
90
	 * Function to copy a supplied input tuple into palloc'd space. (NB: we
B
Bruce Momjian 已提交
91 92 93
	 * assume that a single pfree() is enough to release the tuple later, so
	 * the representation must be "flat" in one palloc chunk.) state->availMem
	 * must be decreased by the amount of space used.
94 95 96 97
	 */
	void	   *(*copytup) (Tuplestorestate *state, void *tup);

	/*
B
Bruce Momjian 已提交
98 99 100 101 102
	 * Function to write a stored tuple onto tape.	The representation of the
	 * tuple on tape need not be the same as it is in memory; requirements on
	 * the tape representation are given below.  After writing the tuple,
	 * pfree() it, and increase state->availMem by the amount of memory space
	 * thereby released.
103 104 105 106
	 */
	void		(*writetup) (Tuplestorestate *state, void *tup);

	/*
B
Bruce Momjian 已提交
107 108 109 110
	 * Function to read a stored tuple from tape back into memory. 'len' is
	 * the already-read length of the stored tuple.  Create and return a
	 * palloc'd copy, and decrease state->availMem by the amount of memory
	 * space consumed.
111 112 113 114
	 */
	void	   *(*readtup) (Tuplestorestate *state, unsigned int len);

	/*
B
Bruce Momjian 已提交
115 116
	 * This array holds pointers to tuples in memory if we are in state INMEM.
	 * In states WRITEFILE and READFILE it's not used.
117 118 119 120 121 122
	 */
	void	  **memtuples;		/* array of pointers to palloc'd tuples */
	int			memtupcount;	/* number of tuples currently present */
	int			memtupsize;		/* allocated length of memtuples array */

	/*
123 124
	 * These variables are used to keep track of the current position.
	 *
125 126 127 128 129 130
	 * In state WRITEFILE, the current file seek position is the write point,
	 * and the read position is remembered in readpos_xxx; in state READFILE,
	 * the current file seek position is the read point, and the write
	 * position is remembered in writepos_xxx.	(The write position is the
	 * same as EOF, but since BufFileSeek doesn't currently implement
	 * SEEK_END, we have to remember it explicitly.)
131
	 *
132 133 134 135
	 * Special case: if we are in WRITEFILE state and eof_reached is true,
	 * then the read position is implicitly equal to the write position (and
	 * hence to the file seek position); this way we need not update the
	 * readpos_xxx variables on each write.
136
	 */
137 138 139 140 141
	bool		eof_reached;	/* read reached EOF (always valid) */
	int			current;		/* next array index (valid if INMEM) */
	int			readpos_file;	/* file# (valid if WRITEFILE and not eof) */
	long		readpos_offset; /* offset (valid if WRITEFILE and not eof) */
	int			writepos_file;	/* file# (valid if READFILE) */
B
Bruce Momjian 已提交
142
	long		writepos_offset;	/* offset (valid if READFILE) */
143 144

	/* markpos_xxx holds marked position for mark and restore */
B
Bruce Momjian 已提交
145
	int			markpos_current;	/* saved "current" */
146 147
	int			markpos_file;	/* saved "readpos_file" */
	long		markpos_offset; /* saved "readpos_offset" */
148 149 150
};

/*
 * Dispatch through the per-tuple-type hooks stored in the Tuplestorestate.
 */
#define COPYTUP(state,tup)	((*(state)->copytup) (state, tup))
#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
#define READTUP(state,len)	((*(state)->readtup) (state, len))

/*
 * Memory accounting against the caller-supplied budget.  availMem may go
 * negative; LACKMEM reports when that has happened.
 */
#define LACKMEM(state)		((state)->availMem < 0)
#define USEMEM(state,amt)	((state)->availMem -= (amt))
#define FREEMEM(state,amt)	((state)->availMem += (amt))

/*--------------------
 *
 * NOTES about on-tape representation of tuples:
 *
 * We require the first "unsigned int" of a stored tuple to be the total size
 * on-tape of the tuple, including itself (so it is never zero).  The
 * remainder of the stored tuple may or may not match the in-memory
 * representation of the tuple --- any conversion needed is the job of the
 * writetup and readtup routines.
 *
 * If state->eflags & EXEC_FLAG_BACKWARD, then the stored representation of
 * the tuple must be followed by another "unsigned int" that is a copy of the
 * length --- so the total tape space used is actually sizeof(unsigned int)
 * more than the stored length value.  This allows read-backwards.  When
 * EXEC_FLAG_BACKWARD is not set, the write/read routines may omit the extra
 * length word.
 *
 * writetup is expected to write both length words as well as the tuple
 * data.  When readtup is called, the tape is positioned just after the
 * front length word; readtup must read the tuple data and advance past
 * the back length word (if present).
 *
 * The write/read routines can make use of any tuple description data
 * stored in the Tuplestorestate record, if needed (the heap-tuple routines
 * need none).  They are also expected to adjust state->availMem by the
 * amount of memory space (not tape space!) released or consumed.  There is
 * no error return from either writetup or readtup; they should ereport()
 * on failure.
 *
 *
 * NOTES about memory consumption calculations:
 *
 * We count space allocated for tuples against the maxKBytes limit,
 * plus the space used by the variable-size array memtuples.
 * Fixed-size space (primarily the BufFile I/O buffer) is not counted.
 *
 * Note that we count actual space used (as shown by GetMemoryChunkSpace)
 * rather than the originally-requested size.  This is important since
 * palloc can add substantial overhead.  It's not a complete answer since
 * we won't count any wasted space in palloc allocation blocks, but it's
 * a lot better than what we were doing before 7.3.
 *
 *--------------------
 */


202
static Tuplestorestate *tuplestore_begin_common(int eflags,
B
Bruce Momjian 已提交
203 204
						bool interXact,
						int maxKBytes);
205
static void tuplestore_puttuple_common(Tuplestorestate *state, void *tuple);
206
static void dumptuples(Tuplestorestate *state);
207
static void tuplestore_trim(Tuplestorestate *state, int ntuples);
208 209 210 211 212 213 214 215 216 217 218 219
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
static void *readtup_heap(Tuplestorestate *state, unsigned int len);


/*
 *		tuplestore_begin_xxx
 *
 * Initialize for a tuple store operation.
 */
static Tuplestorestate *
220
tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
221 222 223
{
	Tuplestorestate *state;

224
	state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
225

226
	state->status = TSS_INMEM;
227
	state->eflags = eflags;
228
	state->interXact = interXact;
229 230
	state->availMem = maxKBytes * 1024L;
	state->myfile = NULL;
231 232
	state->context = CurrentMemoryContext;
	state->resowner = CurrentResourceOwner;
233 234

	state->memtupcount = 0;
B
Bruce Momjian 已提交
235
	state->memtupsize = 1024;	/* initial guess */
236 237
	state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));

238 239
	USEMEM(state, GetMemoryChunkSpace(state->memtuples));

240 241 242
	state->eof_reached = false;
	state->current = 0;

243 244 245
	return state;
}

246 247 248 249 250 251 252 253 254 255
/*
 * tuplestore_begin_heap
 *
 * Create a new tuplestore; other types of tuple stores (other than
 * "heap" tuple stores, for heap tuples) are possible, but not presently
 * implemented.
 *
 * randomAccess: if true, both forward and backward accesses to the
 * tuple store are allowed.
 *
256
 * interXact: if true, the files used for on-disk storage persist beyond the
B
Bruce Momjian 已提交
257
 * end of the current transaction.	NOTE: It's the caller's responsibility to
258 259 260
 * create such a tuplestore in a memory context and resource owner that will
 * also survive transaction boundaries, and to ensure the tuplestore is closed
 * when it's no longer wanted.
261 262
 *
 * maxKBytes: how much data to store in memory (any data beyond this
263
 * amount is paged to disk).  When in doubt, use work_mem.
264
 */
265
Tuplestorestate *
266
tuplestore_begin_heap(bool randomAccess, bool interXact, int maxKBytes)
267
{
268
	Tuplestorestate *state;
B
Bruce Momjian 已提交
269
	int			eflags;
270 271

	/*
B
Bruce Momjian 已提交
272 273
	 * This interpretation of the meaning of randomAccess is compatible with
	 * the pre-8.3 behavior of tuplestores.
274 275 276 277 278 279
	 */
	eflags = randomAccess ?
		(EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND | EXEC_FLAG_MARK) :
		(EXEC_FLAG_REWIND | EXEC_FLAG_MARK);

	state = tuplestore_begin_common(eflags, interXact, maxKBytes);
280 281 282 283 284 285 286 287

	state->copytup = copytup_heap;
	state->writetup = writetup_heap;
	state->readtup = readtup_heap;

	return state;
}

288 289 290 291 292 293 294 295
/*
 * tuplestore_set_eflags
 *
 * Set capability flags at a finer grain than is allowed by
 * tuplestore_begin_xxx.  This must be called before inserting any data
 * into the tuplestore.
 *
 * eflags is a bitmask following the meanings used for executor node
B
Bruce Momjian 已提交
296
 * startup flags (see executor.h).	tuplestore pays attention to these bits:
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
 *		EXEC_FLAG_REWIND		need rewind to start
 *		EXEC_FLAG_BACKWARD		need backward fetch
 *		EXEC_FLAG_MARK			need mark/restore
 * If tuplestore_set_eflags is not called, REWIND and MARK are allowed,
 * and BACKWARD is set per "randomAccess" in the tuplestore_begin_xxx call.
 */
void
tuplestore_set_eflags(Tuplestorestate *state, int eflags)
{
	Assert(state->status == TSS_INMEM);
	Assert(state->memtupcount == 0);

	state->eflags = eflags;
}

312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
/*
 * tuplestore_end
 *
 *	Release resources and clean up.
 */
void
tuplestore_end(Tuplestorestate *state)
{
	int			i;

	if (state->myfile)
		BufFileClose(state->myfile);
	if (state->memtuples)
	{
		for (i = 0; i < state->memtupcount; i++)
			pfree(state->memtuples[i]);
		pfree(state->memtuples);
	}
330
	pfree(state);
331 332 333
}

/*
 * tuplestore_ateof
 *
 * Returns the current eof_reached state, i.e. whether the last forward
 * fetch ran off the end of the stored data.
 */
bool
tuplestore_ateof(Tuplestorestate *state)
{
	return state->eof_reached;
}

/*
 * Accept one tuple and append it to the tuplestore.
346 347
 *
 * Note that the input tuple is always copied; the caller need not save it.
348 349 350 351
 *
 * If the read status is currently "AT EOF" then it remains so (the read
 * pointer advances along with the write pointer); otherwise the read
 * pointer is unchanged.  This is for the convenience of nodeMaterial.c.
352 353 354
 *
 * tuplestore_puttupleslot() is a convenience routine to collect data from
 * a TupleTableSlot without an extra copy operation.
355 356
 */
void
357 358 359 360
tuplestore_puttupleslot(Tuplestorestate *state,
						TupleTableSlot *slot)
{
	MinimalTuple tuple;
361
	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
362 363 364 365 366 367 368 369

	/*
	 * Form a MinimalTuple in working memory
	 */
	tuple = ExecCopySlotMinimalTuple(slot);
	USEMEM(state, GetMemoryChunkSpace(tuple));

	tuplestore_puttuple_common(state, (void *) tuple);
370 371

	MemoryContextSwitchTo(oldcxt);
372 373 374 375 376 377 378 379 380 381
}

/*
 * "Standard" case to copy from a HeapTuple.  This is actually now somewhat
 * deprecated, but not worth getting rid of in view of the number of callers.
 * (Consider adding something that takes a tupdesc+values/nulls arrays so
 * that we can use heap_form_minimal_tuple() and avoid a copy step.)
 *
 * The tuple is copied (via the copytup hook) into the tuplestore's memory
 * context; the caller keeps ownership of the original.
 */
void
tuplestore_puttuple(Tuplestorestate *state, HeapTuple tuple)
{
	MemoryContext oldcxt = MemoryContextSwitchTo(state->context);

	/*
	 * Copy the tuple.  (Must do this even in WRITEFILE case.)
	 */
	tuple = COPYTUP(state, tuple);

	tuplestore_puttuple_common(state, (void *) tuple);

	MemoryContextSwitchTo(oldcxt);
}

/*
 * tuplestore_puttuple_common
 *
 * Append an already-copied tuple to the tuplestore.  The caller has already
 * charged the tuple's memory against availMem.  Ownership of "tuple" passes
 * to the tuplestore.
 *
 * INMEM: the tuple is stashed in memtuples.  If that leaves us over the
 * memory budget, or the array is full, all in-memory tuples are dumped to
 * a new temp file and we switch to WRITEFILE.
 * WRITEFILE: the tuple is written directly to the file.
 * READFILE: we first save the read position (unless at EOF), seek back to
 * the saved write position, and switch to WRITEFILE before writing.
 */
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
	ResourceOwner oldowner;

	switch (state->status)
	{
		case TSS_INMEM:

			/*
			 * Grow the array as needed.  Note that we try to grow the array
			 * when there is still one free slot remaining --- if we fail,
			 * there'll still be room to store the incoming tuple, and then
			 * we'll switch to tape-based operation.
			 */
			if (state->memtupcount >= state->memtupsize - 1)
			{
				/*
				 * See grow_memtuples() in tuplesort.c for the rationale
				 * behind these two tests.
				 */
				if (state->availMem > (long) (state->memtupsize * sizeof(void *)) &&
					(Size) (state->memtupsize * 2) < MaxAllocSize / sizeof(void *))
				{
					FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
					state->memtupsize *= 2;
					state->memtuples = (void **)
						repalloc(state->memtuples,
								 state->memtupsize * sizeof(void *));
					USEMEM(state, GetMemoryChunkSpace(state->memtuples));
				}
			}

			/* Stash the tuple in the in-memory array */
			state->memtuples[state->memtupcount++] = tuple;

			/* If eof_reached, keep read position in sync */
			if (state->eof_reached)
				state->current = state->memtupcount;

			/*
			 * Done if we still fit in available memory and have array slots.
			 */
			if (state->memtupcount < state->memtupsize && !LACKMEM(state))
				return;

			/*
			 * Nope; time to switch to tape-based operation.  Make sure that
			 * the temp file(s) are created in suitable temp tablespaces.
			 */
			PrepareTempTablespaces();

			/*
			 * Associate the file with the store's resource owner (the one
			 * that was current when the tuplestore was created), not with
			 * whatever owner happens to be current now.
			 */
			oldowner = CurrentResourceOwner;
			CurrentResourceOwner = state->resowner;

			state->myfile = BufFileCreateTemp(state->interXact);

			CurrentResourceOwner = oldowner;

			state->status = TSS_WRITEFILE;
			dumptuples(state);
			break;
		case TSS_WRITEFILE:
			WRITETUP(state, tuple);
			break;
		case TSS_READFILE:

			/*
			 * Switch from reading to writing.  Remember the read position
			 * unless it is implicitly the write position (eof_reached).
			 */
			if (!state->eof_reached)
				BufFileTell(state->myfile,
							&state->readpos_file, &state->readpos_offset);
			if (BufFileSeek(state->myfile,
							state->writepos_file, state->writepos_offset,
							SEEK_SET) != 0)
				elog(ERROR, "seek to EOF failed");
			state->status = TSS_WRITEFILE;
			WRITETUP(state, tuple);
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}

/*
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.	If should_free is set, the
 * caller must pfree the returned tuple when done with it.
486 487 488
 *
 * Backward scan is only allowed if randomAccess was set true or
 * EXEC_FLAG_BACKWARD was specified to tuplestore_set_eflags().
489
 */
490
static void *
491
tuplestore_gettuple(Tuplestorestate *state, bool forward,
B
Bruce Momjian 已提交
492
					bool *should_free)
493 494 495 496
{
	unsigned int tuplen;
	void	   *tup;

497
	Assert(forward || (state->eflags & EXEC_FLAG_BACKWARD));
498

499 500
	switch (state->status)
	{
501
		case TSS_INMEM:
502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530
			*should_free = false;
			if (forward)
			{
				if (state->current < state->memtupcount)
					return state->memtuples[state->current++];
				state->eof_reached = true;
				return NULL;
			}
			else
			{
				if (state->current <= 0)
					return NULL;

				/*
				 * if all tuples are fetched already then we return last
				 * tuple, else - tuple before last returned.
				 */
				if (state->eof_reached)
					state->eof_reached = false;
				else
				{
					state->current--;	/* last returned tuple */
					if (state->current <= 0)
						return NULL;
				}
				return state->memtuples[state->current - 1];
			}
			break;

531 532 533 534
		case TSS_WRITEFILE:
			/* Skip state change if we'll just return NULL */
			if (state->eof_reached && forward)
				return NULL;
B
Bruce Momjian 已提交
535

536 537 538 539 540 541 542
			/*
			 * Switch from writing to reading.
			 */
			BufFileTell(state->myfile,
						&state->writepos_file, &state->writepos_offset);
			if (!state->eof_reached)
				if (BufFileSeek(state->myfile,
B
Bruce Momjian 已提交
543
								state->readpos_file, state->readpos_offset,
544
								SEEK_SET) != 0)
545
					elog(ERROR, "seek failed");
546 547 548
			state->status = TSS_READFILE;
			/* FALL THRU into READFILE case */

549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567
		case TSS_READFILE:
			*should_free = true;
			if (forward)
			{
				if ((tuplen = getlen(state, true)) != 0)
				{
					tup = READTUP(state, tuplen);
					return tup;
				}
				else
				{
					state->eof_reached = true;
					return NULL;
				}
			}

			/*
			 * Backward.
			 *
568 569
			 * if all tuples are fetched already then we return last tuple,
			 * else - tuple before last returned.
570
			 *
571 572
			 * Back up to fetch previously-returned tuple's ending length
			 * word. If seek fails, assume we are at start of file.
573
			 */
574 575 576 577 578
			if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
							SEEK_CUR) != 0)
				return NULL;
			tuplen = getlen(state, false);

579 580 581
			if (state->eof_reached)
			{
				state->eof_reached = false;
582
				/* We will return the tuple returned before returning NULL */
583 584 585 586 587 588 589
			}
			else
			{
				/*
				 * Back up to get ending length word of tuple before it.
				 */
				if (BufFileSeek(state->myfile, 0,
B
Bruce Momjian 已提交
590
								-(long) (tuplen + 2 * sizeof(unsigned int)),
591 592 593
								SEEK_CUR) != 0)
				{
					/*
B
Bruce Momjian 已提交
594 595 596 597
					 * If that fails, presumably the prev tuple is the first
					 * in the file.  Back up so that it becomes next to read
					 * in forward direction (not obviously right, but that is
					 * what in-memory case does).
598 599
					 */
					if (BufFileSeek(state->myfile, 0,
B
Bruce Momjian 已提交
600
									-(long) (tuplen + sizeof(unsigned int)),
601
									SEEK_CUR) != 0)
602
						elog(ERROR, "bogus tuple length in backward scan");
603 604
					return NULL;
				}
605
				tuplen = getlen(state, false);
606 607 608
			}

			/*
B
Bruce Momjian 已提交
609 610 611
			 * Now we have the length of the prior tuple, back up and read it.
			 * Note: READTUP expects we are positioned after the initial
			 * length word of the tuple, so back up to that point.
612 613
			 */
			if (BufFileSeek(state->myfile, 0,
B
Bruce Momjian 已提交
614
							-(long) tuplen,
615
							SEEK_CUR) != 0)
616
				elog(ERROR, "bogus tuple length in backward scan");
617 618 619 620
			tup = READTUP(state, tuplen);
			return tup;

		default:
621
			elog(ERROR, "invalid tuplestore state");
622 623 624 625
			return NULL;		/* keep compiler quiet */
	}
}

626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678
/*
 * tuplestore_gettupleslot - exported function to fetch a MinimalTuple
 *
 * If successful, put tuple in slot and return TRUE; else, clear the slot
 * and return FALSE.
 */
bool
tuplestore_gettupleslot(Tuplestorestate *state, bool forward,
						TupleTableSlot *slot)
{
	MinimalTuple tuple;
	bool		should_free;

	tuple = (MinimalTuple) tuplestore_gettuple(state, forward, &should_free);

	if (tuple)
	{
		ExecStoreMinimalTuple(tuple, slot, should_free);
		return true;
	}
	else
	{
		ExecClearTuple(slot);
		return false;
	}
}

/*
 * tuplestore_advance - exported function to adjust position without fetching
 *
 * Returns true if a tuple was stepped over, false if there was none in the
 * requested direction.
 *
 * We could optimize this case to avoid palloc/pfree overhead, but for the
 * moment it doesn't seem worthwhile.
 */
bool
tuplestore_advance(Tuplestorestate *state, bool forward)
{
	void	   *tuple;
	bool		should_free;

	tuple = tuplestore_gettuple(state, forward, &should_free);

	if (tuple == NULL)
		return false;

	/* Discard the copy if gettuple handed us one we own */
	if (should_free)
		pfree(tuple);
	return true;
}

679 680
/*
 * dumptuples - remove tuples from memory and write to tape
681 682 683 684
 *
 * As a side effect, we must set readpos and markpos to the value
 * corresponding to "current"; otherwise, a dump would lose the current read
 * position.
685 686 687 688 689 690
 */
static void
dumptuples(Tuplestorestate *state)
{
	int			i;

B
Bruce Momjian 已提交
691
	for (i = 0;; i++)
692 693 694 695 696 697 698 699 700
	{
		if (i == state->current)
			BufFileTell(state->myfile,
						&state->readpos_file, &state->readpos_offset);
		if (i == state->markpos_current)
			BufFileTell(state->myfile,
						&state->markpos_file, &state->markpos_offset);
		if (i >= state->memtupcount)
			break;
701
		WRITETUP(state, state->memtuples[i]);
702
	}
703 704 705 706 707 708 709 710 711
	state->memtupcount = 0;
}

/*
 * tuplestore_rescan		- rewind and replay the scan
 *
 * Requires EXEC_FLAG_REWIND.  Resets the read position to the start of
 * the data: array index 0 in memory, or file offset 0 on disk.  In
 * WRITEFILE state the file is not touched; only the remembered read
 * position is reset.
 */
void
tuplestore_rescan(Tuplestorestate *state)
{
	Assert(state->eflags & EXEC_FLAG_REWIND);

	switch (state->status)
	{
		case TSS_INMEM:
			state->eof_reached = false;
			state->current = 0;
			break;
		case TSS_WRITEFILE:
			state->eof_reached = false;
			state->readpos_file = 0;
			state->readpos_offset = 0L;
			break;
		case TSS_READFILE:
			state->eof_reached = false;
			if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
				elog(ERROR, "seek to start failed");
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}

/*
 * tuplestore_markpos	- saves current position in the tuple sequence
 *
 * Requires EXEC_FLAG_MARK.  In INMEM state, also trims already-consumed
 * tuples when neither backward scan nor rewind is enabled.
 */
void
tuplestore_markpos(Tuplestorestate *state)
{
	Assert(state->eflags & EXEC_FLAG_MARK);

	switch (state->status)
	{
		case TSS_INMEM:
			state->markpos_current = state->current;

			/*
			 * We can truncate the tuplestore if neither backward scan nor
			 * rewind capability are required by the caller.  There will never
			 * be a need to back up past the mark point.
			 *
			 * Note: you might think we could remove all the tuples before
			 * "current", since that one is the next to be returned.  However,
			 * since tuplestore_gettuple returns a direct pointer to our
			 * internal copy of the tuple, it's likely that the caller has
			 * still got the tuple just before "current" referenced in a slot.
			 * Don't free it yet.
			 */
			if (!(state->eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_REWIND)))
				tuplestore_trim(state, 1);
			break;
		case TSS_WRITEFILE:
			if (state->eof_reached)
			{
				/* Need to record the implicit read position */
				BufFileTell(state->myfile,
							&state->markpos_file,
							&state->markpos_offset);
			}
			else
			{
				state->markpos_file = state->readpos_file;
				state->markpos_offset = state->readpos_offset;
			}
			break;
		case TSS_READFILE:
			BufFileTell(state->myfile,
						&state->markpos_file,
						&state->markpos_offset);
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}

/*
 * tuplestore_restorepos - restores current position in tuple sequence to
 *						  last saved position
 *
 * Requires EXEC_FLAG_MARK.  Always clears eof_reached.  In WRITEFILE state
 * only the remembered read position changes; in READFILE state the file is
 * repositioned immediately.
 */
void
tuplestore_restorepos(Tuplestorestate *state)
{
	Assert(state->eflags & EXEC_FLAG_MARK);

	switch (state->status)
	{
		case TSS_INMEM:
			state->eof_reached = false;
			state->current = state->markpos_current;
			break;
		case TSS_WRITEFILE:
			state->eof_reached = false;
			state->readpos_file = state->markpos_file;
			state->readpos_offset = state->markpos_offset;
			break;
		case TSS_READFILE:
			state->eof_reached = false;
			if (BufFileSeek(state->myfile,
							state->markpos_file,
							state->markpos_offset,
							SEEK_SET) != 0)
				elog(ERROR, "tuplestore_restorepos failed");
			break;
		default:
			elog(ERROR, "invalid tuplestore state");
			break;
	}
}

823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851
/*
 * tuplestore_trim	- remove all but ntuples tuples before current
 */
static void
tuplestore_trim(Tuplestorestate *state, int ntuples)
{
	int			nremove;
	int			i;

	/*
	 * We don't bother trimming temp files since it usually would mean more
	 * work than just letting them sit in kernel buffers until they age out.
	 */
	if (state->status != TSS_INMEM)
		return;

	nremove = state->current - ntuples;
	if (nremove <= 0)
		return;					/* nothing to do */
	Assert(nremove <= state->memtupcount);

	/* Release no-longer-needed tuples */
	for (i = 0; i < nremove; i++)
	{
		FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
		pfree(state->memtuples[i]);
	}

	/*
B
Bruce Momjian 已提交
852
	 * Slide the array down and readjust pointers.	This may look pretty
853
	 * stupid, but we expect that there will usually not be very many
B
Bruce Momjian 已提交
854 855
	 * tuple-pointers to move, so this isn't that expensive; and it keeps a
	 * lot of other logic simple.
856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871
	 *
	 * In fact, in the current usage for merge joins, it's demonstrable that
	 * there will always be exactly one non-removed tuple; so optimize that
	 * case.
	 */
	if (nremove + 1 == state->memtupcount)
		state->memtuples[0] = state->memtuples[nremove];
	else
		memmove(state->memtuples, state->memtuples + nremove,
				(state->memtupcount - nremove) * sizeof(void *));

	state->memtupcount -= nremove;
	state->current -= nremove;
	state->markpos_current -= nremove;
}

872 873 874 875 876 877 878 879 880

/*
 * Tape interface routines
 */

static unsigned int
getlen(Tuplestorestate *state, bool eofOK)
{
	unsigned int len;
881
	size_t		nbytes;
882

883 884 885 886
	nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
	if (nbytes == sizeof(len))
		return len;
	if (nbytes != 0)
887
		elog(ERROR, "unexpected end of tape");
888
	if (!eofOK)
889
		elog(ERROR, "unexpected end of data");
890
	return 0;
891 892 893 894 895
}


/*
 * Routines specialized for HeapTuple case
 *
 * The stored form is actually a MinimalTuple, but for largely historical
 * reasons we allow COPYTUP to work from a HeapTuple.
 *
 * Since MinimalTuple already has length in its first word, we don't need
 * to write that separately.
 */

/*
 * copytup_heap - copy a HeapTuple into a freshly palloc'd MinimalTuple
 *
 * Charges the new tuple's chunk size against availMem.  The caller's
 * HeapTuple is not modified or freed.
 */
static void *
copytup_heap(Tuplestorestate *state, void *tup)
{
	MinimalTuple tuple;

	tuple = minimal_tuple_from_heap_tuple((HeapTuple) tup);
	USEMEM(state, GetMemoryChunkSpace(tuple));
	return (void *) tuple;
}

/*
 * writetup_heap - write a MinimalTuple to the temp file and release it
 *
 * The tuple is written as-is; its t_len field doubles as the leading length
 * word.  When backward scan is enabled, a trailing copy of the length is
 * appended.  The in-memory tuple is then freed and its space credited back
 * to availMem.
 */
static void
writetup_heap(Tuplestorestate *state, void *tup)
{
	MinimalTuple tuple = (MinimalTuple) tup;
	unsigned int tuplen = tuple->t_len;

	if (BufFileWrite(state->myfile, (void *) tuple, tuplen) != (size_t) tuplen)
		elog(ERROR, "write failed");
	if (state->eflags & EXEC_FLAG_BACKWARD)		/* need trailing length word? */
	{
		if (BufFileWrite(state->myfile, (void *) &tuplen,
						 sizeof(tuplen)) != sizeof(tuplen))
			elog(ERROR, "write failed");
	}

	FREEMEM(state, GetMemoryChunkSpace(tuple));
	heap_free_minimal_tuple(tuple);
}

/*
 * readtup_heap - read the remainder of a stored MinimalTuple from tape
 *
 * 'len' is the already-consumed leading length word, which counts itself.
 * So we allocate len bytes, store len into t_len, and read the remaining
 * len - sizeof(int) bytes directly after it.  If backward scan is enabled,
 * the trailing length word is consumed too so that the file is positioned
 * at the next tuple.  The new tuple's space is charged against availMem.
 */
static void *
readtup_heap(Tuplestorestate *state, unsigned int len)
{
	MinimalTuple tuple = (MinimalTuple) palloc(len);
	unsigned int tuplen;

	USEMEM(state, GetMemoryChunkSpace(tuple));
	/* read in the tuple proper */
	tuple->t_len = len;
	if (BufFileRead(state->myfile, (void *) ((char *) tuple + sizeof(int)),
					len - sizeof(int)) != (size_t) (len - sizeof(int)))
		elog(ERROR, "unexpected end of data");
	if (state->eflags & EXEC_FLAG_BACKWARD)		/* need trailing length word? */
	{
		if (BufFileRead(state->myfile, (void *) &tuplen,
						sizeof(tuplen)) != sizeof(tuplen))
			elog(ERROR, "unexpected end of data");
	}
	return (void *) tuple;
}