diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 5a43774b33dc73d6a066874ac30ed22705638172..63c6283f915bf92a43db94816fec55b165b10311 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1984,6 +1984,29 @@ SET ENABLE_SEQSCAN TO OFF;
+
+ wal_receiver_status_interval (integer)
+
+ wal_receiver_status_interval> configuration parameter
+
+
+
+ Specifies the minimum frequency, in seconds, for the WAL receiver
+ process on the standby to send information about replication progress
+ to the primary, where they can be seen using the
+ pg_stat_replication view. The standby will report
+ the last transaction log position it has written, the last position it
+ has flushed to disk, and the last position it has applied. Updates are
+ sent each time the write or flush positions changed, or at least as
+ often as specified by this parameter. Thus, the apply position may
+ lag slightly behind the true position. Setting this parameter to zero
+ disables status updates completely. This parameter can only be set in
+ the postgresql.conf> file or on the server command line.
+ The default value is 10 seconds.
+
+
+
+
vacuum_defer_cleanup_age (integer)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 2e8427a40f1913d2bade6c4210cf5ac5d917963b..58e3459e6785bc7294a51cf12adc0bc4c1c4d57b 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -298,8 +298,11 @@ postgres: user> database> host> pg_stat_replication>pg_stat_replication
One row per WAL sender process, showing process ID>,
user OID, user name, application name, client's address and port number,
- time at which the server process began execution, current WAL sender
- state and transaction log location. The columns detailing what exactly
+ time at which the server process began execution, and the current WAL
+ sender state and transaction log location. In addition, the standby
+ reports the last transaction log position it received and wrote, the last
+ position it flushed to disk, and the last position it replayed, and this
+ information is also displayed here. The columns detailing what exactly
the connection is doing are only visible if the user examining the view
is a superuser.
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index c09d961d67e22748b5807313aecf6d63d071b3a2..c923d3b154c5a05b22389dc66fc0fb9379d20977 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1469,6 +1469,82 @@ The commands accepted in walsender mode are:
shutdown), it will send a CommandComplete message before exiting.
This might not happen during an abnormal shutdown, of course.
+
+
+ The receiving process can send a status update back to the sender at
+ any time, using the following message format (also in the payload of
+ a CopyData message):
+
+
+
+
+
+
+ Standby status update (F)
+
+
+
+
+
+
+ Byte1('r')
+
+
+
+ Identifies the message as a receiver status update.
+
+
+
+
+
+ Byte8
+
+
+
+ The location of the last WAL byte + 1 received and written to disk
+ in the standby, in XLogRecPtr format.
+
+
+
+
+
+ Byte8
+
+
+
+ The location of the last WAL byte + 1 flushed to disk in
+ the standby, in XLogRecPtr format.
+
+
+
+
+
+ Byte8
+
+
+
+ The location of the last WAL byte + 1 applied in the standby, in
+ XLogRecPtr format.
+
+
+
+
+
+ Byte8
+
+
+
+ The server's system clock at the time of transmission,
+ given in TimestampTz format.
+
+
+
+
+
+
+
+
+
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8e9dc7ba92a954345953971a0eee1cc87d584765..f5cb6576de4bd00ab4d87ed3b30fb3977d98c21a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9317,6 +9317,25 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
PG_RETURN_TEXT_P(cstring_to_text(location));
}
+/*
+ * Get latest redo apply position.
+ *
+ * Exported to allow WALReceiver to read the pointer directly.
+ */
+XLogRecPtr
+GetXLogReplayRecPtr(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+ XLogRecPtr recptr;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ recptr = xlogctl->recoveryLastRecPtr;
+ SpinLockRelease(&xlogctl->info_lck);
+
+ return recptr;
+}
+
/*
* Report the last WAL replay location (same format as pg_start_backup etc)
*
@@ -9326,14 +9345,10 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
Datum
pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
{
- /* use volatile pointer to prevent code rearrangement */
- volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
char location[MAXFNAMELEN];
- SpinLockAcquire(&xlogctl->info_lck);
- recptr = xlogctl->recoveryLastRecPtr;
- SpinLockRelease(&xlogctl->info_lck);
+ recptr = GetXLogReplayRecPtr();
if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL();
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index e1d91afbd360579a96f46f2172f3043b1227e7e8..408e174658cd6303bbf29fe0d8cb31f7d5d8ea80 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -509,7 +509,10 @@ CREATE VIEW pg_stat_replication AS
S.client_port,
S.backend_start,
W.state,
- W.sent_location
+ W.sent_location,
+ W.write_location,
+ W.flush_location,
+ W.apply_location
FROM pg_stat_get_activity(NULL) AS S, pg_authid U,
pg_stat_get_wal_senders() AS W
WHERE S.usesysid = U.oid AND
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7005307dc250da66e88e03d7da05ece6734e0e48..30e35dbd28ad976f05e8a259961ecb4fd8a3cd4e 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -54,6 +54,9 @@
/* Global variable to indicate if this process is a walreceiver process */
bool am_walreceiver;
+/* GUC variable */
+int wal_receiver_status_interval;
+
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
@@ -88,6 +91,8 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
+static StandbyReplyMessage reply_message;
+
/*
* About SIGTERM handling:
*
@@ -114,6 +119,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
+static void XLogWalRcvSendReply(void);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -306,12 +312,23 @@ WalReceiverMain(void)
while (walrcv_receive(0, &type, &buf, &len))
XLogWalRcvProcessMsg(type, buf, len);
+ /* Let the master know that we received some data. */
+ XLogWalRcvSendReply();
+
/*
* If we've written some records, flush them to disk and let the
* startup process know about them.
*/
XLogWalRcvFlush();
}
+ else
+ {
+ /*
+ * We didn't receive anything new, but send a status update to
+ * the master anyway, to report any progress in applying WAL.
+ */
+ XLogWalRcvSendReply();
+ }
}
}
@@ -546,5 +563,60 @@ XLogWalRcvFlush(void)
LogstreamResult.Write.xrecoff);
set_ps_display(activitymsg, false);
}
+
+ /* Also let the master know that we made some progress */
+ XLogWalRcvSendReply();
}
}
+
+/*
+ * Send reply message to primary, indicating our current XLOG positions and
+ * the current time.
+ */
+static void
+XLogWalRcvSendReply(void)
+{
+ char buf[sizeof(StandbyReplyMessage) + 1];
+ TimestampTz now;
+
+ /*
+ * If the user doesn't want status to be reported to the master, be sure
+ * to exit before doing anything at all.
+ */
+ if (wal_receiver_status_interval <= 0)
+ return;
+
+ /* Get current timestamp. */
+ now = GetCurrentTimestamp();
+
+ /*
+ * We can compare the write and flush positions to the last message we
+ * sent without taking any lock, but the apply position requires a spin
+ * lock, so we don't check that unless something else has changed or 10
+ * seconds have passed. This means that the apply log position will
+ * appear, from the master's point of view, to lag slightly, but since
+ * this is only for reporting purposes and only on idle systems, that's
+ * probably OK.
+ */
+ if (XLByteEQ(reply_message.write, LogstreamResult.Write)
+ && XLByteEQ(reply_message.flush, LogstreamResult.Flush)
+ && !TimestampDifferenceExceeds(reply_message.sendTime, now,
+ wal_receiver_status_interval * 1000))
+ return;
+
+ /* Construct a new message. */
+ reply_message.write = LogstreamResult.Write;
+ reply_message.flush = LogstreamResult.Flush;
+ reply_message.apply = GetXLogReplayRecPtr();
+ reply_message.sendTime = now;
+
+ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+ reply_message.write.xlogid, reply_message.write.xrecoff,
+ reply_message.flush.xlogid, reply_message.flush.xrecoff,
+ reply_message.apply.xlogid, reply_message.apply.xrecoff);
+
+ /* Prepend with the message type and send it. */
+ buf[0] = 'r';
+ memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
+ walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 78963c1e6be895d8c2985024afcf3ba562436f31..3ad95b495ec8dbe86d218959d8336b8576847d24 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -39,6 +39,7 @@
#include "funcapi.h"
#include "access/xlog_internal.h"
+#include "access/transam.h"
#include "catalog/pg_type.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -51,6 +52,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
@@ -106,9 +108,10 @@ static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static bool XLogSend(char *msgbuf, bool *caughtup);
-static void CheckClosedConnection(void);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
+static void ProcessStandbyReplyMessage(void);
+static void ProcessRepliesIfAny(void);
/* Main entry point for walsender process */
@@ -442,7 +445,7 @@ HandleReplicationCommand(const char *cmd_string)
* Check if the remote end has closed the connection.
*/
static void
-CheckClosedConnection(void)
+ProcessRepliesIfAny(void)
{
unsigned char firstchar;
int r;
@@ -465,6 +468,13 @@ CheckClosedConnection(void)
/* Handle the very limited subset of commands expected in this phase */
switch (firstchar)
{
+ /*
+ * 'd' means a standby reply wrapped in a COPY BOTH packet.
+ */
+ case 'd':
+ ProcessStandbyReplyMessage();
+ break;
+
/*
* 'X' means that the standby is closing down the socket.
*/
@@ -479,6 +489,62 @@ CheckClosedConnection(void)
}
}
+/*
+ * Process a status update message received from standby.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+ static StringInfoData input_message;
+ StandbyReplyMessage reply;
+ char msgtype;
+
+ initStringInfo(&input_message);
+
+ /*
+ * Read the message contents.
+ */
+ if (pq_getmessage(&input_message, 0))
+ {
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+
+ /*
+ * Check message type from the first byte. At the moment, there is only
+ * one type.
+ */
+ msgtype = pq_getmsgbyte(&input_message);
+ if (msgtype != 'r')
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected message type %c", msgtype)));
+
+ pq_copymsgbytes(&input_message, (char *) &reply, sizeof(StandbyReplyMessage));
+
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ",
+ reply.write.xlogid, reply.write.xrecoff,
+ reply.flush.xlogid, reply.flush.xrecoff,
+ reply.apply.xlogid, reply.apply.xrecoff);
+
+ /*
+ * Update shared state for this WalSender process
+ * based on reply data from standby.
+ */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->write = reply.write;
+ walsnd->flush = reply.flush;
+ walsnd->apply = reply.apply;
+ SpinLockRelease(&walsnd->mutex);
+ }
+}
+
/* Main loop of walsender process */
static int
WalSndLoop(void)
@@ -518,6 +584,7 @@ WalSndLoop(void)
{
if (!XLogSend(output_message, &caughtup))
break;
+ ProcessRepliesIfAny();
if (caughtup)
walsender_shutdown_requested = true;
}
@@ -561,9 +628,6 @@ WalSndLoop(void)
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
WalSndDelay * 1000L);
}
-
- /* Check if the connection was closed */
- CheckClosedConnection();
}
else
{
@@ -574,6 +638,7 @@ WalSndLoop(void)
/* Update our state to indicate if we're behind or not */
WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
+ ProcessRepliesIfAny();
}
/*
@@ -1104,7 +1169,7 @@ WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 3
+#define PG_STAT_GET_WAL_SENDERS_COLS 6
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -1141,8 +1206,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
- char sent_location[MAXFNAMELEN];
+ char location[MAXFNAMELEN];
XLogRecPtr sentPtr;
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -1153,13 +1221,14 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
SpinLockAcquire(&walsnd->mutex);
sentPtr = walsnd->sentPtr;
state = walsnd->state;
+ write = walsnd->write;
+ flush = walsnd->flush;
+ apply = walsnd->apply;
SpinLockRelease(&walsnd->mutex);
- snprintf(sent_location, sizeof(sent_location), "%X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
-
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(walsnd->pid);
+
if (!superuser())
{
/*
@@ -1168,11 +1237,35 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
nulls[1] = true;
nulls[2] = true;
+ nulls[3] = true;
+ nulls[4] = true;
+ nulls[5] = true;
}
else
{
values[1] = CStringGetTextDatum(WalSndGetStateString(state));
- values[2] = CStringGetTextDatum(sent_location);
+
+ snprintf(location, sizeof(location), "%X/%X",
+ sentPtr.xlogid, sentPtr.xrecoff);
+ values[2] = CStringGetTextDatum(location);
+
+ if (write.xlogid == 0 && write.xrecoff == 0)
+ nulls[3] = true;
+ snprintf(location, sizeof(location), "%X/%X",
+ write.xlogid, write.xrecoff);
+ values[3] = CStringGetTextDatum(location);
+
+ if (flush.xlogid == 0 && flush.xrecoff == 0)
+ nulls[4] = true;
+ snprintf(location, sizeof(location), "%X/%X",
+ flush.xlogid, flush.xrecoff);
+ values[4] = CStringGetTextDatum(location);
+
+ if (apply.xlogid == 0 && apply.xrecoff == 0)
+ nulls[5] = true;
+ snprintf(location, sizeof(location), "%X/%X",
+ apply.xlogid, apply.xrecoff);
+ values[5] = CStringGetTextDatum(location);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 216236b52945eeaec805d689ba9fb284c1748cce..470183d4abad4ce8af8424c27380da6a01ee57c9 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -55,6 +55,7 @@
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/walwriter.h"
+#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/standby.h"
@@ -1440,6 +1441,16 @@ static struct config_int ConfigureNamesInt[] =
30 * 1000, -1, INT_MAX / 1000, NULL, NULL
},
+ {
+ {"wal_receiver_status_interval", PGC_SIGHUP, WAL_STANDBY_SERVERS,
+ gettext_noop("Sets the maximum interval between WAL receiver status reports to the master."),
+ NULL,
+ GUC_UNIT_S
+ },
+ &wal_receiver_status_interval,
+ 10, 0, INT_MAX/1000, NULL, NULL
+ },
+
{
{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the maximum number of concurrent connections."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index fe80c4dc23938ce7f96919eb6d12ee51e21f4e07..5d31365dab44001c81cb2db4b6cda8f207fcfc0d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -202,6 +202,7 @@
#max_standby_streaming_delay = 30s # max delay before canceling queries
# when reading streaming WAL;
# -1 allows indefinite delay
+#wal_receiver_status_interval = 10s # replies at least this often, 0 disables
#------------------------------------------------------------------------------
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index ff73272f755dd505bc119fb74e6474cb119767a6..1803d5ab2010962ea66411e62c74b694625dd327 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -291,6 +291,7 @@ extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
extern bool RecoveryInProgress(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
+extern XLogRecPtr GetXLogReplayRecPtr(void);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 01b002a1454b405f3861054e76a25a35e906db8c..cb275b8b5817a7515de1d5870170026e06a59917 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3075,7 +3075,7 @@ DATA(insert OID = 1936 ( pg_stat_get_backend_idset PGNSP PGUID 12 1 100 0 f f
DESCR("statistics: currently active backend IDs");
DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 f f f f t s 1 0 2249 "23" "{23,26,23,26,25,25,16,1184,1184,1184,869,23}" "{i,o,o,o,o,o,o,o,o,o,o,o}" "{pid,datid,procpid,usesysid,application_name,current_query,waiting,xact_start,query_start,backend_start,client_addr,client_port}" _null_ pg_stat_get_activity _null_ _null_ _null_ ));
DESCR("statistics: information about currently active backends");
-DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25}" "{o,o,o}" "{procpid,state,sent_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 f f f f t s 0 0 2249 "" "{23,25,25,25,25,25}" "{o,o,o,o,o,o}" "{procpid,state,sent_location,write_location,flush_location,apply_location}" _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 f f f t f s 0 0 23 "" _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ ));
DESCR("statistics: current backend PID");
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
index 199385120a5e3d89b39ca058fcb5b278b4528c6d..32c49620c1da58a1ea2d191a3b19c821249e918d 100644
--- a/src/include/replication/walprotocol.h
+++ b/src/include/replication/walprotocol.h
@@ -39,6 +39,27 @@ typedef struct
TimestampTz sendTime;
} WalDataMessageHeader;
+/*
+ * Reply message from standby (message type 'r'). This is wrapped within
+ * a CopyData message at the FE/BE protocol level.
+ *
+ * Note that the data length is not specified here.
+ */
+typedef struct
+{
+ /*
+ * The xlog locations that have been written, flushed, and applied
+ * by standby-side. These may be invalid if the standby-side is unable
+ * to or chooses not to report these.
+ */
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
+
+ /* Sender's system clock at the time of transmission */
+ TimestampTz sendTime;
+} StandbyReplyMessage;
+
/*
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
*
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 24ad43839f96eed7d2df934a6c4998adea2d3476..aa5bfb7aea1e82e88ea0f4cca2d64a52c025db0a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -17,6 +17,7 @@
#include "pgtime.h"
extern bool am_walreceiver;
+extern int wal_receiver_status_interval;
/*
* MAXCONNINFO: maximum size of a connection string.
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 9a196ab1c8bb69d4a7013e416d7a23792a7bb954..5843307c9dc0f8f28b18cf45af8d0016ec722645 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -35,7 +35,17 @@ typedef struct WalSnd
WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
- slock_t mutex; /* locks shared variables shown above */
+ /*
+ * The xlog locations that have been written, flushed, and applied
+ * by standby-side. These may be invalid if the standby-side has not
+ * offered values yet.
+ */
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
+
+ /* Protects shared variables shown above. */
+ slock_t mutex;
/*
* Latch used by backends to wake up this walsender when it has work
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index b49467870efd86418adeac482e1c3e5efff6d8c4..c0142c25772a09e25617ca66a0acc4714fa1995b 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1297,7 +1297,7 @@ SELECT viewname, definition FROM pg_views WHERE schemaname <> 'information_schem
pg_stat_bgwriter | SELECT pg_stat_get_bgwriter_timed_checkpoints() AS checkpoints_timed, pg_stat_get_bgwriter_requested_checkpoints() AS checkpoints_req, pg_stat_get_bgwriter_buf_written_checkpoints() AS buffers_checkpoint, pg_stat_get_bgwriter_buf_written_clean() AS buffers_clean, pg_stat_get_bgwriter_maxwritten_clean() AS maxwritten_clean, pg_stat_get_buf_written_backend() AS buffers_backend, pg_stat_get_buf_fsync_backend() AS buffers_backend_fsync, pg_stat_get_buf_alloc() AS buffers_alloc, pg_stat_get_bgwriter_stat_reset_time() AS stats_reset;
pg_stat_database | SELECT d.oid AS datid, d.datname, pg_stat_get_db_numbackends(d.oid) AS numbackends, pg_stat_get_db_xact_commit(d.oid) AS xact_commit, pg_stat_get_db_xact_rollback(d.oid) AS xact_rollback, (pg_stat_get_db_blocks_fetched(d.oid) - pg_stat_get_db_blocks_hit(d.oid)) AS blks_read, pg_stat_get_db_blocks_hit(d.oid) AS blks_hit, pg_stat_get_db_tuples_returned(d.oid) AS tup_returned, pg_stat_get_db_tuples_fetched(d.oid) AS tup_fetched, pg_stat_get_db_tuples_inserted(d.oid) AS tup_inserted, pg_stat_get_db_tuples_updated(d.oid) AS tup_updated, pg_stat_get_db_tuples_deleted(d.oid) AS tup_deleted, pg_stat_get_db_conflict_all(d.oid) AS conflicts, pg_stat_get_db_stat_reset_time(d.oid) AS stats_reset FROM pg_database d;
pg_stat_database_conflicts | SELECT d.oid AS datid, d.datname, pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace, pg_stat_get_db_conflict_lock(d.oid) AS confl_lock, pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d;
- pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_port, s.backend_start, w.state, w.sent_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
+ pg_stat_replication | SELECT s.procpid, s.usesysid, u.rolname AS usename, s.application_name, s.client_addr, s.client_port, s.backend_start, w.state, w.sent_location, w.write_location, w.flush_location, w.apply_location FROM pg_stat_get_activity(NULL::integer) s(datid, procpid, usesysid, application_name, current_query, waiting, xact_start, query_start, backend_start, client_addr, client_port), pg_authid u, pg_stat_get_wal_senders() w(procpid, state, sent_location, write_location, flush_location, apply_location) WHERE ((s.usesysid = u.oid) AND (s.procpid = w.procpid));
pg_stat_sys_indexes | SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, pg_stat_all_indexes.relname, pg_stat_all_indexes.indexrelname, pg_stat_all_indexes.idx_scan, pg_stat_all_indexes.idx_tup_read, pg_stat_all_indexes.idx_tup_fetch FROM pg_stat_all_indexes WHERE ((pg_stat_all_indexes.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_indexes.schemaname ~ '^pg_toast'::text));
pg_stat_sys_tables | SELECT pg_stat_all_tables.relid, pg_stat_all_tables.schemaname, pg_stat_all_tables.relname, pg_stat_all_tables.seq_scan, pg_stat_all_tables.seq_tup_read, pg_stat_all_tables.idx_scan, pg_stat_all_tables.idx_tup_fetch, pg_stat_all_tables.n_tup_ins, pg_stat_all_tables.n_tup_upd, pg_stat_all_tables.n_tup_del, pg_stat_all_tables.n_tup_hot_upd, pg_stat_all_tables.n_live_tup, pg_stat_all_tables.n_dead_tup, pg_stat_all_tables.last_vacuum, pg_stat_all_tables.last_autovacuum, pg_stat_all_tables.last_analyze, pg_stat_all_tables.last_autoanalyze, pg_stat_all_tables.vacuum_count, pg_stat_all_tables.autovacuum_count, pg_stat_all_tables.analyze_count, pg_stat_all_tables.autoanalyze_count FROM pg_stat_all_tables WHERE ((pg_stat_all_tables.schemaname = ANY (ARRAY['pg_catalog'::name, 'information_schema'::name])) OR (pg_stat_all_tables.schemaname ~ '^pg_toast'::text));
pg_stat_user_functions | SELECT p.oid AS funcid, n.nspname AS schemaname, p.proname AS funcname, pg_stat_get_function_calls(p.oid) AS calls, (pg_stat_get_function_time(p.oid) / 1000) AS total_time, (pg_stat_get_function_self_time(p.oid) / 1000) AS self_time FROM (pg_proc p LEFT JOIN pg_namespace n ON ((n.oid = p.pronamespace))) WHERE ((p.prolang <> (12)::oid) AND (pg_stat_get_function_calls(p.oid) IS NOT NULL));