提交 511e902b 编写于 作者: T Tom Lane

Make TRUNCATE ... RESTART IDENTITY restart sequences transactionally.

In the previous coding, we simply issued ALTER SEQUENCE RESTART commands,
which do not roll back on error.  This meant that an error between
truncating and committing left the sequences out of sync with the table
contents, with potentially bad consequences as were noted in a Warning on
the TRUNCATE man page.

To fix, create a new storage file (relfilenode) for a sequence that is to
be reset due to RESTART IDENTITY.  If the transaction aborts, we'll
automatically revert to the old storage file.  This acts just like a
rewriting ALTER TABLE operation.  A penalty is that we have to take
exclusive lock on the sequence, but since we've already got exclusive lock
on its owning table, that seems unlikely to be much of a problem.

The interaction of this with usual nontransactional behaviors of sequence
operations is a bit weird, but it's hard to see what would be completely
consistent.  Our choice is to discard cached-but-unissued sequence values
both when the RESTART is executed, and at rollback if any; but to not touch
the currval() state either time.

In passing, move the sequence reset operations to happen before not after
any AFTER TRUNCATE triggers are fired.  The previous ordering was not
logically sensible, but was forced by the need to minimize inconsistency
if the triggers caused an error.  Transactional rollback is a much better
solution to that.

Patch by Steve Singer, rather heavily adjusted by me.
上级 cfad144f
......@@ -108,7 +108,9 @@ TRUNCATE [ TABLE ] [ ONLY ] <replaceable class="PARAMETER">name</replaceable> [,
<para>
<command>TRUNCATE</> acquires an <literal>ACCESS EXCLUSIVE</> lock on each
table it operates on, which blocks all other concurrent operations
on the table. If concurrent access to a table is required, then
on the table. When <literal>RESTART IDENTITY</> is specified, any
sequences that are to be restarted are likewise locked exclusively.
If concurrent access to a table is required, then
the <command>DELETE</> command should be used instead.
</para>
......@@ -130,7 +132,8 @@ TRUNCATE [ TABLE ] [ ONLY ] <replaceable class="PARAMETER">name</replaceable> [,
the tables, then all <literal>BEFORE TRUNCATE</literal> triggers are
fired before any truncation happens, and all <literal>AFTER
TRUNCATE</literal> triggers are fired after the last truncation is
performed. The triggers will fire in the order that the tables are
performed and any sequences are reset.
The triggers will fire in the order that the tables are
to be processed (first those listed in the command, and then any
that were added due to cascading).
</para>
......@@ -159,32 +162,21 @@ TRUNCATE [ TABLE ] [ ONLY ] <replaceable class="PARAMETER">name</replaceable> [,
transaction does not commit.
</para>
<warning>
<para>
Any <command>ALTER SEQUENCE RESTART</> operations performed as a
consequence of using the <literal>RESTART IDENTITY</> option are
nontransactional and will not be rolled back on failure. To minimize
the risk, these operations are performed only after all the rest of
<command>TRUNCATE</>'s work is done. However, there is still a risk
if <command>TRUNCATE</> is performed inside a transaction block that is
aborted afterwards. For example, consider
<programlisting>
BEGIN;
TRUNCATE TABLE foo RESTART IDENTITY;
COPY foo FROM ...;
COMMIT;
</programlisting>
If the <command>COPY</> fails partway through, the table data
rolls back correctly, but the sequences will be left with values
that are probably smaller than they had before, possibly leading
to duplicate-key failures or other problems in later transactions.
If this is likely to be a problem, it's best to avoid using
<literal>RESTART IDENTITY</>, and accept that the new contents of
the table will have higher serial numbers than the old.
</para>
</warning>
<para>
When <literal>RESTART IDENTITY</> is specified, the implied
<command>ALTER SEQUENCE RESTART</> operations are also done
transactionally; that is, they will be rolled back if the surrounding
transaction does not commit. This is unlike the normal behavior of
<command>ALTER SEQUENCE RESTART</>. Be aware that if any additional
sequence operations are done on the restarted sequences before the
transaction rolls back, the effects of these operations on the sequences
will be rolled back, but not their effects on <function>currval()</>;
that is, after the transaction <function>currval()</> will continue to
reflect the last sequence value obtained inside the failed transaction,
even though the sequence itself may no longer be consistent with that.
This is similar to the usual behavior of <function>currval()</> after
a failed transaction.
</para>
</refsect1>
<refsect1>
......@@ -222,13 +214,14 @@ TRUNCATE othertable CASCADE;
<title>Compatibility</title>
<para>
The SQL:2008 standard includes a <command>TRUNCATE</command> command with the syntax
<literal>TRUNCATE TABLE <replaceable>tablename</replaceable></literal>.
The clauses <literal>CONTINUE IDENTITY</literal>/<literal>RESTART IDENTITY</literal>
also appear in that standard but have slightly different but related meanings.
Some of the concurrency behavior of this command is left implementation-defined
by the standard, so the above notes should be considered and compared with
other implementations if necessary.
The SQL:2008 standard includes a <command>TRUNCATE</command> command
with the syntax <literal>TRUNCATE TABLE
<replaceable>tablename</replaceable></literal>. The clauses
<literal>CONTINUE IDENTITY</literal>/<literal>RESTART IDENTITY</literal>
also appear in that standard, but have slightly different though related
meanings. Some of the concurrency behavior of this command is left
implementation-defined by the standard, so the above notes should be
considered and compared with other implementations if necessary.
</para>
</refsect1>
</refentry>
......@@ -68,6 +68,7 @@ typedef struct SeqTableData
{
struct SeqTableData *next; /* link to next SeqTable object */
Oid relid; /* pg_class OID of this sequence */
Oid filenode; /* last seen relfilenode of this sequence */
LocalTransactionId lxid; /* xact in which we last did a seq op */
bool last_valid; /* do we have a valid "last" value? */
int64 last; /* value last returned by nextval */
......@@ -87,6 +88,7 @@ static SeqTable seqtab = NULL; /* Head of list of SeqTable items */
*/
static SeqTableData *last_used_seq = NULL;
static void fill_seq_with_data(Relation rel, HeapTuple tuple);
static int64 nextval_internal(Oid relid);
static Relation open_share_lock(SeqTable seq);
static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
......@@ -109,9 +111,6 @@ DefineSequence(CreateSeqStmt *seq)
CreateStmt *stmt = makeNode(CreateStmt);
Oid seqoid;
Relation rel;
Buffer buf;
Page page;
sequence_magic *sm;
HeapTuple tuple;
TupleDesc tupDesc;
Datum value[SEQ_COL_LASTCOL];
......@@ -211,6 +210,100 @@ DefineSequence(CreateSeqStmt *seq)
rel = heap_open(seqoid, AccessExclusiveLock);
tupDesc = RelationGetDescr(rel);
/* now initialize the sequence's data */
tuple = heap_form_tuple(tupDesc, value, null);
fill_seq_with_data(rel, tuple);
/* process OWNED BY if given */
if (owned_by)
process_owned_by(rel, owned_by);
heap_close(rel, NoLock);
}
/*
* Reset a sequence to its initial value.
*
* The change is made transactionally, so that on failure of the current
* transaction, the sequence will be restored to its previous state.
* We do that by creating a whole new relfilenode for the sequence; so this
* works much like the rewriting forms of ALTER TABLE.
*
* Caller is assumed to have acquired AccessExclusiveLock on the sequence,
* which must not be released until end of transaction. Caller is also
* responsible for permissions checking.
*/
void
ResetSequence(Oid seq_relid)
{
Relation seq_rel;
SeqTable elm;
Form_pg_sequence seq;
Buffer buf;
Page page;
HeapTuple tuple;
HeapTupleData tupledata;
ItemId lp;
/*
* Read the old sequence. This does a bit more work than really
* necessary, but it's simple, and we do want to double-check that it's
* indeed a sequence.
*/
init_sequence(seq_relid, &elm, &seq_rel);
seq = read_info(elm, seq_rel, &buf);
/*
* Copy the existing sequence tuple.
*/
page = BufferGetPage(buf);
lp = PageGetItemId(page, FirstOffsetNumber);
Assert(ItemIdIsNormal(lp));
tupledata.t_data = (HeapTupleHeader) PageGetItem(page, lp);
tupledata.t_len = ItemIdGetLength(lp);
tuple = heap_copytuple(&tupledata);
/* Now we're done with the old page */
UnlockReleaseBuffer(buf);
/*
* Modify the copied tuple to execute the restart (compare the RESTART
* action in AlterSequence)
*/
seq = (Form_pg_sequence) GETSTRUCT(tuple);
seq->last_value = seq->start_value;
seq->is_called = false;
seq->log_cnt = 1;
/*
* Create a new storage file for the sequence. We want to keep the
* sequence's relfrozenxid at 0, since it won't contain any unfrozen XIDs.
*/
RelationSetNewRelfilenode(seq_rel, InvalidTransactionId);
/*
* Insert the modified tuple into the new storage file.
*/
fill_seq_with_data(seq_rel, tuple);
/* Clear local cache so that we don't think we have cached numbers */
/* Note that we do not change the currval() state */
elm->cached = elm->last;
relation_close(seq_rel, NoLock);
}
/*
* Initialize a sequence's relation with the specified tuple as content
*/
static void
fill_seq_with_data(Relation rel, HeapTuple tuple)
{
Buffer buf;
Page page;
sequence_magic *sm;
/* Initialize first page of relation with special magic number */
buf = ReadBuffer(rel, P_NEW);
......@@ -225,8 +318,7 @@ DefineSequence(CreateSeqStmt *seq)
/* hack: ensure heap_insert will insert on the just-created page */
RelationSetTargetBlock(rel, 0);
/* Now form & insert sequence tuple */
tuple = heap_form_tuple(tupDesc, value, null);
/* Now insert sequence tuple */
simple_heap_insert(rel, tuple);
Assert(ItemPointerGetOffsetNumber(&(tuple->t_self)) == FirstOffsetNumber);
......@@ -306,12 +398,6 @@ DefineSequence(CreateSeqStmt *seq)
END_CRIT_SECTION();
UnlockReleaseBuffer(buf);
/* process OWNED BY if given */
if (owned_by)
process_owned_by(rel, owned_by);
heap_close(rel, NoLock);
}
/*
......@@ -323,29 +409,6 @@ void
AlterSequence(AlterSeqStmt *stmt)
{
Oid relid;
/* find sequence */
relid = RangeVarGetRelid(stmt->sequence, false);
/* allow ALTER to sequence owner only */
/* if you change this, see also callers of AlterSequenceInternal! */
if (!pg_class_ownercheck(relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
stmt->sequence->relname);
/* do the work */
AlterSequenceInternal(relid, stmt->options);
}
/*
* AlterSequenceInternal
*
* Same as AlterSequence except that the sequence is specified by OID
* and we assume the caller already checked permissions.
*/
void
AlterSequenceInternal(Oid relid, List *options)
{
SeqTable elm;
Relation seqrel;
Buffer buf;
......@@ -355,8 +418,14 @@ AlterSequenceInternal(Oid relid, List *options)
List *owned_by;
/* open and AccessShareLock sequence */
relid = RangeVarGetRelid(stmt->sequence, false);
init_sequence(relid, &elm, &seqrel);
/* allow ALTER to sequence owner only */
if (!pg_class_ownercheck(relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
stmt->sequence->relname);
/* lock page' buffer and read tuple into new sequence structure */
seq = read_info(elm, seqrel, &buf);
page = BufferGetPage(buf);
......@@ -365,7 +434,7 @@ AlterSequenceInternal(Oid relid, List *options)
memcpy(&new, seq, sizeof(FormData_pg_sequence));
/* Check and set new values */
init_params(options, false, &new, &owned_by);
init_params(stmt->options, false, &new, &owned_by);
/* Clear local cache so that we don't think we have cached numbers */
/* Note that we do not change the currval() state */
......@@ -937,6 +1006,7 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
elm->relid = relid;
elm->filenode = InvalidOid;
elm->lxid = InvalidLocalTransactionId;
elm->last_valid = false;
elm->last = elm->cached = elm->increment = 0;
......@@ -955,6 +1025,18 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel)
errmsg("\"%s\" is not a sequence",
RelationGetRelationName(seqrel))));
/*
* If the sequence has been transactionally replaced since we last saw it,
* discard any cached-but-unissued values. We do not touch the currval()
* state, however.
*/
if (seqrel->rd_rel->relfilenode != elm->filenode)
{
elm->filenode = seqrel->rd_rel->relfilenode;
elm->cached = elm->last;
}
/* Return results */
*p_elm = elm;
*p_rel = seqrel;
}
......
......@@ -916,10 +916,9 @@ ExecuteTruncate(TruncateStmt *stmt)
/*
* If we are asked to restart sequences, find all the sequences, lock them
* (we only need AccessShareLock because that's all that ALTER SEQUENCE
* takes), and check permissions. We want to do this early since it's
* pointless to do all the truncation work only to fail on sequence
* permissions.
* (we need AccessExclusiveLock for ResetSequence), and check permissions.
* We want to do this early since it's pointless to do all the truncation
* work only to fail on sequence permissions.
*/
if (stmt->restart_seqs)
{
......@@ -934,7 +933,7 @@ ExecuteTruncate(TruncateStmt *stmt)
Oid seq_relid = lfirst_oid(seqcell);
Relation seq_rel;
seq_rel = relation_open(seq_relid, AccessShareLock);
seq_rel = relation_open(seq_relid, AccessExclusiveLock);
/* This check must match AlterSequence! */
if (!pg_class_ownercheck(seq_relid, GetUserId()))
......@@ -1043,6 +1042,16 @@ ExecuteTruncate(TruncateStmt *stmt)
}
}
/*
* Restart owned sequences if we were asked to.
*/
foreach(cell, seq_relids)
{
Oid seq_relid = lfirst_oid(cell);
ResetSequence(seq_relid);
}
/*
* Process all AFTER STATEMENT TRUNCATE triggers.
*/
......@@ -1067,25 +1076,6 @@ ExecuteTruncate(TruncateStmt *stmt)
heap_close(rel, NoLock);
}
/*
* Lastly, restart any owned sequences if we were asked to. This is done
* last because it's nontransactional: restarts will not roll back if we
* abort later. Hence it's important to postpone them as long as
* possible. (This is also a big reason why we locked and
* permission-checked the sequences beforehand.)
*/
if (stmt->restart_seqs)
{
List *options = list_make1(makeDefElem("restart", NULL));
foreach(cell, seq_relids)
{
Oid seq_relid = lfirst_oid(cell);
AlterSequenceInternal(seq_relid, options);
}
}
}
/*
......
......@@ -2623,8 +2623,8 @@ RelationBuildLocalRelation(const char *relname,
* Caller must already hold exclusive lock on the relation.
*
* The relation is marked with relfrozenxid = freezeXid (InvalidTransactionId
* must be passed for indexes). This should be a lower bound on the XIDs
* that will be put into the new relation contents.
* must be passed for indexes and sequences). This should be a lower bound on
* the XIDs that will be put into the new relation contents.
*/
void
RelationSetNewRelfilenode(Relation relation, TransactionId freezeXid)
......@@ -2635,9 +2635,10 @@ RelationSetNewRelfilenode(Relation relation, TransactionId freezeXid)
HeapTuple tuple;
Form_pg_class classform;
/* Indexes must have Invalid frozenxid; other relations must not */
Assert((relation->rd_rel->relkind == RELKIND_INDEX &&
freezeXid == InvalidTransactionId) ||
/* Indexes, sequences must have Invalid frozenxid; other rels must not */
Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
relation->rd_rel->relkind == RELKIND_SEQUENCE) ?
freezeXid == InvalidTransactionId :
TransactionIdIsNormal(freezeXid));
/* Allocate a new relfilenode */
......@@ -2687,8 +2688,11 @@ RelationSetNewRelfilenode(Relation relation, TransactionId freezeXid)
classform->relfilenode = newrelfilenode;
/* These changes are safe even for a mapped relation */
classform->relpages = 0; /* it's empty until further notice */
classform->reltuples = 0;
if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
{
classform->relpages = 0; /* it's empty until further notice */
classform->reltuples = 0;
}
classform->relfrozenxid = freezeXid;
simple_heap_update(pg_class, &tuple->t_self, tuple);
......
......@@ -71,7 +71,7 @@ extern Datum lastval(PG_FUNCTION_ARGS);
extern void DefineSequence(CreateSeqStmt *stmt);
extern void AlterSequence(AlterSeqStmt *stmt);
extern void AlterSequenceInternal(Oid relid, List *options);
extern void ResetSequence(Oid seq_relid);
extern void seq_redo(XLogRecPtr lsn, XLogRecord *rptr);
extern void seq_desc(StringInfo buf, uint8 xl_info, char *rec);
......
......@@ -398,6 +398,28 @@ SELECT * FROM truncate_a;
2 | 34
(2 rows)
-- check rollback of a RESTART IDENTITY operation
BEGIN;
TRUNCATE truncate_a RESTART IDENTITY;
INSERT INTO truncate_a DEFAULT VALUES;
SELECT * FROM truncate_a;
id | id1
----+-----
1 | 33
(1 row)
ROLLBACK;
INSERT INTO truncate_a DEFAULT VALUES;
INSERT INTO truncate_a DEFAULT VALUES;
SELECT * FROM truncate_a;
id | id1
----+-----
1 | 33
2 | 34
3 | 35
4 | 36
(4 rows)
DROP TABLE truncate_a;
SELECT nextval('truncate_a_id1'); -- fail, seq should have been dropped
ERROR: relation "truncate_a_id1" does not exist
......
......@@ -202,6 +202,16 @@ INSERT INTO truncate_a DEFAULT VALUES;
INSERT INTO truncate_a DEFAULT VALUES;
SELECT * FROM truncate_a;
-- check rollback of a RESTART IDENTITY operation
BEGIN;
TRUNCATE truncate_a RESTART IDENTITY;
INSERT INTO truncate_a DEFAULT VALUES;
SELECT * FROM truncate_a;
ROLLBACK;
INSERT INTO truncate_a DEFAULT VALUES;
INSERT INTO truncate_a DEFAULT VALUES;
SELECT * FROM truncate_a;
DROP TABLE truncate_a;
SELECT nextval('truncate_a_id1'); -- fail, seq should have been dropped
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册