/* ----------
 * pgstat.c
 *
 *	All the statistics collector stuff hacked up in one big, ugly file.
 *
 *	TODO:	- Separate collector, postmaster and backend stuff
 *			  into different files.
 *
 *			- Add some automatic call for pgstat vacuuming.
 *
 *			- Add a pgstat config column to pg_database, so this
 *			  entire thing can be enabled/disabled on a per db basis.
 *
 *	Copyright (c) 2001-2015, PostgreSQL Global Development Group
 *
 *	src/backend/postmaster/pgstat.c
 * ----------
 */
P
Peter Eisentraut 已提交
19 20
#include "postgres.h"

21 22 23 24 25
#include <unistd.h>
#include <fcntl.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/socket.h>
B
Bruce Momjian 已提交
26
#include <netdb.h>
27 28 29
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
30
#include <time.h>
31

32 33
#include "pgstat.h"

34
#include "access/heapam.h"
35
#include "access/htup_details.h"
36
#include "access/transam.h"
37
#include "access/twophase_rmgr.h"
38
#include "access/xact.h"
39
#include "catalog/pg_database.h"
40
#include "catalog/pg_proc.h"
41
#include "lib/ilist.h"
42
#include "libpq/ip.h"
B
Bruce Momjian 已提交
43
#include "libpq/libpq.h"
44
#include "libpq/pqsignal.h"
45
#include "mb/pg_wchar.h"
46
#include "miscadmin.h"
47
#include "pg_trace.h"
48
#include "postmaster/autovacuum.h"
49
#include "postmaster/fork_process.h"
50
#include "postmaster/postmaster.h"
51
#include "storage/proc.h"
52
#include "storage/backendid.h"
53
#include "storage/dsm.h"
54
#include "storage/fd.h"
55
#include "storage/ipc.h"
56
#include "storage/latch.h"
57
#include "storage/pg_shmem.h"
58
#include "storage/procsignal.h"
59
#include "storage/sinvaladt.h"
60
#include "utils/ascii.h"
61
#include "utils/guc.h"
62
#include "utils/memutils.h"
63
#include "utils/ps_status.h"
64
#include "utils/rel.h"
65
#include "utils/snapmgr.h"
66
#include "utils/timestamp.h"
67
#include "utils/tqual.h"
68 69


/* ----------
 * Timer definitions.
 * ----------
 */
#define PGSTAT_STAT_INTERVAL	500		/* Minimum time between stats file
										 * updates; in milliseconds. */

#define PGSTAT_RETRY_DELAY		10		/* How long to wait between checks for
										 * a new file; in milliseconds. */

#define PGSTAT_MAX_WAIT_TIME	10000	/* Maximum time to wait for a stats
										 * file update; in milliseconds. */

#define PGSTAT_INQ_INTERVAL		640		/* How often to ping the collector for
										 * a new file; in milliseconds. */

#define PGSTAT_RESTART_INTERVAL 60		/* How often to attempt to restart a
										 * failed statistics collector; in
										 * seconds. */

/*
 * The two wait intervals above, expressed as a number of
 * PGSTAT_RETRY_DELAY-long polling iterations.
 */
#define PGSTAT_POLL_LOOP_COUNT	(PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY)
#define PGSTAT_INQ_LOOP_COUNT	(PGSTAT_INQ_INTERVAL / PGSTAT_RETRY_DELAY)


/* ----------
 * The initial size hints for the hash tables used in the collector.
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_TAB_HASH_SIZE	512
#define PGSTAT_FUNCTION_HASH_SIZE	512


103
/* ----------
104
 * GUC parameters
105 106
 * ----------
 */
107 108
bool		pgstat_track_activities = false;
bool		pgstat_track_counts = false;
109
int			pgstat_track_functions = TRACK_FUNC_OFF;
110
int			pgstat_track_activity_query_size = 1024;
111

/* ----------
 * Built from GUC parameter
 *
 * All three start out NULL.  pgstat_stat_directory is the directory
 * scanned by pgstat_reset_all() when removing stats files.
 * ----------
 */
char	   *pgstat_stat_directory = NULL;
char	   *pgstat_stat_filename = NULL;
char	   *pgstat_stat_tmpname = NULL;

120 121 122 123 124 125 126
/*
 * BgWriter global statistics counters (unused in other processes).
 * Stored directly in a stats message structure so it can be sent
 * without needing to copy things around.  We assume this inits to zeroes.
 */
PgStat_MsgBgWriter BgWriterStats;

127 128 129 130
/* ----------
 * Local data
 * ----------
 */
131
NON_EXEC_STATIC pgsocket pgStatSock = PGINVALID_SOCKET;
132

B
Bruce Momjian 已提交
133
static struct sockaddr_storage pgStatAddr;
134

135
static time_t last_pgstat_start_time;
136

137
static bool pgStatRunningInCollector = false;
138

139
/*
140 141
 * Structures in which backends store per-table info that's waiting to be
 * sent to the collector.
142
 *
143 144 145 146 147 148 149
 * NOTE: once allocated, TabStatusArray structures are never moved or deleted
 * for the life of the backend.  Also, we zero out the t_id fields of the
 * contained PgStat_TableStatus structs whenever they are not actively in use.
 * This allows relcache pgstat_info pointers to be treated as long-lived data,
 * avoiding repeated searches in pgstat_initstats() when a relation is
 * repeatedly opened during a transaction.
 */
B
Bruce Momjian 已提交
150
#define TABSTAT_QUANTUM		100 /* we alloc this many at a time */
151 152

typedef struct TabStatusArray
153
{
154
	struct TabStatusArray *tsa_next;	/* link to next array, if any */
B
Bruce Momjian 已提交
155
	int			tsa_used;		/* # entries currently used */
156
	PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM];	/* per-table data */
157
} TabStatusArray;
158 159

static TabStatusArray *pgStatTabList = NULL;
B
Bruce Momjian 已提交
160

161 162 163 164 165 166
/*
 * Backends store per-function info that's waiting to be sent to the collector
 * in this hash table (indexed by function OID).
 */
static HTAB *pgStatFunctions = NULL;

167 168 169 170 171 172
/*
 * Indicates if backend has some function stats that it hasn't yet
 * sent to the collector.
 */
static bool have_function_stats = false;

173 174 175 176 177 178 179 180 181
/*
 * Tuple insertion/deletion counts for an open transaction can't be propagated
 * into PgStat_TableStatus counters until we know if it is going to commit
 * or abort.  Hence, we keep these counts in per-subxact structs that live
 * in TopTransactionContext.  This data structure is designed on the assumption
 * that subxacts won't usually modify very many tables.
 */
typedef struct PgStat_SubXactStatus
{
B
Bruce Momjian 已提交
182
	int			nest_level;		/* subtransaction nest level */
183 184
	struct PgStat_SubXactStatus *prev;	/* higher-level subxact if any */
	PgStat_TableXactStatus *first;		/* head of list for this subxact */
185
} PgStat_SubXactStatus;
186

187
static PgStat_SubXactStatus *pgStatXactStack = NULL;
188

189 190
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;
191 192
PgStat_Counter pgStatBlockReadTime = 0;
PgStat_Counter pgStatBlockWriteTime = 0;
193

194 195 196
/* Record that's written to 2PC state file when pgstat state is persisted */
typedef struct TwoPhasePgStatRecord
{
B
Bruce Momjian 已提交
197
	PgStat_Counter tuples_inserted;		/* tuples inserted in xact */
198
	PgStat_Counter tuples_updated;		/* tuples updated in xact */
B
Bruce Momjian 已提交
199 200 201
	PgStat_Counter tuples_deleted;		/* tuples deleted in xact */
	Oid			t_id;			/* table's OID */
	bool		t_shared;		/* is it a shared catalog? */
202
} TwoPhasePgStatRecord;
203 204 205 206

/*
 * Info about current "snapshot" of stats file
 */
207
static MemoryContext pgStatLocalContext = NULL;
208
static HTAB *pgStatDBHash = NULL;
209
static LocalPgBackendStatus *localBackendStatusTable = NULL;
210
static int	localNumBackends = 0;
211

212 213 214 215 216
/*
 * Cluster wide statistics, kept in the stats collector.
 * Contains statistics that are not collected per database
 * or per table.
 */
217
static PgStat_ArchiverStats archiverStats;
218 219
static PgStat_GlobalStats globalStats;

220 221 222 223 224 225 226
/* Write request info for each database */
typedef struct DBWriteRequest
{
	Oid			databaseid;		/* OID of the database to write */
	TimestampTz request_time;	/* timestamp of the last write request */
	slist_node	next;
} DBWriteRequest;
227

228 229
/* Latest statistics request times from backends */
static slist_head last_statrequests = SLIST_STATIC_INIT(last_statrequests);
230

B
Bruce Momjian 已提交
231
static volatile bool need_exit = false;
232
static volatile bool got_SIGHUP = false;
233

234 235 236 237 238 239 240
/*
 * Total time charged to functions so far in the current backend.
 * We use this to help separate "self" and "other" time charges.
 * (We assume this initializes to zero.)
 */
static instr_time total_func_time;

241

242 243 244 245
/* ----------
 * Local function forward declarations
 * ----------
 */
246
#ifdef EXEC_BACKEND
247
static pid_t pgstat_forkexec(void);
248
#endif
249

250
NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) __attribute__((noreturn));
251
static void pgstat_exit(SIGNAL_ARGS);
252
static void pgstat_beshutdown_hook(int code, Datum arg);
253
static void pgstat_sighup_handler(SIGNAL_ARGS);
254

255
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
256
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
B
Bruce Momjian 已提交
257
					 Oid tableoid, bool create);
258 259 260 261
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
262
static void backend_read_statsfile(void);
263
static void pgstat_read_current_status(void);
264

265 266 267
static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);

268
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
269
static void pgstat_send_funcstats(void);
270
static HTAB *pgstat_collect_oids(Oid catalogid);
271

272 273
static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);

274 275
static void pgstat_setup_memcxt(void);

276
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
277 278
static void pgstat_send(void *msg, int len);

279
static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
280 281 282 283
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
284
static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
285
static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
286 287 288
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
289
static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
290
static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
291 292
static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
293
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
294
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
295
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
296 297 298 299 300 301 302 303 304 305

/* ------------------------------------------------------------
 * Public functions called from postmaster follow
 * ------------------------------------------------------------
 */

/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success, pgStatSock is a non-blocking UDP socket that is bound and
 *	connected to its own kernel-assigned localhost address (saved in
 *	pgStatAddr) and has passed a one-byte loopback test.  On failure,
 *	pgStatSock is left as PGINVALID_SOCKET and "track_counts" is forced
 *	off.  Every failure is reported at LOG level only.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;
	int			tries = 0;

#define TESTBYTEVAL ((char) 199)

	/*
	 * This static assertion verifies that we didn't mess up the calculations
	 * involved in selecting maximum payload sizes for our UDP messages.
	 * Because the only consequence of overrunning PGSTAT_MAX_MSG_SIZE would
	 * be silent performance loss from fragmentation, it seems worth having a
	 * compile-time cross-check that we didn't.
	 */
	StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE,
				   "maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE");

	/*
	 * Create the UDP socket for sending and receiving statistic messages.
	 *
	 * Zero the whole hints struct first: POSIX only promises that struct
	 * addrinfo has "at least" the members assigned below, so any additional
	 * platform-specific members must not be left as stack garbage.
	 */
	MemSet(&hints, 0, sizeof(hints));
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, pg_getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination. We will generate LOG
	 * messages, but no error, for bogus combinations.
	 *
	 * Every failure path inside the loop closes the socket and resets
	 * pgStatSock to PGINVALID_SOCKET before moving on to the next address.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		if (++tries > 1)
			ereport(LOG,
			(errmsg("trying another address for the statistics collector")));

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the assigned
		 * port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			  errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) & pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles by
		 * not having to respecify the target address on every send. This also
		 * provides a kernel-level check that only packets from this same
		 * address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) & pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
			errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket. This
		 * is to catch situations where the socket can be created but will not
		 * actually pass data (for instance, because kernel packet filtering
		 * rules prevent it).
		 */
		test_byte = TESTBYTEVAL;

retry1:
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry1;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before deciding
		 * it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);

			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains to
			 * give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

retry2:
		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry2;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock == PGINVALID_SOCKET)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the collector
	 * falls behind, statistics messages will be discarded; backends won't
	 * block waiting to send messages to the collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	pg_freeaddrinfo_all(hints.ai_family, addrs);

	return;

startup_failed:
	ereport(LOG,
	  (errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		pg_freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock != PGINVALID_SOCKET)
		closesocket(pgStatSock);
	pgStatSock = PGINVALID_SOCKET;

	/*
	 * Adjust GUC variables to suppress useless activity, and for debugging
	 * purposes (seeing track_counts off is a clue that we failed here). We
	 * use PGC_S_OVERRIDE because there is no point in trying to turn it back
	 * on from postgresql.conf without a restart.
	 */
	SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}

561 562 563 564 565 566 567 568 569 570
/*
 * subroutine for pgstat_reset_all
 */
static void
pgstat_reset_remove_files(const char *directory)
{
	DIR		   *dir;
	struct dirent *entry;
	char		fname[MAXPGPATH];

571 572
	dir = AllocateDir(directory);
	while ((entry = ReadDir(dir, directory)) != NULL)
573
	{
574 575
		int			nchars;
		Oid			tmp_oid;
576

577 578 579 580
		/*
		 * Skip directory entries that don't match the file names we write.
		 * See get_dbstat_filename for the database-specific pattern.
		 */
581 582 583
		if (strncmp(entry->d_name, "global.", 7) == 0)
			nchars = 7;
		else
584
		{
585 586 587 588 589 590 591
			nchars = 0;
			(void) sscanf(entry->d_name, "db_%u.%n",
						  &tmp_oid, &nchars);
			if (nchars <= 0)
				continue;
			/* %u allows leading whitespace, so reject that */
			if (strchr("0123456789", entry->d_name[3]) == NULL)
592 593 594
				continue;
		}

595 596
		if (strcmp(entry->d_name + nchars, "tmp") != 0 &&
			strcmp(entry->d_name + nchars, "stat") != 0)
597
			continue;
598

599
		snprintf(fname, MAXPGPATH, "%s/%s", directory,
600 601 602 603 604 605
				 entry->d_name);
		unlink(fname);
	}
	FreeDir(dir);
}

606 607 608
/*
 * pgstat_reset_all() -
 *
B
Bruce Momjian 已提交
609
 * Remove the stats files.  This is currently used only if WAL
610 611 612 613 614
 * recovery is needed after a crash.
 */
void
pgstat_reset_all(void)
{
615 616
	pgstat_reset_remove_files(pgstat_stat_directory);
	pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
617
}
618

#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Format up the arglist for, then fork and exec, statistics collector process
 *
 * Returns whatever postmaster_forkexec() returns.
 */
static pid_t
pgstat_forkexec(void)
{
	char	   *arglist[10];
	int			nargs;

	arglist[0] = "postgres";
	arglist[1] = "--forkcol";
	arglist[2] = NULL;			/* filled in by postmaster_forkexec */
	nargs = 3;

	/* terminate the list; the terminator is not counted in nargs */
	arglist[nargs] = NULL;
	Assert(nargs < lengthof(arglist));

	return postmaster_forkexec(nargs, arglist);
}
#endif   /* EXEC_BACKEND */

643

644
/*
645 646 647
 * pgstat_start() -
 *
 *	Called from postmaster at startup or after an existing collector
648
 *	died.  Attempt to fire up a fresh statistics collector.
649
 *
650 651
 *	Returns PID of child process, or 0 if fail.
 *
652
 *	Note: if fail, we will be called again from the postmaster main loop.
653
 */
654
int
655
pgstat_start(void)
656
{
657
	time_t		curtime;
658
	pid_t		pgStatPid;
659

660
	/*
B
Bruce Momjian 已提交
661 662
	 * Check that the socket is there, else pgstat_init failed and we can do
	 * nothing useful.
663
	 */
664
	if (pgStatSock == PGINVALID_SOCKET)
665
		return 0;
666

667
	/*
B
Bruce Momjian 已提交
668 669
	 * Do nothing if too soon since last collector start.  This is a safety
	 * valve to protect against continuous respawn attempts if the collector
B
Bruce Momjian 已提交
670
	 * is dying immediately at launch.  Note that since we will be re-called
B
Bruce Momjian 已提交
671
	 * from the postmaster main loop, we will get another chance later.
672 673 674 675
	 */
	curtime = time(NULL);
	if ((unsigned int) (curtime - last_pgstat_start_time) <
		(unsigned int) PGSTAT_RESTART_INTERVAL)
676
		return 0;
677 678
	last_pgstat_start_time = curtime;

679
	/*
680
	 * Okay, fork off the collector.
681
	 */
682
#ifdef EXEC_BACKEND
683
	switch ((pgStatPid = pgstat_forkexec()))
684
#else
685
	switch ((pgStatPid = fork_process()))
686
#endif
687 688
	{
		case -1:
689
			ereport(LOG,
690
					(errmsg("could not fork statistics collector: %m")));
691
			return 0;
692

693
#ifndef EXEC_BACKEND
694
		case 0:
695
			/* in postmaster child ... */
696 697
			InitPostmasterChild();

698
			/* Close the postmaster's sockets */
699
			ClosePostmasterPorts(false);
700 701

			/* Drop our connection to postmaster's shared memory, as well */
702
			dsm_detach_all();
703 704
			PGSharedMemoryDetach();

705
			PgstatCollectorMain(0, NULL);
706
			break;
707
#endif
708 709

		default:
710
			return (int) pgStatPid;
711 712
	}

713 714
	/* shouldn't get here */
	return 0;
715 716
}

B
Bruce Momjian 已提交
717 718
void
allow_immediate_pgstat_restart(void)
719
{
B
Bruce Momjian 已提交
720
	last_pgstat_start_time = 0;
721
}
722 723 724

/* ------------------------------------------------------------
 * Public functions used by backends follow
 * ------------------------------------------------------------
 */


/* ----------
730
 * pgstat_report_stat() -
731
 *
732
 *	Called from tcop/postgres.c to send the so far collected per-table
733 734 735
 *	and function usage statistics to the collector.  Note that this is
 *	called only when not within a transaction, so it is fair to use
 *	transaction stop time as an approximation of current time.
736 737 738
 * ----------
 */
void
739
pgstat_report_stat(bool force)
740
{
741 742
	/* we assume this inits to all zeroes: */
	static const PgStat_TableCounts all_zeroes;
B
Bruce Momjian 已提交
743
	static TimestampTz last_report = 0;
744

745
	TimestampTz now;
746 747 748 749
	PgStat_MsgTabstat regular_msg;
	PgStat_MsgTabstat shared_msg;
	TabStatusArray *tsa;
	int			i;
750 751

	/* Don't expend a clock check if nothing to do */
752
	if ((pgStatTabList == NULL || pgStatTabList->tsa_used == 0) &&
753 754
		pgStatXactCommit == 0 && pgStatXactRollback == 0 &&
		!have_function_stats)
755 756 757 758
		return;

	/*
	 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
759
	 * msec since we last sent one, or the caller wants to force stats out.
760 761
	 */
	now = GetCurrentTransactionStopTimestamp();
762 763
	if (!force &&
		!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
764 765 766
		return;
	last_report = now;

767
	/*
768 769
	 * Scan through the TabStatusArray struct(s) to find tables that actually
	 * have counts, and build messages to send.  We have to separate shared
B
Bruce Momjian 已提交
770 771
	 * relations from regular ones because the databaseid field in the message
	 * header has to depend on that.
772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789
	 */
	regular_msg.m_databaseid = MyDatabaseId;
	shared_msg.m_databaseid = InvalidOid;
	regular_msg.m_nentries = 0;
	shared_msg.m_nentries = 0;

	for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
	{
		for (i = 0; i < tsa->tsa_used; i++)
		{
			PgStat_TableStatus *entry = &tsa->tsa_entries[i];
			PgStat_MsgTabstat *this_msg;
			PgStat_TableEntry *this_ent;

			/* Shouldn't have any pending transaction-dependent counts */
			Assert(entry->trans == NULL);

			/*
B
Bruce Momjian 已提交
790 791
			 * Ignore entries that didn't accumulate any actual counts, such
			 * as indexes that were opened by the planner but not used.
792 793 794 795
			 */
			if (memcmp(&entry->t_counts, &all_zeroes,
					   sizeof(PgStat_TableCounts)) == 0)
				continue;
B
Bruce Momjian 已提交
796

797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817
			/*
			 * OK, insert data into the appropriate message, and send if full.
			 */
			this_msg = entry->t_shared ? &shared_msg : &regular_msg;
			this_ent = &this_msg->m_entry[this_msg->m_nentries];
			this_ent->t_id = entry->t_id;
			memcpy(&this_ent->t_counts, &entry->t_counts,
				   sizeof(PgStat_TableCounts));
			if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
			{
				pgstat_send_tabstat(this_msg);
				this_msg->m_nentries = 0;
			}
		}
		/* zero out TableStatus structs after use */
		MemSet(tsa->tsa_entries, 0,
			   tsa->tsa_used * sizeof(PgStat_TableStatus));
		tsa->tsa_used = 0;
	}

	/*
818 819
	 * Send partial messages.  Make sure that any pending xact commit/abort
	 * gets counted, even if there are no table stats to send.
820
	 */
821
	if (regular_msg.m_nentries > 0 ||
822
		pgStatXactCommit > 0 || pgStatXactRollback > 0)
823 824 825
		pgstat_send_tabstat(&regular_msg);
	if (shared_msg.m_nentries > 0)
		pgstat_send_tabstat(&shared_msg);
826 827 828

	/* Now, send function statistics */
	pgstat_send_funcstats();
829 830
}

831
/*
832
 * Subroutine for pgstat_report_stat: finish and send a tabstat message
833
 */
834
static void
835
pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg)
836
{
837 838
	int			n;
	int			len;
839

840
	/* It's unlikely we'd get here with no socket, but maybe not impossible */
841
	if (pgStatSock == PGINVALID_SOCKET)
842
		return;
843

844
	/*
845 846
	 * Report and reset accumulated xact commit/rollback and I/O timings
	 * whenever we send a normal tabstat message
847 848 849
	 */
	if (OidIsValid(tsmsg->m_databaseid))
	{
850 851
		tsmsg->m_xact_commit = pgStatXactCommit;
		tsmsg->m_xact_rollback = pgStatXactRollback;
852 853
		tsmsg->m_block_read_time = pgStatBlockReadTime;
		tsmsg->m_block_write_time = pgStatBlockWriteTime;
854
		pgStatXactCommit = 0;
855
		pgStatXactRollback = 0;
856 857
		pgStatBlockReadTime = 0;
		pgStatBlockWriteTime = 0;
858 859 860 861 862
	}
	else
	{
		tsmsg->m_xact_commit = 0;
		tsmsg->m_xact_rollback = 0;
863 864
		tsmsg->m_block_read_time = 0;
		tsmsg->m_block_write_time = 0;
865
	}
866

867 868 869
	n = tsmsg->m_nentries;
	len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
		n * sizeof(PgStat_TableEntry);
870

871 872
	pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
	pgstat_send(tsmsg, len);
873 874
}

875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908
/*
 * Subroutine for pgstat_report_stat: populate and send a function stat message
 */
static void
pgstat_send_funcstats(void)
{
	/* we assume this inits to all zeroes: */
	static const PgStat_FunctionCounts all_zeroes;

	PgStat_MsgFuncstat msg;
	PgStat_BackendFunctionEntry *entry;
	HASH_SEQ_STATUS fstat;

	if (pgStatFunctions == NULL)
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_FUNCSTAT);
	msg.m_databaseid = MyDatabaseId;
	msg.m_nentries = 0;

	hash_seq_init(&fstat, pgStatFunctions);
	while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL)
	{
		PgStat_FunctionEntry *m_ent;

		/* Skip it if no counts accumulated since last time */
		if (memcmp(&entry->f_counts, &all_zeroes,
				   sizeof(PgStat_FunctionCounts)) == 0)
			continue;

		/* need to convert format of time accumulators */
		m_ent = &msg.m_entry[msg.m_nentries];
		m_ent->f_id = entry->f_id;
		m_ent->f_numcalls = entry->f_counts.f_numcalls;
909 910
		m_ent->f_total_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_total_time);
		m_ent->f_self_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_self_time);
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925

		if (++msg.m_nentries >= PGSTAT_NUM_FUNCENTRIES)
		{
			pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
						msg.m_nentries * sizeof(PgStat_FunctionEntry));
			msg.m_nentries = 0;
		}

		/* reset the entry's counts */
		MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts));
	}

	if (msg.m_nentries > 0)
		pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
					msg.m_nentries * sizeof(PgStat_FunctionEntry));
926 927

	have_function_stats = false;
928 929
}

930 931

/* ----------
932
 * pgstat_vacuum_stat() -
933 934 935 936
 *
 *	Will tell the collector about objects he can get rid of.
 * ----------
 */
937
void
938
pgstat_vacuum_stat(void)
939
{
940
	HTAB	   *htab;
941
	PgStat_MsgTabpurge msg;
942
	PgStat_MsgFuncpurge f_msg;
943 944 945
	HASH_SEQ_STATUS hstat;
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;
946
	PgStat_StatFuncEntry *funcentry;
947
	int			len;
948

949
	if (pgStatSock == PGINVALID_SOCKET)
950
		return;
951 952

	/*
B
Bruce Momjian 已提交
953 954
	 * If not done for this transaction, read the statistics collector stats
	 * file into some hash tables.
955
	 */
956
	backend_read_statsfile();
957 958

	/*
959 960
	 * Read pg_database and make a list of OIDs of all existing databases
	 */
961
	htab = pgstat_collect_oids(DatabaseRelationId);
962 963 964 965 966 967 968 969 970 971

	/*
	 * Search the database hash table for dead databases and tell the
	 * collector to drop them.
	 */
	hash_seq_init(&hstat, pgStatDBHash);
	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
	{
		Oid			dbid = dbentry->databaseid;

972 973
		CHECK_FOR_INTERRUPTS();

974 975 976
		/* the DB entry for shared tables (with InvalidOid) is never dropped */
		if (OidIsValid(dbid) &&
			hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
977 978 979 980
			pgstat_drop_database(dbid);
	}

	/* Clean up */
981
	hash_destroy(htab);
982 983 984

	/*
	 * Lookup our own database entry; if not found, nothing more to do.
985
	 */
986 987 988
	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &MyDatabaseId,
												 HASH_FIND, NULL);
989 990 991 992 993 994
	if (dbentry == NULL || dbentry->tables == NULL)
		return;

	/*
	 * Similarly to above, make a list of all known relations in this DB.
	 */
995
	htab = pgstat_collect_oids(RelationRelationId);
996 997 998 999 1000 1001 1002

	/*
	 * Initialize our messages table counter to zero
	 */
	msg.m_nentries = 0;

	/*
1003
	 * Check for all tables listed in stats hashtable if they still exist.
1004
	 */
1005
	hash_seq_init(&hstat, dbentry->tables);
1006
	while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
1007
	{
1008 1009 1010 1011 1012
		Oid			tabid = tabentry->tableid;

		CHECK_FOR_INTERRUPTS();

		if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
1013 1014 1015
			continue;

		/*
1016
		 * Not there, so add this table's Oid to the message
1017
		 */
1018
		msg.m_tableid[msg.m_nentries++] = tabid;
1019 1020

		/*
1021
		 * If the message is full, send it out and reinitialize to empty
1022 1023 1024
		 */
		if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
		{
1025
			len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
B
Bruce Momjian 已提交
1026
				+msg.m_nentries * sizeof(Oid);
1027 1028

			pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1029
			msg.m_databaseid = MyDatabaseId;
1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040
			pgstat_send(&msg, len);

			msg.m_nentries = 0;
		}
	}

	/*
	 * Send the rest
	 */
	if (msg.m_nentries > 0)
	{
1041
		len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
B
Bruce Momjian 已提交
1042
			+msg.m_nentries * sizeof(Oid);
1043 1044

		pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1045
		msg.m_databaseid = MyDatabaseId;
1046 1047 1048
		pgstat_send(&msg, len);
	}

1049
	/* Clean up */
1050
	hash_destroy(htab);
1051 1052

	/*
1053 1054
	 * Now repeat the above steps for functions.  However, we needn't bother
	 * in the common case where no function stats are being collected.
1055
	 */
1056 1057 1058 1059
	if (dbentry->functions != NULL &&
		hash_get_num_entries(dbentry->functions) > 0)
	{
		htab = pgstat_collect_oids(ProcedureRelationId);
1060

1061 1062 1063
		pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE);
		f_msg.m_databaseid = MyDatabaseId;
		f_msg.m_nentries = 0;
1064

1065 1066 1067 1068
		hash_seq_init(&hstat, dbentry->functions);
		while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL)
		{
			Oid			funcid = funcentry->functionid;
1069

1070
			CHECK_FOR_INTERRUPTS();
1071

1072 1073
			if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL)
				continue;
1074

1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092
			/*
			 * Not there, so add this function's Oid to the message
			 */
			f_msg.m_functionid[f_msg.m_nentries++] = funcid;

			/*
			 * If the message is full, send it out and reinitialize to empty
			 */
			if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE)
			{
				len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
					+f_msg.m_nentries * sizeof(Oid);

				pgstat_send(&f_msg, len);

				f_msg.m_nentries = 0;
			}
		}
1093 1094

		/*
1095
		 * Send the rest
1096
		 */
1097
		if (f_msg.m_nentries > 0)
1098 1099 1100 1101 1102 1103 1104
		{
			len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
				+f_msg.m_nentries * sizeof(Oid);

			pgstat_send(&f_msg, len);
		}

1105
		hash_destroy(htab);
1106
	}
1107 1108 1109 1110 1111 1112
}


/* ----------
 * pgstat_collect_oids() -
 *
 *	Scan the system catalog identified by catalogid and return a temporary
 *	hash table holding the OID of every tuple seen.  The caller is expected
 *	to hash_destroy the table once finished with it.  The table is created
 *	in CurrentMemoryContext, the intent being that it is still released
 *	properly if an error intervenes.
 * ----------
 */
static HTAB *
pgstat_collect_oids(Oid catalogid)
{
	HASHCTL		ctl;
	HTAB	   *oidtab;
	Relation	catrel;
	Snapshot	snap;
	HeapScanDesc scandesc;
	HeapTuple	tuple;

	/* OID-keyed table whose entries consist of the key alone */
	memset(&ctl, 0, sizeof(ctl));
	ctl.keysize = sizeof(Oid);
	ctl.entrysize = sizeof(Oid);
	ctl.hcxt = CurrentMemoryContext;
	oidtab = hash_create("Temporary table of OIDs",
						 PGSTAT_TAB_HASH_SIZE,
						 &ctl,
						 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	/* scan the whole catalog under a freshly taken, registered snapshot */
	catrel = heap_open(catalogid, AccessShareLock);
	snap = RegisterSnapshot(GetLatestSnapshot());
	scandesc = heap_beginscan(catrel, snap, 0, NULL);

	for (tuple = heap_getnext(scandesc, ForwardScanDirection);
		 tuple != NULL;
		 tuple = heap_getnext(scandesc, ForwardScanDirection))
	{
		Oid			thisoid = HeapTupleGetOid(tuple);

		CHECK_FOR_INTERRUPTS();

		/* only membership matters, so the returned entry is ignored */
		(void) hash_search(oidtab, (void *) &thisoid, HASH_ENTER, NULL);
	}

	/* release resources in the reverse order of acquisition */
	heap_endscan(scandesc);
	UnregisterSnapshot(snap);
	heap_close(catrel, AccessShareLock);

	return oidtab;
}


/* ----------
 * pgstat_drop_database() -
 *
 *	Tell the collector that we just dropped a database.
1161
 *	(If the message gets lost, we will still clean the dead DB eventually
1162
 *	via future invocations of pgstat_vacuum_stat().)
1163 1164
 * ----------
 */
1165
void
1166 1167
pgstat_drop_database(Oid databaseid)
{
1168
	PgStat_MsgDropdb msg;
1169

1170
	if (pgStatSock == PGINVALID_SOCKET)
1171 1172 1173
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
1174
	msg.m_databaseid = databaseid;
1175 1176 1177 1178
	pgstat_send(&msg, sizeof(msg));
}


1179 1180 1181 1182 1183
/* ----------
 * pgstat_drop_relation() -
 *
 *	Send the collector a TABPURGE message for one just-dropped relation of
 *	the current database.  Should the message get lost, later invocations
 *	of pgstat_vacuum_stat() will still clean out the dead entry eventually.
 *
 *	Compiled out (NOT_USED): there is no good place to call this from, so
 *	cleanup of dead relations relies entirely on pgstat_vacuum_stat().
 * ----------
 */
#ifdef NOT_USED
void
pgstat_drop_relation(Oid relid)
{
	PgStat_MsgTabpurge msg;
	int			len;

	if (pgStatSock == PGINVALID_SOCKET)
		return;

	/* a purge message carrying exactly one table OID */
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
	msg.m_databaseid = MyDatabaseId;
	msg.m_nentries = 1;
	msg.m_tableid[0] = relid;

	len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
	pgstat_send(&msg, len);
}
#endif   /* NOT_USED */
1210 1211


1212 1213 1214 1215 1216 1217 1218 1219 1220
/* ----------
 * pgstat_reset_counters() -
 *
 *	Tell the statistics collector to reset counters for our database.
 * ----------
 */
void
pgstat_reset_counters(void)
{
1221
	PgStat_MsgResetcounter msg;
1222

1223
	if (pgStatSock == PGINVALID_SOCKET)
1224 1225 1226
		return;

	if (!superuser())
1227 1228
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
B
Bruce Momjian 已提交
1229
				 errmsg("must be superuser to reset statistics counters")));
1230 1231

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
1232
	msg.m_databaseid = MyDatabaseId;
1233 1234 1235
	pgstat_send(&msg, sizeof(msg));
}

1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246
/* ----------
 * pgstat_reset_shared_counters() -
 *
 *	Tell the statistics collector to reset cluster-wide shared counters.
 * ----------
 */
void
pgstat_reset_shared_counters(const char *target)
{
	PgStat_MsgResetsharedcounter msg;

1247
	if (pgStatSock == PGINVALID_SOCKET)
1248 1249 1250 1251 1252 1253 1254
		return;

	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("must be superuser to reset statistics counters")));

1255 1256 1257
	if (strcmp(target, "archiver") == 0)
		msg.m_resettarget = RESET_ARCHIVER;
	else if (strcmp(target, "bgwriter") == 0)
1258 1259 1260
		msg.m_resettarget = RESET_BGWRITER;
	else
		ereport(ERROR,
1261 1262
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("unrecognized reset target: \"%s\"", target),
1263
				 errhint("Target must be \"archiver\" or \"bgwriter\".")));
1264 1265 1266 1267

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
	pgstat_send(&msg, sizeof(msg));
}
1268

1269 1270 1271 1272 1273 1274
/* ----------
 * pgstat_reset_single_counter() -
 *
 *	Tell the statistics collector to reset a single counter.
 * ----------
 */
B
Bruce Momjian 已提交
1275 1276
void
pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
1277 1278 1279
{
	PgStat_MsgResetsinglecounter msg;

1280
	if (pgStatSock == PGINVALID_SOCKET)
1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295
		return;

	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("must be superuser to reset statistics counters")));

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSINGLECOUNTER);
	msg.m_databaseid = MyDatabaseId;
	msg.m_resettype = type;
	msg.m_objectid = objoid;

	pgstat_send(&msg, sizeof(msg));
}

1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308
/* ----------
 * pgstat_report_autovac() -
 *
 *	Report the startup of an autovacuum process; called from autovacuum.c.
 *	This runs before InitPostgres has finished, so MyDatabaseId cannot be
 *	relied on and the database OID is passed in explicitly.  The message
 *	also carries the current timestamp as the start time.
 * ----------
 */
void
pgstat_report_autovac(Oid dboid)
{
	PgStat_MsgAutovacStart msg;

	if (pgStatSock == PGINVALID_SOCKET)
		return;

	msg.m_databaseid = dboid;
	msg.m_start_time = GetCurrentTimestamp();
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);

	pgstat_send(&msg, sizeof(msg));
}


/* ---------
 * pgstat_report_vacuum() -
 *
 *	Tell the collector about the table we just vacuumed.
 * ---------
 */
void
1327 1328
pgstat_report_vacuum(Oid tableoid, bool shared,
					 PgStat_Counter livetuples, PgStat_Counter deadtuples)
1329 1330 1331
{
	PgStat_MsgVacuum msg;

1332
	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1333 1334 1335 1336 1337
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
	msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
	msg.m_tableoid = tableoid;
1338
	msg.m_autovacuum = IsAutoVacuumWorkerProcess();
1339
	msg.m_vacuumtime = GetCurrentTimestamp();
1340 1341
	msg.m_live_tuples = livetuples;
	msg.m_dead_tuples = deadtuples;
1342 1343 1344 1345 1346 1347 1348 1349 1350 1351
	pgstat_send(&msg, sizeof(msg));
}

/* --------
 * pgstat_report_analyze() -
 *
 *	Tell the collector about the table we just analyzed.
 * --------
 */
void
1352
pgstat_report_analyze(Relation rel,
1353
					  PgStat_Counter livetuples, PgStat_Counter deadtuples)
1354 1355 1356
{
	PgStat_MsgAnalyze msg;

1357
	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1358 1359
		return;

1360
	/*
1361 1362 1363 1364 1365
	 * Unlike VACUUM, ANALYZE might be running inside a transaction that has
	 * already inserted and/or deleted rows in the target table. ANALYZE will
	 * have counted such rows as live or dead respectively. Because we will
	 * report our counts of such rows at transaction end, we should subtract
	 * off these counts from what we send to the collector now, else they'll
B
Bruce Momjian 已提交
1366
	 * be double-counted after commit.  (This approach also ensures that the
1367 1368
	 * collector ends up with the right numbers if we abort instead of
	 * committing.)
1369 1370 1371 1372 1373 1374 1375 1376
	 */
	if (rel->pgstat_info != NULL)
	{
		PgStat_TableXactStatus *trans;

		for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
		{
			livetuples -= trans->tuples_inserted - trans->tuples_deleted;
1377
			deadtuples -= trans->tuples_updated + trans->tuples_deleted;
1378 1379
		}
		/* count stuff inserted by already-aborted subxacts, too */
1380
		deadtuples -= rel->pgstat_info->t_counts.t_delta_dead_tuples;
1381 1382 1383 1384 1385
		/* Since ANALYZE's counts are estimates, we could have underflowed */
		livetuples = Max(livetuples, 0);
		deadtuples = Max(deadtuples, 0);
	}

1386
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
1387 1388
	msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
	msg.m_tableoid = RelationGetRelid(rel);
1389
	msg.m_autovacuum = IsAutoVacuumWorkerProcess();
1390 1391 1392 1393 1394 1395
	msg.m_analyzetime = GetCurrentTimestamp();
	msg.m_live_tuples = livetuples;
	msg.m_dead_tuples = deadtuples;
	pgstat_send(&msg, sizeof(msg));
}

1396 1397 1398
/* --------
 * pgstat_report_recovery_conflict() -
 *
 *	Report a Hot Standby recovery conflict in the current database to the
 *	collector, passing along the caller's reason code.  Does nothing
 *	without a collector socket or when pgstat_track_counts is off.
 * --------
 */
void
pgstat_report_recovery_conflict(int reason)
{
	PgStat_MsgRecoveryConflict msg;

	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
		return;

	msg.m_reason = reason;
	msg.m_databaseid = MyDatabaseId;
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT);

	pgstat_send(&msg, sizeof(msg));
}
1415

1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433
/* --------
 * pgstat_report_deadlock() -
 *
 *	Report a detected deadlock in the current database to the collector.
 *	Does nothing without a collector socket or when pgstat_track_counts
 *	is off.
 * --------
 */
void
pgstat_report_deadlock(void)
{
	PgStat_MsgDeadlock msg;

	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
		return;

	msg.m_databaseid = MyDatabaseId;
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DEADLOCK);

	pgstat_send(&msg, sizeof(msg));
}
1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455

/* --------
 * pgstat_report_tempfile() -
 *
 *	Report a temporary file of the given size, attributed to the current
 *	database, to the collector.  Does nothing without a collector socket
 *	or when pgstat_track_counts is off.
 * --------
 */
void
pgstat_report_tempfile(size_t filesize)
{
	PgStat_MsgTempFile msg;

	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
		return;

	msg.m_filesize = filesize;
	msg.m_databaseid = MyDatabaseId;
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TEMPFILE);

	pgstat_send(&msg, sizeof(msg));
}


1456 1457 1458 1459 1460 1461 1462 1463 1464
/* ----------
 * pgstat_ping() -
 *
 *	Send some junk data to the collector to increase traffic.
 * ----------
 */
void
pgstat_ping(void)
{
1465
	PgStat_MsgDummy msg;
1466

1467
	if (pgStatSock == PGINVALID_SOCKET)
1468 1469 1470 1471 1472 1473
		return;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
	pgstat_send(&msg, sizeof(msg));
}

1474 1475 1476 1477 1478 1479 1480
/* ----------
 * pgstat_send_inquiry() -
 *
 *	Notify collector that we need fresh data.
 * ----------
 */
static void
1481
pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
1482 1483 1484 1485
{
	PgStat_MsgInquiry msg;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY);
1486 1487
	msg.clock_time = clock_time;
	msg.cutoff_time = cutoff_time;
1488
	msg.databaseid = databaseid;
1489 1490 1491 1492
	pgstat_send(&msg, sizeof(msg));
}


1493 1494 1495 1496 1497 1498 1499 1500 1501
/*
 * pgstat_init_function_usage - begin timing one function invocation
 *
 * Called by the executor before invoking a function.  When stats are not
 * wanted for this function, fcu->fs is set to NULL and nothing else is
 * done.  Otherwise the function's entry in pgStatFunctions is found or
 * created (creating the table itself on first use), and fcu records the
 * entry's counters, the pre-call totals and the start time.
 */
void
pgstat_init_function_usage(FunctionCallInfoData *fcinfo,
						   PgStat_FunctionCallUsage *fcu)
{
	PgStat_BackendFunctionEntry *fent;
	bool		existed;

	/* tracking level does not cover this function: flag "no stats" */
	if (pgstat_track_functions <= fcinfo->flinfo->fn_stats)
	{
		fcu->fs = NULL;
		return;
	}

	/* the function stat table is built lazily, on first use */
	if (pgStatFunctions == NULL)
	{
		HASHCTL		ctl;

		memset(&ctl, 0, sizeof(ctl));
		ctl.keysize = sizeof(Oid);
		ctl.entrysize = sizeof(PgStat_BackendFunctionEntry);
		pgStatFunctions = hash_create("Function stat entries",
									  PGSTAT_FUNCTION_HASH_SIZE,
									  &ctl,
									  HASH_ELEM | HASH_BLOBS);
	}

	/* look up this function's entry; a brand-new one starts with zero counts */
	fent = hash_search(pgStatFunctions, &fcinfo->flinfo->fn_oid,
					   HASH_ENTER, &existed);
	if (!existed)
		MemSet(&fent->f_counts, 0, sizeof(PgStat_FunctionCounts));

	fcu->fs = &fent->f_counts;

	/* remember the entry's total time so far; needed to handle recursion */
	fcu->save_f_total_time = fent->f_counts.f_total_time;

	/* remember the backend-wide function time so far */
	fcu->save_total = total_func_time;

	/* the clock reading comes last, marking the function's start */
	INSTR_TIME_SET_CURRENT(fcu->f_start);
}

1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559
/*
 * find_funcstat_entry - look up the PgStat_BackendFunctionEntry for the
 *		specified function, if one exists
 *
 * Returns NULL when the function has no entry or when pgStatFunctions has
 * not been created; a new entry is never made.
 */
PgStat_BackendFunctionEntry *
find_funcstat_entry(Oid func_id)
{
	PgStat_BackendFunctionEntry *hit = NULL;

	if (pgStatFunctions != NULL)
		hit = (PgStat_BackendFunctionEntry *) hash_search(pgStatFunctions,
														  (void *) &func_id,
														  HASH_FIND, NULL);

	return hit;
}

1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594
/*
 * pgstat_end_function_usage - finish timing one function invocation and
 * fold the result into the function's counters
 *
 * Called by the executor after invoking a function.
 *
 * A set-returning function running in value-per-call mode produces several
 * pgstat_init_function_usage/pgstat_end_function_usage pairs for what the
 * user regards as one call.  The call counter is therefore bumped only when
 * finalize is TRUE, which the caller should pass on the last such call.
 */
void
pgstat_end_function_usage(PgStat_FunctionCallUsage *fcu, bool finalize)
{
	PgStat_FunctionCounts *counts = fcu->fs;
	instr_time	elapsed;
	instr_time	elsewhere;
	instr_time	own_time;

	/* NULL means stats are not wanted for this call */
	if (counts == NULL)
		return;

	/* wall-clock time spent inside this invocation */
	INSTR_TIME_SET_CURRENT(elapsed);
	INSTR_TIME_SUBTRACT(elapsed, fcu->f_start);

	/* time charged to other calls in the meantime, and so not our own */
	elsewhere = total_func_time;
	INSTR_TIME_SUBTRACT(elsewhere, fcu->save_total);
	own_time = elapsed;
	INSTR_TIME_SUBTRACT(own_time, elsewhere);

	/* the backend-wide total grows by our own share only */
	INSTR_TIME_ADD(total_func_time, own_time);

	/*
	 * The new f_total_time is the elapsed time added to the value
	 * f_total_time had before the call, rather than to its current value;
	 * otherwise time taken by recursive calls of this same function would
	 * be counted twice.  Self time needs no such adjustment because it
	 * already excludes recursive calls.
	 */
	INSTR_TIME_ADD(elapsed, fcu->save_f_total_time);

	/* store the results in the function's stats entry */
	counts->f_total_time = elapsed;
	INSTR_TIME_ADD(counts->f_self_time, own_time);
	if (finalize)
		counts->f_numcalls++;

	/* note that there is now something to send */
	have_function_stats = true;
}

1613 1614 1615 1616

/* ----------
 * pgstat_initstats() -
 *
1617 1618
 *	Initialize a relcache entry to count access statistics.
 *	Called whenever a relation is opened.
1619 1620 1621
 *
 *	We assume that a relcache entry's pgstat_info field is zeroed by
 *	relcache.c when the relcache entry is made; thereafter it is long-lived
1622
 *	data.  We can avoid repeated searches of the TabStatus arrays when the
1623
 *	same relation is touched repeatedly within a transaction.
1624 1625 1626
 * ----------
 */
void
1627
pgstat_initstats(Relation rel)
1628
{
1629
	Oid			rel_id = rel->rd_id;
1630 1631 1632 1633
	char		relkind = rel->rd_rel->relkind;

	/* We only count stats for things that have storage */
	if (!(relkind == RELKIND_RELATION ||
1634
		  relkind == RELKIND_MATVIEW ||
1635
		  relkind == RELKIND_INDEX ||
1636 1637
		  relkind == RELKIND_TOASTVALUE ||
		  relkind == RELKIND_SEQUENCE))
1638 1639 1640 1641
	{
		rel->pgstat_info = NULL;
		return;
	}
1642

1643
	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1644
	{
1645 1646
		/* We're not counting at all */
		rel->pgstat_info = NULL;
1647
		return;
1648
	}
1649

1650
	/*
B
Bruce Momjian 已提交
1651 1652
	 * If we already set up this relation in the current transaction, nothing
	 * to do.
1653
	 */
1654 1655
	if (rel->pgstat_info != NULL &&
		rel->pgstat_info->t_id == rel_id)
1656
		return;
1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671

	/* Else find or make the PgStat_TableStatus entry, and update link */
	rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
}

/*
 * get_tabstat_entry - return the PgStat_TableStatus entry for rel_id,
 * creating one when none exists yet
 *
 * The arrays on the pgStatTabList chain are searched in order.  If the
 * relation is not found, the first array seen with a free slot supplies the
 * new entry; when the whole chain is full, a new zeroed array is allocated
 * in TopMemoryContext and appended.
 */
static PgStat_TableStatus *
get_tabstat_entry(Oid rel_id, bool isshared)
{
	PgStat_TableStatus *slot;
	TabStatusArray *chunk;
	TabStatusArray *last_chunk = NULL;
	int			idx;

	for (chunk = pgStatTabList; chunk != NULL;
		 last_chunk = chunk, chunk = chunk->tsa_next)
	{
		/* does one of the slots already in use belong to this relation? */
		for (idx = 0; idx < chunk->tsa_used; idx++)
		{
			slot = &chunk->tsa_entries[idx];
			if (slot->t_id == rel_id)
				return slot;
		}

		/*
		 * Not found, but this array still has room: stop here and take its
		 * next free slot below.
		 */
		if (chunk->tsa_used < TABSTAT_QUANTUM)
			break;
	}

	if (chunk == NULL)
	{
		/* every array is full (or none exists): append a zeroed one */
		chunk = (TabStatusArray *) MemoryContextAllocZero(TopMemoryContext,
														  sizeof(TabStatusArray));
		if (last_chunk != NULL)
			last_chunk->tsa_next = chunk;
		else
			pgStatTabList = chunk;
	}

	/*
	 * Claim the next free slot.  Only the id and shared flag are set here;
	 * the rest of the slot is assumed to be zeroed already, either at
	 * creation or after its last use.
	 */
	slot = &chunk->tsa_entries[chunk->tsa_used++];
	slot->t_id = rel_id;
	slot->t_shared = isshared;
	return slot;
}

1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744
/*
 * find_tabstat_entry - look up the PgStat_TableStatus entry for rel_id
 *
 * Only the slots in use in the pgStatTabList arrays are searched.  Returns
 * NULL when the relation has no entry; a new entry is never made.
 */
PgStat_TableStatus *
find_tabstat_entry(Oid rel_id)
{
	TabStatusArray *chunk;
	int			idx;

	for (chunk = pgStatTabList; chunk != NULL; chunk = chunk->tsa_next)
	{
		for (idx = 0; idx < chunk->tsa_used; idx++)
		{
			if (chunk->tsa_entries[idx].t_id == rel_id)
				return &chunk->tsa_entries[idx];
		}
	}

	/* no entry for this relation */
	return NULL;
}

1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770
/*
 * get_tabstat_stack_level - return the (sub)transaction stack entry for
 * nest_level, pushing a new one when the top of the stack is for a
 * different level or the stack is empty
 *
 * A new entry is allocated in TopTransactionContext and starts with an
 * empty per-table list.
 */
static PgStat_SubXactStatus *
get_tabstat_stack_level(int nest_level)
{
	PgStat_SubXactStatus *top = pgStatXactStack;

	/* the top of the stack already represents this level */
	if (top != NULL && top->nest_level == nest_level)
		return top;

	top = (PgStat_SubXactStatus *)
		MemoryContextAlloc(TopTransactionContext,
						   sizeof(PgStat_SubXactStatus));
	top->nest_level = nest_level;
	top->first = NULL;
	top->prev = pgStatXactStack;
	pgStatXactStack = top;

	return top;
}

/*
 * add_tabstat_xact_level - add a new (sub)transaction state record
 */
static void
1771
add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
1772 1773 1774
{
	PgStat_SubXactStatus *xact_state;
	PgStat_TableXactStatus *trans;
1775 1776

	/*
B
Bruce Momjian 已提交
1777 1778
	 * If this is the first rel to be modified at the current nest level, we
	 * first have to push a transaction stack entry.
1779
	 */
1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794
	xact_state = get_tabstat_stack_level(nest_level);

	/* Now make a per-table stack entry */
	trans = (PgStat_TableXactStatus *)
		MemoryContextAllocZero(TopTransactionContext,
							   sizeof(PgStat_TableXactStatus));
	trans->nest_level = nest_level;
	trans->upper = pgstat_info->trans;
	trans->parent = pgstat_info;
	trans->next = xact_state->first;
	xact_state->first = trans;
	pgstat_info->trans = trans;
}

/*
1795
 * pgstat_count_heap_insert - count a tuple insertion of n tuples
1796 1797
 */
void
1798
pgstat_count_heap_insert(Relation rel, int n)
1799 1800 1801
{
	PgStat_TableStatus *pgstat_info = rel->pgstat_info;

1802
	if (pgstat_info != NULL)
1803
	{
1804
		/* We have to log the effect at the proper transactional level */
B
Bruce Momjian 已提交
1805
		int			nest_level = GetCurrentTransactionNestLevel();
1806 1807 1808 1809 1810

		if (pgstat_info->trans == NULL ||
			pgstat_info->trans->nest_level != nest_level)
			add_tabstat_xact_level(pgstat_info, nest_level);

1811
		pgstat_info->trans->tuples_inserted += n;
1812 1813 1814 1815 1816 1817 1818
	}
}

/*
 * pgstat_count_heap_update - count a tuple update
 */
void
1819
pgstat_count_heap_update(Relation rel, bool hot)
1820 1821 1822
{
	PgStat_TableStatus *pgstat_info = rel->pgstat_info;

1823
	if (pgstat_info != NULL)
1824
	{
1825
		/* We have to log the effect at the proper transactional level */
B
Bruce Momjian 已提交
1826
		int			nest_level = GetCurrentTransactionNestLevel();
1827 1828 1829 1830 1831

		if (pgstat_info->trans == NULL ||
			pgstat_info->trans->nest_level != nest_level)
			add_tabstat_xact_level(pgstat_info, nest_level);

1832 1833 1834 1835 1836
		pgstat_info->trans->tuples_updated++;

		/* t_tuples_hot_updated is nontransactional, so just advance it */
		if (hot)
			pgstat_info->t_counts.t_tuples_hot_updated++;
1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847
	}
}

/*
 * pgstat_count_heap_delete - count a tuple deletion
 *
 * Nothing is recorded when the relation has no pgstat_info attached.
 */
void
pgstat_count_heap_delete(Relation rel)
{
	PgStat_TableStatus *tabstat = rel->pgstat_info;
	int			cur_level;

	if (tabstat == NULL)
		return;

	/* The effect has to be logged at the proper transactional level */
	cur_level = GetCurrentTransactionNestLevel();
	if (tabstat->trans == NULL || tabstat->trans->nest_level != cur_level)
		add_tabstat_xact_level(tabstat, cur_level);

	tabstat->trans->tuples_deleted++;
}

1861 1862 1863 1864
/*
 * pgstat_update_heap_dead_tuples - update dead-tuples count
 *
 * This reports the nontransactional recovery of "delta" dead tuples.
 * Accordingly t_delta_dead_tuples is decreased, not increased, and the
 * change is applied directly to the per-table counter rather than to any
 * transactional state.  Does nothing if the relation has no pgstat_info.
 */
void
pgstat_update_heap_dead_tuples(Relation rel, int delta)
{
	PgStat_TableStatus *tabstat = rel->pgstat_info;

	if (tabstat == NULL)
		return;

	tabstat->t_counts.t_delta_dead_tuples -= delta;
}

1878 1879

/* ----------
1880
 * AtEOXact_PgStat
1881
 *
1882
 *	Called from access/transam/xact.c at top-level transaction commit/abort.
1883 1884 1885
 * ----------
 */
void
1886
AtEOXact_PgStat(bool isCommit)
1887
{
1888
	PgStat_SubXactStatus *xact_state;
1889 1890

	/*
1891 1892
	 * Count transaction commit or abort.  (We use counters, not just bools,
	 * in case the reporting message isn't sent right away.)
1893
	 */
1894 1895 1896 1897
	if (isCommit)
		pgStatXactCommit++;
	else
		pgStatXactRollback++;
1898

1899 1900
	/*
	 * Transfer transactional insert/update counts into the base tabstat
B
Bruce Momjian 已提交
1901 1902
	 * entries.  We don't bother to free any of the transactional state, since
	 * it's all in TopTransactionContext and will go away anyway.
1903 1904 1905
	 */
	xact_state = pgStatXactStack;
	if (xact_state != NULL)
1906
	{
1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918
		PgStat_TableXactStatus *trans;

		Assert(xact_state->nest_level == 1);
		Assert(xact_state->prev == NULL);
		for (trans = xact_state->first; trans != NULL; trans = trans->next)
		{
			PgStat_TableStatus *tabstat;

			Assert(trans->nest_level == 1);
			Assert(trans->upper == NULL);
			tabstat = trans->parent;
			Assert(tabstat->trans == trans);
1919 1920 1921 1922
			/* count attempted actions regardless of commit/abort */
			tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
			tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
			tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
1923 1924
			if (isCommit)
			{
1925 1926
				/* insert adds a live tuple, delete removes one */
				tabstat->t_counts.t_delta_live_tuples +=
1927
					trans->tuples_inserted - trans->tuples_deleted;
1928 1929 1930 1931 1932 1933 1934
				/* update and delete each create a dead tuple */
				tabstat->t_counts.t_delta_dead_tuples +=
					trans->tuples_updated + trans->tuples_deleted;
				/* insert, update, delete each count as one change event */
				tabstat->t_counts.t_changed_tuples +=
					trans->tuples_inserted + trans->tuples_updated +
					trans->tuples_deleted;
1935 1936 1937 1938
			}
			else
			{
				/* inserted tuples are dead, deleted tuples are unaffected */
1939 1940 1941
				tabstat->t_counts.t_delta_dead_tuples +=
					trans->tuples_inserted + trans->tuples_updated;
				/* an aborted xact generates no changed_tuple events */
1942 1943 1944
			}
			tabstat->trans = NULL;
		}
1945
	}
1946
	pgStatXactStack = NULL;
1947

1948 1949 1950
	/* Make sure any stats snapshot is thrown away */
	pgstat_clear_snapshot();
}
1951 1952

/* ----------
1953
 * AtEOSubXact_PgStat
1954
 *
1955
 *	Called from access/transam/xact.c at subtransaction commit/abort.
1956 1957 1958
 * ----------
 */
void
1959
AtEOSubXact_PgStat(bool isCommit, int nestDepth)
1960
{
1961
	PgStat_SubXactStatus *xact_state;
1962 1963

	/*
1964 1965
	 * Transfer transactional insert/update counts into the next higher
	 * subtransaction state.
1966
	 */
1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989
	xact_state = pgStatXactStack;
	if (xact_state != NULL &&
		xact_state->nest_level >= nestDepth)
	{
		PgStat_TableXactStatus *trans;
		PgStat_TableXactStatus *next_trans;

		/* delink xact_state from stack immediately to simplify reuse case */
		pgStatXactStack = xact_state->prev;

		for (trans = xact_state->first; trans != NULL; trans = next_trans)
		{
			PgStat_TableStatus *tabstat;

			next_trans = trans->next;
			Assert(trans->nest_level == nestDepth);
			tabstat = trans->parent;
			Assert(tabstat->trans == trans);
			if (isCommit)
			{
				if (trans->upper && trans->upper->nest_level == nestDepth - 1)
				{
					trans->upper->tuples_inserted += trans->tuples_inserted;
1990
					trans->upper->tuples_updated += trans->tuples_updated;
1991 1992 1993 1994 1995 1996 1997
					trans->upper->tuples_deleted += trans->tuples_deleted;
					tabstat->trans = trans->upper;
					pfree(trans);
				}
				else
				{
					/*
B
Bruce Momjian 已提交
1998 1999
					 * When there isn't an immediate parent state, we can just
					 * reuse the record instead of going through a
2000
					 * palloc/pfree pushup (this works since it's all in
B
Bruce Momjian 已提交
2001 2002
					 * TopTransactionContext anyway).  We have to re-link it
					 * into the parent level, though, and that might mean
2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015
					 * pushing a new entry into the pgStatXactStack.
					 */
					PgStat_SubXactStatus *upper_xact_state;

					upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
					trans->next = upper_xact_state->first;
					upper_xact_state->first = trans;
					trans->nest_level = nestDepth - 1;
				}
			}
			else
			{
				/*
B
Bruce Momjian 已提交
2016 2017
				 * On abort, update top-level tabstat counts, then forget the
				 * subtransaction
2018
				 */
2019 2020 2021 2022 2023 2024 2025 2026

				/* count attempted actions regardless of commit/abort */
				tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
				tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
				tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
				/* inserted tuples are dead, deleted tuples are unaffected */
				tabstat->t_counts.t_delta_dead_tuples +=
					trans->tuples_inserted + trans->tuples_updated;
2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046
				tabstat->trans = trans->upper;
				pfree(trans);
			}
		}
		pfree(xact_state);
	}
}


/*
 * AtPrepare_PgStat
 *		Save the transactional stats state at 2PC transaction prepare.
 *
 * In this phase we just generate 2PC records for all the pending
 * transaction-dependent stats work.
 */
void
AtPrepare_PgStat(void)
{
	PgStat_SubXactStatus *xact_state;
2047

2048 2049
	xact_state = pgStatXactStack;
	if (xact_state != NULL)
2050
	{
2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065
		PgStat_TableXactStatus *trans;

		Assert(xact_state->nest_level == 1);
		Assert(xact_state->prev == NULL);
		for (trans = xact_state->first; trans != NULL; trans = trans->next)
		{
			PgStat_TableStatus *tabstat;
			TwoPhasePgStatRecord record;

			Assert(trans->nest_level == 1);
			Assert(trans->upper == NULL);
			tabstat = trans->parent;
			Assert(tabstat->trans == trans);

			record.tuples_inserted = trans->tuples_inserted;
2066
			record.tuples_updated = trans->tuples_updated;
2067 2068 2069 2070 2071 2072 2073
			record.tuples_deleted = trans->tuples_deleted;
			record.t_id = tabstat->t_id;
			record.t_shared = tabstat->t_shared;

			RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
								   &record, sizeof(TwoPhasePgStatRecord));
		}
2074 2075 2076
	}
}

2077 2078 2079 2080 2081
/*
 * PostPrepare_PgStat
 *		Clean up after successful PREPARE.
 *
 * The only work needed is to unlink the transaction stats state from the
 * nontransactional state.  The nontransactional action counts will be
 * reported to the stats collector immediately, while the effects on live
 * and dead tuple counts are preserved in the 2PC state file.
 *
 * Note: AtEOXact_PgStat is not called during PREPARE.
 */
void
PostPrepare_PgStat(void)
{
	PgStat_SubXactStatus *top = pgStatXactStack;

	/*
	 * The transactional state is not freed here; it all lives in
	 * TopTransactionContext and will go away with it.
	 */
	if (top != NULL)
	{
		PgStat_TableXactStatus *rec = top->first;

		while (rec != NULL)
		{
			/* detach the table's entry from its transactional record */
			rec->parent->trans = NULL;
			rec = rec->next;
		}
	}
	pgStatXactStack = NULL;

	/* Make sure any stats snapshot is thrown away */
	pgstat_clear_snapshot();
}

/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * recdata points at the TwoPhasePgStatRecord saved at prepare time; its
 * counts are loaded into our local pgstats state as committed work.
 * xid, info and len are not used here.
 */
void
pgstat_twophase_postcommit(TransactionId xid, uint16 info,
						   void *recdata, uint32 len)
{
	TwoPhasePgStatRecord *saved = (TwoPhasePgStatRecord *) recdata;
	PgStat_TableStatus *tabstat;

	/* Find or create a tabstat entry for the rel */
	tabstat = get_tabstat_entry(saved->t_id, saved->t_shared);

	/* Same math as in AtEOXact_PgStat, commit case: attempted actions ... */
	tabstat->t_counts.t_tuples_inserted += saved->tuples_inserted;
	tabstat->t_counts.t_tuples_updated += saved->tuples_updated;
	tabstat->t_counts.t_tuples_deleted += saved->tuples_deleted;

	/* ... then the live/dead/changed deltas */
	tabstat->t_counts.t_delta_live_tuples +=
		saved->tuples_inserted - saved->tuples_deleted;
	tabstat->t_counts.t_delta_dead_tuples +=
		saved->tuples_updated + saved->tuples_deleted;
	tabstat->t_counts.t_changed_tuples +=
		saved->tuples_inserted + saved->tuples_updated +
		saved->tuples_deleted;
}

/*
 * 2PC processing routine for ROLLBACK PREPARED case.
 *
 * recdata points at the TwoPhasePgStatRecord saved at prepare time; its
 * counts are loaded into our local pgstats state, but treated as aborted
 * work.  xid, info and len are not used here.
 */
void
pgstat_twophase_postabort(TransactionId xid, uint16 info,
						  void *recdata, uint32 len)
{
	TwoPhasePgStatRecord *saved = (TwoPhasePgStatRecord *) recdata;
	PgStat_TableStatus *tabstat;

	/* Find or create a tabstat entry for the rel */
	tabstat = get_tabstat_entry(saved->t_id, saved->t_shared);

	/* Same math as in AtEOXact_PgStat, abort case: attempted actions ... */
	tabstat->t_counts.t_tuples_inserted += saved->tuples_inserted;
	tabstat->t_counts.t_tuples_updated += saved->tuples_updated;
	tabstat->t_counts.t_tuples_deleted += saved->tuples_deleted;

	/* ... and the inserted and updated tuples become dead ones */
	tabstat->t_counts.t_delta_dead_tuples +=
		saved->tuples_inserted + saved->tuples_updated;
}

2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181

/* ----------
 * pgstat_fetch_stat_dbentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the collected statistics for one database or NULL. NULL doesn't mean
 *	that the database doesn't exist, it is just not yet known by the
 *	collector, so the caller is better off to report ZERO instead.
 * ----------
 */
PgStat_StatDBEntry *
pgstat_fetch_stat_dbentry(Oid dbid)
{
	PgStat_StatDBEntry *dbentry;

	/*
	 * Read the statistics collector stats file into hash tables, unless
	 * that was already done for this transaction.
	 */
	backend_read_statsfile();

	/* Plain lookup by database OID; a miss yields NULL */
	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												 (void *) &dbid,
												 HASH_FIND, NULL);
	return dbentry;
}


/* ----------
 * pgstat_fetch_stat_tabentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the collected statistics for one table or NULL. NULL doesn't mean
 *	that the table doesn't exist, it is just not yet known by the
 *	collector, so the caller is better off to report ZERO instead.
 * ----------
 */
PgStat_StatTabEntry *
pgstat_fetch_stat_tabentry(Oid relid)
{
	Oid			search_dbids[2];
	int			i;

	/*
	 * Read the statistics collector stats file into hash tables, unless
	 * that was already done for this transaction.
	 */
	backend_read_statsfile();

	/*
	 * Look in our own database first; if the table isn't found there, maybe
	 * it's a shared table, which is filed under InvalidOid.
	 */
	search_dbids[0] = MyDatabaseId;
	search_dbids[1] = InvalidOid;

	for (i = 0; i < 2; i++)
	{
		PgStat_StatDBEntry *dbentry;
		PgStat_StatTabEntry *tabentry;

		dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												(void *) &search_dbids[i],
													 HASH_FIND, NULL);
		if (dbentry == NULL || dbentry->tables == NULL)
			continue;

		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
													   (void *) &relid,
													   HASH_FIND, NULL);
		if (tabentry != NULL)
			return tabentry;
	}

	return NULL;
}


2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282
/* ----------
 * pgstat_fetch_stat_funcentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the collected statistics for one function or NULL.
 * ----------
 */
PgStat_StatFuncEntry *
pgstat_fetch_stat_funcentry(Oid func_id)
{
	PgStat_StatDBEntry *dbentry;

	/* load the stats file if needed */
	backend_read_statsfile();

	/* Functions are filed under our own database's entry */
	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
	if (dbentry == NULL || dbentry->functions == NULL)
		return NULL;

	return (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
												(void *) &func_id,
												HASH_FIND, NULL);
}


2283 2284 2285 2286
/* ----------
 * pgstat_fetch_stat_beentry() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
2287 2288 2289 2290
 *	our local copy of the current-activity entry for one backend.
 *
 *	NB: caller is responsible for a check if the user is permitted to see
 *	this info (especially the querystring).
2291 2292
 * ----------
 */
2293
PgBackendStatus *
2294 2295
pgstat_fetch_stat_beentry(int beid)
{
2296
	pgstat_read_current_status();
2297

2298 2299 2300 2301 2302 2303 2304 2305 2306 2307
	if (beid < 1 || beid > localNumBackends)
		return NULL;

	return &localBackendStatusTable[beid - 1].backendStatus;
}


/* ----------
 * pgstat_fetch_stat_local_beentry() -
 *
B
Bruce Momjian 已提交
2308 2309
 *	Like pgstat_fetch_stat_beentry() but with locally computed addtions (like
 *	xid and xmin values of the backend)
2310 2311 2312 2313 2314 2315 2316 2317 2318 2319
 *
 *	NB: caller is responsible for a check if the user is permitted to see
 *	this info (especially the querystring).
 * ----------
 */
LocalPgBackendStatus *
pgstat_fetch_stat_local_beentry(int beid)
{
	pgstat_read_current_status();

2320
	if (beid < 1 || beid > localNumBackends)
2321 2322
		return NULL;

2323
	return &localBackendStatusTable[beid - 1];
2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336
}


/* ----------
 * pgstat_fetch_stat_numbackends() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	the maximum current backend id.
 * ----------
 */
int
pgstat_fetch_stat_numbackends(void)
{
2337
	pgstat_read_current_status();
2338

2339
	return localNumBackends;
2340 2341
}

2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358
/*
 * ---------
 * pgstat_fetch_stat_archiver() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	a pointer to the archiver statistics struct, after making sure the
 *	stats file has been read.
 * ---------
 */
PgStat_ArchiverStats *
pgstat_fetch_stat_archiver(void)
{
	PgStat_ArchiverStats *result = &archiverStats;

	backend_read_statsfile();

	return result;
}


2359 2360 2361 2362
/*
 * ---------
 * pgstat_fetch_global() -
 *
 *	Support function for the SQL-callable pgstat* functions. Returns
 *	a pointer to the global statistics struct, after making sure the
 *	stats file has been read.
 * ---------
 */
PgStat_GlobalStats *
pgstat_fetch_global(void)
{
	PgStat_GlobalStats *result = &globalStats;

	backend_read_statsfile();

	return result;
}

2375 2376

/* ------------------------------------------------------------
2377
 * Functions for management of the shared-memory PgBackendStatus array
2378 2379 2380
 * ------------------------------------------------------------
 */

2381 2382
static PgBackendStatus *BackendStatusArray = NULL;
static PgBackendStatus *MyBEEntry = NULL;
2383
static char *BackendClientHostnameBuffer = NULL;
2384
static char *BackendAppnameBuffer = NULL;
2385
static char *BackendActivityBuffer = NULL;
2386
static Size BackendActivityBufferSize = 0;
2387

2388 2389 2390

/*
 * Report shared-memory space needed by CreateSharedBackendStatus.
2391
 */
2392 2393
Size
BackendStatusShmemSize(void)
2394
{
2395
	Size		size;
2396

2397 2398 2399 2400
	size = mul_size(sizeof(PgBackendStatus), MaxBackends);
	size = add_size(size,
					mul_size(NAMEDATALEN, MaxBackends));
	size = add_size(size,
2401
					mul_size(pgstat_track_activity_query_size, MaxBackends));
2402 2403
	size = add_size(size,
					mul_size(NAMEDATALEN, MaxBackends));
2404 2405
	return size;
}
2406

2407
/*
2408
 * Initialize the shared status array and several string buffers
2409
 * during postmaster startup.
2410
 */
2411 2412
void
CreateSharedBackendStatus(void)
2413
{
2414
	Size		size;
2415
	bool		found;
2416 2417
	int			i;
	char	   *buffer;
2418 2419

	/* Create or attach to the shared array */
2420
	size = mul_size(sizeof(PgBackendStatus), MaxBackends);
2421 2422 2423 2424 2425 2426 2427 2428 2429 2430
	BackendStatusArray = (PgBackendStatus *)
		ShmemInitStruct("Backend Status Array", size, &found);

	if (!found)
	{
		/*
		 * We're the first - initialize.
		 */
		MemSet(BackendStatusArray, 0, size);
	}
2431

2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449
	/* Create or attach to the shared appname buffer */
	size = mul_size(NAMEDATALEN, MaxBackends);
	BackendAppnameBuffer = (char *)
		ShmemInitStruct("Backend Application Name Buffer", size, &found);

	if (!found)
	{
		MemSet(BackendAppnameBuffer, 0, size);

		/* Initialize st_appname pointers. */
		buffer = BackendAppnameBuffer;
		for (i = 0; i < MaxBackends; i++)
		{
			BackendStatusArray[i].st_appname = buffer;
			buffer += NAMEDATALEN;
		}
	}

2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467
	/* Create or attach to the shared client hostname buffer */
	size = mul_size(NAMEDATALEN, MaxBackends);
	BackendClientHostnameBuffer = (char *)
		ShmemInitStruct("Backend Client Host Name Buffer", size, &found);

	if (!found)
	{
		MemSet(BackendClientHostnameBuffer, 0, size);

		/* Initialize st_clienthostname pointers. */
		buffer = BackendClientHostnameBuffer;
		for (i = 0; i < MaxBackends; i++)
		{
			BackendStatusArray[i].st_clienthostname = buffer;
			buffer += NAMEDATALEN;
		}
	}

2468
	/* Create or attach to the shared activity buffer */
2469 2470
	BackendActivityBufferSize = mul_size(pgstat_track_activity_query_size,
										 MaxBackends);
2471
	BackendActivityBuffer = (char *)
2472 2473 2474
		ShmemInitStruct("Backend Activity Buffer",
						BackendActivityBufferSize,
						&found);
2475 2476 2477 2478 2479 2480 2481

	if (!found)
	{
		MemSet(BackendActivityBuffer, 0, size);

		/* Initialize st_activity pointers. */
		buffer = BackendActivityBuffer;
2482 2483
		for (i = 0; i < MaxBackends; i++)
		{
2484 2485 2486 2487
			BackendStatusArray[i].st_activity = buffer;
			buffer += pgstat_track_activity_query_size;
		}
	}
2488 2489 2490
}


2491 2492 2493 2494 2495 2496 2497
/* ----------
 * pgstat_initialize() -
 *
 *	Initialize pgstats state, and set up our on-proc-exit hook.
 *	Called from InitPostgres.  MyBackendId must be set,
 *	but we must not have started any transaction yet (since the
 *	exit hook must run after the last transaction exit).
 *	NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful.
 * ----------
 */
void
pgstat_initialize(void)
{
	/* Our slot in the shared array is selected by the 1-based MyBackendId */
	Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);
	MyBEEntry = BackendStatusArray + (MyBackendId - 1);

	/* Arrange for the slot to be cleaned up at process exit */
	on_shmem_exit(pgstat_beshutdown_hook, 0);
}

2512 2513 2514
/* ----------
 * pgstat_bestart() -
 *
2515
 *	Initialize this backend's entry in the PgBackendStatus array.
2516 2517
 *	Called from InitPostgres.
 *	MyDatabaseId, session userid, and application_name must be set
2518
 *	(hence, this cannot be combined with pgstat_initialize).
2519 2520 2521 2522 2523 2524 2525 2526
 * ----------
 */
void
pgstat_bestart(void)
{
	TimestampTz proc_start_timestamp;
	Oid			userid;
	SockAddr	clientaddr;
2527
	volatile PgBackendStatus *beentry;
2528 2529

	/*
B
Bruce Momjian 已提交
2530 2531
	 * To minimize the time spent modifying the PgBackendStatus entry, fetch
	 * all the needed data first.
2532 2533 2534
	 *
	 * If we have a MyProcPort, use its session start time (for consistency,
	 * and to save a kernel call).
2535
	 */
2536 2537 2538 2539
	if (MyProcPort)
		proc_start_timestamp = MyProcPort->SessionStartTime;
	else
		proc_start_timestamp = GetCurrentTimestamp();
2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553
	userid = GetSessionUserId();

	/*
	 * We may not have a MyProcPort (eg, if this is the autovacuum process).
	 * If so, use all-zeroes client address, which is dealt with specially in
	 * pg_stat_get_backend_client_addr and pg_stat_get_backend_client_port.
	 */
	if (MyProcPort)
		memcpy(&clientaddr, &MyProcPort->raddr, sizeof(clientaddr));
	else
		MemSet(&clientaddr, 0, sizeof(clientaddr));

	/*
	 * Initialize my status entry, following the protocol of bumping
B
Bruce Momjian 已提交
2554 2555 2556
	 * st_changecount before and after; and make sure it's even afterwards. We
	 * use a volatile pointer here to ensure the compiler doesn't try to get
	 * cute.
2557 2558
	 */
	beentry = MyBEEntry;
B
Bruce Momjian 已提交
2559 2560
	do
	{
2561
		pgstat_increment_changecount_before(beentry);
2562 2563 2564 2565 2566
	} while ((beentry->st_changecount & 1) == 0);

	beentry->st_procpid = MyProcPid;
	beentry->st_proc_start_timestamp = proc_start_timestamp;
	beentry->st_activity_start_timestamp = 0;
2567
	beentry->st_state_start_timestamp = 0;
2568
	beentry->st_xact_start_timestamp = 0;
2569 2570 2571
	beentry->st_databaseid = MyDatabaseId;
	beentry->st_userid = userid;
	beentry->st_clientaddr = clientaddr;
2572 2573 2574 2575 2576
	if (MyProcPort && MyProcPort->remote_hostname)
		strlcpy(beentry->st_clienthostname, MyProcPort->remote_hostname,
				NAMEDATALEN);
	else
		beentry->st_clienthostname[0] = '\0';
2577
	beentry->st_waiting = false;
2578
	beentry->st_state = STATE_UNDEFINED;
2579
	beentry->st_appname[0] = '\0';
2580
	beentry->st_activity[0] = '\0';
2581
	/* Also make sure the last byte in each string area is always 0 */
2582
	beentry->st_clienthostname[NAMEDATALEN - 1] = '\0';
2583
	beentry->st_appname[NAMEDATALEN - 1] = '\0';
2584
	beentry->st_activity[pgstat_track_activity_query_size - 1] = '\0';
2585

2586
	pgstat_increment_changecount_after(beentry);
2587 2588 2589 2590

	/* Update app name to current GUC setting */
	if (application_name)
		pgstat_report_appname(application_name);
2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604
}

/*
 * Shut down a single backend's statistics reporting at process exit.
 *
 * Flush any remaining statistics counts out to the collector.
 * Without this, operations triggered during backend exit (such as
 * temp table deletions) won't be counted.
 *
 * Lastly, clear out our entry in the PgBackendStatus array.
 *
 * Registered through on_shmem_exit; neither argument is used.
 */
static void
pgstat_beshutdown_hook(int code, Datum arg)
{
	volatile PgBackendStatus *myentry = MyBEEntry;

	/*
	 * Only report to the collector if we got as far as discovering our own
	 * database ID; otherwise we'd be sending an invalid database ID.  (This
	 * means that accesses to pg_database during failed backend starts might
	 * never get counted.)
	 */
	if (MyDatabaseId != InvalidOid)
		pgstat_report_stat(true);

	/*
	 * Clear my status entry, bumping st_changecount before and after as the
	 * protocol requires.  The volatile pointer keeps the compiler from
	 * getting cute.
	 */
	pgstat_increment_changecount_before(myentry);

	/* a zero pid marks the entry invalid */
	myentry->st_procpid = 0;

	pgstat_increment_changecount_after(myentry);
}


/* ----------
 * pgstat_report_activity() -
 *
 *	Called from tcop/postgres.c to report what the backend is actually doing
2633
 *	(but note cmd_str can be NULL for certain cases).
2634 2635 2636 2637
 *
 * All updates of the status entry follow the protocol of bumping
 * st_changecount before and after.  We use a volatile pointer here to
 * ensure the compiler doesn't try to get cute.
2638 2639 2640
 * ----------
 */
void
2641
pgstat_report_activity(BackendState state, const char *cmd_str)
2642
{
2643
	volatile PgBackendStatus *beentry = MyBEEntry;
2644
	TimestampTz start_timestamp;
2645
	TimestampTz current_timestamp;
2646
	int			len = 0;
2647

2648 2649
	TRACE_POSTGRESQL_STATEMENT_STATUS(cmd_str);

2650
	if (!beentry)
2651 2652
		return;

2653
	if (!pgstat_track_activities)
2654
	{
2655 2656 2657 2658
		if (beentry->st_state != STATE_DISABLED)
		{
			/*
			 * track_activities is disabled, but we last reported a
B
Bruce Momjian 已提交
2659
			 * non-disabled state.  As our final update, change the state and
2660 2661
			 * clear fields we will not be updating anymore.
			 */
2662
			pgstat_increment_changecount_before(beentry);
2663 2664 2665 2666 2667 2668 2669
			beentry->st_state = STATE_DISABLED;
			beentry->st_state_start_timestamp = 0;
			beentry->st_activity[0] = '\0';
			beentry->st_activity_start_timestamp = 0;
			/* st_xact_start_timestamp and st_waiting are also disabled */
			beentry->st_xact_start_timestamp = 0;
			beentry->st_waiting = false;
2670
			pgstat_increment_changecount_after(beentry);
2671
		}
2672 2673
		return;
	}
2674 2675

	/*
2676 2677
	 * To minimize the time spent modifying the entry, fetch all the needed
	 * data first.
2678 2679 2680 2681
	 */
	start_timestamp = GetCurrentStatementStartTimestamp();
	if (cmd_str != NULL)
	{
2682 2683
		len = pg_mbcliplen(cmd_str, strlen(cmd_str),
						   pgstat_track_activity_query_size - 1);
2684
	}
2685
	current_timestamp = GetCurrentTimestamp();
2686 2687 2688

	/*
	 * Now update the status entry
2689
	 */
2690
	pgstat_increment_changecount_before(beentry);
2691

2692 2693 2694 2695 2696 2697 2698 2699 2700
	beentry->st_state = state;
	beentry->st_state_start_timestamp = current_timestamp;

	if (cmd_str != NULL)
	{
		memcpy((char *) beentry->st_activity, cmd_str, len);
		beentry->st_activity[len] = '\0';
		beentry->st_activity_start_timestamp = start_timestamp;
	}
2701

2702
	pgstat_increment_changecount_after(beentry);
2703 2704
}

2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727
/* ----------
 * pgstat_report_appname() -
 *
 *	Called to update our application name.  Does nothing if this backend
 *	has no status entry (MyBEEntry is NULL).
 * ----------
 */
void
pgstat_report_appname(const char *appname)
{
	volatile PgBackendStatus *beentry = MyBEEntry;
	int			cliplen;

	if (beentry == NULL)
		return;

	/* This should be unnecessary if GUC did its job, but be safe */
	cliplen = pg_mbcliplen(appname, strlen(appname), NAMEDATALEN - 1);

	/*
	 * Update my status entry, bumping st_changecount before and after as
	 * the protocol requires.  The volatile pointer keeps the compiler from
	 * getting cute.
	 */
	pgstat_increment_changecount_before(beentry);

	memcpy((char *) beentry->st_appname, appname, cliplen);
	beentry->st_appname[cliplen] = '\0';

	pgstat_increment_changecount_after(beentry);
}

2736
/*
2737 2738
 * Report current transaction start timestamp as the specified value.
 * Zero means there is no active transaction.
2739 2740
 */
void
2741
pgstat_report_xact_timestamp(TimestampTz tstamp)
2742 2743 2744
{
	volatile PgBackendStatus *beentry = MyBEEntry;

2745
	if (!pgstat_track_activities || !beentry)
2746 2747 2748 2749
		return;

	/*
	 * Update my status entry, following the protocol of bumping
B
Bruce Momjian 已提交
2750 2751
	 * st_changecount before and after.  We use a volatile pointer here to
	 * ensure the compiler doesn't try to get cute.
2752
	 */
2753
	pgstat_increment_changecount_before(beentry);
2754
	beentry->st_xact_start_timestamp = tstamp;
2755
	pgstat_increment_changecount_after(beentry);
2756
}
2757

2758 2759 2760 2761
/* ----------
 * pgstat_report_waiting() -
 *
 *	Called from lock manager to report beginning or end of a lock wait.
 *
 * NB: this *must* be able to survive being called before MyBEEntry has been
 * initialized.
 * ----------
 */
void
pgstat_report_waiting(bool waiting)
{
	volatile PgBackendStatus *beentry = MyBEEntry;

	/*
	 * st_waiting is a single-byte field in a struct that only this process
	 * may modify, so there seems no need to bother with the st_changecount
	 * protocol.  The update must appear atomic in any case.
	 */
	if (pgstat_track_activities && beentry != NULL)
		beentry->st_waiting = waiting;
}


2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794
/* ----------
 * pgstat_read_current_status() -
 *
 *	Copy the current contents of the PgBackendStatus array to local memory,
 *	if not already done in this transaction.
 * ----------
 */
static void
pgstat_read_current_status(void)
{
	volatile PgBackendStatus *beentry;
2795 2796
	LocalPgBackendStatus *localtable;
	LocalPgBackendStatus *localentry;
2797 2798
	char	   *localappname,
			   *localactivity;
2799 2800 2801
	int			i;

	Assert(!pgStatRunningInCollector);
2802
	if (localBackendStatusTable)
2803 2804
		return;					/* already done */

2805 2806
	pgstat_setup_memcxt();

2807
	localtable = (LocalPgBackendStatus *)
2808
		MemoryContextAlloc(pgStatLocalContext,
2809
						   sizeof(LocalPgBackendStatus) * MaxBackends);
2810 2811 2812
	localappname = (char *)
		MemoryContextAlloc(pgStatLocalContext,
						   NAMEDATALEN * MaxBackends);
2813 2814 2815
	localactivity = (char *)
		MemoryContextAlloc(pgStatLocalContext,
						   pgstat_track_activity_query_size * MaxBackends);
2816 2817 2818
	localNumBackends = 0;

	beentry = BackendStatusArray;
2819
	localentry = localtable;
2820 2821 2822
	for (i = 1; i <= MaxBackends; i++)
	{
		/*
B
Bruce Momjian 已提交
2823 2824 2825 2826 2827
		 * Follow the protocol of retrying if st_changecount changes while we
		 * copy the entry, or if it's odd.  (The check for odd is needed to
		 * cover the case where we are able to completely copy the entry while
		 * the source backend is between increment steps.)	We use a volatile
		 * pointer here to ensure the compiler doesn't try to get cute.
2828 2829 2830
		 */
		for (;;)
		{
2831 2832 2833 2834
			int			before_changecount;
			int			after_changecount;

			pgstat_save_changecount_before(beentry, before_changecount);
2835

2836 2837
			localentry->backendStatus.st_procpid = beentry->st_procpid;
			if (localentry->backendStatus.st_procpid > 0)
2838
			{
2839
				memcpy(&localentry->backendStatus, (char *) beentry, sizeof(PgBackendStatus));
2840

2841 2842 2843 2844
				/*
				 * strcpy is safe even if the string is modified concurrently,
				 * because there's always a \0 at the end of the buffer.
				 */
2845
				strcpy(localappname, (char *) beentry->st_appname);
2846
				localentry->backendStatus.st_appname = localappname;
2847
				strcpy(localactivity, (char *) beentry->st_activity);
2848
				localentry->backendStatus.st_activity = localactivity;
2849
			}
2850

2851 2852 2853
			pgstat_save_changecount_after(beentry, after_changecount);
			if (before_changecount == after_changecount &&
				(before_changecount & 1) == 0)
2854 2855 2856 2857 2858 2859 2860 2861
				break;

			/* Make sure we can break out of loop if stuck... */
			CHECK_FOR_INTERRUPTS();
		}

		beentry++;
		/* Only valid entries get included into the local array */
2862
		if (localentry->backendStatus.st_procpid > 0)
2863
		{
2864 2865 2866 2867
			BackendIdGetTransactionIds(i,
									   &localentry->backend_xid,
									   &localentry->backend_xmin);

2868
			localentry++;
2869
			localappname += NAMEDATALEN;
2870
			localactivity += pgstat_track_activity_query_size;
2871 2872 2873 2874
			localNumBackends++;
		}
	}

2875 2876
	/* Set the pointer only after completion of a valid table */
	localBackendStatusTable = localtable;
2877 2878 2879
}


2880 2881 2882 2883
/* ----------
 * pgstat_get_backend_current_activity() -
 *
 *	Return a string representing the current activity of the backend with
B
Bruce Momjian 已提交
2884
 *	the specified PID.  This looks directly at the BackendStatusArray,
2885 2886 2887 2888
 *	and so will provide current information regardless of the age of our
 *	transaction's snapshot of the status array.
 *
 *	It is the caller's responsibility to invoke this only for backends whose
B
Bruce Momjian 已提交
2889
 *	state is expected to remain stable while the result is in use.  The
2890 2891 2892 2893 2894 2895 2896 2897 2898 2899
 *	only current use is in deadlock reporting, where we can expect that
 *	the target backend is blocked on a lock.  (There are corner cases
 *	where the target's wait could get aborted while we are looking at it,
 *	but the very worst consequence is to return a pointer to a string
 *	that's been changed, so we won't worry too much.)
 *
 *	Note: return strings for special cases match pg_stat_get_backend_activity.
 * ----------
 */
const char *
2900
pgstat_get_backend_current_activity(int pid, bool checkUser)
2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914
{
	PgBackendStatus *beentry;
	int			i;

	beentry = BackendStatusArray;
	for (i = 1; i <= MaxBackends; i++)
	{
		/*
		 * Although we expect the target backend's entry to be stable, that
		 * doesn't imply that anyone else's is.  To avoid identifying the
		 * wrong backend, while we check for a match to the desired PID we
		 * must follow the protocol of retrying if st_changecount changes
		 * while we examine the entry, or if it's odd.  (This might be
		 * unnecessary, since fetching or storing an int is almost certainly
2915 2916
		 * atomic, but let's play it safe.)  We use a volatile pointer here to
		 * ensure the compiler doesn't try to get cute.
2917 2918
		 */
		volatile PgBackendStatus *vbeentry = beentry;
2919
		bool		found;
2920 2921 2922

		for (;;)
		{
2923 2924 2925 2926
			int			before_changecount;
			int			after_changecount;

			pgstat_save_changecount_before(vbeentry, before_changecount);
2927 2928 2929

			found = (vbeentry->st_procpid == pid);

2930 2931 2932 2933
			pgstat_save_changecount_after(vbeentry, after_changecount);

			if (before_changecount == after_changecount &&
				(before_changecount & 1) == 0)
2934 2935 2936 2937 2938 2939 2940 2941 2942
				break;

			/* Make sure we can break out of loop if stuck... */
			CHECK_FOR_INTERRUPTS();
		}

		if (found)
		{
			/* Now it is safe to use the non-volatile pointer */
2943
			if (checkUser && !superuser() && beentry->st_userid != GetUserId())
2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957
				return "<insufficient privilege>";
			else if (*(beentry->st_activity) == '\0')
				return "<command string not enabled>";
			else
				return beentry->st_activity;
		}

		beentry++;
	}

	/* If we get here, caller is in error ... */
	return "<backend information not available>";
}

2958 2959 2960 2961
/* ----------
 * pgstat_get_crashed_backend_activity() -
 *
 *	Return a string representing the current activity of the backend with
B
Bruce Momjian 已提交
2962
 *	the specified PID.  Like the function above, but reads shared memory with
2963 2964 2965
 *	the expectation that it may be corrupt.  On success, copy the string
 *	into the "buffer" argument and return that pointer.  On failure,
 *	return NULL.
2966
 *
2967 2968
 *	This function is only intended to be used by the postmaster to report the
 *	query that crashed a backend.  In particular, no attempt is made to
2969
 *	follow the correct concurrency protocol when accessing the
2970
 *	BackendStatusArray.  But that's OK, in the worst case we'll return a
B
Bruce Momjian 已提交
2971
 *	corrupted message.  We also must take care not to trip on ereport(ERROR).
2972 2973 2974
 * ----------
 */
const char *
2975
pgstat_get_crashed_backend_activity(int pid, char *buffer, int buflen)
2976 2977 2978 2979 2980
{
	volatile PgBackendStatus *beentry;
	int			i;

	beentry = BackendStatusArray;
2981 2982 2983 2984 2985 2986 2987 2988

	/*
	 * We probably shouldn't get here before shared memory has been set up,
	 * but be safe.
	 */
	if (beentry == NULL || BackendActivityBuffer == NULL)
		return NULL;

2989 2990 2991 2992 2993 2994 2995 2996 2997
	for (i = 1; i <= MaxBackends; i++)
	{
		if (beentry->st_procpid == pid)
		{
			/* Read pointer just once, so it can't change after validation */
			const char *activity = beentry->st_activity;
			const char *activity_last;

			/*
2998 2999 3000 3001
			 * We mustn't access activity string before we verify that it
			 * falls within the BackendActivityBuffer. To make sure that the
			 * entire string including its ending is contained within the
			 * buffer, subtract one activity length from the buffer size.
3002 3003
			 */
			activity_last = BackendActivityBuffer + BackendActivityBufferSize
3004
				- pgstat_track_activity_query_size;
3005 3006 3007

			if (activity < BackendActivityBuffer ||
				activity > activity_last)
3008
				return NULL;
3009

3010 3011 3012
			/* If no string available, no point in a report */
			if (activity[0] == '\0')
				return NULL;
3013 3014 3015

			/*
			 * Copy only ASCII-safe characters so we don't run into encoding
3016 3017
			 * problems when reporting the message; and be sure not to run off
			 * the end of memory.
3018
			 */
3019 3020
			ascii_safe_strlcpy(buffer, activity,
							   Min(buflen, pgstat_track_activity_query_size));
3021 3022 3023 3024 3025 3026 3027 3028

			return buffer;
		}

		beentry++;
	}

	/* PID not found */
3029
	return NULL;
3030
}
3031

3032

3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060
/* ------------------------------------------------------------
 * Local support functions follow
 * ------------------------------------------------------------
 */


/* ----------
 * pgstat_setheader() -
 *
 *		Set common header fields in a statistics message
 *
 *		hdr:	header of the message being prepared
 *		mtype:	message type code to store in hdr->m_type
 *
 *		Only m_type is written here; no other header field is touched.
 * ----------
 */
static void
pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
{
	hdr->m_type = mtype;
}


/* ----------
 * pgstat_send() -
 *
 *		Send out one statistics message to the collector
 * ----------
 */
static void
pgstat_send(void *msg, int len)
{
3061 3062
	int			rc;

3063
	if (pgStatSock == PGINVALID_SOCKET)
3064
		return;
3065

3066
	((PgStat_MsgHdr *) msg)->m_size = len;
3067

3068 3069 3070 3071 3072 3073
	/* We'll retry after EINTR, but ignore all other failures */
	do
	{
		rc = send(pgStatSock, msg, len, 0);
	} while (rc < 0 && errno == EINTR);

3074
#ifdef USE_ASSERT_CHECKING
3075 3076
	/* In debug builds, log send failures ... */
	if (rc < 0)
3077 3078
		elog(LOG, "could not send to statistics collector: %m");
#endif
3079 3080
}

3081 3082 3083 3084 3085 3086 3087 3088 3089 3090
/* ----------
 * pgstat_send_archiver() -
 *
 *	Tell the collector about the WAL file that we successfully
 *	archived or failed to archive.
 * ----------
 */
void
pgstat_send_archiver(const char *xlog, bool failed)
{
B
Bruce Momjian 已提交
3091
	PgStat_MsgArchiver msg;
3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102

	/*
	 * Prepare and send the message
	 */
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ARCHIVER);
	msg.m_failed = failed;
	strncpy(msg.m_xlog, xlog, sizeof(msg.m_xlog));
	msg.m_timestamp = GetCurrentTimestamp();
	pgstat_send(&msg, sizeof(msg));
}

3103 3104 3105
/* ----------
 * pgstat_send_bgwriter() -
 *
B
Bruce Momjian 已提交
3106
 *		Send bgwriter statistics to the collector
3107 3108 3109 3110 3111
 * ----------
 */
void
pgstat_send_bgwriter(void)
{
3112 3113 3114
	/* We assume this initializes to zeroes */
	static const PgStat_MsgBgWriter all_zeroes;

3115
	/*
B
Bruce Momjian 已提交
3116 3117 3118
	 * This function can be called even if nothing at all has happened. In
	 * this case, avoid sending a completely empty message to the stats
	 * collector.
3119
	 */
3120
	if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
3121 3122 3123 3124 3125 3126 3127 3128 3129
		return;

	/*
	 * Prepare and send the message
	 */
	pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
	pgstat_send(&BgWriterStats, sizeof(BgWriterStats));

	/*
3130
	 * Clear out the statistics buffer, so it can be re-used.
3131
	 */
3132
	MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
3133 3134
}

3135

3136 3137 3138
/* ----------
 * PgstatCollectorMain() -
 *
B
Bruce Momjian 已提交
3139
 *	Start up the statistics collector process.  This is the body of the
3140
 *	postmaster child process.
3141 3142 3143 3144
 *
 *	The argc/argv parameters are valid only in EXEC_BACKEND case.
 * ----------
 */
3145
NON_EXEC_STATIC void
3146
PgstatCollectorMain(int argc, char *argv[])
3147
{
3148
	int			len;
3149
	PgStat_Msg	msg;
3150
	int			wr;
3151

3152
	/*
3153
	 * Ignore all signals usually bound to some action in the postmaster,
3154
	 * except SIGHUP and SIGQUIT.  Note we don't need a SIGUSR1 handler to
3155
	 * support latch operations, because we only use a local latch.
3156
	 */
3157
	pqsignal(SIGHUP, pgstat_sighup_handler);
3158 3159
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
3160
	pqsignal(SIGQUIT, pgstat_exit);
3161
	pqsignal(SIGALRM, SIG_IGN);
3162 3163 3164 3165 3166 3167 3168 3169 3170 3171
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, SIG_IGN);
	pqsignal(SIGUSR2, SIG_IGN);
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
	PG_SETMASK(&UnBlockSig);

3172 3173 3174
	/*
	 * Identify myself via ps
	 */
3175
	init_ps_display("stats collector process", "", "", "");
3176

3177
	/*
B
Bruce Momjian 已提交
3178 3179
	 * Read in an existing statistics stats file or initialize the stats to
	 * zero.
3180
	 */
3181
	pgStatRunningInCollector = true;
3182
	pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
3183 3184

	/*
3185 3186 3187
	 * Loop to process messages until we get SIGQUIT or detect ungraceful
	 * death of our parent postmaster.
	 *
3188 3189 3190 3191
	 * For performance reasons, we don't want to do ResetLatch/WaitLatch after
	 * every message; instead, do that only after a recv() fails to obtain a
	 * message.  (This effectively means that if backends are sending us stuff
	 * like mad, we won't notice postmaster death until things slack off a
3192
	 * bit; which seems fine.)	To do that, we have an inner loop that
3193 3194 3195 3196
	 * iterates as long as recv() succeeds.  We do recognize got_SIGHUP inside
	 * the inner loop, which means that such interrupts will get serviced but
	 * the latch won't get cleared until next time there is a break in the
	 * action.
3197 3198 3199
	 */
	for (;;)
	{
3200
		/* Clear any already-pending wakeups */
3201
		ResetLatch(MyLatch);
3202 3203 3204 3205 3206 3207 3208

		/*
		 * Quit if we get SIGQUIT from the postmaster.
		 */
		if (need_exit)
			break;

3209
		/*
3210 3211
		 * Inner loop iterates as long as we keep getting messages, or until
		 * need_exit becomes set.
3212
		 */
3213
		while (!need_exit)
3214
		{
3215 3216 3217 3218 3219 3220 3221 3222
			/*
			 * Reload configuration if we got SIGHUP from the postmaster.
			 */
			if (got_SIGHUP)
			{
				got_SIGHUP = false;
				ProcessConfigFile(PGC_SIGHUP);
			}
3223

3224 3225 3226 3227
			/*
			 * Write the stats file if a new request has arrived that is not
			 * satisfied by existing file.
			 */
3228 3229
			if (pgstat_write_statsfile_needed())
				pgstat_write_statsfiles(false, false);
3230

3231 3232 3233
			/*
			 * Try to receive and process a message.  This will not block,
			 * since the socket is set to non-blocking mode.
3234
			 *
3235 3236
			 * XXX On Windows, we have to force pgwin32_recv to cooperate,
			 * despite the previous use of pg_set_noblock() on the socket.
3237
			 * This is extremely broken and should be fixed someday.
3238
			 */
3239 3240 3241 3242
#ifdef WIN32
			pgwin32_noblock = 1;
#endif

3243 3244
			len = recv(pgStatSock, (char *) &msg,
					   sizeof(PgStat_Msg), 0);
3245 3246 3247 3248 3249

#ifdef WIN32
			pgwin32_noblock = 0;
#endif

3250
			if (len < 0)
3251
			{
3252 3253
				if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
					break;		/* out of inner loop */
3254 3255 3256
				ereport(ERROR,
						(errcode_for_socket_access(),
						 errmsg("could not read statistics message: %m")));
3257
			}
3258

3259
			/*
3260
			 * We ignore messages that are smaller than our common header
3261
			 */
3262 3263
			if (len < sizeof(PgStat_MsgHdr))
				continue;
3264

3265
			/*
3266
			 * The received length must match the length in the header
3267
			 */
3268 3269
			if (msg.msg_hdr.m_size != len)
				continue;
3270 3271

			/*
3272
			 * O.K. - we accept this message.  Process it.
3273 3274 3275 3276 3277 3278
			 */
			switch (msg.msg_hdr.m_type)
			{
				case PGSTAT_MTYPE_DUMMY:
					break;

3279 3280 3281 3282
				case PGSTAT_MTYPE_INQUIRY:
					pgstat_recv_inquiry((PgStat_MsgInquiry *) &msg, len);
					break;

3283
				case PGSTAT_MTYPE_TABSTAT:
3284
					pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, len);
3285 3286 3287
					break;

				case PGSTAT_MTYPE_TABPURGE:
3288
					pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, len);
3289 3290 3291
					break;

				case PGSTAT_MTYPE_DROPDB:
3292
					pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, len);
3293 3294 3295
					break;

				case PGSTAT_MTYPE_RESETCOUNTER:
3296
					pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
3297
											 len);
3298 3299
					break;

3300 3301
				case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
					pgstat_recv_resetsharedcounter(
B
Bruce Momjian 已提交
3302 3303
									   (PgStat_MsgResetsharedcounter *) &msg,
												   len);
3304 3305
					break;

3306 3307
				case PGSTAT_MTYPE_RESETSINGLECOUNTER:
					pgstat_recv_resetsinglecounter(
B
Bruce Momjian 已提交
3308 3309
									   (PgStat_MsgResetsinglecounter *) &msg,
												   len);
3310 3311
					break;

3312
				case PGSTAT_MTYPE_AUTOVAC_START:
3313
					pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, len);
3314 3315 3316
					break;

				case PGSTAT_MTYPE_VACUUM:
3317
					pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, len);
3318 3319 3320
					break;

				case PGSTAT_MTYPE_ANALYZE:
3321
					pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, len);
3322 3323
					break;

3324 3325 3326 3327
				case PGSTAT_MTYPE_ARCHIVER:
					pgstat_recv_archiver((PgStat_MsgArchiver *) &msg, len);
					break;

3328
				case PGSTAT_MTYPE_BGWRITER:
3329
					pgstat_recv_bgwriter((PgStat_MsgBgWriter *) &msg, len);
3330 3331
					break;

3332 3333 3334
				case PGSTAT_MTYPE_FUNCSTAT:
					pgstat_recv_funcstat((PgStat_MsgFuncstat *) &msg, len);
					break;
3335 3336 3337 3338 3339

				case PGSTAT_MTYPE_FUNCPURGE:
					pgstat_recv_funcpurge((PgStat_MsgFuncpurge *) &msg, len);
					break;

3340 3341 3342 3343
				case PGSTAT_MTYPE_RECOVERYCONFLICT:
					pgstat_recv_recoveryconflict((PgStat_MsgRecoveryConflict *) &msg, len);
					break;

3344 3345 3346 3347
				case PGSTAT_MTYPE_DEADLOCK:
					pgstat_recv_deadlock((PgStat_MsgDeadlock *) &msg, len);
					break;

3348 3349 3350 3351
				case PGSTAT_MTYPE_TEMPFILE:
					pgstat_recv_tempfile((PgStat_MsgTempFile *) &msg, len);
					break;

3352 3353 3354
				default:
					break;
			}
3355 3356 3357
		}						/* end of inner message-processing loop */

		/* Sleep until there's something to do */
3358
#ifndef WIN32
3359
		wr = WaitLatchOrSocket(MyLatch,
3360
					 WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
3361 3362 3363
							   pgStatSock,
							   -1L);
#else
3364

3365 3366
		/*
		 * Windows, at least in its Windows Server 2003 R2 incarnation,
B
Bruce Momjian 已提交
3367
		 * sometimes loses FD_READ events.  Waking up and retrying the recv()
3368 3369 3370 3371 3372 3373 3374
		 * fixes that, so don't sleep indefinitely.  This is a crock of the
		 * first water, but until somebody wants to debug exactly what's
		 * happening there, this is the best we can do.  The two-second
		 * timeout matches our pre-9.2 behavior, and needs to be short enough
		 * to not provoke "pgstat wait timeout" complaints from
		 * backend_read_statsfile.
		 */
3375
		wr = WaitLatchOrSocket(MyLatch,
3376
		WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT,
3377
							   pgStatSock,
3378
							   2 * 1000L /* msec */ );
3379
#endif
3380 3381 3382 3383 3384 3385 3386 3387

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (wr & WL_POSTMASTER_DEATH)
			break;
	}							/* end of outer loop */
3388

3389 3390 3391
	/*
	 * Save the final stats to reuse at next startup.
	 */
3392
	pgstat_write_statsfiles(true, true);
3393

3394
	exit(0);
3395 3396
}

3397 3398

/* SIGQUIT signal handler for collector process */
3399 3400 3401
static void
pgstat_exit(SIGNAL_ARGS)
{
3402 3403
	int			save_errno = errno;

3404
	need_exit = true;
3405
	SetLatch(MyLatch);
3406 3407

	errno = save_errno;
3408 3409
}

3410 3411 3412 3413
/* SIGHUP handler for collector process */
static void
pgstat_sighup_handler(SIGNAL_ARGS)
{
3414 3415
	int			save_errno = errno;

3416
	got_SIGHUP = true;
3417
	SetLatch(MyLatch);
3418 3419

	errno = save_errno;
3420 3421
}

3422 3423 3424 3425 3426 3427 3428 3429 3430 3431 3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461
/*
 * Subroutine to clear stats in a database entry
 *
 * All counters are zeroed, stat_reset_timestamp is set to the current time,
 * and stats_timestamp is cleared.
 *
 * Tables and functions hashes are initialized to empty.
 *
 * NOTE(review): fresh hash tables are created unconditionally and any
 * tables previously attached to the entry are not destroyed here;
 * presumably callers dispose of them first -- confirm against callers.
 */
static void
reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
{
	HASHCTL		hash_ctl;

	dbentry->n_xact_commit = 0;
	dbentry->n_xact_rollback = 0;
	dbentry->n_blocks_fetched = 0;
	dbentry->n_blocks_hit = 0;
	dbentry->n_tuples_returned = 0;
	dbentry->n_tuples_fetched = 0;
	dbentry->n_tuples_inserted = 0;
	dbentry->n_tuples_updated = 0;
	dbentry->n_tuples_deleted = 0;
	dbentry->last_autovac_time = 0;
	dbentry->n_conflict_tablespace = 0;
	dbentry->n_conflict_lock = 0;
	dbentry->n_conflict_snapshot = 0;
	dbentry->n_conflict_bufferpin = 0;
	dbentry->n_conflict_startup_deadlock = 0;
	dbentry->n_temp_files = 0;
	dbentry->n_temp_bytes = 0;
	dbentry->n_deadlocks = 0;
	dbentry->n_block_read_time = 0;
	dbentry->n_block_write_time = 0;

	dbentry->stat_reset_timestamp = GetCurrentTimestamp();
	dbentry->stats_timestamp = 0;

	memset(&hash_ctl, 0, sizeof(hash_ctl));
	hash_ctl.keysize = sizeof(Oid);
	hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
	dbentry->tables = hash_create("Per-database table",
								  PGSTAT_TAB_HASH_SIZE,
								  &hash_ctl,
								  HASH_ELEM | HASH_BLOBS);

	/* Same control struct is reused; only the entry size differs */
	hash_ctl.keysize = sizeof(Oid);
	hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
	dbentry->functions = hash_create("Per-database function",
									 PGSTAT_FUNCTION_HASH_SIZE,
									 &hash_ctl,
									 HASH_ELEM | HASH_BLOBS);
}

/*
 * Lookup the hash table entry for the specified database. If no hash
3474 3475
 * table entry exists, initialize it, if the create parameter is true.
 * Else, return NULL.
3476 3477
 */
static PgStat_StatDBEntry *
3478
pgstat_get_db_entry(Oid databaseid, bool create)
3479 3480
{
	PgStat_StatDBEntry *result;
B
Bruce Momjian 已提交
3481 3482
	bool		found;
	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
3483 3484 3485 3486

	/* Lookup or create the hash table entry for this database */
	result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
												&databaseid,
3487 3488 3489 3490
												action, &found);

	if (!create && !found)
		return NULL;
3491

3492 3493 3494 3495
	/*
	 * If not found, initialize the new one.  This creates empty hash tables
	 * for tables and functions, too.
	 */
3496
	if (!found)
3497
		reset_dbentry_counters(result);
3498

3499
	return result;
3500 3501 3502
}


3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534
/*
 * Lookup the hash table entry for the specified table. If no hash
 * table entry exists, initialize it, if the create parameter is true.
 * Else, return NULL.
 *
 * The lookup is done in dbentry->tables, keyed by table OID.  A newly
 * created entry has all of its counters and timestamps set to zero.
 */
static PgStat_StatTabEntry *
pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
{
	PgStat_StatTabEntry *result;
	bool		found;
	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);

	/* Lookup or create the hash table entry for this table */
	result = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
												 &tableoid,
												 action, &found);

	if (!create && !found)
		return NULL;

	/* If not found, initialize the new one. */
	if (!found)
	{
		result->numscans = 0;
		result->tuples_returned = 0;
		result->tuples_fetched = 0;
		result->tuples_inserted = 0;
		result->tuples_updated = 0;
		result->tuples_deleted = 0;
		result->tuples_hot_updated = 0;
		result->n_live_tuples = 0;
		result->n_dead_tuples = 0;
		result->changes_since_analyze = 0;
		result->blocks_fetched = 0;
		result->blocks_hit = 0;
		result->vacuum_timestamp = 0;
		result->vacuum_count = 0;
		result->autovac_vacuum_timestamp = 0;
		result->autovac_vacuum_count = 0;
		result->analyze_timestamp = 0;
		result->analyze_count = 0;
		result->autovac_analyze_timestamp = 0;
		result->autovac_analyze_count = 0;
	}

	return result;
}


/* ----------
3553 3554
 * pgstat_write_statsfiles() -
 *		Write the global statistics file, as well as requested DB files.
3555
 *
3556 3557
 *	If writing to the permanent files (happens when the collector is
 *	shutting down only), remove the temporary files so that backends
3558 3559
 *	starting up under a new postmaster can't read the old data before
 *	the new collector is ready.
3560 3561 3562 3563
 *
 *	When 'allDbs' is false, only the requested databases (listed in
 *	last_statrequests) will be written; otherwise, all databases will be
 *	written.
3564 3565 3566
 * ----------
 */
static void
3567
pgstat_write_statsfiles(bool permanent, bool allDbs)
3568
{
3569 3570 3571
	HASH_SEQ_STATUS hstat;
	PgStat_StatDBEntry *dbentry;
	FILE	   *fpout;
3572
	int32		format_id;
3573 3574
	const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
3575
	int			rc;
3576

3577
	elog(DEBUG2, "writing stats file \"%s\"", statfile);
3578

3579
	/*
3580
	 * Open the statistics temp file to write out the current values.
3581
	 */
3582
	fpout = AllocateFile(tmpfile, PG_BINARY_W);
3583 3584
	if (fpout == NULL)
	{
3585 3586
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
3587
				 errmsg("could not open temporary statistics file \"%s\": %m",
3588
						tmpfile)));
3589 3590 3591
		return;
	}

3592 3593 3594 3595 3596
	/*
	 * Set the timestamp of the stats file.
	 */
	globalStats.stats_timestamp = GetCurrentTimestamp();

3597 3598 3599 3600
	/*
	 * Write the file header --- currently just a format ID.
	 */
	format_id = PGSTAT_FILE_FORMAT_ID;
3601 3602
	rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
	(void) rc;					/* we'll check for error with ferror */
3603

3604 3605 3606
	/*
	 * Write global stats struct
	 */
3607 3608
	rc = fwrite(&globalStats, sizeof(globalStats), 1, fpout);
	(void) rc;					/* we'll check for error with ferror */
3609

3610 3611 3612 3613 3614 3615
	/*
	 * Write archiver stats struct
	 */
	rc = fwrite(&archiverStats, sizeof(archiverStats), 1, fpout);
	(void) rc;					/* we'll check for error with ferror */

3616 3617 3618 3619
	/*
	 * Walk through the database table.
	 */
	hash_seq_init(&hstat, pgStatDBHash);
3620
	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
3621 3622
	{
		/*
3623 3624 3625 3626 3627
		 * Write out the tables and functions into the DB stat file, if
		 * required.
		 *
		 * We need to do this before the dbentry write, to ensure the
		 * timestamps written to both are consistent.
3628
		 */
3629
		if (allDbs || pgstat_db_requested(dbentry->databaseid))
3630
		{
3631 3632
			dbentry->stats_timestamp = globalStats.stats_timestamp;
			pgstat_write_db_statsfile(dbentry, permanent);
3633 3634
		}

3635
		/*
3636 3637
		 * Write out the DB entry. We don't write the tables or functions
		 * pointers, since they're of no use to any other process.
3638
		 */
3639 3640 3641
		fputc('D', fpout);
		rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
		(void) rc;				/* we'll check for error with ferror */
3642 3643 3644
	}

	/*
3645
	 * No more output to be done. Close the temp file and replace the old
3646 3647
	 * pgstat.stat with it.  The ferror() check replaces testing for error
	 * after each individual fputc or fwrite above.
3648 3649
	 */
	fputc('E', fpout);
3650 3651 3652 3653 3654

	if (ferror(fpout))
	{
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
3655
			   errmsg("could not write temporary statistics file \"%s\": %m",
3656
					  tmpfile)));
3657
		FreeFile(fpout);
3658
		unlink(tmpfile);
3659
	}
3660
	else if (FreeFile(fpout) < 0)
3661
	{
3662 3663
		ereport(LOG,
				(errcode_for_file_access(),
B
Bruce Momjian 已提交
3664
			   errmsg("could not close temporary statistics file \"%s\": %m",
3665 3666
					  tmpfile)));
		unlink(tmpfile);
3667
	}
3668
	else if (rename(tmpfile, statfile) < 0)
3669
	{
3670 3671 3672
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
3673 3674
						tmpfile, statfile)));
		unlink(tmpfile);
3675
	}
3676 3677 3678 3679 3680 3681 3682 3683 3684

	if (permanent)
		unlink(pgstat_stat_filename);

	/*
	 * Now throw away the list of requests.  Note that requests sent after we
	 * started the write are still waiting on the network socket.
	 */
	if (!slist_is_empty(&last_statrequests))
3685
	{
3686
		slist_mutable_iter iter;
3687

3688 3689 3690 3691 3692 3693 3694
		/*
		 * Strictly speaking we should do slist_delete_current() before
		 * freeing each request struct.  We skip that and instead
		 * re-initialize the list header at the end.  Nonetheless, we must use
		 * slist_foreach_modify, not just slist_foreach, since we will free
		 * the node's storage before advancing.
		 */
3695
		slist_foreach_modify(iter, &last_statrequests)
3696
		{
3697
			DBWriteRequest *req;
3698

3699 3700
			req = slist_container(DBWriteRequest, next, iter.cur);
			pfree(req);
3701
		}
3702 3703

		slist_init(&last_statrequests);
3704
	}
3705
}
3706

3707 3708 3709 3710 3711 3712 3713 3714 3715 3716
/*
 * return the filename for a DB stat file; filename is the output buffer,
 * of length len.
 *
 *	permanent:	choose PGSTAT_STAT_PERMANENT_DIRECTORY rather than
 *				pgstat_stat_directory as the containing directory
 *	tempname:	use the "tmp" suffix rather than "stat"
 *	databaseid: OID embedded in the file name ("db_<oid>.<suffix>")
 *
 * Raises ERROR if the path does not fit in the buffer.
 */
static void
get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
					char *filename, int len)
{
	int			printed;

	/* NB -- pgstat_reset_remove_files knows about the pattern this uses */
	printed = snprintf(filename, len, "%s/db_%u.%s",
					   permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
					   pgstat_stat_directory,
					   databaseid,
					   tempname ? "tmp" : "stat");

	/*
	 * snprintf reports the length the full string would have had, not
	 * counting the terminating NUL, so a result equal to len already means
	 * the output was truncated.  A negative result signals a formatting
	 * failure, in which case the buffer contents can't be trusted either.
	 */
	if (printed < 0 || printed >= len)
		elog(ERROR, "overlength pgstat path");
}

/* ----------
 * pgstat_write_db_statsfile() -
 *		Write the stat file for a single database.
 *
 *	If writing to the permanent file (happens when the collector is
 *	shutting down only), remove the temporary file so that backends
 *	starting up under a new postmaster can't read the old data before
 *	the new collector is ready.
 * ----------
 */
static void
pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
{
	HASH_SEQ_STATUS tstat;
	HASH_SEQ_STATUS fstat;
	PgStat_StatTabEntry *tabentry;
	PgStat_StatFuncEntry *funcentry;
	FILE	   *fpout;
	int32		format_id;
	Oid			dbid = dbentry->databaseid;
	int			rc;
	char		tmpfile[MAXPGPATH];
	char		statfile[MAXPGPATH];

	get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH);
	get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH);

3754
	elog(DEBUG2, "writing stats file \"%s\"", statfile);
3755 3756 3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785 3786 3787 3788 3789 3790 3791 3792 3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822 3823 3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834

	/*
	 * Open the statistics temp file to write out the current values.
	 */
	fpout = AllocateFile(tmpfile, PG_BINARY_W);
	if (fpout == NULL)
	{
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not open temporary statistics file \"%s\": %m",
						tmpfile)));
		return;
	}

	/*
	 * Write the file header --- currently just a format ID.
	 */
	format_id = PGSTAT_FILE_FORMAT_ID;
	rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
	(void) rc;					/* we'll check for error with ferror */

	/*
	 * Walk through the database's access stats per table.
	 */
	hash_seq_init(&tstat, dbentry->tables);
	while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
	{
		fputc('T', fpout);
		rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
		(void) rc;				/* we'll check for error with ferror */
	}

	/*
	 * Walk through the database's function stats table.
	 */
	hash_seq_init(&fstat, dbentry->functions);
	while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
	{
		fputc('F', fpout);
		rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
		(void) rc;				/* we'll check for error with ferror */
	}

	/*
	 * No more output to be done. Close the temp file and replace the old
	 * pgstat.stat with it.  The ferror() check replaces testing for error
	 * after each individual fputc or fwrite above.
	 */
	fputc('E', fpout);

	if (ferror(fpout))
	{
		ereport(LOG,
				(errcode_for_file_access(),
			   errmsg("could not write temporary statistics file \"%s\": %m",
					  tmpfile)));
		FreeFile(fpout);
		unlink(tmpfile);
	}
	else if (FreeFile(fpout) < 0)
	{
		ereport(LOG,
				(errcode_for_file_access(),
			   errmsg("could not close temporary statistics file \"%s\": %m",
					  tmpfile)));
		unlink(tmpfile);
	}
	else if (rename(tmpfile, statfile) < 0)
	{
		ereport(LOG,
				(errcode_for_file_access(),
				 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
						tmpfile, statfile)));
		unlink(tmpfile);
	}

	if (permanent)
	{
		get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);

3835
		elog(DEBUG2, "removing temporary stats file \"%s\"", statfile);
3836 3837 3838
		unlink(statfile);
	}
}
3839 3840

/* ----------
3841
 * pgstat_read_statsfiles() -
3842
 *
3843 3844 3845 3846 3847 3848 3849 3850
 *	Reads in the existing statistics collector files and initializes the
 *	databases' hash table.  If the permanent file name is requested (which
 *	only happens in the stats collector itself), also remove the file after
 *	reading; the in-memory status is now authoritative, and the permanent file
 *	would be out of date in case somebody else reads it.
 *
 *	If a deep read is requested, table/function stats are read also, otherwise
 *	the table/function hash tables remain empty.
3851 3852
 * ----------
 */
3853
static HTAB *
3854
pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
3855
{
3856 3857 3858
	PgStat_StatDBEntry *dbentry;
	PgStat_StatDBEntry dbbuf;
	HASHCTL		hash_ctl;
3859
	HTAB	   *dbhash;
3860
	FILE	   *fpin;
3861
	int32		format_id;
3862
	bool		found;
3863
	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
3864 3865

	/*
3866
	 * The tables will live in pgStatLocalContext.
3867
	 */
3868
	pgstat_setup_memcxt();
3869 3870 3871 3872 3873

	/*
	 * Create the DB hashtable
	 */
	memset(&hash_ctl, 0, sizeof(hash_ctl));
3874
	hash_ctl.keysize = sizeof(Oid);
3875
	hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
3876 3877
	hash_ctl.hcxt = pgStatLocalContext;
	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
3878
						 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
3879

3880
	/*
B
Bruce Momjian 已提交
3881 3882
	 * Clear out global and archiver statistics so they start from zero in
	 * case we can't load an existing statsfile.
3883 3884
	 */
	memset(&globalStats, 0, sizeof(globalStats));
3885
	memset(&archiverStats, 0, sizeof(archiverStats));
3886

3887 3888
	/*
	 * Set the current timestamp (will be kept only in case we can't load an
3889
	 * existing statsfile).
3890 3891
	 */
	globalStats.stat_reset_timestamp = GetCurrentTimestamp();
3892
	archiverStats.stat_reset_timestamp = globalStats.stat_reset_timestamp;
3893

3894
	/*
3895
	 * Try to open the stats file. If it doesn't exist, the backends simply
B
Bruce Momjian 已提交
3896 3897
	 * return zero for anything and the collector simply starts from scratch
	 * with empty counters.
3898 3899 3900 3901
	 *
	 * ENOENT is a possibility if the stats collector is not running or has
	 * not yet written the stats file the first time.  Any other failure
	 * condition is suspicious.
3902
	 */
3903
	if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
3904 3905 3906 3907 3908 3909
	{
		if (errno != ENOENT)
			ereport(pgStatRunningInCollector ? LOG : WARNING,
					(errcode_for_file_access(),
					 errmsg("could not open statistics file \"%s\": %m",
							statfile)));
3910
		return dbhash;
3911
	}
3912

3913 3914 3915
	/*
	 * Verify it's of the expected format.
	 */
3916 3917
	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
		format_id != PGSTAT_FILE_FORMAT_ID)
3918 3919
	{
		ereport(pgStatRunningInCollector ? LOG : WARNING,
3920
				(errmsg("corrupted statistics file \"%s\"", statfile)));
3921 3922 3923
		goto done;
	}

3924 3925 3926 3927 3928 3929
	/*
	 * Read global stats struct
	 */
	if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
	{
		ereport(pgStatRunningInCollector ? LOG : WARNING,
3930
				(errmsg("corrupted statistics file \"%s\"", statfile)));
3931 3932 3933
		goto done;
	}

3934 3935 3936 3937 3938 3939 3940 3941 3942 3943
	/*
	 * Read archiver stats struct
	 */
	if (fread(&archiverStats, 1, sizeof(archiverStats), fpin) != sizeof(archiverStats))
	{
		ereport(pgStatRunningInCollector ? LOG : WARNING,
				(errmsg("corrupted statistics file \"%s\"", statfile)));
		goto done;
	}

3944
	/*
3945 3946
	 * We found an existing collector stats file. Read it and put all the
	 * hashtable entries into place.
3947 3948 3949 3950 3951
	 */
	for (;;)
	{
		switch (fgetc(fpin))
		{
3952 3953
				/*
				 * 'D'	A PgStat_StatDBEntry struct describing a database
3954
				 * follows.
3955
				 */
3956
			case 'D':
3957 3958
				if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
						  fpin) != offsetof(PgStat_StatDBEntry, tables))
3959
				{
3960
					ereport(pgStatRunningInCollector ? LOG : WARNING,
3961 3962
							(errmsg("corrupted statistics file \"%s\"",
									statfile)));
3963
					goto done;
3964 3965 3966 3967 3968
				}

				/*
				 * Add to the DB hash
				 */
3969
				dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
B
Bruce Momjian 已提交
3970
												  (void *) &dbbuf.databaseid,
3971 3972
															 HASH_ENTER,
															 &found);
3973 3974
				if (found)
				{
3975
					ereport(pgStatRunningInCollector ? LOG : WARNING,
3976 3977
							(errmsg("corrupted statistics file \"%s\"",
									statfile)));
3978
					goto done;
3979 3980 3981
				}

				memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
3982
				dbentry->tables = NULL;
3983
				dbentry->functions = NULL;
3984 3985

				/*
3986 3987
				 * Don't collect tables if not the requested DB (or the
				 * shared-table info)
3988
				 */
3989 3990 3991 3992
				if (onlydb != InvalidOid)
				{
					if (dbbuf.databaseid != onlydb &&
						dbbuf.databaseid != InvalidOid)
B
Bruce Momjian 已提交
3993
						break;
3994
				}
3995 3996

				memset(&hash_ctl, 0, sizeof(hash_ctl));
3997
				hash_ctl.keysize = sizeof(Oid);
3998
				hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3999
				hash_ctl.hcxt = pgStatLocalContext;
4000 4001 4002
				dbentry->tables = hash_create("Per-database table",
											  PGSTAT_TAB_HASH_SIZE,
											  &hash_ctl,
4003
									  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4004

4005 4006 4007 4008 4009 4010
				hash_ctl.keysize = sizeof(Oid);
				hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
				hash_ctl.hcxt = pgStatLocalContext;
				dbentry->functions = hash_create("Per-database function",
												 PGSTAT_FUNCTION_HASH_SIZE,
												 &hash_ctl,
4011
									  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4012

4013
				/*
4014 4015 4016 4017
				 * If requested, read the data from the database-specific
				 * file. If there was onlydb specified (!= InvalidOid), we
				 * would not get here because of a break above. So we don't
				 * need to recheck.
4018
				 */
4019 4020 4021 4022 4023
				if (deep)
					pgstat_read_db_statsfile(dbentry->databaseid,
											 dbentry->tables,
											 dbentry->functions,
											 permanent);
4024 4025 4026

				break;

4027 4028 4029 4030 4031 4032 4033 4034 4035 4036 4037 4038 4039 4040 4041 4042 4043
			case 'E':
				goto done;

			default:
				ereport(pgStatRunningInCollector ? LOG : WARNING,
						(errmsg("corrupted statistics file \"%s\"",
								statfile)));
				goto done;
		}
	}

done:
	FreeFile(fpin);

	/* If requested to read the permanent file, also get rid of it. */
	if (permanent)
	{
4044
		elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114
		unlink(statfile);
	}

	return dbhash;
}


/* ----------
 * pgstat_read_db_statsfile() -
 *
 *	Reads in the existing statistics collector file for the given database,
 *	and initializes the tables and functions hash tables.
 *
 *	Either of tabhash / funchash may be NULL, in which case entries of that
 *	kind are read from the file but not stored.
 *
 *	Problems are reported at LOG level in the collector and WARNING level
 *	elsewhere; reading stops at the first corrupted or duplicate entry,
 *	leaving whatever was loaded so far in the hash tables.
 *
 *	As pgstat_read_statsfiles, if the permanent file is requested, it is
 *	removed after reading.
 * ----------
 */
static void
pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
						 bool permanent)
{
	PgStat_StatTabEntry *tabentry;
	PgStat_StatTabEntry tabbuf;
	PgStat_StatFuncEntry funcbuf;
	PgStat_StatFuncEntry *funcentry;
	FILE	   *fpin;
	int32		format_id;
	bool		found;
	char		statfile[MAXPGPATH];

	get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);

	/*
	 * Try to open the stats file. If it doesn't exist, the backends simply
	 * return zero for anything and the collector simply starts from scratch
	 * with empty counters.
	 *
	 * ENOENT is a possibility if the stats collector is not running or has
	 * not yet written the stats file the first time.  Any other failure
	 * condition is suspicious.
	 */
	if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
	{
		if (errno != ENOENT)
			ereport(pgStatRunningInCollector ? LOG : WARNING,
					(errcode_for_file_access(),
					 errmsg("could not open statistics file \"%s\": %m",
							statfile)));
		return;
	}

	/*
	 * Verify it's of the expected format.
	 */
	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
		format_id != PGSTAT_FILE_FORMAT_ID)
	{
		ereport(pgStatRunningInCollector ? LOG : WARNING,
				(errmsg("corrupted statistics file \"%s\"", statfile)));
		goto done;
	}

	/*
	 * We found an existing collector stats file. Read it and put all the
	 * hashtable entries into place.
	 */
	for (;;)
	{
		switch (fgetc(fpin))
		{
				/*
				 * 'T'	A PgStat_StatTabEntry follows.
				 */
			case 'T':
				if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
						  fpin) != sizeof(PgStat_StatTabEntry))
				{
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted statistics file \"%s\"",
									statfile)));
					goto done;
				}

				/*
				 * Skip if table belongs to a not requested database.
				 */
				if (tabhash == NULL)
					break;

				tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
													(void *) &tabbuf.tableid,
														 HASH_ENTER, &found);

				/* A duplicate table OID means the file is corrupted. */
				if (found)
				{
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted statistics file \"%s\"",
									statfile)));
					goto done;
				}

				memcpy(tabentry, &tabbuf, sizeof(tabbuf));
				break;

				/*
				 * 'F'	A PgStat_StatFuncEntry follows.
				 */
			case 'F':
				if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry),
						  fpin) != sizeof(PgStat_StatFuncEntry))
				{
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted statistics file \"%s\"",
									statfile)));
					goto done;
				}

				/*
				 * Skip if function belongs to a not requested database.
				 */
				if (funchash == NULL)
					break;

				funcentry = (PgStat_StatFuncEntry *) hash_search(funchash,
												(void *) &funcbuf.functionid,
														 HASH_ENTER, &found);

				/* A duplicate function OID means the file is corrupted. */
				if (found)
				{
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted statistics file \"%s\"",
									statfile)));
					goto done;
				}

				memcpy(funcentry, &funcbuf, sizeof(funcbuf));
				break;

				/*
				 * 'E'	The EOF marker of a complete stats file.
				 */
			case 'E':
				goto done;

				/*
				 * Anything else (including EOF) means the file is damaged.
				 */
			default:
				ereport(pgStatRunningInCollector ? LOG : WARNING,
						(errmsg("corrupted statistics file \"%s\"",
								statfile)));
				goto done;
		}
	}

done:
	FreeFile(fpin);

	/* If requested to read the permanent file, also get rid of it. */
	if (permanent)
	{
		elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
		unlink(statfile);
	}
}

4209
/* ----------
4210 4211 4212 4213 4214 4215 4216
 * pgstat_read_db_statsfile_timestamp() -
 *
 *	Attempt to determine the timestamp of the last db statfile write.
 *	Returns TRUE if successful; the timestamp is stored in *ts.
 *
 *	This needs to be careful about handling databases for which no stats file
 *	exists, such as databases without a stat entry or those not yet written:
4217
 *
4218 4219 4220 4221
 *	- if there's a database entry in the global file, return the corresponding
 *	stats_timestamp value.
 *
 *	- if there's no db stat entry (e.g. for a new or inactive database),
4222
 *	there's no stats_timestamp value, but also nothing to write so we return
4223
 *	the timestamp of the global statfile.
4224 4225 4226
 * ----------
 */
static bool
4227 4228
pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
								   TimestampTz *ts)
4229
{
4230
	PgStat_StatDBEntry dbentry;
4231
	PgStat_GlobalStats myGlobalStats;
4232
	PgStat_ArchiverStats myArchiverStats;
4233 4234
	FILE	   *fpin;
	int32		format_id;
4235
	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
4236 4237

	/*
B
Bruce Momjian 已提交
4238
	 * Try to open the stats file.  As above, anything but ENOENT is worthy of
4239
	 * complaining about.
4240 4241
	 */
	if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4242 4243 4244 4245 4246 4247
	{
		if (errno != ENOENT)
			ereport(pgStatRunningInCollector ? LOG : WARNING,
					(errcode_for_file_access(),
					 errmsg("could not open statistics file \"%s\": %m",
							statfile)));
4248
		return false;
4249
	}
4250 4251 4252 4253

	/*
	 * Verify it's of the expected format.
	 */
4254 4255
	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
		format_id != PGSTAT_FILE_FORMAT_ID)
4256
	{
4257 4258
		ereport(pgStatRunningInCollector ? LOG : WARNING,
				(errmsg("corrupted statistics file \"%s\"", statfile)));
4259 4260 4261 4262 4263 4264 4265
		FreeFile(fpin);
		return false;
	}

	/*
	 * Read global stats struct
	 */
4266 4267
	if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
			  fpin) != sizeof(myGlobalStats))
4268
	{
4269 4270
		ereport(pgStatRunningInCollector ? LOG : WARNING,
				(errmsg("corrupted statistics file \"%s\"", statfile)));
4271 4272 4273 4274
		FreeFile(fpin);
		return false;
	}

4275 4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286
	/*
	 * Read archiver stats struct
	 */
	if (fread(&myArchiverStats, 1, sizeof(myArchiverStats),
			  fpin) != sizeof(myArchiverStats))
	{
		ereport(pgStatRunningInCollector ? LOG : WARNING,
				(errmsg("corrupted statistics file \"%s\"", statfile)));
		FreeFile(fpin);
		return false;
	}

4287
	/* By default, we're going to return the timestamp of the global file. */
4288 4289
	*ts = myGlobalStats.stats_timestamp;

4290 4291 4292 4293 4294 4295 4296 4297 4298 4299 4300 4301 4302 4303 4304 4305 4306 4307 4308 4309 4310 4311 4312 4313 4314 4315 4316 4317 4318 4319 4320 4321 4322 4323 4324 4325 4326 4327 4328 4329 4330 4331 4332 4333 4334 4335
	/*
	 * We found an existing collector stats file.  Read it and look for a
	 * record for the requested database.  If found, use its timestamp.
	 */
	for (;;)
	{
		switch (fgetc(fpin))
		{
				/*
				 * 'D'	A PgStat_StatDBEntry struct describing a database
				 * follows.
				 */
			case 'D':
				if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
						  fpin) != offsetof(PgStat_StatDBEntry, tables))
				{
					ereport(pgStatRunningInCollector ? LOG : WARNING,
							(errmsg("corrupted statistics file \"%s\"",
									statfile)));
					goto done;
				}

				/*
				 * If this is the DB we're looking for, save its timestamp and
				 * we're done.
				 */
				if (dbentry.databaseid == databaseid)
				{
					*ts = dbentry.stats_timestamp;
					goto done;
				}

				break;

			case 'E':
				goto done;

			default:
				ereport(pgStatRunningInCollector ? LOG : WARNING,
						(errmsg("corrupted statistics file \"%s\"",
								statfile)));
				goto done;
		}
	}

done:
4336 4337 4338 4339
	FreeFile(fpin);
	return true;
}

4340
/*
4341 4342 4343
 * If not already done, read the statistics collector stats file into
 * some hash tables.  The results will be kept until pgstat_clear_snapshot()
 * is called (typically, at end of transaction).
4344 4345 4346 4347
 */
static void
backend_read_statsfile(void)
{
4348 4349
	TimestampTz min_ts = 0;
	TimestampTz ref_ts = 0;
4350 4351
	int			count;

4352 4353 4354 4355 4356
	/* already read it? */
	if (pgStatDBHash)
		return;
	Assert(!pgStatRunningInCollector);

4357 4358
	/*
	 * Loop until fresh enough stats file is available or we ran out of time.
4359
	 * The stats inquiry message is sent repeatedly in case collector drops
4360
	 * it; but not every single time, as that just swamps the collector.
4361 4362 4363
	 */
	for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++)
	{
4364
		bool		ok;
4365
		TimestampTz file_ts = 0;
4366
		TimestampTz cur_ts;
4367 4368 4369

		CHECK_FOR_INTERRUPTS();

4370
		ok = pgstat_read_db_statsfile_timestamp(MyDatabaseId, false, &file_ts);
4371 4372 4373 4374 4375 4376 4377 4378

		cur_ts = GetCurrentTimestamp();
		/* Calculate min acceptable timestamp, if we didn't already */
		if (count == 0 || cur_ts < ref_ts)
		{
			/*
			 * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
			 * msec before now.  This indirectly ensures that the collector
4379 4380 4381
			 * needn't write the file more often than PGSTAT_STAT_INTERVAL. In
			 * an autovacuum worker, however, we want a lower delay to avoid
			 * using stale data, so we use PGSTAT_RETRY_DELAY (since the
4382 4383 4384 4385
			 * number of workers is low, this shouldn't be a problem).
			 *
			 * We don't recompute min_ts after sleeping, except in the
			 * unlikely case that cur_ts went backwards.  So we might end up
B
Bruce Momjian 已提交
4386
			 * accepting a file a bit older than PGSTAT_STAT_INTERVAL.  In
4387 4388 4389 4390 4391 4392 4393 4394 4395 4396 4397 4398 4399 4400 4401 4402 4403 4404 4405 4406 4407 4408 4409 4410 4411 4412 4413 4414 4415 4416 4417 4418 4419 4420 4421 4422 4423 4424 4425 4426 4427 4428
			 * practice that shouldn't happen, though, as long as the sleep
			 * time is less than PGSTAT_STAT_INTERVAL; and we don't want to
			 * tell the collector that our cutoff time is less than what we'd
			 * actually accept.
			 */
			ref_ts = cur_ts;
			if (IsAutoVacuumWorkerProcess())
				min_ts = TimestampTzPlusMilliseconds(ref_ts,
													 -PGSTAT_RETRY_DELAY);
			else
				min_ts = TimestampTzPlusMilliseconds(ref_ts,
													 -PGSTAT_STAT_INTERVAL);
		}

		/*
		 * If the file timestamp is actually newer than cur_ts, we must have
		 * had a clock glitch (system time went backwards) or there is clock
		 * skew between our processor and the stats collector's processor.
		 * Accept the file, but send an inquiry message anyway to make
		 * pgstat_recv_inquiry do a sanity check on the collector's time.
		 */
		if (ok && file_ts > cur_ts)
		{
			/*
			 * A small amount of clock skew between processors isn't terribly
			 * surprising, but a large difference is worth logging.  We
			 * arbitrarily define "large" as 1000 msec.
			 */
			if (file_ts >= TimestampTzPlusMilliseconds(cur_ts, 1000))
			{
				char	   *filetime;
				char	   *mytime;

				/* Copy because timestamptz_to_str returns a static buffer */
				filetime = pstrdup(timestamptz_to_str(file_ts));
				mytime = pstrdup(timestamptz_to_str(cur_ts));
				elog(LOG, "stats collector's time %s is later than backend local time %s",
					 filetime, mytime);
				pfree(filetime);
				pfree(mytime);
			}

4429
			pgstat_send_inquiry(cur_ts, min_ts, MyDatabaseId);
4430 4431 4432 4433 4434
			break;
		}

		/* Normal acceptance case: file is not older than cutoff time */
		if (ok && file_ts >= min_ts)
4435 4436 4437
			break;

		/* Not there or too old, so kick the collector and wait a bit */
4438
		if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
4439
			pgstat_send_inquiry(cur_ts, min_ts, MyDatabaseId);
4440

4441 4442 4443 4444 4445 4446
		pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
	}

	if (count >= PGSTAT_POLL_LOOP_COUNT)
		elog(WARNING, "pgstat wait timeout");

4447 4448 4449 4450
	/*
	 * Autovacuum launcher wants stats about all databases, but a shallow read
	 * is sufficient.
	 */
4451
	if (IsAutoVacuumLauncherProcess())
4452
		pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
4453
	else
4454
		pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
4455
}
4456

4457 4458 4459 4460 4461 4462 4463 4464 4465 4466 4467 4468 4469 4470 4471 4472

/* ----------
 * pgstat_setup_memcxt() -
 *
 *	Create pgStatLocalContext, if not already done.
 * ----------
 */
static void
pgstat_setup_memcxt(void)
{
	if (!pgStatLocalContext)
		pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
												   "Statistics snapshot",
												   ALLOCSET_SMALL_MINSIZE,
												   ALLOCSET_SMALL_INITSIZE,
												   ALLOCSET_SMALL_MAXSIZE);
4473 4474
}

4475 4476 4477 4478

/* ----------
 * pgstat_clear_snapshot() -
 *
B
Bruce Momjian 已提交
4479
 *	Discard any data collected in the current transaction.  Any subsequent
4480 4481 4482 4483 4484 4485 4486 4487 4488 4489 4490 4491 4492 4493 4494 4495 4496 4497 4498 4499 4500
 *	request will cause new snapshots to be read.
 *
 *	This is also invoked during transaction commit or abort to discard
 *	the no-longer-wanted snapshot.
 * ----------
 */
void
pgstat_clear_snapshot(void)
{
	/* Release memory, if any was allocated */
	if (pgStatLocalContext)
		MemoryContextDelete(pgStatLocalContext);

	/* Reset variables */
	pgStatLocalContext = NULL;
	pgStatDBHash = NULL;
	localBackendStatusTable = NULL;
	localNumBackends = 0;
}


4501 4502 4503 4504 4505 4506 4507 4508 4509
/* ----------
 * pgstat_recv_inquiry() -
 *
 *	Process stat inquiry requests.
 * ----------
 */
static void
pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
{
4510 4511 4512 4513
	slist_iter	iter;
	DBWriteRequest *newreq;
	PgStat_StatDBEntry *dbentry;

4514
	elog(DEBUG2, "received inquiry for database %u", msg->databaseid);
4515 4516

	/*
A
Alvaro Herrera 已提交
4517 4518 4519 4520
	 * Find the last write request for this DB.  If it's older than the
	 * request's cutoff time, update it; otherwise there's nothing to do.
	 *
	 * Note that if a request is found, we return early and skip the below
B
Bruce Momjian 已提交
4521 4522 4523
	 * check for clock skew.  This is okay, since the only way for a DB
	 * request to be present in the list is that we have been here since the
	 * last write round.
4524 4525 4526 4527 4528 4529 4530 4531 4532 4533 4534 4535 4536
	 */
	slist_foreach(iter, &last_statrequests)
	{
		DBWriteRequest *req = slist_container(DBWriteRequest, next, iter.cur);

		if (req->databaseid != msg->databaseid)
			continue;

		if (msg->cutoff_time > req->request_time)
			req->request_time = msg->cutoff_time;
		return;
	}

4537
	/*
4538
	 * There's no request for this DB yet, so create one.
4539
	 */
4540 4541 4542 4543 4544
	newreq = palloc(sizeof(DBWriteRequest));

	newreq->databaseid = msg->databaseid;
	newreq->request_time = msg->clock_time;
	slist_push_head(&last_statrequests, &newreq->next);
4545 4546

	/*
4547
	 * If the requestor's local clock time is older than stats_timestamp, we
4548 4549 4550 4551 4552 4553
	 * should suspect a clock glitch, ie system time going backwards; though
	 * the more likely explanation is just delayed message receipt.  It is
	 * worth expending a GetCurrentTimestamp call to be sure, since a large
	 * retreat in the system clock reading could otherwise cause us to neglect
	 * to update the stats file for a long time.
	 */
4554 4555
	dbentry = pgstat_get_db_entry(msg->databaseid, false);
	if ((dbentry != NULL) && (msg->clock_time < dbentry->stats_timestamp))
4556 4557 4558
	{
		TimestampTz cur_ts = GetCurrentTimestamp();

4559
		if (cur_ts < dbentry->stats_timestamp)
4560 4561 4562 4563 4564 4565 4566 4567 4568
		{
			/*
			 * Sure enough, time went backwards.  Force a new stats file write
			 * to get back in sync; but first, log a complaint.
			 */
			char	   *writetime;
			char	   *mytime;

			/* Copy because timestamptz_to_str returns a static buffer */
4569
			writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
4570
			mytime = pstrdup(timestamptz_to_str(cur_ts));
4571
			elog(LOG,
4572
				 "stats_timestamp %s is later than collector's time %s for database %u",
4573
				 writetime, mytime, dbentry->databaseid);
4574 4575 4576
			pfree(writetime);
			pfree(mytime);

4577 4578
			newreq->request_time = cur_ts;
			dbentry->stats_timestamp = cur_ts - 1;
4579 4580
		}
	}
4581 4582 4583
}


4584 4585 4586 4587 4588 4589 4590 4591 4592
/* ----------
 * pgstat_recv_tabstat() -
 *
 *	Count what the backend has done.
 * ----------
 */
static void
pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
{
4593 4594 4595 4596
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;
	int			i;
	bool		found;
4597

4598
	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
4599 4600

	/*
4601
	 * Update database-wide stats.
4602
	 */
4603 4604
	dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
	dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
4605 4606
	dbentry->n_block_read_time += msg->m_block_read_time;
	dbentry->n_block_write_time += msg->m_block_write_time;
4607 4608 4609 4610 4611 4612

	/*
	 * Process all table entries in the message.
	 */
	for (i = 0; i < msg->m_nentries; i++)
	{
4613 4614
		PgStat_TableEntry *tabmsg = &(msg->m_entry[i]);

4615
		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
B
Bruce Momjian 已提交
4616
													(void *) &(tabmsg->t_id),
B
Bruce Momjian 已提交
4617
													   HASH_ENTER, &found);
4618 4619 4620 4621

		if (!found)
		{
			/*
B
Bruce Momjian 已提交
4622 4623
			 * If it's a new table entry, initialize counters to the values we
			 * just got.
4624
			 */
4625 4626 4627 4628 4629 4630 4631 4632 4633 4634 4635 4636
			tabentry->numscans = tabmsg->t_counts.t_numscans;
			tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned;
			tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched;
			tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted;
			tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated;
			tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted;
			tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated;
			tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples;
			tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples;
			tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples;
			tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched;
			tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit;
4637

4638
			tabentry->vacuum_timestamp = 0;
4639
			tabentry->vacuum_count = 0;
4640
			tabentry->autovac_vacuum_timestamp = 0;
4641
			tabentry->autovac_vacuum_count = 0;
4642
			tabentry->analyze_timestamp = 0;
4643
			tabentry->analyze_count = 0;
4644
			tabentry->autovac_analyze_timestamp = 0;
4645
			tabentry->autovac_analyze_count = 0;
4646 4647 4648 4649 4650 4651
		}
		else
		{
			/*
			 * Otherwise add the values to the existing entry.
			 */
4652 4653 4654 4655 4656 4657 4658 4659 4660 4661 4662 4663
			tabentry->numscans += tabmsg->t_counts.t_numscans;
			tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned;
			tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
			tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
			tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated;
			tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
			tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated;
			tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples;
			tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples;
			tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples;
			tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
			tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit;
4664 4665
		}

4666
		/* Clamp n_live_tuples in case of negative delta_live_tuples */
4667
		tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
4668 4669
		/* Likewise for n_dead_tuples */
		tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
4670

4671
		/*
4672
		 * Add per-table stats to the per-database entry, too.
4673
		 */
4674 4675 4676 4677 4678 4679 4680
		dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned;
		dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
		dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
		dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated;
		dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
		dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
		dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit;
4681 4682 4683 4684 4685 4686 4687 4688 4689 4690 4691 4692 4693
	}
}


/* ----------
 * pgstat_recv_tabpurge() -
 *
 *	Arrange for dead table removal.
 *
 *	Removes every table OID listed in the message from the table hash of
 *	the message's database.  Unknown databases and unknown tables are
 *	silently ignored.
 * ----------
 */
static void
pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	int			i;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);

	/*
	 * No need to purge if we don't even know the database.
	 */
	if (!dbentry || !dbentry->tables)
		return;

	/*
	 * Process all table entries in the message.
	 */
	for (i = 0; i < msg->m_nentries; i++)
	{
		/* Remove from hashtable if present; we don't care if it's not. */
		(void) hash_search(dbentry->tables,
						   (void *) &(msg->m_tableid[i]),
						   HASH_REMOVE, NULL);
	}
}


/* ----------
 * pgstat_recv_dropdb() -
 *
 *	Arrange for dead database removal.
 *
 *	If the database named by msg->m_databaseid has an entry, its stats
 *	file is unlinked, its tables and functions hashes are destroyed, and
 *	the entry itself is removed from pgStatDBHash.  An ERROR is reported
 *	if the entry cannot be removed from pgStatDBHash.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
{
	Oid			dbid = msg->m_databaseid;
	PgStat_StatDBEntry *dbentry;

	/*
	 * Lookup the database in the hashtable.
	 */
	dbentry = pgstat_get_db_entry(dbid, false);

	/*
	 * If found, remove it (along with the db statfile).
	 */
	if (dbentry)
	{
		char		statfile[MAXPGPATH];

		get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);

		elog(DEBUG2, "removing stats file \"%s\"", statfile);

		/* Best effort: the result of unlink() is deliberately not checked. */
		unlink(statfile);

		/* Release the per-database hashes before dropping the entry itself. */
		if (dbentry->tables != NULL)
			hash_destroy(dbentry->tables);
		if (dbentry->functions != NULL)
			hash_destroy(dbentry->functions);

		/*
		 * The entry was found just above, so failing to remove it now means
		 * the hash table is inconsistent.
		 */
		if (hash_search(pgStatDBHash,
						(void *) &dbid,
						HASH_REMOVE, NULL) == NULL)
			ereport(ERROR,
					(errmsg("database hash table corrupted during cleanup --- abort")));
	}
}


/* ----------
 * pgstat_recv_resetcounter() -
 *
 *	Reset the statistics for the specified database.
 *
 *	The tables and functions hashes of the database entry are destroyed
 *	and the entry's counters are reinitialized through
 *	reset_dbentry_counters().  Nothing happens when the database has no
 *	entry.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
{
	PgStat_StatDBEntry *dbentry;

	/*
	 * Lookup the database in the hashtable.  Nothing to do if not there.
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);

	if (!dbentry)
		return;

	/*
	 * We simply throw away all the database's table entries by recreating a
	 * new hash table for them.
	 */
	if (dbentry->tables != NULL)
		hash_destroy(dbentry->tables);
	if (dbentry->functions != NULL)
		hash_destroy(dbentry->functions);

	/* Clear the pointers so the destroyed hashes cannot be reached again. */
	dbentry->tables = NULL;
	dbentry->functions = NULL;

	/*
	 * Reset database-level stats, too.  This creates empty hash tables for
	 * tables and functions.
	 */
	reset_dbentry_counters(dbentry);
}
4798

4799 4800 4801 4802 4803 4804 4805 4806 4807
/* ----------
 * pgstat_recv_resetsharedcounter() -
 *
 *	Reset some shared statistics of the cluster.
 *
 *	msg->m_resettarget selects what is reset: RESET_BGWRITER zeroes
 *	globalStats, RESET_ARCHIVER zeroes archiverStats.  In both cases the
 *	structure's stat_reset_timestamp is then set to the current time.
 *	Any other target is silently ignored.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
{
	if (msg->m_resettarget == RESET_BGWRITER)
	{
		/* Reset the global background writer statistics for the cluster. */
		memset(&globalStats, 0, sizeof(globalStats));
		globalStats.stat_reset_timestamp = GetCurrentTimestamp();
	}
	else if (msg->m_resettarget == RESET_ARCHIVER)
	{
		/* Reset the archiver statistics for the cluster. */
		memset(&archiverStats, 0, sizeof(archiverStats));
		archiverStats.stat_reset_timestamp = GetCurrentTimestamp();
	}

	/*
	 * Presumably the sender of this message validated the target, don't
	 * complain here if it's not valid
	 */
}

4827 4828 4829 4830 4831 4832 4833 4834 4835 4836 4837 4838 4839 4840 4841 4842
/* ----------
 * pgstat_recv_resetsinglecounter() -
 *
 *	Reset the statistics for a single object.
 *
 *	msg->m_resettype selects the hash the object is removed from:
 *	RESET_TABLE uses the database's tables hash, RESET_FUNCTION its
 *	functions hash; msg->m_objectid is the key.  The database's
 *	stat_reset_timestamp is updated whenever the database entry exists,
 *	whether or not the object was found.  Nothing happens when the
 *	database has no entry.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
{
	PgStat_StatDBEntry *dbentry;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);

	if (!dbentry)
		return;

	/* Set the reset timestamp for the whole database */
	dbentry->stat_reset_timestamp = GetCurrentTimestamp();

	/*
	 * Remove object if it exists, ignore it if not.  The hash pointers are
	 * checked for NULL first, as the purge, drop and reset handlers in this
	 * file do, so that a missing hash is treated as "object not present".
	 */
	if (msg->m_resettype == RESET_TABLE)
	{
		if (dbentry->tables != NULL)
			(void) hash_search(dbentry->tables, (void *) &(msg->m_objectid),
							   HASH_REMOVE, NULL);
	}
	else if (msg->m_resettype == RESET_FUNCTION)
	{
		if (dbentry->functions != NULL)
			(void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
							   HASH_REMOVE, NULL);
	}
}

4855 4856 4857 4858 4859 4860 4861 4862 4863 4864 4865 4866
/* ----------
 * pgstat_recv_autovac() -
 *
 *	Process an autovacuum signalling message.
 *
 *	Copies msg->m_start_time into last_autovac_time of the entry for the
 *	database named by msg->m_databaseid.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
{
	PgStat_StatDBEntry *dbentry;

	/*
	 * Store the last autovacuum time in the database's hashtable entry.
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	dbentry->last_autovac_time = msg->m_start_time;
}

/* ----------
 * pgstat_recv_vacuum() -
 *
 *	Process a VACUUM message.
 *
 *	Overwrites the table's live and dead tuple counts with the values
 *	carried by the message, then records the vacuum time and bumps the
 *	vacuum counter.  msg->m_autovacuum decides whether the autovacuum or
 *	the plain vacuum timestamp/counter pair is updated.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;

	/*
	 * Store the data in the table's hashtable entry.
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);

	/* These are absolute values from the message, not deltas. */
	tabentry->n_live_tuples = msg->m_live_tuples;
	tabentry->n_dead_tuples = msg->m_dead_tuples;

	if (msg->m_autovacuum)
	{
		tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
		tabentry->autovac_vacuum_count++;
	}
	else
	{
		tabentry->vacuum_timestamp = msg->m_vacuumtime;
		tabentry->vacuum_count++;
	}
}

/* ----------
 * pgstat_recv_analyze() -
 *
 *	Process an ANALYZE message.
 *
 *	Overwrites the table's live and dead tuple counts with the values
 *	carried by the message, zeroes changes_since_analyze, then records the
 *	analyze time and bumps the analyze counter.  msg->m_autovacuum decides
 *	whether the autovacuum or the plain analyze timestamp/counter pair is
 *	updated.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
{
	PgStat_StatDBEntry *dbentry;
	PgStat_StatTabEntry *tabentry;

	/*
	 * Store the data in the table's hashtable entry.
	 */
	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);

	/* These are absolute values from the message, not deltas. */
	tabentry->n_live_tuples = msg->m_live_tuples;
	tabentry->n_dead_tuples = msg->m_dead_tuples;

	/*
	 * We reset changes_since_analyze to zero, forgetting any changes that
	 * occurred while the ANALYZE was in progress.
	 */
	tabentry->changes_since_analyze = 0;

	if (msg->m_autovacuum)
	{
		tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
		tabentry->autovac_analyze_count++;
	}
	else
	{
		tabentry->analyze_timestamp = msg->m_analyzetime;
		tabentry->analyze_count++;
	}
}
4947 4948


4949 4950 4951 4952 4953 4954 4955 4956 4957 4958 4959 4960 4961 4962
/* ----------
 * pgstat_recv_archiver() -
 *
 *	Process an ARCHIVER message.
 *
 *	Depending on msg->m_failed, either the failure or the success side of
 *	archiverStats is updated: the counter is incremented, the file name in
 *	msg->m_xlog is copied, and msg->m_timestamp is stored.  "len" is not
 *	used.
 * ----------
 */
static void
pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len)
{
	/*
	 * NOTE(review): both memcpy calls copy sizeof(destination) bytes, which
	 * assumes msg->m_xlog is at least as large as the destination buffers --
	 * confirm against the message struct definition.
	 */
	if (msg->m_failed)
	{
		/* Failed archival attempt */
		++archiverStats.failed_count;
		memcpy(archiverStats.last_failed_wal, msg->m_xlog,
			   sizeof(archiverStats.last_failed_wal));
		archiverStats.last_failed_timestamp = msg->m_timestamp;
	}
	else
	{
		/* Successful archival operation */
		++archiverStats.archived_count;
		memcpy(archiverStats.last_archived_wal, msg->m_xlog,
			   sizeof(archiverStats.last_archived_wal));
		archiverStats.last_archived_timestamp = msg->m_timestamp;
	}
}

4976 4977 4978 4979 4980 4981 4982
/* ----------
 * pgstat_recv_bgwriter() -
 *
 *	Process a BGWRITER message.
 * ----------
 */
static void
4983
pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
4984 4985 4986
{
	globalStats.timed_checkpoints += msg->m_timed_checkpoints;
	globalStats.requested_checkpoints += msg->m_requested_checkpoints;
4987 4988
	globalStats.checkpoint_write_time += msg->m_checkpoint_write_time;
	globalStats.checkpoint_sync_time += msg->m_checkpoint_sync_time;
4989
	globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints;
4990 4991
	globalStats.buf_written_clean += msg->m_buf_written_clean;
	globalStats.maxwritten_clean += msg->m_maxwritten_clean;
4992
	globalStats.buf_written_backend += msg->m_buf_written_backend;
4993
	globalStats.buf_fsync_backend += msg->m_buf_fsync_backend;
4994
	globalStats.buf_alloc += msg->m_buf_alloc;
4995
}
4996

4997 4998 4999
/* ----------
 * pgstat_recv_recoveryconflict() -
 *
 *	Process a RECOVERYCONFLICT message.
 *
 *	Increments the per-database conflict counter that matches
 *	msg->m_reason.  Database conflicts are deliberately not counted, and a
 *	reason not listed in the switch changes no counter.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
{
	PgStat_StatDBEntry *dbentry;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	switch (msg->m_reason)
	{
		case PROCSIG_RECOVERY_CONFLICT_DATABASE:

			/*
			 * Since we drop the information about the database as soon as it
			 * replicates, there is no point in counting these conflicts.
			 */
			break;
		case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
			dbentry->n_conflict_tablespace++;
			break;
		case PROCSIG_RECOVERY_CONFLICT_LOCK:
			dbentry->n_conflict_lock++;
			break;
		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
			dbentry->n_conflict_snapshot++;
			break;
		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
			dbentry->n_conflict_bufferpin++;
			break;
		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
			dbentry->n_conflict_startup_deadlock++;
			break;
	}
}

5037 5038 5039
/* ----------
 * pgstat_recv_deadlock() -
 *
 *	Process a DEADLOCK message.
 *
 *	Increments n_deadlocks in the entry for the database named by
 *	msg->m_databaseid.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len)
{
	PgStat_StatDBEntry *dbentry;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	dbentry->n_deadlocks++;
}

5053 5054 5055
/* ----------
 * pgstat_recv_tempfile() -
 *
 *	Process a TEMPFILE message.
 *
 *	Adds msg->m_filesize to n_temp_bytes and one to n_temp_files in the
 *	entry for the database named by msg->m_databaseid.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
{
	PgStat_StatDBEntry *dbentry;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	dbentry->n_temp_bytes += msg->m_filesize;
	dbentry->n_temp_files += 1;
}

5070 5071 5072 5073 5074 5075 5076 5077 5078 5079 5080 5081 5082 5083 5084 5085 5086 5087 5088 5089 5090 5091 5092
/* ----------
 * pgstat_recv_funcstat() -
 *
 *	Count what the backend has done.
 *
 *	For each of the msg->m_nentries function entries in the message, the
 *	matching entry in the database's functions hash is looked up by f_id
 *	and created if missing.  A new entry takes the message's call count
 *	and timings as its initial values; an existing entry has them added.
 *	"len" is not used.
 * ----------
 */
static void
pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
{
	PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]);
	PgStat_StatDBEntry *dbentry;
	PgStat_StatFuncEntry *funcentry;
	int			i;
	bool		found;

	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);

	/*
	 * Process all function entries in the message.
	 */
	for (i = 0; i < msg->m_nentries; i++, funcmsg++)
	{
		funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
													(void *) &(funcmsg->f_id),
														 HASH_ENTER, &found);

		if (!found)
		{
			/*
			 * If it's a new function entry, initialize counters to the values
			 * we just got.
			 */
			funcentry->f_numcalls = funcmsg->f_numcalls;
			funcentry->f_total_time = funcmsg->f_total_time;
			funcentry->f_self_time = funcmsg->f_self_time;
		}
		else
		{
			/*
			 * Otherwise add the values to the existing entry.
			 */
			funcentry->f_numcalls += funcmsg->f_numcalls;
			funcentry->f_total_time += funcmsg->f_total_time;
			funcentry->f_self_time += funcmsg->f_self_time;
		}
	}
}

/* ----------
 * pgstat_recv_funcpurge() -
 *
 *	Drop statistics for functions that no longer exist.
 *
 *	Each function id listed in the message is removed from the functions
 *	hash of the database named by msg->m_databaseid.  The message is
 *	ignored when that database has no entry or the entry has no functions
 *	hash.  "len" is not used.
 * ----------
 */
static void
pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
{
	PgStat_StatDBEntry *db = pgstat_get_db_entry(msg->m_databaseid, false);
	int			idx;

	/* An unknown database, or one without a function hash, has nothing to purge. */
	if (db == NULL || db->functions == NULL)
		return;

	/* Walk the id list; ids that are already absent are simply skipped. */
	idx = 0;
	while (idx < msg->m_nentries)
	{
		(void) hash_search(db->functions,
						   (void *) &(msg->m_functionid[idx]),
						   HASH_REMOVE, NULL);
		idx++;
	}
}
5149 5150 5151 5152 5153 5154 5155 5156 5157 5158 5159 5160 5161 5162 5163 5164 5165 5166 5167 5168 5169 5170 5171 5172 5173 5174 5175 5176 5177 5178 5179 5180 5181 5182 5183 5184 5185 5186 5187

/* ----------
 * pgstat_write_statsfile_needed() -
 *
 *	Report whether the stats files have to be written out.
 *
 *	Returns true when last_statrequests holds at least one request, and
 *	false when the list is empty.
 * ----------
 */
static bool
pgstat_write_statsfile_needed(void)
{
	/* An empty request list means everything was written recently. */
	return !slist_is_empty(&last_statrequests);
}

/* ----------
 * pgstat_db_requested() -
 *
 *	Tell whether a write of the stats file was requested for a database.
 *
 *	Returns true when some DBWriteRequest in last_statrequests carries the
 *	given database id, false otherwise.
 * ----------
 */
static bool
pgstat_db_requested(Oid databaseid)
{
	slist_iter	iter;
	bool		requested = false;

	/* Scan the pending requests and stop at the first one for this database. */
	slist_foreach(iter, &last_statrequests)
	{
		DBWriteRequest *pending = slist_container(DBWriteRequest, next, iter.cur);

		if (pending->databaseid == databaseid)
		{
			requested = true;
			break;
		}
	}

	return requested;
}