提交 b1332e98 编写于 作者: H Heikki Linnakangas

Put the logic to decide which synchronous standby is active into a function.

This avoids duplicating the code.

Michael Paquier, reviewed by Simon Riggs and me
上级 7afc2336
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
* Synchronous replication is new as of PostgreSQL 9.1. * Synchronous replication is new as of PostgreSQL 9.1.
* *
* If requested, transaction commits wait until their commit LSN is * If requested, transaction commits wait until their commit LSN is
* acknowledged by the sync standby. * acknowledged by the synchronous standby.
* *
* This module contains the code for waiting and release of backends. * This module contains the code for waiting and release of backends.
* All code in this module executes on the primary. The core streaming * All code in this module executes on the primary. The core streaming
...@@ -357,6 +357,60 @@ SyncRepInitConfig(void) ...@@ -357,6 +357,60 @@ SyncRepInitConfig(void)
} }
} }
/*
* Find the WAL sender servicing the synchronous standby with the lowest
* priority value, or NULL if no synchronous standby is connected. If there
* are multiple standbys with the same lowest priority value, the first one
* found is selected. The caller must hold SyncRepLock.
*/
WalSnd *
SyncRepGetSynchronousStandby(void)
{
WalSnd *result = NULL;
int result_priority = 0;
int i;
for (i = 0; i < max_wal_senders; i++)
{
/* Use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
int this_priority;
/* Must be active */
if (walsnd->pid == 0)
continue;
/* Must be streaming */
if (walsnd->state != WALSNDSTATE_STREAMING)
continue;
/* Must be synchronous */
this_priority = walsnd->sync_standby_priority;
if (this_priority == 0)
continue;
/* Must have a lower priority value than any previous ones */
if (result != NULL && result_priority <= this_priority)
continue;
/* Must have a valid flush position */
if (XLogRecPtrIsInvalid(walsnd->flush))
continue;
result = (WalSnd *) walsnd;
result_priority = this_priority;
/*
* If priority is equal to 1, there cannot be any other WAL senders
* with a lower priority, so we're done.
*/
if (this_priority == 1)
return result;
}
return result;
}
/* /*
* Update the LSNs on each queue based upon our latest state. This * Update the LSNs on each queue based upon our latest state. This
* implements a simple policy of first-valid-standby-releases-waiter. * implements a simple policy of first-valid-standby-releases-waiter.
...@@ -368,11 +422,9 @@ void ...@@ -368,11 +422,9 @@ void
SyncRepReleaseWaiters(void) SyncRepReleaseWaiters(void)
{ {
volatile WalSndCtlData *walsndctl = WalSndCtl; volatile WalSndCtlData *walsndctl = WalSndCtl;
volatile WalSnd *syncWalSnd = NULL; WalSnd *syncWalSnd;
int numwrite = 0; int numwrite = 0;
int numflush = 0; int numflush = 0;
int priority = 0;
int i;
/* /*
* If this WALSender is serving a standby that is not on the list of * If this WALSender is serving a standby that is not on the list of
...@@ -387,33 +439,13 @@ SyncRepReleaseWaiters(void) ...@@ -387,33 +439,13 @@ SyncRepReleaseWaiters(void)
/* /*
* We're a potential sync standby. Release waiters if we are the highest * We're a potential sync standby. Release waiters if we are the highest
* priority standby. If there are multiple standbys with same priorities * priority standby.
* then we use the first mentioned standby. If you change this, also
* change pg_stat_get_wal_senders().
*/ */
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
syncWalSnd = SyncRepGetSynchronousStandby();
for (i = 0; i < max_wal_senders; i++) /* We should have found ourselves at least */
{ Assert(syncWalSnd != NULL);
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &walsndctl->walsnds[i];
if (walsnd->pid != 0 &&
walsnd->state == WALSNDSTATE_STREAMING &&
walsnd->sync_standby_priority > 0 &&
(priority == 0 ||
priority > walsnd->sync_standby_priority) &&
!XLogRecPtrIsInvalid(walsnd->flush))
{
priority = walsnd->sync_standby_priority;
syncWalSnd = walsnd;
}
}
/*
* We should have found ourselves at least.
*/
Assert(syncWalSnd);
/* /*
* If we aren't managing the highest priority standby then just leave. * If we aren't managing the highest priority standby then just leave.
......
...@@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -2741,9 +2741,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore; Tuplestorestate *tupstore;
MemoryContext per_query_ctx; MemoryContext per_query_ctx;
MemoryContext oldcontext; MemoryContext oldcontext;
int *sync_priority; WalSnd *sync_standby;
int priority = 0;
int sync_standby = -1;
int i; int i;
/* check to see if caller supports us returning a tuplestore */ /* check to see if caller supports us returning a tuplestore */
...@@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -2772,38 +2770,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
/* /*
* Get the priorities of sync standbys all in one go, to minimise lock * Get the currently active synchronous standby.
* acquisitions and to allow us to evaluate who is the current sync
* standby. This code must match the code in SyncRepReleaseWaiters().
*/ */
sync_priority = palloc(sizeof(int) * max_wal_senders);
LWLockAcquire(SyncRepLock, LW_SHARED); LWLockAcquire(SyncRepLock, LW_SHARED);
for (i = 0; i < max_wal_senders; i++) sync_standby = SyncRepGetSynchronousStandby();
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
if (walsnd->pid != 0)
{
/*
* Treat a standby such as a pg_basebackup background process
* which always returns an invalid flush location, as an
* asynchronous standby.
*/
sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
0 : walsnd->sync_standby_priority;
if (walsnd->state == WALSNDSTATE_STREAMING &&
walsnd->sync_standby_priority > 0 &&
(priority == 0 ||
priority > walsnd->sync_standby_priority) &&
!XLogRecPtrIsInvalid(walsnd->flush))
{
priority = walsnd->sync_standby_priority;
sync_standby = i;
}
}
}
LWLockRelease(SyncRepLock); LWLockRelease(SyncRepLock);
for (i = 0; i < max_wal_senders; i++) for (i = 0; i < max_wal_senders; i++)
...@@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -2814,6 +2784,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
XLogRecPtr write; XLogRecPtr write;
XLogRecPtr flush; XLogRecPtr flush;
XLogRecPtr apply; XLogRecPtr apply;
int priority;
WalSndState state; WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
...@@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -2827,6 +2798,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
write = walsnd->write; write = walsnd->write;
flush = walsnd->flush; flush = walsnd->flush;
apply = walsnd->apply; apply = walsnd->apply;
priority = walsnd->sync_standby_priority;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls)); memset(nulls, 0, sizeof(nulls));
...@@ -2857,15 +2829,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -2857,15 +2829,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls[5] = true; nulls[5] = true;
values[5] = LSNGetDatum(apply); values[5] = LSNGetDatum(apply);
values[6] = Int32GetDatum(sync_priority[i]); /*
* Treat a standby such as a pg_basebackup background process
* which always returns an invalid flush location, as an
* asynchronous standby.
*/
priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
values[6] = Int32GetDatum(priority);
/* /*
* More easily understood version of standby state. This is purely * More easily understood version of standby state. This is purely
* informational, not different from priority. * informational, not different from priority.
*/ */
if (sync_priority[i] == 0) if (priority == 0)
values[7] = CStringGetTextDatum("async"); values[7] = CStringGetTextDatum("async");
else if (i == sync_standby) else if (walsnd == sync_standby)
values[7] = CStringGetTextDatum("sync"); values[7] = CStringGetTextDatum("sync");
else else
values[7] = CStringGetTextDatum("potential"); values[7] = CStringGetTextDatum("potential");
...@@ -2873,7 +2852,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -2873,7 +2852,6 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls); tuplestore_putvalues(tupstore, tupdesc, values, nulls);
} }
pfree(sync_priority);
/* clean up and return the tuplestore */ /* clean up and return the tuplestore */
tuplestore_donestoring(tupstore); tuplestore_donestoring(tupstore);
......
...@@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void); ...@@ -50,6 +50,10 @@ extern void SyncRepUpdateSyncStandbysDefined(void);
/* called by various procs */ /* called by various procs */
extern int SyncRepWakeQueue(bool all, int mode); extern int SyncRepWakeQueue(bool all, int mode);
/* forward declaration to avoid pulling in walsender_private.h */
struct WalSnd;
extern struct WalSnd *SyncRepGetSynchronousStandby(void);
extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source);
extern void assign_synchronous_commit(int newval, void *extra); extern void assign_synchronous_commit(int newval, void *extra);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册