/*-------------------------------------------------------------------------
 *
 * unix_latch.c
 *	  Routines for inter-process latches
 *
 * The Unix implementation uses the so-called self-pipe trick to overcome
 * the race condition involved with select() and setting a global flag
 * in the signal handler. When a latch is set and the current process
 * is waiting for it, the signal handler wakes up the select() in
 * WaitLatch by writing a byte to a pipe. A signal by itself doesn't
 * interrupt select() on all platforms, and even on platforms where it
 * does, a signal that arrives just before the select() call does not
 * prevent the select() from entering sleep. An incoming byte on a pipe
 * however reliably interrupts the sleep, and causes select() to return
 * immediately even if the signal arrives before select() begins.
 *
 * (Actually, we prefer poll() over select() where available, but the
 * same comments apply to it.)
 *
 * When SetLatch is called from the same process that owns the latch,
 * SetLatch writes the byte directly to the pipe. If it's owned by another
 * process, SIGUSR1 is sent and the signal handler in the waiting process
 * writes the byte to the pipe on behalf of the signaling process.
 *
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/port/unix_latch.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "miscadmin.h"
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/shmem.h"

/*
 * True while this process is blocked inside WaitLatch; consulted by the
 * signal handler to decide whether a self-pipe wakeup byte is needed.
 */
static volatile sig_atomic_t waiting = false;

/* File descriptors for the two ends of this process's self-pipe */
static int	selfpipe_readfd = -1;
static int	selfpipe_writefd = -1;

/* forward declarations for the self-pipe helpers below */
static void initSelfPipe(void);
static void drainSelfPipe(void);
static void sendSelfPipeByte(void);


/*
 * Initialize a backend-local latch.
 */
void
InitLatch(volatile Latch *latch)
{
74
	/* Initialize the self-pipe if this is our first latch in the process */
75 76 77 78 79 80 81 82 83 84
	if (selfpipe_readfd == -1)
		initSelfPipe();

	latch->is_set = false;
	latch->owner_pid = MyProcPid;
	latch->is_shared = false;
}

/*
 * Initialize a shared latch that can be set from other processes. The latch
 * is initially owned by no-one; use OwnLatch to associate it with the
 * current process.
 *
 * InitSharedLatch needs to be called in postmaster before forking child
 * processes, usually right after allocating the shared memory block
 * containing the latch with ShmemInitStruct. (The Unix implementation
 * doesn't actually require that, but the Windows one does.) Because of
 * this restriction, we have no concurrency issues to worry about here.
 */
void
InitSharedLatch(volatile Latch *latch)
{
	latch->is_set = false;
	latch->owner_pid = 0;		/* no owner until OwnLatch is called */
	latch->is_shared = true;
}

/*
 * Associate a shared latch with the current process, allowing it to
104 105 106 107 108 109
 * wait on the latch.
 *
 * Although there is a sanity check for latch-already-owned, we don't do
 * any sort of locking here, meaning that we could fail to detect the error
 * if two processes try to own the same latch at about the same time.  If
 * there is any risk of that, caller must provide an interlock to prevent it.
110
 *
111 112 113
 * In any process that calls OwnLatch(), make sure that
 * latch_sigusr1_handler() is called from the SIGUSR1 signal handler,
 * as shared latches use SIGUSR1 for inter-process communication.
114 115 116 117 118 119
 */
void
OwnLatch(volatile Latch *latch)
{
	Assert(latch->is_shared);

120
	/* Initialize the self-pipe if this is our first latch in this process */
121 122 123
	if (selfpipe_readfd == -1)
		initSelfPipe();

124
	/* sanity check */
125 126
	if (latch->owner_pid != 0)
		elog(ERROR, "latch already owned");
127

128 129 130 131 132 133 134 135 136 137 138
	latch->owner_pid = MyProcPid;
}

/*
 * Release ownership of a shared latch that the current process owns.
 * After this, the latch can be OwnLatch'd by another process.
 */
void
DisownLatch(volatile Latch *latch)
{
	Assert(latch->is_shared);
	Assert(latch->owner_pid == MyProcPid);

	latch->owner_pid = 0;
}

/*
144 145
 * Wait for a given latch to be set, or for postmaster death, or until timeout
 * is exceeded. 'wakeEvents' is a bitmask that specifies which of those events
146 147
 * to wait for. If the latch is already set (and WL_LATCH_SET is given), the
 * function returns immediately.
148
 *
149
 * The 'timeout' is given in milliseconds. It must be >= 0 if WL_TIMEOUT flag
150 151 152 153 154 155
 * is given.  On some platforms, signals do not interrupt the wait, or even
 * cause the timeout to be restarted, so beware that the function can sleep
 * for several times longer than the requested timeout.  However, this
 * difficulty is not so great as it seems, because the signal handlers for any
 * signals that the caller should respond to ought to be programmed to end the
 * wait by calling SetLatch.  Ideally, the timeout parameter is vestigial.
156 157 158 159 160
 *
 * The latch must be owned by the current process, ie. it must be a
 * backend-local latch initialized with InitLatch, or a shared latch
 * associated with the current process by calling OwnLatch.
 *
161
 * Returns bit mask indicating which condition(s) caused the wake-up. Note
162 163 164 165 166 167 168 169
 * that if multiple wake-up conditions are true, there is no guarantee that
 * we return all of them in one call, but we will return at least one. Also,
 * according to the select(2) man page on Linux, select(2) may spuriously
 * return and report a file descriptor as readable, when it's not. We use
 * select(2), so WaitLatch can also spuriously claim that a socket is
 * readable, or postmaster has died, even when none of the wake conditions
 * have been satisfied. That should be rare in practice, but the caller
 * should not use the return value for anything critical, re-checking the
170
 * situation with PostmasterIsAlive() or read() on a socket as necessary.
171
 * The latch and timeout flag bits can be trusted, however.
172
 */
173 174
int
WaitLatch(volatile Latch *latch, int wakeEvents, long timeout)
175
{
176
	return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout);
177 178 179
}

/*
180 181
 * Like WaitLatch, but with an extra socket argument for WL_SOCKET_*
 * conditions.
182 183
 */
int
184 185
WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
				  long timeout)
186
{
187 188 189 190 191 192
	int			result = 0;
	int			rc;
#ifdef HAVE_POLL
	struct pollfd pfds[3];
	int			nfds;
#else
193 194
	struct timeval tv,
			   *tvp = NULL;
195
	fd_set		input_mask;
196
	fd_set		output_mask;
197 198
	int			hifd;
#endif
199

200 201 202 203 204 205 206
	/* Ignore WL_SOCKET_* events if no valid socket is given */
	if (sock == PGINVALID_SOCKET)
		wakeEvents &= ~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);

	Assert(wakeEvents != 0);	/* must have at least one wake event */

	if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid)
207 208 209
		elog(ERROR, "cannot wait on a latch owned by another process");

	/* Initialize timeout */
210
	if (wakeEvents & WL_TIMEOUT)
211
	{
212
		Assert(timeout >= 0);
213
#ifndef HAVE_POLL
214 215
		tv.tv_sec = timeout / 1000L;
		tv.tv_usec = (timeout % 1000L) * 1000L;
216
		tvp = &tv;
217 218 219 220 221 222 223 224
#endif
	}
	else
	{
#ifdef HAVE_POLL
		/* make sure poll() agrees there is no timeout */
		timeout = -1;
#endif
225 226 227
	}

	waiting = true;
228
	do
229 230
	{
		/*
231
		 * Clear the pipe, then check if the latch is set already. If someone
232 233 234 235
		 * sets the latch between this and the poll()/select() below, the
		 * setter will write a byte to the pipe (or signal us and the signal
		 * handler will do that), and the poll()/select() will return
		 * immediately.
236 237 238 239 240
		 *
		 * Note: we assume that the kernel calls involved in drainSelfPipe()
		 * and SetLatch() will provide adequate synchronization on machines
		 * with weak memory ordering, so that we cannot miss seeing is_set
		 * if the signal byte is already in the pipe when we drain it.
241 242
		 */
		drainSelfPipe();
243

244
		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
245
		{
246 247 248 249 250
			result |= WL_LATCH_SET;
			/*
			 * Leave loop immediately, avoid blocking again. We don't attempt
			 * to report any other events that might also be satisfied.
			 */
251 252 253
			break;
		}

254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
		/* Must wait ... we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
		nfds = 0;
		if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
		{
			/* socket, if used, is always in pfds[0] */
			pfds[0].fd = sock;
			pfds[0].events = 0;
			if (wakeEvents & WL_SOCKET_READABLE)
				pfds[0].events |= POLLIN;
			if (wakeEvents & WL_SOCKET_WRITEABLE)
				pfds[0].events |= POLLOUT;
			pfds[0].revents = 0;
			nfds++;
		}

		pfds[nfds].fd = selfpipe_readfd;
		pfds[nfds].events = POLLIN;
		pfds[nfds].revents = 0;
		nfds++;

		if (wakeEvents & WL_POSTMASTER_DEATH)
		{
			/* postmaster fd, if used, is always in pfds[nfds - 1] */
			pfds[nfds].fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
			pfds[nfds].events = POLLIN;
			pfds[nfds].revents = 0;
			nfds++;
		}

		/* Sleep */
		rc = poll(pfds, nfds, (int) timeout);

		/* Check return code */
		if (rc < 0)
		{
			if (errno == EINTR)
				continue;
			waiting = false;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("poll() failed: %m")));
		}
		if (rc == 0 && (wakeEvents & WL_TIMEOUT))
		{
			/* timeout exceeded */
			result |= WL_TIMEOUT;
		}
		if ((wakeEvents & WL_SOCKET_READABLE) &&
			(pfds[0].revents & POLLIN))
		{
			/* data available in socket */
			result |= WL_SOCKET_READABLE;
		}
		if ((wakeEvents & WL_SOCKET_WRITEABLE) &&
			(pfds[0].revents & POLLOUT))
		{
			result |= WL_SOCKET_WRITEABLE;
		}
		if ((wakeEvents & WL_POSTMASTER_DEATH) &&
			(pfds[nfds - 1].revents & POLLIN))
		{
			result |= WL_POSTMASTER_DEATH;
		}

#else /* !HAVE_POLL */

321
		FD_ZERO(&input_mask);
322 323
		FD_ZERO(&output_mask);

324 325
		FD_SET(selfpipe_readfd, &input_mask);
		hifd = selfpipe_readfd;
326 327 328 329 330 331 332 333 334

		if (wakeEvents & WL_POSTMASTER_DEATH)
		{
			FD_SET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask);
			if (postmaster_alive_fds[POSTMASTER_FD_WATCH] > hifd)
				hifd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
		}

		if (wakeEvents & WL_SOCKET_READABLE)
335 336 337 338 339 340
		{
			FD_SET(sock, &input_mask);
			if (sock > hifd)
				hifd = sock;
		}

341
		if (wakeEvents & WL_SOCKET_WRITEABLE)
342 343 344 345 346 347
		{
			FD_SET(sock, &output_mask);
			if (sock > hifd)
				hifd = sock;
		}

348
		/* Sleep */
349
		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
350 351

		/* Check return code */
352 353 354 355
		if (rc < 0)
		{
			if (errno == EINTR)
				continue;
356
			waiting = false;
357 358 359 360
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("select() failed: %m")));
		}
361
		if (rc == 0 && (wakeEvents & WL_TIMEOUT))
362 363
		{
			/* timeout exceeded */
364
			result |= WL_TIMEOUT;
365
		}
366
		if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask))
367
		{
368 369
			/* data available in socket */
			result |= WL_SOCKET_READABLE;
370
		}
371 372 373 374 375 376 377 378 379
		if ((wakeEvents & WL_SOCKET_WRITEABLE) && FD_ISSET(sock, &output_mask))
		{
			result |= WL_SOCKET_WRITEABLE;
		}
		if ((wakeEvents & WL_POSTMASTER_DEATH) &&
			 FD_ISSET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask))
		{
			result |= WL_POSTMASTER_DEATH;
		}
380
#endif /* HAVE_POLL */
381
	} while (result == 0);
382 383 384 385 386 387
	waiting = false;

	return result;
}

/*
388 389 390
 * Sets a latch and wakes up anyone waiting on it.
 *
 * This is cheap if the latch is already set, otherwise not so much.
391 392 393 394
 *
 * NB: when calling this in a signal handler, be sure to save and restore
 * errno around it.  (That's standard practice in most signal handlers, of
 * course, but we used to omit it in handlers that only set a flag.)
395 396 397 398
 */
void
SetLatch(volatile Latch *latch)
{
399
	pid_t		owner_pid;
400

401 402 403 404 405 406 407 408
	/*
	 * XXX there really ought to be a memory barrier operation right here,
	 * to ensure that any flag variables we might have changed get flushed
	 * to main memory before we check/set is_set.  Without that, we have to
	 * require that callers provide their own synchronization for machines
	 * with weak memory ordering (see latch.h).
	 */

409 410 411 412 413 414 415
	/* Quick exit if already set */
	if (latch->is_set)
		return;

	latch->is_set = true;

	/*
416 417 418
	 * See if anyone's waiting for the latch. It can be the current process if
	 * we're in a signal handler. We use the self-pipe to wake up the select()
	 * in that case. If it's another process, send a signal.
419
	 *
420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
	 * Fetch owner_pid only once, in case the latch is concurrently getting
	 * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
	 * guaranteed to be true! In practice, the effective range of pid_t fits
	 * in a 32 bit integer, and so should be atomic. In the worst case, we
	 * might end up signaling the wrong process. Even then, you're very
	 * unlucky if a process with that bogus pid exists and belongs to
	 * Postgres; and PG database processes should handle excess SIGUSR1
	 * interrupts without a problem anyhow.
	 *
	 * Another sort of race condition that's possible here is for a new process
	 * to own the latch immediately after we look, so we don't signal it.
	 * This is okay so long as all callers of ResetLatch/WaitLatch follow the
	 * standard coding convention of waiting at the bottom of their loops,
	 * not the top, so that they'll correctly process latch-setting events that
	 * happen before they enter the loop.
435 436 437 438 439
	 */
	owner_pid = latch->owner_pid;
	if (owner_pid == 0)
		return;
	else if (owner_pid == MyProcPid)
440 441 442 443
	{
		if (waiting)
			sendSelfPipeByte();
	}
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
	else
		kill(owner_pid, SIGUSR1);
}

/*
 * Clear the latch. Calling WaitLatch after this will sleep, unless
 * the latch is set again before the WaitLatch call.
 */
void
ResetLatch(volatile Latch *latch)
{
	/* Only the owner should reset the latch */
	Assert(latch->owner_pid == MyProcPid);

	latch->is_set = false;

	/*
	 * XXX there really ought to be a memory barrier operation right here, to
	 * ensure that the write to is_set gets flushed to main memory before we
	 * examine any flag variables.  Otherwise a concurrent SetLatch might
	 * falsely conclude that it needn't signal us, even though we have missed
	 * seeing some flag updates that SetLatch was supposed to inform us of.
	 * For the moment, callers must supply their own synchronization of flag
	 * variables (see latch.h).
	 */
}

/*
 * SetLatch uses SIGUSR1 to wake up the process waiting on the latch.
 *
 * Wake up WaitLatch, if we're waiting.  (We might not be, since SIGUSR1 is
 * overloaded for multiple purposes; or we might not have reached WaitLatch
 * yet, in which case we don't need to fill the pipe either.)
 *
 * NB: when calling this in a signal handler, be sure to save and restore
 * errno around it.
 */
void
latch_sigusr1_handler(void)
{
	/* Only poke the pipe if WaitLatch is actually blocked */
	if (!waiting)
		return;
	sendSelfPipeByte();
}

/*
 * Create this process's self-pipe, used by signal handlers to wake up a
 * blocked WaitLatch.
 *
 * Both ends are made non-blocking: the write end so that SetLatch never
 * blocks when the pipe's kernel buffer is already full of wakeup bytes,
 * and the read end so that drainSelfPipe can read until EAGAIN/EWOULDBLOCK.
 */
static void
initSelfPipe(void)
{
	int			fds[2];

	if (pipe(fds) < 0)
		elog(FATAL, "pipe() failed: %m");
	if (fcntl(fds[0], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
	if (fcntl(fds[1], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");

	selfpipe_readfd = fds[0];
	selfpipe_writefd = fds[1];
}

/* Send one byte to the self-pipe, to wake up WaitLatch */
static void
sendSelfPipeByte(void)
{
516 517
	int			rc;
	char		dummy = 0;
518 519 520 521 522 523 524 525 526 527

retry:
	rc = write(selfpipe_writefd, &dummy, 1);
	if (rc < 0)
	{
		/* If interrupted by signal, just retry */
		if (errno == EINTR)
			goto retry;

		/*
528 529
		 * If the pipe is full, we don't need to retry, the data that's there
		 * already is enough to wake up WaitLatch.
530 531 532 533 534
		 */
		if (errno == EAGAIN || errno == EWOULDBLOCK)
			return;

		/*
535 536 537
		 * Oops, the write() failed for some other reason. We might be in a
		 * signal handler, so it's not safe to elog(). We have no choice but
		 * silently ignore the error.
538 539 540 541 542
		 */
		return;
	}
}

543 544 545 546 547 548 549
/*
 * Read all available data from the self-pipe
 *
 * Note: this is only called when waiting = true.  If it fails and doesn't
 * return, it must reset that flag first (though ideally, this will never
 * happen).
 */
550 551 552 553
static void
drainSelfPipe(void)
{
	/*
554
	 * There shouldn't normally be more than one byte in the pipe, or maybe a
555
	 * few bytes if multiple processes run SetLatch at the same instant.
556
	 */
557 558
	char		buf[16];
	int			rc;
559 560 561 562 563 564 565

	for (;;)
	{
		rc = read(selfpipe_readfd, buf, sizeof(buf));
		if (rc < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
566
				break;			/* the pipe is empty */
567
			else if (errno == EINTR)
568
				continue;		/* retry */
569
			else
570 571
			{
				waiting = false;
572
				elog(ERROR, "read() on self-pipe failed: %m");
573
			}
574 575
		}
		else if (rc == 0)
576 577
		{
			waiting = false;
578
			elog(ERROR, "unexpected EOF on self-pipe");
579 580 581 582 583 584 585
		}
		else if (rc < sizeof(buf))
		{
			/* we successfully drained the pipe; no need to read() again */
			break;
		}
		/* else buffer wasn't big enough, so read again */
586 587
	}
}