/*-------------------------------------------------------------------------
 *
 * unix_latch.c
 *	  Routines for inter-process latches
 *
 * The Unix implementation uses the so-called self-pipe trick to overcome
 * the race condition involved with select() and setting a global flag
 * in the signal handler. When a latch is set and the current process
 * is waiting for it, the signal handler wakes up the select() in
 * WaitLatch by writing a byte to a pipe. A signal by itself doesn't
 * interrupt select() on all platforms, and even on platforms where it
 * does, a signal that arrives just before the select() call does not
 * prevent the select() from entering sleep. An incoming byte on a pipe
 * however reliably interrupts the sleep, and causes select() to return
 * immediately even if the signal arrives before select() begins.
 *
 * (Actually, we prefer poll() over select() where available, but the
 * same comments apply to it.)
 *
 * When SetLatch is called from the same process that owns the latch,
 * SetLatch writes the byte directly to the pipe. If it's owned by another
 * process, SIGUSR1 is sent and the signal handler in the waiting process
 * writes the byte to the pipe on behalf of the signaling process.
 *
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/port/unix_latch.c
 *
 *-------------------------------------------------------------------------
 */
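
/*
 * Illustrative usage, not part of the original file: a process typically
 * calls InitializeLatchSupport() and InitLatch() once at startup, then
 * waits at the bottom of its main loop, resetting the latch *before*
 * checking for work; resetting after the check could eat a wakeup that
 * arrives in between and then sleep despite pending work.  A minimal
 * sketch, with my_latch, work_pending() and do_work() as hypothetical
 * names:
 *
 *		static Latch my_latch;
 *
 *		InitializeLatchSupport();
 *		InitLatch(&my_latch);
 *
 *		for (;;)
 *		{
 *			ResetLatch(&my_latch);
 *			if (work_pending())
 *				do_work();
 *			(void) WaitLatch(&my_latch, WL_LATCH_SET, -1);
 *		}
 */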
#include "postgres.h"

#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "miscadmin.h"
#include "portability/instr_time.h"
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"

/* Are we currently in WaitLatch? The signal handler would like to know. */
static volatile sig_atomic_t waiting = false;

/* Read and write ends of the self-pipe */
static int	selfpipe_readfd = -1;
static int	selfpipe_writefd = -1;

/* Private function prototypes */
static void sendSelfPipeByte(void);
static void drainSelfPipe(void);


/*
 * Initialize the process-local latch infrastructure.
 *
 * This must be called once during startup of any process that can wait on
 * latches, before it issues any InitLatch() or OwnLatch() calls.
 */
void
InitializeLatchSupport(void)
{
	int			pipefd[2];

	Assert(selfpipe_readfd == -1);

	/*
	 * Set up the self-pipe that allows a signal handler to wake up the
	 * select() in WaitLatch. Make the write-end non-blocking, so that
	 * SetLatch won't block if the event has already been set many times
	 * filling the kernel buffer. Make the read-end non-blocking too, so that
	 * we can easily clear the pipe by reading until EAGAIN or EWOULDBLOCK.
	 */
	if (pipe(pipefd) < 0)
		elog(FATAL, "pipe() failed: %m");
	if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
	if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");

	selfpipe_readfd = pipefd[0];
	selfpipe_writefd = pipefd[1];
}

/*
 * Initialize a backend-local latch.
 */
void
InitLatch(volatile Latch *latch)
{
	/* Assert InitializeLatchSupport has been called in this process */
	Assert(selfpipe_readfd >= 0);

	latch->is_set = false;
	latch->owner_pid = MyProcPid;
	latch->is_shared = false;
}

/*
 * Initialize a shared latch that can be set from other processes. The latch
 * is initially owned by no-one; use OwnLatch to associate it with the
 * current process.
 *
 * InitSharedLatch needs to be called in postmaster before forking child
 * processes, usually right after allocating the shared memory block
 * containing the latch with ShmemInitStruct. (The Unix implementation
 * doesn't actually require that, but the Windows one does.) Because of
 * this restriction, we have no concurrency issues to worry about here.
 */
void
InitSharedLatch(volatile Latch *latch)
{
	latch->is_set = false;
	latch->owner_pid = 0;
	latch->is_shared = true;
}
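
/*
 * Illustrative only, not part of the original file: the typical pattern is
 * for the postmaster to place the latch in shared memory and initialize it
 * before any children exist, e.g. (the struct name and the found flag are
 * hypothetical)
 *
 *		bool		found;
 *		Latch	   *latch;
 *
 *		latch = (Latch *) ShmemInitStruct("My Module Latch",
 *										  sizeof(Latch), &found);
 *		if (!found)
 *			InitSharedLatch(latch);
 *
 * after which the child process that intends to wait on it calls
 * OwnLatch(latch).
 */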

/*
 * Associate a shared latch with the current process, allowing it to
 * wait on the latch.
 *
 * Although there is a sanity check for latch-already-owned, we don't do
 * any sort of locking here, meaning that we could fail to detect the error
 * if two processes try to own the same latch at about the same time.  If
 * there is any risk of that, caller must provide an interlock to prevent it.
 *
 * In any process that calls OwnLatch(), make sure that
 * latch_sigusr1_handler() is called from the SIGUSR1 signal handler,
 * as shared latches use SIGUSR1 for inter-process communication.
 */
void
OwnLatch(volatile Latch *latch)
{
	/* Assert InitializeLatchSupport has been called in this process */
	Assert(selfpipe_readfd >= 0);

	Assert(latch->is_shared);

	/* sanity check */
	if (latch->owner_pid != 0)
		elog(ERROR, "latch already owned");

	latch->owner_pid = MyProcPid;
}

/*
 * Disown a shared latch currently owned by the current process.
 */
void
DisownLatch(volatile Latch *latch)
{
	Assert(latch->is_shared);
	Assert(latch->owner_pid == MyProcPid);

	latch->owner_pid = 0;
}

/*
 * Wait for a given latch to be set, or for postmaster death, or until timeout
 * is exceeded. 'wakeEvents' is a bitmask that specifies which of those events
 * to wait for. If the latch is already set (and WL_LATCH_SET is given), the
 * function returns immediately.
 *
 * The 'timeout' is given in milliseconds. It must be >= 0 if WL_TIMEOUT flag
 * is given.  Note that some extra overhead is incurred when WL_TIMEOUT is
 * given, so avoid using a timeout if possible.
 *
 * The latch must be owned by the current process, ie. it must be a
 * backend-local latch initialized with InitLatch, or a shared latch
 * associated with the current process by calling OwnLatch.
 *
 * Returns bit mask indicating which condition(s) caused the wake-up. Note
 * that if multiple wake-up conditions are true, there is no guarantee that
 * we return all of them in one call, but we will return at least one.
 */
int
WaitLatch(volatile Latch *latch, int wakeEvents, long timeout)
{
	return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout);
}
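
/*
 * Example, not part of the original file: wait up to one second for the
 * (hypothetical) latch my_latch while also noticing postmaster death, then
 * decode the returned bitmask; any subset of the requested events may be
 * reported in one call:
 *
 *		int			rc;
 *
 *		rc = WaitLatch(&my_latch,
 *					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 *					   1000L);
 *		if (rc & WL_POSTMASTER_DEATH)
 *			proc_exit(1);
 *		if (rc & WL_LATCH_SET)
 *			ResetLatch(&my_latch);
 */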

/*
 * Like WaitLatch, but with an extra socket argument for WL_SOCKET_*
 * conditions.
 *
 * When waiting on a socket, WL_SOCKET_READABLE *must* be included in
 * 'wakeEvents'; WL_SOCKET_WRITEABLE is optional.  The reason for this is
 * that EOF and error conditions are reported only via WL_SOCKET_READABLE.
 */
int
WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
				  long timeout)
{
	int			result = 0;
	int			rc;
	instr_time	start_time,
				cur_time;
	long		cur_timeout;

#ifdef HAVE_POLL
	struct pollfd pfds[3];
	int			nfds;
#else
	struct timeval tv,
			   *tvp;
	fd_set		input_mask;
	fd_set		output_mask;
	int			hifd;
#endif

	/* Ignore WL_SOCKET_* events if no valid socket is given */
	if (sock == PGINVALID_SOCKET)
		wakeEvents &= ~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);

	Assert(wakeEvents != 0);	/* must have at least one wake event */

	/* Cannot specify WL_SOCKET_WRITEABLE without WL_SOCKET_READABLE */
	Assert((wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != WL_SOCKET_WRITEABLE);

	if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid)
		elog(ERROR, "cannot wait on a latch owned by another process");

	/*
	 * Initialize timeout if requested.  We must record the current time so
	 * that we can determine the remaining timeout if the poll() or select()
	 * is interrupted.	(On some platforms, select() will update the contents
	 * of "tv" for us, but unfortunately we can't rely on that.)
	 */
	if (wakeEvents & WL_TIMEOUT)
	{
		INSTR_TIME_SET_CURRENT(start_time);
		Assert(timeout >= 0);
		cur_timeout = timeout;

#ifndef HAVE_POLL
		tv.tv_sec = cur_timeout / 1000L;
		tv.tv_usec = (cur_timeout % 1000L) * 1000L;
		tvp = &tv;
#endif
	}
	else
	{
		cur_timeout = -1;

#ifndef HAVE_POLL
		tvp = NULL;
#endif
	}

	waiting = true;
	do
	{
		/*
		 * Clear the pipe, then check if the latch is set already. If someone
		 * sets the latch between this and the poll()/select() below, the
		 * setter will write a byte to the pipe (or signal us and the signal
		 * handler will do that), and the poll()/select() will return
		 * immediately.
		 *
		 * Note: we assume that the kernel calls involved in drainSelfPipe()
		 * and SetLatch() will provide adequate synchronization on machines
		 * with weak memory ordering, so that we cannot miss seeing is_set if
		 * the signal byte is already in the pipe when we drain it.
		 */
		drainSelfPipe();

		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
		{
			result |= WL_LATCH_SET;

			/*
			 * Leave loop immediately, avoid blocking again. We don't attempt
			 * to report any other events that might also be satisfied.
			 */
			break;
		}

		/* Must wait ... we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
		nfds = 0;
		if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
		{
			/* socket, if used, is always in pfds[0] */
			pfds[0].fd = sock;
			pfds[0].events = 0;
			if (wakeEvents & WL_SOCKET_READABLE)
				pfds[0].events |= POLLIN;
			if (wakeEvents & WL_SOCKET_WRITEABLE)
				pfds[0].events |= POLLOUT;
			pfds[0].revents = 0;
			nfds++;
		}

		pfds[nfds].fd = selfpipe_readfd;
		pfds[nfds].events = POLLIN;
		pfds[nfds].revents = 0;
		nfds++;

		if (wakeEvents & WL_POSTMASTER_DEATH)
		{
			/* postmaster fd, if used, is always in pfds[nfds - 1] */
			pfds[nfds].fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
			pfds[nfds].events = POLLIN;
			pfds[nfds].revents = 0;
			nfds++;
		}

		/* Sleep */
		rc = poll(pfds, nfds, (int) cur_timeout);

		/* Check return code */
		if (rc < 0)
		{
			/* EINTR is okay, otherwise complain */
			if (errno != EINTR)
			{
				waiting = false;
				ereport(ERROR,
						(errcode_for_socket_access(),
						 errmsg("poll() failed: %m")));
			}
		}
		else if (rc == 0)
		{
			/* timeout exceeded */
			if (wakeEvents & WL_TIMEOUT)
				result |= WL_TIMEOUT;
		}
		else
		{
			/* at least one event occurred, so check revents values */
			if ((wakeEvents & WL_SOCKET_READABLE) &&
				(pfds[0].revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
			{
				/* data available in socket, or EOF/error condition */
				result |= WL_SOCKET_READABLE;
			}
			if ((wakeEvents & WL_SOCKET_WRITEABLE) &&
				(pfds[0].revents & POLLOUT))
			{
				result |= WL_SOCKET_WRITEABLE;
			}

			/*
			 * We expect a POLLHUP when the remote end is closed, but because
			 * we don't expect the pipe to become readable or to have any
			 * errors either, treat those cases as postmaster death, too.
			 */
			if ((wakeEvents & WL_POSTMASTER_DEATH) &&
				(pfds[nfds - 1].revents & (POLLHUP | POLLIN | POLLERR | POLLNVAL)))
			{
				/*
				 * According to the select(2) man page on Linux, select(2) may
				 * spuriously return and report a file descriptor as readable,
				 * when it's not; and presumably so can poll(2).  It's not
				 * clear that the relevant cases would ever apply to the
				 * postmaster pipe, but since the consequences of falsely
				 * returning WL_POSTMASTER_DEATH could be pretty unpleasant,
				 * we take the trouble to positively verify EOF with
				 * PostmasterIsAlive().
				 */
				if (!PostmasterIsAlive())
					result |= WL_POSTMASTER_DEATH;
			}
		}
#else							/* !HAVE_POLL */

		FD_ZERO(&input_mask);
		FD_ZERO(&output_mask);

		FD_SET(selfpipe_readfd, &input_mask);
		hifd = selfpipe_readfd;

		if (wakeEvents & WL_POSTMASTER_DEATH)
		{
			FD_SET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask);
			if (postmaster_alive_fds[POSTMASTER_FD_WATCH] > hifd)
				hifd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
		}

		if (wakeEvents & WL_SOCKET_READABLE)
		{
			FD_SET(sock, &input_mask);
			if (sock > hifd)
				hifd = sock;
		}

		if (wakeEvents & WL_SOCKET_WRITEABLE)
		{
			FD_SET(sock, &output_mask);
			if (sock > hifd)
				hifd = sock;
		}

		/* Sleep */
		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);

		/* Check return code */
		if (rc < 0)
		{
			/* EINTR is okay, otherwise complain */
			if (errno != EINTR)
			{
				waiting = false;
				ereport(ERROR,
						(errcode_for_socket_access(),
						 errmsg("select() failed: %m")));
			}
		}
		else if (rc == 0)
		{
			/* timeout exceeded */
			if (wakeEvents & WL_TIMEOUT)
				result |= WL_TIMEOUT;
		}
		else
		{
			/* at least one event occurred, so check masks */
			if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask))
			{
				/* data available in socket, or EOF */
				result |= WL_SOCKET_READABLE;
			}
			if ((wakeEvents & WL_SOCKET_WRITEABLE) && FD_ISSET(sock, &output_mask))
			{
				result |= WL_SOCKET_WRITEABLE;
			}
			if ((wakeEvents & WL_POSTMASTER_DEATH) &&
			FD_ISSET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask))
			{
				/*
				 * According to the select(2) man page on Linux, select(2) may
				 * spuriously return and report a file descriptor as readable,
				 * when it's not; and presumably so can poll(2).  It's not
				 * clear that the relevant cases would ever apply to the
				 * postmaster pipe, but since the consequences of falsely
				 * returning WL_POSTMASTER_DEATH could be pretty unpleasant,
				 * we take the trouble to positively verify EOF with
				 * PostmasterIsAlive().
				 */
				if (!PostmasterIsAlive())
					result |= WL_POSTMASTER_DEATH;
			}
		}
#endif   /* HAVE_POLL */

		/* If we're not done, update cur_timeout for next iteration */
		if (result == 0 && cur_timeout >= 0)
		{
			INSTR_TIME_SET_CURRENT(cur_time);
			INSTR_TIME_SUBTRACT(cur_time, start_time);
			cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
			if (cur_timeout < 0)
				cur_timeout = 0;

#ifndef HAVE_POLL
			tv.tv_sec = cur_timeout / 1000L;
			tv.tv_usec = (cur_timeout % 1000L) * 1000L;
#endif
		}
	} while (result == 0);

	waiting = false;

	return result;
}

/*
 * Sets a latch and wakes up anyone waiting on it.
 *
 * This is cheap if the latch is already set, otherwise not so much.
 *
 * NB: when calling this in a signal handler, be sure to save and restore
 * errno around it.  (That's standard practice in most signal handlers, of
 * course, but we used to omit it in handlers that only set a flag.)
 *
 * NB: this function is called from critical sections and signal handlers so
 * throwing an error is not a good idea.
 */
void
SetLatch(volatile Latch *latch)
{
	pid_t		owner_pid;

	/*
	 * XXX there really ought to be a memory barrier operation right here, to
	 * ensure that any flag variables we might have changed get flushed to
	 * main memory before we check/set is_set.	Without that, we have to
	 * require that callers provide their own synchronization for machines
	 * with weak memory ordering (see latch.h).
	 */

	/* Quick exit if already set */
	if (latch->is_set)
		return;

	latch->is_set = true;

	/*
	 * See if anyone's waiting for the latch. It can be the current process if
	 * we're in a signal handler. We use the self-pipe to wake up the select()
	 * in that case. If it's another process, send a signal.
	 *
	 * Fetch owner_pid only once, in case the latch is concurrently getting
	 * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
	 * guaranteed to be true! In practice, the effective range of pid_t fits
	 * in a 32 bit integer, and so should be atomic. In the worst case, we
	 * might end up signaling the wrong process. Even then, you're very
	 * unlucky if a process with that bogus pid exists and belongs to
	 * Postgres; and PG database processes should handle excess SIGUSR1
	 * interrupts without a problem anyhow.
	 *
	 * Another sort of race condition that's possible here is for a new
	 * process to own the latch immediately after we look, so we don't signal
	 * it. This is okay so long as all callers of ResetLatch/WaitLatch follow
	 * the standard coding convention of waiting at the bottom of their loops,
	 * not the top, so that they'll correctly process latch-setting events
	 * that happen before they enter the loop.
	 */
	owner_pid = latch->owner_pid;
	if (owner_pid == 0)
		return;
	else if (owner_pid == MyProcPid)
	{
		if (waiting)
			sendSelfPipeByte();
	}
	else
		kill(owner_pid, SIGUSR1);
}

/*
 * Clear the latch. Calling WaitLatch after this will sleep, unless
 * the latch is set again before the WaitLatch call.
 */
void
ResetLatch(volatile Latch *latch)
{
	/* Only the owner should reset the latch */
	Assert(latch->owner_pid == MyProcPid);

	latch->is_set = false;

	/*
	 * XXX there really ought to be a memory barrier operation right here, to
	 * ensure that the write to is_set gets flushed to main memory before we
	 * examine any flag variables.	Otherwise a concurrent SetLatch might
	 * falsely conclude that it needn't signal us, even though we have missed
	 * seeing some flag updates that SetLatch was supposed to inform us of.
	 * For the moment, callers must supply their own synchronization of flag
	 * variables (see latch.h).
	 */
}

/*
 * SetLatch uses SIGUSR1 to wake up the process waiting on the latch.
 *
 * Wake up WaitLatch, if we're waiting.  (We might not be, since SIGUSR1 is
 * overloaded for multiple purposes; or we might not have reached WaitLatch
 * yet, in which case we don't need to fill the pipe either.)
 *
 * NB: when calling this in a signal handler, be sure to save and restore
 * errno around it.
 */
void
latch_sigusr1_handler(void)
{
	if (waiting)
		sendSelfPipeByte();
}
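
/*
 * Illustrative sketch, not part of the original file: a process that owns a
 * shared latch is expected to route SIGUSR1 here, saving and restoring
 * errno as the comments above require; sigusr1_handler is a hypothetical
 * name:
 *
 *		static void
 *		sigusr1_handler(SIGNAL_ARGS)
 *		{
 *			int			save_errno = errno;
 *
 *			latch_sigusr1_handler();
 *
 *			errno = save_errno;
 *		}
 */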

/* Send one byte to the self-pipe, to wake up WaitLatch */
static void
sendSelfPipeByte(void)
{
	int			rc;
	char		dummy = 0;

retry:
	rc = write(selfpipe_writefd, &dummy, 1);
	if (rc < 0)
	{
		/* If interrupted by signal, just retry */
		if (errno == EINTR)
			goto retry;

		/*
		 * If the pipe is full, we don't need to retry, the data that's there
		 * already is enough to wake up WaitLatch.
		 */
		if (errno == EAGAIN || errno == EWOULDBLOCK)
			return;

		/*
		 * Oops, the write() failed for some other reason. We might be in a
		 * signal handler, so it's not safe to elog(). We have no choice but
		 * silently ignore the error.
		 */
		return;
	}
}

/*
 * Read all available data from the self-pipe
 *
 * Note: this is only called when waiting = true.  If it fails and doesn't
 * return, it must reset that flag first (though ideally, this will never
 * happen).
 */
static void
drainSelfPipe(void)
{
	/*
	 * There shouldn't normally be more than one byte in the pipe, or maybe a
	 * few bytes if multiple processes run SetLatch at the same instant.
	 */
	char		buf[16];
	int			rc;

	for (;;)
	{
		rc = read(selfpipe_readfd, buf, sizeof(buf));
		if (rc < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;			/* the pipe is empty */
			else if (errno == EINTR)
				continue;		/* retry */
			else
			{
				waiting = false;
				elog(ERROR, "read() on self-pipe failed: %m");
			}
		}
		else if (rc == 0)
		{
			waiting = false;
			elog(ERROR, "unexpected EOF on self-pipe");
		}
		else if (rc < sizeof(buf))
		{
			/* we successfully drained the pipe; no need to read() again */
			break;
		}
		/* else buffer wasn't big enough, so read again */
	}
}