diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 4066f6e72b0d9300d669a9a58902d0828c871970..189ccd13843b2522658083bfd68a6e114ae67f77 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -36,7 +36,7 @@ /* * We don't want to log each fetching of a value from a sequence, * so we pre-log a few fetches in advance. In the event of - * crash we can lose as much as we pre-logged. + * crash we can lose (skip over) as many values as we pre-logged. */ #define SEQ_LOG_VALS 32 @@ -70,7 +70,7 @@ typedef struct SeqTableData int64 cached; /* last value already cached for nextval */ /* if last != cached, we have not used up all the cached values */ int64 increment; /* copy of sequence's increment field */ - /* note that increment is zero until we first do read_info() */ + /* note that increment is zero until we first do read_seq_tuple() */ } SeqTableData; typedef SeqTableData *SeqTable; @@ -86,7 +86,8 @@ static SeqTableData *last_used_seq = NULL; static int64 nextval_internal(Oid relid); static Relation open_share_lock(SeqTable seq); static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel); -static Form_pg_sequence read_info(SeqTable elm, Relation rel, Buffer *buf); +static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel, + Buffer *buf, HeapTuple seqtuple); static void init_params(List *options, bool isInit, Form_pg_sequence new, List **owned_by); static void do_setval(Oid relid, int64 next, bool iscalled); @@ -171,7 +172,7 @@ DefineSequence(CreateSeqStmt *seq) case SEQ_COL_LOG: coldef->typename = makeTypeNameFromOid(INT8OID, -1); coldef->colname = "log_cnt"; - value[i - 1] = Int64GetDatum((int64) 1); + value[i - 1] = Int64GetDatum((int64) 0); break; case SEQ_COL_CYCLE: coldef->typename = makeTypeNameFromOid(BOOLOID, -1); @@ -267,12 +268,6 @@ DefineSequence(CreateSeqStmt *seq) xl_seq_rec xlrec; XLogRecPtr recptr; XLogRecData rdata[2]; - Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple); - - /* We do not log first nextval call, so "advance" sequence here */ - /* Note we are scribbling on local tuple, not the disk buffer */ - newseq->is_called = true; - newseq->log_cnt = 0; xlrec.node = rel->rd_node; rdata[0].data = (char *) &xlrec; @@ -314,7 +309,7 @@ AlterSequence(AlterSeqStmt *stmt) SeqTable elm; Relation seqrel; Buffer buf; - Page page; + HeapTupleData seqtuple; Form_pg_sequence seq; FormData_pg_sequence new; List *owned_by; @@ -329,8 +324,7 @@ AlterSequence(AlterSeqStmt *stmt) stmt->sequence->relname); /* lock page' buffer and read tuple into new sequence structure */ - seq = read_info(elm, seqrel, &buf); - page = BufferGetPage(buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); /* Copy old values of options into workspace */ memcpy(&new, seq, sizeof(FormData_pg_sequence)); @@ -343,10 +337,10 @@ AlterSequence(AlterSeqStmt *stmt) elm->cached = elm->last; /* Now okay to update the on-disk tuple */ - memcpy(seq, &new, sizeof(FormData_pg_sequence)); - START_CRIT_SECTION(); + memcpy(seq, &new, sizeof(FormData_pg_sequence)); + MarkBufferDirty(buf); /* XLOG stuff */ @@ -355,6 +349,7 @@ AlterSequence(AlterSeqStmt *stmt) xl_seq_rec xlrec; XLogRecPtr recptr; XLogRecData rdata[2]; + Page page = BufferGetPage(buf); xlrec.node = seqrel->rd_node; rdata[0].data = (char *) &xlrec; @@ -362,9 +357,8 @@ AlterSequence(AlterSeqStmt *stmt) rdata[0].buffer = InvalidBuffer; rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper; - rdata[1].len = ((PageHeader) page)->pd_special - - ((PageHeader) page)->pd_upper; + rdata[1].data = (char *) seqtuple.t_data; + rdata[1].len = seqtuple.t_len; rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; @@ -419,6 +413,7 @@ nextval_internal(Oid relid) Relation seqrel; Buffer buf; Page page; + HeapTupleData seqtuple; Form_pg_sequence seq; int64 incby, maxv, @@ -453,7 +448,7 @@ nextval_internal(Oid relid) } /* lock page' buffer and read tuple */ - seq = read_info(elm, seqrel, &buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); page = BufferGetPage(buf); last = next = result = seq->last_value; @@ -465,9 +460,8 @@ nextval_internal(Oid relid) if (!seq->is_called) { - rescnt++; /* last_value if not called */ + rescnt++; /* return last_value if not is_called */ fetch--; - log--; } /* @@ -480,7 +474,7 @@ nextval_internal(Oid relid) * checkpoint would fail to advance the sequence past the logged values. * In this case we may as well fetch extra values. */ - if (log < fetch) + if (log < fetch || !seq->is_called) { /* forced log to satisfy local demand for values */ fetch = log = fetch + SEQ_LOG_VALS; @@ -571,8 +565,18 @@ nextval_internal(Oid relid) last_used_seq = elm; + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); + /* + * We must mark the buffer dirty before doing XLogInsert(); see notes in + * SyncOneBuffer(). However, we don't apply the desired changes just yet. + * This looks like a violation of the buffer update protocol, but it is + * in fact safe because we hold exclusive lock on the buffer. Any other + * process, including a checkpoint, that tries to examine the buffer + * contents will block until we release the lock, and then will see the + * final state that we install below. + */ MarkBufferDirty(buf); /* XLOG stuff */ @@ -582,20 +586,26 @@ nextval_internal(Oid relid) XLogRecPtr recptr; XLogRecData rdata[2]; - xlrec.node = seqrel->rd_node; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(xl_seq_rec); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + /* + * We don't log the current state of the tuple, but rather the state + * as it would appear after "log" more fetches. This lets us skip + * that many future WAL records, at the cost that we lose those + * sequence values if we crash. + */ /* set values that will be saved in xlog */ seq->last_value = next; seq->is_called = true; seq->log_cnt = 0; - rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper; - rdata[1].len = ((PageHeader) page)->pd_special - - ((PageHeader) page)->pd_upper; + xlrec.node = seqrel->rd_node; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(xl_seq_rec); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &(rdata[1]); + + rdata[1].data = (char *) seqtuple.t_data; + rdata[1].len = seqtuple.t_len; rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; @@ -605,7 +615,7 @@ nextval_internal(Oid relid) PageSetTLI(page, ThisTimeLineID); } - /* update on-disk data */ + /* Now update sequence tuple to the intended final state */ seq->last_value = last; /* last fetched number */ seq->is_called = true; seq->log_cnt = log; /* how much is logged */ @@ -706,6 +716,7 @@ do_setval(Oid relid, int64 next, bool iscalled) SeqTable elm; Relation seqrel; Buffer buf; + HeapTupleData seqtuple; Form_pg_sequence seq; /* open and AccessShareLock sequence */ @@ -718,7 +729,7 @@ do_setval(Oid relid, int64 next, bool iscalled) RelationGetRelationName(seqrel)))); /* lock page' buffer and read tuple */ - seq = read_info(elm, seqrel, &buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); if ((next < seq->min_value) || (next > seq->max_value)) { @@ -746,8 +757,13 @@ do_setval(Oid relid, int64 next, bool iscalled) /* In any case, forget any future cached numbers */ elm->cached = elm->last; + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); + seq->last_value = next; /* last fetched number */ + seq->is_called = iscalled; + seq->log_cnt = 0; + MarkBufferDirty(buf); /* XLOG stuff */ @@ -764,14 +780,8 @@ do_setval(Oid relid, int64 next, bool iscalled) rdata[0].buffer = InvalidBuffer; rdata[0].next = &(rdata[1]); - /* set values that will be saved in xlog */ - seq->last_value = next; - seq->is_called = true; - seq->log_cnt = 0; - - rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper; - rdata[1].len = ((PageHeader) page)->pd_special - - ((PageHeader) page)->pd_upper; + rdata[1].data = (char *) seqtuple.t_data; + rdata[1].len = seqtuple.t_len; rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; @@ -781,11 +791,6 @@ do_setval(Oid relid, int64 next, bool iscalled) PageSetTLI(page, ThisTimeLineID); } - /* save info in sequence relation */ - seq->last_value = next; /* last fetched number */ - seq->is_called = iscalled; - seq->log_cnt = (iscalled) ? 0 : 1; - END_CRIT_SECTION(); UnlockReleaseBuffer(buf); @@ -925,13 +930,20 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel) } -/* Given an opened relation, lock the page buffer and find the tuple */ +/* + * Given an opened sequence relation, lock the page buffer and find the tuple + * + * *buf receives the reference to the pinned-and-ex-locked buffer + * *seqtuple receives the reference to the sequence tuple proper + * (this arg should point to a local variable of type HeapTupleData) + * + * Function's return value points to the data payload of the tuple + */ static Form_pg_sequence -read_info(SeqTable elm, Relation rel, Buffer *buf) +read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple) { PageHeader page; ItemId lp; - HeapTupleData tuple; sequence_magic *sm; Form_pg_sequence seq; @@ -947,7 +959,10 @@ read_info(SeqTable elm, Relation rel, Buffer *buf) lp = PageGetItemId(page, FirstOffsetNumber); Assert(ItemIdIsNormal(lp)); - tuple.t_data = (HeapTupleHeader) PageGetItem((Page) page, lp); + + /* Note we currently only bother to set these two fields of *seqtuple */ + seqtuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lp); + seqtuple->t_len = ItemIdGetLength(lp); /* * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE @@ -957,15 +972,15 @@ read_info(SeqTable elm, Relation rel, Buffer *buf) * hint bit update, ie, don't bother to WAL-log it, since we can certainly * do this again if the update gets lost. */ - if (HeapTupleHeaderGetXmax(tuple.t_data) != InvalidTransactionId) + if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId) { - HeapTupleHeaderSetXmax(tuple.t_data, InvalidTransactionId); - tuple.t_data->t_infomask &= ~HEAP_XMAX_COMMITTED; - tuple.t_data->t_infomask |= HEAP_XMAX_INVALID; + HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId); + seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED; + seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID; SetBufferCommitInfoNeedsSave(*buf); } - seq = (Form_pg_sequence) GETSTRUCT(&tuple); + seq = (Form_pg_sequence) GETSTRUCT(seqtuple); /* this is a handy place to update our copy of the increment */ elm->increment = seq->increment_by; @@ -1065,6 +1080,13 @@ init_params(List *options, bool isInit, defel->defname); } + /* + * We must reset log_cnt when isInit or when changing any parameters + * that would affect future nextval allocations. + */ + if (isInit) + new->log_cnt = 0; + /* INCREMENT BY */ if (increment_by != NULL) { @@ -1073,6 +1095,7 @@ init_params(List *options, bool isInit, ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("INCREMENT must not be zero"))); + new->log_cnt = 0; } else if (isInit) new->increment_by = 1; @@ -1082,30 +1105,39 @@ init_params(List *options, bool isInit, { new->is_cycled = intVal(is_cycled->arg); Assert(new->is_cycled == false || new->is_cycled == true); + new->log_cnt = 0; } else if (isInit) new->is_cycled = false; /* MAXVALUE (null arg means NO MAXVALUE) */ if (max_value != NULL && max_value->arg) + { new->max_value = defGetInt64(max_value); + new->log_cnt = 0; + } else if (isInit || max_value != NULL) { if (new->increment_by > 0) new->max_value = SEQ_MAXVALUE; /* ascending seq */ else new->max_value = -1; /* descending seq */ + new->log_cnt = 0; } /* MINVALUE (null arg means NO MINVALUE) */ if (min_value != NULL && min_value->arg) + { new->min_value = defGetInt64(min_value); + new->log_cnt = 0; + } else if (isInit || min_value != NULL) { if (new->increment_by > 0) new->min_value = 1; /* ascending seq */ else new->min_value = SEQ_MINVALUE; /* descending seq */ + new->log_cnt = 0; } /* crosscheck min/max */ @@ -1179,6 +1211,7 @@ init_params(List *options, bool isInit, errmsg("CACHE (%s) must be greater than zero", buf))); } + new->log_cnt = 0; } else if (isInit) new->cache_value = 1; @@ -1306,7 +1339,7 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record) item = (char *) xlrec + sizeof(xl_seq_rec); itemsz = record->xl_len - sizeof(xl_seq_rec); - itemsz = MAXALIGN(itemsz); + if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber, false, false) == InvalidOffsetNumber) elog(PANIC, "seq_redo: failed to add item to page"); diff --git a/src/test/regress/expected/sequence.out b/src/test/regress/expected/sequence.out index 0576b575ee680a7b559ae0f494183888f0444821..b218b53839b06336d50b8ed894c28224ee6a2cdc 100644 --- a/src/test/regress/expected/sequence.out +++ b/src/test/regress/expected/sequence.out @@ -101,7 +101,7 @@ ALTER TABLE foo_seq RENAME TO foo_seq_new; SELECT * FROM foo_seq_new; sequence_name | last_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ---------------+------------+--------------+---------------------+-----------+-------------+---------+-----------+----------- - foo_seq | 1 | 1 | 9223372036854775807 | 1 | 1 | 1 | f | f + foo_seq | 1 | 1 | 9223372036854775807 | 1 | 1 | 0 | f | f (1 row) DROP SEQUENCE foo_seq_new;