提交 fd94da93 编写于 作者: A Andres Freund 提交者: Lei (Alexandra) Wang

Allow pg_create_physical_replication_slot() to reserve WAL.

When creating a physical slot it's often useful to immediately reserve
the current WAL position instead of only doing after the first feedback
message arrives. That e.g. allows slots to guarantee that all the WAL
for a base backup will be available afterwards.

Logical slots already have to reserve WAL during creation, so generalize
that logic into being usable for both physical and logical slots.

Catversion bump because of the new parameter.

Author: Gurjeet Singh
Reviewed-By: Andres Freund
Discussion: CABwTF4Wh_dBCzTU=49pFXR6coR4NW1ynb+vBqT+Po=7fuq5iCw@mail.gmail.com
上级 eacc688a
...@@ -16823,7 +16823,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); ...@@ -16823,7 +16823,7 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
<indexterm> <indexterm>
<primary>pg_create_physical_replication_slot</primary> <primary>pg_create_physical_replication_slot</primary>
</indexterm> </indexterm>
<literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type>)</function></literal> <literal><function>pg_create_physical_replication_slot(<parameter>slot_name</parameter> <type>name</type><optional>, <parameter>immediately_reserve</> <type>boolean</> </optional>)</function></literal>
</entry> </entry>
<entry> <entry>
(<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>) (<parameter>slot_name</parameter> <type>name</type>, <parameter>xlog_position</parameter> <type>pg_lsn</type>)
...@@ -16833,7 +16833,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup()); ...@@ -16833,7 +16833,11 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
<parameter>slot_name</parameter>. Streaming changes from a physical slot <parameter>slot_name</parameter>. Streaming changes from a physical slot
is only possible with the streaming-replication protocol - see <xref is only possible with the streaming-replication protocol - see <xref
linkend="protocol-replication">. Corresponds to the replication protocol linkend="protocol-replication">. Corresponds to the replication protocol
command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. command <literal>CREATE_REPLICATION_SLOT ... PHYSICAL</literal>. The optional
second parameter, when <literal>true</>, specifies that the <acronym>LSN</>
for this replication slot be reserved immediately; the <acronym<LSN</>
is otherwise reserved on first connection from a streaming replication
client.
</entry> </entry>
</row> </row>
<row> <row>
......
...@@ -1357,6 +1357,13 @@ LANGUAGE INTERNAL ...@@ -1357,6 +1357,13 @@ LANGUAGE INTERNAL
VOLATILE ROWS 1000 COST 1000 VOLATILE ROWS 1000 COST 1000
AS 'pg_logical_slot_peek_binary_changes'; AS 'pg_logical_slot_peek_binary_changes';
CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
IN slot_name name, IN immediately_reserve boolean DEFAULT false,
OUT slot_name name, OUT xlog_position pg_lsn)
RETURNS RECORD
LANGUAGE INTERNAL
AS 'pg_create_physical_replication_slot';
CREATE OR REPLACE FUNCTION CREATE OR REPLACE FUNCTION
make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0, make_interval(years int4 DEFAULT 0, months int4 DEFAULT 0, weeks int4 DEFAULT 0,
days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0, days int4 DEFAULT 0, hours int4 DEFAULT 0, mins int4 DEFAULT 0,
......
...@@ -258,52 +258,7 @@ CreateInitDecodingContext(char *plugin, ...@@ -258,52 +258,7 @@ CreateInitDecodingContext(char *plugin,
NameStr(slot->data.plugin)[NAMEDATALEN - 1] = '\0'; NameStr(slot->data.plugin)[NAMEDATALEN - 1] = '\0';
SpinLockRelease(&slot->mutex); SpinLockRelease(&slot->mutex);
/* ReplicationSlotReserveWal();
* The replication slot mechanism is used to prevent removal of required
* WAL. As there is no interlock between this and checkpoints required WAL
* could be removed before ReplicationSlotsComputeRequiredLSN() has been
* called to prevent that. In the very unlikely case that this happens
* we'll just retry.
*/
while (true)
{
XLogSegNo segno;
/*
* Let's start with enough information if we can, so log a standby
* snapshot and start decoding at exactly that position.
*/
if (!RecoveryInProgress())
{
XLogRecPtr flushptr;
/* start at current insert position */
slot->data.restart_lsn = GetXLogInsertRecPtr();
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
/* and make sure it's fsynced to disk */
XLogFlush(flushptr);
}
else
slot->data.restart_lsn = GetRedoRecPtr();
/* prevent WAL removal as fast as possible */
ReplicationSlotsComputeRequiredLSN();
/*
* If all required WAL is still there, great, otherwise retry. The
* slot should prevent further removal of WAL, unless there's a
* concurrent ReplicationSlotsComputeRequiredLSN() after we've written
* the new restart_lsn above, so normally we should never need to loop
* more than twice.
*/
XLByteToSeg(slot->data.restart_lsn, segno);
if (XLogGetLastRemovedSegno() < segno)
break;
}
/* ---- /* ----
* This is a bit tricky: We need to determine a safe xmin horizon to start * This is a bit tricky: We need to determine a safe xmin horizon to start
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include "access/transam.h" #include "access/transam.h"
#include "access/xlog_internal.h"
#include "common/string.h" #include "common/string.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "replication/slot.h" #include "replication/slot.h"
...@@ -811,6 +812,76 @@ CheckSlotRequirements(void) ...@@ -811,6 +812,76 @@ CheckSlotRequirements(void)
errmsg("replication slots can only be used if wal_level >= archive"))); errmsg("replication slots can only be used if wal_level >= archive")));
} }
/*
* Reserve WAL for the currently active slot.
*
* Compute and set restart_lsn in a manner that's appropriate for the type of
* the slot and concurrency safe.
*/
void
ReplicationSlotReserveWal(void)
{
ReplicationSlot *slot = MyReplicationSlot;
Assert(slot != NULL);
Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
/*
* The replication slot mechanism is used to prevent removal of required
* WAL. As there is no interlock between this routine and checkpoints, WAL
* segments could concurrently be removed when a now stale return value of
* ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
* this happens we'll just retry.
*/
while (true)
{
XLogSegNo segno;
/*
* For logical slots log a standby snapshot and start logical decoding
* at exactly that position. That allows the slot to start up more
* quickly.
*
* That's not needed (or indeed helpful) for physical slots as they'll
* start replay at the last logged checkpoint anyway. Instead return
* the location of the last redo LSN. While that slightly increases
* the chance that we have to retry, it's where a base backup has to
* start replay at.
*/
if (!RecoveryInProgress() && SlotIsLogical(slot))
{
XLogRecPtr flushptr;
/* start at current insert position */
slot->data.restart_lsn = GetXLogInsertRecPtr();
/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();
/* and make sure it's fsynced to disk */
XLogFlush(flushptr);
}
else
{
slot->data.restart_lsn = GetRedoRecPtr();
}
/* prevent WAL removal as fast as possible */
ReplicationSlotsComputeRequiredLSN();
/*
* If all required WAL is still there, great, otherwise retry. The
* slot should prevent further removal of WAL, unless there's a
* concurrent ReplicationSlotsComputeRequiredLSN() after we've written
* the new restart_lsn above, so normally we should never need to loop
* more than twice.
*/
XLByteToSeg(slot->data.restart_lsn, segno);
if (XLogGetLastRemovedSegno() < segno)
break;
}
}
/* /*
* Flush all replication slots to disk. * Flush all replication slots to disk.
* *
......
...@@ -40,6 +40,7 @@ Datum ...@@ -40,6 +40,7 @@ Datum
pg_create_physical_replication_slot(PG_FUNCTION_ARGS) pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
{ {
Name name = PG_GETARG_NAME(0); Name name = PG_GETARG_NAME(0);
bool immediately_reserve = PG_GETARG_BOOL(1);
Datum values[2]; Datum values[2];
bool nulls[2]; bool nulls[2];
TupleDesc tupdesc; TupleDesc tupdesc;
...@@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) ...@@ -59,9 +60,25 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT);
values[0] = NameGetDatum(&MyReplicationSlot->data.name); values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[0] = false; nulls[0] = false;
nulls[1] = true;
if (immediately_reserve)
{
/* Reserve WAL as the user asked for it */
ReplicationSlotReserveWal();
/* Write this slot to disk */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
nulls[1] = false;
}
else
{
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
nulls[1] = true;
}
tuple = heap_form_tuple(tupdesc, values, nulls); tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple); result = HeapTupleGetDatum(tuple);
......
...@@ -56,6 +56,6 @@ ...@@ -56,6 +56,6 @@
*/ */
/* 3yyymmddN */ /* 3yyymmddN */
#define CATALOG_VERSION_NO 301812111 #define CATALOG_VERSION_NO 301901101
#endif #endif
...@@ -5100,7 +5100,7 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0 ...@@ -5100,7 +5100,7 @@ DATA(insert OID = 3473 ( spg_range_quad_leaf_consistent PGNSP PGUID 12 1 0 0 0
DESCR("SP-GiST support for quad tree over range"); DESCR("SP-GiST support for quad tree over range");
/* replication slots */ /* replication slots */
DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2249 "19" "{19,19,3220}" "{i,o,o}" "{slot_name,slot_name,xlog_position}" _null_ pg_create_physical_replication_slot _null_ _null_ _null_ )); DATA(insert OID = 3779 ( pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2249 "19 16" "{19,16,19,3220}" "{i,i,o,o}" "{slot_name,immediately_reserve,slot_name,xlog_position}" _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
DESCR("create a physical replication slot"); DESCR("create a physical replication slot");
DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ )); DATA(insert OID = 3780 ( pg_drop_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "19" _null_ _null_ _null_ _null_ pg_drop_replication_slot _null_ _null_ _null_ ));
DESCR("drop a replication slot"); DESCR("drop a replication slot");
......
...@@ -125,6 +125,9 @@ typedef struct ReplicationSlot ...@@ -125,6 +125,9 @@ typedef struct ReplicationSlot
XLogRecPtr candidate_restart_lsn; XLogRecPtr candidate_restart_lsn;
} ReplicationSlot; } ReplicationSlot;
#define SlotIsPhysical(slot) (slot->data.database == InvalidOid)
#define SlotIsLogical(slot) (slot->data.database != InvalidOid)
/* /*
* Shared memory control area for all of replication slots. * Shared memory control area for all of replication slots.
*/ */
...@@ -159,6 +162,7 @@ extern void ReplicationSlotMarkDirty(void); ...@@ -159,6 +162,7 @@ extern void ReplicationSlotMarkDirty(void);
/* misc stuff */ /* misc stuff */
extern bool ReplicationSlotValidateName(const char *name, int elevel); extern bool ReplicationSlotValidateName(const char *name, int elevel);
extern void ReplicationSlotReserveWal(void);
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
extern void ReplicationSlotsComputeRequiredLSN(void); extern void ReplicationSlotsComputeRequiredLSN(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册