pgstat.c 67.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/* ----------
 * pgstat.c
 *
 *	All the statistics collector stuff hacked up in one big, ugly file.
 *
 *	TODO:	- Separate collector, postmaster and backend stuff
 *			  into different files.
 *
 *			- Add some automatic call for pgstat vacuuming.
 *
 *			- Add a pgstat config column to pg_database, so this
 *			  entire thing can be enabled/disabled on a per db basis.
 *
 *	Copyright (c) 2001-2005, PostgreSQL Global Development Group
 *
 *	$PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.98 2005/06/29 22:51:55 tgl Exp $
 * ----------
 */
P
Peter Eisentraut 已提交
19 20
#include "postgres.h"

21 22 23 24 25
#include <unistd.h>
#include <fcntl.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/socket.h>
B
Bruce Momjian 已提交
26
#include <netdb.h>
27 28 29
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
30
#include <time.h>
31

32 33
#include "pgstat.h"

34
#include "access/heapam.h"
35
#include "access/xact.h"
36
#include "catalog/pg_database.h"
B
Bruce Momjian 已提交
37
#include "libpq/libpq.h"
38
#include "libpq/pqsignal.h"
39
#include "mb/pg_wchar.h"
40
#include "miscadmin.h"
41
#include "postmaster/fork_process.h"
42
#include "postmaster/postmaster.h"
43
#include "storage/backendid.h"
44
#include "storage/fd.h"
45
#include "storage/ipc.h"
46
#include "storage/pg_shmem.h"
47
#include "storage/pmsignal.h"
48
#include "tcop/tcopprot.h"
49
#include "utils/hsearch.h"
50
#include "utils/memutils.h"
51
#include "utils/ps_status.h"
52
#include "utils/rel.h"
53 54 55
#include "utils/syscache.h"


56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
/* ----------
 * Paths for the statistics files. The %s is replaced with the
 * installation's $PGDATA.
 * ----------
 */
#define PGSTAT_STAT_FILENAME	"%s/global/pgstat.stat"
#define PGSTAT_STAT_TMPFILE		"%s/global/pgstat.tmp.%d"

/* ----------
 * Timer definitions.
 * ----------
 */
#define PGSTAT_STAT_INTERVAL	500		/* How often to write the status
										 * file; in milliseconds. */

#define PGSTAT_DESTROY_DELAY	10000	/* How long to keep destroyed
										 * objects known, to give delayed
										 * UDP packets time to arrive; in
										 * milliseconds. */

/* PGSTAT_DESTROY_DELAY expressed as a number of status-file write cycles */
#define PGSTAT_DESTROY_COUNT	(PGSTAT_DESTROY_DELAY / PGSTAT_STAT_INTERVAL)

#define PGSTAT_RESTART_INTERVAL 60		/* How often to attempt to restart
										 * a failed statistics collector;
										 * in seconds. */

/* ----------
 * Amount of space reserved in pgstat_recvbuffer().
 * ----------
 */
#define PGSTAT_RECVBUFFERSZ		((int) (1024 * sizeof(PgStat_Msg)))

/* ----------
 * The initial size hints for the hash tables used in the collector.
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_BE_HASH_SIZE		512
#define PGSTAT_TAB_HASH_SIZE	512


97
/* ----------
 * GUC parameters
 *
 * pgstat_init() forces pgstat_collect_startcollector on when any of the
 * querystring/tuplelevel/blocklevel switches is on, and turns all four
 * of those off again if no working socket can be set up.
 * ----------
 */
bool		pgstat_collect_startcollector = true;
bool		pgstat_collect_resetonpmstart = true;
bool		pgstat_collect_querystring = false;
bool		pgstat_collect_tuplelevel = false;
bool		pgstat_collect_blocklevel = false;
106 107 108 109 110

/* ----------
 * Local data
 * ----------
 */
B
Bruce Momjian 已提交
111
NON_EXEC_STATIC int pgStatSock = -1;
112
NON_EXEC_STATIC int pgStatPipe[2] = {-1,-1};
B
Bruce Momjian 已提交
113
static struct sockaddr_storage pgStatAddr;
114
static pid_t pgStatCollectorPid = 0;
115

116
static time_t last_pgstat_start_time;
117

118
static long pgStatNumMessages = 0;
119

120
static bool pgStatRunningInCollector = FALSE;
121

122 123 124
static int	pgStatTabstatAlloc = 0;
static int	pgStatTabstatUsed = 0;
static PgStat_MsgTabstat **pgStatTabstatMessages = NULL;
B
Bruce Momjian 已提交
125

126 127
#define TABSTAT_QUANTUM		4	/* we alloc this many at a time */

128 129
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;
130

131 132 133 134 135
static TransactionId pgStatDBHashXact = InvalidTransactionId;
static HTAB *pgStatDBHash = NULL;
static HTAB *pgStatBeDead = NULL;
static PgStat_StatBeEntry *pgStatBeTable = NULL;
static int	pgStatNumBackends = 0;
136

137
static char pgStat_fname[MAXPGPATH];
138
static char pgStat_tmpfname[MAXPGPATH];
139 140 141 142 143 144


/* ----------
 * Local function forward declarations
 * ----------
 */
145
#ifdef EXEC_BACKEND
146 147 148 149 150

typedef enum STATS_PROCESS_TYPE
{
	STAT_PROC_BUFFER,
	STAT_PROC_COLLECTOR
B
Bruce Momjian 已提交
151
}	STATS_PROCESS_TYPE;
152

153
static pid_t pgstat_forkexec(STATS_PROCESS_TYPE procType);
154
static void pgstat_parseArgs(int argc, char *argv[]);
155
#endif
156 157 158

NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
159
static void pgstat_recvbuffer(void);
160
static void pgstat_exit(SIGNAL_ARGS);
161
static void pgstat_die(SIGNAL_ARGS);
162
static void pgstat_beshutdown_hook(int code, Datum arg);
163

164
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid);
165 166 167 168 169 170 171
static int	pgstat_add_backend(PgStat_MsgHdr *msg);
static void pgstat_sub_backend(int procpid);
static void pgstat_drop_database(Oid databaseid);
static void pgstat_write_statsfile(void);
static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
					  PgStat_StatBeEntry **betab,
					  int *numbackends);
172
static void backend_read_statsfile(void);
173 174 175 176 177 178 179 180 181 182 183

static void pgstat_setheader(PgStat_MsgHdr *hdr, int mtype);
static void pgstat_send(void *msg, int len);

static void pgstat_recv_bestart(PgStat_MsgBestart *msg, int len);
static void pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len);
static void pgstat_recv_activity(PgStat_MsgActivity *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
184 185 186 187 188 189 190 191 192 193 194


/* ------------------------------------------------------------
 * Public functions called from postmaster follow
 * ------------------------------------------------------------
 */

/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success pgStatSock is a non-blocking UDP socket that is bound and
 *	connected to a kernel-assigned port on one of the addresses
 *	"localhost" resolves to, and pgStatAddr holds that address.  On
 *	failure pgStatSock is left at -1 and the pgstat_collect_* settings
 *	are switched off.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	int			rc;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;

#define TESTBYTEVAL ((char) 199)

	/*
	 * Force start of collector daemon if something to collect
	 */
	if (pgstat_collect_querystring ||
		pgstat_collect_tuplelevel ||
		pgstat_collect_blocklevel)
		pgstat_collect_startcollector = true;

	/*
	 * Initialize the filename for the status reports.	(In the
	 * EXEC_BACKEND case, this only sets the value in the postmaster.  The
	 * collector subprocess will recompute the value for itself, and
	 * individual backends must do so also if they want to access the
	 * file.)
	 */
	snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);

	/*
	 * If we don't have to start a collector or should reset the collected
	 * statistics on postmaster start, simply remove the file.
	 */
	if (!pgstat_collect_startcollector || pgstat_collect_resetonpmstart)
		unlink(pgStat_fname);

	/*
	 * Nothing else required if collector will not get started
	 */
	if (!pgstat_collect_startcollector)
		return;

	/*
	 * Create the UDP socket for sending and receiving statistic messages
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4
	 * addresses when kernel will reject IPv6).  Worse, the failure may
	 * occur at the bind() or perhaps even connect() stage.  So we must
	 * loop through the results till we find a working combination.  We
	 * will generate LOG messages, but no error, for bogus combinations.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the
		 * assigned port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) & pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles
		 * by not having to respecify the target address on every send.
		 * This also provides a kernel-level check that only packets from
		 * this same address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) & pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket.
		 * This is to catch situations where the socket can be created but
		 * will not actually pass data (for instance, because kernel
		 * packet filtering rules prevent it).
		 *
		 * A send interrupted by a signal says nothing about whether the
		 * socket works, so retry on EINTR just as the select() below does.
		 */
		test_byte = TESTBYTEVAL;
		do
		{
			rc = send(pgStatSock, &test_byte, 1, 0);
		} while (rc < 0 && errno == EINTR);
		if (rc != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can
		 * be received.  We arbitrarily allow up to half a second before
		 * deciding it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);
			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
				 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains
			 * to give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

		/* As with send(), an interrupted recv() is retried, not failed */
		do
		{
			rc = recv(pgStatSock, &test_byte, 1, 0);
		} while (rc < 0 && errno == EINTR);
		if (rc != 1)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = -1;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock < 0)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the
	 * collector falls behind (despite the buffering process), statistics
	 * messages will be discarded; backends won't block waiting to send
	 * messages to the collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	ereport(LOG,
			(errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock >= 0)
		closesocket(pgStatSock);
	pgStatSock = -1;

	/* Adjust GUC variables to suppress useless activity */
	pgstat_collect_startcollector = false;
	pgstat_collect_querystring = false;
	pgstat_collect_tuplelevel = false;
	pgstat_collect_blocklevel = false;
}


457 458
#ifdef EXEC_BACKEND

459
/*
460
 * pgstat_forkexec() -
461
 *
462
 * Format up the arglist for, then fork and exec, statistics
463 464
 * (buffer and collector) processes
 */
465 466
static pid_t
pgstat_forkexec(STATS_PROCESS_TYPE procType)
467
{
B
Bruce Momjian 已提交
468 469 470 471 472
	char	   *av[10];
	int			ac = 0,
				bufc = 0,
				i;
	char		pgstatBuf[2][32];
473 474

	av[ac++] = "postgres";
475

476 477 478
	switch (procType)
	{
		case STAT_PROC_BUFFER:
479
			av[ac++] = "-forkbuf";
480 481 482
			break;

		case STAT_PROC_COLLECTOR:
483
			av[ac++] = "-forkcol";
484 485 486 487 488 489
			break;

		default:
			Assert(false);
	}

490 491 492 493 494
	av[ac++] = NULL;			/* filled in by postmaster_forkexec */

	/* postgres_exec_path is not passed by write_backend_variables */
	av[ac++] = postgres_exec_path;

495 496 497 498 499
	/* Add to the arg list */
	Assert(bufc <= lengthof(pgstatBuf));
	for (i = 0; i < bufc; i++)
		av[ac++] = pgstatBuf[i];

500 501
	av[ac] = NULL;
	Assert(ac < lengthof(av));
502

503
	return postmaster_forkexec(ac, av);
504 505 506
}


507
/*
508 509
 * pgstat_parseArgs() -
 *
510
 * Extract data from the arglist for exec'ed statistics
511 512 513
 * (buffer and collector) processes
 */
static void
514
pgstat_parseArgs(int argc, char *argv[])
515
{
516
	Assert(argc == 4);
517

518
	argc = 3;
B
Bruce Momjian 已提交
519
	StrNCpy(postgres_exec_path, argv[argc++], MAXPGPATH);
520
}
B
Bruce Momjian 已提交
521
#endif   /* EXEC_BACKEND */
522

523

524 525 526 527
/* ----------
 * pgstat_start() -
 *
 *	Called from postmaster at startup or after an existing collector
528
 *	died.  Attempt to fire up a fresh statistics collector.
529
 *
530 531
 *	Returns PID of child process, or 0 if fail.
 *
532
 *	Note: if fail, we will be called again from the postmaster main loop.
533 534
 * ----------
 */
535
int
536
pgstat_start(void)
537
{
538
	time_t		curtime;
539
	pid_t		pgStatPid;
540

541 542 543
	/*
	 * Do nothing if no collector needed
	 */
544 545
	if (!pgstat_collect_startcollector)
		return 0;
546

547
	/*
548
	 * Do nothing if too soon since last collector start.  This is a
B
Bruce Momjian 已提交
549 550 551 552
	 * safety valve to protect against continuous respawn attempts if the
	 * collector is dying immediately at launch.  Note that since we will
	 * be re-called from the postmaster main loop, we will get another
	 * chance later.
553 554 555 556
	 */
	curtime = time(NULL);
	if ((unsigned int) (curtime - last_pgstat_start_time) <
		(unsigned int) PGSTAT_RESTART_INTERVAL)
557
		return 0;
558 559 560 561
	last_pgstat_start_time = curtime;

	/*
	 * Check that the socket is there, else pgstat_init failed.
562 563 564
	 */
	if (pgStatSock < 0)
	{
565 566
		ereport(LOG,
				(errmsg("statistics collector startup skipped")));
B
Bruce Momjian 已提交
567

568 569 570 571 572
		/*
		 * We can only get here if someone tries to manually turn
		 * pgstat_collect_startcollector on after it had been off.
		 */
		pgstat_collect_startcollector = false;
573
		return 0;
574 575 576
	}

	/*
577
	 * Okay, fork off the collector.
578
	 */
579
#ifdef EXEC_BACKEND
580
	switch ((pgStatPid = pgstat_forkexec(STAT_PROC_BUFFER)))
581
#else
582
	switch ((pgStatPid = fork_process()))
583
#endif
584 585
	{
		case -1:
586 587
			ereport(LOG,
					(errmsg("could not fork statistics buffer: %m")));
588
			return 0;
589

590
#ifndef EXEC_BACKEND
591
		case 0:
592
			/* in postmaster child ... */
593
			/* Close the postmaster's sockets */
594
			ClosePostmasterPorts(false);
595 596 597 598

			/* Drop our connection to postmaster's shared memory, as well */
			PGSharedMemoryDetach();

599
			PgstatBufferMain(0, NULL);
600
			break;
601
#endif
602 603

		default:
604
			return (int) pgStatPid;
605 606
	}

607 608
	/* shouldn't get here */
	return 0;
609 610 611 612 613 614 615 616 617 618 619 620
}


/* ----------
 * pgstat_beterm() -
 *
 *	Called from postmaster to tell collector a backend terminated.
 * ----------
 */
void
pgstat_beterm(int pid)
{
621
	PgStat_MsgBeterm msg;
622

623
	if (pgStatSock < 0)
624 625
		return;

626
	MemSet(&(msg.m_hdr), 0, sizeof(msg.m_hdr));
627 628
	msg.m_hdr.m_type = PGSTAT_MTYPE_BETERM;
	msg.m_hdr.m_procpid = pid;
629 630 631 632 633 634 635

	pgstat_send(&msg, sizeof(msg));
}


/* ------------------------------------------------------------
 * Public functions used by backends follow
 * ------------------------------------------------------------
 */


/* ----------
 * pgstat_bestart() -
 *
 *	Tell the collector that this new backend is soon ready to process
 *	queries. Called from tcop/postgres.c before entering the mainloop.
 * ----------
 */
void
pgstat_bestart(void)
{
650
	PgStat_MsgBestart msg;
651

652
	if (pgStatSock < 0)
653 654 655
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_BESTART);
656 657 658
	msg.m_databaseid = MyDatabaseId;
	msg.m_userid = GetSessionUserId();
	memcpy(&msg.m_clientaddr, &MyProcPort->raddr, sizeof(msg.m_clientaddr));
659
	pgstat_send(&msg, sizeof(msg));
660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678

	/*
	 * Set up a process-exit hook to ensure we flush the last batch of
	 * statistics to the collector.
	 */
	on_proc_exit(pgstat_beshutdown_hook, 0);
}

/*
 * pgstat_beshutdown_hook() -
 *
 * Process-exit callback registered by pgstat_bestart(): push whatever
 * statistics counts are still pending out to the collector.  Without it,
 * work done while the backend is exiting (temp table deletions, for
 * instance) would go uncounted.  It is installed with on_proc_exit rather
 * than on_shmem_exit so that everything interesting has already happened
 * by the time it runs.
 *
 * Neither argument is used.
 */
static void
pgstat_beshutdown_hook(int code, Datum arg)
{
	pgstat_report_tabstat();
}


/* ----------
 * pgstat_report_activity() -
 *
685
 *	Called from tcop/postgres.c to tell the collector what the backend
686 687 688 689 690
 *	is actually doing (usually "<IDLE>" or the start of the query to
 *	be executed).
 * ----------
 */
void
691
pgstat_report_activity(const char *what)
692
{
693 694
	PgStat_MsgActivity msg;
	int			len;
695

696
	if (!pgstat_collect_querystring || pgStatSock < 0)
697 698 699
		return;

	len = strlen(what);
700 701
	len = pg_mbcliplen((const unsigned char *) what, len,
					   PGSTAT_ACTIVITY_SIZE - 1);
702

703 704
	memcpy(msg.m_what, what, len);
	msg.m_what[len] = '\0';
705
	len += offsetof(PgStat_MsgActivity, m_what) +1;
706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ACTIVITY);
	pgstat_send(&msg, len);
}


/* ----------
 * pgstat_report_tabstat() -
 *
 *	Called from tcop/postgres.c to send the so far collected
 *	per table access statistics to the collector.
 * ----------
 */
void
pgstat_report_tabstat(void)
{
722
	int			i;
723

724 725 726 727 728 729 730
	if (pgStatSock < 0 ||
		!(pgstat_collect_querystring ||
		  pgstat_collect_tuplelevel ||
		  pgstat_collect_blocklevel))
	{
		/* Not reporting stats, so just flush whatever we have */
		pgStatTabstatUsed = 0;
731
		return;
732
	}
733 734

	/*
735 736
	 * For each message buffer used during the last query set the header
	 * fields and send it out.
737 738 739
	 */
	for (i = 0; i < pgStatTabstatUsed; i++)
	{
740 741 742 743 744
		PgStat_MsgTabstat *tsmsg = pgStatTabstatMessages[i];
		int			n;
		int			len;

		n = tsmsg->m_nentries;
745 746
		len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
			n * sizeof(PgStat_TableEntry);
747

748 749
		tsmsg->m_xact_commit = pgStatXactCommit;
		tsmsg->m_xact_rollback = pgStatXactRollback;
750
		pgStatXactCommit = 0;
751 752
		pgStatXactRollback = 0;

753
		pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
754
		tsmsg->m_databaseid = MyDatabaseId;
755
		pgstat_send(tsmsg, len);
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770
	}

	pgStatTabstatUsed = 0;
}


/* ----------
 * pgstat_vacuum_tabstat() -
 *
 *	Will tell the collector about objects he can get rid of.
 * ----------
 */
int
pgstat_vacuum_tabstat(void)
{
771 772 773 774 775 776 777 778 779 780 781 782 783 784
	Relation	dbrel;
	HeapScanDesc dbscan;
	HeapTuple	dbtup;
	Oid		   *dbidlist;
	int			dbidalloc;
	int			dbidused;
	HASH_SEQ_STATUS hstat;
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;
	HeapTuple	reltup;
	int			nobjects = 0;
	PgStat_MsgTabpurge msg;
	int			len;
	int			i;
785 786 787 788 789 790 791 792

	if (pgStatSock < 0)
		return 0;

	/*
	 * If not done for this transaction, read the statistics collector
	 * stats file into some hash tables.
	 */
793
	backend_read_statsfile();
794 795 796 797

	/*
	 * Lookup our own database entry
	 */
798 799 800
	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &MyDatabaseId,
												 HASH_FIND, NULL);
801
	if (dbentry == NULL)
802 803 804 805 806 807 808 809 810 811 812 813 814
		return -1;

	if (dbentry->tables == NULL)
		return 0;

	/*
	 * Initialize our messages table counter to zero
	 */
	msg.m_nentries = 0;

	/*
	 * Check for all tables if they still exist.
	 */
815
	hash_seq_init(&hstat, dbentry->tables);
816
	while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
817 818
	{
		/*
819 820
		 * Check if this relation is still alive by looking up it's
		 * pg_class tuple in the system catalog cache.
821 822
		 */
		reltup = SearchSysCache(RELOID,
823 824
								ObjectIdGetDatum(tabentry->tableid),
								0, 0, 0);
825 826 827 828 829 830 831
		if (HeapTupleIsValid(reltup))
		{
			ReleaseSysCache(reltup);
			continue;
		}

		/*
832
		 * Add this table's Oid to the message
833 834 835 836 837 838 839 840 841
		 */
		msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
		nobjects++;

		/*
		 * If the message is full, send it out and reinitialize ot zero
		 */
		if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
		{
842
			len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
B
Bruce Momjian 已提交
843
				+msg.m_nentries * sizeof(Oid);
844 845 846 847 848 849 850 851 852 853 854 855 856

			pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
			pgstat_send(&msg, len);

			msg.m_nentries = 0;
		}
	}

	/*
	 * Send the rest
	 */
	if (msg.m_nentries > 0)
	{
857
		len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
B
Bruce Momjian 已提交
858
			+msg.m_nentries * sizeof(Oid);
859 860

		pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
861
		msg.m_databaseid = MyDatabaseId;
862 863 864 865 866 867 868 869
		pgstat_send(&msg, len);
	}

	/*
	 * Read pg_database and remember the Oid's of all existing databases
	 */
	dbidalloc = 256;
	dbidused = 0;
870
	dbidlist = (Oid *) palloc(sizeof(Oid) * dbidalloc);
871

872
	dbrel = heap_open(DatabaseRelationId, AccessShareLock);
873 874
	dbscan = heap_beginscan(dbrel, SnapshotNow, 0, NULL);
	while ((dbtup = heap_getnext(dbscan, ForwardScanDirection)) != NULL)
875 876 877 878
	{
		if (dbidused >= dbidalloc)
		{
			dbidalloc *= 2;
879 880
			dbidlist = (Oid *) repalloc((char *) dbidlist,
										sizeof(Oid) * dbidalloc);
881
		}
882
		dbidlist[dbidused++] = HeapTupleGetOid(dbtup);
883 884 885 886 887
	}
	heap_endscan(dbscan);
	heap_close(dbrel, AccessShareLock);

	/*
888 889
	 * Search the database hash table for dead databases and tell the
	 * collector to drop them as well.
890 891
	 */
	hash_seq_init(&hstat, pgStatDBHash);
892
	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
893
	{
894
		Oid			dbid = dbentry->databaseid;
895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914

		for (i = 0; i < dbidused; i++)
		{
			if (dbidlist[i] == dbid)
			{
				dbid = InvalidOid;
				break;
			}
		}

		if (dbid != InvalidOid)
		{
			nobjects++;
			pgstat_drop_database(dbid);
		}
	}

	/*
	 * Free the dbid list.
	 */
915
	pfree(dbidlist);
916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935

	/*
	 * Tell the caller how many removeable objects we found
	 */
	return nobjects;
}


/* ----------
 * pgstat_drop_database() -
 *
 *	Tell the collector that we just dropped a database.
 *	This is the only message that shouldn't get lost in space. Otherwise
 *	the collector will keep the statistics for the dead DB until his
 *	stats file got removed while the postmaster is down.
 *
 *	Nothing is sent when there is no statistics socket.
 * ----------
 */
static void
pgstat_drop_database(Oid databaseid)
{
	PgStat_MsgDropdb dropmsg;

	if (pgStatSock >= 0)
	{
		pgstat_setheader(&dropmsg.m_hdr, PGSTAT_MTYPE_DROPDB);
		dropmsg.m_databaseid = databaseid;
		pgstat_send(&dropmsg, sizeof(dropmsg));
	}
}


/* ----------
 * pgstat_reset_counters() -
 *
 *	Tell the statistics collector to reset counters for our database.
 * ----------
 */
void
pgstat_reset_counters(void)
{
956
	PgStat_MsgResetcounter msg;
957 958 959 960 961

	if (pgStatSock < 0)
		return;

	if (!superuser())
962 963
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
B
Bruce Momjian 已提交
964
			  errmsg("must be superuser to reset statistics counters")));
965 966

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
967
	msg.m_databaseid = MyDatabaseId;
968 969 970 971 972 973 974 975 976 977 978 979 980
	pgstat_send(&msg, sizeof(msg));
}


/* ----------
 * pgstat_ping() -
 *
 *	Send some junk data to the collector to increase traffic.
 * ----------
 */
void
pgstat_ping(void)
{
981
	PgStat_MsgDummy msg;
982 983 984 985 986 987 988 989

	if (pgStatSock < 0)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
	pgstat_send(&msg, sizeof(msg));
}

990 991 992
/*
 * Create or enlarge the pgStatTabstatMessages array
 */
993
static void
994 995 996 997 998 999 1000 1001 1002
more_tabstat_space(void)
{
	PgStat_MsgTabstat *newMessages;
	PgStat_MsgTabstat **msgArray;
	int			newAlloc = pgStatTabstatAlloc + TABSTAT_QUANTUM;
	int			i;

	/* Create (another) quantum of message buffers */
	newMessages = (PgStat_MsgTabstat *)
1003 1004
		MemoryContextAllocZero(TopMemoryContext,
							   sizeof(PgStat_MsgTabstat) * TABSTAT_QUANTUM);
1005 1006 1007 1008

	/* Create or enlarge the pointer array */
	if (pgStatTabstatMessages == NULL)
		msgArray = (PgStat_MsgTabstat **)
1009 1010
			MemoryContextAlloc(TopMemoryContext,
							   sizeof(PgStat_MsgTabstat *) * newAlloc);
1011 1012
	else
		msgArray = (PgStat_MsgTabstat **)
1013 1014
			repalloc(pgStatTabstatMessages,
					 sizeof(PgStat_MsgTabstat *) * newAlloc);
1015 1016 1017 1018 1019 1020

	for (i = 0; i < TABSTAT_QUANTUM; i++)
		msgArray[pgStatTabstatAlloc + i] = newMessages++;
	pgStatTabstatMessages = msgArray;
	pgStatTabstatAlloc = newAlloc;

1021
	Assert(pgStatTabstatUsed < pgStatTabstatAlloc);
1022
}
1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035

/* ----------
 * pgstat_initstats() -
 *
 *	Called from various places usually dealing with initialization
 *	of Relation or Scan structures. The data placed into these
 *	structures from here tell where later to count for buffer reads,
 *	scans and tuples fetched.
 *
 *	stats:	per-relation/scan bookkeeping struct to fill in.
 *	rel:	relation whose rd_id identifies the table entry.
 *
 *	On return either stats->no_stats is TRUE (no collector socket, or
 *	neither tuple-level nor block-level collection is enabled), or
 *	stats->tabentry points at the table entry for rel inside one of the
 *	pending tabstat message buffers.
 * ----------
 */
void
pgstat_initstats(PgStat_Info *stats, Relation rel)
{
	Oid			rel_id = rel->rd_id;
	PgStat_TableEntry *useent;
	PgStat_MsgTabstat *tsmsg;
	int			mb;
	int			i;

	/*
	 * Initialize data not to count at all.
	 */
	stats->tabentry = NULL;
	stats->no_stats = FALSE;
	stats->heap_scan_counted = FALSE;
	stats->index_scan_counted = FALSE;

	if (pgStatSock < 0 ||
		!(pgstat_collect_tuplelevel ||
		  pgstat_collect_blocklevel))
	{
		stats->no_stats = TRUE;
		return;
	}

	/*
	 * Search the already-used message slots for this relation.
	 */
	for (mb = 0; mb < pgStatTabstatUsed; mb++)
	{
		tsmsg = pgStatTabstatMessages[mb];

		/* scan this buffer's entries, newest first */
		for (i = tsmsg->m_nentries; --i >= 0;)
		{
			if (tsmsg->m_entry[i].t_id == rel_id)
			{
				stats->tabentry = (void *) &(tsmsg->m_entry[i]);
				return;
			}
		}

		/* buffer is full; try the next one */
		if (tsmsg->m_nentries >= PGSTAT_NUM_TABENTRIES)
			continue;

		/*
		 * Not found, but found a message buffer with an empty slot
		 * instead. Fine, let's use this one.
		 */
		i = tsmsg->m_nentries++;
		useent = &tsmsg->m_entry[i];
		MemSet(useent, 0, sizeof(PgStat_TableEntry));
		useent->t_id = rel_id;
		stats->tabentry = (void *) useent;
		return;
	}

	/*
	 * If we ran out of message buffers, we just allocate more.
	 */
	if (pgStatTabstatUsed >= pgStatTabstatAlloc)
		more_tabstat_space();

	/*
	 * Use the first entry of the next message buffer.
	 */
	mb = pgStatTabstatUsed++;
	tsmsg = pgStatTabstatMessages[mb];
	tsmsg->m_nentries = 1;
	useent = &tsmsg->m_entry[0];
	MemSet(useent, 0, sizeof(PgStat_TableEntry));
	useent->t_id = rel_id;
	stats->tabentry = (void *) useent;
}


/* ----------
 * pgstat_count_xact_commit() -
 *
 *	Called from access/transam/xact.c to count transaction commits.
 * ----------
 */
void
pgstat_count_xact_commit(void)
{
1117 1118 1119
	if (!(pgstat_collect_querystring ||
		  pgstat_collect_tuplelevel ||
		  pgstat_collect_blocklevel))
1120 1121
		return;

1122 1123 1124
	pgStatXactCommit++;

	/*
1125 1126 1127
	 * If there was no relation activity yet, just make one existing
	 * message buffer used without slots, causing the next report to tell
	 * new xact-counters.
1128
	 */
1129
	if (pgStatTabstatAlloc == 0)
1130 1131
		more_tabstat_space();

1132 1133 1134 1135
	if (pgStatTabstatUsed == 0)
	{
		pgStatTabstatUsed++;
		pgStatTabstatMessages[0]->m_nentries = 0;
1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
	}
}


/* ----------
 * pgstat_count_xact_rollback() -
 *
 *	Called from access/transam/xact.c to count transaction rollbacks.
 * ----------
 */
void
pgstat_count_xact_rollback(void)
{
1149 1150 1151
	if (!(pgstat_collect_querystring ||
		  pgstat_collect_tuplelevel ||
		  pgstat_collect_blocklevel))
1152 1153
		return;

1154 1155 1156
	pgStatXactRollback++;

	/*
1157 1158 1159
	 * If there was no relation activity yet, just make one existing
	 * message buffer used without slots, causing the next report to tell
	 * new xact-counters.
1160
	 */
1161
	if (pgStatTabstatAlloc == 0)
1162 1163
		more_tabstat_space();

1164 1165 1166 1167
	if (pgStatTabstatUsed == 0)
	{
		pgStatTabstatUsed++;
		pgStatTabstatMessages[0]->m_nentries = 0;
1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185
	}
}


/* ----------
 * pgstat_fetch_stat_dbentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the collected statistics for one database or NULL. NULL doesn't mean
 *	that the database doesn't exist, it is just not yet known by the
 *	collector, so the caller is better off to report ZERO instead.
 *
 *	dbid: OID of the database to look up in pgStatDBHash.
 * ----------
 */
PgStat_StatDBEntry *
pgstat_fetch_stat_dbentry(Oid dbid)
{
	/*
	 * If not done for this transaction, read the statistics collector
	 * stats file into some hash tables.
	 */
	backend_read_statsfile();

	/*
	 * Lookup the requested database; return NULL if not found
	 */
	return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
											  (void *) &dbid,
											  HASH_FIND, NULL);
}


/* ----------
 * pgstat_fetch_stat_tabentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the collected statistics for one table or NULL. NULL doesn't mean
 *	that the table doesn't exist, it is just not yet known by the
 *	collector, so the caller is better off to report ZERO instead.
 *
 *	relid: OID of the table; it is looked up in the table hash of the
 *	current database (MyDatabaseId) only.
 * ----------
 */
PgStat_StatTabEntry *
pgstat_fetch_stat_tabentry(Oid relid)
{
	PgStat_StatDBEntry *dbentry;

	/*
	 * If not done for this transaction, read the statistics collector
	 * stats file into some hash tables.
	 */
	backend_read_statsfile();

	/*
	 * Lookup our database.
	 */
	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &MyDatabaseId,
												 HASH_FIND, NULL);
	if (dbentry == NULL)
		return NULL;

	/*
	 * Now inside the DB's table hash table lookup the requested one.
	 */
	if (dbentry->tables == NULL)
		return NULL;

	/* hash_search's result is returned as-is, so a miss yields NULL */
	return (PgStat_StatTabEntry *) hash_search(dbentry->tables,
											   (void *) &relid,
											   HASH_FIND, NULL);
}


/* ----------
 * pgstat_fetch_stat_beentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the actual activity slot of one active backend. The caller is
 *	responsible for a check if the actual user is permitted to see
 *	that info (especially the querystring).
 * ----------
 */
PgStat_StatBeEntry *
pgstat_fetch_stat_beentry(int beid)
{
1256
	backend_read_statsfile();
1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274

	if (beid < 1 || beid > pgStatNumBackends)
		return NULL;

	return &pgStatBeTable[beid - 1];
}


/* ----------
 * pgstat_fetch_stat_numbackends() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the maximum current backend id.
 * ----------
 */
int
pgstat_fetch_stat_numbackends(void)
{
1275
	backend_read_statsfile();
1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296

	return pgStatNumBackends;
}



/* ------------------------------------------------------------
 * Local support functions follow
 * ------------------------------------------------------------
 */


/* ----------
 * pgstat_setheader() -
 *
 *		Set common header fields in a statistics message
 *
 *		hdr:	header to fill in.
 *		mtype:	message type code (one of the PGSTAT_MTYPE_* values).
 *
 *		The sender is identified by MyBackendId and MyProcPid.  m_size is
 *		not touched here.
 * ----------
 */
static void
pgstat_setheader(PgStat_MsgHdr *hdr, int mtype)
{
	hdr->m_type = mtype;
	hdr->m_backendid = MyBackendId;
	hdr->m_procpid = MyProcPid;
}


/* ----------
 * pgstat_send() -
 *
 *		Send out one statistics message to the collector
 *
 *		msg:	message buffer; it must begin with a PgStat_MsgHdr, whose
 *				m_size field is overwritten with len.
 *		len:	total number of bytes to send.
 *
 *		Does nothing when there is no collector socket (pgStatSock < 0).
 * ----------
 */
static void
pgstat_send(void *msg, int len)
{
	if (pgStatSock < 0)
		return;

	((PgStat_MsgHdr *) msg)->m_size = len;

	send(pgStatSock, msg, len, 0);
	/* We deliberately ignore any error from send() */
}


1322 1323 1324 1325 1326 1327 1328 1329
/* ----------
 * PgstatBufferMain() -
 *
 *	Start up the statistics buffer process.  This is the body of the
 *	postmaster child process.
 *
 *	The argc/argv parameters are valid only in EXEC_BACKEND case.
 * ----------
1330
 */
1331 1332
NON_EXEC_STATIC void
PgstatBufferMain(int argc, char *argv[])
1333
{
1334
	IsUnderPostmaster = true;	/* we are a postmaster subprocess now */
1335

1336 1337 1338 1339
	MyProcPid = getpid();		/* reset MyProcPid */

	/* Lose the postmaster's on-exit routines */
	on_exit_reset();
1340 1341

	/*
1342
	 * Ignore all signals usually bound to some action in the postmaster,
1343
	 * except for SIGCHLD and SIGQUIT --- see pgstat_recvbuffer.
1344
	 */
1345 1346 1347
	pqsignal(SIGHUP, SIG_IGN);
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
1348
	pqsignal(SIGQUIT, pgstat_exit);
1349 1350 1351 1352
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, SIG_IGN);
	pqsignal(SIGUSR2, SIG_IGN);
1353
	pqsignal(SIGCHLD, pgstat_die);
1354 1355 1356 1357
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
1358
	/* unblock will happen in pgstat_recvbuffer */
1359 1360

#ifdef EXEC_BACKEND
B
Bruce Momjian 已提交
1361
	pgstat_parseArgs(argc, argv);
1362 1363
#endif

1364
	/*
1365 1366
	 * Start a buffering process to read from the socket, so we have a
	 * little more time to process incoming messages.
1367 1368
	 *
	 * NOTE: the process structure is: postmaster is parent of buffer process
1369
	 * is parent of collector process.	This way, the buffer can detect
1370
	 * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
1371 1372 1373 1374
	 * collector failure until it tried to write on the pipe.  That would
	 * mean that after the postmaster started a new collector, we'd have
	 * two buffer processes competing to read from the UDP socket --- not
	 * good.
1375
	 */
1376
	if (pgpipe(pgStatPipe) < 0)
1377
		ereport(ERROR,
1378
				(errcode_for_socket_access(),
B
Bruce Momjian 已提交
1379
			 errmsg("could not create pipe for statistics buffer: %m")));
1380

1381
	/* child becomes collector process */
1382 1383
#ifdef EXEC_BACKEND
	pgStatCollectorPid = pgstat_forkexec(STAT_PROC_COLLECTOR);
1384
#else
1385
	pgStatCollectorPid = fork();
1386
#endif
1387
	switch (pgStatCollectorPid)
1388
	{
1389
		case -1:
1390
			ereport(ERROR,
1391
					(errmsg("could not fork statistics collector: %m")));
1392

1393
#ifndef EXEC_BACKEND
1394
		case 0:
1395
			/* child becomes collector process */
1396
			PgstatCollectorMain(0, NULL);
1397
			break;
1398
#endif
1399 1400 1401

		default:
			/* parent becomes buffer process */
1402
			closesocket(pgStatPipe[0]);
1403
			pgstat_recvbuffer();
1404
	}
1405
	exit(0);
1406 1407 1408
}


1409 1410 1411 1412 1413 1414 1415 1416 1417
/* ----------
 * PgstatCollectorMain() -
 *
 *	Start up the statistics collector itself.  This is the body of the
 *	postmaster grandchild process.
 *
 *	The argc/argv parameters are valid only in EXEC_BACKEND case.
 * ----------
 */
1418
NON_EXEC_STATIC void
1419
PgstatCollectorMain(int argc, char *argv[])
1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430
{
	PgStat_Msg	msg;
	fd_set		rfds;
	int			readPipe;
	int			nready;
	int			len = 0;
	struct timeval timeout;
	struct timeval next_statwrite;
	bool		need_statwrite;
	HASHCTL		hash_ctl;

1431 1432 1433 1434
	MyProcPid = getpid();		/* reset MyProcPid */

	/*
	 * Reset signal handling.  With the exception of restoring default
B
Bruce Momjian 已提交
1435 1436 1437
	 * SIGCHLD and SIGQUIT handling, this is a no-op in the
	 * non-EXEC_BACKEND case because we'll have inherited these settings
	 * from the buffer process; but it's not a no-op for EXEC_BACKEND.
1438 1439 1440 1441
	 */
	pqsignal(SIGHUP, SIG_IGN);
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
1442
#ifndef WIN32
1443
	pqsignal(SIGQUIT, SIG_IGN);
1444 1445 1446 1447
#else
	/* kluge to allow buffer process to kill collector; FIXME */
	pqsignal(SIGQUIT, pgstat_exit);
#endif
1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, SIG_IGN);
	pqsignal(SIGUSR2, SIG_IGN);
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
	PG_SETMASK(&UnBlockSig);

1459
#ifdef EXEC_BACKEND
B
Bruce Momjian 已提交
1460
	pgstat_parseArgs(argc, argv);
1461 1462
#endif

1463
	/* Close unwanted files */
1464 1465
	closesocket(pgStatPipe[1]);
	closesocket(pgStatSock);
1466

1467 1468 1469
	/*
	 * Identify myself via ps
	 */
1470
	init_ps_display("stats collector process", "", "");
1471 1472
	set_ps_display("");

1473 1474 1475 1476 1477 1478
	/*
	 * Initialize filenames needed for status reports.
	 */
	snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
	/* tmpfname need only be set correctly in this process */
	snprintf(pgStat_tmpfname, MAXPGPATH, PGSTAT_STAT_TMPFILE,
1479
			 DataDir, (int)getpid());
1480

1481 1482 1483 1484 1485 1486
	/*
	 * Arrange to write the initial status file right away
	 */
	gettimeofday(&next_statwrite, NULL);
	need_statwrite = TRUE;

1487
	/*
1488 1489
	 * Read in an existing statistics stats file or initialize the stats
	 * to zero.
1490 1491 1492 1493 1494 1495 1496 1497
	 */
	pgStatRunningInCollector = TRUE;
	pgstat_read_statsfile(&pgStatDBHash, InvalidOid, NULL, NULL);

	/*
	 * Create the dead backend hashtable
	 */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
1498
	hash_ctl.keysize = sizeof(int);
1499
	hash_ctl.entrysize = sizeof(PgStat_StatBeDead);
1500
	hash_ctl.hash = tag_hash;
1501 1502
	pgStatBeDead = hash_create("Dead Backends", PGSTAT_BE_HASH_SIZE,
							   &hash_ctl, HASH_ELEM | HASH_FUNCTION);
1503 1504 1505 1506

	/*
	 * Create the known backends table
	 */
1507
	pgStatBeTable = (PgStat_StatBeEntry *) palloc0(
1508
							   sizeof(PgStat_StatBeEntry) * MaxBackends);
1509

1510 1511
	readPipe = pgStatPipe[0];

1512
	/*
1513 1514
	 * Process incoming messages and handle all the reporting stuff until
	 * there are no more messages.
1515 1516 1517 1518
	 */
	for (;;)
	{
		/*
1519 1520 1521
		 * If we need to write the status file again (there have been
		 * changes in the statistics since we wrote it last) calculate the
		 * timeout until we have to do so.
1522 1523 1524
		 */
		if (need_statwrite)
		{
1525 1526 1527 1528 1529 1530 1531
			struct timeval now;

			gettimeofday(&now, NULL);
			/* avoid assuming that tv_sec is signed */
			if (now.tv_sec > next_statwrite.tv_sec ||
				(now.tv_sec == next_statwrite.tv_sec &&
				 now.tv_usec >= next_statwrite.tv_usec))
1532
			{
1533
				timeout.tv_sec = 0;
1534 1535
				timeout.tv_usec = 0;
			}
1536 1537 1538 1539 1540 1541 1542 1543 1544 1545
			else
			{
				timeout.tv_sec = next_statwrite.tv_sec - now.tv_sec;
				timeout.tv_usec = next_statwrite.tv_usec - now.tv_usec;
				if (timeout.tv_usec < 0)
				{
					timeout.tv_sec--;
					timeout.tv_usec += 1000000;
				}
			}
1546 1547 1548 1549 1550 1551
		}

		/*
		 * Setup the descriptor set for select(2)
		 */
		FD_ZERO(&rfds);
1552
		FD_SET(readPipe, &rfds);
1553 1554 1555 1556

		/*
		 * Now wait for something to do.
		 */
B
Bruce Momjian 已提交
1557
		nready = select(readPipe + 1, &rfds, NULL, NULL,
1558 1559 1560
						(need_statwrite) ? &timeout : NULL);
		if (nready < 0)
		{
1561 1562
			if (errno == EINTR)
				continue;
1563
			ereport(ERROR,
1564
					(errcode_for_socket_access(),
B
Bruce Momjian 已提交
1565
				 errmsg("select() failed in statistics collector: %m")));
1566 1567 1568
		}

		/*
1569 1570
		 * If there are no descriptors ready, our timeout for writing the
		 * stats file happened.
1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582
		 */
		if (nready == 0)
		{
			pgstat_write_statsfile();
			need_statwrite = FALSE;

			continue;
		}

		/*
		 * Check if there is a new statistics message to collect.
		 */
1583
		if (FD_ISSET(readPipe, &rfds))
1584 1585
		{
			/*
1586 1587 1588 1589 1590
			 * We may need to issue multiple read calls in case the buffer
			 * process didn't write the message in a single write, which
			 * is possible since it dumps its buffer bytewise. In any
			 * case, we'd need two reads since we don't know the message
			 * length initially.
1591
			 */
1592 1593
			int			nread = 0;
			int			targetlen = sizeof(PgStat_MsgHdr);		/* initial */
1594
			bool		pipeEOF = false;
1595

1596
			while (nread < targetlen)
1597
			{
1598
				len = piperead(readPipe, ((char *) &msg) + nread,
B
Bruce Momjian 已提交
1599
							   targetlen - nread);
1600 1601
				if (len < 0)
				{
1602 1603
					if (errno == EINTR)
						continue;
1604
					ereport(ERROR,
1605
							(errcode_for_socket_access(),
1606
							 errmsg("could not read from statistics collector pipe: %m")));
1607
				}
1608
				if (len == 0)	/* EOF on the pipe! */
1609 1610
				{
					pipeEOF = true;
1611
					break;
1612
				}
1613 1614
				nread += len;
				if (nread == sizeof(PgStat_MsgHdr))
1615
				{
1616 1617 1618 1619 1620 1621
					/* we have the header, compute actual msg length */
					targetlen = msg.msg_hdr.m_size;
					if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
						targetlen > (int) sizeof(msg))
					{
						/*
1622 1623 1624
						 * Bogus message length implies that we got out of
						 * sync with the buffer process somehow. Abort so
						 * that we can restart both processes.
1625
						 */
1626
						ereport(ERROR,
B
Bruce Momjian 已提交
1627
						  (errmsg("invalid statistics message length")));
1628
					}
1629 1630
				}
			}
1631

1632 1633 1634 1635
			/*
			 * EOF on the pipe implies that the buffer process exited.
			 * Fall out of outer loop.
			 */
1636
			if (pipeEOF)
1637
				break;
1638 1639

			/*
1640 1641
			 * Distribute the message to the specific function handling
			 * it.
1642 1643 1644 1645 1646 1647 1648
			 */
			switch (msg.msg_hdr.m_type)
			{
				case PGSTAT_MTYPE_DUMMY:
					break;

				case PGSTAT_MTYPE_BESTART:
1649
					pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
1650 1651 1652
					break;

				case PGSTAT_MTYPE_BETERM:
1653
					pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
1654 1655 1656
					break;

				case PGSTAT_MTYPE_TABSTAT:
1657
					pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
1658 1659 1660
					break;

				case PGSTAT_MTYPE_TABPURGE:
1661
					pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
1662 1663 1664
					break;

				case PGSTAT_MTYPE_ACTIVITY:
1665
					pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
1666 1667 1668
					break;

				case PGSTAT_MTYPE_DROPDB:
1669
					pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
1670 1671 1672
					break;

				case PGSTAT_MTYPE_RESETCOUNTER:
1673
					pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
1674
											 nread);
1675 1676 1677 1678 1679 1680 1681
					break;

				default:
					break;
			}

			/*
1682
			 * Globally count messages.
1683 1684
			 */
			pgStatNumMessages++;
1685 1686

			/*
1687 1688
			 * If this is the first message after we wrote the stats file
			 * the last time, setup the timeout that it'd be written.
1689 1690 1691 1692 1693
			 */
			if (!need_statwrite)
			{
				gettimeofday(&next_statwrite, NULL);
				next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
1694
				next_statwrite.tv_sec += (next_statwrite.tv_usec / 1000000);
1695 1696 1697
				next_statwrite.tv_usec %= 1000000;
				need_statwrite = TRUE;
			}
1698 1699 1700
		}

		/*
1701
		 * Note that we do NOT check for postmaster exit inside the loop;
1702 1703 1704
		 * only EOF on the buffer pipe causes us to fall out.  This
		 * ensures we don't exit prematurely if there are still a few
		 * messages in the buffer or pipe at postmaster shutdown.
1705 1706
		 */
	}
1707 1708

	/*
1709 1710 1711 1712 1713
	 * Okay, we saw EOF on the buffer pipe, so there are no more messages
	 * to process.	If the buffer process quit because of postmaster
	 * shutdown, we want to save the final stats to reuse at next startup.
	 * But if the buffer process failed, it seems best not to (there may
	 * even now be a new collector firing up, and we don't want it to read
1714
	 * a partially-rewritten stats file).
1715
	 */
1716
	if (!PostmasterIsAlive(false))
1717
		pgstat_write_statsfile();
1718 1719 1720 1721 1722 1723
}


/* ----------
 * pgstat_recvbuffer() -
 *
1724
 *	This is the body of the separate buffering process. Its only
1725
 *	purpose is to receive messages from the UDP socket as fast as
1726 1727
 *	possible and forward them over a pipe into the collector itself.
 *	If the collector is slow to absorb messages, they are buffered here.
1728 1729 1730
 * ----------
 */
static void
1731
pgstat_recvbuffer(void)
1732
{
1733 1734
	fd_set		rfds;
	fd_set		wfds;
1735
	struct timeval timeout;
1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747
	int			writePipe = pgStatPipe[1];
	int			maxfd;
	int			nready;
	int			len;
	int			xfr;
	int			frm;
	PgStat_Msg	input_buffer;
	char	   *msgbuffer;
	int			msg_send = 0;	/* next send index in buffer */
	int			msg_recv = 0;	/* next receive index */
	int			msg_have = 0;	/* number of bytes stored */
	bool		overflow = false;
1748

1749 1750 1751
	/*
	 * Identify myself via ps
	 */
1752
	init_ps_display("stats buffer process", "", "");
1753 1754
	set_ps_display("");

1755
	/*
1756 1757 1758 1759 1760
	 * We want to die if our child collector process does.	There are two
	 * ways we might notice that it has died: receive SIGCHLD, or get a
	 * write failure on the pipe leading to the child.	We can set SIGPIPE
	 * to kill us here.  Our SIGCHLD handler was already set up before we
	 * forked (must do it that way, else it's a race condition).
1761 1762 1763 1764 1765 1766 1767 1768
	 */
	pqsignal(SIGPIPE, SIG_DFL);
	PG_SETMASK(&UnBlockSig);

	/*
	 * Set the write pipe to nonblock mode, so that we cannot block when
	 * the collector falls behind.
	 */
1769
	if (!pg_set_noblock(writePipe))
1770
		ereport(ERROR,
1771
				(errcode_for_socket_access(),
B
Bruce Momjian 已提交
1772
				 errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
1773

1774 1775 1776
	/*
	 * Allocate the message buffer
	 */
1777
	msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ);
1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788

	/*
	 * Loop forever
	 */
	for (;;)
	{
		FD_ZERO(&rfds);
		FD_ZERO(&wfds);
		maxfd = -1;

		/*
1789 1790
		 * As long as we have buffer space we add the socket to the read
		 * descriptor set.
1791
		 */
1792
		if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
1793 1794 1795
		{
			FD_SET(pgStatSock, &rfds);
			maxfd = pgStatSock;
1796
			overflow = false;
1797 1798 1799
		}
		else
		{
1800
			if (!overflow)
1801
			{
1802 1803
				ereport(LOG,
						(errmsg("statistics buffer is full")));
1804
				overflow = true;
1805 1806 1807 1808
			}
		}

		/*
1809
		 * If we have messages to write out, we add the pipe to the write
1810
		 * descriptor set.
1811 1812 1813
		 */
		if (msg_have > 0)
		{
1814 1815 1816
			FD_SET(writePipe, &wfds);
			if (writePipe > maxfd)
				maxfd = writePipe;
1817 1818 1819
		}

		/*
1820 1821 1822
		 * Wait for some work to do; but not for more than 10 seconds.
		 * (This determines how quickly we will shut down after an
		 * ungraceful postmaster termination; so it needn't be very fast.)
1823
		 */
1824 1825 1826 1827
		timeout.tv_sec = 10;
		timeout.tv_usec = 0;

		nready = select(maxfd + 1, &rfds, &wfds, NULL, &timeout);
1828 1829
		if (nready < 0)
		{
1830 1831
			if (errno == EINTR)
				continue;
1832
			ereport(ERROR,
1833
					(errcode_for_socket_access(),
1834
					 errmsg("select() failed in statistics buffer: %m")));
1835 1836 1837 1838 1839 1840 1841 1842
		}

		/*
		 * If there is a message on the socket, read it and check for
		 * validity.
		 */
		if (FD_ISSET(pgStatSock, &rfds))
		{
1843 1844
			len = recv(pgStatSock, (char *) &input_buffer,
					   sizeof(PgStat_Msg), 0);
1845
			if (len < 0)
1846
				ereport(ERROR,
1847
						(errcode_for_socket_access(),
1848
					   errmsg("could not read statistics message: %m")));
1849 1850 1851 1852 1853 1854 1855 1856 1857 1858

			/*
			 * We ignore messages that are smaller than our common header
			 */
			if (len < sizeof(PgStat_MsgHdr))
				continue;

			/*
			 * The received length must match the length in the header
			 */
1859
			if (input_buffer.msg_hdr.m_size != len)
1860 1861 1862
				continue;

			/*
1863 1864
			 * O.K. - we accept this message.  Copy it to the circular
			 * msgbuffer.
1865
			 */
1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882
			frm = 0;
			while (len > 0)
			{
				xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
				if (xfr > len)
					xfr = len;
				Assert(xfr > 0);
				memcpy(msgbuffer + msg_recv,
					   ((char *) &input_buffer) + frm,
					   xfr);
				msg_recv += xfr;
				if (msg_recv == PGSTAT_RECVBUFFERSZ)
					msg_recv = 0;
				msg_have += xfr;
				frm += xfr;
				len -= xfr;
			}
1883 1884 1885
		}

		/*
1886 1887 1888 1889
		 * If the collector is ready to receive, write some data into his
		 * pipe.  We may or may not be able to write all that we have.
		 *
		 * NOTE: if what we have is less than PIPE_BUF bytes but more than
1890 1891 1892 1893 1894 1895 1896
		 * the space available in the pipe buffer, most kernels will
		 * refuse to write any of it, and will return EAGAIN.  This means
		 * we will busy-loop until the situation changes (either because
		 * the collector caught up, or because more data arrives so that
		 * we have more than PIPE_BUF bytes buffered).	This is not good,
		 * but is there any way around it?	We have no way to tell when
		 * the collector has caught up...
1897
		 */
1898
		if (FD_ISSET(writePipe, &wfds))
1899
		{
1900 1901 1902 1903
			xfr = PGSTAT_RECVBUFFERSZ - msg_send;
			if (xfr > msg_have)
				xfr = msg_have;
			Assert(xfr > 0);
1904
			len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
1905 1906
			if (len < 0)
			{
1907 1908
				if (errno == EINTR || errno == EAGAIN)
					continue;	/* not enough space in pipe */
1909
				ereport(ERROR,
1910
						(errcode_for_socket_access(),
1911
						 errmsg("could not write to statistics collector pipe: %m")));
1912
			}
1913 1914
			/* NB: len < xfr is okay */
			msg_send += len;
1915 1916
			if (msg_send == PGSTAT_RECVBUFFERSZ)
				msg_send = 0;
1917
			msg_have -= len;
1918 1919 1920 1921
		}

		/*
		 * Make sure we forwarded all messages before we check for
1922
		 * postmaster termination.
1923
		 */
1924
		if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
1925 1926 1927
			continue;

		/*
B
Bruce Momjian 已提交
1928 1929
		 * If the postmaster has terminated, we die too.  (This is no
		 * longer the normal exit path, however.)
1930
		 */
1931
		if (!PostmasterIsAlive(true))
1932 1933 1934 1935
			exit(0);
	}
}

/*
 * SIGQUIT signal handler for buffer process.  On WIN32 the collector
 * process installs this handler for SIGQUIT as well.
 */
static void
pgstat_exit(SIGNAL_ARGS)
{
	/*
	 * For now, we just nail the doors shut and get out of town.  It might
	 * be cleaner to allow any pending messages to be sent, but that
	 * creates a tradeoff against speed of exit.
	 */

	/*
	 * If running in bufferer, kill our collector as well. On some broken
	 * win32 systems, it does not shut down automatically because of issues
	 * with socket inheritance.  XXX so why not fix the socket inheritance...
	 */
#ifdef WIN32
	if (pgStatCollectorPid > 0)
		kill(pgStatCollectorPid, SIGQUIT);
#endif
	exit(0);
}

/*
 * SIGCHLD signal handler for buffer process: terminate with exit
 * status 1 as soon as a child process change is signalled.
 */
static void
pgstat_die(SIGNAL_ARGS)
{
	exit(1);
}

1965 1966 1967 1968

/* ----------
 * pgstat_add_backend() -
 *
 *	Support function to keep our backend list up to date.
 *
 *	msg: header of a message just received; its m_backendid selects the
 *	slot in pgStatBeTable and its m_procpid identifies the sender.
 *
 *	Returns:
 *		-1	the backend ID is out of range (a LOG message is emitted)
 *		 0	the slot already belongs to this PID, or has now been
 *			(re)initialized for it
 *		 1	the PID is listed in the dead-backends hash; nothing changed
 * ----------
 */
static int
pgstat_add_backend(PgStat_MsgHdr *msg)
{
	PgStat_StatBeEntry *beentry;
	PgStat_StatBeDead *deadbe;

	/*
	 * Check that the backend ID is valid
	 */
	if (msg->m_backendid < 1 || msg->m_backendid > MaxBackends)
	{
		ereport(LOG,
			 (errmsg("invalid server process ID %d", msg->m_backendid)));
		return -1;
	}

	/*
	 * Get the slot for this backendid.
	 */
	beentry = &pgStatBeTable[msg->m_backendid - 1];

	/*
	 * If the slot contains the PID of this backend, everything is
	 * fine and we have nothing to do. Note that all the slots are
	 * zero'd out when the collector is started. We assume that a slot
	 * is "empty" iff procpid == 0.
	 */
	if (beentry->procpid > 0 && beentry->procpid == msg->m_procpid)
		return 0;

	/*
	 * Lookup if this backend is known to be dead. This can be caused due
	 * to messages arriving in the wrong order - e.g. postmaster's BETERM
	 * message might have arrived before we received all the backends
	 * stats messages, or even a new backend with the same backendid was
	 * faster in sending his BESTART.
	 *
	 * If the backend is known to be dead, we ignore this add.
	 */
	deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
											   (void *) &(msg->m_procpid),
											   HASH_FIND, NULL);
	if (deadbe)
		return 1;

	/*
	 * Backend isn't known to be dead. If its slot is currently used, we
	 * have to kick out the old backend.
	 */
	if (beentry->procpid > 0)
		pgstat_sub_backend(beentry->procpid);

	/* Must be able to distinguish between empty and non-empty slots */
	Assert(msg->m_procpid > 0);

	/* Put this new backend into the slot */
	beentry->procpid = msg->m_procpid;
	beentry->start_timestamp = GetCurrentTimestamp();
	beentry->activity_start_timestamp = 0;
	beentry->activity[0] = '\0';

	/*
	 * We can't initialize the rest of the data in this slot until we
	 * see the BESTART message. Therefore, we set the database and
	 * user to sentinel values, to indicate "undefined". There is no
	 * easy way to do this for the client address, so make sure to
	 * check that the database or user are defined before accessing
	 * the client address.
	 */
	beentry->userid = InvalidOid;
	beentry->databaseid = InvalidOid;

	return 0;
}

/*
 * Lookup the hash table entry for the specified database. If no hash
 * table entry exists, initialize it.
 */
static PgStat_StatDBEntry *
2052
pgstat_get_db_entry(Oid databaseid)
2053 2054 2055 2056 2057 2058 2059 2060
{
	PgStat_StatDBEntry *result;
	bool found;

	/* Lookup or create the hash table entry for this database */
	result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												&databaseid,
												HASH_ENTER, &found);
2061

2062
	/* If not found, initialize the new one. */
2063 2064
	if (!found)
	{
2065
		HASHCTL		hash_ctl;
2066

2067 2068 2069 2070 2071 2072
		result->tables = NULL;
		result->n_xact_commit = 0;
		result->n_xact_rollback = 0;
		result->n_blocks_fetched = 0;
		result->n_blocks_hit = 0;
		result->destroy = 0;
2073 2074

		memset(&hash_ctl, 0, sizeof(hash_ctl));
2075
		hash_ctl.keysize = sizeof(Oid);
2076
		hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2077
		hash_ctl.hash = oid_hash;
2078
		result->tables = hash_create("Per-database table",
2079 2080 2081
									  PGSTAT_TAB_HASH_SIZE,
									  &hash_ctl,
									  HASH_ELEM | HASH_FUNCTION);
2082 2083
	}

2084
	return result;
2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095
}

/* ----------
 * pgstat_sub_backend() -
 *
 *	Remove a backend from the actual backends list.
 * ----------
 */
static void
pgstat_sub_backend(int procpid)
{
2096 2097 2098
	int			i;
	PgStat_StatBeDead *deadbe;
	bool		found;
2099 2100

	/*
2101 2102
	 * Search in the known-backends table for the slot containing this
	 * PID.
2103 2104 2105
	 */
	for (i = 0; i < MaxBackends; i++)
	{
2106
		if (pgStatBeTable[i].procpid == procpid)
2107 2108 2109
		{
			/*
			 * That's him. Add an entry to the known to be dead backends.
2110 2111 2112 2113 2114 2115 2116
			 * Due to possible misorder in the arrival of UDP packets it's
			 * possible that even if we know the backend is dead, there
			 * could still be messages queued that arrive later. Those
			 * messages must not cause our number of backends statistics
			 * to get screwed up, so we remember for a couple of seconds
			 * that this PID is dead and ignore them (only the counting of
			 * backends, not the table access stats they sent).
2117
			 */
2118 2119
			deadbe = (PgStat_StatBeDead *) hash_search(pgStatBeDead,
													   (void *) &procpid,
2120 2121
													   HASH_ENTER,
													   &found);
2122

2123 2124 2125 2126 2127 2128 2129 2130 2131
			if (!found)
			{
				deadbe->backendid = i + 1;
				deadbe->destroy = PGSTAT_DESTROY_COUNT;
			}

			/*
			 * Declare the backend slot empty.
			 */
2132
			pgStatBeTable[i].procpid = 0;
2133 2134 2135 2136 2137
			return;
		}
	}

	/*
2138 2139
	 * No big problem if not found. This can happen if UDP messages arrive
	 * out of order here.
2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152
	 */
}


/* ----------
 * pgstat_write_statsfile() -
 *
 *	Tell the news.
 * ----------
 */
static void
pgstat_write_statsfile(void)
{
2153 2154 2155 2156 2157 2158 2159
	HASH_SEQ_STATUS hstat;
	HASH_SEQ_STATUS tstat;
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;
	PgStat_StatBeDead *deadbe;
	FILE	   *fpout;
	int			i;
2160 2161

	/*
2162
	 * Open the statistics temp file to write out the current values.
2163
	 */
P
Peter Eisentraut 已提交
2164
	fpout = fopen(pgStat_tmpfname, PG_BINARY_W);
2165 2166
	if (fpout == NULL)
	{
2167 2168
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
2169 2170
			errmsg("could not open temporary statistics file \"%s\": %m",
				   pgStat_tmpfname)));
2171 2172 2173 2174 2175 2176 2177
		return;
	}

	/*
	 * Walk through the database table.
	 */
	hash_seq_init(&hstat, pgStatDBHash);
2178
	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
2179 2180
	{
		/*
2181 2182
		 * If this database is marked destroyed, count down and do so if
		 * it reaches 0.
2183 2184 2185 2186 2187 2188 2189 2190
		 */
		if (dbentry->destroy > 0)
		{
			if (--(dbentry->destroy) == 0)
			{
				if (dbentry->tables != NULL)
					hash_destroy(dbentry->tables);

2191
				if (hash_search(pgStatDBHash,
2192 2193
								(void *) &(dbentry->databaseid),
								HASH_REMOVE, NULL) == NULL)
2194
					ereport(ERROR,
2195 2196
							(errmsg("database hash table corrupted "
									"during cleanup --- abort")));
2197
			}
2198

2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214
			/*
			 * Don't include statistics for it.
			 */
			continue;
		}

		/*
		 * Write out the DB line including the number of life backends.
		 */
		fputc('D', fpout);
		fwrite(dbentry, sizeof(PgStat_StatDBEntry), 1, fpout);

		/*
		 * Walk through the databases access stats per table.
		 */
		hash_seq_init(&tstat, dbentry->tables);
2215
		while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
2216 2217
		{
			/*
2218 2219
			 * If table entry marked for destruction, same as above for
			 * the database entry.
2220 2221 2222 2223 2224
			 */
			if (tabentry->destroy > 0)
			{
				if (--(tabentry->destroy) == 0)
				{
2225
					if (hash_search(dbentry->tables,
2226
									(void *) &(tabentry->tableid),
2227
									HASH_REMOVE, NULL) == NULL)
2228
					{
2229
						ereport(ERROR,
2230 2231 2232 2233
								(errmsg("tables hash table for "
										"database %u corrupted during "
										"cleanup --- abort",
										dbentry->databaseid)));
2234 2235 2236 2237 2238 2239
					}
				}
				continue;
			}

			/*
2240 2241
			 * At least we think this is still a life table. Print it's
			 * access stats.
2242 2243 2244 2245
			 */
			fputc('T', fpout);
			fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
		}
2246

2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261
		/*
		 * Mark the end of this DB
		 */
		fputc('d', fpout);
	}

	/*
	 * Write out the known running backends to the stats file.
	 */
	i = MaxBackends;
	fputc('M', fpout);
	fwrite(&i, sizeof(i), 1, fpout);

	for (i = 0; i < MaxBackends; i++)
	{
2262
		if (pgStatBeTable[i].procpid > 0)
2263 2264 2265 2266 2267 2268 2269
		{
			fputc('B', fpout);
			fwrite(&pgStatBeTable[i], sizeof(PgStat_StatBeEntry), 1, fpout);
		}
	}

	/*
2270
	 * No more output to be done. Close the temp file and replace the old
2271
	 * pgstat.stat with it.
2272 2273 2274 2275
	 */
	fputc('E', fpout);
	if (fclose(fpout) < 0)
	{
2276 2277
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
2278 2279
		   errmsg("could not close temporary statistics file \"%s\": %m",
				  pgStat_tmpfname)));
2280 2281 2282 2283 2284
	}
	else
	{
		if (rename(pgStat_tmpfname, pgStat_fname) < 0)
		{
2285 2286
			ereport(LOG,
					(errcode_for_file_access(),
2287
					 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
2288
							pgStat_tmpfname, pgStat_fname)));
2289 2290 2291 2292 2293 2294 2295
		}
	}

	/*
	 * Clear out the dead backends table
	 */
	hash_seq_init(&hstat, pgStatBeDead);
2296
	while ((deadbe = (PgStat_StatBeDead *) hash_seq_search(&hstat)) != NULL)
2297 2298
	{
		/*
2299 2300
		 * Count down the destroy delay and remove entries where it
		 * reaches 0.
2301 2302 2303
		 */
		if (--(deadbe->destroy) <= 0)
		{
2304 2305 2306
			if (hash_search(pgStatBeDead,
							(void *) &(deadbe->procpid),
							HASH_REMOVE, NULL) == NULL)
2307
			{
2308
				ereport(ERROR,
B
Bruce Momjian 已提交
2309 2310
					  (errmsg("dead-server-process hash table corrupted "
							  "during cleanup --- abort")));
2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325
			}
		}
	}
}


/* ----------
 * pgstat_read_statsfile() -
 *
 *	Reads in an existing statistics collector and initializes the
 *	databases hash table (who's entries point to the tables hash tables)
 *	and the current backend table.
 * ----------
 */
static void
2326 2327
pgstat_read_statsfile(HTAB **dbhash, Oid onlydb,
					  PgStat_StatBeEntry **betab, int *numbackends)
2328
{
2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346
	PgStat_StatDBEntry *dbentry;
	PgStat_StatDBEntry dbbuf;
	PgStat_StatTabEntry *tabentry;
	PgStat_StatTabEntry tabbuf;
	HASHCTL		hash_ctl;
	HTAB	   *tabhash = NULL;
	FILE	   *fpin;
	int			maxbackends = 0;
	int			havebackends = 0;
	bool		found;
	MemoryContext use_mcxt;
	int			mcxt_flags;

	/*
	 * If running in the collector we use the DynaHashCxt memory context.
	 * If running in a backend, we use the TopTransactionContext instead,
	 * so the caller must only know the last XactId when this call
	 * happened to know if his tables are still valid or already gone!
2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362
	 */
	if (pgStatRunningInCollector)
	{
		use_mcxt = NULL;
		mcxt_flags = 0;
	}
	else
	{
		use_mcxt = TopTransactionContext;
		mcxt_flags = HASH_CONTEXT;
	}

	/*
	 * Create the DB hashtable
	 */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
2363
	hash_ctl.keysize = sizeof(Oid);
2364
	hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
2365
	hash_ctl.hash = oid_hash;
2366 2367 2368
	hash_ctl.hcxt = use_mcxt;
	*dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
						  HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2369 2370

	/*
2371 2372
	 * Initialize the number of known backends to zero, just in case we do
	 * a silent error return below.
2373 2374 2375 2376 2377 2378
	 */
	if (numbackends != NULL)
		*numbackends = 0;
	if (betab != NULL)
		*betab = NULL;

2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390
	/*
	 * In EXEC_BACKEND case, we won't have inherited pgStat_fname from
	 * postmaster, so compute it first time through.
	 */
#ifdef EXEC_BACKEND
	if (pgStat_fname[0] == '\0')
	{
		Assert(DataDir != NULL);
		snprintf(pgStat_fname, MAXPGPATH, PGSTAT_STAT_FILENAME, DataDir);
	}
#endif

2391 2392
	/*
	 * Try to open the status file. If it doesn't exist, the backends
2393 2394
	 * simply return zero for anything and the collector simply starts
	 * from scratch with empty counters.
2395
	 */
2396
	if ((fpin = AllocateFile(pgStat_fname, PG_BINARY_R)) == NULL)
2397 2398 2399
		return;

	/*
2400 2401
	 * We found an existing collector stats file. Read it and put all the
	 * hashtable entries into place.
2402 2403 2404 2405 2406
	 */
	for (;;)
	{
		switch (fgetc(fpin))
		{
2407 2408 2409 2410 2411
				/*
				 * 'D'	A PgStat_StatDBEntry struct describing a database
				 * follows. Subsequently, zero to many 'T' entries will
				 * follow until a 'd' is encountered.
				 */
2412 2413 2414
			case 'D':
				if (fread(&dbbuf, 1, sizeof(dbbuf), fpin) != sizeof(dbbuf))
				{
2415 2416
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2417
					goto done;
2418 2419 2420 2421 2422
				}

				/*
				 * Add to the DB hash
				 */
2423
				dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
2424
											  (void *) &dbbuf.databaseid,
2425 2426
															 HASH_ENTER,
															 &found);
2427 2428
				if (found)
				{
2429 2430
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2431
					goto done;
2432 2433 2434
				}

				memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
2435 2436 2437
				dbentry->tables = NULL;
				dbentry->destroy = 0;
				dbentry->n_backends = 0;
2438 2439 2440 2441 2442 2443 2444 2445

				/*
				 * Don't collect tables if not the requested DB
				 */
				if (onlydb != InvalidOid && onlydb != dbbuf.databaseid)
					break;

				memset(&hash_ctl, 0, sizeof(hash_ctl));
2446
				hash_ctl.keysize = sizeof(Oid);
2447
				hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
2448
				hash_ctl.hash = oid_hash;
2449
				hash_ctl.hcxt = use_mcxt;
2450 2451 2452 2453
				dbentry->tables = hash_create("Per-database table",
											  PGSTAT_TAB_HASH_SIZE,
											  &hash_ctl,
											  HASH_ELEM | HASH_FUNCTION | mcxt_flags);
2454 2455 2456 2457 2458 2459 2460 2461

				/*
				 * Arrange that following 'T's add entries to this
				 * databases tables hash table.
				 */
				tabhash = dbentry->tables;
				break;

2462 2463 2464
				/*
				 * 'd'	End of this database.
				 */
2465 2466 2467 2468
			case 'd':
				tabhash = NULL;
				break;

2469 2470 2471
				/*
				 * 'T'	A PgStat_StatTabEntry follows.
				 */
2472 2473 2474
			case 'T':
				if (fread(&tabbuf, 1, sizeof(tabbuf), fpin) != sizeof(tabbuf))
				{
2475 2476
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2477
					goto done;
2478 2479 2480 2481 2482 2483 2484 2485
				}

				/*
				 * Skip if table belongs to a not requested database.
				 */
				if (tabhash == NULL)
					break;

2486
				tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
2487 2488
												(void *) &tabbuf.tableid,
													 HASH_ENTER, &found);
2489 2490 2491

				if (found)
				{
2492 2493
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2494
					goto done;
2495 2496 2497 2498 2499
				}

				memcpy(tabentry, &tabbuf, sizeof(tabbuf));
				break;

2500 2501 2502
				/*
				 * 'M'	The maximum number of backends to expect follows.
				 */
2503 2504
			case 'M':
				if (betab == NULL || numbackends == NULL)
2505
					goto done;
2506
				if (fread(&maxbackends, 1, sizeof(maxbackends), fpin) !=
2507
					sizeof(maxbackends))
2508
				{
2509 2510
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2511
					goto done;
2512 2513
				}
				if (maxbackends == 0)
2514
					goto done;
2515 2516

				/*
2517 2518
				 * Allocate space (in TopTransactionContext too) for the
				 * backend table.
2519 2520
				 */
				if (use_mcxt == NULL)
2521
					*betab = (PgStat_StatBeEntry *) palloc(
2522
							   sizeof(PgStat_StatBeEntry) * maxbackends);
2523
				else
2524 2525 2526
					*betab = (PgStat_StatBeEntry *) MemoryContextAlloc(
																use_mcxt,
							   sizeof(PgStat_StatBeEntry) * maxbackends);
2527 2528
				break;

2529 2530 2531
				/*
				 * 'B'	A PgStat_StatBeEntry follows.
				 */
2532
			case 'B':
2533 2534
				if (betab == NULL || numbackends == NULL || *betab == NULL)
					goto done;
2535

2536 2537 2538
				/*
				 * Read it directly into the table.
				 */
2539 2540 2541
				if (fread(&(*betab)[havebackends], 1,
						  sizeof(PgStat_StatBeEntry), fpin) !=
					sizeof(PgStat_StatBeEntry))
2542
				{
2543 2544
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted pgstat.stat file")));
2545
					goto done;
2546 2547 2548 2549 2550
				}

				/*
				 * Count backends per database here.
				 */
2551 2552 2553
				dbentry = (PgStat_StatDBEntry *) hash_search(*dbhash,
						   (void *) &((*betab)[havebackends].databaseid),
														HASH_FIND, NULL);
2554
				if (dbentry)
2555 2556 2557 2558 2559 2560
					dbentry->n_backends++;

				havebackends++;
				if (numbackends != 0)
					*numbackends = havebackends;
				if (havebackends >= maxbackends)
2561 2562
					goto done;

2563 2564
				break;

2565 2566 2567
				/*
				 * 'E'	The EOF marker of a complete stats file.
				 */
2568
			case 'E':
2569
				goto done;
2570 2571

			default:
2572 2573
				ereport(pgStatRunningInCollector ? LOG : WARNING,
						(errmsg("corrupted pgstat.stat file")));
2574
				goto done;
2575 2576 2577
		}
	}

2578 2579
done:
	FreeFile(fpin);
2580 2581
}

2582 2583 2584 2585 2586 2587 2588 2589 2590 2591
/*
 * If not done for this transaction, read the statistics collector
 * stats file into some hash tables.
 *
 * Because we store the hash tables in TopTransactionContext, the result
 * is good for the entire current main transaction.
 */
static void
backend_read_statsfile(void)
{
B
Bruce Momjian 已提交
2592
	TransactionId topXid = GetTopTransactionId();
2593 2594 2595 2596 2597 2598 2599 2600 2601 2602

	if (!TransactionIdEquals(pgStatDBHashXact, topXid))
	{
		Assert(!pgStatRunningInCollector);
		pgstat_read_statsfile(&pgStatDBHash, MyDatabaseId,
							  &pgStatBeTable, &pgStatNumBackends);
		pgStatDBHashXact = topXid;
	}
}

2603 2604 2605 2606

/* ----------
 * pgstat_recv_bestart() -
 *
 *	Process a backend startup message: record user, database and client
 *	address in the backend's slot of pgStatBeTable.
 * ----------
 */
static void
pgstat_recv_bestart(PgStat_MsgBestart *msg, int len)
{
	PgStat_StatBeEntry *slot;

	/*
	 * A nonzero result means the message must be ignored. In particular,
	 * a BESTART that refers to an old, already dead backend must not
	 * overwrite the state of the backend slot.
	 */
	if (pgstat_add_backend(&msg->m_hdr) != 0)
		return;

	/* Backend ids are 1-based, the table is 0-based */
	slot = pgStatBeTable + (msg->m_hdr.m_backendid - 1);

	slot->userid = msg->m_userid;
	memcpy(&slot->clientaddr, &msg->m_clientaddr, sizeof(slot->clientaddr));
	slot->databaseid = msg->m_databaseid;
}


/* ----------
 * pgstat_recv_beterm() -
 *
 *	Process a backend termination message by dropping the terminated
 *	process from the list of active backends.
 * ----------
 */
static void
pgstat_recv_beterm(PgStat_MsgBeterm *msg, int len)
{
	int			deadpid = msg->m_hdr.m_procpid;

	pgstat_sub_backend(deadpid);
}


/* ----------
 * pgstat_recv_activity() -
 *
 *	Remember what the backend is doing: store the reported activity
 *	string in the backend's slot and stamp it with the current time.
 * ----------
 */
static void
pgstat_recv_activity(PgStat_MsgActivity *msg, int len)
{
	PgStat_StatBeEntry *slot;
	int			idx;

	/*
	 * Only a result of exactly 0 lets us proceed. A delayed packet from
	 * a dead backend must not mangle the activity shown for a live one.
	 */
	if (pgstat_add_backend(&msg->m_hdr) != 0)
		return;

	/* Backend ids are 1-based, the table is 0-based */
	idx = msg->m_hdr.m_backendid - 1;
	slot = &pgStatBeTable[idx];

	StrNCpy(slot->activity, msg->m_what, PGSTAT_ACTIVITY_SIZE);
	slot->activity_start_timestamp = GetCurrentTimestamp();
}


/* ----------
 * pgstat_recv_tabstat() -
 *
 *	Count what the backend has done: add the transaction counters and
 *	the per-table counters of the message to the collector's totals.
 * ----------
 */
static void
pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
{
	PgStat_StatDBEntry *db;
	int			n;

	/* Make sure the sending backend is accounted for. */
	if (pgstat_add_backend(&msg->m_hdr) < 0)
		return;

	db = pgstat_get_db_entry(msg->m_databaseid);

	/*
	 * A database that is marked for destroy can only be the target of a
	 * delayed UDP packet; such stats are not worth counting.
	 */
	if (db->destroy > 0)
		return;

	db->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
	db->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);

	/* Fold every table entry of the message into the totals. */
	for (n = 0; n < msg->m_nentries; n++)
	{
		PgStat_TableEntry *in = &(msg->m_entry[n]);
		PgStat_StatTabEntry *tab;
		bool		existed;

		tab = (PgStat_StatTabEntry *) hash_search(db->tables,
												  (void *) &(in->t_id),
												  HASH_ENTER, &existed);

		if (!existed)
		{
			/*
			 * First time we hear of this table: start all counters at
			 * zero, so that the additions below leave them at exactly
			 * the values just received.
			 */
			tab->numscans = 0;
			tab->tuples_returned = 0;
			tab->tuples_fetched = 0;
			tab->tuples_inserted = 0;
			tab->tuples_updated = 0;
			tab->tuples_deleted = 0;
			tab->blocks_fetched = 0;
			tab->blocks_hit = 0;

			tab->destroy = 0;
		}

		/* Accumulate the reported values into the table's entry. */
		tab->numscans += in->t_numscans;
		tab->tuples_returned += in->t_tuples_returned;
		tab->tuples_fetched += in->t_tuples_fetched;
		tab->tuples_inserted += in->t_tuples_inserted;
		tab->tuples_updated += in->t_tuples_updated;
		tab->tuples_deleted += in->t_tuples_deleted;
		tab->blocks_fetched += in->t_blocks_fetched;
		tab->blocks_hit += in->t_blocks_hit;

		/* The block I/O is totalled per database as well. */
		db->n_blocks_fetched += in->t_blocks_fetched;
		db->n_blocks_hit += in->t_blocks_hit;
	}
}


/* ----------
 * pgstat_recv_tabpurge() -
 *
 *	Arrange for dead table removal: start the destroy countdown on every
 *	table listed in the message that we have an entry for.
 * ----------
 */
static void
pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
{
	PgStat_StatDBEntry *db;
	int			n;

	/* Make sure the sending backend is accounted for. */
	if (pgstat_add_backend(&msg->m_hdr) < 0)
		return;

	db = pgstat_get_db_entry(msg->m_databaseid);

	/*
	 * A database that is marked for destroy can only be the target of a
	 * delayed UDP packet; its tables go away together with the database.
	 */
	if (db->destroy > 0)
		return;

	/* Visit every table id carried by the message. */
	for (n = 0; n < msg->m_nentries; n++)
	{
		PgStat_StatTabEntry *tab;

		tab = (PgStat_StatTabEntry *) hash_search(db->tables,
											(void *) &(msg->m_tableid[n]),
												  HASH_FIND, NULL);

		/* Tables we never collected anything for need no action. */
		if (tab == NULL)
			continue;

		tab->destroy = PGSTAT_DESTROY_COUNT;
	}
}


/* ----------
 * pgstat_recv_dropdb() -
 *
 *	Arrange for dead database removal by starting the destroy countdown
 *	on the database's entry.
 * ----------
 */
static void
pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
{
	/* Make sure the sending backend is accounted for. */
	if (pgstat_add_backend(&msg->m_hdr) < 0)
		return;

	/*
	 * Fetch the database's hashtable entry (the lookup creates one if
	 * none exists yet) and mark it for destruction.
	 */
	pgstat_get_db_entry(msg->m_databaseid)->destroy = PGSTAT_DESTROY_COUNT;
}


/* ----------
 * pgstat_recv_resetcounter() -
 *
 *	Reset the statistics for the specified database: zero its counters
 *	and replace its per-table hash table with an empty one.
 * ----------
 */
static void
pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
{
	PgStat_StatDBEntry *db;
	HASHCTL		tabctl;

	/* Make sure the sending backend is accounted for. */
	if (pgstat_add_backend(&msg->m_hdr) < 0)
		return;

	/* Find the database's entry in the hashtable. */
	db = pgstat_get_db_entry(msg->m_databaseid);

	/*
	 * All per-table entries are discarded wholesale: drop the old hash
	 * table here and build a fresh one below.
	 */
	if (db->tables != NULL)
		hash_destroy(db->tables);
	db->tables = NULL;

	/* Back to a pristine state for the database-wide fields. */
	db->n_xact_commit = 0;
	db->n_xact_rollback = 0;
	db->n_blocks_fetched = 0;
	db->n_blocks_hit = 0;
	db->destroy = 0;

	/* Build the replacement table hash. */
	memset(&tabctl, 0, sizeof(tabctl));
	tabctl.keysize = sizeof(Oid);
	tabctl.entrysize = sizeof(PgStat_StatTabEntry);
	tabctl.hash = oid_hash;
	db->tables = hash_create("Per-database table",
							 PGSTAT_TAB_HASH_SIZE,
							 &tabctl,
							 HASH_ELEM | HASH_FUNCTION);
}