/* ----------
 * pgstat.c
 *
 *	All the statistics collector stuff hacked up in one big, ugly file.
 *
 *	TODO:	- Separate collector, postmaster and backend stuff
 *			  into different files.
 *
 *			- Add some automatic call for pgstat vacuuming.
 *
 *			- Add a pgstat config column to pg_database, so this
 *			  entire thing can be enabled/disabled on a per db basis.
 *
 *	Copyright (c) 2001-2008, PostgreSQL Global Development Group
 *
 *	$PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.176 2008/06/30 10:58:47 heikki Exp $
 * ----------
 */

#include "postgres.h"

21 22 23 24 25
#include <unistd.h>
#include <fcntl.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/socket.h>
B
Bruce Momjian 已提交
26
#include <netdb.h>
27 28 29
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
30
#include <time.h>
31 32 33 34 35 36
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
37

38 39
#include "pgstat.h"

40
#include "access/heapam.h"
41
#include "access/transam.h"
42
#include "access/twophase_rmgr.h"
43
#include "access/xact.h"
44
#include "catalog/pg_database.h"
45
#include "catalog/pg_proc.h"
46
#include "libpq/ip.h"
B
Bruce Momjian 已提交
47
#include "libpq/libpq.h"
48
#include "libpq/pqsignal.h"
49
#include "mb/pg_wchar.h"
50
#include "miscadmin.h"
51
#include "postmaster/autovacuum.h"
52
#include "postmaster/fork_process.h"
53
#include "postmaster/postmaster.h"
54
#include "storage/backendid.h"
55
#include "storage/fd.h"
56
#include "storage/ipc.h"
57
#include "storage/pg_shmem.h"
58
#include "storage/pmsignal.h"
59
#include "utils/guc.h"
60
#include "utils/memutils.h"
61
#include "utils/ps_status.h"
62
#include "utils/rel.h"
63
#include "utils/tqual.h"
/* ----------
 * Paths for the statistics files (relative to installation's $PGDATA).
 * ----------
 */
#define PGSTAT_STAT_FILENAME	"global/pgstat.stat"
#define PGSTAT_STAT_TMPFILE		"global/pgstat.tmp"

/* ----------
 * Timer definitions.
 * ----------
 */
#define PGSTAT_STAT_INTERVAL	500		/* How often to write the status file,
										 * and the minimum gap between reports
										 * from pgstat_report_stat(); in
										 * milliseconds. */

#define PGSTAT_RESTART_INTERVAL 60		/* Minimum delay between attempts to
										 * restart a failed statistics
										 * collector (see pgstat_start()); in
										 * seconds. */

#define PGSTAT_SELECT_TIMEOUT	2		/* How often to check for postmaster
										 * death; in seconds. */


/* ----------
 * The initial size hints for the hash tables used in the collector.
 * PGSTAT_TAB_HASH_SIZE is also the size hint for the temporary OID tables
 * built by pgstat_collect_oids().
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_TAB_HASH_SIZE	512
#define PGSTAT_FUNCTION_HASH_SIZE	512


97
/* ----------
98
 * GUC parameters
99 100
 * ----------
 */
101 102
bool		pgstat_track_activities = false;
bool		pgstat_track_counts = false;
103
int			pgstat_track_functions = TRACK_FUNC_OFF;
104
int			pgstat_track_activity_query_size = 1024;
105

106 107 108 109 110 111 112
/*
 * BgWriter global statistics counters (unused in other processes).
 * Stored directly in a stats message structure so it can be sent
 * without needing to copy things around.  We assume this inits to zeroes.
 */
PgStat_MsgBgWriter BgWriterStats;

113 114 115 116
/* ----------
 * Local data
 * ----------
 */
B
Bruce Momjian 已提交
117
NON_EXEC_STATIC int pgStatSock = -1;
118

B
Bruce Momjian 已提交
119
static struct sockaddr_storage pgStatAddr;
120

121
static time_t last_pgstat_start_time;
122

123
static bool pgStatRunningInCollector = false;
124

125
/*
126 127
 * Structures in which backends store per-table info that's waiting to be
 * sent to the collector.
128
 *
129 130 131 132 133 134 135
 * NOTE: once allocated, TabStatusArray structures are never moved or deleted
 * for the life of the backend.  Also, we zero out the t_id fields of the
 * contained PgStat_TableStatus structs whenever they are not actively in use.
 * This allows relcache pgstat_info pointers to be treated as long-lived data,
 * avoiding repeated searches in pgstat_initstats() when a relation is
 * repeatedly opened during a transaction.
 */
B
Bruce Momjian 已提交
136
#define TABSTAT_QUANTUM		100 /* we alloc this many at a time */
137 138

typedef struct TabStatusArray
139
{
140
	struct TabStatusArray *tsa_next;	/* link to next array, if any */
B
Bruce Momjian 已提交
141
	int			tsa_used;		/* # entries currently used */
142
	PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM];	/* per-table data */
143
} TabStatusArray;
144 145

static TabStatusArray *pgStatTabList = NULL;
B
Bruce Momjian 已提交
146

147 148 149 150 151 152
/*
 * Backends store per-function info that's waiting to be sent to the collector
 * in this hash table (indexed by function OID).
 */
static HTAB *pgStatFunctions = NULL;

153 154 155 156 157 158 159 160 161
/*
 * Tuple insertion/deletion counts for an open transaction can't be propagated
 * into PgStat_TableStatus counters until we know if it is going to commit
 * or abort.  Hence, we keep these counts in per-subxact structs that live
 * in TopTransactionContext.  This data structure is designed on the assumption
 * that subxacts won't usually modify very many tables.
 */
typedef struct PgStat_SubXactStatus
{
B
Bruce Momjian 已提交
162
	int			nest_level;		/* subtransaction nest level */
163 164
	struct PgStat_SubXactStatus *prev;	/* higher-level subxact if any */
	PgStat_TableXactStatus *first;		/* head of list for this subxact */
165
} PgStat_SubXactStatus;
166

167
static PgStat_SubXactStatus *pgStatXactStack = NULL;
168

169 170
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;
171

172 173 174
/* Record that's written to 2PC state file when pgstat state is persisted */
typedef struct TwoPhasePgStatRecord
{
B
Bruce Momjian 已提交
175 176 177 178
	PgStat_Counter tuples_inserted;		/* tuples inserted in xact */
	PgStat_Counter tuples_deleted;		/* tuples deleted in xact */
	Oid			t_id;			/* table's OID */
	bool		t_shared;		/* is it a shared catalog? */
179
} TwoPhasePgStatRecord;
180 181 182 183

/*
 * Info about current "snapshot" of stats file
 */
184
static MemoryContext pgStatLocalContext = NULL;
185
static HTAB *pgStatDBHash = NULL;
186 187
static PgBackendStatus *localBackendStatusTable = NULL;
static int	localNumBackends = 0;
188

189 190 191 192 193 194 195
/*
 * Cluster wide statistics, kept in the stats collector.
 * Contains statistics that are not collected per database
 * or per table.
 */
static PgStat_GlobalStats globalStats;

B
Bruce Momjian 已提交
196 197
static volatile bool need_exit = false;
static volatile bool need_statwrite = false;
198

199 200 201 202 203 204 205
/*
 * Total time charged to functions so far in the current backend.
 * We use this to help separate "self" and "other" time charges.
 * (We assume this initializes to zero.)
 */
static instr_time total_func_time;

206

207 208 209 210
/* ----------
 * Local function forward declarations
 * ----------
 */
211
#ifdef EXEC_BACKEND
212
static pid_t pgstat_forkexec(void);
213
#endif
214 215

NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
216
static void pgstat_exit(SIGNAL_ARGS);
217
static void force_statwrite(SIGNAL_ARGS);
218
static void pgstat_beshutdown_hook(int code, Datum arg);
219

220
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
221
static void pgstat_write_statsfile(void);
222
static HTAB *pgstat_read_statsfile(Oid onlydb);
223
static void backend_read_statsfile(void);
224
static void pgstat_read_current_status(void);
225 226

static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
227
static void pgstat_send_funcstats(void);
228
static HTAB *pgstat_collect_oids(Oid catalogid);
229

230 231
static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);

232 233
static void pgstat_setup_memcxt(void);

234
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
235 236 237 238 239 240
static void pgstat_send(void *msg, int len);

static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
241 242 243
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
244
static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
245 246
static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
247 248 249 250 251 252 253 254 255 256 257


/* ------------------------------------------------------------
 * Public functions called from postmaster follow
 * ------------------------------------------------------------
 */

/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success, pgStatSock is a UDP socket bound to a kernel-assigned
 *	localhost port, connected to its own address (kept in pgStatAddr),
 *	verified to pass a one-byte test message, and set non-blocking.
 *	On failure, pgStatSock is -1 and the track_counts GUC is forced off.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;
	int			tries = 0;

#define TESTBYTEVAL ((char) 199)

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, pg_getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination. We will generate LOG
	 * messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		if (++tries > 1)
			ereport(LOG,
			(errmsg("trying another address for the statistics collector")));

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the assigned
		 * port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			  errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) & pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles by
		 * not having to respecify the target address on every send. This also
		 * provides a kernel-level check that only packets from this same
		 * address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) & pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket. This
		 * is to catch situations where the socket can be created but will not
		 * actually pass data (for instance, because kernel packet filtering
		 * rules prevent it).
		 */
		test_byte = TESTBYTEVAL;

retry1:
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry1;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before deciding
		 * it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);
			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains to
			 * give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

retry2:
		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry2;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the collector
	 * falls behind, statistics messages will be discarded; backends won't
	 * block waiting to send messages to the collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	pg_freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	ereport(LOG,
	  (errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		pg_freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/*
	 * Adjust GUC variables to suppress useless activity, and for debugging
	 * purposes (seeing track_counts off is a clue that we failed here). We
	 * use PGC_S_OVERRIDE because there is no point in trying to turn it back
	 * on from postgresql.conf without a restart.
	 */
	SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}

502 503 504
/*
 * pgstat_reset_all() -
 *
505
 * Remove the stats file.  This is currently used only if WAL
506 507 508 509 510 511 512
 * recovery is needed after a crash.
 */
void
pgstat_reset_all(void)
{
	unlink(PGSTAT_STAT_FILENAME);
}
513

#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Format up the arglist for, then fork and exec, statistics collector process.
 * Returns whatever postmaster_forkexec() returns (child PID, or -1 on error
 * as handled by pgstat_start()).
 */
static pid_t
pgstat_forkexec(void)
{
	char	   *av[10];
	int			ac = 0;

	av[ac++] = "postgres";
	av[ac++] = "--forkcol";
	av[ac++] = NULL;			/* filled in by postmaster_forkexec */

	av[ac] = NULL;
	Assert(ac < lengthof(av));

	return postmaster_forkexec(ac, av);
}
#endif   /* EXEC_BACKEND */

539
/*
540 541 542
 * pgstat_start() -
 *
 *	Called from postmaster at startup or after an existing collector
543
 *	died.  Attempt to fire up a fresh statistics collector.
544
 *
545 546
 *	Returns PID of child process, or 0 if fail.
 *
547
 *	Note: if fail, we will be called again from the postmaster main loop.
548
 */
549
int
550
pgstat_start(void)
551
{
552
	time_t		curtime;
553
	pid_t		pgStatPid;
554

555
	/*
B
Bruce Momjian 已提交
556 557
	 * Check that the socket is there, else pgstat_init failed and we can do
	 * nothing useful.
558
	 */
559
	if (pgStatSock < 0)
560
		return 0;
561

562
	/*
B
Bruce Momjian 已提交
563 564 565 566
	 * Do nothing if too soon since last collector start.  This is a safety
	 * valve to protect against continuous respawn attempts if the collector
	 * is dying immediately at launch.	Note that since we will be re-called
	 * from the postmaster main loop, we will get another chance later.
567 568 569 570
	 */
	curtime = time(NULL);
	if ((unsigned int) (curtime - last_pgstat_start_time) <
		(unsigned int) PGSTAT_RESTART_INTERVAL)
571
		return 0;
572 573
	last_pgstat_start_time = curtime;

574
	/*
575
	 * Okay, fork off the collector.
576
	 */
577
#ifdef EXEC_BACKEND
578
	switch ((pgStatPid = pgstat_forkexec()))
579
#else
580
	switch ((pgStatPid = fork_process()))
581
#endif
582 583
	{
		case -1:
584
			ereport(LOG,
585
					(errmsg("could not fork statistics collector: %m")));
586
			return 0;
587

588
#ifndef EXEC_BACKEND
589
		case 0:
590
			/* in postmaster child ... */
591
			/* Close the postmaster's sockets */
592
			ClosePostmasterPorts(false);
593

594 595 596
			/* Lose the postmaster's on-exit routines */
			on_exit_reset();

597 598 599
			/* Drop our connection to postmaster's shared memory, as well */
			PGSharedMemoryDetach();

600
			PgstatCollectorMain(0, NULL);
601
			break;
602
#endif
603 604

		default:
605
			return (int) pgStatPid;
606 607
	}

608 609
	/* shouldn't get here */
	return 0;
610 611
}

B
Bruce Momjian 已提交
612 613
void
allow_immediate_pgstat_restart(void)
614
{
B
Bruce Momjian 已提交
615
	last_pgstat_start_time = 0;
616
}
617 618 619

/* ------------------------------------------------------------
 * Public functions used by backends follow
 * ------------------------------------------------------------
 */


/* ----------
625
 * pgstat_report_stat() -
626
 *
627
 *	Called from tcop/postgres.c to send the so far collected per-table
628 629 630
 *	and function usage statistics to the collector.  Note that this is
 *	called only when not within a transaction, so it is fair to use
 *	transaction stop time as an approximation of current time.
631 632 633
 * ----------
 */
void
634
pgstat_report_stat(bool force)
635
{
636 637
	/* we assume this inits to all zeroes: */
	static const PgStat_TableCounts all_zeroes;
B
Bruce Momjian 已提交
638
	static TimestampTz last_report = 0;
639

640
	TimestampTz now;
641 642 643 644
	PgStat_MsgTabstat regular_msg;
	PgStat_MsgTabstat shared_msg;
	TabStatusArray *tsa;
	int			i;
645 646

	/* Don't expend a clock check if nothing to do */
647
	/* Note: we ignore pending function stats in this test ... OK? */
648 649
	if (pgStatTabList == NULL ||
		pgStatTabList->tsa_used == 0)
650 651 652 653
		return;

	/*
	 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
654
	 * msec since we last sent one, or the caller wants to force stats out.
655 656
	 */
	now = GetCurrentTransactionStopTimestamp();
657 658
	if (!force &&
		!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
659 660 661
		return;
	last_report = now;

662
	/*
663 664
	 * Scan through the TabStatusArray struct(s) to find tables that actually
	 * have counts, and build messages to send.  We have to separate shared
B
Bruce Momjian 已提交
665 666
	 * relations from regular ones because the databaseid field in the message
	 * header has to depend on that.
667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684
	 */
	regular_msg.m_databaseid = MyDatabaseId;
	shared_msg.m_databaseid = InvalidOid;
	regular_msg.m_nentries = 0;
	shared_msg.m_nentries = 0;

	for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
	{
		for (i = 0; i < tsa->tsa_used; i++)
		{
			PgStat_TableStatus *entry = &tsa->tsa_entries[i];
			PgStat_MsgTabstat *this_msg;
			PgStat_TableEntry *this_ent;

			/* Shouldn't have any pending transaction-dependent counts */
			Assert(entry->trans == NULL);

			/*
B
Bruce Momjian 已提交
685 686
			 * Ignore entries that didn't accumulate any actual counts, such
			 * as indexes that were opened by the planner but not used.
687 688 689 690
			 */
			if (memcmp(&entry->t_counts, &all_zeroes,
					   sizeof(PgStat_TableCounts)) == 0)
				continue;
B
Bruce Momjian 已提交
691

692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
			/*
			 * OK, insert data into the appropriate message, and send if full.
			 */
			this_msg = entry->t_shared ? &shared_msg : &regular_msg;
			this_ent = &this_msg->m_entry[this_msg->m_nentries];
			this_ent->t_id = entry->t_id;
			memcpy(&this_ent->t_counts, &entry->t_counts,
				   sizeof(PgStat_TableCounts));
			if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
			{
				pgstat_send_tabstat(this_msg);
				this_msg->m_nentries = 0;
			}
		}
		/* zero out TableStatus structs after use */
		MemSet(tsa->tsa_entries, 0,
			   tsa->tsa_used * sizeof(PgStat_TableStatus));
		tsa->tsa_used = 0;
	}

	/*
	 * Send partial messages.  If force is true, make sure that any pending
	 * xact commit/abort gets counted, even if no table stats to send.
715
	 */
716 717 718 719 720
	if (regular_msg.m_nentries > 0 ||
		(force && (pgStatXactCommit > 0 || pgStatXactRollback > 0)))
		pgstat_send_tabstat(&regular_msg);
	if (shared_msg.m_nentries > 0)
		pgstat_send_tabstat(&shared_msg);
721 722 723

	/* Now, send function statistics */
	pgstat_send_funcstats();
724 725
}

726
/*
727
 * Subroutine for pgstat_report_stat: finish and send a tabstat message
728
 */
729
static void
730
pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg)
731
{
732 733
	int			n;
	int			len;
734

735 736 737
	/* It's unlikely we'd get here with no socket, but maybe not impossible */
	if (pgStatSock < 0)
		return;
738

739 740 741 742 743 744
	/*
	 * Report accumulated xact commit/rollback whenever we send a normal
	 * tabstat message
	 */
	if (OidIsValid(tsmsg->m_databaseid))
	{
745 746
		tsmsg->m_xact_commit = pgStatXactCommit;
		tsmsg->m_xact_rollback = pgStatXactRollback;
747
		pgStatXactCommit = 0;
748
		pgStatXactRollback = 0;
749 750 751 752 753 754
	}
	else
	{
		tsmsg->m_xact_commit = 0;
		tsmsg->m_xact_rollback = 0;
	}
755

756 757 758
	n = tsmsg->m_nentries;
	len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
		n * sizeof(PgStat_TableEntry);
759

760 761
	pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
	pgstat_send(tsmsg, len);
762 763
}

764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816
/*
 * Subroutine for pgstat_report_stat: populate and send a function stat message
 */
static void
pgstat_send_funcstats(void)
{
	/* we assume this inits to all zeroes: */
	static const PgStat_FunctionCounts all_zeroes;

	PgStat_MsgFuncstat msg;
	PgStat_BackendFunctionEntry *entry;
	HASH_SEQ_STATUS fstat;

	if (pgStatFunctions == NULL)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_FUNCSTAT);
	msg.m_databaseid = MyDatabaseId;
	msg.m_nentries = 0;

	hash_seq_init(&fstat, pgStatFunctions);
	while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL)
	{
		PgStat_FunctionEntry *m_ent;

		/* Skip it if no counts accumulated since last time */
		if (memcmp(&entry->f_counts, &all_zeroes,
				   sizeof(PgStat_FunctionCounts)) == 0)
			continue;

		/* need to convert format of time accumulators */
		m_ent = &msg.m_entry[msg.m_nentries];
		m_ent->f_id = entry->f_id;
		m_ent->f_numcalls = entry->f_counts.f_numcalls;
		m_ent->f_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_time);
		m_ent->f_time_self = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_time_self);

		if (++msg.m_nentries >= PGSTAT_NUM_FUNCENTRIES)
		{
			pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
						msg.m_nentries * sizeof(PgStat_FunctionEntry));
			msg.m_nentries = 0;
		}

		/* reset the entry's counts */
		MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts));
	}

	if (msg.m_nentries > 0)
		pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
					msg.m_nentries * sizeof(PgStat_FunctionEntry));
}

817 818

/* ----------
819
 * pgstat_vacuum_stat() -
820 821 822 823
 *
 *	Will tell the collector about objects he can get rid of.
 * ----------
 */
824
void
825
pgstat_vacuum_stat(void)
826
{
827
	HTAB	   *htab;
828
	PgStat_MsgTabpurge msg;
829
	PgStat_MsgFuncpurge f_msg;
830 831 832
	HASH_SEQ_STATUS hstat;
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;
833
	PgStat_StatFuncEntry *funcentry;
834
	int			len;
835 836

	if (pgStatSock < 0)
837
		return;
838 839

	/*
B
Bruce Momjian 已提交
840 841
	 * If not done for this transaction, read the statistics collector stats
	 * file into some hash tables.
842
	 */
843
	backend_read_statsfile();
844 845

	/*
846 847
	 * Read pg_database and make a list of OIDs of all existing databases
	 */
848
	htab = pgstat_collect_oids(DatabaseRelationId);
849 850 851 852 853 854 855 856 857 858

	/*
	 * Search the database hash table for dead databases and tell the
	 * collector to drop them.
	 */
	hash_seq_init(&hstat, pgStatDBHash);
	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
	{
		Oid			dbid = dbentry->databaseid;

859 860
		CHECK_FOR_INTERRUPTS();

861 862 863
		/* the DB entry for shared tables (with InvalidOid) is never dropped */
		if (OidIsValid(dbid) &&
			hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
864 865 866 867
			pgstat_drop_database(dbid);
	}

	/* Clean up */
868
	hash_destroy(htab);
869 870 871

	/*
	 * Lookup our own database entry; if not found, nothing more to do.
872
	 */
873 874 875
	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &MyDatabaseId,
												 HASH_FIND, NULL);
876 877 878 879 880 881
	if (dbentry == NULL || dbentry->tables == NULL)
		return;

	/*
	 * Similarly to above, make a list of all known relations in this DB.
	 */
882
	htab = pgstat_collect_oids(RelationRelationId);
883 884 885 886 887 888 889

	/*
	 * Initialize our messages table counter to zero
	 */
	msg.m_nentries = 0;

	/*
890
	 * Check for all tables listed in stats hashtable if they still exist.
891
	 */
892
	hash_seq_init(&hstat, dbentry->tables);
893
	while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
894
	{
895 896 897 898 899
		Oid			tabid = tabentry->tableid;

		CHECK_FOR_INTERRUPTS();

		if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
900 901 902
			continue;

		/*
903
		 * Not there, so add this table's Oid to the message
904
		 */
905
		msg.m_tableid[msg.m_nentries++] = tabid;
906 907

		/*
908
		 * If the message is full, send it out and reinitialize to empty
909 910 911
		 */
		if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
		{
912
			len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
B
Bruce Momjian 已提交
913
				+msg.m_nentries * sizeof(Oid);
914 915

			pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
916
			msg.m_databaseid = MyDatabaseId;
917 918 919 920 921 922 923 924 925 926 927
			pgstat_send(&msg, len);

			msg.m_nentries = 0;
		}
	}

	/*
	 * Send the rest
	 */
	if (msg.m_nentries > 0)
	{
928
		len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
B
Bruce Momjian 已提交
929
			+msg.m_nentries * sizeof(Oid);
930 931

		pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
932
		msg.m_databaseid = MyDatabaseId;
933 934 935
		pgstat_send(&msg, len);
	}

936
	/* Clean up */
937
	hash_destroy(htab);
938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988

	/*
	 * Now repeat the above steps for functions.
	 */
	htab = pgstat_collect_oids(ProcedureRelationId);

	pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE);
	f_msg.m_databaseid = MyDatabaseId;
	f_msg.m_nentries = 0;

	hash_seq_init(&hstat, dbentry->functions);
	while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL)
	{
		Oid			funcid = funcentry->functionid;

		CHECK_FOR_INTERRUPTS();

		if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL)
			continue;

		/*
		 * Not there, so add this function's Oid to the message
		 */
		f_msg.m_functionid[f_msg.m_nentries++] = funcid;

		/*
		 * If the message is full, send it out and reinitialize to empty
		 */
		if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE)
		{
			len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
				+f_msg.m_nentries * sizeof(Oid);

			pgstat_send(&f_msg, len);

			f_msg.m_nentries = 0;
		}
	}

	/*
	 * Send the rest
	 */
	if (f_msg.m_nentries > 0)
	{
		len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
			+f_msg.m_nentries * sizeof(Oid);

		pgstat_send(&f_msg, len);
	}

	hash_destroy(htab);
989 990 991 992 993 994
}


/* ----------
 * pgstat_collect_oids() -
 *
 *	Collect the OIDs of all objects listed in the specified system catalog
 *	into a temporary hash table.  Caller should hash_destroy the result
 *	when done with it.
 *
 *	The table is created in the caller's CurrentMemoryContext.  If an error
 *	(e.g. a query cancel raised through CHECK_FOR_INTERRUPTS here or in the
 *	caller's loops) escapes before hash_destroy, the table is then released
 *	with that context instead of being stranded in dynahash's default
 *	long-lived context.
 * ----------
 */
static HTAB *
pgstat_collect_oids(Oid catalogid)
{
	HTAB	   *htab;
	HASHCTL		hash_ctl;
	Relation	rel;
	HeapScanDesc scan;
	HeapTuple	tup;

	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(Oid);
	hash_ctl.entrysize = sizeof(Oid);
	hash_ctl.hash = oid_hash;
	hash_ctl.hcxt = CurrentMemoryContext;
	htab = hash_create("Temporary table of OIDs",
					   PGSTAT_TAB_HASH_SIZE,
					   &hash_ctl,
					   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);

	rel = heap_open(catalogid, AccessShareLock);
	scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
	while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Oid			thisoid = HeapTupleGetOid(tup);

		CHECK_FOR_INTERRUPTS();

		(void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
	}
	heap_endscan(scan);
	heap_close(rel, AccessShareLock);

	return htab;
}


/* ----------
 * pgstat_drop_database() -
 *
 *	Tell the collector that we just dropped a database.
1039
 *	(If the message gets lost, we will still clean the dead DB eventually
1040
 *	via future invocations of pgstat_vacuum_stat().)
1041 1042
 * ----------
 */
1043
void
1044 1045
pgstat_drop_database(Oid databaseid)
{
1046
	PgStat_MsgDropdb msg;
1047 1048 1049 1050 1051

	if (pgStatSock < 0)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
1052
	msg.m_databaseid = databaseid;
1053 1054 1055 1056
	pgstat_send(&msg, sizeof(msg));
}


/* ----------
 * pgstat_drop_relation() -
 *
 *	Tell the collector that we just dropped a relation.
 *	(If the message gets lost, we will still clean the dead entry eventually
 *	via future invocations of pgstat_vacuum_stat().)
 *
 *	Currently not used for lack of any good place to call it; we rely
 *	entirely on pgstat_vacuum_stat() to clean out stats for dead rels.
 * ----------
 */
#ifdef NOT_USED
void
pgstat_drop_relation(Oid relid)
{
	PgStat_MsgTabpurge msg;
	int			len;

	if (pgStatSock < 0)
		return;

	msg.m_tableid[0] = relid;
	msg.m_nentries = 1;

	/* Send only the fixed header plus the single OID actually filled in */
	len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
	msg.m_databaseid = MyDatabaseId;
	pgstat_send(&msg, len);
}
#endif   /* NOT_USED */


1090 1091 1092 1093 1094 1095 1096 1097 1098
/* ----------
 * pgstat_reset_counters() -
 *
 *	Tell the statistics collector to reset counters for our database.
 * ----------
 */
void
pgstat_reset_counters(void)
{
1099
	PgStat_MsgResetcounter msg;
1100 1101 1102 1103 1104

	if (pgStatSock < 0)
		return;

	if (!superuser())
1105 1106
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
B
Bruce Momjian 已提交
1107
				 errmsg("must be superuser to reset statistics counters")));
1108 1109

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
1110
	msg.m_databaseid = MyDatabaseId;
1111 1112 1113 1114
	pgstat_send(&msg, sizeof(msg));
}


1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
/* ----------
 * pgstat_report_autovac() -
 *
 *	Called from autovacuum.c to report startup of an autovacuum process.
 *	We are called before InitPostgres is done, so can't rely on MyDatabaseId;
 *	the db OID must be passed in, instead.
 * ----------
 */
void
pgstat_report_autovac(Oid dboid)
{
	PgStat_MsgAutovacStart msg;

	if (pgStatSock < 0)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
	msg.m_databaseid = dboid;
	msg.m_start_time = GetCurrentTimestamp();

	pgstat_send(&msg, sizeof(msg));
}


/* ---------
 * pgstat_report_vacuum() -
 *
 *	Tell the collector about the table we just vacuumed.
 *
 *	Shared tables are reported under InvalidOid instead of our database.
 *	The analyze flag and tuple count are forwarded unchanged, together with
 *	whether the caller is an autovacuum worker and the current timestamp.
 *	Nothing is sent when there is no collector socket or when
 *	pgstat_track_counts is off.
 * ---------
 */
void
pgstat_report_vacuum(Oid tableoid, bool shared,
					 bool analyze, PgStat_Counter tuples)
{
	PgStat_MsgVacuum msg;

	if (pgStatSock < 0 || !pgstat_track_counts)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
	msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
	msg.m_tableoid = tableoid;
	msg.m_analyze = analyze;
	msg.m_autovacuum = IsAutoVacuumWorkerProcess();		/* is this autovacuum? */
	msg.m_vacuumtime = GetCurrentTimestamp();
	msg.m_tuples = tuples;
	pgstat_send(&msg, sizeof(msg));
}

/* --------
 * pgstat_report_analyze() -
 *
 *	Tell the collector about the table we just analyzed.
 * --------
 */
void
1171
pgstat_report_analyze(Relation rel, PgStat_Counter livetuples,
1172 1173 1174 1175
					  PgStat_Counter deadtuples)
{
	PgStat_MsgAnalyze msg;

1176
	if (pgStatSock < 0 || !pgstat_track_counts)
1177 1178
		return;

1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204
	/*
	 * Unlike VACUUM, ANALYZE might be running inside a transaction that
	 * has already inserted and/or deleted rows in the target table.
	 * ANALYZE will have counted such rows as live or dead respectively.
	 * Because we will report our counts of such rows at transaction end,
	 * we should subtract off these counts from what we send to the collector
	 * now, else they'll be double-counted after commit.  (This approach also
	 * ensures that the collector ends up with the right numbers if we abort
	 * instead of committing.)
	 */
	if (rel->pgstat_info != NULL)
	{
		PgStat_TableXactStatus *trans;

		for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
		{
			livetuples -= trans->tuples_inserted - trans->tuples_deleted;
			deadtuples -= trans->tuples_deleted;
		}
		/* count stuff inserted by already-aborted subxacts, too */
		deadtuples -= rel->pgstat_info->t_counts.t_new_dead_tuples;
		/* Since ANALYZE's counts are estimates, we could have underflowed */
		livetuples = Max(livetuples, 0);
		deadtuples = Max(deadtuples, 0);
	}

1205
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
1206 1207 1208
	msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
	msg.m_tableoid = RelationGetRelid(rel);
	msg.m_autovacuum = IsAutoVacuumWorkerProcess();	/* is this autovacuum? */
1209 1210 1211 1212 1213 1214 1215
	msg.m_analyzetime = GetCurrentTimestamp();
	msg.m_live_tuples = livetuples;
	msg.m_dead_tuples = deadtuples;
	pgstat_send(&msg, sizeof(msg));
}


1216 1217 1218 1219 1220 1221 1222 1223 1224
/* ----------
 * pgstat_ping() -
 *
 *	Send some junk data to the collector to increase traffic.
 * ----------
 */
void
pgstat_ping(void)
{
1225
	PgStat_MsgDummy msg;
1226 1227 1228 1229 1230 1231 1232 1233

	if (pgStatSock < 0)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
	pgstat_send(&msg, sizeof(msg));
}

1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334
/*
 * Initialize function call usage data.
 * Called by the executor before invoking a function.
 */
void
pgstat_init_function_usage(FunctionCallInfoData *fcinfo,
						   PgStat_FunctionCallUsage *fcu)
{
	PgStat_BackendFunctionEntry *htabent;
	bool 		found;

	if (pgstat_track_functions <= fcinfo->flinfo->fn_stats)
	{
		/* stats not wanted */
		fcu->fs = NULL;
		return;
	}

	if (!pgStatFunctions)
	{
		/* First time through - initialize function stat table */
		HASHCTL		hash_ctl;

		memset(&hash_ctl, 0, sizeof(hash_ctl));
		hash_ctl.keysize = sizeof(Oid);
		hash_ctl.entrysize = sizeof(PgStat_BackendFunctionEntry);
		hash_ctl.hash = oid_hash;
		pgStatFunctions = hash_create("Function stat entries",
									  PGSTAT_FUNCTION_HASH_SIZE,
									  &hash_ctl,
									  HASH_ELEM | HASH_FUNCTION);
	}

	/* Get the stats entry for this function, create if necessary */
	htabent = hash_search(pgStatFunctions, &fcinfo->flinfo->fn_oid,
						  HASH_ENTER, &found);
	if (!found)
		MemSet(&htabent->f_counts, 0, sizeof(PgStat_FunctionCounts));

	fcu->fs = &htabent->f_counts;

	/* save stats for this function, later used to compensate for recursion */
	fcu->save_f_time = htabent->f_counts.f_time;

	/* save current backend-wide total time */
	fcu->save_total = total_func_time;

	/* get clock time as of function start */
	INSTR_TIME_SET_CURRENT(fcu->f_start);
}

/*
 * pgstat_end_function_usage -
 *
 * Executor hook run after a function call returns: charges the time spent
 * in the call to the stats entry prepared by pgstat_init_function_usage.
 *
 * A set-returning function running in value-per-call mode produces several
 * init/end pairs for what the user sees as one call.  Every pair accumulates
 * time, but only the pair with finalize = TRUE increments the call count.
 */
void
pgstat_end_function_usage(PgStat_FunctionCallUsage *fcu, bool finalize)
{
	PgStat_FunctionCounts *counts = fcu->fs;
	instr_time	elapsed;
	instr_time	nested;
	instr_time	own_time;

	/* the init step decided not to collect stats for this call */
	if (counts == NULL)
		return;

	/* wall-clock time since pgstat_init_function_usage */
	INSTR_TIME_SET_CURRENT(elapsed);
	INSTR_TIME_SUBTRACT(elapsed, fcu->f_start);

	/*
	 * Whatever the backend-wide total grew by while we ran was already
	 * charged to other calls; the remainder of our elapsed time is our own.
	 */
	nested = total_func_time;
	INSTR_TIME_SUBTRACT(nested, fcu->save_total);
	own_time = elapsed;
	INSTR_TIME_SUBTRACT(own_time, nested);

	/* fold our self time into the backend-wide total */
	INSTR_TIME_ADD(total_func_time, own_time);

	/*
	 * Cumulative f_time = pre-call f_time + this call's elapsed time.
	 * Starting from the saved value instead of the current one keeps time
	 * spent in recursive invocations of the same function from being
	 * counted twice.  Self time already excludes nested calls, so it can be
	 * added directly.
	 */
	INSTR_TIME_ADD(elapsed, fcu->save_f_time);

	if (finalize)
		counts->f_numcalls++;
	counts->f_time = elapsed;
	INSTR_TIME_ADD(counts->f_time_self, own_time);
}

1335 1336 1337 1338

/* ----------
 * pgstat_initstats() -
 *
1339 1340
 *	Initialize a relcache entry to count access statistics.
 *	Called whenever a relation is opened.
1341 1342 1343
 *
 *	We assume that a relcache entry's pgstat_info field is zeroed by
 *	relcache.c when the relcache entry is made; thereafter it is long-lived
1344
 *	data.  We can avoid repeated searches of the TabStatus arrays when the
1345
 *	same relation is touched repeatedly within a transaction.
1346 1347 1348
 * ----------
 */
void
1349
pgstat_initstats(Relation rel)
1350
{
1351
	Oid			rel_id = rel->rd_id;
1352 1353 1354 1355 1356 1357 1358 1359 1360 1361
	char		relkind = rel->rd_rel->relkind;

	/* We only count stats for things that have storage */
	if (!(relkind == RELKIND_RELATION ||
		  relkind == RELKIND_INDEX ||
		  relkind == RELKIND_TOASTVALUE))
	{
		rel->pgstat_info = NULL;
		return;
	}
1362

1363
	if (pgStatSock < 0 || !pgstat_track_counts)
1364
	{
1365 1366
		/* We're not counting at all */
		rel->pgstat_info = NULL;
1367
		return;
1368
	}
1369

1370
	/*
B
Bruce Momjian 已提交
1371 1372
	 * If we already set up this relation in the current transaction, nothing
	 * to do.
1373
	 */
1374 1375
	if (rel->pgstat_info != NULL &&
		rel->pgstat_info->t_id == rel_id)
1376
		return;
1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391

	/* Else find or make the PgStat_TableStatus entry, and update link */
	rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
}

/*
 * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel
 *
 * Linearly searches the used slots of every TabStatusArray on pgStatTabList.
 * If rel_id is not found, the first free slot is claimed; when every array
 * is full, a new zeroed TabStatusArray is allocated in TopMemoryContext and
 * appended to the list.  Newly claimed entries get t_id and t_shared set;
 * their other fields are assumed to be already zeroed.
 */
static PgStat_TableStatus *
get_tabstat_entry(Oid rel_id, bool isshared)
{
	PgStat_TableStatus *entry;
	TabStatusArray *tsa;
	TabStatusArray *prev_tsa;
	int			i;

	/*
	 * Search the already-used tabstat slots for this relation.
	 */
	prev_tsa = NULL;
	for (tsa = pgStatTabList; tsa != NULL; prev_tsa = tsa, tsa = tsa->tsa_next)
	{
		for (i = 0; i < tsa->tsa_used; i++)
		{
			entry = &tsa->tsa_entries[i];
			if (entry->t_id == rel_id)
				return entry;
		}

		if (tsa->tsa_used < TABSTAT_QUANTUM)
		{
			/*
			 * It must not be present, but we found a free slot instead. Fine,
			 * let's use this one.  We assume the entry was already zeroed,
			 * either at creation or after last use.
			 */
			entry = &tsa->tsa_entries[tsa->tsa_used++];
			entry->t_id = rel_id;
			entry->t_shared = isshared;
			return entry;
		}
	}

	/*
	 * We ran out of tabstat slots, so allocate more.  Be sure they're zeroed.
	 */
	tsa = (TabStatusArray *) MemoryContextAllocZero(TopMemoryContext,
													sizeof(TabStatusArray));
	if (prev_tsa)
		prev_tsa->tsa_next = tsa;
	else
		pgStatTabList = tsa;

	/*
	 * Use the first entry of the new TabStatusArray.
	 */
	entry = &tsa->tsa_entries[tsa->tsa_used++];
	entry->t_id = rel_id;
	entry->t_shared = isshared;
	return entry;
}

/*
 * get_tabstat_stack_level - return the pgStatXactStack entry for nest_level.
 *
 * If the stack is empty, or its top belongs to some other nesting level, a
 * fresh entry (allocated in TopTransactionContext, with an empty per-table
 * list) is pushed and returned.
 */
static PgStat_SubXactStatus *
get_tabstat_stack_level(int nest_level)
{
	PgStat_SubXactStatus *top = pgStatXactStack;

	/* reuse the current top entry when it already represents this level */
	if (top != NULL && top->nest_level == nest_level)
		return top;

	top = (PgStat_SubXactStatus *)
		MemoryContextAlloc(TopTransactionContext,
						   sizeof(PgStat_SubXactStatus));
	top->nest_level = nest_level;
	top->prev = pgStatXactStack;
	top->first = NULL;
	pgStatXactStack = top;

	return top;
}

/*
 * add_tabstat_xact_level - add a new (sub)transaction state record
 */
static void
1465
add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
1466 1467 1468
{
	PgStat_SubXactStatus *xact_state;
	PgStat_TableXactStatus *trans;
1469 1470

	/*
B
Bruce Momjian 已提交
1471 1472
	 * If this is the first rel to be modified at the current nest level, we
	 * first have to push a transaction stack entry.
1473
	 */
1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495
	xact_state = get_tabstat_stack_level(nest_level);

	/* Now make a per-table stack entry */
	trans = (PgStat_TableXactStatus *)
		MemoryContextAllocZero(TopTransactionContext,
							   sizeof(PgStat_TableXactStatus));
	trans->nest_level = nest_level;
	trans->upper = pgstat_info->trans;
	trans->parent = pgstat_info;
	trans->next = xact_state->first;
	xact_state->first = trans;
	pgstat_info->trans = trans;
}

/*
 * pgstat_count_heap_insert - count a tuple insertion
 *
 * Advances the nontransactional insert counter immediately and records the
 * insertion in the transactional state for the current nest level, creating
 * that state on first use.  No-op unless pgstat_track_counts is on and the
 * relation has a tabstat entry.
 */
void
pgstat_count_heap_insert(Relation rel)
{
	PgStat_TableStatus *pgstat_info = rel->pgstat_info;

	if (pgstat_track_counts && pgstat_info != NULL)
	{
		int			nest_level = GetCurrentTransactionNestLevel();

		/* t_tuples_inserted is nontransactional, so just advance it */
		pgstat_info->t_counts.t_tuples_inserted++;

		/* We have to log the transactional effect at the proper level */
		if (pgstat_info->trans == NULL ||
			pgstat_info->trans->nest_level != nest_level)
			add_tabstat_xact_level(pgstat_info, nest_level);

		pgstat_info->trans->tuples_inserted++;
	}
}

/*
 * pgstat_count_heap_update - count a tuple update
 */
void
1516
pgstat_count_heap_update(Relation rel, bool hot)
1517 1518 1519
{
	PgStat_TableStatus *pgstat_info = rel->pgstat_info;

1520
	if (pgstat_track_counts && pgstat_info != NULL)
1521
	{
B
Bruce Momjian 已提交
1522
		int			nest_level = GetCurrentTransactionNestLevel();
1523 1524 1525

		/* t_tuples_updated is nontransactional, so just advance it */
		pgstat_info->t_counts.t_tuples_updated++;
1526 1527 1528
		/* ditto for the hot_update counter */
		if (hot)
			pgstat_info->t_counts.t_tuples_hot_updated++;
1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548

		/* We have to log the transactional effect at the proper level */
		if (pgstat_info->trans == NULL ||
			pgstat_info->trans->nest_level != nest_level)
			add_tabstat_xact_level(pgstat_info, nest_level);

		/* An UPDATE both inserts a new tuple and deletes the old */
		pgstat_info->trans->tuples_inserted++;
		pgstat_info->trans->tuples_deleted++;
	}
}

/*
 * pgstat_count_heap_delete - count a tuple deletion
 *
 * Advances the nontransactional delete counter immediately and records the
 * deletion in the transactional state for the current nest level.  No-op
 * unless pgstat_track_counts is on and the relation has a tabstat entry.
 */
void
pgstat_count_heap_delete(Relation rel)
{
	PgStat_TableStatus *pgstat_info = rel->pgstat_info;

	if (pgstat_track_counts && pgstat_info != NULL)
	{
		int			nest_level = GetCurrentTransactionNestLevel();

		/* t_tuples_deleted is nontransactional, so just advance it */
		pgstat_info->t_counts.t_tuples_deleted++;

		/* We have to log the transactional effect at the proper level */
		if (pgstat_info->trans == NULL ||
			pgstat_info->trans->nest_level != nest_level)
			add_tabstat_xact_level(pgstat_info, nest_level);

		pgstat_info->trans->tuples_deleted++;
	}
}

1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577
/*
 * pgstat_update_heap_dead_tuples - update dead-tuples count
 *
 * The semantics of this are that we are reporting the nontransactional
 * recovery of "delta" dead tuples; so t_new_dead_tuples decreases
 * rather than increasing, and the change goes straight into the per-table
 * counter, not into transactional state.
 */
void
pgstat_update_heap_dead_tuples(Relation rel, int delta)
{
	PgStat_TableStatus *pgstat_info = rel->pgstat_info;

1578
	if (pgstat_track_counts && pgstat_info != NULL)
1579 1580 1581
		pgstat_info->t_counts.t_new_dead_tuples -= delta;
}

1582 1583

/* ----------
1584
 * AtEOXact_PgStat
1585
 *
1586
 *	Called from access/transam/xact.c at top-level transaction commit/abort.
1587 1588 1589
 * ----------
 */
void
1590
AtEOXact_PgStat(bool isCommit)
1591
{
1592
	PgStat_SubXactStatus *xact_state;
1593 1594

	/*
1595 1596
	 * Count transaction commit or abort.  (We use counters, not just bools,
	 * in case the reporting message isn't sent right away.)
1597
	 */
1598 1599 1600 1601
	if (isCommit)
		pgStatXactCommit++;
	else
		pgStatXactRollback++;
1602

1603 1604
	/*
	 * Transfer transactional insert/update counts into the base tabstat
B
Bruce Momjian 已提交
1605 1606
	 * entries.  We don't bother to free any of the transactional state, since
	 * it's all in TopTransactionContext and will go away anyway.
1607 1608 1609
	 */
	xact_state = pgStatXactStack;
	if (xact_state != NULL)
1610
	{
1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624
		PgStat_TableXactStatus *trans;

		Assert(xact_state->nest_level == 1);
		Assert(xact_state->prev == NULL);
		for (trans = xact_state->first; trans != NULL; trans = trans->next)
		{
			PgStat_TableStatus *tabstat;

			Assert(trans->nest_level == 1);
			Assert(trans->upper == NULL);
			tabstat = trans->parent;
			Assert(tabstat->trans == trans);
			if (isCommit)
			{
1625 1626
				tabstat->t_counts.t_new_live_tuples +=
					trans->tuples_inserted - trans->tuples_deleted;
1627 1628 1629 1630 1631 1632 1633 1634 1635
				tabstat->t_counts.t_new_dead_tuples += trans->tuples_deleted;
			}
			else
			{
				/* inserted tuples are dead, deleted tuples are unaffected */
				tabstat->t_counts.t_new_dead_tuples += trans->tuples_inserted;
			}
			tabstat->trans = NULL;
		}
1636
	}
1637
	pgStatXactStack = NULL;
1638

1639 1640 1641
	/* Make sure any stats snapshot is thrown away */
	pgstat_clear_snapshot();
}
1642 1643

/* ----------
1644
 * AtEOSubXact_PgStat
1645
 *
1646
 *	Called from access/transam/xact.c at subtransaction commit/abort.
1647 1648 1649
 * ----------
 */
void
1650
AtEOSubXact_PgStat(bool isCommit, int nestDepth)
1651
{
1652
	PgStat_SubXactStatus *xact_state;
1653 1654

	/*
1655 1656
	 * Transfer transactional insert/update counts into the next higher
	 * subtransaction state.
1657
	 */
1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687
	xact_state = pgStatXactStack;
	if (xact_state != NULL &&
		xact_state->nest_level >= nestDepth)
	{
		PgStat_TableXactStatus *trans;
		PgStat_TableXactStatus *next_trans;

		/* delink xact_state from stack immediately to simplify reuse case */
		pgStatXactStack = xact_state->prev;

		for (trans = xact_state->first; trans != NULL; trans = next_trans)
		{
			PgStat_TableStatus *tabstat;

			next_trans = trans->next;
			Assert(trans->nest_level == nestDepth);
			tabstat = trans->parent;
			Assert(tabstat->trans == trans);
			if (isCommit)
			{
				if (trans->upper && trans->upper->nest_level == nestDepth - 1)
				{
					trans->upper->tuples_inserted += trans->tuples_inserted;
					trans->upper->tuples_deleted += trans->tuples_deleted;
					tabstat->trans = trans->upper;
					pfree(trans);
				}
				else
				{
					/*
B
Bruce Momjian 已提交
1688 1689
					 * When there isn't an immediate parent state, we can just
					 * reuse the record instead of going through a
1690
					 * palloc/pfree pushup (this works since it's all in
B
Bruce Momjian 已提交
1691 1692
					 * TopTransactionContext anyway).  We have to re-link it
					 * into the parent level, though, and that might mean
1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729
					 * pushing a new entry into the pgStatXactStack.
					 */
					PgStat_SubXactStatus *upper_xact_state;

					upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
					trans->next = upper_xact_state->first;
					upper_xact_state->first = trans;
					trans->nest_level = nestDepth - 1;
				}
			}
			else
			{
				/*
				 * On abort, inserted tuples are dead (and can be bounced out
				 * to the top-level tabstat), deleted tuples are unaffected
				 */
				tabstat->t_counts.t_new_dead_tuples += trans->tuples_inserted;
				tabstat->trans = trans->upper;
				pfree(trans);
			}
		}
		pfree(xact_state);
	}
}


/*
 * AtPrepare_PgStat
 *		Save the transactional stats state at 2PC transaction prepare.
 *
 * In this phase we just generate 2PC records for all the pending
 * transaction-dependent stats work.
 */
void
AtPrepare_PgStat(void)
{
	PgStat_SubXactStatus *xact_state;
1730

1731 1732
	xact_state = pgStatXactStack;
	if (xact_state != NULL)
1733
	{
1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755
		PgStat_TableXactStatus *trans;

		Assert(xact_state->nest_level == 1);
		Assert(xact_state->prev == NULL);
		for (trans = xact_state->first; trans != NULL; trans = trans->next)
		{
			PgStat_TableStatus *tabstat;
			TwoPhasePgStatRecord record;

			Assert(trans->nest_level == 1);
			Assert(trans->upper == NULL);
			tabstat = trans->parent;
			Assert(tabstat->trans == trans);

			record.tuples_inserted = trans->tuples_inserted;
			record.tuples_deleted = trans->tuples_deleted;
			record.t_id = tabstat->t_id;
			record.t_shared = tabstat->t_shared;

			RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
								   &record, sizeof(TwoPhasePgStatRecord));
		}
1756 1757 1758
	}
}

1759 1760 1761 1762 1763
/*
 * PostPrepare_PgStat
 *		Clean up after successful PREPARE.
 *
 * All we need do here is unlink the transaction stats state from the
B
Bruce Momjian 已提交
1764
 * nontransactional state.	The nontransactional action counts will be
1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775
 * reported to the stats collector immediately, while the effects on live
 * and dead tuple counts are preserved in the 2PC state file.
 *
 * Note: AtEOXact_PgStat is not called during PREPARE.
 */
void
PostPrepare_PgStat(void)
{
	PgStat_SubXactStatus *xact_state;

	/*
B
Bruce Momjian 已提交
1776 1777
	 * We don't bother to free any of the transactional state, since it's all
	 * in TopTransactionContext and will go away anyway.
1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812
	 */
	xact_state = pgStatXactStack;
	if (xact_state != NULL)
	{
		PgStat_TableXactStatus *trans;

		for (trans = xact_state->first; trans != NULL; trans = trans->next)
		{
			PgStat_TableStatus *tabstat;

			tabstat = trans->parent;
			tabstat->trans = NULL;
		}
	}
	pgStatXactStack = NULL;

	/* Make sure any stats snapshot is thrown away */
	pgstat_clear_snapshot();
}

/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * Load the saved counts into our local pgstats state: net inserts
 * (inserted minus deleted) become new live tuples, deletes become new
 * dead tuples.
 */
void
pgstat_twophase_postcommit(TransactionId xid, uint16 info,
						   void *recdata, uint32 len)
{
	TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
	PgStat_TableStatus *pgstat_info;

	/* Find or create a tabstat entry for the rel */
	pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);

	pgstat_info->t_counts.t_new_live_tuples +=
		rec->tuples_inserted - rec->tuples_deleted;
	pgstat_info->t_counts.t_new_dead_tuples += rec->tuples_deleted;
}

/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * Replays the TwoPhasePgStatRecord saved at PREPARE time into our local
 * pgstats state as an aborted transaction: the rows it inserted are counted
 * as new dead tuples, and the rows it deleted change nothing.
 */
void
pgstat_twophase_postabort(TransactionId xid, uint16 info,
						  void *recdata, uint32 len)
{
	TwoPhasePgStatRecord *saved = (TwoPhasePgStatRecord *) recdata;

	/* find or create the rel's tabstat entry and charge aborted inserts */
	get_tabstat_entry(saved->t_id, saved->t_shared)->t_counts.t_new_dead_tuples +=
		saved->tuples_inserted;
}

1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851

/* ----------
 * pgstat_fetch_stat_dbentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the collected statistics for one database or NULL. NULL doesn't mean
 *	that the database doesn't exist, it is just not yet known by the
 *	collector, so the caller is better off to report ZERO instead.
 * ----------
 */
PgStat_StatDBEntry *
pgstat_fetch_stat_dbentry(Oid dbid)
{
	/*
B
Bruce Momjian 已提交
1852 1853
	 * If not done for this transaction, read the statistics collector stats
	 * file into some hash tables.
1854
	 */
1855
	backend_read_statsfile();
1856 1857

	/*
1858
	 * Lookup the requested database; return NULL if not found
1859
	 */
1860 1861 1862
	return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
											  (void *) &dbid,
											  HASH_FIND, NULL);
1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877
}


/* ----------
 * pgstat_fetch_stat_tabentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the collected statistics for one table or NULL. NULL doesn't mean
 *	that the table doesn't exist, it is just not yet known by the
 *	collector, so the caller is better off to report ZERO instead.
 *
 *	Our own database is searched first, then the shared-relation entry
 *	(InvalidOid).
 * ----------
 */
PgStat_StatTabEntry *
pgstat_fetch_stat_tabentry(Oid relid)
{
	Oid			dbid;
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;

	/*
	 * If not done for this transaction, read the statistics collector stats
	 * file into some hash tables.
	 */
	backend_read_statsfile();

	/*
	 * Lookup our database, then look in its table hash table.
	 */
	dbid = MyDatabaseId;
	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &dbid,
												 HASH_FIND, NULL);
	if (dbentry != NULL && dbentry->tables != NULL)
	{
		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
													   (void *) &relid,
													   HASH_FIND, NULL);
		if (tabentry)
			return tabentry;
	}

	/*
	 * If we didn't find it, maybe it's a shared table.
	 */
	dbid = InvalidOid;
	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &dbid,
												 HASH_FIND, NULL);
	if (dbentry != NULL && dbentry->tables != NULL)
	{
		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
													   (void *) &relid,
													   HASH_FIND, NULL);
		if (tabentry)
			return tabentry;
	}

	return NULL;
}


1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952
/* ----------
 * pgstat_fetch_stat_funcentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the collected statistics for one function or NULL.
 * ----------
 */
PgStat_StatFuncEntry *
pgstat_fetch_stat_funcentry(Oid func_id)
{
	PgStat_StatDBEntry *dbentry;
	PgStat_StatFuncEntry *funcentry = NULL;

	/* load the stats file if needed */
	backend_read_statsfile();

	/* Lookup our database, then find the requested function.  */
	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
	if (dbentry != NULL && dbentry->functions != NULL)
	{
		funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
														 (void *) &func_id,
														 HASH_FIND, NULL);
	}

	return funcentry;
}


1953 1954 1955 1956
/* ----------
 * pgstat_fetch_stat_beentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
1957 1958 1959 1960
 *	our local copy of the current-activity entry for one backend.
 *
 *	NB: caller is responsible for a check if the user is permitted to see
 *	this info (especially the querystring).
1961 1962
 * ----------
 */
1963
PgBackendStatus *
1964 1965
pgstat_fetch_stat_beentry(int beid)
{
1966
	pgstat_read_current_status();
1967

1968
	if (beid < 1 || beid > localNumBackends)
1969 1970
		return NULL;

1971
	return &localBackendStatusTable[beid - 1];
1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984
}


/* ----------
 * pgstat_fetch_stat_numbackends() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the maximum current backend id.
 * ----------
 */
int
pgstat_fetch_stat_numbackends(void)
{
1985
	pgstat_read_current_status();
1986

1987
	return localNumBackends;
1988 1989
}

1990 1991 1992 1993
/*
 * ---------
 * pgstat_fetch_global() -
 *
B
Bruce Momjian 已提交
1994 1995
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	a pointer to the global statistics struct.
1996 1997 1998 1999 2000 2001 2002 2003 2004 2005
 * ---------
 */
PgStat_GlobalStats *
pgstat_fetch_global(void)
{
	backend_read_statsfile();

	return &globalStats;
}

/* ------------------------------------------------------------
 * Functions for management of the shared-memory PgBackendStatus array
 * ------------------------------------------------------------
 */

/*
 * Shared-memory pointers set up by CreateSharedBackendStatus():
 * BackendStatusArray holds MaxBackends PgBackendStatus entries, and
 * BackendActivityBuffer holds the per-backend activity strings that each
 * entry's st_activity points into.  MyBEEntry presumably points at this
 * backend's own slot once assigned (its setup is not in this part of the
 * file -- confirm).
 */
static PgBackendStatus *BackendStatusArray = NULL;
static PgBackendStatus *MyBEEntry = NULL;
static char *BackendActivityBuffer = NULL;

2016 2017 2018

/*
 * Report shared-memory space needed by CreateSharedBackendStatus.
2019
 */
2020 2021
Size
BackendStatusShmemSize(void)
2022
{
2023
	Size		size;
2024

2025 2026
	size = add_size(mul_size(sizeof(PgBackendStatus), MaxBackends),
					mul_size(pgstat_track_activity_query_size, MaxBackends));
2027 2028
	return size;
}
2029

2030
/*
2031 2032
 * Initialize the shared status array and activity string buffer during
 * postmaster startup.
2033
 */
2034 2035
void
CreateSharedBackendStatus(void)
2036
{
2037
	Size		size;
2038
	bool		found;
2039 2040
	int			i;
	char	   *buffer;
2041 2042

	/* Create or attach to the shared array */
2043
	size = mul_size(sizeof(PgBackendStatus), MaxBackends);
2044 2045 2046 2047 2048 2049 2050 2051 2052 2053
	BackendStatusArray = (PgBackendStatus *)
		ShmemInitStruct("Backend Status Array", size, &found);

	if (!found)
	{
		/*
		 * We're the first - initialize.
		 */
		MemSet(BackendStatusArray, 0, size);
	}
2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070

	/* Create or attach to the shared activity buffer */
	size = mul_size(pgstat_track_activity_query_size, MaxBackends);
	BackendActivityBuffer = (char*)
		ShmemInitStruct("Backend Activity Buffer", size, &found);

	if (!found)
	{
		MemSet(BackendActivityBuffer, 0, size);

		/* Initialize st_activity pointers. */
		buffer = BackendActivityBuffer;
		for (i = 0; i < MaxBackends; i++) {
			BackendStatusArray[i].st_activity = buffer;
			buffer += pgstat_track_activity_query_size;
		}
	}
2071 2072 2073
}


2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093
/* ----------
 * pgstat_initialize() -
 *
 *	Initialize pgstats state, and set up our on-proc-exit hook.
 *	Called from InitPostgres.  MyBackendId must be set,
 *	but we must not have started any transaction yet (since the
 *	exit hook must run after the last transaction exit).
 * ----------
 */
void
pgstat_initialize(void)
{
	/* Initialize MyBEEntry */
	Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);
	MyBEEntry = &BackendStatusArray[MyBackendId - 1];

	/* Set up a process-exit hook to clean up */
	on_shmem_exit(pgstat_beshutdown_hook, 0);
}

2094 2095 2096
/* ----------
 * pgstat_bestart() -
 *
2097 2098 2099
 *	Initialize this backend's entry in the PgBackendStatus array.
 *	Called from InitPostgres.  MyDatabaseId and session userid must be set
 *	(hence, this cannot be combined with pgstat_initialize).
2100 2101 2102 2103 2104 2105 2106 2107
 * ----------
 */
void
pgstat_bestart(void)
{
	TimestampTz proc_start_timestamp;
	Oid			userid;
	SockAddr	clientaddr;
2108
	volatile PgBackendStatus *beentry;
2109 2110

	/*
B
Bruce Momjian 已提交
2111 2112
	 * To minimize the time spent modifying the PgBackendStatus entry, fetch
	 * all the needed data first.
2113 2114 2115
	 *
	 * If we have a MyProcPort, use its session start time (for consistency,
	 * and to save a kernel call).
2116
	 */
2117 2118 2119 2120
	if (MyProcPort)
		proc_start_timestamp = MyProcPort->SessionStartTime;
	else
		proc_start_timestamp = GetCurrentTimestamp();
2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134
	userid = GetSessionUserId();

	/*
	 * We may not have a MyProcPort (eg, if this is the autovacuum process).
	 * If so, use all-zeroes client address, which is dealt with specially in
	 * pg_stat_get_backend_client_addr and pg_stat_get_backend_client_port.
	 */
	if (MyProcPort)
		memcpy(&clientaddr, &MyProcPort->raddr, sizeof(clientaddr));
	else
		MemSet(&clientaddr, 0, sizeof(clientaddr));

	/*
	 * Initialize my status entry, following the protocol of bumping
B
Bruce Momjian 已提交
2135 2136 2137
	 * st_changecount before and after; and make sure it's even afterwards. We
	 * use a volatile pointer here to ensure the compiler doesn't try to get
	 * cute.
2138 2139
	 */
	beentry = MyBEEntry;
B
Bruce Momjian 已提交
2140 2141
	do
	{
2142 2143 2144 2145 2146 2147
		beentry->st_changecount++;
	} while ((beentry->st_changecount & 1) == 0);

	beentry->st_procpid = MyProcPid;
	beentry->st_proc_start_timestamp = proc_start_timestamp;
	beentry->st_activity_start_timestamp = 0;
2148
	beentry->st_xact_start_timestamp = 0;
2149 2150 2151
	beentry->st_databaseid = MyDatabaseId;
	beentry->st_userid = userid;
	beentry->st_clientaddr = clientaddr;
2152
	beentry->st_waiting = false;
2153 2154
	beentry->st_activity[0] = '\0';
	/* Also make sure the last byte in the string area is always 0 */
2155
	beentry->st_activity[pgstat_track_activity_query_size - 1] = '\0';
2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172

	beentry->st_changecount++;
	Assert((beentry->st_changecount & 1) == 0);
}

/*
 * Process-exit hook that shuts down this backend's statistics reporting.
 *
 * First flushes any remaining statistics counts to the collector, so that
 * work done during backend exit (such as temp table deletions) is still
 * counted.  Then marks our PgBackendStatus entry invalid (st_procpid = 0)
 * under the st_changecount protocol.
 */
static void
pgstat_beshutdown_hook(int code, Datum arg)
{
	volatile PgBackendStatus *entry = MyBEEntry;

	pgstat_report_stat(true);

	/*
	 * Bracket the store with st_changecount bumps so concurrent readers
	 * retry rather than copy a half-updated entry.  The volatile pointer
	 * keeps the compiler from getting cute.
	 */
	entry->st_changecount++;
	entry->st_procpid = 0;		/* mark invalid */
	entry->st_changecount++;

	Assert((entry->st_changecount & 1) == 0);
}


/* ----------
 * pgstat_report_activity() -
 *
 *	Called from tcop/postgres.c to report what the backend is actually doing
 *	(usually "<IDLE>" or the start of the query to be executed).
 * ----------
 */
void
pgstat_report_activity(const char *cmd_str)
{
2201
	volatile PgBackendStatus *beentry = MyBEEntry;
2202 2203 2204
	TimestampTz start_timestamp;
	int			len;

2205
	if (!pgstat_track_activities || !beentry)
2206 2207 2208
		return;

	/*
B
Bruce Momjian 已提交
2209 2210
	 * To minimize the time spent modifying the entry, fetch all the needed
	 * data first.
2211
	 */
2212
	start_timestamp = GetCurrentStatementStartTimestamp();
2213 2214

	len = strlen(cmd_str);
2215
	len = pg_mbcliplen(cmd_str, len, pgstat_track_activity_query_size - 1);
2216 2217 2218

	/*
	 * Update my status entry, following the protocol of bumping
B
Bruce Momjian 已提交
2219 2220
	 * st_changecount before and after.  We use a volatile pointer here to
	 * ensure the compiler doesn't try to get cute.
2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231
	 */
	beentry->st_changecount++;

	beentry->st_activity_start_timestamp = start_timestamp;
	memcpy((char *) beentry->st_activity, cmd_str, len);
	beentry->st_activity[len] = '\0';

	beentry->st_changecount++;
	Assert((beentry->st_changecount & 1) == 0);
}

2232
/*
2233 2234
 * Report current transaction start timestamp as the specified value.
 * Zero means there is no active transaction.
2235 2236
 */
void
2237
pgstat_report_xact_timestamp(TimestampTz tstamp)
2238 2239 2240
{
	volatile PgBackendStatus *beentry = MyBEEntry;

2241
	if (!pgstat_track_activities || !beentry)
2242 2243 2244 2245
		return;

	/*
	 * Update my status entry, following the protocol of bumping
B
Bruce Momjian 已提交
2246 2247
	 * st_changecount before and after.  We use a volatile pointer here to
	 * ensure the compiler doesn't try to get cute.
2248 2249
	 */
	beentry->st_changecount++;
2250
	beentry->st_xact_start_timestamp = tstamp;
2251 2252 2253
	beentry->st_changecount++;
	Assert((beentry->st_changecount & 1) == 0);
}
2254

2255 2256 2257 2258
/* ----------
 * pgstat_report_waiting() -
 *
 *	Called from lock manager to report beginning or end of a lock wait.
2259 2260 2261
 *
 * NB: this *must* be able to survive being called before MyBEEntry has been
 * initialized.
2262 2263 2264 2265 2266
 * ----------
 */
void
pgstat_report_waiting(bool waiting)
{
2267
	volatile PgBackendStatus *beentry = MyBEEntry;
2268

2269
	if (!pgstat_track_activities || !beentry)
2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280
		return;

	/*
	 * Since this is a single-byte field in a struct that only this process
	 * may modify, there seems no need to bother with the st_changecount
	 * protocol.  The update must appear atomic in any case.
	 */
	beentry->st_waiting = waiting;
}


2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291
/* ----------
 * pgstat_read_current_status() -
 *
 *	Copy the current contents of the PgBackendStatus array to local memory,
 *	if not already done in this transaction.
 * ----------
 */
static void
pgstat_read_current_status(void)
{
	volatile PgBackendStatus *beentry;
2292
	PgBackendStatus *localtable;
2293
	PgBackendStatus *localentry;
2294
	char			*localactivity;
2295 2296 2297
	int			i;

	Assert(!pgStatRunningInCollector);
2298
	if (localBackendStatusTable)
2299 2300
		return;					/* already done */

2301 2302 2303 2304
	pgstat_setup_memcxt();

	localtable = (PgBackendStatus *)
		MemoryContextAlloc(pgStatLocalContext,
2305
						   sizeof(PgBackendStatus) * MaxBackends);
2306 2307 2308
	localactivity = (char *)
		MemoryContextAlloc(pgStatLocalContext,
						   pgstat_track_activity_query_size * MaxBackends);
2309 2310 2311
	localNumBackends = 0;

	beentry = BackendStatusArray;
2312
	localentry = localtable;
2313 2314 2315
	for (i = 1; i <= MaxBackends; i++)
	{
		/*
B
Bruce Momjian 已提交
2316 2317 2318 2319 2320
		 * Follow the protocol of retrying if st_changecount changes while we
		 * copy the entry, or if it's odd.  (The check for odd is needed to
		 * cover the case where we are able to completely copy the entry while
		 * the source backend is between increment steps.)	We use a volatile
		 * pointer here to ensure the compiler doesn't try to get cute.
2321 2322 2323
		 */
		for (;;)
		{
B
Bruce Momjian 已提交
2324
			int			save_changecount = beentry->st_changecount;
2325

2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336
			localentry->st_procpid = beentry->st_procpid;
			if (localentry->st_procpid > 0)
			{
				memcpy(localentry, (char *) beentry, sizeof(PgBackendStatus));
				/*
				 * strcpy is safe even if the string is modified concurrently,
				 * because there's always a \0 at the end of the buffer.
				 */
				strcpy(localactivity, (char *) beentry->st_activity);
				localentry->st_activity = localactivity;
			}
2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350

			if (save_changecount == beentry->st_changecount &&
				(save_changecount & 1) == 0)
				break;

			/* Make sure we can break out of loop if stuck... */
			CHECK_FOR_INTERRUPTS();
		}

		beentry++;
		/* Only valid entries get included into the local array */
		if (localentry->st_procpid > 0)
		{
			localentry++;
2351
			localactivity += pgstat_track_activity_query_size;
2352 2353 2354 2355
			localNumBackends++;
		}
	}

2356 2357
	/* Set the pointer only after completion of a valid table */
	localBackendStatusTable = localtable;
2358 2359 2360
}


2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380
/* ----------
 * pgstat_get_backend_current_activity() -
 *
 *	Return a string representing the current activity of the backend with
 *	the specified PID.  This looks directly at the BackendStatusArray,
 *	and so will provide current information regardless of the age of our
 *	transaction's snapshot of the status array.
 *
 *	It is the caller's responsibility to invoke this only for backends whose
 *	state is expected to remain stable while the result is in use.  The
 *	only current use is in deadlock reporting, where we can expect that
 *	the target backend is blocked on a lock.  (There are corner cases
 *	where the target's wait could get aborted while we are looking at it,
 *	but the very worst consequence is to return a pointer to a string
 *	that's been changed, so we won't worry too much.)
 *
 *	Note: return strings for special cases match pg_stat_get_backend_activity.
 * ----------
 */
const char *
2381
pgstat_get_backend_current_activity(int pid, bool checkUser)
2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418
{
	PgBackendStatus *beentry;
	int			i;

	beentry = BackendStatusArray;
	for (i = 1; i <= MaxBackends; i++)
	{
		/*
		 * Although we expect the target backend's entry to be stable, that
		 * doesn't imply that anyone else's is.  To avoid identifying the
		 * wrong backend, while we check for a match to the desired PID we
		 * must follow the protocol of retrying if st_changecount changes
		 * while we examine the entry, or if it's odd.  (This might be
		 * unnecessary, since fetching or storing an int is almost certainly
		 * atomic, but let's play it safe.)  We use a volatile pointer here
		 * to ensure the compiler doesn't try to get cute.
		 */
		volatile PgBackendStatus *vbeentry = beentry;
		bool	found;

		for (;;)
		{
			int			save_changecount = vbeentry->st_changecount;

			found = (vbeentry->st_procpid == pid);

			if (save_changecount == vbeentry->st_changecount &&
				(save_changecount & 1) == 0)
				break;

			/* Make sure we can break out of loop if stuck... */
			CHECK_FOR_INTERRUPTS();
		}

		if (found)
		{
			/* Now it is safe to use the non-volatile pointer */
2419
			if (checkUser && !superuser() && beentry->st_userid != GetUserId())
2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434
				return "<insufficient privilege>";
			else if (*(beentry->st_activity) == '\0')
				return "<command string not enabled>";
			else
				return beentry->st_activity;
		}

		beentry++;
	}

	/* If we get here, caller is in error ... */
	return "<backend information not available>";
}


2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462
/* ------------------------------------------------------------
 * Local support functions follow
 * ------------------------------------------------------------
 */


/* ----------
 * pgstat_setheader() -
 *
 *		Set common header fields in a statistics message
 * ----------
 */
static void
pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
{
	hdr->m_type = mtype;
}


/* ----------
 * pgstat_send() -
 *
 *		Send out one statistics message to the collector
 * ----------
 */
static void
pgstat_send(void *msg, int len)
{
2463 2464
	int			rc;

2465 2466
	if (pgStatSock < 0)
		return;
2467

2468
	((PgStat_MsgHdr *) msg)->m_size = len;
2469

2470 2471 2472 2473 2474 2475
	/* We'll retry after EINTR, but ignore all other failures */
	do
	{
		rc = send(pgStatSock, msg, len, 0);
	} while (rc < 0 && errno == EINTR);

2476
#ifdef USE_ASSERT_CHECKING
2477 2478
	/* In debug builds, log send failures ... */
	if (rc < 0)
2479 2480
		elog(LOG, "could not send to statistics collector: %m");
#endif
2481 2482
}

2483 2484 2485
/* ----------
 * pgstat_send_bgwriter() -
 *
B
Bruce Momjian 已提交
2486
 *		Send bgwriter statistics to the collector
2487 2488 2489 2490 2491
 * ----------
 */
void
pgstat_send_bgwriter(void)
{
2492 2493 2494
	/* We assume this initializes to zeroes */
	static const PgStat_MsgBgWriter all_zeroes;

2495
	/*
B
Bruce Momjian 已提交
2496 2497 2498
	 * This function can be called even if nothing at all has happened. In
	 * this case, avoid sending a completely empty message to the stats
	 * collector.
2499
	 */
2500
	if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
2501 2502 2503 2504 2505 2506 2507 2508 2509
		return;

	/*
	 * Prepare and send the message
	 */
	pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
	pgstat_send(&BgWriterStats, sizeof(BgWriterStats));

	/*
2510
	 * Clear out the statistics buffer, so it can be re-used.
2511
	 */
2512
	MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
2513 2514
}

2515

2516 2517 2518
/* ----------
 * PgstatCollectorMain() -
 *
B
Bruce Momjian 已提交
2519
 *	Start up the statistics collector process.	This is the body of the
2520
 *	postmaster child process.
2521 2522 2523 2524
 *
 *	The argc/argv parameters are valid only in EXEC_BACKEND case.
 *
 *	The process loops until SIGQUIT (handled by pgstat_exit) sets need_exit
 *	or PostmasterIsAlive() reports the postmaster gone; it then writes the
 *	stats file one last time and calls exit(0).  Otherwise the stats file
 *	is rewritten whenever need_statwrite is set (initially, and by the
 *	SIGALRM handler force_statwrite); after each write the interval timer
 *	is re-armed only when the next message arrives.
 * ----------
 */
2525
NON_EXEC_STATIC void
2526
PgstatCollectorMain(int argc, char *argv[])
2527
{
2528 2529 2530
	/* Interval timer (ITIMER_REAL) used to schedule the next file write */
	struct itimerval write_timeout;
	/* Set after each file write: arm write_timeout on the next message */
	bool		need_timer = false;
	int			len;
2531
	PgStat_Msg	msg;
B
Bruce Momjian 已提交
2532

2533
#ifndef WIN32
2534 2535 2536 2537
#ifdef HAVE_POLL
	struct pollfd input_fd;
#else
	struct timeval sel_timeout;
2538
	fd_set		rfds;
2539
#endif
2540 2541 2542
#endif

	IsUnderPostmaster = true;	/* we are a postmaster subprocess now */
2543

2544 2545
	MyProcPid = getpid();		/* reset MyProcPid */

B
Bruce Momjian 已提交
2546
	MyStartTime = time(NULL);	/* record Start Time for logging */
2547

2548 2549
	/*
	 * If possible, make this process a group leader, so that the postmaster
B
Bruce Momjian 已提交
2550 2551 2552
	 * can signal any child processes too.	(pgstat probably never has any
	 * child processes, but for consistency we make all postmaster child
	 * processes do this.)
2553 2554 2555 2556 2557 2558
	 */
#ifdef HAVE_SETSID
	if (setsid() < 0)
		elog(FATAL, "setsid() failed: %m");
#endif

2559
	/*
2560 2561
	 * Ignore all signals usually bound to some action in the postmaster,
	 * except SIGQUIT and SIGALRM.
2562 2563 2564 2565
	 */
	pqsignal(SIGHUP, SIG_IGN);
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
2566
	pqsignal(SIGQUIT, pgstat_exit);
2567
	pqsignal(SIGALRM, force_statwrite);
2568 2569 2570 2571 2572 2573 2574 2575 2576 2577
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, SIG_IGN);
	pqsignal(SIGUSR2, SIG_IGN);
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
	PG_SETMASK(&UnBlockSig);

2578 2579 2580
	/*
	 * Identify myself via ps
	 */
2581
	init_ps_display("stats collector process", "", "", "");
2582

2583 2584 2585
	/*
	 * Arrange to write the initial status file right away
	 */
2586 2587
	need_statwrite = true;

2588
	/* Preset the delay between status file writes (one-shot: it_interval stays 0) */
2589 2590
	MemSet(&write_timeout, 0, sizeof(struct itimerval));
	write_timeout.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
2591
	write_timeout.it_value.tv_usec = (PGSTAT_STAT_INTERVAL % 1000) * 1000;
2592

2593
	/*
B
Bruce Momjian 已提交
2594 2595
	 * Read in an existing statistics file or initialize the stats to
	 * zero.
2596
	 */
2597
	pgStatRunningInCollector = true;
2598
	pgStatDBHash = pgstat_read_statsfile(InvalidOid);
2599

2600
	/*
B
Bruce Momjian 已提交
2601 2602
	 * Setup the descriptor set for select(2).	Since only one bit in the set
	 * ever changes, we need not repeat FD_ZERO each time.
2603
	 */
2604
#if !defined(HAVE_POLL) && !defined(WIN32)
2605 2606
	FD_ZERO(&rfds);
#endif
2607

2608
	/*
2609 2610 2611
	 * Loop to process messages until we get SIGQUIT or detect ungraceful
	 * death of our parent postmaster.
	 *
B
Bruce Momjian 已提交
2612 2613
	 * For performance reasons, we don't want to do a PostmasterIsAlive() test
	 * after every message; instead, do it at statwrite time and if
2614
	 * select()/poll() is interrupted by timeout.
2615 2616 2617
	 */
	for (;;)
	{
B
Bruce Momjian 已提交
2618
		int			got_data;
2619 2620 2621 2622 2623 2624 2625

		/*
		 * Quit if we get SIGQUIT from the postmaster.
		 */
		if (need_exit)
			break;

2626
		/*
B
Bruce Momjian 已提交
2627
		 * If time to write the stats file, do so.	Note that the alarm
2628 2629 2630 2631
		 * interrupt isn't re-enabled immediately, but only after we next
		 * receive a stats message; so no cycles are wasted when there is
		 * nothing going on.
		 */
2632 2633
		if (need_statwrite)
		{
2634 2635 2636 2637
			/* Check for postmaster death; if so we'll write file below */
			if (!PostmasterIsAlive(true))
				break;

2638 2639 2640
			pgstat_write_statsfile();
			need_statwrite = false;
			need_timer = true;
2641 2642 2643
		}

		/*
2644 2645 2646
		 * Wait for a message to arrive; but not for more than
		 * PGSTAT_SELECT_TIMEOUT seconds. (This determines how quickly we will
		 * shut down after an ungraceful postmaster termination; so it needn't
B
Bruce Momjian 已提交
2647 2648 2649
		 * be very fast.  However, on some systems SIGQUIT won't interrupt the
		 * poll/select call, so this also limits speed of response to SIGQUIT,
		 * which is more important.)
2650
		 *
B
Bruce Momjian 已提交
2651 2652
		 * We use poll(2) if available, otherwise select(2). Win32 has its own
		 * implementation.
2653
		 *
		 * An EINTR (e.g. from SIGALRM or SIGQUIT) restarts the outer loop,
		 * so need_exit and need_statwrite are rechecked promptly.
		 */
2654
#ifndef WIN32
2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672
#ifdef HAVE_POLL
		input_fd.fd = pgStatSock;
		input_fd.events = POLLIN | POLLERR;
		input_fd.revents = 0;

		if (poll(&input_fd, 1, PGSTAT_SELECT_TIMEOUT * 1000) < 0)
		{
			if (errno == EINTR)
				continue;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("poll() failed in statistics collector: %m")));
		}

		got_data = (input_fd.revents != 0);
#else							/* !HAVE_POLL */

		FD_SET(pgStatSock, &rfds);
2673 2674

		/*
2675 2676
		 * timeout struct is modified by select() on some operating systems,
		 * so re-fill it each time.
2677
		 */
2678 2679 2680 2681
		sel_timeout.tv_sec = PGSTAT_SELECT_TIMEOUT;
		sel_timeout.tv_usec = 0;

		if (select(pgStatSock + 1, &rfds, NULL, NULL, &sel_timeout) < 0)
2682
		{
2683 2684
			if (errno == EINTR)
				continue;
2685
			ereport(ERROR,
2686
					(errcode_for_socket_access(),
B
Bruce Momjian 已提交
2687
					 errmsg("select() failed in statistics collector: %m")));
2688 2689
		}

2690 2691
		got_data = FD_ISSET(pgStatSock, &rfds);
#endif   /* HAVE_POLL */
B
Bruce Momjian 已提交
2692
#else							/* WIN32 */
2693
		got_data = pgwin32_waitforsinglesocket(pgStatSock, FD_READ,
B
Bruce Momjian 已提交
2694
											   PGSTAT_SELECT_TIMEOUT * 1000);
2695
#endif
2696

2697
		/*
2698 2699
		 * If there is a message on the socket, read it and check for
		 * validity.
2700
		 */
2701
		if (got_data)
2702
		{
2703 2704 2705
			len = recv(pgStatSock, (char *) &msg,
					   sizeof(PgStat_Msg), 0);
			if (len < 0)
2706 2707 2708
			{
				if (errno == EINTR)
					continue;
2709 2710 2711
				ereport(ERROR,
						(errcode_for_socket_access(),
						 errmsg("could not read statistics message: %m")));
2712
			}
2713

2714
			/*
2715
			 * We ignore messages that are smaller than our common header.
			 * (len is non-negative here: recv() failures were handled above,
			 * so the comparison against the unsigned sizeof is safe.)
2716
			 */
2717 2718
			if (len < sizeof(PgStat_MsgHdr))
				continue;
2719

2720
			/*
2721
			 * The received length must match the length in the header
2722
			 */
2723 2724
			if (msg.msg_hdr.m_size != len)
				continue;
2725 2726

			/*
2727
			 * O.K. - we accept this message.  Process it.
2728 2729 2730 2731 2732 2733 2734
			 */
			switch (msg.msg_hdr.m_type)
			{
				case PGSTAT_MTYPE_DUMMY:
					break;

				case PGSTAT_MTYPE_TABSTAT:
2735
					pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, len);
2736 2737 2738
					break;

				case PGSTAT_MTYPE_TABPURGE:
2739
					pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, len);
2740 2741 2742
					break;

				case PGSTAT_MTYPE_DROPDB:
2743
					pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, len);
2744 2745 2746
					break;

				case PGSTAT_MTYPE_RESETCOUNTER:
2747
					pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
2748
											 len);
2749 2750
					break;

2751
				case PGSTAT_MTYPE_AUTOVAC_START:
2752
					pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, len);
2753 2754 2755
					break;

				case PGSTAT_MTYPE_VACUUM:
2756
					pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, len);
2757 2758 2759
					break;

				case PGSTAT_MTYPE_ANALYZE:
2760
					pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, len);
2761 2762
					break;

2763
				case PGSTAT_MTYPE_BGWRITER:
2764
					pgstat_recv_bgwriter((PgStat_MsgBgWriter *) &msg, len);
2765 2766
					break;

2767 2768 2769 2770 2771 2772 2773 2774
 				case PGSTAT_MTYPE_FUNCSTAT:
 					pgstat_recv_funcstat((PgStat_MsgFuncstat *) &msg, len);
 					break;

				case PGSTAT_MTYPE_FUNCPURGE:
					pgstat_recv_funcpurge((PgStat_MsgFuncpurge *) &msg, len);
					break;

2775 2776 2777 2778
				/* Unrecognized message types are silently ignored */
				default:
					break;
			}

2779 2780 2781 2782 2783
			/*
			 * If this is the first message after we wrote the stats file the
			 * last time, enable the alarm interrupt to make it be written
			 * again later.
			 */
2784
			if (need_timer)
2785
			{
2786
				if (setitimer(ITIMER_REAL, &write_timeout, NULL))
2787
					ereport(ERROR,
B
Bruce Momjian 已提交
2788
					(errmsg("could not set statistics collector timer: %m")));
2789
				need_timer = false;
2790
			}
2791 2792 2793 2794
		}
		else
		{
			/*
B
Bruce Momjian 已提交
2795 2796
			 * We can only get here if the select/poll timeout elapsed. Check
			 * for postmaster death.
2797
			 */
2798 2799
			if (!PostmasterIsAlive(true))
				break;
2800
		}
B
Bruce Momjian 已提交
2801
	}							/* end of message-processing loop */
2802

2803 2804 2805 2806
	/*
	 * Save the final stats to reuse at next startup.
	 */
	pgstat_write_statsfile();
2807

2808
	exit(0);
2809 2810
}

2811 2812

/* SIGQUIT signal handler for collector process */
2813 2814 2815
static void
pgstat_exit(SIGNAL_ARGS)
{
2816
	need_exit = true;
2817 2818
}

2819
/* SIGALRM signal handler for collector process */
2820
static void
2821
force_statwrite(SIGNAL_ARGS)
2822
{
2823
	need_statwrite = true;
2824 2825
}

2826

2827 2828
/*
 * Lookup the hash table entry for the specified database. If no hash
2829 2830
 * table entry exists, initialize it, if the create parameter is true.
 * Else, return NULL.
2831 2832
 */
static PgStat_StatDBEntry *
2833
pgstat_get_db_entry(Oid databaseid, bool create)
2834 2835
{
	PgStat_StatDBEntry *result;
B
Bruce Momjian 已提交
2836 2837
	bool		found;
	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
2838 2839 2840 2841

	/* Lookup or create the hash table entry for this database */
	result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												&databaseid,
2842 2843 2844 2845
												action, &found);

	if (!create && !found)
		return NULL;
2846

2847
	/* If not found, initialize the new one. */
2848 2849
	if (!found)
	{
2850
		HASHCTL		hash_ctl;
2851

2852
		result->tables = NULL;
2853
		result->functions = NULL;
2854 2855 2856 2857
		result->n_xact_commit = 0;
		result->n_xact_rollback = 0;
		result->n_blocks_fetched = 0;
		result->n_blocks_hit = 0;
2858 2859 2860 2861 2862
		result->n_tuples_returned = 0;
		result->n_tuples_fetched = 0;
		result->n_tuples_inserted = 0;
		result->n_tuples_updated = 0;
		result->n_tuples_deleted = 0;
2863
		result->last_autovac_time = 0;
2864 2865

		memset(&hash_ctl, 0, sizeof(hash_ctl));
2866
		hash_ctl.keysize = sizeof(Oid);
2867
		hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2868
		hash_ctl.hash = oid_hash;
2869
		result->tables = hash_create("Per-database table",
B
Bruce Momjian 已提交
2870 2871 2872
									 PGSTAT_TAB_HASH_SIZE,
									 &hash_ctl,
									 HASH_ELEM | HASH_FUNCTION);
2873 2874 2875 2876 2877 2878 2879 2880

		hash_ctl.keysize = sizeof(Oid);
		hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
		hash_ctl.hash = oid_hash;
		result->functions = hash_create("Per-database function",
										PGSTAT_FUNCTION_HASH_SIZE,
										&hash_ctl,
										HASH_ELEM | HASH_FUNCTION);
2881 2882
	}

2883
	return result;
2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895
}


/* ----------
 * pgstat_write_statsfile() -
 *
 *	Tell the news.
 * ----------
 */
static void
pgstat_write_statsfile(void)
{
2896 2897
	HASH_SEQ_STATUS hstat;
	HASH_SEQ_STATUS tstat;
2898
	HASH_SEQ_STATUS fstat;
2899 2900
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;
2901
	PgStat_StatFuncEntry *funcentry;
2902
	FILE	   *fpout;
2903
	int32		format_id;
2904 2905

	/*
2906
	 * Open the statistics temp file to write out the current values.
2907
	 */
2908
	fpout = fopen(PGSTAT_STAT_TMPFILE, PG_BINARY_W);
2909 2910
	if (fpout == NULL)
	{
2911 2912
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
2913 2914
				 errmsg("could not open temporary statistics file \"%s\": %m",
						PGSTAT_STAT_TMPFILE)));
2915 2916 2917
		return;
	}

2918 2919 2920 2921 2922 2923
	/*
	 * Write the file header --- currently just a format ID.
	 */
	format_id = PGSTAT_FILE_FORMAT_ID;
	fwrite(&format_id, sizeof(format_id), 1, fpout);

2924 2925 2926 2927 2928
	/*
	 * Write global stats struct
	 */
	fwrite(&globalStats, sizeof(globalStats), 1, fpout);

2929 2930 2931 2932
	/*
	 * Walk through the database table.
	 */
	hash_seq_init(&hstat, pgStatDBHash);
2933
	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2934 2935
	{
		/*
B
Bruce Momjian 已提交
2936
		 * Write out the DB entry including the number of live backends. We
2937 2938
		 * don't write the tables or functions pointers, since they're of
		 * no use to any other process.
2939 2940
		 */
		fputc('D', fpout);
2941
		fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
2942 2943

		/*
2944
		 * Walk through the database's access stats per table.
2945 2946
		 */
		hash_seq_init(&tstat, dbentry->tables);
2947
		while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2948 2949 2950 2951
		{
			fputc('T', fpout);
			fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
		}
2952

2953 2954 2955 2956 2957 2958 2959 2960 2961 2962
		/*
		 * Walk through the database's function stats table.
		 */
		hash_seq_init(&fstat, dbentry->functions);
		while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
		{
			fputc('F', fpout);
			fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
		}

2963 2964 2965 2966 2967 2968 2969
		/*
		 * Mark the end of this DB
		 */
		fputc('d', fpout);
	}

	/*
2970
	 * No more output to be done. Close the temp file and replace the old
2971 2972
	 * pgstat.stat with it.  The ferror() check replaces testing for error
	 * after each individual fputc or fwrite above.
2973 2974
	 */
	fputc('E', fpout);
2975 2976 2977 2978 2979

	if (ferror(fpout))
	{
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
2980 2981
			   errmsg("could not write temporary statistics file \"%s\": %m",
					  PGSTAT_STAT_TMPFILE)));
2982 2983 2984 2985
		fclose(fpout);
		unlink(PGSTAT_STAT_TMPFILE);
	}
	else if (fclose(fpout) < 0)
2986
	{
2987 2988
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
2989 2990
			   errmsg("could not close temporary statistics file \"%s\": %m",
					  PGSTAT_STAT_TMPFILE)));
2991
		unlink(PGSTAT_STAT_TMPFILE);
2992
	}
2993
	else if (rename(PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME) < 0)
2994
	{
2995 2996 2997 2998 2999
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
						PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME)));
		unlink(PGSTAT_STAT_TMPFILE);
3000 3001 3002 3003 3004 3005 3006
	}
}


/* ----------
 * pgstat_read_statsfile() -
 *
3007 3008
 *	Reads in an existing statistics collector file and initializes the
 *	databases' hash table (whose entries point to the tables' hash tables).
3009 3010
 * ----------
 */
3011 3012
static HTAB *
pgstat_read_statsfile(Oid onlydb)
3013
{
3014 3015 3016 3017
	PgStat_StatDBEntry *dbentry;
	PgStat_StatDBEntry dbbuf;
	PgStat_StatTabEntry *tabentry;
	PgStat_StatTabEntry tabbuf;
3018 3019
	PgStat_StatFuncEntry funcbuf;
	PgStat_StatFuncEntry *funcentry;
3020
	HASHCTL		hash_ctl;
3021
	HTAB	   *dbhash;
3022
	HTAB	   *tabhash = NULL;
3023
	HTAB	   *funchash = NULL;
3024
	FILE	   *fpin;
3025
	int32		format_id;
3026 3027 3028
	bool		found;

	/*
3029
	 * The tables will live in pgStatLocalContext.
3030
	 */
3031
	pgstat_setup_memcxt();
3032 3033 3034 3035 3036

	/*
	 * Create the DB hashtable
	 */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
3037
	hash_ctl.keysize = sizeof(Oid);
3038
	hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
3039
	hash_ctl.hash = oid_hash;
3040 3041 3042
	hash_ctl.hcxt = pgStatLocalContext;
	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
						 HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
3043

3044 3045 3046 3047 3048 3049
	/*
	 * Clear out global statistics so they start from zero in case we can't
	 * load an existing statsfile.
	 */
	memset(&globalStats, 0, sizeof(globalStats));

3050
	/*
B
Bruce Momjian 已提交
3051 3052 3053
	 * Try to open the status file. If it doesn't exist, the backends simply
	 * return zero for anything and the collector simply starts from scratch
	 * with empty counters.
3054
	 */
3055
	if ((fpin = AllocateFile(PGSTAT_STAT_FILENAME, PG_BINARY_R)) == NULL)
3056
		return dbhash;
3057

3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068
	/*
	 * Verify it's of the expected format.
	 */
	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
		|| format_id != PGSTAT_FILE_FORMAT_ID)
	{
		ereport(pgStatRunningInCollector ? LOG : WARNING,
				(errmsg("corrupted pgstat.stat file")));
		goto done;
	}

3069 3070 3071 3072 3073 3074 3075 3076 3077 3078
	/*
	 * Read global stats struct
	 */
	if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
	{
		ereport(pgStatRunningInCollector ? LOG : WARNING,
				(errmsg("corrupted pgstat.stat file")));
		goto done;
	}

3079
	/*
3080 3081
	 * We found an existing collector stats file. Read it and put all the
	 * hashtable entries into place.
3082 3083 3084 3085 3086
	 */
	for (;;)
	{
		switch (fgetc(fpin))
		{
3087 3088
				/*
				 * 'D'	A PgStat_StatDBEntry struct describing a database
3089 3090
				 * follows. Subsequently, zero to many 'T' and 'F' entries
				 * will follow until a 'd' is encountered.
3091
				 */
3092
			case 'D':
3093 3094
				if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
						  fpin) != offsetof(PgStat_StatDBEntry, tables))
3095
				{
3096 3097
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
3098
					goto done;
3099 3100 3101 3102 3103
				}

				/*
				 * Add to the DB hash
				 */
3104
				dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
B
Bruce Momjian 已提交
3105
												  (void *) &dbbuf.databaseid,
3106 3107
															 HASH_ENTER,
															 &found);
3108 3109
				if (found)
				{
3110 3111
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
3112
					goto done;
3113 3114 3115
				}

				memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
3116
				dbentry->tables = NULL;
3117
				dbentry->functions = NULL;
3118 3119

				/*
3120 3121
				 * Don't collect tables if not the requested DB (or the
				 * shared-table info)
3122
				 */
3123 3124 3125 3126
				if (onlydb != InvalidOid)
				{
					if (dbbuf.databaseid != onlydb &&
						dbbuf.databaseid != InvalidOid)
B
Bruce Momjian 已提交
3127
						break;
3128
				}
3129 3130

				memset(&hash_ctl, 0, sizeof(hash_ctl));
3131
				hash_ctl.keysize = sizeof(Oid);
3132
				hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3133
				hash_ctl.hash = oid_hash;
3134
				hash_ctl.hcxt = pgStatLocalContext;
3135 3136 3137
				dbentry->tables = hash_create("Per-database table",
											  PGSTAT_TAB_HASH_SIZE,
											  &hash_ctl,
B
Bruce Momjian 已提交
3138
								   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
3139

3140 3141 3142 3143 3144 3145 3146 3147
				hash_ctl.keysize = sizeof(Oid);
				hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
				hash_ctl.hash = oid_hash;
				hash_ctl.hcxt = pgStatLocalContext;
				dbentry->functions = hash_create("Per-database function",
												 PGSTAT_FUNCTION_HASH_SIZE,
												 &hash_ctl,
								   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
3148
				/*
3149 3150
				 * Arrange that following records add entries to this
				 * database's hash tables.
3151 3152
				 */
				tabhash = dbentry->tables;
3153
				funchash = dbentry->functions;
3154 3155
				break;

3156 3157 3158
				/*
				 * 'd'	End of this database.
				 */
3159 3160
			case 'd':
				tabhash = NULL;
3161
				funchash = NULL;
3162 3163
				break;

3164 3165 3166
				/*
				 * 'T'	A PgStat_StatTabEntry follows.
				 */
3167
			case 'T':
3168 3169
				if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
						  fpin) != sizeof(PgStat_StatTabEntry))
3170
				{
3171 3172
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
3173
					goto done;
3174 3175 3176 3177 3178 3179 3180 3181
				}

				/*
				 * Skip if table belongs to a not requested database.
				 */
				if (tabhash == NULL)
					break;

3182
				tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
B
Bruce Momjian 已提交
3183 3184
													(void *) &tabbuf.tableid,
														 HASH_ENTER, &found);
3185 3186 3187

				if (found)
				{
3188 3189
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
3190
					goto done;
3191 3192 3193 3194 3195
				}

				memcpy(tabentry, &tabbuf, sizeof(tabbuf));
				break;

3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227
				/*
				 * 'F'	A PgStat_StatFuncEntry follows.
				 */
			case 'F':
				if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry),
						  fpin) != sizeof(PgStat_StatFuncEntry))
				{
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
					goto done;
				}

				/*
				 * Skip if function belongs to a not requested database.
				 */
				if (funchash == NULL)
					break;

				funcentry = (PgStat_StatFuncEntry *) hash_search(funchash,
													(void *) &funcbuf.functionid,
														 HASH_ENTER, &found);

				if (found)
				{
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
					goto done;
				}

				memcpy(funcentry, &funcbuf, sizeof(funcbuf));
				break;

3228
				/*
3229
				 * 'E'	The EOF marker of a complete stats file.
3230
				 */
3231 3232
			case 'E':
				goto done;
3233

3234 3235 3236 3237 3238 3239
			default:
				ereport(pgStatRunningInCollector ? LOG : WARNING,
						(errmsg("corrupted pgstat.stat file")));
				goto done;
		}
	}
3240

3241 3242
done:
	FreeFile(fpin);
3243 3244

	return dbhash;
3245
}
3246

3247
/*
3248 3249 3250
 * If not already done, read the statistics collector stats file into
 * some hash tables.  The results will be kept until pgstat_clear_snapshot()
 * is called (typically, at end of transaction).
3251 3252 3253 3254
 */
static void
backend_read_statsfile(void)
{
3255 3256 3257 3258 3259
	/* already read it? */
	if (pgStatDBHash)
		return;
	Assert(!pgStatRunningInCollector);

3260 3261
	/* Autovacuum launcher wants stats about all databases */
	if (IsAutoVacuumLauncherProcess())
3262
		pgStatDBHash = pgstat_read_statsfile(InvalidOid);
3263
	else
3264 3265
		pgStatDBHash = pgstat_read_statsfile(MyDatabaseId);
}
3266

3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282

/* ----------
 * pgstat_setup_memcxt() -
 *
 *	Create pgStatLocalContext, if not already done.
 * ----------
 */
static void
pgstat_setup_memcxt(void)
{
	if (!pgStatLocalContext)
		pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
												   "Statistics snapshot",
												   ALLOCSET_SMALL_MINSIZE,
												   ALLOCSET_SMALL_INITSIZE,
												   ALLOCSET_SMALL_MAXSIZE);
3283 3284
}

3285 3286 3287 3288

/* ----------
 * pgstat_clear_snapshot() -
 *
 *	Throw away whatever statistics snapshot this process has read, so the
 *	next request reads fresh data.
 *
 *	Also called during transaction commit or abort, to drop a snapshot that
 *	is no longer wanted.
 * ----------
 */
void
pgstat_clear_snapshot(void)
{
	/* Free the snapshot memory context, if one was ever created */
	if (pgStatLocalContext != NULL)
		MemoryContextDelete(pgStatLocalContext);
	pgStatLocalContext = NULL;

	/* Forget the cached snapshot state */
	pgStatDBHash = NULL;
	localBackendStatusTable = NULL;
	localNumBackends = 0;
}

/* ----------
 * pgstat_recv_tabstat() -
 *
 *	Fold one backend's table-activity report into the collector's totals.
 *
 *	The message's database entry is created on demand.  Each reported table
 *	gets an entry in that database's table hash; a new entry starts with
 *	zeroed counters and timestamps.  The reported deltas are added to the
 *	per-table counters and to the per-database totals.
 * ----------
 */
static void
pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	int			n;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	/* Transaction outcomes are only tracked per database. */
	dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
	dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);

	for (n = 0; n < msg->m_nentries; n++)
	{
		PgStat_TableEntry *report = &msg->m_entry[n];
		PgStat_StatTabEntry *tabentry;
		bool		found;

		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
													   (void *) &report->t_id,
													   HASH_ENTER, &found);

		if (!found)
		{
			/*
			 * Fresh entry: zero its counters so that the deltas added below
			 * become its initial values, and clear the vacuum/analyze data.
			 */
			tabentry->numscans = 0;
			tabentry->tuples_returned = 0;
			tabentry->tuples_fetched = 0;
			tabentry->tuples_inserted = 0;
			tabentry->tuples_updated = 0;
			tabentry->tuples_deleted = 0;
			tabentry->tuples_hot_updated = 0;
			tabentry->n_live_tuples = 0;
			tabentry->n_dead_tuples = 0;
			tabentry->blocks_fetched = 0;
			tabentry->blocks_hit = 0;

			tabentry->last_anl_tuples = 0;
			tabentry->vacuum_timestamp = 0;
			tabentry->autovac_vacuum_timestamp = 0;
			tabentry->analyze_timestamp = 0;
			tabentry->autovac_analyze_timestamp = 0;
		}

		/* Accumulate this report into the table's counters. */
		tabentry->numscans += report->t_counts.t_numscans;
		tabentry->tuples_returned += report->t_counts.t_tuples_returned;
		tabentry->tuples_fetched += report->t_counts.t_tuples_fetched;
		tabentry->tuples_inserted += report->t_counts.t_tuples_inserted;
		tabentry->tuples_updated += report->t_counts.t_tuples_updated;
		tabentry->tuples_deleted += report->t_counts.t_tuples_deleted;
		tabentry->tuples_hot_updated += report->t_counts.t_tuples_hot_updated;
		tabentry->n_live_tuples += report->t_counts.t_new_live_tuples;
		tabentry->n_dead_tuples += report->t_counts.t_new_dead_tuples;
		tabentry->blocks_fetched += report->t_counts.t_blocks_fetched;
		tabentry->blocks_hit += report->t_counts.t_blocks_hit;

		/* Live/dead deltas may be negative; keep the estimates non-negative. */
		tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
		tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);

		/* Roll the same deltas up into the database-wide totals. */
		dbentry->n_tuples_returned += report->t_counts.t_tuples_returned;
		dbentry->n_tuples_fetched += report->t_counts.t_tuples_fetched;
		dbentry->n_tuples_inserted += report->t_counts.t_tuples_inserted;
		dbentry->n_tuples_updated += report->t_counts.t_tuples_updated;
		dbentry->n_tuples_deleted += report->t_counts.t_tuples_deleted;
		dbentry->n_blocks_fetched += report->t_counts.t_blocks_fetched;
		dbentry->n_blocks_hit += report->t_counts.t_blocks_hit;
	}
}


/* ----------
 * pgstat_recv_tabpurge() -
 *
 *	Drop the entries for the tables listed in the message from their
 *	database's table hash.
 * ----------
 */
static void
pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
{
	PgStat_StatDBEntry *dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
	int			k;

	/* Unknown database, or no table hash: nothing to purge. */
	if (dbentry == NULL || dbentry->tables == NULL)
		return;

	for (k = 0; k < msg->m_nentries; k++)
	{
		/* A table that is already absent is silently ignored. */
		(void) hash_search(dbentry->tables,
						   (void *) &msg->m_tableid[k],
						   HASH_REMOVE, NULL);
	}
}


/* ----------
 * pgstat_recv_dropdb() -
 *
 *	Forget everything about a dropped database: free its table and function
 *	hashes and remove its entry from pgStatDBHash.  Raises ERROR if the
 *	entry cannot be removed after it was found.
 * ----------
 */
static void
pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
{
	PgStat_StatDBEntry *dbentry;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
	if (dbentry == NULL)
		return;					/* never heard of it: nothing to forget */

	/* Release the per-database hash tables before dropping the entry. */
	if (dbentry->tables != NULL)
		hash_destroy(dbentry->tables);
	if (dbentry->functions != NULL)
		hash_destroy(dbentry->functions);

	if (hash_search(pgStatDBHash, (void *) &(dbentry->databaseid),
					HASH_REMOVE, NULL) == NULL)
		ereport(ERROR,
				(errmsg("database hash table corrupted "
						"during cleanup --- abort")));
}


/* ----------
 * pgstat_recv_resetcounter() -
 *
 *	Reset the statistics for the specified database.
 *
 *	All table and function entries are discarded by replacing the database's
 *	hash tables with fresh, empty ones.  The database-wide counters that
 *	pgstat_recv_tabstat() accumulates (transactions, blocks and tuples) are
 *	zeroed, so the database totals restart together with the per-table data.
 *	Nothing happens if the database has no entry.
 * ----------
 */
static void
pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
{
	HASHCTL		hash_ctl;
	PgStat_StatDBEntry *dbentry;

	/*
	 * Lookup the database in the hashtable.  Nothing to do if not there.
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);

	if (!dbentry)
		return;

	/*
	 * We simply throw away all the database's table entries by recreating a
	 * new hash table for them.
	 */
	if (dbentry->tables != NULL)
		hash_destroy(dbentry->tables);
	if (dbentry->functions != NULL)
		hash_destroy(dbentry->functions);

	dbentry->tables = NULL;
	dbentry->functions = NULL;

	/*
	 * Zero every database-level counter that pgstat_recv_tabstat() adds to.
	 * The tuple counters must be included; otherwise they would keep their
	 * pre-reset totals while everything else starts again from zero.
	 *
	 * NOTE(review): last_autovac_time is intentionally left unchanged here;
	 * confirm whether autovacuum scheduling expects it to survive a reset.
	 */
	dbentry->n_xact_commit = 0;
	dbentry->n_xact_rollback = 0;
	dbentry->n_blocks_fetched = 0;
	dbentry->n_blocks_hit = 0;
	dbentry->n_tuples_returned = 0;
	dbentry->n_tuples_fetched = 0;
	dbentry->n_tuples_inserted = 0;
	dbentry->n_tuples_updated = 0;
	dbentry->n_tuples_deleted = 0;

	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(Oid);
	hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
	hash_ctl.hash = oid_hash;
	dbentry->tables = hash_create("Per-database table",
								  PGSTAT_TAB_HASH_SIZE,
								  &hash_ctl,
								  HASH_ELEM | HASH_FUNCTION);

	hash_ctl.keysize = sizeof(Oid);
	hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
	hash_ctl.hash = oid_hash;
	dbentry->functions = hash_create("Per-database function",
									 PGSTAT_FUNCTION_HASH_SIZE,
									 &hash_ctl,
									 HASH_ELEM | HASH_FUNCTION);
}

/* ----------
 * pgstat_recv_autovac() -
 *
 *	Record the start time carried by an autovacuum signalling message in
 *	the database's last_autovac_time.
 * ----------
 */
static void
pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
{
	PgStat_StatDBEntry *dbentry;

	/*
	 * Look the database up without creating it: autovacuum may be working on
	 * a template database, which should not get an entry.  A database in
	 * real use most likely has one already, and if not, it will get one as
	 * soon as somebody actually uses it, so skipping the update is harmless.
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
	if (dbentry != NULL)
		dbentry->last_autovac_time = msg->m_start_time;
}

/* ----------
 * pgstat_recv_vacuum() -
 *
 *	Apply a VACUUM report to an existing table entry: record the vacuum
 *	time, take the reported live-tuple count, and, if the message says an
 *	analyze was done as well, record that too.
 * ----------
 */
static void
pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;

	/*
	 * Only touch entries that already exist.  Creating database or table
	 * entries here would bloat the stats with objects that only vacuum
	 * touches and that see no live operations.
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
	if (dbentry == NULL)
		return;

	tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
						   HASH_FIND, NULL);
	if (tabentry == NULL)
		return;

	if (msg->m_autovacuum)
		tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
	else
		tabentry->vacuum_timestamp = msg->m_vacuumtime;

	/* Take vacuum's live count; assuming zero dead tuples is approximate. */
	tabentry->n_live_tuples = msg->m_tuples;
	tabentry->n_dead_tuples = 0;

	if (!msg->m_analyze)
	{
		/* last_anl_tuples must never exceed n_live_tuples+n_dead_tuples */
		tabentry->last_anl_tuples = Min(tabentry->last_anl_tuples,
										msg->m_tuples);
		return;
	}

	tabentry->last_anl_tuples = msg->m_tuples;
	if (msg->m_autovacuum)
		tabentry->autovac_analyze_timestamp = msg->m_vacuumtime;
	else
		tabentry->analyze_timestamp = msg->m_vacuumtime;
}

/* ----------
 * pgstat_recv_analyze() -
 *
 *	Apply an ANALYZE report to an existing table entry: record the analyze
 *	time and replace the live/dead tuple estimates with the reported ones.
 * ----------
 */
static void
pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;

	/*
	 * Neither the database nor the table entry is created here.  That keeps
	 * objects that only analyze touches, and that see no live operations,
	 * out of the stats.
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
	tabentry = (dbentry != NULL) ?
		hash_search(dbentry->tables, &(msg->m_tableoid), HASH_FIND, NULL) :
		NULL;
	if (tabentry == NULL)
		return;

	if (msg->m_autovacuum)
		tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
	else
		tabentry->analyze_timestamp = msg->m_analyzetime;

	/* The reported estimates replace whatever had been accumulated. */
	tabentry->n_live_tuples = msg->m_live_tuples;
	tabentry->n_dead_tuples = msg->m_dead_tuples;
	tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples;
}

/* ----------
 * pgstat_recv_bgwriter() -
 *
 *	Process a BGWRITER message.
 * ----------
 */
static void
3647
pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
3648 3649 3650 3651
{
	globalStats.timed_checkpoints += msg->m_timed_checkpoints;
	globalStats.requested_checkpoints += msg->m_requested_checkpoints;
	globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints;
3652 3653
	globalStats.buf_written_clean += msg->m_buf_written_clean;
	globalStats.maxwritten_clean += msg->m_maxwritten_clean;
3654 3655
	globalStats.buf_written_backend += msg->m_buf_written_backend;
	globalStats.buf_alloc += msg->m_buf_alloc;
3656
}
3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688 3689 3690 3691 3692 3693 3694 3695 3696 3697 3698 3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709 3710 3711 3712 3713 3714 3715 3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726 3727 3728 3729 3730 3731 3732 3733 3734 3735 3736

/* ----------
 * pgstat_recv_funcstat() -
 *
 *	Fold one backend's function-call report into the collector's totals.
 *	The database entry is created on demand; each reported function is
 *	seeded from the report when first seen and accumulated thereafter.
 * ----------
 */
static void
pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	int			k;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	for (k = 0; k < msg->m_nentries; k++)
	{
		PgStat_FunctionEntry *report = &msg->m_entry[k];
		PgStat_StatFuncEntry *funcentry;
		bool		found;

		funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
													   (void *) &report->f_id,
														 HASH_ENTER, &found);
		if (found)
		{
			/* Known function: add this report to its totals. */
			funcentry->f_numcalls += report->f_numcalls;
			funcentry->f_time += report->f_time;
			funcentry->f_time_self += report->f_time_self;
		}
		else
		{
			/* First report for this function: start from its values. */
			funcentry->f_numcalls = report->f_numcalls;
			funcentry->f_time = report->f_time;
			funcentry->f_time_self = report->f_time_self;
		}
	}
}

/* ----------
 * pgstat_recv_funcpurge() -
 *
 *	Drop the entries for the functions listed in the message from their
 *	database's function hash.
 * ----------
 */
static void
pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
{
	PgStat_StatDBEntry *dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
	int			k;

	/* Unknown database, or no function hash: nothing to purge. */
	if (dbentry == NULL || dbentry->functions == NULL)
		return;

	for (k = 0; k < msg->m_nentries; k++)
	{
		/* A function that is already absent is silently ignored. */
		(void) hash_search(dbentry->functions,
						   (void *) &msg->m_functionid[k],
						   HASH_REMOVE, NULL);
	}
}