提交 456c4353 编写于 作者: A Alexandra Wang 提交者: Lei (Alexandra) Wang

Create replication slot on fts promote message on mirror.

To "incrementally" recover old primary as a mirror at a later time via
pg_rewind, all xlog must be preserved from point of divergence.  Hence,
replication slot must be created at promote time. So, this commit adds
logic on FTS promote message to create physical replication slot and
also sets the restart_lsn of the slot to start preserving the xlog right
away.
Co-authored-by: NAshwin Agrawal <aagrawal@pivotal.io>
上级 fd94da93
......@@ -15,6 +15,7 @@
#include <sys/stat.h>
#include <unistd.h>
#include <replication/slot.h>
#include "access/xlog.h"
#include "libpq/pqformat.h"
......@@ -318,6 +319,56 @@ HandleFtsWalRepSyncRepOff(void)
SendFtsResponse(&response, FTS_MSG_SYNCREP_OFF);
}
static void
CreateReplicationSlotOnPromote(const char *name)
{
int i;
Assert(MyReplicationSlot == NULL);
/*
* Check for name collision, and identify an allocatable slot. We need to
* hold ReplicationSlotControlLock in shared mode for this, so that nobody
* else can change the in_use flags while we're looking at them.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
MyReplicationSlot = s;
}
LWLockRelease(ReplicationSlotControlLock);
if (MyReplicationSlot == NULL)
{
ereport(LOG, (errmsg("creating replication slot %s", name)));
ReplicationSlotCreate(name, false, RS_PERSISTENT);
}
else
ereport(LOG, (errmsg("replication slot %s exists", name)));
/*
* Only on promote signal replication slot is created on mirror. If
* node was acting as mirror, no replication slot should exists on it.
* Hence, no-zero restart_lsn means was set by previous attempt on promote
* signal and hence no need to overwrite the same.
*/
if (MyReplicationSlot->data.restart_lsn == 0)
{
/* Starting reserving WAL right away for pg_rewind to work later */
ReplicationSlotReserveWal();
/* Write this slot to disk */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
if (MyReplicationSlot->active)
ReplicationSlotRelease();
}
MyReplicationSlot = NULL;
}
static void
HandleFtsWalRepPromote(void)
{
......@@ -349,6 +400,9 @@ HandleFtsWalRepPromote(void)
* sync_standby_names.
*/
UnsetSyncStandbysDefined();
CreateReplicationSlotOnPromote(INTERNAL_WAL_REPLICATION_SLOT_NAME);
SignalPromote();
}
else
......
......@@ -14,7 +14,8 @@ ftsmessagehandler.t: \
$(MOCK_DIR)/backend/libpq/pqcomm_mock.o \
$(MOCK_DIR)/backend/tcop/dest_mock.o \
$(MOCK_DIR)/backend/postmaster/postmaster_mock.o \
$(MOCK_DIR)/backend/access/transam/xlog_mock.o
$(MOCK_DIR)/backend/access/transam/xlog_mock.o \
$(MOCK_DIR)/backend/replication/slot_mock.o
ftsprobe.t: \
$(MOCK_DIR)/backend/fts/fts_mock.o \
......
......@@ -130,9 +130,23 @@ test_HandleFtsWalRepProbeMirror(void **state)
HandleFtsWalRepProbe();
}
static void
set_replication_slot(ReplicationSlotCtlData *repCtl)
{
MyReplicationSlot = &repCtl->replication_slots[0];
/*
* any number except 1 to avoid calling ReplicationSlotReserveWal and
* other friends.
*/
MyReplicationSlot->data.restart_lsn = 8948;
}
void
test_HandleFtsWalRepPromoteMirror(void **state)
{
ReplicationSlotCtlData repCtl;
ReplicationSlotCtl = &repCtl;
max_replication_slots = 1;
am_mirror = true;
will_return(GetCurrentDBState, DB_IN_STANDBY_MODE);
......@@ -146,6 +160,19 @@ test_HandleFtsWalRepPromoteMirror(void **state)
mockresponse.IsRoleMirror = am_mirror;
mockresponse.RequestRetry = false;
expect_value(LWLockAcquire, l, ReplicationSlotControlLock);
expect_value(LWLockAcquire, mode, LW_SHARED);
will_return(LWLockAcquire, true);
expect_value(LWLockRelease, l, ReplicationSlotControlLock);
will_be_called(LWLockRelease);
expect_value(ReplicationSlotCreate, name, INTERNAL_WAL_REPLICATION_SLOT_NAME);
expect_value(ReplicationSlotCreate, db_specific, false);
expect_value(ReplicationSlotCreate, persistency, RS_PERSISTENT);
will_be_called_with_sideeffect(ReplicationSlotCreate,
set_replication_slot, &ReplicationSlotCtl);
/* expect SignalPromote() */
expectSendFtsResponse(FTS_MSG_PROMOTE, &mockresponse);
......
......@@ -95,6 +95,8 @@ enum probe_transition_e
/* buffer size for SQL command */
#define SQL_CMD_BUF_SIZE 1024
#define INTERNAL_WAL_REPLICATION_SLOT_NAME "internal_wal_replication_slot"
/*
* STRUCTURES
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册