/*-------------------------------------------------------------------------
 *
 * unix_latch.c
 *	  Routines for inter-process latches
 *
 * The Unix implementation uses the so-called self-pipe trick to overcome
 * the race condition involved with select() and setting a global flag
 * in the signal handler. When a latch is set and the current process
 * is waiting for it, the signal handler wakes up the select() in
 * WaitLatch by writing a byte to a pipe. A signal by itself doesn't
 * interrupt select() on all platforms, and even on platforms where it
 * does, a signal that arrives just before the select() call does not
 * prevent the select() from entering sleep. An incoming byte on a pipe
 * however reliably interrupts the sleep, and causes select() to return
 * immediately even if the signal arrives before select() begins.
 *
 * When SetLatch is called from the same process that owns the latch,
 * SetLatch writes the byte directly to the pipe. If it's owned by another
 * process, SIGUSR1 is sent and the signal handler in the waiting process
 * writes the byte to the pipe on behalf of the signaling process.
 *
 * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/port/unix_latch.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#include "miscadmin.h"
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/shmem.h"

/* Are we currently in WaitLatch? The signal handler would like to know. */
static volatile sig_atomic_t waiting = false;

/* Read and write ends of the self-pipe; -1 until initSelfPipe has run */
static int	selfpipe_readfd = -1;
static int	selfpipe_writefd = -1;

/* private function prototypes */
static void initSelfPipe(void);
static void drainSelfPipe(void);
static void sendSelfPipeByte(void);


/*
 * Initialize a backend-local latch.
 */
void
InitLatch(volatile Latch *latch)
{
65
	/* Initialize the self-pipe if this is our first latch in the process */
66 67 68 69 70 71 72 73 74 75
	if (selfpipe_readfd == -1)
		initSelfPipe();

	latch->is_set = false;
	latch->owner_pid = MyProcPid;
	latch->is_shared = false;
}

/*
 * Initialize a shared latch that can be set from other processes. The latch
 * is initially owned by no-one; use OwnLatch to associate it with the
 * current process.
 *
 * InitSharedLatch needs to be called in postmaster before forking child
 * processes, usually right after allocating the shared memory block
 * containing the latch with ShmemInitStruct. (The Unix implementation
 * doesn't actually require that, but the Windows one does.) Because of
 * this restriction, we have no concurrency issues to worry about here.
 */
void
InitSharedLatch(volatile Latch *latch)
{
	latch->is_set = false;
	latch->owner_pid = 0;		/* no owner until OwnLatch is called */
	latch->is_shared = true;
}

/*
 * Associate a shared latch with the current process, allowing it to
95 96 97 98 99 100
 * wait on the latch.
 *
 * Although there is a sanity check for latch-already-owned, we don't do
 * any sort of locking here, meaning that we could fail to detect the error
 * if two processes try to own the same latch at about the same time.  If
 * there is any risk of that, caller must provide an interlock to prevent it.
101
 *
102 103 104
 * In any process that calls OwnLatch(), make sure that
 * latch_sigusr1_handler() is called from the SIGUSR1 signal handler,
 * as shared latches use SIGUSR1 for inter-process communication.
105 106 107 108 109 110
 */
void
OwnLatch(volatile Latch *latch)
{
	Assert(latch->is_shared);

111
	/* Initialize the self-pipe if this is our first latch in this process */
112 113 114
	if (selfpipe_readfd == -1)
		initSelfPipe();

115
	/* sanity check */
116 117
	if (latch->owner_pid != 0)
		elog(ERROR, "latch already owned");
118

119 120 121 122 123 124 125 126 127 128 129
	latch->owner_pid = MyProcPid;
}

/*
 * Disown a shared latch currently owned by the current process.
 */
void
DisownLatch(volatile Latch *latch)
{
	Assert(latch->is_shared);
	Assert(latch->owner_pid == MyProcPid);

	latch->owner_pid = 0;
}

/*
135 136
 * Wait for a given latch to be set, or for postmaster death, or until timeout
 * is exceeded. 'wakeEvents' is a bitmask that specifies which of those events
137 138
 * to wait for. If the latch is already set (and WL_LATCH_SET is given), the
 * function returns immediately.
139
 *
140
 * The 'timeout' is given in milliseconds. It must be >= 0 if WL_TIMEOUT flag
141 142 143
 * is given.  On some platforms, signals cause the timeout to be restarted,
 * so beware that the function can sleep for several times longer than the
 * specified timeout.
144 145 146 147 148
 *
 * The latch must be owned by the current process, ie. it must be a
 * backend-local latch initialized with InitLatch, or a shared latch
 * associated with the current process by calling OwnLatch.
 *
149
 * Returns bit mask indicating which condition(s) caused the wake-up. Note
150 151 152 153 154 155 156 157
 * that if multiple wake-up conditions are true, there is no guarantee that
 * we return all of them in one call, but we will return at least one. Also,
 * according to the select(2) man page on Linux, select(2) may spuriously
 * return and report a file descriptor as readable, when it's not. We use
 * select(2), so WaitLatch can also spuriously claim that a socket is
 * readable, or postmaster has died, even when none of the wake conditions
 * have been satisfied. That should be rare in practice, but the caller
 * should not use the return value for anything critical, re-checking the
158
 * situation with PostmasterIsAlive() or read() on a socket as necessary.
159
 * The latch and timeout flag bits can be trusted, however.
160
 */
161 162
int
WaitLatch(volatile Latch *latch, int wakeEvents, long timeout)
163
{
164
	return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout);
165 166 167
}

/*
168 169
 * Like WaitLatch, but with an extra socket argument for WL_SOCKET_*
 * conditions.
170 171
 */
int
172 173
WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
				  long timeout)
174
{
175 176
	struct timeval tv,
			   *tvp = NULL;
177
	fd_set		input_mask;
178
	fd_set		output_mask;
179 180 181
	int			rc;
	int			result = 0;

182 183 184 185 186 187 188
	/* Ignore WL_SOCKET_* events if no valid socket is given */
	if (sock == PGINVALID_SOCKET)
		wakeEvents &= ~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);

	Assert(wakeEvents != 0);	/* must have at least one wake event */

	if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid)
189 190 191
		elog(ERROR, "cannot wait on a latch owned by another process");

	/* Initialize timeout */
192
	if (wakeEvents & WL_TIMEOUT)
193
	{
194
		Assert(timeout >= 0);
195 196
		tv.tv_sec = timeout / 1000L;
		tv.tv_usec = (timeout % 1000L) * 1000L;
197 198 199 200
		tvp = &tv;
	}

	waiting = true;
201
	do
202
	{
203
		int			hifd;
204 205

		/*
206
		 * Clear the pipe, then check if the latch is set already. If someone
207 208 209
		 * sets the latch between this and the select() below, the setter will
		 * write a byte to the pipe (or signal us and the signal handler will
		 * do that), and the select() will return immediately.
210 211 212 213 214
		 *
		 * Note: we assume that the kernel calls involved in drainSelfPipe()
		 * and SetLatch() will provide adequate synchronization on machines
		 * with weak memory ordering, so that we cannot miss seeing is_set
		 * if the signal byte is already in the pipe when we drain it.
215 216
		 */
		drainSelfPipe();
217

218
		if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
219
		{
220 221 222 223 224
			result |= WL_LATCH_SET;
			/*
			 * Leave loop immediately, avoid blocking again. We don't attempt
			 * to report any other events that might also be satisfied.
			 */
225 226 227
			break;
		}

228
		/* Must wait ... set up the event masks for select() */
229
		FD_ZERO(&input_mask);
230 231
		FD_ZERO(&output_mask);

232 233
		FD_SET(selfpipe_readfd, &input_mask);
		hifd = selfpipe_readfd;
234 235 236 237 238 239 240 241 242

		if (wakeEvents & WL_POSTMASTER_DEATH)
		{
			FD_SET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask);
			if (postmaster_alive_fds[POSTMASTER_FD_WATCH] > hifd)
				hifd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
		}

		if (wakeEvents & WL_SOCKET_READABLE)
243 244 245 246 247 248
		{
			FD_SET(sock, &input_mask);
			if (sock > hifd)
				hifd = sock;
		}

249
		if (wakeEvents & WL_SOCKET_WRITEABLE)
250 251 252 253 254 255
		{
			FD_SET(sock, &output_mask);
			if (sock > hifd)
				hifd = sock;
		}

256
		/* Sleep */
257
		rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
258 259

		/* Check return code */
260 261 262 263 264 265 266 267
		if (rc < 0)
		{
			if (errno == EINTR)
				continue;
			ereport(ERROR,
					(errcode_for_socket_access(),
					 errmsg("select() failed: %m")));
		}
268
		if (rc == 0 && (wakeEvents & WL_TIMEOUT))
269 270
		{
			/* timeout exceeded */
271
			result |= WL_TIMEOUT;
272
		}
273
		if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask))
274
		{
275 276
			/* data available in socket */
			result |= WL_SOCKET_READABLE;
277
		}
278 279 280 281 282 283 284 285 286
		if ((wakeEvents & WL_SOCKET_WRITEABLE) && FD_ISSET(sock, &output_mask))
		{
			result |= WL_SOCKET_WRITEABLE;
		}
		if ((wakeEvents & WL_POSTMASTER_DEATH) &&
			 FD_ISSET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask))
		{
			result |= WL_POSTMASTER_DEATH;
		}
287
	} while (result == 0);
288 289 290 291 292 293
	waiting = false;

	return result;
}

/*
 * Sets a latch and wakes up anyone waiting on it.
 *
 * This is cheap if the latch is already set, otherwise not so much.
 */
void
SetLatch(volatile Latch *latch)
{
	pid_t		owner_pid;

	/*
	 * XXX there really ought to be a memory barrier operation right here,
	 * to ensure that any flag variables we might have changed get flushed
	 * to main memory before we check/set is_set.  Without that, we have to
	 * require that callers provide their own synchronization for machines
	 * with weak memory ordering (see latch.h).
	 */

	/* Quick exit if already set */
	if (latch->is_set)
		return;

	latch->is_set = true;

	/*
	 * See if anyone's waiting for the latch. It can be the current process if
	 * we're in a signal handler. We use the self-pipe to wake up the select()
	 * in that case. If it's another process, send a signal.
	 *
	 * Fetch owner_pid only once, in case the latch is concurrently getting
	 * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
	 * guaranteed to be true! In practice, the effective range of pid_t fits
	 * in a 32 bit integer, and so should be atomic. In the worst case, we
	 * might end up signaling the wrong process. Even then, you're very
	 * unlucky if a process with that bogus pid exists and belongs to
	 * Postgres; and PG database processes should handle excess SIGUSR1
	 * interrupts without a problem anyhow.
	 *
	 * Another sort of race condition that's possible here is for a new process
	 * to own the latch immediately after we look, so we don't signal it.
	 * This is okay so long as all callers of ResetLatch/WaitLatch follow the
	 * standard coding convention of waiting at the bottom of their loops,
	 * not the top, so that they'll correctly process latch-setting events that
	 * happen before they enter the loop.
	 */
	owner_pid = latch->owner_pid;
	if (owner_pid == 0)
		return;					/* nobody owns it; nothing to wake */
	else if (owner_pid == MyProcPid)
		sendSelfPipeByte();		/* we own it; poke our own self-pipe */
	else
		kill(owner_pid, SIGUSR1);	/* owner's handler writes the byte */
}

/*
 * Clear the latch. Calling WaitLatch after this will sleep, unless
 * the latch is set again before the WaitLatch call.
 */
void
ResetLatch(volatile Latch *latch)
{
	/* Only the owner should reset the latch */
	Assert(latch->owner_pid == MyProcPid);

	latch->is_set = false;

	/*
	 * XXX there really ought to be a memory barrier operation right here, to
	 * ensure that the write to is_set gets flushed to main memory before we
	 * examine any flag variables.  Otherwise a concurrent SetLatch might
	 * falsely conclude that it needn't signal us, even though we have missed
	 * seeing some flag updates that SetLatch was supposed to inform us of.
	 * For the moment, callers must supply their own synchronization of flag
	 * variables (see latch.h).
	 */
}

/*
 * SetLatch uses SIGUSR1 to wake up the process waiting on the latch.
 *
 * Wake up WaitLatch, if we're waiting.  (We might not be, since SIGUSR1 is
 * overloaded for multiple purposes.)
 */
void
latch_sigusr1_handler(void)
{
	/* Only async-signal-safe work here: a write() via sendSelfPipeByte */
	if (waiting)
		sendSelfPipeByte();
}

/* initialize the self-pipe */
static void
initSelfPipe(void)
{
	int			pipefd[2];

	/*
	 * Set up the self-pipe that allows a signal handler to wake up the
	 * select() in WaitLatch. Make the write-end non-blocking, so that
	 * SetLatch won't block if the event has already been set many times
	 * filling the kernel buffer. Make the read-end non-blocking too, so that
	 * we can easily clear the pipe by reading until EAGAIN or EWOULDBLOCK.
	 */
	if (pipe(pipefd) < 0)
		elog(FATAL, "pipe() failed: %m");
	if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
	if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
		elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");

	selfpipe_readfd = pipefd[0];
	selfpipe_writefd = pipefd[1];
}

/* Send one byte to the self-pipe, to wake up WaitLatch */
static void
sendSelfPipeByte(void)
{
411 412
	int			rc;
	char		dummy = 0;
413 414 415 416 417 418 419 420 421 422

retry:
	rc = write(selfpipe_writefd, &dummy, 1);
	if (rc < 0)
	{
		/* If interrupted by signal, just retry */
		if (errno == EINTR)
			goto retry;

		/*
423 424
		 * If the pipe is full, we don't need to retry, the data that's there
		 * already is enough to wake up WaitLatch.
425 426 427 428 429
		 */
		if (errno == EAGAIN || errno == EWOULDBLOCK)
			return;

		/*
430 431 432
		 * Oops, the write() failed for some other reason. We might be in a
		 * signal handler, so it's not safe to elog(). We have no choice but
		 * silently ignore the error.
433 434 435 436 437 438 439 440 441 442
		 */
		return;
	}
}

/* Read all available data from the self-pipe */
static void
drainSelfPipe(void)
{
	/*
443 444
	 * There shouldn't normally be more than one byte in the pipe, or maybe a
	 * few more if multiple processes run SetLatch at the same instant.
445
	 */
446 447
	char		buf[16];
	int			rc;
448 449 450 451 452 453 454

	for (;;)
	{
		rc = read(selfpipe_readfd, buf, sizeof(buf));
		if (rc < 0)
		{
			if (errno == EAGAIN || errno == EWOULDBLOCK)
455
				break;			/* the pipe is empty */
456
			else if (errno == EINTR)
457
				continue;		/* retry */
458 459 460 461 462 463 464
			else
				elog(ERROR, "read() on self-pipe failed: %m");
		}
		else if (rc == 0)
			elog(ERROR, "unexpected EOF on self-pipe");
	}
}