diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 679f856b9ceb1393b22440d09949aedfe116d61b..5ccece2600143a08120a09e20d75dfbdbcfabf3c 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -14179,10 +14179,8 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); text Get last transaction log location received and synced to disk by streaming replication. While streaming replication is in progress - this will increase monotonically. But when streaming replication is - restarted this will back off to the replication starting position, - typically the beginning of the WAL file containing the current - replay location. If recovery has completed this will remain static at + this will increase monotonically. If recovery has completed this will + remain static at the value of the last WAL record received and synced to disk during recovery. If streaming replication is disabled, or if it has not yet started, the function returns NULL. diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 3826e82c0523080448fb14a7d4db9b34d0c2bc9b..32a1575ab07c84c11fbf51f929763bb2eef61f2d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -218,7 +218,7 @@ WalReceiverMain(void) /* Fetch information required to start streaming */ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); - startpoint = walrcv->receivedUpto; + startpoint = walrcv->receiveStart; SpinLockRelease(&walrcv->mutex); /* Arrange to clean up at walreceiver exit */ @@ -558,8 +558,11 @@ XLogWalRcvFlush(bool dying) /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); - walrcv->latestChunkStart = walrcv->receivedUpto; - walrcv->receivedUpto = LogstreamResult.Flush; + if (XLByteLT(walrcv->receivedUpto, LogstreamResult.Flush)) + { + walrcv->latestChunkStart = walrcv->receivedUpto; + walrcv->receivedUpto = LogstreamResult.Flush; + } SpinLockRelease(&walrcv->mutex); /* Signal the startup process that new WAL has arrived */ diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 04c900494374ab2adebdfbc654e73a8f55a4c24d..48ab503d893ab991d2173f1d515e4fd93a449eda 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -199,8 +199,17 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) walrcv->walRcvState = WALRCV_STARTING; walrcv->startTime = now; - walrcv->receivedUpto = recptr; - walrcv->latestChunkStart = recptr; + /* + * If this is the first startup of walreceiver, we initialize + * receivedUpto and latestChunkStart to receiveStart. + */ + if (walrcv->receiveStart.xlogid == 0 && + walrcv->receiveStart.xrecoff == 0) + { + walrcv->receivedUpto = recptr; + walrcv->latestChunkStart = recptr; + } + walrcv->receiveStart = recptr; SpinLockRelease(&walrcv->mutex); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 9137b861c7d1f3109417997fcdfa0293bd59f7ea..775232b6e6f98b02c852334c4b673b8711c728cb 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -51,12 +51,18 @@ typedef struct WalRcvState walRcvState; pg_time_t startTime; + /* + * receiveStart is the first byte position that will be received. + * When startup process starts the walreceiver, it sets receiveStart + * to the point where it wants the streaming to begin. + */ + XLogRecPtr receiveStart; + /* * receivedUpto-1 is the last byte position that has already been - * received. When startup process starts the walreceiver, it sets - * receivedUpto to the point where it wants the streaming to begin. After - * that, walreceiver updates this whenever it flushes the received WAL to - * disk. + * received. At the first startup of walreceiver, receivedUpto is + * set to receiveStart. After that, walreceiver updates this whenever + * it flushes the received WAL to disk. */ XLogRecPtr receivedUpto;