pgstat.c 79.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/* ----------
 * pgstat.c
 *
 *	All the statistics collector stuff hacked up in one big, ugly file.
 *
 *	TODO:	- Separate collector, postmaster and backend stuff
 *			  into different files.
 *
 *			- Add some automatic call for pgstat vacuuming.
 *
 *			- Add a pgstat config column to pg_database, so this
 *			  entire thing can be enabled/disabled on a per db basis.
 *
 *	Copyright (c) 2001-2007, PostgreSQL Global Development Group
 *
 *	$PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.168 2007/11/15 22:25:16 momjian Exp $
 * ----------
 */
P
Peter Eisentraut 已提交
19 20
#include "postgres.h"

21 22 23 24 25
#include <unistd.h>
#include <fcntl.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/socket.h>
B
Bruce Momjian 已提交
26
#include <netdb.h>
27 28 29
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
30
#include <time.h>
31 32 33 34 35 36
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
37

38 39
#include "pgstat.h"

40
#include "access/heapam.h"
41
#include "access/transam.h"
42
#include "access/twophase_rmgr.h"
43
#include "access/xact.h"
44
#include "catalog/pg_database.h"
45
#include "libpq/ip.h"
B
Bruce Momjian 已提交
46
#include "libpq/libpq.h"
47
#include "libpq/pqsignal.h"
48
#include "mb/pg_wchar.h"
49
#include "miscadmin.h"
50
#include "postmaster/autovacuum.h"
51
#include "postmaster/fork_process.h"
52
#include "postmaster/postmaster.h"
53
#include "storage/backendid.h"
54
#include "storage/fd.h"
55
#include "storage/ipc.h"
56
#include "storage/pg_shmem.h"
57
#include "storage/pmsignal.h"
58
#include "utils/guc.h"
59
#include "utils/memutils.h"
60
#include "utils/ps_status.h"
61 62


63
/* ----------
 * Paths for the statistics files (relative to installation's $PGDATA).
 *
 * PGSTAT_STAT_FILENAME is the file removed by pgstat_reset_all().
 * ----------
 */
#define PGSTAT_STAT_FILENAME	"global/pgstat.stat"
#define PGSTAT_STAT_TMPFILE		"global/pgstat.tmp"

/* ----------
 * Timer definitions.
 * ----------
 */
#define PGSTAT_STAT_INTERVAL	500		/* How often to write the status file;
										 * in milliseconds.  Also the minimum
										 * gap between unforced reports made
										 * by pgstat_report_tabstat(). */

#define PGSTAT_RESTART_INTERVAL 60		/* How often to attempt to restart a
										 * failed statistics collector; in
										 * seconds. */

#define PGSTAT_SELECT_TIMEOUT	2		/* How often to check for postmaster
										 * death; in seconds. */


/* ----------
 * The initial size hints for the hash tables used in the collector.
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_TAB_HASH_SIZE	512


93
/* ----------
 * GUC parameters
 *
 * Both default to off.  pgstat_init() forces "track_counts" off when it
 * cannot set up a working collector socket.
 * ----------
 */
bool		pgstat_track_activities = false;
bool		pgstat_track_counts = false;
99

100 101 102 103 104 105 106
/*
 * Global statistics counters for the bgwriter; other processes do not
 * use this variable.
 *
 * The counters live directly inside a stats message struct, so the
 * message can be sent as-is with no copying.  We rely on the usual
 * zero-initialization of file-scope objects for the starting values.
 */
PgStat_MsgBgWriter BgWriterStats;

107 108 109 110
/* ----------
 * Local data
 * ----------
 */
B
Bruce Momjian 已提交
111
NON_EXEC_STATIC int pgStatSock = -1;
112

B
Bruce Momjian 已提交
113
static struct sockaddr_storage pgStatAddr;
114

115
static time_t last_pgstat_start_time;
116

117
static bool pgStatRunningInCollector = false;
118

119
/*
120 121
 * Structures in which backends store per-table info that's waiting to be
 * sent to the collector.
122
 *
123 124 125 126 127 128 129
 * NOTE: once allocated, TabStatusArray structures are never moved or deleted
 * for the life of the backend.  Also, we zero out the t_id fields of the
 * contained PgStat_TableStatus structs whenever they are not actively in use.
 * This allows relcache pgstat_info pointers to be treated as long-lived data,
 * avoiding repeated searches in pgstat_initstats() when a relation is
 * repeatedly opened during a transaction.
 */
B
Bruce Momjian 已提交
130
#define TABSTAT_QUANTUM		100 /* we alloc this many at a time */
131 132

typedef struct TabStatusArray
133
{
134
	struct TabStatusArray *tsa_next;	/* link to next array, if any */
B
Bruce Momjian 已提交
135
	int			tsa_used;		/* # entries currently used */
136
	PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM];	/* per-table data */
137
} TabStatusArray;
138 139

static TabStatusArray *pgStatTabList = NULL;
B
Bruce Momjian 已提交
140

141 142 143 144 145 146 147 148 149
/*
 * Tuple insertion/deletion counts for an open transaction can't be propagated
 * into PgStat_TableStatus counters until we know if it is going to commit
 * or abort.  Hence, we keep these counts in per-subxact structs that live
 * in TopTransactionContext.  This data structure is designed on the assumption
 * that subxacts won't usually modify very many tables.
 */
typedef struct PgStat_SubXactStatus
{
B
Bruce Momjian 已提交
150
	int			nest_level;		/* subtransaction nest level */
151 152
	struct PgStat_SubXactStatus *prev;	/* higher-level subxact if any */
	PgStat_TableXactStatus *first;		/* head of list for this subxact */
153
} PgStat_SubXactStatus;
154

155
static PgStat_SubXactStatus *pgStatXactStack = NULL;
156

157 158
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;
159

160 161 162
/* Record that's written to 2PC state file when pgstat state is persisted */
typedef struct TwoPhasePgStatRecord
{
B
Bruce Momjian 已提交
163 164 165 166
	PgStat_Counter tuples_inserted;		/* tuples inserted in xact */
	PgStat_Counter tuples_deleted;		/* tuples deleted in xact */
	Oid			t_id;			/* table's OID */
	bool		t_shared;		/* is it a shared catalog? */
167
} TwoPhasePgStatRecord;
168 169 170 171

/*
 * Info about current "snapshot" of stats file
 */
172
static MemoryContext pgStatLocalContext = NULL;
173
static HTAB *pgStatDBHash = NULL;
174 175
static PgBackendStatus *localBackendStatusTable = NULL;
static int	localNumBackends = 0;
176

177 178 179 180 181 182 183
/*
 * Cluster wide statistics, kept in the stats collector.
 * Contains statistics that are not collected per database
 * or per table.
 */
static PgStat_GlobalStats globalStats;

B
Bruce Momjian 已提交
184 185
static volatile bool need_exit = false;
static volatile bool need_statwrite = false;
186

187

188 189 190 191
/* ----------
 * Local function forward declarations
 * ----------
 */
192
#ifdef EXEC_BACKEND
193
static pid_t pgstat_forkexec(void);
194
#endif
195 196

NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
197
static void pgstat_exit(SIGNAL_ARGS);
198
static void force_statwrite(SIGNAL_ARGS);
199
static void pgstat_beshutdown_hook(int code, Datum arg);
200

201
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
202
static void pgstat_write_statsfile(void);
203
static HTAB *pgstat_read_statsfile(Oid onlydb);
204
static void backend_read_statsfile(void);
205
static void pgstat_read_current_status(void);
206 207

static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
208
static HTAB *pgstat_collect_oids(Oid catalogid);
209

210 211
static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);

212 213
static void pgstat_setup_memcxt(void);

214
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
215 216 217 218 219 220
static void pgstat_send(void *msg, int len);

static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
221 222 223
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
224
static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
225 226 227 228 229 230 231 232 233 234 235


/* ------------------------------------------------------------
 * Public functions called from postmaster follow
 * ------------------------------------------------------------
 */

/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success pgStatSock is a non-blocking UDP socket bound and connected
 *	to its own localhost address, verified by a one-byte loopback test.
 *	On failure pgStatSock is left at -1 and "track_counts" is forced off.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;
	int			tries = 0;

#define TESTBYTEVAL ((char) 199)

	/*
	 * Create the UDP socket for sending and receiving statistic messages.
	 *
	 * Zero the whole hints struct first so that any platform-specific extra
	 * members are not passed to the resolver uninitialized.
	 */
	MemSet(&hints, 0, sizeof(hints));
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, pg_getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination. We will generate LOG
	 * messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		if (++tries > 1)
			ereport(LOG,
					(errmsg("trying another address for the statistics collector")));

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the assigned
		 * port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles by
		 * not having to respecify the target address on every send. This also
		 * provides a kernel-level check that only packets from this same
		 * address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket. This
		 * is to catch situations where the socket can be created but will not
		 * actually pass data (for instance, because kernel packet filtering
		 * rules prevent it).
		 */
		test_byte = TESTBYTEVAL;

retry1:
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry1;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before deciding
		 * it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			/* select() may modify both arguments, so rebuild them each time */
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);
			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains to
			 * give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

retry2:
		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry2;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the collector
	 * falls behind, statistics messages will be discarded; backends won't
	 * block waiting to send messages to the collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	pg_freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	ereport(LOG,
			(errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		pg_freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/*
	 * Adjust GUC variables to suppress useless activity, and for debugging
	 * purposes (seeing track_counts off is a clue that we failed here). We
	 * use PGC_S_OVERRIDE because there is no point in trying to turn it back
	 * on from postgresql.conf without a restart.
	 */
	SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}

480 481 482
/*
 * pgstat_reset_all() -
 *
 * Discard the permanent statistics file.  At present the only caller is
 * the path taken when WAL recovery is needed after a crash.
 */
void
pgstat_reset_all(void)
{
	/* The result is deliberately not examined. */
	(void) unlink(PGSTAT_STAT_FILENAME);
}
491

492 493
#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Format up the arglist for, then fork and exec, statistics collector process
 *
 * Returns whatever postmaster_forkexec() returns.
 */
static pid_t
pgstat_forkexec(void)
{
	char	   *av[10];
	int			ac = 0;

	av[ac++] = "postgres";
	av[ac++] = "--forkcol";
	av[ac++] = NULL;			/* filled in by postmaster_forkexec */

	/* NULL-terminate the vector and check we stayed within the array */
	av[ac] = NULL;
	Assert(ac < lengthof(av));

	return postmaster_forkexec(ac, av);
}
#endif   /* EXEC_BACKEND */
515

516

517
/*
518 519 520
 * pgstat_start() -
 *
 *	Called from postmaster at startup or after an existing collector
521
 *	died.  Attempt to fire up a fresh statistics collector.
522
 *
523 524
 *	Returns PID of child process, or 0 if fail.
 *
525
 *	Note: if fail, we will be called again from the postmaster main loop.
526
 */
527
int
528
pgstat_start(void)
529
{
530
	time_t		curtime;
531
	pid_t		pgStatPid;
532

533
	/*
B
Bruce Momjian 已提交
534 535
	 * Check that the socket is there, else pgstat_init failed and we can do
	 * nothing useful.
536
	 */
537
	if (pgStatSock < 0)
538
		return 0;
539

540
	/*
B
Bruce Momjian 已提交
541 542 543 544
	 * Do nothing if too soon since last collector start.  This is a safety
	 * valve to protect against continuous respawn attempts if the collector
	 * is dying immediately at launch.	Note that since we will be re-called
	 * from the postmaster main loop, we will get another chance later.
545 546 547 548
	 */
	curtime = time(NULL);
	if ((unsigned int) (curtime - last_pgstat_start_time) <
		(unsigned int) PGSTAT_RESTART_INTERVAL)
549
		return 0;
550 551
	last_pgstat_start_time = curtime;

552
	/*
553
	 * Okay, fork off the collector.
554
	 */
555
#ifdef EXEC_BACKEND
556
	switch ((pgStatPid = pgstat_forkexec()))
557
#else
558
	switch ((pgStatPid = fork_process()))
559
#endif
560 561
	{
		case -1:
562
			ereport(LOG,
563
					(errmsg("could not fork statistics collector: %m")));
564
			return 0;
565

566
#ifndef EXEC_BACKEND
567
		case 0:
568
			/* in postmaster child ... */
569
			/* Close the postmaster's sockets */
570
			ClosePostmasterPorts(false);
571

572 573 574
			/* Lose the postmaster's on-exit routines */
			on_exit_reset();

575 576 577
			/* Drop our connection to postmaster's shared memory, as well */
			PGSharedMemoryDetach();

578
			PgstatCollectorMain(0, NULL);
579
			break;
580
#endif
581 582

		default:
583
			return (int) pgStatPid;
584 585
	}

586 587
	/* shouldn't get here */
	return 0;
588 589
}

B
Bruce Momjian 已提交
590 591
void
allow_immediate_pgstat_restart(void)
592
{
B
Bruce Momjian 已提交
593
	last_pgstat_start_time = 0;
594
}
595 596 597

/* ------------------------------------------------------------
 * Public functions used by backends follow
598
 *------------------------------------------------------------
599 600 601 602 603 604
 */


/* ----------
 * pgstat_report_tabstat() -
 *
605 606 607 608
 *	Called from tcop/postgres.c to send the so far collected per-table
 *	access statistics to the collector.  Note that this is called only
 *	when not within a transaction, so it is fair to use transaction stop
 *	time as an approximation of current time.
609 610 611
 * ----------
 */
void
612
pgstat_report_tabstat(bool force)
613
{
614 615
	/* we assume this inits to all zeroes: */
	static const PgStat_TableCounts all_zeroes;
B
Bruce Momjian 已提交
616
	static TimestampTz last_report = 0;
617

618
	TimestampTz now;
619 620 621 622
	PgStat_MsgTabstat regular_msg;
	PgStat_MsgTabstat shared_msg;
	TabStatusArray *tsa;
	int			i;
623 624

	/* Don't expend a clock check if nothing to do */
625 626
	if (pgStatTabList == NULL ||
		pgStatTabList->tsa_used == 0)
627 628 629 630
		return;

	/*
	 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
631
	 * msec since we last sent one, or the caller wants to force stats out.
632 633
	 */
	now = GetCurrentTransactionStopTimestamp();
634 635
	if (!force &&
		!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
636 637 638
		return;
	last_report = now;

639
	/*
640 641
	 * Scan through the TabStatusArray struct(s) to find tables that actually
	 * have counts, and build messages to send.  We have to separate shared
B
Bruce Momjian 已提交
642 643
	 * relations from regular ones because the databaseid field in the message
	 * header has to depend on that.
644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661
	 */
	regular_msg.m_databaseid = MyDatabaseId;
	shared_msg.m_databaseid = InvalidOid;
	regular_msg.m_nentries = 0;
	shared_msg.m_nentries = 0;

	for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
	{
		for (i = 0; i < tsa->tsa_used; i++)
		{
			PgStat_TableStatus *entry = &tsa->tsa_entries[i];
			PgStat_MsgTabstat *this_msg;
			PgStat_TableEntry *this_ent;

			/* Shouldn't have any pending transaction-dependent counts */
			Assert(entry->trans == NULL);

			/*
B
Bruce Momjian 已提交
662 663
			 * Ignore entries that didn't accumulate any actual counts, such
			 * as indexes that were opened by the planner but not used.
664 665 666 667
			 */
			if (memcmp(&entry->t_counts, &all_zeroes,
					   sizeof(PgStat_TableCounts)) == 0)
				continue;
B
Bruce Momjian 已提交
668

669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691
			/*
			 * OK, insert data into the appropriate message, and send if full.
			 */
			this_msg = entry->t_shared ? &shared_msg : &regular_msg;
			this_ent = &this_msg->m_entry[this_msg->m_nentries];
			this_ent->t_id = entry->t_id;
			memcpy(&this_ent->t_counts, &entry->t_counts,
				   sizeof(PgStat_TableCounts));
			if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
			{
				pgstat_send_tabstat(this_msg);
				this_msg->m_nentries = 0;
			}
		}
		/* zero out TableStatus structs after use */
		MemSet(tsa->tsa_entries, 0,
			   tsa->tsa_used * sizeof(PgStat_TableStatus));
		tsa->tsa_used = 0;
	}

	/*
	 * Send partial messages.  If force is true, make sure that any pending
	 * xact commit/abort gets counted, even if no table stats to send.
692
	 */
693 694 695 696 697
	if (regular_msg.m_nentries > 0 ||
		(force && (pgStatXactCommit > 0 || pgStatXactRollback > 0)))
		pgstat_send_tabstat(&regular_msg);
	if (shared_msg.m_nentries > 0)
		pgstat_send_tabstat(&shared_msg);
698 699
}

700 701 702
/*
 * Subroutine for pgstat_report_tabstat: finish and send a tabstat message
 */
703
static void
704
pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg)
705
{
706 707
	int			n;
	int			len;
708

709 710 711
	/* It's unlikely we'd get here with no socket, but maybe not impossible */
	if (pgStatSock < 0)
		return;
712

713 714 715 716 717 718
	/*
	 * Report accumulated xact commit/rollback whenever we send a normal
	 * tabstat message
	 */
	if (OidIsValid(tsmsg->m_databaseid))
	{
719 720
		tsmsg->m_xact_commit = pgStatXactCommit;
		tsmsg->m_xact_rollback = pgStatXactRollback;
721
		pgStatXactCommit = 0;
722
		pgStatXactRollback = 0;
723 724 725 726 727 728
	}
	else
	{
		tsmsg->m_xact_commit = 0;
		tsmsg->m_xact_rollback = 0;
	}
729

730 731 732
	n = tsmsg->m_nentries;
	len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
		n * sizeof(PgStat_TableEntry);
733

734 735
	pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
	pgstat_send(tsmsg, len);
736 737 738 739 740 741 742 743 744
}


/* ----------
 * pgstat_vacuum_tabstat() -
 *
 *	Will tell the collector about objects he can get rid of.
 * ----------
 */
745
void
746 747
pgstat_vacuum_tabstat(void)
{
748
	HTAB	   *htab;
749
	PgStat_MsgTabpurge msg;
750 751 752 753
	HASH_SEQ_STATUS hstat;
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;
	int			len;
754 755

	if (pgStatSock < 0)
756
		return;
757 758

	/*
B
Bruce Momjian 已提交
759 760
	 * If not done for this transaction, read the statistics collector stats
	 * file into some hash tables.
761
	 */
762
	backend_read_statsfile();
763 764

	/*
765 766
	 * Read pg_database and make a list of OIDs of all existing databases
	 */
767
	htab = pgstat_collect_oids(DatabaseRelationId);
768 769 770 771 772 773 774 775 776 777

	/*
	 * Search the database hash table for dead databases and tell the
	 * collector to drop them.
	 */
	hash_seq_init(&hstat, pgStatDBHash);
	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
	{
		Oid			dbid = dbentry->databaseid;

778 779
		CHECK_FOR_INTERRUPTS();

780 781 782
		/* the DB entry for shared tables (with InvalidOid) is never dropped */
		if (OidIsValid(dbid) &&
			hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
783 784 785 786
			pgstat_drop_database(dbid);
	}

	/* Clean up */
787
	hash_destroy(htab);
788 789 790

	/*
	 * Lookup our own database entry; if not found, nothing more to do.
791
	 */
792 793 794
	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &MyDatabaseId,
												 HASH_FIND, NULL);
795 796 797 798 799 800
	if (dbentry == NULL || dbentry->tables == NULL)
		return;

	/*
	 * Similarly to above, make a list of all known relations in this DB.
	 */
801
	htab = pgstat_collect_oids(RelationRelationId);
802 803 804 805 806 807 808

	/*
	 * Initialize our messages table counter to zero
	 */
	msg.m_nentries = 0;

	/*
809
	 * Check for all tables listed in stats hashtable if they still exist.
810
	 */
811
	hash_seq_init(&hstat, dbentry->tables);
812
	while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
813
	{
814 815 816 817 818
		Oid			tabid = tabentry->tableid;

		CHECK_FOR_INTERRUPTS();

		if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
819 820 821
			continue;

		/*
822
		 * Not there, so add this table's Oid to the message
823
		 */
824
		msg.m_tableid[msg.m_nentries++] = tabid;
825 826

		/*
827
		 * If the message is full, send it out and reinitialize to empty
828 829 830
		 */
		if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
		{
831
			len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
B
Bruce Momjian 已提交
832
				+msg.m_nentries * sizeof(Oid);
833 834

			pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
835
			msg.m_databaseid = MyDatabaseId;
836 837 838 839 840 841 842 843 844 845 846
			pgstat_send(&msg, len);

			msg.m_nentries = 0;
		}
	}

	/*
	 * Send the rest
	 */
	if (msg.m_nentries > 0)
	{
847
		len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
B
Bruce Momjian 已提交
848
			+msg.m_nentries * sizeof(Oid);
849 850

		pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
851
		msg.m_databaseid = MyDatabaseId;
852 853 854
		pgstat_send(&msg, len);
	}

855
	/* Clean up */
856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889
	hash_destroy(htab);
}


/* ----------
 * pgstat_collect_oids() -
 *
 *	Collect the OIDs of either all databases or all tables, according to
 *	the parameter, into a temporary hash table.  Caller should hash_destroy
 *	the result when done with it.
 *
 *	catalogid is the OID of the catalog to scan; the callers in this file
 *	pass DatabaseRelationId or RelationRelationId.
 * ----------
 */
static HTAB *
pgstat_collect_oids(Oid catalogid)
{
	HTAB	   *htab;
	HASHCTL		hash_ctl;
	Relation	rel;
	HeapScanDesc scan;
	HeapTuple	tup;

	/* The table acts as a set: each entry is just its own Oid key */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(Oid);
	hash_ctl.entrysize = sizeof(Oid);
	hash_ctl.hash = oid_hash;
	htab = hash_create("Temporary table of OIDs",
					   PGSTAT_TAB_HASH_SIZE,
					   &hash_ctl,
					   HASH_ELEM | HASH_FUNCTION);

	/* Scan the whole catalog and record the OID of every tuple returned */
	rel = heap_open(catalogid, AccessShareLock);
	scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
	while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Oid			thisoid = HeapTupleGetOid(tup);

		CHECK_FOR_INTERRUPTS();

		(void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
	}
	heap_endscan(scan);
	heap_close(rel, AccessShareLock);

	return htab;
}


/* ----------
 * pgstat_drop_database() -
 *
 *	Tell the collector that we just dropped a database.
907 908
 *	(If the message gets lost, we will still clean the dead DB eventually
 *	via future invocations of pgstat_vacuum_tabstat().)
909 910
 * ----------
 */
911
void
912 913
pgstat_drop_database(Oid databaseid)
{
914
	PgStat_MsgDropdb msg;
915 916 917 918 919

	if (pgStatSock < 0)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
920
	msg.m_databaseid = databaseid;
921 922 923 924
	pgstat_send(&msg, sizeof(msg));
}


925 926 927 928 929 930
/* ----------
 * pgstat_drop_relation() -
 *
 *	Tell the collector that we just dropped a relation.
 *	(If the message gets lost, we will still clean the dead entry eventually
 *	via future invocations of pgstat_vacuum_tabstat().)
 *
 *	Currently not used for lack of any good place to call it; we rely
 *	entirely on pgstat_vacuum_tabstat() to clean out stats for dead rels.
 * ----------
 */
#ifdef NOT_USED
void
pgstat_drop_relation(Oid relid)
{
	PgStat_MsgTabpurge msg;
	int			len;

	if (pgStatSock < 0)
		return;

	/* A purge message carrying exactly one table OID */
	msg.m_tableid[0] = relid;
	msg.m_nentries = 1;

	len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
	msg.m_databaseid = MyDatabaseId;
	pgstat_send(&msg, len);
}
#endif   /* NOT_USED */
956 957


958 959 960 961 962 963 964 965 966
/* ----------
 * pgstat_reset_counters() -
 *
 *	Tell the statistics collector to reset counters for our database.
 * ----------
 */
void
pgstat_reset_counters(void)
{
967
	PgStat_MsgResetcounter msg;
968 969 970 971 972

	if (pgStatSock < 0)
		return;

	if (!superuser())
973 974
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
B
Bruce Momjian 已提交
975
				 errmsg("must be superuser to reset statistics counters")));
976 977

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
978
	msg.m_databaseid = MyDatabaseId;
979 980 981 982
	pgstat_send(&msg, sizeof(msg));
}


983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018
/* ----------
 * pgstat_report_autovac() -
 *
 *	Report the startup of an autovacuum process to the collector; called
 *	from autovacuum.c.  Because this runs before InitPostgres has finished,
 *	MyDatabaseId is not usable yet and the database OID is passed in as
 *	"dboid" instead.  The message is stamped with the current time.
 * ----------
 */
void
pgstat_report_autovac(Oid dboid)
{
	PgStat_MsgAutovacStart startmsg;

	if (pgStatSock >= 0)
	{
		pgstat_setheader(&startmsg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
		startmsg.m_databaseid = dboid;
		startmsg.m_start_time = GetCurrentTimestamp();

		pgstat_send(&startmsg, sizeof(startmsg));
	}
}


/* ---------
 * pgstat_report_vacuum() -
 *
 *	Report to the collector that table "tableoid" was just vacuumed.
 *
 *	"shared" selects InvalidOid rather than MyDatabaseId as the database
 *	the report is filed under; "analyze" and "tuples" are passed through
 *	to the collector unchanged.  Nothing is sent when there is no collector
 *	socket or pgstat_track_counts is off.
 * ---------
 */
void
pgstat_report_vacuum(Oid tableoid, bool shared,
					 bool analyze, PgStat_Counter tuples)
{
	PgStat_MsgVacuum vacmsg;

	if (pgStatSock < 0)
		return;
	if (!pgstat_track_counts)
		return;

	pgstat_setheader(&vacmsg.m_hdr, PGSTAT_MTYPE_VACUUM);

	if (shared)
		vacmsg.m_databaseid = InvalidOid;
	else
		vacmsg.m_databaseid = MyDatabaseId;

	vacmsg.m_tableoid = tableoid;
	vacmsg.m_analyze = analyze;
	/* record whether an autovacuum worker did the work */
	vacmsg.m_autovacuum = IsAutoVacuumWorkerProcess();
	vacmsg.m_vacuumtime = GetCurrentTimestamp();
	vacmsg.m_tuples = tuples;

	pgstat_send(&vacmsg, sizeof(vacmsg));
}

/* --------
 * pgstat_report_analyze() -
 *
 *	Report to the collector that table "tableoid" was just analyzed.
 *
 *	"shared" selects InvalidOid rather than MyDatabaseId as the database
 *	the report is filed under; "livetuples" and "deadtuples" are passed
 *	through to the collector unchanged.  Nothing is sent when there is no
 *	collector socket or pgstat_track_counts is off.
 * --------
 */
void
pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
					  PgStat_Counter deadtuples)
{
	PgStat_MsgAnalyze anlmsg;

	if (pgStatSock < 0)
		return;
	if (!pgstat_track_counts)
		return;

	pgstat_setheader(&anlmsg.m_hdr, PGSTAT_MTYPE_ANALYZE);

	if (shared)
		anlmsg.m_databaseid = InvalidOid;
	else
		anlmsg.m_databaseid = MyDatabaseId;

	anlmsg.m_tableoid = tableoid;
	/* record whether an autovacuum worker did the work */
	anlmsg.m_autovacuum = IsAutoVacuumWorkerProcess();
	anlmsg.m_analyzetime = GetCurrentTimestamp();
	anlmsg.m_live_tuples = livetuples;
	anlmsg.m_dead_tuples = deadtuples;

	pgstat_send(&anlmsg, sizeof(anlmsg));
}


1058 1059 1060 1061 1062 1063 1064 1065 1066
/* ----------
 * pgstat_ping() -
 *
 *	Send some junk data to the collector to increase traffic.
 * ----------
 */
void
pgstat_ping(void)
{
1067
	PgStat_MsgDummy msg;
1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079

	if (pgStatSock < 0)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
	pgstat_send(&msg, sizeof(msg));
}


/* ----------
 * pgstat_initstats() -
 *
1080 1081
 *	Initialize a relcache entry to count access statistics.
 *	Called whenever a relation is opened.
1082 1083 1084
 *
 *	We assume that a relcache entry's pgstat_info field is zeroed by
 *	relcache.c when the relcache entry is made; thereafter it is long-lived
1085
 *	data.  We can avoid repeated searches of the TabStatus arrays when the
1086
 *	same relation is touched repeatedly within a transaction.
1087 1088 1089
 * ----------
 */
void
1090
pgstat_initstats(Relation rel)
1091
{
1092
	Oid			rel_id = rel->rd_id;
1093 1094 1095 1096 1097 1098 1099 1100 1101 1102
	char		relkind = rel->rd_rel->relkind;

	/* We only count stats for things that have storage */
	if (!(relkind == RELKIND_RELATION ||
		  relkind == RELKIND_INDEX ||
		  relkind == RELKIND_TOASTVALUE))
	{
		rel->pgstat_info = NULL;
		return;
	}
1103

1104
	if (pgStatSock < 0 || !pgstat_track_counts)
1105
	{
1106 1107
		/* We're not counting at all */
		rel->pgstat_info = NULL;
1108
		return;
1109
	}
1110

1111
	/*
B
Bruce Momjian 已提交
1112 1113
	 * If we already set up this relation in the current transaction, nothing
	 * to do.
1114
	 */
1115 1116
	if (rel->pgstat_info != NULL &&
		rel->pgstat_info->t_id == rel_id)
1117
		return;
1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132

	/* Else find or make the PgStat_TableStatus entry, and update link */
	rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
}

/*
 * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel
 *
 * The chain of TabStatusArrays starting at pgStatTabList is walked in order.
 * Within each array the used slots are checked for rel_id; if the array does
 * not contain it but still has an unused slot, that slot is claimed right
 * away (later arrays are not examined).  If every array is full, a new
 * zeroed array is allocated in TopMemoryContext, appended to the chain, and
 * its first slot is claimed.
 *
 * A newly claimed slot gets t_id and t_shared filled in; the remaining
 * fields are assumed to be zero already, either from allocation or from
 * cleanup after the slot's last use.
 */
static PgStat_TableStatus *
get_tabstat_entry(Oid rel_id, bool isshared)
{
	PgStat_TableStatus *slot;
	TabStatusArray *cur;
	TabStatusArray *last = NULL;
	int			idx;

	for (cur = pgStatTabList; cur != NULL; cur = cur->tsa_next)
	{
		/* Is the relation already present in this array? */
		for (idx = 0; idx < cur->tsa_used; idx++)
		{
			if (cur->tsa_entries[idx].t_id == rel_id)
				return &cur->tsa_entries[idx];
		}

		/* Not present; stop here if this array has room for it */
		if (cur->tsa_used < TABSTAT_QUANTUM)
			break;

		last = cur;
	}

	if (cur == NULL)
	{
		/* All arrays are full (or none exist): add a fresh zeroed one */
		cur = (TabStatusArray *) MemoryContextAllocZero(TopMemoryContext,
													sizeof(TabStatusArray));
		if (last != NULL)
			last->tsa_next = cur;
		else
			pgStatTabList = cur;
	}

	/* Claim the next unused slot of the chosen array */
	slot = &cur->tsa_entries[cur->tsa_used++];
	slot->t_id = rel_id;
	slot->t_shared = isshared;
	return slot;
}

/*
 * get_tabstat_stack_level - add a new (sub)transaction stack entry if needed
 *
 * Returns the top entry of pgStatXactStack if it already belongs to
 * nest_level.  Otherwise a new entry for nest_level, with an empty list of
 * per-table records, is allocated in TopTransactionContext, pushed onto the
 * stack, and returned.
 */
static PgStat_SubXactStatus *
get_tabstat_stack_level(int nest_level)
{
	PgStat_SubXactStatus *top = pgStatXactStack;
	PgStat_SubXactStatus *pushed;

	if (top != NULL && top->nest_level == nest_level)
		return top;

	pushed = (PgStat_SubXactStatus *)
		MemoryContextAlloc(TopTransactionContext,
						   sizeof(PgStat_SubXactStatus));
	pushed->nest_level = nest_level;
	pushed->prev = top;
	pushed->first = NULL;
	pgStatXactStack = pushed;

	return pushed;
}

/*
 * add_tabstat_xact_level - add a new (sub)transaction state record
 */
static void
1206
add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
1207 1208 1209
{
	PgStat_SubXactStatus *xact_state;
	PgStat_TableXactStatus *trans;
1210 1211

	/*
B
Bruce Momjian 已提交
1212 1213
	 * If this is the first rel to be modified at the current nest level, we
	 * first have to push a transaction stack entry.
1214
	 */
1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236
	xact_state = get_tabstat_stack_level(nest_level);

	/* Now make a per-table stack entry */
	trans = (PgStat_TableXactStatus *)
		MemoryContextAllocZero(TopTransactionContext,
							   sizeof(PgStat_TableXactStatus));
	trans->nest_level = nest_level;
	trans->upper = pgstat_info->trans;
	trans->parent = pgstat_info;
	trans->next = xact_state->first;
	xact_state->first = trans;
	pgstat_info->trans = trans;
}

/*
 * pgstat_count_heap_insert - count a tuple insertion
 *
 * Does nothing unless pgstat_track_counts is on and the relation has a
 * tabstat entry attached.
 */
void
pgstat_count_heap_insert(Relation rel)
{
	PgStat_TableStatus *tabstat = rel->pgstat_info;
	int			cur_level;

	if (!pgstat_track_counts || tabstat == NULL)
		return;

	cur_level = GetCurrentTransactionNestLevel();

	/* The plain insert counter is nontransactional: bump it directly */
	tabstat->t_counts.t_tuples_inserted++;

	/* The transactional effect must be recorded at the current nest level */
	if (tabstat->trans == NULL ||
		tabstat->trans->nest_level != cur_level)
		add_tabstat_xact_level(tabstat, cur_level);

	tabstat->trans->tuples_inserted++;
}

/*
 * pgstat_count_heap_update - count a tuple update
 */
void
1257
pgstat_count_heap_update(Relation rel, bool hot)
1258 1259 1260
{
	PgStat_TableStatus *pgstat_info = rel->pgstat_info;

1261
	if (pgstat_track_counts && pgstat_info != NULL)
1262
	{
B
Bruce Momjian 已提交
1263
		int			nest_level = GetCurrentTransactionNestLevel();
1264 1265 1266

		/* t_tuples_updated is nontransactional, so just advance it */
		pgstat_info->t_counts.t_tuples_updated++;
1267 1268 1269
		/* ditto for the hot_update counter */
		if (hot)
			pgstat_info->t_counts.t_tuples_hot_updated++;
1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289

		/* We have to log the transactional effect at the proper level */
		if (pgstat_info->trans == NULL ||
			pgstat_info->trans->nest_level != nest_level)
			add_tabstat_xact_level(pgstat_info, nest_level);

		/* An UPDATE both inserts a new tuple and deletes the old */
		pgstat_info->trans->tuples_inserted++;
		pgstat_info->trans->tuples_deleted++;
	}
}

/*
 * pgstat_count_heap_delete - count a tuple deletion
 *
 * Does nothing unless pgstat_track_counts is on and the relation has a
 * tabstat entry attached.
 */
void
pgstat_count_heap_delete(Relation rel)
{
	PgStat_TableStatus *tabstat = rel->pgstat_info;
	int			cur_level;

	if (!pgstat_track_counts || tabstat == NULL)
		return;

	cur_level = GetCurrentTransactionNestLevel();

	/* The plain delete counter is nontransactional: bump it directly */
	tabstat->t_counts.t_tuples_deleted++;

	/* The transactional effect must be recorded at the current nest level */
	if (tabstat->trans == NULL ||
		tabstat->trans->nest_level != cur_level)
		add_tabstat_xact_level(tabstat, cur_level);

	tabstat->trans->tuples_deleted++;
}

1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318
/*
 * pgstat_update_heap_dead_tuples - update dead-tuples count
 *
 * Reports the nontransactional recovery of "delta" dead tuples.  Hence
 * t_new_dead_tuples is decreased, not increased, and the adjustment is
 * applied directly to the per-table counter instead of going through the
 * transactional state.
 */
void
pgstat_update_heap_dead_tuples(Relation rel, int delta)
{
	PgStat_TableStatus *tabstat = rel->pgstat_info;

	if (!pgstat_track_counts || tabstat == NULL)
		return;

	tabstat->t_counts.t_new_dead_tuples -= delta;
}

1323 1324

/* ----------
1325
 * AtEOXact_PgStat
1326
 *
1327
 *	Called from access/transam/xact.c at top-level transaction commit/abort.
1328 1329 1330
 * ----------
 */
void
1331
AtEOXact_PgStat(bool isCommit)
1332
{
1333
	PgStat_SubXactStatus *xact_state;
1334 1335

	/*
1336 1337
	 * Count transaction commit or abort.  (We use counters, not just bools,
	 * in case the reporting message isn't sent right away.)
1338
	 */
1339 1340 1341 1342
	if (isCommit)
		pgStatXactCommit++;
	else
		pgStatXactRollback++;
1343

1344 1345
	/*
	 * Transfer transactional insert/update counts into the base tabstat
B
Bruce Momjian 已提交
1346 1347
	 * entries.  We don't bother to free any of the transactional state, since
	 * it's all in TopTransactionContext and will go away anyway.
1348 1349 1350
	 */
	xact_state = pgStatXactStack;
	if (xact_state != NULL)
1351
	{
1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365
		PgStat_TableXactStatus *trans;

		Assert(xact_state->nest_level == 1);
		Assert(xact_state->prev == NULL);
		for (trans = xact_state->first; trans != NULL; trans = trans->next)
		{
			PgStat_TableStatus *tabstat;

			Assert(trans->nest_level == 1);
			Assert(trans->upper == NULL);
			tabstat = trans->parent;
			Assert(tabstat->trans == trans);
			if (isCommit)
			{
1366 1367
				tabstat->t_counts.t_new_live_tuples +=
					trans->tuples_inserted - trans->tuples_deleted;
1368 1369 1370 1371 1372 1373 1374 1375 1376
				tabstat->t_counts.t_new_dead_tuples += trans->tuples_deleted;
			}
			else
			{
				/* inserted tuples are dead, deleted tuples are unaffected */
				tabstat->t_counts.t_new_dead_tuples += trans->tuples_inserted;
			}
			tabstat->trans = NULL;
		}
1377
	}
1378
	pgStatXactStack = NULL;
1379

1380 1381 1382
	/* Make sure any stats snapshot is thrown away */
	pgstat_clear_snapshot();
}
1383 1384

/* ----------
1385
 * AtEOSubXact_PgStat
1386
 *
1387
 *	Called from access/transam/xact.c at subtransaction commit/abort.
1388 1389 1390
 * ----------
 */
void
1391
AtEOSubXact_PgStat(bool isCommit, int nestDepth)
1392
{
1393
	PgStat_SubXactStatus *xact_state;
1394 1395

	/*
1396 1397
	 * Transfer transactional insert/update counts into the next higher
	 * subtransaction state.
1398
	 */
1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428
	xact_state = pgStatXactStack;
	if (xact_state != NULL &&
		xact_state->nest_level >= nestDepth)
	{
		PgStat_TableXactStatus *trans;
		PgStat_TableXactStatus *next_trans;

		/* delink xact_state from stack immediately to simplify reuse case */
		pgStatXactStack = xact_state->prev;

		for (trans = xact_state->first; trans != NULL; trans = next_trans)
		{
			PgStat_TableStatus *tabstat;

			next_trans = trans->next;
			Assert(trans->nest_level == nestDepth);
			tabstat = trans->parent;
			Assert(tabstat->trans == trans);
			if (isCommit)
			{
				if (trans->upper && trans->upper->nest_level == nestDepth - 1)
				{
					trans->upper->tuples_inserted += trans->tuples_inserted;
					trans->upper->tuples_deleted += trans->tuples_deleted;
					tabstat->trans = trans->upper;
					pfree(trans);
				}
				else
				{
					/*
B
Bruce Momjian 已提交
1429 1430
					 * When there isn't an immediate parent state, we can just
					 * reuse the record instead of going through a
1431
					 * palloc/pfree pushup (this works since it's all in
B
Bruce Momjian 已提交
1432 1433
					 * TopTransactionContext anyway).  We have to re-link it
					 * into the parent level, though, and that might mean
1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470
					 * pushing a new entry into the pgStatXactStack.
					 */
					PgStat_SubXactStatus *upper_xact_state;

					upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
					trans->next = upper_xact_state->first;
					upper_xact_state->first = trans;
					trans->nest_level = nestDepth - 1;
				}
			}
			else
			{
				/*
				 * On abort, inserted tuples are dead (and can be bounced out
				 * to the top-level tabstat), deleted tuples are unaffected
				 */
				tabstat->t_counts.t_new_dead_tuples += trans->tuples_inserted;
				tabstat->trans = trans->upper;
				pfree(trans);
			}
		}
		pfree(xact_state);
	}
}


/*
 * AtPrepare_PgStat
 *		Save the transactional stats state at 2PC transaction prepare.
 *
 * In this phase we just generate 2PC records for all the pending
 * transaction-dependent stats work.
 */
void
AtPrepare_PgStat(void)
{
	PgStat_SubXactStatus *xact_state;
1471

1472 1473
	xact_state = pgStatXactStack;
	if (xact_state != NULL)
1474
	{
1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496
		PgStat_TableXactStatus *trans;

		Assert(xact_state->nest_level == 1);
		Assert(xact_state->prev == NULL);
		for (trans = xact_state->first; trans != NULL; trans = trans->next)
		{
			PgStat_TableStatus *tabstat;
			TwoPhasePgStatRecord record;

			Assert(trans->nest_level == 1);
			Assert(trans->upper == NULL);
			tabstat = trans->parent;
			Assert(tabstat->trans == trans);

			record.tuples_inserted = trans->tuples_inserted;
			record.tuples_deleted = trans->tuples_deleted;
			record.t_id = tabstat->t_id;
			record.t_shared = tabstat->t_shared;

			RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
								   &record, sizeof(TwoPhasePgStatRecord));
		}
1497 1498 1499
	}
}

1500 1501 1502 1503 1504
/*
 * PostPrepare_PgStat
 *		Clean up after successful PREPARE.
 *
 * The only thing needed here is to unlink the transaction stats state from
 * the nontransactional state.  The nontransactional action counts will be
 * reported to the stats collector immediately, while the effects on live
 * and dead tuple counts are preserved in the 2PC state file.
 *
 * Note: AtEOXact_PgStat is not called during PREPARE.
 */
void
PostPrepare_PgStat(void)
{
	PgStat_SubXactStatus *top_level = pgStatXactStack;

	/*
	 * The transactional state itself is not freed; it lives in
	 * TopTransactionContext and will go away with it.
	 */
	if (top_level != NULL)
	{
		PgStat_TableXactStatus *rec = top_level->first;

		while (rec != NULL)
		{
			rec->parent->trans = NULL;
			rec = rec->next;
		}
	}
	pgStatXactStack = NULL;

	/* Make sure any stats snapshot is thrown away */
	pgstat_clear_snapshot();
}

/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * "recdata" is a TwoPhasePgStatRecord; its saved counts are applied to our
 * local pgstats state as committed changes.  "xid", "info" and "len" are
 * not examined.
 */
void
pgstat_twophase_postcommit(TransactionId xid, uint16 info,
						   void *recdata, uint32 len)
{
	TwoPhasePgStatRecord *saved = (TwoPhasePgStatRecord *) recdata;
	PgStat_TableStatus *tabstat;

	/* Locate the rel's tabstat entry, creating it if necessary */
	tabstat = get_tabstat_entry(saved->t_id, saved->t_shared);

	tabstat->t_counts.t_new_live_tuples +=
		saved->tuples_inserted - saved->tuples_deleted;
	tabstat->t_counts.t_new_dead_tuples += saved->tuples_deleted;
}

/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * "recdata" is a TwoPhasePgStatRecord; its saved counts are applied to our
 * local pgstats state as aborted changes.  "xid", "info" and "len" are not
 * examined.
 */
void
pgstat_twophase_postabort(TransactionId xid, uint16 info,
						  void *recdata, uint32 len)
{
	TwoPhasePgStatRecord *saved = (TwoPhasePgStatRecord *) recdata;
	PgStat_TableStatus *tabstat;

	/* Locate the rel's tabstat entry, creating it if necessary */
	tabstat = get_tabstat_entry(saved->t_id, saved->t_shared);

	/* inserted tuples are dead, deleted tuples are no-ops */
	tabstat->t_counts.t_new_dead_tuples += saved->tuples_inserted;
}

1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592

/* ----------
 * pgstat_fetch_stat_dbentry() -
 *
 *	Support function for the SQL-callable pgstat* functions.  Returns the
 *	collected statistics for database "dbid", or NULL.  A NULL result does
 *	not mean the database does not exist, only that the collector does not
 *	know about it yet; callers are better off reporting ZERO in that case.
 * ----------
 */
PgStat_StatDBEntry *
pgstat_fetch_stat_dbentry(Oid dbid)
{
	PgStat_StatDBEntry *dbentry;

	/*
	 * Load the collector's stats file into hash tables, unless that was
	 * already done for this transaction.
	 */
	backend_read_statsfile();

	/* Look the database up; the result is NULL when it is not found */
	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &dbid,
												 HASH_FIND, NULL);
	return dbentry;
}


/* ----------
 * pgstat_fetch_stat_tabentry() -
 *
 *	Support function for the SQL-callable pgstat* functions.  Returns the
 *	collected statistics for table "relid", or NULL.  A NULL result does
 *	not mean the table does not exist, only that the collector does not
 *	know about it yet; callers are better off reporting ZERO in that case.
 *
 *	The table is looked for first under our own database and then, since
 *	it might be a shared table, under database InvalidOid.
 * ----------
 */
PgStat_StatTabEntry *
pgstat_fetch_stat_tabentry(Oid relid)
{
	Oid			search_dbs[2];
	int			which;

	/*
	 * Load the collector's stats file into hash tables, unless that was
	 * already done for this transaction.
	 */
	backend_read_statsfile();

	search_dbs[0] = MyDatabaseId;		/* our own database first */
	search_dbs[1] = InvalidOid;			/* then the shared tables */

	for (which = 0; which < 2; which++)
	{
		PgStat_StatDBEntry *dbentry;
		PgStat_StatTabEntry *tabentry;

		dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &search_dbs[which],
													 HASH_FIND, NULL);
		if (dbentry == NULL || dbentry->tables == NULL)
			continue;

		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
													   (void *) &relid,
													   HASH_FIND, NULL);
		if (tabentry != NULL)
			return tabentry;
	}

	return NULL;
}


/* ----------
 * pgstat_fetch_stat_beentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
1669 1670 1671 1672
 *	our local copy of the current-activity entry for one backend.
 *
 *	NB: caller is responsible for a check if the user is permitted to see
 *	this info (especially the querystring).
1673 1674
 * ----------
 */
1675
PgBackendStatus *
1676 1677
pgstat_fetch_stat_beentry(int beid)
{
1678
	pgstat_read_current_status();
1679

1680
	if (beid < 1 || beid > localNumBackends)
1681 1682
		return NULL;

1683
	return &localBackendStatusTable[beid - 1];
1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696
}


/* ----------
 * pgstat_fetch_stat_numbackends() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the maximum current backend id.
 * ----------
 */
int
pgstat_fetch_stat_numbackends(void)
{
1697
	pgstat_read_current_status();
1698

1699
	return localNumBackends;
1700 1701
}

1702 1703 1704 1705
/*
 * ---------
 * pgstat_fetch_global() -
 *
 *	Support function for the SQL-callable pgstat* functions.  Returns a
 *	pointer to the global statistics struct, after making sure the stats
 *	file has been read.
 * ---------
 */
PgStat_GlobalStats *
pgstat_fetch_global(void)
{
	PgStat_GlobalStats *result = &globalStats;

	backend_read_statsfile();

	return result;
}

1718 1719

/* ------------------------------------------------------------
 * Functions for management of the shared-memory PgBackendStatus array
 * ------------------------------------------------------------
 */

1724 1725
/* Start of the PgBackendStatus array; assigned by CreateSharedBackendStatus() */
static PgBackendStatus *BackendStatusArray = NULL;
/* This process's own element of that array; assigned by pgstat_initialize() */
static PgBackendStatus *MyBEEntry = NULL;
1726

1727 1728 1729

/*
 * Report shared-memory space needed by CreateSharedBackendStatus.
1730
 */
1731 1732
Size
BackendStatusShmemSize(void)
1733
{
1734
	Size		size;
1735

1736 1737 1738
	size = mul_size(sizeof(PgBackendStatus), MaxBackends);
	return size;
}
1739

1740 1741
/*
 * Initialize the shared status array during postmaster startup.
1742
 */
1743 1744
void
CreateSharedBackendStatus(void)
1745
{
1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762
	Size		size = BackendStatusShmemSize();
	bool		found;

	/* Create or attach to the shared array */
	BackendStatusArray = (PgBackendStatus *)
		ShmemInitStruct("Backend Status Array", size, &found);

	if (!found)
	{
		/*
		 * We're the first - initialize.
		 */
		MemSet(BackendStatusArray, 0, size);
	}
}


1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782
/* ----------
 * pgstat_initialize() -
 *
 *	Initialize pgstats state, and install our on-proc-exit hook.
 *	Called from InitPostgres.  MyBackendId must already be set, but no
 *	transaction may have been started yet (the exit hook has to run after
 *	the last transaction exit).
 * ----------
 */
void
pgstat_initialize(void)
{
	/* Point MyBEEntry at our slot; backend ids count from 1 */
	Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);
	MyBEEntry = BackendStatusArray + (MyBackendId - 1);

	/* Arrange for cleanup at process exit */
	on_shmem_exit(pgstat_beshutdown_hook, 0);
}

1783 1784 1785
/* ----------
 * pgstat_bestart() -
 *
1786 1787 1788
 *	Initialize this backend's entry in the PgBackendStatus array.
 *	Called from InitPostgres.  MyDatabaseId and session userid must be set
 *	(hence, this cannot be combined with pgstat_initialize).
1789 1790 1791 1792 1793 1794 1795 1796
 * ----------
 */
void
pgstat_bestart(void)
{
	TimestampTz proc_start_timestamp;
	Oid			userid;
	SockAddr	clientaddr;
1797
	volatile PgBackendStatus *beentry;
1798 1799

	/*
B
Bruce Momjian 已提交
1800 1801
	 * To minimize the time spent modifying the PgBackendStatus entry, fetch
	 * all the needed data first.
1802 1803 1804
	 *
	 * If we have a MyProcPort, use its session start time (for consistency,
	 * and to save a kernel call).
1805
	 */
1806 1807 1808 1809
	if (MyProcPort)
		proc_start_timestamp = MyProcPort->SessionStartTime;
	else
		proc_start_timestamp = GetCurrentTimestamp();
1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823
	userid = GetSessionUserId();

	/*
	 * We may not have a MyProcPort (eg, if this is the autovacuum process).
	 * If so, use all-zeroes client address, which is dealt with specially in
	 * pg_stat_get_backend_client_addr and pg_stat_get_backend_client_port.
	 */
	if (MyProcPort)
		memcpy(&clientaddr, &MyProcPort->raddr, sizeof(clientaddr));
	else
		MemSet(&clientaddr, 0, sizeof(clientaddr));

	/*
	 * Initialize my status entry, following the protocol of bumping
B
Bruce Momjian 已提交
1824 1825 1826
	 * st_changecount before and after; and make sure it's even afterwards. We
	 * use a volatile pointer here to ensure the compiler doesn't try to get
	 * cute.
1827 1828
	 */
	beentry = MyBEEntry;
B
Bruce Momjian 已提交
1829 1830
	do
	{
1831 1832 1833 1834 1835 1836
		beentry->st_changecount++;
	} while ((beentry->st_changecount & 1) == 0);

	beentry->st_procpid = MyProcPid;
	beentry->st_proc_start_timestamp = proc_start_timestamp;
	beentry->st_activity_start_timestamp = 0;
1837
	beentry->st_xact_start_timestamp = 0;
1838 1839 1840
	beentry->st_databaseid = MyDatabaseId;
	beentry->st_userid = userid;
	beentry->st_clientaddr = clientaddr;
1841
	beentry->st_waiting = false;
1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861
	beentry->st_activity[0] = '\0';
	/* Also make sure the last byte in the string area is always 0 */
	beentry->st_activity[PGBE_ACTIVITY_SIZE - 1] = '\0';

	beentry->st_changecount++;
	Assert((beentry->st_changecount & 1) == 0);
}

/*
 * Shut down a single backend's statistics reporting at process exit.
 *
 * First any remaining statistics counts are flushed out to the collector;
 * otherwise operations triggered during backend exit (such as temp table
 * deletions) would go uncounted.
 *
 * Then our entry in the PgBackendStatus array is marked invalid.
 *
 * "code" and "arg" are not examined.
 */
static void
pgstat_beshutdown_hook(int code, Datum arg)
{
	volatile PgBackendStatus *entry = MyBEEntry;

	pgstat_report_tabstat(true);

	/*
	 * Clear the entry, following the protocol of bumping st_changecount
	 * before and after.  The pointer is volatile so that the compiler
	 * doesn't try to get cute.
	 */
	entry->st_changecount++;

	/* a zero PID marks the entry invalid */
	entry->st_procpid = 0;

	entry->st_changecount++;
	Assert((entry->st_changecount & 1) == 0);
}


/* ----------
 * pgstat_report_activity() -
 *
 *	Called from tcop/postgres.c to report what the backend is actually doing
 *	(usually "<IDLE>" or the start of the query to be executed).
 * ----------
 */
void
pgstat_report_activity(const char *cmd_str)
{
1890
	volatile PgBackendStatus *beentry = MyBEEntry;
1891 1892 1893
	TimestampTz start_timestamp;
	int			len;

1894
	if (!pgstat_track_activities || !beentry)
1895 1896 1897
		return;

	/*
B
Bruce Momjian 已提交
1898 1899
	 * To minimize the time spent modifying the entry, fetch all the needed
	 * data first.
1900
	 */
1901
	start_timestamp = GetCurrentStatementStartTimestamp();
1902 1903 1904 1905 1906 1907

	len = strlen(cmd_str);
	len = pg_mbcliplen(cmd_str, len, PGBE_ACTIVITY_SIZE - 1);

	/*
	 * Update my status entry, following the protocol of bumping
B
Bruce Momjian 已提交
1908 1909
	 * st_changecount before and after.  We use a volatile pointer here to
	 * ensure the compiler doesn't try to get cute.
1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920
	 */
	beentry->st_changecount++;

	beentry->st_activity_start_timestamp = start_timestamp;
	memcpy((char *) beentry->st_activity, cmd_str, len);
	beentry->st_activity[len] = '\0';

	beentry->st_changecount++;
	Assert((beentry->st_changecount & 1) == 0);
}

1921
/*
1922 1923
 * Report current transaction start timestamp as the specified value.
 * Zero means there is no active transaction.
1924 1925
 */
void
1926
pgstat_report_xact_timestamp(TimestampTz tstamp)
1927 1928 1929
{
	volatile PgBackendStatus *beentry = MyBEEntry;

1930
	if (!pgstat_track_activities || !beentry)
1931 1932 1933 1934
		return;

	/*
	 * Update my status entry, following the protocol of bumping
B
Bruce Momjian 已提交
1935 1936
	 * st_changecount before and after.  We use a volatile pointer here to
	 * ensure the compiler doesn't try to get cute.
1937 1938
	 */
	beentry->st_changecount++;
1939
	beentry->st_xact_start_timestamp = tstamp;
1940 1941 1942
	beentry->st_changecount++;
	Assert((beentry->st_changecount & 1) == 0);
}
1943

1944 1945 1946 1947
/* ----------
 * pgstat_report_waiting() -
 *
 *	Called from lock manager to report beginning or end of a lock wait.
 *
 * NB: this *must* be able to survive being called before MyBEEntry has been
 * initialized; in that case (or when pgstat_track_activities is off) it
 * simply does nothing.
 * ----------
 */
void
pgstat_report_waiting(bool waiting)
{
	volatile PgBackendStatus *entry = MyBEEntry;

	if (pgstat_track_activities && entry)
	{
		/*
		 * st_waiting is a single-byte field in a struct that only this
		 * process may modify, so there seems no need to bother with the
		 * st_changecount protocol.  The update must appear atomic in any
		 * case.
		 */
		entry->st_waiting = waiting;
	}
}


1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980
/* ----------
 * pgstat_read_current_status() -
 *
 *	Copy the current contents of the PgBackendStatus array to local memory,
 *	if not already done in this transaction.
 * ----------
 */
static void
pgstat_read_current_status(void)
{
	volatile PgBackendStatus *beentry;
1981
	PgBackendStatus *localtable;
1982 1983 1984 1985
	PgBackendStatus *localentry;
	int			i;

	Assert(!pgStatRunningInCollector);
1986
	if (localBackendStatusTable)
1987 1988
		return;					/* already done */

1989 1990 1991 1992
	pgstat_setup_memcxt();

	localtable = (PgBackendStatus *)
		MemoryContextAlloc(pgStatLocalContext,
1993 1994 1995 1996
						   sizeof(PgBackendStatus) * MaxBackends);
	localNumBackends = 0;

	beentry = BackendStatusArray;
1997
	localentry = localtable;
1998 1999 2000
	for (i = 1; i <= MaxBackends; i++)
	{
		/*
B
Bruce Momjian 已提交
2001 2002 2003 2004 2005
		 * Follow the protocol of retrying if st_changecount changes while we
		 * copy the entry, or if it's odd.  (The check for odd is needed to
		 * cover the case where we are able to completely copy the entry while
		 * the source backend is between increment steps.)	We use a volatile
		 * pointer here to ensure the compiler doesn't try to get cute.
2006 2007 2008
		 */
		for (;;)
		{
B
Bruce Momjian 已提交
2009
			int			save_changecount = beentry->st_changecount;
2010 2011

			/*
B
Bruce Momjian 已提交
2012 2013
			 * XXX if PGBE_ACTIVITY_SIZE is really large, it might be best to
			 * use strcpy not memcpy for copying the activity string?
2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033
			 */
			memcpy(localentry, (char *) beentry, sizeof(PgBackendStatus));

			if (save_changecount == beentry->st_changecount &&
				(save_changecount & 1) == 0)
				break;

			/* Make sure we can break out of loop if stuck... */
			CHECK_FOR_INTERRUPTS();
		}

		beentry++;
		/* Only valid entries get included into the local array */
		if (localentry->st_procpid > 0)
		{
			localentry++;
			localNumBackends++;
		}
	}

2034 2035
	/* Set the pointer only after completion of a valid table */
	localBackendStatusTable = localtable;
2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066
}


/* ------------------------------------------------------------
 * Local support functions follow
 * ------------------------------------------------------------
 */


/* ----------
 * pgstat_setheader() -
 *
 *		Fill in the header fields shared by all statistics messages.
 *		The only field set here is the message type.
 * ----------
 */
static void
pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
{
	hdr->m_type = mtype;
}


/* ----------
 * pgstat_send() -
 *
 *		Send out one statistics message to the collector
 * ----------
 */
static void
pgstat_send(void *msg, int len)
{
2067 2068
	int			rc;

2069 2070
	if (pgStatSock < 0)
		return;
2071

2072
	((PgStat_MsgHdr *) msg)->m_size = len;
2073

2074 2075 2076 2077 2078 2079
	/* We'll retry after EINTR, but ignore all other failures */
	do
	{
		rc = send(pgStatSock, msg, len, 0);
	} while (rc < 0 && errno == EINTR);

2080
#ifdef USE_ASSERT_CHECKING
2081 2082
	/* In debug builds, log send failures ... */
	if (rc < 0)
2083 2084
		elog(LOG, "could not send to statistics collector: %m");
#endif
2085 2086
}

2087 2088 2089
/* ----------
 * pgstat_send_bgwriter() -
 *
B
Bruce Momjian 已提交
2090
 *		Send bgwriter statistics to the collector
2091 2092 2093 2094 2095
 * ----------
 */
void
pgstat_send_bgwriter(void)
{
2096 2097 2098
	/* We assume this initializes to zeroes */
	static const PgStat_MsgBgWriter all_zeroes;

2099
	/*
B
Bruce Momjian 已提交
2100 2101 2102
	 * This function can be called even if nothing at all has happened. In
	 * this case, avoid sending a completely empty message to the stats
	 * collector.
2103
	 */
2104
	if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
2105 2106 2107 2108 2109 2110 2111 2112 2113
		return;

	/*
	 * Prepare and send the message
	 */
	pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
	pgstat_send(&BgWriterStats, sizeof(BgWriterStats));

	/*
2114
	 * Clear out the statistics buffer, so it can be re-used.
2115
	 */
2116
	MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
2117 2118
}

2119

2120 2121 2122
/* ----------
 * PgstatCollectorMain() -
 *
B
Bruce Momjian 已提交
2123
 *	Start up the statistics collector process.	This is the body of the
2124
 *	postmaster child process.
2125 2126 2127 2128
 *
 *	The argc/argv parameters are valid only in EXEC_BACKEND case.
 * ----------
 */
2129
NON_EXEC_STATIC void
2130
PgstatCollectorMain(int argc, char *argv[])
2131
{
2132 2133 2134
	struct itimerval write_timeout;
	bool		need_timer = false;
	int			len;
2135
	PgStat_Msg	msg;
B
Bruce Momjian 已提交
2136

2137
#ifndef WIN32
2138 2139 2140 2141
#ifdef HAVE_POLL
	struct pollfd input_fd;
#else
	struct timeval sel_timeout;
2142
	fd_set		rfds;
2143
#endif
2144 2145 2146
#endif

	IsUnderPostmaster = true;	/* we are a postmaster subprocess now */
2147

2148 2149
	MyProcPid = getpid();		/* reset MyProcPid */

B
Bruce Momjian 已提交
2150
	MyStartTime = time(NULL);	/* record Start Time for logging */
2151

2152 2153
	/*
	 * If possible, make this process a group leader, so that the postmaster
B
Bruce Momjian 已提交
2154 2155 2156
	 * can signal any child processes too.	(pgstat probably never has any
	 * child processes, but for consistency we make all postmaster child
	 * processes do this.)
2157 2158 2159 2160 2161 2162
	 */
#ifdef HAVE_SETSID
	if (setsid() < 0)
		elog(FATAL, "setsid() failed: %m");
#endif

2163
	/*
2164 2165
	 * Ignore all signals usually bound to some action in the postmaster,
	 * except SIGQUIT and SIGALRM.
2166 2167 2168 2169
	 */
	pqsignal(SIGHUP, SIG_IGN);
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
2170
	pqsignal(SIGQUIT, pgstat_exit);
2171
	pqsignal(SIGALRM, force_statwrite);
2172 2173 2174 2175 2176 2177 2178 2179 2180 2181
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, SIG_IGN);
	pqsignal(SIGUSR2, SIG_IGN);
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
	PG_SETMASK(&UnBlockSig);

2182 2183 2184
	/*
	 * Identify myself via ps
	 */
2185
	init_ps_display("stats collector process", "", "", "");
2186

2187 2188 2189
	/*
	 * Arrange to write the initial status file right away
	 */
2190 2191
	need_statwrite = true;

2192
	/* Preset the delay between status file writes */
2193 2194
	MemSet(&write_timeout, 0, sizeof(struct itimerval));
	write_timeout.it_value.tv_sec = PGSTAT_STAT_INTERVAL / 1000;
2195
	write_timeout.it_value.tv_usec = (PGSTAT_STAT_INTERVAL % 1000) * 1000;
2196

2197
	/*
B
Bruce Momjian 已提交
2198 2199
	 * Read in an existing statistics stats file or initialize the stats to
	 * zero.
2200
	 */
2201
	pgStatRunningInCollector = true;
2202
	pgStatDBHash = pgstat_read_statsfile(InvalidOid);
2203

2204
	/*
B
Bruce Momjian 已提交
2205 2206
	 * Setup the descriptor set for select(2).	Since only one bit in the set
	 * ever changes, we need not repeat FD_ZERO each time.
2207
	 */
2208
#if !defined(HAVE_POLL) && !defined(WIN32)
2209 2210
	FD_ZERO(&rfds);
#endif
2211

2212
	/*
2213 2214 2215
	 * Loop to process messages until we get SIGQUIT or detect ungraceful
	 * death of our parent postmaster.
	 *
B
Bruce Momjian 已提交
2216 2217
	 * For performance reasons, we don't want to do a PostmasterIsAlive() test
	 * after every message; instead, do it at statwrite time and if
2218
	 * select()/poll() is interrupted by timeout.
2219 2220 2221
	 */
	for (;;)
	{
B
Bruce Momjian 已提交
2222
		int			got_data;
2223 2224 2225 2226 2227 2228 2229

		/*
		 * Quit if we get SIGQUIT from the postmaster.
		 */
		if (need_exit)
			break;

2230
		/*
B
Bruce Momjian 已提交
2231
		 * If time to write the stats file, do so.	Note that the alarm
2232 2233 2234 2235
		 * interrupt isn't re-enabled immediately, but only after we next
		 * receive a stats message; so no cycles are wasted when there is
		 * nothing going on.
		 */
2236 2237
		if (need_statwrite)
		{
2238 2239 2240 2241
			/* Check for postmaster death; if so we'll write file below */
			if (!PostmasterIsAlive(true))
				break;

2242 2243 2244
			pgstat_write_statsfile();
			need_statwrite = false;
			need_timer = true;
2245 2246 2247
		}

		/*
2248 2249 2250
		 * Wait for a message to arrive; but not for more than
		 * PGSTAT_SELECT_TIMEOUT seconds. (This determines how quickly we will
		 * shut down after an ungraceful postmaster termination; so it needn't
B
Bruce Momjian 已提交
2251 2252 2253
		 * be very fast.  However, on some systems SIGQUIT won't interrupt the
		 * poll/select call, so this also limits speed of response to SIGQUIT,
		 * which is more important.)
2254
		 *
B
Bruce Momjian 已提交
2255 2256
		 * We use poll(2) if available, otherwise select(2). Win32 has its own
		 * implementation.
2257
		 */
2258
#ifndef WIN32
2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276
#ifdef HAVE_POLL
		input_fd.fd = pgStatSock;
		input_fd.events = POLLIN | POLLERR;
		input_fd.revents = 0;

		if (poll(&input_fd, 1, PGSTAT_SELECT_TIMEOUT * 1000) < 0)
		{
			if (errno == EINTR)
				continue;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("poll() failed in statistics collector: %m")));
		}

		got_data = (input_fd.revents != 0);
#else							/* !HAVE_POLL */

		FD_SET(pgStatSock, &rfds);
2277 2278

		/*
2279 2280
		 * timeout struct is modified by select() on some operating systems,
		 * so re-fill it each time.
2281
		 */
2282 2283 2284 2285
		sel_timeout.tv_sec = PGSTAT_SELECT_TIMEOUT;
		sel_timeout.tv_usec = 0;

		if (select(pgStatSock + 1, &rfds, NULL, NULL, &sel_timeout) < 0)
2286
		{
2287 2288
			if (errno == EINTR)
				continue;
2289
			ereport(ERROR,
2290
					(errcode_for_socket_access(),
B
Bruce Momjian 已提交
2291
					 errmsg("select() failed in statistics collector: %m")));
2292 2293
		}

2294 2295
		got_data = FD_ISSET(pgStatSock, &rfds);
#endif   /* HAVE_POLL */
B
Bruce Momjian 已提交
2296
#else							/* WIN32 */
2297
		got_data = pgwin32_waitforsinglesocket(pgStatSock, FD_READ,
B
Bruce Momjian 已提交
2298
											   PGSTAT_SELECT_TIMEOUT * 1000);
2299
#endif
2300

2301
		/*
2302 2303
		 * If there is a message on the socket, read it and check for
		 * validity.
2304
		 */
2305
		if (got_data)
2306
		{
2307 2308 2309
			len = recv(pgStatSock, (char *) &msg,
					   sizeof(PgStat_Msg), 0);
			if (len < 0)
2310 2311 2312
			{
				if (errno == EINTR)
					continue;
2313 2314 2315
				ereport(ERROR,
						(errcode_for_socket_access(),
						 errmsg("could not read statistics message: %m")));
2316
			}
2317

2318
			/*
2319
			 * We ignore messages that are smaller than our common header
2320
			 */
2321 2322
			if (len < sizeof(PgStat_MsgHdr))
				continue;
2323

2324
			/*
2325
			 * The received length must match the length in the header
2326
			 */
2327 2328
			if (msg.msg_hdr.m_size != len)
				continue;
2329 2330

			/*
2331
			 * O.K. - we accept this message.  Process it.
2332 2333 2334 2335 2336 2337 2338
			 */
			switch (msg.msg_hdr.m_type)
			{
				case PGSTAT_MTYPE_DUMMY:
					break;

				case PGSTAT_MTYPE_TABSTAT:
2339
					pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, len);
2340 2341 2342
					break;

				case PGSTAT_MTYPE_TABPURGE:
2343
					pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, len);
2344 2345 2346
					break;

				case PGSTAT_MTYPE_DROPDB:
2347
					pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, len);
2348 2349 2350
					break;

				case PGSTAT_MTYPE_RESETCOUNTER:
2351
					pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
2352
											 len);
2353 2354
					break;

2355
				case PGSTAT_MTYPE_AUTOVAC_START:
2356
					pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, len);
2357 2358 2359
					break;

				case PGSTAT_MTYPE_VACUUM:
2360
					pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, len);
2361 2362 2363
					break;

				case PGSTAT_MTYPE_ANALYZE:
2364
					pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, len);
2365 2366
					break;

2367
				case PGSTAT_MTYPE_BGWRITER:
2368
					pgstat_recv_bgwriter((PgStat_MsgBgWriter *) &msg, len);
2369 2370
					break;

2371 2372 2373 2374
				default:
					break;
			}

2375 2376 2377 2378 2379
			/*
			 * If this is the first message after we wrote the stats file the
			 * last time, enable the alarm interrupt to make it be written
			 * again later.
			 */
2380
			if (need_timer)
2381
			{
2382
				if (setitimer(ITIMER_REAL, &write_timeout, NULL))
2383
					ereport(ERROR,
B
Bruce Momjian 已提交
2384
					(errmsg("could not set statistics collector timer: %m")));
2385
				need_timer = false;
2386
			}
2387 2388 2389 2390
		}
		else
		{
			/*
B
Bruce Momjian 已提交
2391 2392
			 * We can only get here if the select/poll timeout elapsed. Check
			 * for postmaster death.
2393
			 */
2394 2395
			if (!PostmasterIsAlive(true))
				break;
2396
		}
B
Bruce Momjian 已提交
2397
	}							/* end of message-processing loop */
2398

2399 2400 2401 2402
	/*
	 * Save the final stats to reuse at next startup.
	 */
	pgstat_write_statsfile();
2403

2404
	exit(0);
2405 2406
}

2407 2408

/* SIGQUIT signal handler for collector process */
2409 2410 2411
static void
pgstat_exit(SIGNAL_ARGS)
{
2412
	need_exit = true;
2413 2414
}

2415
/* SIGALRM signal handler for collector process */
2416
static void
2417
force_statwrite(SIGNAL_ARGS)
2418
{
2419
	need_statwrite = true;
2420 2421
}

2422

2423 2424
/*
 * Lookup the hash table entry for the specified database. If no hash
2425 2426
 * table entry exists, initialize it, if the create parameter is true.
 * Else, return NULL.
2427 2428
 */
static PgStat_StatDBEntry *
2429
pgstat_get_db_entry(Oid databaseid, bool create)
2430 2431
{
	PgStat_StatDBEntry *result;
B
Bruce Momjian 已提交
2432 2433
	bool		found;
	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
2434 2435 2436 2437

	/* Lookup or create the hash table entry for this database */
	result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												&databaseid,
2438 2439 2440 2441
												action, &found);

	if (!create && !found)
		return NULL;
2442

2443
	/* If not found, initialize the new one. */
2444 2445
	if (!found)
	{
2446
		HASHCTL		hash_ctl;
2447

2448 2449 2450 2451 2452
		result->tables = NULL;
		result->n_xact_commit = 0;
		result->n_xact_rollback = 0;
		result->n_blocks_fetched = 0;
		result->n_blocks_hit = 0;
2453 2454 2455 2456 2457
		result->n_tuples_returned = 0;
		result->n_tuples_fetched = 0;
		result->n_tuples_inserted = 0;
		result->n_tuples_updated = 0;
		result->n_tuples_deleted = 0;
2458
		result->last_autovac_time = 0;
2459 2460

		memset(&hash_ctl, 0, sizeof(hash_ctl));
2461
		hash_ctl.keysize = sizeof(Oid);
2462
		hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2463
		hash_ctl.hash = oid_hash;
2464
		result->tables = hash_create("Per-database table",
B
Bruce Momjian 已提交
2465 2466 2467
									 PGSTAT_TAB_HASH_SIZE,
									 &hash_ctl,
									 HASH_ELEM | HASH_FUNCTION);
2468 2469
	}

2470
	return result;
2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482
}


/* ----------
 * pgstat_write_statsfile() -
 *
 *	Tell the news.
 * ----------
 */
static void
pgstat_write_statsfile(void)
{
2483 2484 2485 2486 2487
	HASH_SEQ_STATUS hstat;
	HASH_SEQ_STATUS tstat;
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;
	FILE	   *fpout;
2488
	int32		format_id;
2489 2490

	/*
2491
	 * Open the statistics temp file to write out the current values.
2492
	 */
2493
	fpout = fopen(PGSTAT_STAT_TMPFILE, PG_BINARY_W);
2494 2495
	if (fpout == NULL)
	{
2496 2497
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
2498 2499
				 errmsg("could not open temporary statistics file \"%s\": %m",
						PGSTAT_STAT_TMPFILE)));
2500 2501 2502
		return;
	}

2503 2504 2505 2506 2507 2508
	/*
	 * Write the file header --- currently just a format ID.
	 */
	format_id = PGSTAT_FILE_FORMAT_ID;
	fwrite(&format_id, sizeof(format_id), 1, fpout);

2509 2510 2511 2512 2513
	/*
	 * Write global stats struct
	 */
	fwrite(&globalStats, sizeof(globalStats), 1, fpout);

2514 2515 2516 2517
	/*
	 * Walk through the database table.
	 */
	hash_seq_init(&hstat, pgStatDBHash);
2518
	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2519 2520
	{
		/*
B
Bruce Momjian 已提交
2521 2522 2523
		 * Write out the DB entry including the number of live backends. We
		 * don't write the tables pointer since it's of no use to any other
		 * process.
2524 2525
		 */
		fputc('D', fpout);
2526
		fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
2527 2528

		/*
2529
		 * Walk through the database's access stats per table.
2530 2531
		 */
		hash_seq_init(&tstat, dbentry->tables);
2532
		while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2533 2534 2535 2536
		{
			fputc('T', fpout);
			fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
		}
2537

2538 2539 2540 2541 2542 2543 2544
		/*
		 * Mark the end of this DB
		 */
		fputc('d', fpout);
	}

	/*
2545
	 * No more output to be done. Close the temp file and replace the old
2546 2547
	 * pgstat.stat with it.  The ferror() check replaces testing for error
	 * after each individual fputc or fwrite above.
2548 2549
	 */
	fputc('E', fpout);
2550 2551 2552 2553 2554

	if (ferror(fpout))
	{
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
2555 2556
			   errmsg("could not write temporary statistics file \"%s\": %m",
					  PGSTAT_STAT_TMPFILE)));
2557 2558 2559 2560
		fclose(fpout);
		unlink(PGSTAT_STAT_TMPFILE);
	}
	else if (fclose(fpout) < 0)
2561
	{
2562 2563
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
2564 2565
			   errmsg("could not close temporary statistics file \"%s\": %m",
					  PGSTAT_STAT_TMPFILE)));
2566
		unlink(PGSTAT_STAT_TMPFILE);
2567
	}
2568
	else if (rename(PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME) < 0)
2569
	{
2570 2571 2572 2573 2574
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
						PGSTAT_STAT_TMPFILE, PGSTAT_STAT_FILENAME)));
		unlink(PGSTAT_STAT_TMPFILE);
2575 2576 2577 2578 2579 2580 2581
	}
}


/* ----------
 * pgstat_read_statsfile() -
 *
2582 2583
 *	Reads in an existing statistics collector file and initializes the
 *	databases' hash table (whose entries point to the tables' hash tables).
2584 2585
 * ----------
 */
2586 2587
static HTAB *
pgstat_read_statsfile(Oid onlydb)
2588
{
2589 2590 2591 2592 2593
	PgStat_StatDBEntry *dbentry;
	PgStat_StatDBEntry dbbuf;
	PgStat_StatTabEntry *tabentry;
	PgStat_StatTabEntry tabbuf;
	HASHCTL		hash_ctl;
2594
	HTAB	   *dbhash;
2595 2596
	HTAB	   *tabhash = NULL;
	FILE	   *fpin;
2597
	int32		format_id;
2598 2599 2600
	bool		found;

	/*
2601
	 * The tables will live in pgStatLocalContext.
2602
	 */
2603
	pgstat_setup_memcxt();
2604 2605 2606 2607 2608

	/*
	 * Create the DB hashtable
	 */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
2609
	hash_ctl.keysize = sizeof(Oid);
2610
	hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2611
	hash_ctl.hash = oid_hash;
2612 2613 2614
	hash_ctl.hcxt = pgStatLocalContext;
	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
						 HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
2615

2616 2617 2618 2619 2620 2621
	/*
	 * Clear out global statistics so they start from zero in case we can't
	 * load an existing statsfile.
	 */
	memset(&globalStats, 0, sizeof(globalStats));

2622
	/*
B
Bruce Momjian 已提交
2623 2624 2625
	 * Try to open the status file. If it doesn't exist, the backends simply
	 * return zero for anything and the collector simply starts from scratch
	 * with empty counters.
2626
	 */
2627
	if ((fpin = AllocateFile(PGSTAT_STAT_FILENAME, PG_BINARY_R)) == NULL)
2628
		return dbhash;
2629

2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640
	/*
	 * Verify it's of the expected format.
	 */
	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
		|| format_id != PGSTAT_FILE_FORMAT_ID)
	{
		ereport(pgStatRunningInCollector ? LOG : WARNING,
				(errmsg("corrupted pgstat.stat file")));
		goto done;
	}

2641 2642 2643 2644 2645 2646 2647 2648 2649 2650
	/*
	 * Read global stats struct
	 */
	if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
	{
		ereport(pgStatRunningInCollector ? LOG : WARNING,
				(errmsg("corrupted pgstat.stat file")));
		goto done;
	}

2651
	/*
2652 2653
	 * We found an existing collector stats file. Read it and put all the
	 * hashtable entries into place.
2654 2655 2656 2657 2658
	 */
	for (;;)
	{
		switch (fgetc(fpin))
		{
2659 2660
				/*
				 * 'D'	A PgStat_StatDBEntry struct describing a database
B
Bruce Momjian 已提交
2661 2662
				 * follows. Subsequently, zero to many 'T' entries will follow
				 * until a 'd' is encountered.
2663
				 */
2664
			case 'D':
2665 2666
				if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
						  fpin) != offsetof(PgStat_StatDBEntry, tables))
2667
				{
2668 2669
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2670
					goto done;
2671 2672 2673 2674 2675
				}

				/*
				 * Add to the DB hash
				 */
2676
				dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
B
Bruce Momjian 已提交
2677
												  (void *) &dbbuf.databaseid,
2678 2679
															 HASH_ENTER,
															 &found);
2680 2681
				if (found)
				{
2682 2683
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2684
					goto done;
2685 2686 2687
				}

				memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2688
				dbentry->tables = NULL;
2689 2690

				/*
2691 2692
				 * Don't collect tables if not the requested DB (or the
				 * shared-table info)
2693
				 */
2694 2695 2696 2697
				if (onlydb != InvalidOid)
				{
					if (dbbuf.databaseid != onlydb &&
						dbbuf.databaseid != InvalidOid)
B
Bruce Momjian 已提交
2698
						break;
2699
				}
2700 2701

				memset(&hash_ctl, 0, sizeof(hash_ctl));
2702
				hash_ctl.keysize = sizeof(Oid);
2703
				hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2704
				hash_ctl.hash = oid_hash;
2705
				hash_ctl.hcxt = pgStatLocalContext;
2706 2707 2708
				dbentry->tables = hash_create("Per-database table",
											  PGSTAT_TAB_HASH_SIZE,
											  &hash_ctl,
B
Bruce Momjian 已提交
2709
								   HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
2710 2711

				/*
2712
				 * Arrange that following 'T's add entries to this database's
B
Bruce Momjian 已提交
2713
				 * tables hash table.
2714 2715 2716 2717
				 */
				tabhash = dbentry->tables;
				break;

2718 2719 2720
				/*
				 * 'd'	End of this database.
				 */
2721 2722 2723 2724
			case 'd':
				tabhash = NULL;
				break;

2725 2726 2727
				/*
				 * 'T'	A PgStat_StatTabEntry follows.
				 */
2728
			case 'T':
2729 2730
				if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
						  fpin) != sizeof(PgStat_StatTabEntry))
2731
				{
2732 2733
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2734
					goto done;
2735 2736 2737 2738 2739 2740 2741 2742
				}

				/*
				 * Skip if table belongs to a not requested database.
				 */
				if (tabhash == NULL)
					break;

2743
				tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
B
Bruce Momjian 已提交
2744 2745
													(void *) &tabbuf.tableid,
														 HASH_ENTER, &found);
2746 2747 2748

				if (found)
				{
2749 2750
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2751
					goto done;
2752 2753 2754 2755 2756
				}

				memcpy(tabentry, &tabbuf, sizeof(tabbuf));
				break;

2757
				/*
2758
				 * 'E'	The EOF marker of a complete stats file.
2759
				 */
2760 2761
			case 'E':
				goto done;
2762

2763 2764 2765 2766 2767 2768
			default:
				ereport(pgStatRunningInCollector ? LOG : WARNING,
						(errmsg("corrupted pgstat.stat file")));
				goto done;
		}
	}
2769

2770 2771
done:
	FreeFile(fpin);
2772 2773

	return dbhash;
2774
}
2775

2776
/*
2777 2778 2779
 * If not already done, read the statistics collector stats file into
 * some hash tables.  The results will be kept until pgstat_clear_snapshot()
 * is called (typically, at end of transaction).
2780 2781 2782 2783
 */
static void
backend_read_statsfile(void)
{
2784 2785 2786 2787 2788
	/* already read it? */
	if (pgStatDBHash)
		return;
	Assert(!pgStatRunningInCollector);

2789 2790
	/* Autovacuum launcher wants stats about all databases */
	if (IsAutoVacuumLauncherProcess())
2791
		pgStatDBHash = pgstat_read_statsfile(InvalidOid);
2792
	else
2793 2794
		pgStatDBHash = pgstat_read_statsfile(MyDatabaseId);
}
2795

2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811

/* ----------
 * pgstat_setup_memcxt() -
 *
 *	Make sure pgStatLocalContext exists, creating it as a child of
 *	TopMemoryContext on first use.
 * ----------
 */
static void
pgstat_setup_memcxt(void)
{
	/* Created before?  Then keep using that one. */
	if (pgStatLocalContext)
		return;

	pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
											   "Statistics snapshot",
											   ALLOCSET_SMALL_MINSIZE,
											   ALLOCSET_SMALL_INITSIZE,
											   ALLOCSET_SMALL_MAXSIZE);
}

/* ----------
 * pgstat_clear_snapshot() -
 *
 *	Throw away any data collected in the current transaction, so that any
 *	subsequent request causes new snapshots to be read.
 *
 *	Transaction commit and abort call this as well, to get rid of the
 *	snapshot that is no longer wanted.
 * ----------
 */
void
pgstat_clear_snapshot(void)
{
	/* Give back the snapshot memory, provided any was allocated. */
	if (pgStatLocalContext != NULL)
	{
		MemoryContextDelete(pgStatLocalContext);
		pgStatLocalContext = NULL;
	}

	/* Forget everything that lived in that context. */
	localNumBackends = 0;
	localBackendStatusTable = NULL;
	pgStatDBHash = NULL;
}


/* ----------
 * pgstat_recv_tabstat() -
 *
 *	Add the transaction counts and the per-table counts carried by a
 *	tabstat message to the entry of the message's database, creating the
 *	database entry and table entries as needed.
 * ----------
 */
static void
pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	int			n;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	/* Database-wide transaction counters. */
	dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
	dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);

	/* Now all the table entries in the message, one at a time. */
	for (n = 0; n < msg->m_nentries; n++)
	{
		PgStat_TableEntry *in = &(msg->m_entry[n]);
		PgStat_StatTabEntry *tab;
		bool		existed;

		tab = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
												  (void *) &(in->t_id),
												  HASH_ENTER, &existed);

		if (!existed)
		{
			/*
			 * A table we have not seen before.  Start all of its fields at
			 * zero, so that the additions below leave the counters at
			 * exactly the values we just got.
			 */
			tab->numscans = 0;
			tab->tuples_returned = 0;
			tab->tuples_fetched = 0;
			tab->tuples_inserted = 0;
			tab->tuples_updated = 0;
			tab->tuples_deleted = 0;
			tab->tuples_hot_updated = 0;
			tab->n_live_tuples = 0;
			tab->n_dead_tuples = 0;
			tab->blocks_fetched = 0;
			tab->blocks_hit = 0;

			tab->last_anl_tuples = 0;
			tab->vacuum_timestamp = 0;
			tab->autovac_vacuum_timestamp = 0;
			tab->analyze_timestamp = 0;
			tab->autovac_analyze_timestamp = 0;
		}

		/* Add the reported values to the table's entry. */
		tab->numscans += in->t_counts.t_numscans;
		tab->tuples_returned += in->t_counts.t_tuples_returned;
		tab->tuples_fetched += in->t_counts.t_tuples_fetched;
		tab->tuples_inserted += in->t_counts.t_tuples_inserted;
		tab->tuples_updated += in->t_counts.t_tuples_updated;
		tab->tuples_deleted += in->t_counts.t_tuples_deleted;
		tab->tuples_hot_updated += in->t_counts.t_tuples_hot_updated;
		tab->n_live_tuples += in->t_counts.t_new_live_tuples;
		tab->n_dead_tuples += in->t_counts.t_new_dead_tuples;
		tab->blocks_fetched += in->t_counts.t_blocks_fetched;
		tab->blocks_hit += in->t_counts.t_blocks_hit;

		/*
		 * The live/dead deltas can be negative; never let the totals drop
		 * below zero.
		 */
		tab->n_live_tuples = Max(tab->n_live_tuples, 0);
		tab->n_dead_tuples = Max(tab->n_dead_tuples, 0);

		/* The per-database totals get the per-table values, too. */
		dbentry->n_tuples_returned += in->t_counts.t_tuples_returned;
		dbentry->n_tuples_fetched += in->t_counts.t_tuples_fetched;
		dbentry->n_tuples_inserted += in->t_counts.t_tuples_inserted;
		dbentry->n_tuples_updated += in->t_counts.t_tuples_updated;
		dbentry->n_tuples_deleted += in->t_counts.t_tuples_deleted;
		dbentry->n_blocks_fetched += in->t_counts.t_blocks_fetched;
		dbentry->n_blocks_hit += in->t_counts.t_blocks_hit;
	}
}


/* ----------
 * pgstat_recv_tabpurge() -
 *
 *	Remove the entries of the tables listed in a tabpurge message from
 *	the table hash of the message's database.
 * ----------
 */
static void
pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	int			idx;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);

	/* A database we don't know, or one without tables: nothing to purge. */
	if (dbentry == NULL || dbentry->tables == NULL)
		return;

	/*
	 * Walk the table OIDs in the message.  Each is removed from the hash if
	 * it is there; it is of no concern to us if it is not.
	 */
	idx = 0;
	while (idx < msg->m_nentries)
	{
		(void) hash_search(dbentry->tables,
						   (void *) &(msg->m_tableid[idx]),
						   HASH_REMOVE, NULL);
		idx++;
	}
}


/* ----------
 * pgstat_recv_dropdb() -
 *
 *	Remove the entry of a dead database, along with its table hash, from
 *	the database hash.
 * ----------
 */
static void
pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
{
	PgStat_StatDBEntry *victim;

	/* Find the database; if we don't know it there is nothing to remove. */
	victim = pgstat_get_db_entry(msg->m_databaseid, false);
	if (victim == NULL)
		return;

	/* Its table hash goes first ... */
	if (victim->tables != NULL)
		hash_destroy(victim->tables);

	/* ... then the entry itself, which was found just above. */
	if (hash_search(pgStatDBHash,
					(void *) &(victim->databaseid),
					HASH_REMOVE, NULL) == NULL)
		ereport(ERROR,
				(errmsg("database hash table corrupted "
						"during cleanup --- abort")));
}


/* ----------
 * pgstat_recv_resetcounter() -
 *
 *	Reset the statistics for the specified database: all of its per-table
 *	entries are discarded and its database-wide counters go back to zero.
 *	Nothing happens if the database has no entry.
 * ----------
 */
static void
pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
{
	HASHCTL		hash_ctl;
	PgStat_StatDBEntry *dbentry;

	/*
	 * Lookup the database in the hashtable.  Nothing to do if not there.
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);

	if (!dbentry)
		return;

	/*
	 * We simply throw away all the database's table entries by recreating a
	 * new hash table for them.  The pointer is cleared in between, so that
	 * the entry does not keep pointing at the destroyed table until the new
	 * one exists.
	 */
	if (dbentry->tables != NULL)
		hash_destroy(dbentry->tables);
	dbentry->tables = NULL;

	/*
	 * Zero every database-wide counter.  That includes the n_tuples_*
	 * totals, which pgstat_recv_tabstat() accumulates right along with the
	 * block counters; leaving them out would make the database-level tuple
	 * totals survive a reset that cleared everything else.
	 *
	 * NOTE(review): last_autovac_time is not touched here, on the assumption
	 * that it is autovacuum bookkeeping rather than a statistics counter ---
	 * confirm.
	 */
	dbentry->n_xact_commit = 0;
	dbentry->n_xact_rollback = 0;
	dbentry->n_blocks_fetched = 0;
	dbentry->n_blocks_hit = 0;
	dbentry->n_tuples_returned = 0;
	dbentry->n_tuples_fetched = 0;
	dbentry->n_tuples_inserted = 0;
	dbentry->n_tuples_updated = 0;
	dbentry->n_tuples_deleted = 0;

	/* A fresh, empty per-table hash, set up as for a new database entry. */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(Oid);
	hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
	hash_ctl.hash = oid_hash;
	dbentry->tables = hash_create("Per-database table",
								  PGSTAT_TAB_HASH_SIZE,
								  &hash_ctl,
								  HASH_ELEM | HASH_FUNCTION);
}

/* ----------
 * pgstat_recv_autovac() -
 *
 *	Record the start time reported by an autovacuum signalling message.
 * ----------
 */
static void
pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
{
	PgStat_StatDBEntry *dbentry = pgstat_get_db_entry(msg->m_databaseid, false);

	/*
	 * The lookup above deliberately does not create a missing entry,
	 * because autovacuum may be processing a template database.  For any
	 * other database an entry most likely exists already; if it does not,
	 * little is lost, since one gets created as soon as somebody actually
	 * uses the database.
	 *
	 * When we do have an entry, remember the last autovacuum time in it.
	 */
	if (dbentry != NULL)
		dbentry->last_autovac_time = msg->m_start_time;
}

/* ----------
 * pgstat_recv_vacuum() -
 *
 *	Process a VACUUM message.
 *
 *	Updates the existing table entry with the vacuum timestamp and tuple
 *	count from the message, and with the analyze data as well when the
 *	message says an analyze was done.  The message is ignored if the
 *	database or the table has no entry yet.
 * ----------
 */
static void
pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;

	/*
	 * Don't create either the database or table entry if it doesn't already
	 * exist.  This avoids bloating the stats with entries for stuff that is
	 * only touched by vacuum and not by live operations.
	 *
	 * Also bail out if the database entry has no table hash, as the other
	 * message handlers do, rather than handing a NULL table to
	 * hash_search().
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
	if (dbentry == NULL || dbentry->tables == NULL)
		return;

	tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
						   HASH_FIND, NULL);
	if (tabentry == NULL)
		return;

	/* Manual and autovacuum runs are tracked in separate timestamps. */
	if (msg->m_autovacuum)
		tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
	else
		tabentry->vacuum_timestamp = msg->m_vacuumtime;

	tabentry->n_live_tuples = msg->m_tuples;
	/* Resetting dead_tuples to 0 is an approximation ... */
	tabentry->n_dead_tuples = 0;

	if (msg->m_analyze)
	{
		/* The vacuum also analyzed: record that with the same timestamp. */
		tabentry->last_anl_tuples = msg->m_tuples;
		if (msg->m_autovacuum)
			tabentry->autovac_analyze_timestamp = msg->m_vacuumtime;
		else
			tabentry->analyze_timestamp = msg->m_vacuumtime;
	}
	else
	{
		/* last_anl_tuples must never exceed n_live_tuples+n_dead_tuples */
		tabentry->last_anl_tuples = Min(tabentry->last_anl_tuples,
										msg->m_tuples);
	}
}

/* ----------
 * pgstat_recv_analyze() -
 *
 *	Process an ANALYZE message.
 *
 *	Updates the existing table entry with the analyze timestamp and the
 *	live/dead tuple counts from the message.  The message is ignored if
 *	the database or the table has no entry yet.
 * ----------
 */
static void
pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;

	/*
	 * Don't create either the database or table entry if it doesn't already
	 * exist.  This avoids bloating the stats with entries for stuff that is
	 * only touched by analyze and not by live operations.
	 *
	 * Also bail out if the database entry has no table hash, as the other
	 * message handlers do, rather than handing a NULL table to
	 * hash_search().
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
	if (dbentry == NULL || dbentry->tables == NULL)
		return;

	tabentry = hash_search(dbentry->tables, &(msg->m_tableoid),
						   HASH_FIND, NULL);
	if (tabentry == NULL)
		return;

	/* Manual and autovacuum-driven analyzes use separate timestamps. */
	if (msg->m_autovacuum)
		tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
	else
		tabentry->analyze_timestamp = msg->m_analyzetime;

	tabentry->n_live_tuples = msg->m_live_tuples;
	tabentry->n_dead_tuples = msg->m_dead_tuples;
	/* Remember the total tuple count as of this analyze. */
	tabentry->last_anl_tuples = msg->m_live_tuples + msg->m_dead_tuples;
}


/* ----------
 * pgstat_recv_bgwriter() -
 *
 *	Process a BGWRITER message.
 * ----------
 */
static void
3163
pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
3164 3165 3166 3167
{
	globalStats.timed_checkpoints += msg->m_timed_checkpoints;
	globalStats.requested_checkpoints += msg->m_requested_checkpoints;
	globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints;
3168 3169
	globalStats.buf_written_clean += msg->m_buf_written_clean;
	globalStats.maxwritten_clean += msg->m_maxwritten_clean;
3170 3171
	globalStats.buf_written_backend += msg->m_buf_written_backend;
	globalStats.buf_alloc += msg->m_buf_alloc;
3172
}