/*-------------------------------------------------------------------------
 *
 * analyze.c
 *	  the postgres statistics generator
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.67 2004/02/10 03:42:43 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <math.h>

#include "access/heapam.h"
#include "access/tuptoaster.h"
#include "catalog/catalog.h"
#include "catalog/catname.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_type.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "parser/parse_oper.h"
#include "parser/parse_relation.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"


/*
 * Analysis algorithms supported
 */
typedef enum
{
	ALG_MINIMAL = 1,			/* Compute only most-common-values */
	ALG_SCALAR					/* Compute MCV, histogram, sort
								 * correlation */
} AlgCode;

/*
 * To avoid consuming too much memory during analysis and/or too much space
 * in the resulting pg_statistic rows, we ignore varlena datums that are wider
 * than WIDTH_THRESHOLD (after detoasting!).  This is legitimate for MCV
 * and distinct-value calculations since a wide value is unlikely to be
 * duplicated at all, much less be a most-common value.  For the same reason,
 * ignoring wide values will not affect our estimates of histogram bin
 * boundaries very much.
 */
#define WIDTH_THRESHOLD  1024

/*
 * We build one of these structs for each attribute (column) that is to be
 * analyzed.  The struct and subsidiary data are in anl_context,
 * so they live until the end of the ANALYZE operation.
 */
typedef struct
{
	/* These fields are set up by examine_attribute */
	int			attnum;			/* attribute number */
	AlgCode		algcode;		/* Which algorithm to use for this column */
	int			minrows;		/* Minimum # of rows wanted for stats */
	Form_pg_attribute attr;		/* copy of pg_attribute row for column */
	Form_pg_type attrtype;		/* copy of pg_type row for column */
	Oid			eqopr;			/* '=' operator for datatype, if any */
	Oid			eqfunc;			/* and associated function */
	Oid			ltopr;			/* '<' operator for datatype, if any */

	/*
	 * These fields are filled in by the actual statistics-gathering
	 * routine
	 */
	bool		stats_valid;
	float4		stanullfrac;	/* fraction of entries that are NULL */
	int4		stawidth;		/* average width */
	float4		stadistinct;	/* # distinct values */
	int2		stakind[STATISTIC_NUM_SLOTS];
	Oid			staop[STATISTIC_NUM_SLOTS];
	int			numnumbers[STATISTIC_NUM_SLOTS];
	float4	   *stanumbers[STATISTIC_NUM_SLOTS];
	int			numvalues[STATISTIC_NUM_SLOTS];
	Datum	   *stavalues[STATISTIC_NUM_SLOTS];
} VacAttrStats;


typedef struct
{
	Datum		value;			/* a data value */
	int			tupno;			/* position index for tuple it came from */
} ScalarItem;

typedef struct
{
	int			count;			/* # of duplicates */
	int			first;			/* values[] index of first occurrence */
} ScalarMCVItem;


#define swapInt(a,b)	do {int _tmp; _tmp=a; a=b; b=_tmp;} while(0)
#define swapDatum(a,b)	do {Datum _tmp; _tmp=a; a=b; b=_tmp;} while(0)


/* Default statistics target (GUC parameter) */
int			default_statistics_target = 10;


static int	elevel = -1;

static MemoryContext anl_context = NULL;

/* context information for compare_scalars() */
static FmgrInfo *datumCmpFn;
static SortFunctionKind datumCmpFnKind;
static int *datumCmpTupnoLink;


static VacAttrStats *examine_attribute(Relation onerel, int attnum);
static int acquire_sample_rows(Relation onerel, HeapTuple *rows,
					int targrows, double *totalrows);
static double random_fract(void);
static double init_selection_state(int n);
static double select_next_random_record(double t, int n, double *stateptr);
static int	compare_rows(const void *a, const void *b);
static int	compare_scalars(const void *a, const void *b);
static int	compare_mcvs(const void *a, const void *b);
static void compute_minimal_stats(VacAttrStats *stats,
					  TupleDesc tupDesc, double totalrows,
					  HeapTuple *rows, int numrows);
static void compute_scalar_stats(VacAttrStats *stats,
					 TupleDesc tupDesc, double totalrows,
					 HeapTuple *rows, int numrows);
static void update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats);


/*
 *	analyze_rel() -- analyze one relation
 */
void
analyze_rel(Oid relid, VacuumStmt *vacstmt)
{
	Relation	onerel;
	int			attr_cnt,
				tcnt,
				i;
	VacAttrStats **vacattrstats;
	int			targrows,
				numrows;
	double		totalrows;
	HeapTuple  *rows;

	if (vacstmt->verbose)
		elevel = INFO;
	else
		elevel = DEBUG2;

	/*
	 * Use the current context for storing analysis info.  vacuum.c
	 * ensures that this context will be cleared when I return, thus
	 * releasing the memory allocated here.
	 */
	anl_context = CurrentMemoryContext;

	/*
	 * Check for user-requested abort.	Note we want this to be inside a
	 * transaction, so xact.c doesn't issue a useless WARNING.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Race condition -- if the pg_class tuple has gone away since the
	 * last time we saw it, we don't need to process it.
	 */
	if (!SearchSysCacheExists(RELOID,
							  ObjectIdGetDatum(relid),
							  0, 0, 0))
		return;

	/*
	 * Open the class, getting only a read lock on it, and check
	 * permissions. Permissions check should match vacuum's check!
	 */
	onerel = relation_open(relid, AccessShareLock);

	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
	{
		/* No need for a WARNING if we already complained during VACUUM */
		if (!vacstmt->vacuum)
			ereport(WARNING,
					(errmsg("skipping \"%s\" --- only table or database owner can analyze it",
							RelationGetRelationName(onerel))));
		relation_close(onerel, AccessShareLock);
		return;
	}

	/*
	 * Check that it's a plain table; we used to do this in
	 * get_rel_oids(), but it seems safer to check after we've locked the
	 * relation.
	 */
	if (onerel->rd_rel->relkind != RELKIND_RELATION)
	{
		/* No need for a WARNING if we already complained during VACUUM */
		if (!vacstmt->vacuum)
			ereport(WARNING,
					(errmsg("skipping \"%s\" --- cannot analyze indexes, views, or special system tables",
							RelationGetRelationName(onerel))));
		relation_close(onerel, AccessShareLock);
		return;
	}

	/*
	 * Silently ignore tables that are temp tables of other backends ---
	 * trying to analyze these is rather pointless, since their contents
	 * are probably not up-to-date on disk.  (We don't throw a warning
	 * here; it would just lead to chatter during a database-wide
	 * ANALYZE.)
	 */
	if (isOtherTempNamespace(RelationGetNamespace(onerel)))
	{
		relation_close(onerel, AccessShareLock);
		return;
	}

	/*
	 * We can ANALYZE any table except pg_statistic. See update_attstats
	 */
	if (IsSystemNamespace(RelationGetNamespace(onerel)) &&
	 strcmp(RelationGetRelationName(onerel), StatisticRelationName) == 0)
	{
		relation_close(onerel, AccessShareLock);
		return;
	}

	ereport(elevel,
			(errmsg("analyzing \"%s.%s\"",
					get_namespace_name(RelationGetNamespace(onerel)),
					RelationGetRelationName(onerel))));

	/*
	 * Determine which columns to analyze
	 *
	 * Note that system attributes are never analyzed.
	 */
	if (vacstmt->va_cols != NIL)
	{
		List	   *le;

		vacattrstats = (VacAttrStats **) palloc(length(vacstmt->va_cols) *
												sizeof(VacAttrStats *));
		tcnt = 0;
		foreach(le, vacstmt->va_cols)
		{
			char	   *col = strVal(lfirst(le));

			i = attnameAttNum(onerel, col, false);
			vacattrstats[tcnt] = examine_attribute(onerel, i);
			if (vacattrstats[tcnt] != NULL)
				tcnt++;
		}
		attr_cnt = tcnt;
	}
	else
	{
		attr_cnt = onerel->rd_att->natts;
		/* +1 here is just to avoid palloc(0) with zero-column table */
		vacattrstats = (VacAttrStats **) palloc((attr_cnt + 1) *
												sizeof(VacAttrStats *));
		tcnt = 0;
		for (i = 1; i <= attr_cnt; i++)
		{
			vacattrstats[tcnt] = examine_attribute(onerel, i);
			if (vacattrstats[tcnt] != NULL)
				tcnt++;
		}
		attr_cnt = tcnt;
	}

	/*
	 * Quit if no analyzable columns
	 */
	if (attr_cnt <= 0)
	{
		relation_close(onerel, AccessShareLock);
		return;
	}

	/*
	 * Determine how many rows we need to sample, using the worst case
	 * from all analyzable columns.  We use a lower bound of 100 rows to
	 * avoid possible overflow in Vitter's algorithm.
	 */
	targrows = 100;
	for (i = 0; i < attr_cnt; i++)
	{
		if (targrows < vacattrstats[i]->minrows)
			targrows = vacattrstats[i]->minrows;
	}

	/*
	 * Acquire the sample rows
	 */
	rows = (HeapTuple *) palloc(targrows * sizeof(HeapTuple));
	numrows = acquire_sample_rows(onerel, rows, targrows, &totalrows);

	/*
	 * If we are running a standalone ANALYZE, update pages/tuples stats
	 * in pg_class.  We have the accurate page count from heap_beginscan,
	 * but only an approximate number of tuples; therefore, if we are part
	 * of VACUUM ANALYZE, do *not* overwrite the accurate count already
	 * inserted by VACUUM.
	 */
	if (!vacstmt->vacuum)
		vac_update_relstats(RelationGetRelid(onerel),
							onerel->rd_nblocks,
							totalrows,
							RelationGetForm(onerel)->relhasindex);

	/*
	 * Compute the statistics.	Temporary results during the calculations
	 * for each column are stored in a child context.  The calc routines
	 * are responsible to make sure that whatever they store into the
	 * VacAttrStats structure is allocated in anl_context.
	 */
	if (numrows > 0)
	{
		MemoryContext col_context,
					old_context;

		col_context = AllocSetContextCreate(anl_context,
											"Analyze Column",
											ALLOCSET_DEFAULT_MINSIZE,
											ALLOCSET_DEFAULT_INITSIZE,
											ALLOCSET_DEFAULT_MAXSIZE);
		old_context = MemoryContextSwitchTo(col_context);
		for (i = 0; i < attr_cnt; i++)
		{
			switch (vacattrstats[i]->algcode)
			{
				case ALG_MINIMAL:
					compute_minimal_stats(vacattrstats[i],
										  onerel->rd_att, totalrows,
										  rows, numrows);
					break;
				case ALG_SCALAR:
					compute_scalar_stats(vacattrstats[i],
										 onerel->rd_att, totalrows,
										 rows, numrows);
					break;
			}
			MemoryContextResetAndDeleteChildren(col_context);
		}
		MemoryContextSwitchTo(old_context);
		MemoryContextDelete(col_context);

		/*
		 * Emit the completed stats rows into pg_statistic, replacing any
		 * previous statistics for the target columns.	(If there are
		 * stats in pg_statistic for columns we didn't process, we leave
		 * them alone.)
		 */
		update_attstats(relid, attr_cnt, vacattrstats);
	}

	/*
	 * Close source relation now, but keep lock so that no one deletes it
	 * before we commit.  (If someone did, they'd fail to clean up the
	 * entries we made in pg_statistic.)
	 */
	relation_close(onerel, NoLock);
}

/*
 * examine_attribute -- pre-analysis of a single column
 *
 * Determine whether the column is analyzable; if so, create and initialize
 * a VacAttrStats struct for it.  If not, return NULL.
 */
static VacAttrStats *
examine_attribute(Relation onerel, int attnum)
{
	Form_pg_attribute attr = onerel->rd_att->attrs[attnum - 1];
	Operator	func_operator;
	HeapTuple	typtuple;
	Oid			eqopr = InvalidOid;
	Oid			eqfunc = InvalidOid;
	Oid			ltopr = InvalidOid;
	VacAttrStats *stats;

	/* Don't analyze dropped columns */
	if (attr->attisdropped)
		return NULL;

	/* Don't analyze column if user has specified not to */
	if (attr->attstattarget == 0)
		return NULL;

	/* If column has no "=" operator, we can't do much of anything */
	func_operator = equality_oper(attr->atttypid, true);
	if (func_operator != NULL)
	{
		eqopr = oprid(func_operator);
		eqfunc = oprfuncid(func_operator);
		ReleaseSysCache(func_operator);
	}
	if (!OidIsValid(eqfunc))
		return NULL;

	/*
	 * If we have "=" then we're at least able to do the minimal
	 * algorithm, so start filling in a VacAttrStats struct.
	 */
	stats = (VacAttrStats *) palloc0(sizeof(VacAttrStats));
	stats->attnum = attnum;
	stats->attr = (Form_pg_attribute) palloc(ATTRIBUTE_TUPLE_SIZE);
	memcpy(stats->attr, attr, ATTRIBUTE_TUPLE_SIZE);
	typtuple = SearchSysCache(TYPEOID,
							  ObjectIdGetDatum(attr->atttypid),
							  0, 0, 0);
	if (!HeapTupleIsValid(typtuple))
		elog(ERROR, "cache lookup failed for type %u", attr->atttypid);
	stats->attrtype = (Form_pg_type) palloc(sizeof(FormData_pg_type));
	memcpy(stats->attrtype, GETSTRUCT(typtuple), sizeof(FormData_pg_type));
	ReleaseSysCache(typtuple);
	stats->eqopr = eqopr;
	stats->eqfunc = eqfunc;

	/* If the attstattarget column is negative, use the default value */
	if (stats->attr->attstattarget < 0)
		stats->attr->attstattarget = default_statistics_target;

	/* Is there a "<" operator with suitable semantics? */
	func_operator = ordering_oper(attr->atttypid, true);
	if (func_operator != NULL)
	{
		ltopr = oprid(func_operator);
		ReleaseSysCache(func_operator);
	}
	stats->ltopr = ltopr;

	/*
	 * Determine the algorithm to use (this will get more complicated
	 * later)
	 */
	if (OidIsValid(ltopr))
	{
		/* Seems to be a scalar datatype */
		stats->algcode = ALG_SCALAR;
		/*--------------------
		 * The following choice of minrows is based on the paper
		 * "Random sampling for histogram construction: how much is enough?"
		 * by Surajit Chaudhuri, Rajeev Motwani and Vivek Narasayya, in
		 * Proceedings of ACM SIGMOD International Conference on Management
		 * of Data, 1998, Pages 436-447.  Their Corollary 1 to Theorem 5
		 * says that for table size n, histogram size k, maximum relative
		 * error in bin size f, and error probability gamma, the minimum
		 * random sample size is
		 *		r = 4 * k * ln(2*n/gamma) / f^2
		 * Taking f = 0.5, gamma = 0.01, n = 1 million rows, we obtain
		 *		r = 305.82 * k
		 * Note that because of the log function, the dependence on n is
		 * quite weak; even at n = 1 billion, a 300*k sample gives <= 0.59
		 * bin size error with probability 0.99.  So there's no real need to
		 * scale for n, which is a good thing because we don't necessarily
		 * know it at this point.
		 *--------------------
		 */
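		/*
		 * Illustrative arithmetic, not additional logic: with the default
		 * statistics target of 10, the formula below asks for
		 * 300 * 10 = 3000 sample rows.
		 */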
		stats->minrows = 300 * stats->attr->attstattarget;
	}
	else
	{
		/* Can't do much but the minimal stuff */
		stats->algcode = ALG_MINIMAL;
		/* Might as well use the same minrows as above */
		stats->minrows = 300 * stats->attr->attstattarget;
	}

	return stats;
}

/*
 * acquire_sample_rows -- acquire a random sample of rows from the table
 *
 * Up to targrows rows are collected (if there are fewer than that many
 * rows in the table, all rows are collected).	When the table is larger
 * than targrows, a truly random sample is collected: every row has an
 * equal chance of ending up in the final sample.
 *
 * We also estimate the total number of rows in the table, and return that
 * into *totalrows.
 *
 * The returned list of tuples is in order by physical position in the table.
 * (We will rely on this later to derive correlation estimates.)
 */
static int
acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
					double *totalrows)
{
	int			numrows = 0;
	HeapScanDesc scan;
	HeapTuple	tuple;
	ItemPointer lasttuple;
	BlockNumber lastblock,
				estblock;
	OffsetNumber lastoffset;
	int			numest;
	double		tuplesperpage;
	double		t;
	double		rstate;

	Assert(targrows > 1);

	/*
	 * Do a simple linear scan until we reach the target number of rows.
	 */
	scan = heap_beginscan(onerel, SnapshotNow, 0, NULL);
	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		rows[numrows++] = heap_copytuple(tuple);
		if (numrows >= targrows)
			break;
		vacuum_delay_point();
	}
	heap_endscan(scan);

	/*
	 * If we ran out of tuples then we're done, no matter how few we
	 * collected.  No sort is needed, since they're already in order.
	 */
	if (!HeapTupleIsValid(tuple))
	{
		*totalrows = (double) numrows;

		ereport(elevel,
				(errmsg("\"%s\": %u pages, %d rows sampled, %.0f estimated total rows",
						RelationGetRelationName(onerel),
						onerel->rd_nblocks, numrows, *totalrows)));

		return numrows;
	}

	/*
	 * Otherwise, start replacing tuples in the sample until we reach the
	 * end of the relation.  This algorithm is from Jeff Vitter's paper
	 * (see full citation below).  It works by repeatedly computing the
	 * number of the next tuple we want to fetch, which will replace a
	 * randomly chosen element of the reservoir (current set of tuples).
	 * At all times the reservoir is a true random sample of the tuples
	 * we've passed over so far, so when we fall off the end of the
	 * relation we're done.
	 *
	 * A slight difficulty is that since we don't want to fetch tuples or
	 * even pages that we skip over, it's not possible to fetch *exactly*
	 * the N'th tuple at each step --- we don't know how many valid tuples
	 * are on the skipped pages.  We handle this by assuming that the
	 * average number of valid tuples/page on the pages already scanned
	 * over holds good for the rest of the relation as well; this lets us
	 * estimate which page the next tuple should be on and its position in
	 * the page.  Then we fetch the first valid tuple at or after that
	 * position, being careful not to use the same tuple twice.  This
	 * approach should still give a good random sample, although it's not
	 * perfect.
	 */
	lasttuple = &(rows[numrows - 1]->t_self);
	lastblock = ItemPointerGetBlockNumber(lasttuple);
	lastoffset = ItemPointerGetOffsetNumber(lasttuple);

	/*
	 * If possible, estimate tuples/page using only completely-scanned
	 * pages.
	 */
	for (numest = numrows; numest > 0; numest--)
	{
		if (ItemPointerGetBlockNumber(&(rows[numest - 1]->t_self)) != lastblock)
			break;
	}
	if (numest == 0)
	{
		numest = numrows;		/* don't have a full page? */
		estblock = lastblock + 1;
	}
	else
		estblock = lastblock;
	tuplesperpage = (double) numest / (double) estblock;
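
	/*
	 * Worked example with assumed numbers: if the initial scan collected
	 * 3000 rows and the last 61 of them sit on block 59, then numest =
	 * 2939 rows over estblock = 59 complete pages, giving roughly 49.8
	 * tuples per page.
	 */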

	t = (double) numrows;		/* t is the # of records processed so far */
	rstate = init_selection_state(targrows);
	for (;;)
	{
		double		targpos;
		BlockNumber targblock;
		Buffer		targbuffer;
		Page		targpage;
		OffsetNumber targoffset,
					maxoffset;

		vacuum_delay_point();

		t = select_next_random_record(t, targrows, &rstate);
		/* Try to read the t'th record in the table */
		targpos = t / tuplesperpage;
		targblock = (BlockNumber) targpos;
		targoffset = ((int) ((targpos - targblock) * tuplesperpage)) +
			FirstOffsetNumber;
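
		/*
		 * Example with assumed numbers: if tuplesperpage is 50 and t is
		 * 1234, targpos = 24.68, so we aim for block 24 and line pointer
		 * (int) (0.68 * 50) + 1 = 35 within it.
		 */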
		/* Make sure we are past the last selected record */
		if (targblock <= lastblock)
		{
			targblock = lastblock;
			if (targoffset <= lastoffset)
				targoffset = lastoffset + 1;
		}
		/* Loop to find first valid record at or after given position */
pageloop:;

		/*
		 * Have we fallen off the end of the relation?	(We rely on
		 * heap_beginscan to have updated rd_nblocks.)
		 */
		if (targblock >= onerel->rd_nblocks)
			break;

		/*
		 * We must maintain a pin on the target page's buffer to ensure
		 * that the maxoffset value stays good (else concurrent VACUUM
		 * might delete tuples out from under us).	Hence, pin the page
		 * until we are done looking at it.  We don't maintain a lock on
		 * the page, so tuples could get added to it, but we ignore such
		 * tuples.
		 */
		targbuffer = ReadBuffer(onerel, targblock);
		if (!BufferIsValid(targbuffer))
			elog(ERROR, "ReadBuffer failed");
		LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
		targpage = BufferGetPage(targbuffer);
		maxoffset = PageGetMaxOffsetNumber(targpage);
		LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);

		for (;;)
		{
			HeapTupleData targtuple;
			Buffer		tupbuffer;

			if (targoffset > maxoffset)
			{
				/* Fell off end of this page, try next */
				ReleaseBuffer(targbuffer);
				targblock++;
				targoffset = FirstOffsetNumber;
				goto pageloop;
			}
			ItemPointerSet(&targtuple.t_self, targblock, targoffset);
			if (heap_fetch(onerel, SnapshotNow, &targtuple, &tupbuffer,
						   false, NULL))
			{
				/*
				 * Found a suitable tuple, so save it, replacing one old
				 * tuple at random
				 */
				int			k = (int) (targrows * random_fract());

				Assert(k >= 0 && k < targrows);
				heap_freetuple(rows[k]);
				rows[k] = heap_copytuple(&targtuple);
				/* this releases the second pin acquired by heap_fetch: */
				ReleaseBuffer(tupbuffer);
				/* this releases the initial pin: */
				ReleaseBuffer(targbuffer);
				lastblock = targblock;
				lastoffset = targoffset;
				break;
			}
			/* this tuple is dead, so advance to next one on same page */
			targoffset++;
		}
	}

	/*
	 * Now we need to sort the collected tuples by position (itempointer).
	 */
	qsort((void *) rows, numrows, sizeof(HeapTuple), compare_rows);

	/*
	 * Estimate total number of valid rows in relation.
	 */
	*totalrows = floor((double) onerel->rd_nblocks * tuplesperpage + 0.5);

	/*
	 * Emit some interesting relation info
	 */
	ereport(elevel,
			(errmsg("\"%s\": %u pages, %d rows sampled, %.0f estimated total rows",
					RelationGetRelationName(onerel),
					onerel->rd_nblocks, numrows, *totalrows)));

	return numrows;
}

/* Select a random value R uniformly distributed in 0 < R < 1 */
static double
random_fract(void)
{
	long		z;

	/* random() can produce endpoint values, try again if so */
	do
	{
		z = random();
	} while (z <= 0 || z >= MAX_RANDOM_VALUE);
	return (double) z / (double) MAX_RANDOM_VALUE;
}

/*
 * These two routines embody Algorithm Z from "Random sampling with a
 * reservoir" by Jeffrey S. Vitter, in ACM Trans. Math. Softw. 11, 1
 * (Mar. 1985), Pages 37-57.  While Vitter describes his algorithm in terms
 * of the count S of records to skip before processing another record,
 * it is convenient to work primarily with t, the index (counting from 1)
 * of the last record processed and next record to process.  The only extra
 * state needed between calls is W, a random state variable.
 *
 * Note: the original algorithm defines t, S, numer, and denom as integers.
 * Here we express them as doubles to avoid overflow if the number of rows
 * in the table exceeds INT_MAX.  The algorithm should work as long as the
 * row count does not become so large that it is not represented accurately
 * in a double (on IEEE-math machines this would be around 2^52 rows).
 *
 * init_selection_state computes the initial W value.
 *
 * Given that we've already processed t records (t >= n),
 * select_next_random_record determines the number of the next record to
 * process.
 */
static double
init_selection_state(int n)
{
	/* Initial value of W (for use when Algorithm Z is first applied) */
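	/*
	 * A minimal sketch of the math: for a uniform u in (0,1) this computes
	 * W = exp(-ln(u)/n) = u^(-1/n), which always exceeds 1.
	 */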
	return exp(-log(random_fract()) / n);
}

static double
select_next_random_record(double t, int n, double *stateptr)
{
	/* The magic constant here is T from Vitter's paper */
	if (t <= (22.0 * n))
	{
		/* Process records using Algorithm X until t is large enough */
		double		V,
					quot;

		V = random_fract();		/* Generate V */
		t += 1;
		quot = (t - (double) n) / t;
		/* Find min S satisfying (4.1) */
		while (quot > V)
		{
			t += 1;
			quot *= (t - (double) n) / t;
		}
	}
	else
	{
		/* Now apply Algorithm Z */
		double		W = *stateptr;
		double		term = t - (double) n + 1;
		double		S;

		for (;;)
		{
			double		numer,
						numer_lim,
						denom;
			double		U,
						X,
						lhs,
						rhs,
						y,
						tmp;

			/* Generate U and X */
			U = random_fract();
			X = t * (W - 1.0);
			S = floor(X);		/* S is tentatively set to floor(X) */
			/* Test if U <= h(S)/cg(X) in the manner of (6.3) */
			tmp = (t + 1) / term;
			lhs = exp(log(((U * tmp * tmp) * (term + S)) / (t + X)) / n);
			rhs = (((t + X) / (term + S)) * term) / t;
			if (lhs <= rhs)
			{
				W = rhs / lhs;
				break;
			}
			/* Test if U <= f(S)/cg(X) */
			y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X);
			if ((double) n < S)
			{
				denom = t;
				numer_lim = term + S;
			}
			else
			{
				denom = t - (double) n + S;
				numer_lim = t + 1;
			}
			for (numer = t + S; numer >= numer_lim; numer -= 1)
			{
				y *= numer / denom;
				denom -= 1;
			}
			W = exp(-log(random_fract()) / n);	/* Generate W in advance */
			if (exp(log(y) / n) <= (t + X) / t)
				break;
		}
		t += S + 1;
		*stateptr = W;
	}
	return t;
}

/*
 * qsort comparator for sorting rows[] array
 */
static int
compare_rows(const void *a, const void *b)
{
	HeapTuple	ha = *(HeapTuple *) a;
	HeapTuple	hb = *(HeapTuple *) b;
	BlockNumber ba = ItemPointerGetBlockNumber(&ha->t_self);
	OffsetNumber oa = ItemPointerGetOffsetNumber(&ha->t_self);
	BlockNumber bb = ItemPointerGetBlockNumber(&hb->t_self);
	OffsetNumber ob = ItemPointerGetOffsetNumber(&hb->t_self);

	if (ba < bb)
		return -1;
	if (ba > bb)
		return 1;
	if (oa < ob)
		return -1;
	if (oa > ob)
		return 1;
	return 0;
}


/*
 *	compute_minimal_stats() -- compute minimal column statistics
 *
 *	We use this when we can find only an "=" operator for the datatype.
 *
 *	We determine the fraction of non-null rows, the average width, the
 *	most common values, and the (estimated) number of distinct values.
 *
 *	The most common values are determined by brute force: we keep a list
 *	of previously seen values, ordered by number of times seen, as we scan
 *	the samples.  A newly seen value is inserted just after the last
 *	multiply-seen value, causing the bottommost (oldest) singly-seen value
 *	to drop off the list.  The accuracy of this method, and also its cost,
 *	depend mainly on the length of the list we are willing to keep.
 */
static void
compute_minimal_stats(VacAttrStats *stats,
					  TupleDesc tupDesc, double totalrows,
					  HeapTuple *rows, int numrows)
{
	int			i;
	int			null_cnt = 0;
	int			nonnull_cnt = 0;
	int			toowide_cnt = 0;
	double		total_width = 0;
	bool		is_varlena = (!stats->attr->attbyval &&
							  stats->attr->attlen == -1);
	bool		is_varwidth = (!stats->attr->attbyval &&
							   stats->attr->attlen < 0);
	FmgrInfo	f_cmpeq;
	typedef struct
	{
		Datum		value;
		int			count;
	} TrackItem;
	TrackItem  *track;
	int			track_cnt,
				track_max;
	int			num_mcv = stats->attr->attstattarget;

	/*
	 * We track up to 2*n values for an n-element MCV list, but at least 10.
	 */
	track_max = 2 * num_mcv;
	if (track_max < 10)
		track_max = 10;
	track = (TrackItem *) palloc(track_max * sizeof(TrackItem));
	track_cnt = 0;

	fmgr_info(stats->eqfunc, &f_cmpeq);

	for (i = 0; i < numrows; i++)
	{
		HeapTuple	tuple = rows[i];
		Datum		value;
		bool		isnull;
		bool		match;
		int			firstcount1,
					j;

		vacuum_delay_point();

		value = heap_getattr(tuple, stats->attnum, tupDesc, &isnull);

		/* Check for null/nonnull */
		if (isnull)
		{
			null_cnt++;
			continue;
		}
		nonnull_cnt++;

		/*
		 * If it's a variable-width field, add up widths for average width
		 * calculation.  Note that if the value is toasted, we use the
		 * toasted width.  We don't bother with this calculation if it's a
		 * fixed-width type.
		 */
		if (is_varlena)
		{
			total_width += VARSIZE(DatumGetPointer(value));

			/*
			 * If the value is toasted, we want to detoast it just once to
			 * avoid repeated detoastings and resultant excess memory
			 * usage during the comparisons.  Also, check to see if the
			 * value is excessively wide, and if so don't detoast at all
			 * --- just ignore the value.
			 */
			if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
			{
				toowide_cnt++;
				continue;
			}
			value = PointerGetDatum(PG_DETOAST_DATUM(value));
		}
		else if (is_varwidth)
		{
			/* must be cstring */
			total_width += strlen(DatumGetCString(value)) + 1;
		}

		/*
		 * See if the value matches anything we're already tracking.
		 */
		match = false;
		firstcount1 = track_cnt;
		for (j = 0; j < track_cnt; j++)
		{
			if (DatumGetBool(FunctionCall2(&f_cmpeq, value, track[j].value)))
			{
				match = true;
				break;
			}
			if (j < firstcount1 && track[j].count == 1)
				firstcount1 = j;
		}

		if (match)
		{
			/* Found a match */
			track[j].count++;
			/* This value may now need to "bubble up" in the track list */
			while (j > 0 && track[j].count > track[j - 1].count)
			{
				swapDatum(track[j].value, track[j - 1].value);
				swapInt(track[j].count, track[j - 1].count);
				j--;
			}
		}
		else
		{
			/* No match.  Insert at head of count-1 list */
			if (track_cnt < track_max)
				track_cnt++;
			for (j = track_cnt - 1; j > firstcount1; j--)
			{
				track[j].value = track[j - 1].value;
				track[j].count = track[j - 1].count;
			}
			if (firstcount1 < track_cnt)
			{
				track[firstcount1].value = value;
				track[firstcount1].count = 1;
			}
		}
	}

	/* We can only compute valid stats if we found some non-null values. */
	if (nonnull_cnt > 0)
	{
		int			nmultiple,
					summultiple;

		stats->stats_valid = true;
		/* Do the simple null-frac and width stats */
		stats->stanullfrac = (double) null_cnt / (double) numrows;
		if (is_varwidth)
			stats->stawidth = total_width / (double) nonnull_cnt;
		else
			stats->stawidth = stats->attrtype->typlen;

		/* Count the number of values we found multiple times */
		summultiple = 0;
		for (nmultiple = 0; nmultiple < track_cnt; nmultiple++)
		{
			if (track[nmultiple].count == 1)
				break;
			summultiple += track[nmultiple].count;
		}

		if (nmultiple == 0)
		{
			/* If we found no repeated values, assume it's a unique column */
			stats->stadistinct = -1.0;
		}
		else if (track_cnt < track_max && toowide_cnt == 0 &&
				 nmultiple == track_cnt)
		{
			/*
			 * Our track list includes every value in the sample, and
			 * every value appeared more than once.  Assume the column has
			 * just these values.
			 */
			stats->stadistinct = track_cnt;
		}
		else
		{
			/*----------
			 * Estimate the number of distinct values using the estimator
			 * proposed by Haas and Stokes in IBM Research Report RJ 10025:
			 *		n*d / (n - f1 + f1*n/N)
			 * where f1 is the number of distinct values that occurred
			 * exactly once in our sample of n rows (from a total of N),
			 * and d is the total number of distinct values in the sample.
			 * This is their Duj1 estimator; the other estimators they
			 * recommend are considerably more complex, and are numerically
			 * very unstable when n is much smaller than N.
			 *
			 * We assume (not very reliably!) that all the multiply-occurring
			 * values are reflected in the final track[] list, and the other
			 * nonnull values all appeared but once.  (XXX this usually
			 * results in a drastic overestimate of ndistinct.	Can we do
			 * any better?)
			 *----------
			 */
			int			f1 = nonnull_cnt - summultiple;
			int			d = f1 + nmultiple;
			double		numer,
						denom,
						stadistinct;

			numer = (double) numrows * (double) d;

			denom = (double) (numrows - f1) +
				(double) f1 * (double) numrows / totalrows;

			stadistinct = numer / denom;
			/* Clamp to sane range in case of roundoff error */
			if (stadistinct < (double) d)
				stadistinct = (double) d;
			if (stadistinct > totalrows)
				stadistinct = totalrows;
			stats->stadistinct = floor(stadistinct + 0.5);
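
			/*
			 * Worked example with assumed numbers: numrows = 3000,
			 * totalrows = 1000000, f1 = 1000 and d = 1500 give
			 * 3000*1500 / (3000 - 1000 + 1000*3000/1000000)
			 * = 4500000 / 2003, which rounds to 2247.
			 */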
		}

		/*
		 * If we estimated the number of distinct values at more than 10%
		 * of the total row count (a very arbitrary limit), then assume
		 * that stadistinct should scale with the row count rather than be
		 * a fixed value.
		 */
		if (stats->stadistinct > 0.1 * totalrows)
			stats->stadistinct = -(stats->stadistinct / totalrows);

		/*
		 * Decide how many values are worth storing as most-common values.
		 * If we are able to generate a complete MCV list (all the values
		 * in the sample will fit, and we think these are all the ones in
		 * the table), then do so.	Otherwise, store only those values
		 * that are significantly more common than the (estimated)
		 * average. We set the threshold rather arbitrarily at 25% more
		 * than average, with at least 2 instances in the sample.
		 */
		if (track_cnt < track_max && toowide_cnt == 0 &&
			stats->stadistinct > 0 &&
			track_cnt <= num_mcv)
		{
			/* Track list includes all values seen, and all will fit */
			num_mcv = track_cnt;
		}
		else
		{
			double		ndistinct = stats->stadistinct;
			double		avgcount,
						mincount;

			if (ndistinct < 0)
				ndistinct = -ndistinct * totalrows;
			/* estimate # of occurrences in sample of a typical value */
			avgcount = (double) numrows / ndistinct;
			/* set minimum threshold count to store a value */
			mincount = avgcount * 1.25;
			if (mincount < 2)
				mincount = 2;
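			/*
			 * Illustrative numbers: with numrows = 3000 and ndistinct
			 * estimated at 100, avgcount = 30 and mincount = 37.5, so only
			 * values seen at least 38 times in the sample qualify.
			 */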
			if (num_mcv > track_cnt)
				num_mcv = track_cnt;
			for (i = 0; i < num_mcv; i++)
			{
				if (track[i].count < mincount)
				{
					num_mcv = i;
					break;
				}
			}
		}

		/* Generate MCV slot entry */
		if (num_mcv > 0)
		{
			MemoryContext old_context;
			Datum	   *mcv_values;
			float4	   *mcv_freqs;

			/* Must copy the target values into anl_context */
			old_context = MemoryContextSwitchTo(anl_context);
			mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
			mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
			for (i = 0; i < num_mcv; i++)
			{
				mcv_values[i] = datumCopy(track[i].value,
										  stats->attr->attbyval,
										  stats->attr->attlen);
				mcv_freqs[i] = (double) track[i].count / (double) numrows;
			}
			MemoryContextSwitchTo(old_context);

			stats->stakind[0] = STATISTIC_KIND_MCV;
			stats->staop[0] = stats->eqopr;
			stats->stanumbers[0] = mcv_freqs;
			stats->numnumbers[0] = num_mcv;
			stats->stavalues[0] = mcv_values;
			stats->numvalues[0] = num_mcv;
		}
	}

	/* We don't need to bother cleaning up any of our temporary palloc's */
}


/*
 *	compute_scalar_stats() -- compute column statistics
 *
 *	We use this when we can find "=" and "<" operators for the datatype.
 *
 *	We determine the fraction of non-null rows, the average width, the
 *	most common values, the (estimated) number of distinct values, the
 *	distribution histogram, and the correlation of physical to logical order.
 *
 *	The desired stats can be determined fairly easily after sorting the
 *	data values into order.
 */
static void
compute_scalar_stats(VacAttrStats *stats,
					 TupleDesc tupDesc, double totalrows,
					 HeapTuple *rows, int numrows)
{
	int			i;
	int			null_cnt = 0;
	int			nonnull_cnt = 0;
	int			toowide_cnt = 0;
	double		total_width = 0;
	bool		is_varlena = (!stats->attr->attbyval &&
							  stats->attr->attlen == -1);
	bool		is_varwidth = (!stats->attr->attbyval &&
							   stats->attr->attlen < 0);
	double		corr_xysum;
	RegProcedure cmpFn;
	SortFunctionKind cmpFnKind;
	FmgrInfo	f_cmpfn;
	ScalarItem *values;
	int			values_cnt = 0;
	int		   *tupnoLink;
	ScalarMCVItem *track;
	int			track_cnt = 0;
	int			num_mcv = stats->attr->attstattarget;
	int			num_bins = stats->attr->attstattarget;

	values = (ScalarItem *) palloc(numrows * sizeof(ScalarItem));
	tupnoLink = (int *) palloc(numrows * sizeof(int));
	track = (ScalarMCVItem *) palloc(num_mcv * sizeof(ScalarMCVItem));

	SelectSortFunction(stats->ltopr, &cmpFn, &cmpFnKind);
	fmgr_info(cmpFn, &f_cmpfn);

	/* Initial scan to find sortable values */
	for (i = 0; i < numrows; i++)
	{
		HeapTuple	tuple = rows[i];
		Datum		value;
		bool		isnull;

		vacuum_delay_point();

		value = heap_getattr(tuple, stats->attnum, tupDesc, &isnull);

		/* Check for null/nonnull */
		if (isnull)
		{
			null_cnt++;
			continue;
		}
		nonnull_cnt++;

		/*
		 * If it's a variable-width field, add up widths for average width
		 * calculation.  Note that if the value is toasted, we use the
		 * toasted width.  We don't bother with this calculation if it's a
		 * fixed-width type.
		 */
		if (is_varlena)
		{
			total_width += VARSIZE(DatumGetPointer(value));

			/*
			 * If the value is toasted, we want to detoast it just once to
			 * avoid repeated detoastings and resultant excess memory
			 * usage during the comparisons.  Also, check to see if the
			 * value is excessively wide, and if so don't detoast at all
			 * --- just ignore the value.
			 */
			if (toast_raw_datum_size(value) > WIDTH_THRESHOLD)
			{
				toowide_cnt++;
				continue;
			}
			value = PointerGetDatum(PG_DETOAST_DATUM(value));
		}
		else if (is_varwidth)
		{
			/* must be cstring */
			total_width += strlen(DatumGetCString(value)) + 1;
		}

		/* Add it to the list to be sorted */
		values[values_cnt].value = value;
		values[values_cnt].tupno = values_cnt;
		tupnoLink[values_cnt] = values_cnt;
		values_cnt++;
	}

	/* We can only compute valid stats if we found some sortable values. */
	if (values_cnt > 0)
	{
		int			ndistinct,	/* # distinct values in sample */
					nmultiple,	/* # that appear multiple times */
					num_hist,
					dups_cnt;
		int			slot_idx = 0;

		/* Sort the collected values */
		datumCmpFn = &f_cmpfn;
		datumCmpFnKind = cmpFnKind;
		datumCmpTupnoLink = tupnoLink;
		qsort((void *) values, values_cnt,
			  sizeof(ScalarItem), compare_scalars);

		/*
		 * Now scan the values in order, find the most common ones, and
		 * also accumulate ordering-correlation statistics.
		 *
		 * To determine which are most common, we first have to count the
		 * number of duplicates of each value.	The duplicates are
		 * adjacent in the sorted list, so a brute-force approach is to
		 * compare successive datum values until we find two that are not
		 * equal. However, that requires N-1 invocations of the datum
		 * comparison routine, which are completely redundant with work
		 * that was done during the sort.  (The sort algorithm must at
		 * some point have compared each pair of items that are adjacent
		 * in the sorted order; otherwise it could not know that it's
		 * ordered the pair correctly.) We exploit this by having
		 * compare_scalars remember the highest tupno index that each
		 * ScalarItem has been found equal to.	At the end of the sort, a
		 * ScalarItem's tupnoLink will still point to itself if and only
		 * if it is the last item of its group of duplicates (since the
		 * group will be ordered by tupno).
		 */
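
		/*
		 * For example (hypothetical tuple numbers): if tuples 4, 7 and 9
		 * hold equal datums, the sort leaves tupnoLink[4] and tupnoLink[7]
		 * pointing at higher tupnos while tupnoLink[9] remains 9, so only
		 * tuple 9 passes the tupnoLink[tupno] == tupno test below and ends
		 * the group.
		 */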
		corr_xysum = 0;
		ndistinct = 0;
		nmultiple = 0;
		dups_cnt = 0;
		for (i = 0; i < values_cnt; i++)
		{
			int			tupno = values[i].tupno;

			corr_xysum += ((double) i) * ((double) tupno);
			dups_cnt++;
			if (tupnoLink[tupno] == tupno)
			{
				/* Reached end of duplicates of this value */
				ndistinct++;
				if (dups_cnt > 1)
				{
					nmultiple++;
					if (track_cnt < num_mcv ||
						dups_cnt > track[track_cnt - 1].count)
					{
						/*
						 * Found a new item for the mcv list; find its
						 * position, bubbling down old items if needed.
						 * Loop invariant is that j points at an empty/
						 * replaceable slot.
						 */
						int			j;

						if (track_cnt < num_mcv)
							track_cnt++;
						for (j = track_cnt - 1; j > 0; j--)
						{
							if (dups_cnt <= track[j - 1].count)
								break;
							track[j].count = track[j - 1].count;
							track[j].first = track[j - 1].first;
						}
						track[j].count = dups_cnt;
						track[j].first = i + 1 - dups_cnt;
					}
				}
				dups_cnt = 0;
			}
		}

		stats->stats_valid = true;
		/* Do the simple null-frac and width stats */
		stats->stanullfrac = (double) null_cnt / (double) numrows;
		if (is_varwidth)
			stats->stawidth = total_width / (double) nonnull_cnt;
		else
			stats->stawidth = stats->attrtype->typlen;

		if (nmultiple == 0)
		{
			/* If we found no repeated values, assume it's a unique column */
			stats->stadistinct = -1.0;
		}
		else if (toowide_cnt == 0 && nmultiple == ndistinct)
		{
			/*
			 * Every value in the sample appeared more than once.  Assume
			 * the column has just these values.
			 */
			stats->stadistinct = ndistinct;
		}
		else
		{
			/*----------
			 * Estimate the number of distinct values using the estimator
			 * proposed by Haas and Stokes in IBM Research Report RJ 10025:
			 *		n*d / (n - f1 + f1*n/N)
			 * where f1 is the number of distinct values that occurred
			 * exactly once in our sample of n rows (from a total of N),
			 * and d is the total number of distinct values in the sample.
			 * This is their Duj1 estimator; the other estimators they
			 * recommend are considerably more complex, and are numerically
			 * very unstable when n is much smaller than N.
			 *
			 * Overwidth values are assumed to have been distinct.
			 *----------
			 */
			int			f1 = ndistinct - nmultiple + toowide_cnt;
			int			d = f1 + nmultiple;
			double		numer,
						denom,
						stadistinct;

			numer = (double) numrows * (double) d;

			denom = (double) (numrows - f1) +
				(double) f1 * (double) numrows / totalrows;

			stadistinct = numer / denom;
			/* Clamp to sane range in case of roundoff error */
			if (stadistinct < (double) d)
				stadistinct = (double) d;
			if (stadistinct > totalrows)
				stadistinct = totalrows;
			stats->stadistinct = floor(stadistinct + 0.5);
		}

		/*
		 * If we estimated the number of distinct values at more than 10%
		 * of the total row count (a very arbitrary limit), then assume
		 * that stadistinct should scale with the row count rather than be
		 * a fixed value.
		 */
		if (stats->stadistinct > 0.1 * totalrows)
			stats->stadistinct = -(stats->stadistinct / totalrows);

		/*
		 * Decide how many values are worth storing as most-common values.
		 * If we are able to generate a complete MCV list (all the values
		 * in the sample will fit, and we think these are all the ones in
		 * the table), then do so.	Otherwise, store only those values
		 * that are significantly more common than the (estimated)
		 * average. We set the threshold rather arbitrarily at 25% more
		 * than average, with at least 2 instances in the sample.  Also,
		 * we won't suppress values that have a frequency of at least 1/K
		 * where K is the intended number of histogram bins; such values
		 * might otherwise cause us to emit duplicate histogram bin
		 * boundaries.
		 */
		if (track_cnt == ndistinct && toowide_cnt == 0 &&
			stats->stadistinct > 0 &&
			track_cnt <= num_mcv)
		{
			/* Track list includes all values seen, and all will fit */
			num_mcv = track_cnt;
		}
		else
		{
			double		ndistinct = stats->stadistinct;
			double		avgcount,
						mincount,
						maxmincount;

			if (ndistinct < 0)
				ndistinct = -ndistinct * totalrows;
			/* estimate # of occurrences in sample of a typical value */
			avgcount = (double) numrows / ndistinct;
			/* set minimum threshold count to store a value */
			mincount = avgcount * 1.25;
			if (mincount < 2)
				mincount = 2;
			/* don't let threshold exceed 1/K, however */
			maxmincount = (double) numrows / (double) num_bins;
			if (mincount > maxmincount)
				mincount = maxmincount;
			if (num_mcv > track_cnt)
				num_mcv = track_cnt;
			for (i = 0; i < num_mcv; i++)
			{
				if (track[i].count < mincount)
				{
					num_mcv = i;
					break;
				}
			}
		}

		/* Generate MCV slot entry */
		if (num_mcv > 0)
		{
			MemoryContext old_context;
			Datum	   *mcv_values;
			float4	   *mcv_freqs;

			/* Must copy the target values into anl_context */
			old_context = MemoryContextSwitchTo(anl_context);
			mcv_values = (Datum *) palloc(num_mcv * sizeof(Datum));
			mcv_freqs = (float4 *) palloc(num_mcv * sizeof(float4));
			for (i = 0; i < num_mcv; i++)
			{
				mcv_values[i] = datumCopy(values[track[i].first].value,
										  stats->attr->attbyval,
										  stats->attr->attlen);
				mcv_freqs[i] = (double) track[i].count / (double) numrows;
			}
			MemoryContextSwitchTo(old_context);

			stats->stakind[slot_idx] = STATISTIC_KIND_MCV;
			stats->staop[slot_idx] = stats->eqopr;
			stats->stanumbers[slot_idx] = mcv_freqs;
			stats->numnumbers[slot_idx] = num_mcv;
			stats->stavalues[slot_idx] = mcv_values;
			stats->numvalues[slot_idx] = num_mcv;
			slot_idx++;
		}

		/*
		 * Generate a histogram slot entry if there are at least two
		 * distinct values not accounted for in the MCV list.  (This
		 * ensures the histogram won't collapse to empty or a singleton.)
		 */
		num_hist = ndistinct - num_mcv;
		if (num_hist > num_bins)
			num_hist = num_bins + 1;
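
		/*
		 * (num_hist counts the histogram entries to store, i.e. bin
		 * boundaries; num_bins + 1 boundaries are enough to delimit
		 * num_bins bins, hence the cap above.)
		 */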
		if (num_hist >= 2)
		{
			MemoryContext old_context;
			Datum	   *hist_values;
			int			nvals;

			/* Sort the MCV items into position order to speed next loop */
			qsort((void *) track, num_mcv,
				  sizeof(ScalarMCVItem), compare_mcvs);

			/*
			 * Collapse out the MCV items from the values[] array.
			 *
			 * Note we destroy the values[] array here... but we don't need
			 * it for anything more.  We do, however, still need
			 * values_cnt. nvals will be the number of remaining entries
			 * in values[].
			 */
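
			/*
			 * Hypothetical illustration: with values_cnt = 10 and a single
			 * MCV group at positions 3..5 (first = 3, count = 3), the loop
			 * copies entries 0..2 and 6..9 downward, leaving nvals = 7.
			 */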
			if (num_mcv > 0)
			{
				int			src,
							dest;
				int			j;

				src = dest = 0;
				j = 0;			/* index of next interesting MCV item */
				while (src < values_cnt)
				{
					int			ncopy;

					if (j < num_mcv)
					{
						int			first = track[j].first;

						if (src >= first)
						{
							/* advance past this MCV item */
							src = first + track[j].count;
							j++;
							continue;
						}
						ncopy = first - src;
					}
					else
						ncopy = values_cnt - src;
					memmove(&values[dest], &values[src],
							ncopy * sizeof(ScalarItem));
					src += ncopy;
					dest += ncopy;
				}
				nvals = dest;
			}
			else
				nvals = values_cnt;
			Assert(nvals >= num_hist);

			/* Must copy the target values into anl_context */
			old_context = MemoryContextSwitchTo(anl_context);
			hist_values = (Datum *) palloc(num_hist * sizeof(Datum));
			for (i = 0; i < num_hist; i++)
			{
				int			pos;

				pos = (i * (nvals - 1)) / (num_hist - 1);
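				/*
				 * (The line above picks evenly spaced entries, including
				 * both endpoints: e.g., assumed sizes nvals = 91 and
				 * num_hist = 11 yield pos = 0, 9, 18, ..., 90.)
				 */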
				hist_values[i] = datumCopy(values[pos].value,
										   stats->attr->attbyval,
										   stats->attr->attlen);
			}
			MemoryContextSwitchTo(old_context);

			stats->stakind[slot_idx] = STATISTIC_KIND_HISTOGRAM;
			stats->staop[slot_idx] = stats->ltopr;
			stats->stavalues[slot_idx] = hist_values;
			stats->numvalues[slot_idx] = num_hist;
			slot_idx++;
		}

		/* Generate a correlation entry if there are multiple values */
		if (values_cnt > 1)
		{
			MemoryContext old_context;
			float4	   *corrs;
			double		corr_xsum,
						corr_x2sum;

			/* Must copy the target values into anl_context */
			old_context = MemoryContextSwitchTo(anl_context);
			corrs = (float4 *) palloc(sizeof(float4));
			MemoryContextSwitchTo(old_context);

			/*----------
			 * Since we know the x and y value sets are both
			 *		0, 1, ..., values_cnt-1
			 * we have sum(x) = sum(y) =
			 *		(values_cnt-1)*values_cnt / 2
			 * and sum(x^2) = sum(y^2) =
			 *		(values_cnt-1)*values_cnt*(2*values_cnt-1) / 6.
			 *----------
			 */
			corr_xsum = ((double) (values_cnt - 1)) *
				((double) values_cnt) / 2.0;
			corr_x2sum = ((double) (values_cnt - 1)) *
				((double) values_cnt) * (double) (2 * values_cnt - 1) / 6.0;

			/* And the correlation coefficient reduces to */
			corrs[0] = (values_cnt * corr_xysum - corr_xsum * corr_xsum) /
				(values_cnt * corr_x2sum - corr_xsum * corr_xsum);
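
			/*
			 * Sanity check with tiny assumed inputs: for values_cnt = 3 in
			 * perfect physical order, corr_xysum = 0 + 1 + 4 = 5,
			 * corr_xsum = 3, corr_x2sum = 5, giving (15 - 9) / (15 - 9) =
			 * 1.0; a fully reversed order yields -1.0.
			 */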

			stats->stakind[slot_idx] = STATISTIC_KIND_CORRELATION;
			stats->staop[slot_idx] = stats->ltopr;
			stats->stanumbers[slot_idx] = corrs;
			stats->numnumbers[slot_idx] = 1;
			slot_idx++;
		}
	}

	/* We don't need to bother cleaning up any of our temporary palloc's */
}

/*
 * qsort comparator for sorting ScalarItems
 *
 * Aside from sorting the items, we update the datumCmpTupnoLink[] array
 * whenever two ScalarItems are found to contain equal datums.	The array
 * is indexed by tupno; for each ScalarItem, it contains the highest
 * tupno that that item's datum has been found to be equal to.  This allows
 * us to avoid additional comparisons in compute_scalar_stats().
 */
static int
compare_scalars(const void *a, const void *b)
{
	Datum		da = ((ScalarItem *) a)->value;
	int			ta = ((ScalarItem *) a)->tupno;
	Datum		db = ((ScalarItem *) b)->value;
	int			tb = ((ScalarItem *) b)->tupno;
	int32		compare;

	compare = ApplySortFunction(datumCmpFn, datumCmpFnKind,
								da, false, db, false);
	if (compare != 0)
		return compare;

	/*
	 * The two datums are equal, so update datumCmpTupnoLink[].
	 */
	if (datumCmpTupnoLink[ta] < tb)
		datumCmpTupnoLink[ta] = tb;
	if (datumCmpTupnoLink[tb] < ta)
		datumCmpTupnoLink[tb] = ta;

	/*
	 * For equal datums, sort by tupno
	 */
	return ta - tb;
}

/*
 * qsort comparator for sorting ScalarMCVItems by position
 */
static int
compare_mcvs(const void *a, const void *b)
{
	int			da = ((ScalarMCVItem *) a)->first;
	int			db = ((ScalarMCVItem *) b)->first;

	return da - db;
}


/*
 *	update_attstats() -- update attribute statistics for one relation
 *
 *		Statistics are stored in several places: the pg_class row for the
 *		relation has stats about the whole relation, and there is a
 *		pg_statistic row for each (non-system) attribute that has ever
 *		been analyzed.	The pg_class values are updated by VACUUM, not here.
 *
 *		pg_statistic rows are just added or updated normally.  This means
 *		that pg_statistic will probably contain some deleted rows at the
 *		completion of a vacuum cycle, unless it happens to get vacuumed last.
 *
 *		To keep things simple, we punt for pg_statistic, and don't try
 *		to compute or store rows for pg_statistic itself in pg_statistic.
 *		This could possibly be made to work, but it's not worth the trouble.
 *		Note analyze_rel() has seen to it that we won't come here when
 *		vacuuming pg_statistic itself.
 *
 *		Note: if two backends concurrently try to analyze the same relation,
 *		the second one is likely to fail here with a "tuple concurrently
 *		updated" error.  This is slightly annoying, but no real harm is done.
 *		We could prevent the problem by using a stronger lock on the
 *		relation for ANALYZE (ie, ShareUpdateExclusiveLock instead
 *		of AccessShareLock); but that cure seems worse than the disease,
 *		especially now that ANALYZE doesn't start a new transaction
 *		for each relation.	The lock could be held for a long time...
 */
static void
update_attstats(Oid relid, int natts, VacAttrStats **vacattrstats)
{
	Relation	sd;
	int			attno;

	sd = heap_openr(StatisticRelationName, RowExclusiveLock);

	for (attno = 0; attno < natts; attno++)
	{
		VacAttrStats *stats = vacattrstats[attno];
		HeapTuple	stup,
					oldtup;
		int			i,
					k,
					n;
		Datum		values[Natts_pg_statistic];
		char		nulls[Natts_pg_statistic];
		char		replaces[Natts_pg_statistic];

		/* Ignore attr if we weren't able to collect stats */
		if (!stats->stats_valid)
			continue;

		/*
		 * Construct a new pg_statistic tuple
		 */
		for (i = 0; i < Natts_pg_statistic; ++i)
		{
			nulls[i] = ' ';
			replaces[i] = 'r';
		}

		i = 0;
		values[i++] = ObjectIdGetDatum(relid);	/* starelid */
		values[i++] = Int16GetDatum(stats->attnum);		/* staattnum */
		values[i++] = Float4GetDatum(stats->stanullfrac);		/* stanullfrac */
		values[i++] = Int32GetDatum(stats->stawidth);	/* stawidth */
		values[i++] = Float4GetDatum(stats->stadistinct);		/* stadistinct */
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			values[i++] = Int16GetDatum(stats->stakind[k]);		/* stakindN */
		}
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			values[i++] = ObjectIdGetDatum(stats->staop[k]);	/* staopN */
		}
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			int			nnum = stats->numnumbers[k];

			if (nnum > 0)
			{
				Datum	   *numdatums = (Datum *) palloc(nnum * sizeof(Datum));
				ArrayType  *arry;

				for (n = 0; n < nnum; n++)
					numdatums[n] = Float4GetDatum(stats->stanumbers[k][n]);
				/* XXX knows more than it should about type float4: */
				arry = construct_array(numdatums, nnum,
									   FLOAT4OID,
									   sizeof(float4), false, 'i');
				values[i++] = PointerGetDatum(arry);	/* stanumbersN */
			}
			else
			{
				nulls[i] = 'n';
				values[i++] = (Datum) 0;
			}
		}
		for (k = 0; k < STATISTIC_NUM_SLOTS; k++)
		{
			if (stats->numvalues[k] > 0)
			{
				ArrayType  *arry;

				arry = construct_array(stats->stavalues[k],
									   stats->numvalues[k],
									   stats->attr->atttypid,
									   stats->attrtype->typlen,
									   stats->attrtype->typbyval,
									   stats->attrtype->typalign);
				values[i++] = PointerGetDatum(arry);	/* stavaluesN */
			}
			else
			{
				nulls[i] = 'n';
				values[i++] = (Datum) 0;
			}
		}

		/* Is there already a pg_statistic tuple for this attribute? */
		oldtup = SearchSysCache(STATRELATT,
								ObjectIdGetDatum(relid),
								Int16GetDatum(stats->attnum),
								0, 0);

		if (HeapTupleIsValid(oldtup))
		{
			/* Yes, replace it */
			stup = heap_modifytuple(oldtup,
									sd,
									values,
									nulls,
									replaces);
			ReleaseSysCache(oldtup);
			simple_heap_update(sd, &stup->t_self, stup);
		}
		else
		{
			/* No, insert new tuple */
			stup = heap_formtuple(sd->rd_att, values, nulls);
			simple_heap_insert(sd, stup);
		}

		/* update indexes too */
		CatalogUpdateIndexes(sd, stup);

		heap_freetuple(stup);
	}

	heap_close(sd, RowExclusiveLock);
B