提交 8468bcc8 编写于 作者: T Tom Lane

Fix longstanding crash-safety bug with newly-created-or-reset sequences.

If a crash occurred immediately after the first nextval() call for a serial
column, WAL replay would restore the sequence to a state in which it
appeared that no nextval() had been done, thus allowing the first sequence
value to be returned again by the next nextval() call; as reported in
bug #6748 from Xiangming Mei.

More generally, the problem would occur if an ALTER SEQUENCE was executed
on a freshly created or reset sequence.  (The manifestation with serial
columns was introduced in 8.2 when we added an ALTER SEQUENCE OWNED BY step
to serial column creation.)  The cause is that sequence creation attempted
to save one WAL entry by writing out a WAL record that made it appear that
the first nextval() had already happened (viz, with is_called = true),
while marking the sequence's in-database state with log_cnt = 1 to show
that the first nextval() need not emit a WAL record.  However, ALTER
SEQUENCE would emit a new WAL entry reflecting the actual in-database state
(with is_called = false).  Then, nextval would allocate the first sequence
value and set is_called = true, but it would trust the log_cnt value and
not emit any WAL record.  A crash at this point would thus restore the
sequence to its post-ALTER state, causing the next nextval() call to return
the first sequence value again.

To fix, get rid of the idea of logging an is_called status different from
reality.  This means that the first nextval-driven WAL record will happen
at the first nextval call not the second, but the marginal cost of that is
pretty negligible.  In addition, make sure that ALTER SEQUENCE resets
log_cnt to zero in any case where it touches sequence parameters that
affect future nextval results.  This will result in some user-visible
changes in the contents of a sequence's log_cnt column, as reflected in the
patch's regression test changes; but no application should be depending on
that anyway, since it was already true that log_cnt changes rather
unpredictably depending on checkpoint timing.

In addition, make some basically-cosmetic improvements to get rid of
sequence.c's undesirable intimacy with page layout details.  It was always
really trying to WAL-log the contents of the sequence tuple, so we should
have it do that directly using a HeapTuple's t_data and t_len, rather than
backing into it with some magic assumptions about where the tuple would be
on the sequence's page.

Back-patch to all supported branches.
上级 038c36b6
......@@ -36,7 +36,7 @@
/*
* We don't want to log each fetching of a value from a sequence,
* so we pre-log a few fetches in advance. In the event of
* crash we can lose as much as we pre-logged.
* crash we can lose (skip over) as many values as we pre-logged.
*/
#define SEQ_LOG_VALS 32
......@@ -70,7 +70,7 @@ typedef struct SeqTableData
int64 cached; /* last value already cached for nextval */
/* if last != cached, we have not used up all the cached values */
int64 increment; /* copy of sequence's increment field */
/* note that increment is zero until we first do read_info() */
/* note that increment is zero until we first do read_seq_tuple() */
} SeqTableData;
typedef SeqTableData *SeqTable;
......@@ -86,7 +86,8 @@ static SeqTableData *last_used_seq = NULL;
static int64 nextval_internal(Oid relid);
static Relation open_share_lock(SeqTable seq);
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
static Form_pg_sequence read_info(SeqTable elm, Relation rel, Buffer *buf);
static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
Buffer *buf, HeapTuple seqtuple);
static void init_params(List *options, bool isInit,
Form_pg_sequence new, List **owned_by);
static void do_setval(Oid relid, int64 next, bool iscalled);
......@@ -171,7 +172,7 @@ DefineSequence(CreateSeqStmt *seq)
case SEQ_COL_LOG:
coldef->typename = makeTypeNameFromOid(INT8OID, -1);
coldef->colname = "log_cnt";
value[i - 1] = Int64GetDatum((int64) 1);
value[i - 1] = Int64GetDatum((int64) 0);
break;
case SEQ_COL_CYCLE:
coldef->typename = makeTypeNameFromOid(BOOLOID, -1);
......@@ -267,12 +268,6 @@ DefineSequence(CreateSeqStmt *seq)
xl_seq_rec xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple);
/* We do not log first nextval call, so "advance" sequence here */
/* Note we are scribbling on local tuple, not the disk buffer */
newseq->is_called = true;
newseq->log_cnt = 0;
xlrec.node = rel->rd_node;
rdata[0].data = (char *) &xlrec;
......@@ -314,7 +309,7 @@ AlterSequence(AlterSeqStmt *stmt)
SeqTable elm;
Relation seqrel;
Buffer buf;
Page page;
HeapTupleData seqtuple;
Form_pg_sequence seq;
FormData_pg_sequence new;
List *owned_by;
......@@ -329,8 +324,7 @@ AlterSequence(AlterSeqStmt *stmt)
stmt->sequence->relname);
/* lock page' buffer and read tuple into new sequence structure */
seq = read_info(elm, seqrel, &buf);
page = BufferGetPage(buf);
seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
/* Copy old values of options into workspace */
memcpy(&new, seq, sizeof(FormData_pg_sequence));
......@@ -343,10 +337,10 @@ AlterSequence(AlterSeqStmt *stmt)
elm->cached = elm->last;
/* Now okay to update the on-disk tuple */
memcpy(seq, &new, sizeof(FormData_pg_sequence));
START_CRIT_SECTION();
memcpy(seq, &new, sizeof(FormData_pg_sequence));
MarkBufferDirty(buf);
/* XLOG stuff */
......@@ -355,6 +349,7 @@ AlterSequence(AlterSeqStmt *stmt)
xl_seq_rec xlrec;
XLogRecPtr recptr;
XLogRecData rdata[2];
Page page = BufferGetPage(buf);
xlrec.node = seqrel->rd_node;
rdata[0].data = (char *) &xlrec;
......@@ -362,9 +357,8 @@ AlterSequence(AlterSeqStmt *stmt)
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
rdata[1].len = ((PageHeader) page)->pd_special -
((PageHeader) page)->pd_upper;
rdata[1].data = (char *) seqtuple.t_data;
rdata[1].len = seqtuple.t_len;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
......@@ -419,6 +413,7 @@ nextval_internal(Oid relid)
Relation seqrel;
Buffer buf;
Page page;
HeapTupleData seqtuple;
Form_pg_sequence seq;
int64 incby,
maxv,
......@@ -453,7 +448,7 @@ nextval_internal(Oid relid)
}
/* lock page' buffer and read tuple */
seq = read_info(elm, seqrel, &buf);
seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
page = BufferGetPage(buf);
last = next = result = seq->last_value;
......@@ -465,9 +460,8 @@ nextval_internal(Oid relid)
if (!seq->is_called)
{
rescnt++; /* last_value if not called */
rescnt++; /* return last_value if not is_called */
fetch--;
log--;
}
/*
......@@ -480,7 +474,7 @@ nextval_internal(Oid relid)
* checkpoint would fail to advance the sequence past the logged values.
* In this case we may as well fetch extra values.
*/
if (log < fetch)
if (log < fetch || !seq->is_called)
{
/* forced log to satisfy local demand for values */
fetch = log = fetch + SEQ_LOG_VALS;
......@@ -571,8 +565,18 @@ nextval_internal(Oid relid)
last_used_seq = elm;
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
/*
* We must mark the buffer dirty before doing XLogInsert(); see notes in
* SyncOneBuffer(). However, we don't apply the desired changes just yet.
* This looks like a violation of the buffer update protocol, but it is
* in fact safe because we hold exclusive lock on the buffer. Any other
* process, including a checkpoint, that tries to examine the buffer
* contents will block until we release the lock, and then will see the
* final state that we install below.
*/
MarkBufferDirty(buf);
/* XLOG stuff */
......@@ -582,20 +586,26 @@ nextval_internal(Oid relid)
XLogRecPtr recptr;
XLogRecData rdata[2];
xlrec.node = seqrel->rd_node;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
/*
* We don't log the current state of the tuple, but rather the state
* as it would appear after "log" more fetches. This lets us skip
* that many future WAL records, at the cost that we lose those
* sequence values if we crash.
*/
/* set values that will be saved in xlog */
seq->last_value = next;
seq->is_called = true;
seq->log_cnt = 0;
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
rdata[1].len = ((PageHeader) page)->pd_special -
((PageHeader) page)->pd_upper;
xlrec.node = seqrel->rd_node;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(xl_seq_rec);
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) seqtuple.t_data;
rdata[1].len = seqtuple.t_len;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
......@@ -605,7 +615,7 @@ nextval_internal(Oid relid)
PageSetTLI(page, ThisTimeLineID);
}
/* update on-disk data */
/* Now update sequence tuple to the intended final state */
seq->last_value = last; /* last fetched number */
seq->is_called = true;
seq->log_cnt = log; /* how much is logged */
......@@ -706,6 +716,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
SeqTable elm;
Relation seqrel;
Buffer buf;
HeapTupleData seqtuple;
Form_pg_sequence seq;
/* open and AccessShareLock sequence */
......@@ -718,7 +729,7 @@ do_setval(Oid relid, int64 next, bool iscalled)
RelationGetRelationName(seqrel))));
/* lock page' buffer and read tuple */
seq = read_info(elm, seqrel, &buf);
seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
if ((next < seq->min_value) || (next > seq->max_value))
{
......@@ -746,8 +757,13 @@ do_setval(Oid relid, int64 next, bool iscalled)
/* In any case, forget any future cached numbers */
elm->cached = elm->last;
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
seq->last_value = next; /* last fetched number */
seq->is_called = iscalled;
seq->log_cnt = 0;
MarkBufferDirty(buf);
/* XLOG stuff */
......@@ -764,14 +780,8 @@ do_setval(Oid relid, int64 next, bool iscalled)
rdata[0].buffer = InvalidBuffer;
rdata[0].next = &(rdata[1]);
/* set values that will be saved in xlog */
seq->last_value = next;
seq->is_called = true;
seq->log_cnt = 0;
rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
rdata[1].len = ((PageHeader) page)->pd_special -
((PageHeader) page)->pd_upper;
rdata[1].data = (char *) seqtuple.t_data;
rdata[1].len = seqtuple.t_len;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = NULL;
......@@ -781,11 +791,6 @@ do_setval(Oid relid, int64 next, bool iscalled)
PageSetTLI(page, ThisTimeLineID);
}
/* save info in sequence relation */
seq->last_value = next; /* last fetched number */
seq->is_called = iscalled;
seq->log_cnt = (iscalled) ? 0 : 1;
END_CRIT_SECTION();
UnlockReleaseBuffer(buf);
......@@ -925,13 +930,20 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
}
/* Given an opened relation, lock the page buffer and find the tuple */
/*
* Given an opened sequence relation, lock the page buffer and find the tuple
*
* *buf receives the reference to the pinned-and-ex-locked buffer
* *seqtuple receives the reference to the sequence tuple proper
* (this arg should point to a local variable of type HeapTupleData)
*
* Function's return value points to the data payload of the tuple
*/
static Form_pg_sequence
read_info(SeqTable elm, Relation rel, Buffer *buf)
read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
{
PageHeader page;
ItemId lp;
HeapTupleData tuple;
sequence_magic *sm;
Form_pg_sequence seq;
......@@ -947,7 +959,10 @@ read_info(SeqTable elm, Relation rel, Buffer *buf)
lp = PageGetItemId(page, FirstOffsetNumber);
Assert(ItemIdIsNormal(lp));
tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
/* Note we currently only bother to set these two fields of *seqtuple */
seqtuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lp);
seqtuple->t_len = ItemIdGetLength(lp);
/*
* Previous releases of Postgres neglected to prevent SELECT FOR UPDATE
......@@ -957,15 +972,15 @@ read_info(SeqTable elm, Relation rel, Buffer *buf)
* hint bit update, ie, don't bother to WAL-log it, since we can certainly
* do this again if the update gets lost.
*/
if (HeapTupleHeaderGetXmax(tuple.t_data) != InvalidTransactionId)
if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
{
HeapTupleHeaderSetXmax(tuple.t_data, InvalidTransactionId);
tuple.t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
SetBufferCommitInfoNeedsSave(*buf);
}
seq = (Form_pg_sequence) GETSTRUCT(&tuple);
seq = (Form_pg_sequence) GETSTRUCT(seqtuple);
/* this is a handy place to update our copy of the increment */
elm->increment = seq->increment_by;
......@@ -1065,6 +1080,13 @@ init_params(List *options, bool isInit,
defel->defname);
}
/*
* We must reset log_cnt when isInit or when changing any parameters
* that would affect future nextval allocations.
*/
if (isInit)
new->log_cnt = 0;
/* INCREMENT BY */
if (increment_by != NULL)
{
......@@ -1073,6 +1095,7 @@ init_params(List *options, bool isInit,
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("INCREMENT must not be zero")));
new->log_cnt = 0;
}
else if (isInit)
new->increment_by = 1;
......@@ -1082,30 +1105,39 @@ init_params(List *options, bool isInit,
{
new->is_cycled = intVal(is_cycled->arg);
Assert(new->is_cycled == false || new->is_cycled == true);
new->log_cnt = 0;
}
else if (isInit)
new->is_cycled = false;
/* MAXVALUE (null arg means NO MAXVALUE) */
if (max_value != NULL && max_value->arg)
{
new->max_value = defGetInt64(max_value);
new->log_cnt = 0;
}
else if (isInit || max_value != NULL)
{
if (new->increment_by > 0)
new->max_value = SEQ_MAXVALUE; /* ascending seq */
else
new->max_value = -1; /* descending seq */
new->log_cnt = 0;
}
/* MINVALUE (null arg means NO MINVALUE) */
if (min_value != NULL && min_value->arg)
{
new->min_value = defGetInt64(min_value);
new->log_cnt = 0;
}
else if (isInit || min_value != NULL)
{
if (new->increment_by > 0)
new->min_value = 1; /* ascending seq */
else
new->min_value = SEQ_MINVALUE; /* descending seq */
new->log_cnt = 0;
}
/* crosscheck min/max */
......@@ -1179,6 +1211,7 @@ init_params(List *options, bool isInit,
errmsg("CACHE (%s) must be greater than zero",
buf)));
}
new->log_cnt = 0;
}
else if (isInit)
new->cache_value = 1;
......@@ -1306,7 +1339,7 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record)
item = (char *) xlrec + sizeof(xl_seq_rec);
itemsz = record->xl_len - sizeof(xl_seq_rec);
itemsz = MAXALIGN(itemsz);
if (PageAddItem(page, (Item) item, itemsz,
FirstOffsetNumber, false, false) == InvalidOffsetNumber)
elog(PANIC, "seq_redo: failed to add item to page");
......
......@@ -101,7 +101,7 @@ ALTER TABLE foo_seq RENAME TO foo_seq_new;
SELECT * FROM foo_seq_new;
sequence_name | last_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called
---------------+------------+--------------+---------------------+-----------+-------------+---------+-----------+-----------
foo_seq | 1 | 1 | 9223372036854775807 | 1 | 1 | 1 | f | f
foo_seq | 1 | 1 | 9223372036854775807 | 1 | 1 | 0 | f | f
(1 row)
DROP SEQUENCE foo_seq_new;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册