/*-------------------------------------------------------------------------
 *
 * unix_latch.c
 *	  Routines for inter-process latches
 *
 * The Unix implementation uses the so-called self-pipe trick to overcome
 * the race condition involved with select() and setting a global flag
 * in the signal handler. When a latch is set and the current process
 * is waiting for it, the signal handler wakes up the select() in
 * WaitLatch by writing a byte to a pipe. A signal by itself doesn't
 * interrupt select() on all platforms, and even on platforms where it
 * does, a signal that arrives just before the select() call does not
 * prevent the select() from entering sleep. An incoming byte on a pipe,
 * however, reliably interrupts the sleep, and causes select() to return
 * immediately even if the signal arrives before select() begins.
 *
 * (Actually, we prefer poll() over select() where available, but the
 * same comments apply to it.)
 *
 * When SetLatch is called from the same process that owns the latch,
 * SetLatch writes the byte directly to the pipe. If it's owned by another
 * process, SIGUSR1 is sent and the signal handler in the waiting process
 * writes the byte to the pipe on behalf of the signaling process.
 *
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/port/unix_latch.c
 *
 *-------------------------------------------------------------------------
 */
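
/*
 * To make the trick above concrete, here is the pattern reduced to its bare
 * bones.  This is an illustration only, not code used by this file; the real
 * implementation below adds latch state, poll() support, postmaster-death
 * detection, and error handling:
 *
 *		static int	pipefds[2];		(both ends made O_NONBLOCK)
 *
 *		static void
 *		handler(SIGNAL_ARGS)
 *		{
 *			char	dummy = 0;
 *
 *			write(pipefds[1], &dummy, 1);
 *		}
 *
 *		(in the wait loop:)
 *		FD_ZERO(&input_mask);
 *		FD_SET(pipefds[0], &input_mask);
 *		select(pipefds[0] + 1, &input_mask, NULL, NULL, NULL);
 *
 * Even if the signal arrives just before select() is entered, the byte is
 * already sitting in the pipe, so the select() returns at once.
 */
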
#include "postgres.h"

#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "miscadmin.h"
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"

/* Are we currently in WaitLatch? The signal handler would like to know. */
static volatile sig_atomic_t waiting = false;

/* Read and write ends of the self-pipe */
static int	selfpipe_readfd = -1;
static int	selfpipe_writefd = -1;

/* private function prototypes */
static void initSelfPipe(void);
static void drainSelfPipe(void);
static void sendSelfPipeByte(void);


/*
 * Initialize a backend-local latch.
 */
void
InitLatch(volatile Latch *latch)
{
	/* Initialize the self-pipe if this is our first latch in the process */
	if (selfpipe_readfd == -1)
		initSelfPipe();

	latch->is_set = false;
	latch->owner_pid = MyProcPid;
	latch->is_shared = false;
}

/*
 * Initialize a shared latch that can be set from other processes. The latch
 * is initially owned by no-one; use OwnLatch to associate it with the
 * current process.
 *
 * InitSharedLatch needs to be called in postmaster before forking child
 * processes, usually right after allocating the shared memory block
 * containing the latch with ShmemInitStruct. (The Unix implementation
 * doesn't actually require that, but the Windows one does.) Because of
 * this restriction, we have no concurrency issues to worry about here.
 */
void
InitSharedLatch(volatile Latch *latch)
{
	latch->is_set = false;
	latch->owner_pid = 0;
	latch->is_shared = true;
}
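
/*
 * For illustration, a plausible call sequence in the postmaster (the struct
 * and its name are hypothetical, not part of this file's API):
 *
 *		MyShared   *shared = (MyShared *)
 *			ShmemInitStruct("My Shared State", sizeof(MyShared), &found);
 *
 *		if (!found)
 *			InitSharedLatch(&shared->latch);
 *
 * A child process that wants to sleep on the latch then calls
 * OwnLatch(&shared->latch), and any process may call SetLatch on it.
 */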

/*
 * Associate a shared latch with the current process, allowing it to
 * wait on the latch.
 *
 * Although there is a sanity check for latch-already-owned, we don't do
 * any sort of locking here, meaning that we could fail to detect the error
 * if two processes try to own the same latch at about the same time.  If
 * there is any risk of that, caller must provide an interlock to prevent it.
 *
 * In any process that calls OwnLatch(), make sure that
 * latch_sigusr1_handler() is called from the SIGUSR1 signal handler,
 * as shared latches use SIGUSR1 for inter-process communication.
 */
void
OwnLatch(volatile Latch *latch)
{
	Assert(latch->is_shared);

	/* Initialize the self-pipe if this is our first latch in this process */
	if (selfpipe_readfd == -1)
		initSelfPipe();

	/* sanity check */
	if (latch->owner_pid != 0)
		elog(ERROR, "latch already owned");

	latch->owner_pid = MyProcPid;
}

/*
 * Disown a shared latch currently owned by the current process.
 */
void
DisownLatch(volatile Latch *latch)
{
	Assert(latch->is_shared);
	Assert(latch->owner_pid == MyProcPid);

	latch->owner_pid = 0;
}

/*
 * Wait for a given latch to be set, or for postmaster death, or until timeout
 * is exceeded. 'wakeEvents' is a bitmask that specifies which of those events
 * to wait for. If the latch is already set (and WL_LATCH_SET is given), the
 * function returns immediately.
 *
 * The 'timeout' is given in milliseconds. It must be >= 0 if WL_TIMEOUT flag
 * is given.  On some platforms, signals do not interrupt the wait, or even
 * cause the timeout to be restarted, so beware that the function can sleep
 * for several times longer than the requested timeout.  However, this
 * difficulty is not so great as it seems, because the signal handlers for any
 * signals that the caller should respond to ought to be programmed to end the
 * wait by calling SetLatch.  Ideally, the timeout parameter is vestigial.
 *
 * The latch must be owned by the current process, ie. it must be a
 * backend-local latch initialized with InitLatch, or a shared latch
 * associated with the current process by calling OwnLatch.
 *
 * Returns bit mask indicating which condition(s) caused the wake-up. Note
 * that if multiple wake-up conditions are true, there is no guarantee that
 * we return all of them in one call, but we will return at least one.
 */
int
WaitLatch(volatile Latch *latch, int wakeEvents, long timeout)
{
	return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout);
}
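
/*
 * Illustration: the standard coding convention for latch wait loops, with
 * the wait at the bottom of the loop (compare the example in latch.h).
 * This is a sketch only; "work to do" stands for whatever condition the
 * caller is waiting on, and MyProc->procLatch is just one commonly used
 * latch:
 *
 *		for (;;)
 *		{
 *			ResetLatch(&MyProc->procLatch);
 *
 *			if (work to do)
 *				Do Stuff();
 *
 *			WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0L);
 *		}
 *
 * Because the setter makes its work visible before calling SetLatch, a
 * wakeup that arrives between the "work to do" test and the WaitLatch call
 * is not lost: the latch is already set, so WaitLatch returns immediately.
 */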

/*
 * Like WaitLatch, but with an extra socket argument for WL_SOCKET_*
 * conditions.
 */
int
WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
				  long timeout)
{
	int			result = 0;
	int			rc;
#ifdef HAVE_POLL
	struct pollfd pfds[3];
	int			nfds;
#else
	struct timeval tv,
			   *tvp = NULL;
	fd_set		input_mask;
	fd_set		output_mask;
	int			hifd;
#endif

	/* Ignore WL_SOCKET_* events if no valid socket is given */
	if (sock == PGINVALID_SOCKET)
		wakeEvents &= ~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);

	Assert(wakeEvents != 0);	/* must have at least one wake event */

	if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid)
		elog(ERROR, "cannot wait on a latch owned by another process");

	/* Initialize timeout */
	if (wakeEvents & WL_TIMEOUT)
	{
		Assert(timeout >= 0);
#ifndef HAVE_POLL
		tv.tv_sec = timeout / 1000L;
		tv.tv_usec = (timeout % 1000L) * 1000L;
		tvp = &tv;
#endif
	}
	else
	{
#ifdef HAVE_POLL
		/* make sure poll() agrees there is no timeout */
		timeout = -1;
#endif
	}

	waiting = true;
	do
	{
		/*
		 * Clear the pipe, then check if the latch is set already. If someone
		 * sets the latch between this and the poll()/select() below, the
		 * setter will write a byte to the pipe (or signal us and the signal
		 * handler will do that), and the poll()/select() will return
		 * immediately.
		 *
		 * Note: we assume that the kernel calls involved in drainSelfPipe()
		 * and SetLatch() will provide adequate synchronization on machines
		 * with weak memory ordering, so that we cannot miss seeing is_set
		 * if the signal byte is already in the pipe when we drain it.
		 */
		drainSelfPipe();

		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
		{
			result |= WL_LATCH_SET;

			/*
			 * Leave loop immediately, avoid blocking again. We don't attempt
			 * to report any other events that might also be satisfied.
			 */
			break;
		}

		/* Must wait ... we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
		nfds = 0;
		if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
		{
			/* socket, if used, is always in pfds[0] */
			pfds[0].fd = sock;
			pfds[0].events = 0;
			if (wakeEvents & WL_SOCKET_READABLE)
				pfds[0].events |= POLLIN;
			if (wakeEvents & WL_SOCKET_WRITEABLE)
				pfds[0].events |= POLLOUT;
			pfds[0].revents = 0;
			nfds++;
		}

		pfds[nfds].fd = selfpipe_readfd;
		pfds[nfds].events = POLLIN;
		pfds[nfds].revents = 0;
		nfds++;

		if (wakeEvents & WL_POSTMASTER_DEATH)
		{
			/* postmaster fd, if used, is always in pfds[nfds - 1] */
			pfds[nfds].fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
			pfds[nfds].events = POLLIN;
			pfds[nfds].revents = 0;
			nfds++;
		}

		/* Sleep */
		rc = poll(pfds, nfds, (int) timeout);

		/* Check return code */
		if (rc < 0)
		{
			if (errno == EINTR)
				continue;
			waiting = false;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("poll() failed: %m")));
		}
		if (rc == 0 && (wakeEvents & WL_TIMEOUT))
		{
			/* timeout exceeded */
			result |= WL_TIMEOUT;
		}
		if ((wakeEvents & WL_SOCKET_READABLE) &&
			(pfds[0].revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
		{
			/* data available in socket */
			result |= WL_SOCKET_READABLE;
		}
		if ((wakeEvents & WL_SOCKET_WRITEABLE) &&
			(pfds[0].revents & POLLOUT))
		{
			result |= WL_SOCKET_WRITEABLE;
		}

		/*
		 * We expect a POLLHUP when the remote end is closed, but because we
		 * don't expect the pipe to become readable or to have any errors
		 * either, treat those cases as postmaster death, too.
		 */
		if ((wakeEvents & WL_POSTMASTER_DEATH) &&
			(pfds[nfds - 1].revents & (POLLHUP | POLLIN | POLLERR | POLLNVAL)))
		{
			/*
			 * According to the select(2) man page on Linux, select(2) may
			 * spuriously return and report a file descriptor as readable,
			 * when it's not; and presumably so can poll(2).  It's not clear
			 * that the relevant cases would ever apply to the postmaster
			 * pipe, but since the consequences of falsely returning
			 * WL_POSTMASTER_DEATH could be pretty unpleasant, we take the
			 * trouble to positively verify EOF with PostmasterIsAlive().
			 */
			if (!PostmasterIsAlive())
				result |= WL_POSTMASTER_DEATH;
		}

#else /* !HAVE_POLL */

		FD_ZERO(&input_mask);
		FD_ZERO(&output_mask);

		FD_SET(selfpipe_readfd, &input_mask);
		hifd = selfpipe_readfd;

		if (wakeEvents & WL_POSTMASTER_DEATH)
		{
			FD_SET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask);
			if (postmaster_alive_fds[POSTMASTER_FD_WATCH] > hifd)
				hifd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
		}

		if (wakeEvents & WL_SOCKET_READABLE)
		{
			FD_SET(sock, &input_mask);
			if (sock > hifd)
				hifd = sock;
		}

		if (wakeEvents & WL_SOCKET_WRITEABLE)
		{
			FD_SET(sock, &output_mask);
			if (sock > hifd)
				hifd = sock;
		}

		/* Sleep */
		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);

		/* Check return code */
		if (rc < 0)
		{
			if (errno == EINTR)
				continue;
			waiting = false;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("select() failed: %m")));
		}
		if (rc == 0 && (wakeEvents & WL_TIMEOUT))
		{
			/* timeout exceeded */
			result |= WL_TIMEOUT;
		}
		if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask))
		{
			/* data available in socket */
			result |= WL_SOCKET_READABLE;
		}
		if ((wakeEvents & WL_SOCKET_WRITEABLE) && FD_ISSET(sock, &output_mask))
		{
			result |= WL_SOCKET_WRITEABLE;
		}
		if ((wakeEvents & WL_POSTMASTER_DEATH) &&
			 FD_ISSET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask))
		{
			/*
			 * According to the select(2) man page on Linux, select(2) may
			 * spuriously return and report a file descriptor as readable,
			 * when it's not; and presumably so can poll(2).  It's not clear
			 * that the relevant cases would ever apply to the postmaster
			 * pipe, but since the consequences of falsely returning
			 * WL_POSTMASTER_DEATH could be pretty unpleasant, we take the
			 * trouble to positively verify EOF with PostmasterIsAlive().
			 */
			if (!PostmasterIsAlive())
				result |= WL_POSTMASTER_DEATH;
		}
#endif /* HAVE_POLL */
	} while (result == 0);

	waiting = false;

	return result;
}
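
/*
 * Usage sketch (the caller and socket are hypothetical): wait for the
 * latch, readable data on a socket, or a one-second timeout, then test
 * which event(s) occurred:
 *
 *		rc = WaitLatchOrSocket(&MyProc->procLatch,
 *							   WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT,
 *							   sock, 1000L);
 *		if (rc & WL_LATCH_SET)
 *			(handle latch event, typically after ResetLatch)
 *		if (rc & WL_SOCKET_READABLE)
 *			(read from the socket)
 *
 * As documented above, several bits may be set in the result, and an event
 * not reported by one call may still have occurred.
 */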

/*
 * Sets a latch and wakes up anyone waiting on it.
 *
 * This is cheap if the latch is already set, otherwise not so much.
 *
 * NB: when calling this in a signal handler, be sure to save and restore
 * errno around it.  (That's standard practice in most signal handlers, of
 * course, but we used to omit it in handlers that only set a flag.)
 */
void
SetLatch(volatile Latch *latch)
{
	pid_t		owner_pid;

	/*
	 * XXX there really ought to be a memory barrier operation right here,
	 * to ensure that any flag variables we might have changed get flushed
	 * to main memory before we check/set is_set.  Without that, we have to
	 * require that callers provide their own synchronization for machines
	 * with weak memory ordering (see latch.h).
	 */

	/* Quick exit if already set */
	if (latch->is_set)
		return;

	latch->is_set = true;

	/*
	 * See if anyone's waiting for the latch. It can be the current process if
	 * we're in a signal handler. We use the self-pipe to wake up the select()
	 * in that case. If it's another process, send a signal.
	 *
	 * Fetch owner_pid only once, in case the latch is concurrently getting
	 * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
	 * guaranteed to be true! In practice, the effective range of pid_t fits
	 * in a 32 bit integer, and so should be atomic. In the worst case, we
	 * might end up signaling the wrong process. Even then, you're very
	 * unlucky if a process with that bogus pid exists and belongs to
	 * Postgres; and PG database processes should handle excess SIGUSR1
	 * interrupts without a problem anyhow.
	 *
	 * Another sort of race condition that's possible here is for a new process
	 * to own the latch immediately after we look, so we don't signal it.
	 * This is okay so long as all callers of ResetLatch/WaitLatch follow the
	 * standard coding convention of waiting at the bottom of their loops,
	 * not the top, so that they'll correctly process latch-setting events that
	 * happen before they enter the loop.
	 */
	owner_pid = latch->owner_pid;
	if (owner_pid == 0)
		return;
	else if (owner_pid == MyProcPid)
	{
		if (waiting)
			sendSelfPipeByte();
	}
	else
		kill(owner_pid, SIGUSR1);
}
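
/*
 * To illustrate the errno rule above, a typical signal handler that ends a
 * wait by setting a latch might look like this (the handler and flag names
 * are hypothetical):
 *
 *		static void
 *		sigusr2_handler(SIGNAL_ARGS)
 *		{
 *			int			save_errno = errno;
 *
 *			got_SIGUSR2 = true;
 *			SetLatch(&MyProc->procLatch);
 *
 *			errno = save_errno;
 *		}
 */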

/*
 * Clear the latch. Calling WaitLatch after this will sleep, unless
 * the latch is set again before the WaitLatch call.
 */
void
ResetLatch(volatile Latch *latch)
{
	/* Only the owner should reset the latch */
	Assert(latch->owner_pid == MyProcPid);

	latch->is_set = false;

	/*
	 * XXX there really ought to be a memory barrier operation right here, to
	 * ensure that the write to is_set gets flushed to main memory before we
	 * examine any flag variables.  Otherwise a concurrent SetLatch might
	 * falsely conclude that it needn't signal us, even though we have missed
	 * seeing some flag updates that SetLatch was supposed to inform us of.
	 * For the moment, callers must supply their own synchronization of flag
	 * variables (see latch.h).
	 */
}

/*
 * SetLatch uses SIGUSR1 to wake up the process waiting on the latch.
 *
 * Wake up WaitLatch, if we're waiting.  (We might not be, since SIGUSR1 is
 * overloaded for multiple purposes; or we might not have reached WaitLatch
 * yet, in which case we don't need to fill the pipe either.)
 *
 * NB: when calling this in a signal handler, be sure to save and restore
 * errno around it.
 */
void
latch_sigusr1_handler(void)
{
	if (waiting)
		sendSelfPipeByte();
}
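
/*
 * Per the note at OwnLatch, a process that uses shared latches must arrange
 * for its SIGUSR1 handler to call latch_sigusr1_handler().  A sketch of such
 * a handler (the name and the other work are illustrative):
 *
 *		static void
 *		sigusr1_handler(SIGNAL_ARGS)
 *		{
 *			int			save_errno = errno;
 *
 *			(whatever else SIGUSR1 means to this process)
 *			latch_sigusr1_handler();
 *
 *			errno = save_errno;
 *		}
 */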

/* initialize the self-pipe */
static void
initSelfPipe(void)
{
	int			pipefd[2];

	/*
	 * Set up the self-pipe that allows a signal handler to wake up the
	 * select() in WaitLatch. Make the write-end non-blocking, so that
	 * SetLatch won't block if the event has already been set many times,
	 * filling the kernel buffer. Make the read-end non-blocking too, so that
	 * we can easily clear the pipe by reading until EAGAIN or EWOULDBLOCK.
	 */
	if (pipe(pipefd) < 0)
		elog(FATAL, "pipe() failed: %m");
	if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
	if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");

	selfpipe_readfd = pipefd[0];
	selfpipe_writefd = pipefd[1];
}

/* Send one byte to the self-pipe, to wake up WaitLatch */
static void
sendSelfPipeByte(void)
{
	int			rc;
	char		dummy = 0;

retry:
	rc = write(selfpipe_writefd, &dummy, 1);
	if (rc < 0)
	{
		/* If interrupted by signal, just retry */
		if (errno == EINTR)
			goto retry;

		/*
		 * If the pipe is full, we don't need to retry, the data that's there
		 * already is enough to wake up WaitLatch.
		 */
		if (errno == EAGAIN || errno == EWOULDBLOCK)
			return;

		/*
		 * Oops, the write() failed for some other reason. We might be in a
		 * signal handler, so it's not safe to elog(). We have no choice but
		 * to ignore the error silently.
		 */
		return;
	}
}

/*
 * Read all available data from the self-pipe
 *
 * Note: this is only called when waiting = true.  If it fails and doesn't
 * return, it must reset that flag first (though ideally, this will never
 * happen).
 */
static void
drainSelfPipe(void)
{
	/*
	 * There shouldn't normally be more than one byte in the pipe, or maybe a
	 * few bytes if multiple processes run SetLatch at the same instant.
	 */
	char		buf[16];
	int			rc;

	for (;;)
	{
		rc = read(selfpipe_readfd, buf, sizeof(buf));
		if (rc < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;			/* the pipe is empty */
			else if (errno == EINTR)
				continue;		/* retry */
			else
			{
				waiting = false;
				elog(ERROR, "read() on self-pipe failed: %m");
			}
		}
		else if (rc == 0)
		{
			waiting = false;
			elog(ERROR, "unexpected EOF on self-pipe");
		}
		else if (rc < sizeof(buf))
		{
			/* we successfully drained the pipe; no need to read() again */
			break;
		}
		/* else buffer wasn't big enough, so read again */
	}
}