未验证 提交 32a3a4db 编写于 作者: W Weinan WANG 提交者: GitHub

Fix cursor snapshot dump xid issue

For cursor snapshot dump, we need to record both distributed and local
xid. So far, we only record distributed xid in the dump, as well as,
incorrectly assign distributed xid to local xid by dump read function.

Fix it.
上级 fe26d931
......@@ -2435,7 +2435,7 @@ StartTransaction(void)
SharedLocalSnapshotSlot->ready = false;
SharedLocalSnapshotSlot->xid = s->transactionId;
SharedLocalSnapshotSlot->startTimestamp = stmtStartTimestamp;
SharedLocalSnapshotSlot->QDxid = QEDtxContextInfo.distributedXid;
SharedLocalSnapshotSlot->distributedXid = QEDtxContextInfo.distributedXid;
SharedLocalSnapshotSlot->writer_proc = MyProc;
SharedLocalSnapshotSlot->writer_xact = MyPgXact;
......@@ -2446,7 +2446,7 @@ StartTransaction(void)
" (shared timeStamp = " INT64_FORMAT " -> "
INT64_FORMAT ")",
QEDtxContextInfo.distributedXid,
SharedLocalSnapshotSlot->QDxid,
SharedLocalSnapshotSlot->distributedXid,
SharedLocalSnapshotSlot->xid,
s->transactionId,
SharedLocalSnapshotSlot->ready ? "true" : "false",
......@@ -3620,7 +3620,7 @@ StartTransactionCommand(void)
}
SharedLocalSnapshotSlot->startTimestamp = xactStartTimestamp;
SharedLocalSnapshotSlot->QDxid = QEDtxContextInfo.distributedXid;
SharedLocalSnapshotSlot->distributedXid = QEDtxContextInfo.distributedXid;
LWLockRelease(SharedLocalSnapshotSlot->slotLock);
......
......@@ -1671,7 +1671,7 @@ setupQEDtxContext(DtxContextInfo *dtxContextInfo)
SharedLocalSnapshotSlot->snapshot.xmax,
SharedLocalSnapshotSlot->snapshot.xcnt,
SharedLocalSnapshotSlot->snapshot.curcid,
SharedLocalSnapshotSlot->QDxid,
SharedLocalSnapshotSlot->distributedXid,
SharedLocalSnapshotSlot->segmateSync);
LWLockRelease(SharedLocalSnapshotSlot->slotLock);
}
......
......@@ -1625,7 +1625,7 @@ updateSharedLocalSnapshot(DtxContextInfo *dtxContextInfo,
SetSharedTransactionId_writer(distributedTransactionContext);
SharedLocalSnapshotSlot->QDxid = dtxContextInfo->distributedXid;
SharedLocalSnapshotSlot->distributedXid = dtxContextInfo->distributedXid;
SharedLocalSnapshotSlot->segmateSync = dtxContextInfo->segmateSync;
SharedLocalSnapshotSlot->ready = true;
......@@ -1637,12 +1637,12 @@ updateSharedLocalSnapshot(DtxContextInfo *dtxContextInfo,
SharedLocalSnapshotSlot->snapshot.xmax,
SharedLocalSnapshotSlot->snapshot.xcnt,
SharedLocalSnapshotSlot->snapshot.curcid,
SharedLocalSnapshotSlot->QDxid)));
SharedLocalSnapshotSlot->distributedXid)));
ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
(errmsg("[Distributed Snapshot #%u] *Writer Set Shared* gxid %u, (gxid = %u, slot #%d, '%s', '%s')",
QEDtxContextInfo.distributedSnapshot.distribSnapshotId,
SharedLocalSnapshotSlot->QDxid,
SharedLocalSnapshotSlot->distributedXid,
getDistributedTransactionId(),
SharedLocalSnapshotSlot->slotid,
debugCaller,
......@@ -1758,10 +1758,10 @@ readerFillLocalSnapshot(Snapshot snapshot, DtxContext distributedTransactionCont
if (QEDtxContextInfo.segmateSync == SharedLocalSnapshotSlot->segmateSync &&
SharedLocalSnapshotSlot->ready)
{
if (QEDtxContextInfo.distributedXid != SharedLocalSnapshotSlot->QDxid)
if (QEDtxContextInfo.distributedXid != SharedLocalSnapshotSlot->distributedXid)
elog(ERROR, "transaction ID doesn't match between the reader gang "
"and the writer gang, expect %d but having %d",
QEDtxContextInfo.distributedXid, SharedLocalSnapshotSlot->QDxid);
QEDtxContextInfo.distributedXid, SharedLocalSnapshotSlot->distributedXid);
copyLocalSnapshot(snapshot);
SetSharedTransactionId_reader(SharedLocalSnapshotSlot->xid, snapshot->curcid, distributedTransactionContext);
LWLockRelease(SharedLocalSnapshotSlot->slotLock);
......@@ -1780,7 +1780,7 @@ readerFillLocalSnapshot(Snapshot snapshot, DtxContext distributedTransactionCont
"DistributedTransactionContext = %s. "
" Our slotindex is: %d \n"
"Dump of all sharedsnapshots in shmem: %s",
QEDtxContextInfo.distributedXid, SharedLocalSnapshotSlot->QDxid,
QEDtxContextInfo.distributedXid, SharedLocalSnapshotSlot->distributedXid,
QEDtxContextInfo.segmateSync,
SharedLocalSnapshotSlot->segmateSync, SharedLocalSnapshotSlot->ready,
DtxContextToString(distributedTransactionContext),
......@@ -1796,7 +1796,7 @@ readerFillLocalSnapshot(Snapshot snapshot, DtxContext distributedTransactionCont
(errmsg("[Distributed Snapshot #%u] *No Match* gxid %u = %u and segmateSync %d = %d (%s)",
QEDtxContextInfo.distributedSnapshot.distribSnapshotId,
QEDtxContextInfo.distributedXid,
SharedLocalSnapshotSlot->QDxid,
SharedLocalSnapshotSlot->distributedXid,
QEDtxContextInfo.segmateSync,
SharedLocalSnapshotSlot->segmateSync,
DtxContextToString(distributedTransactionContext))));
......@@ -1808,7 +1808,7 @@ readerFillLocalSnapshot(Snapshot snapshot, DtxContext distributedTransactionCont
" Our slotindex is: %d \n"
"DistributedTransactionContext = %s.",
QEDtxContextInfo.distributedXid, QEDtxContextInfo.segmateSync,
SharedLocalSnapshotSlot->QDxid, SharedLocalSnapshotSlot->segmateSync,
SharedLocalSnapshotSlot->distributedXid, SharedLocalSnapshotSlot->segmateSync,
SharedLocalSnapshotSlot->ready,
SharedLocalSnapshotSlot->slotindex,
DtxContextToString(distributedTransactionContext))));
......@@ -3140,12 +3140,12 @@ UpdateSerializableCommandId(CommandId curcid)
{
LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);
if (SharedLocalSnapshotSlot->QDxid != QEDtxContextInfo.distributedXid)
if (SharedLocalSnapshotSlot->distributedXid != QEDtxContextInfo.distributedXid)
{
ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
(errmsg("[Distributed Snapshot #%u] *Can't Update Serializable Command Id* QDxid = %u (gxid = %u, '%s')",
QEDtxContextInfo.distributedSnapshot.distribSnapshotId,
SharedLocalSnapshotSlot->QDxid,
SharedLocalSnapshotSlot->distributedXid,
getDistributedTransactionId(),
DtxContextToString(DistributedTransactionContext))));
LWLockRelease(SharedLocalSnapshotSlot->slotLock);
......
......@@ -350,7 +350,7 @@ SharedSnapshotDump(void)
if (testSlot->slotid != -1)
{
appendStringInfo(&str, "(SLOT index: %d slotid: %d QDxid: %u pid: %u)",
testSlot->slotindex, testSlot->slotid, testSlot->QDxid,
testSlot->slotindex, testSlot->slotid, testSlot->distributedXid,
testSlot->writer_proc ? testSlot->writer_proc->pid : 0);
}
......@@ -462,7 +462,7 @@ retry:
slot->slotid = slotId;
slot->xid = 0;
slot->startTimestamp = 0;
slot->QDxid = 0;
slot->distributedXid = 0;
slot->segmateSync = 0;
/* Remember the writer proc for IsCurrentTransactionIdForReader */
slot->writer_proc = MyProc;
......@@ -573,7 +573,7 @@ SharedSnapshotRemove(volatile SharedSnapshotSlot *slot, char *creatorDescription
slot->slotid = -1;
slot->xid = 0;
slot->startTimestamp = 0;
slot->QDxid = 0;
slot->distributedXid = 0;
slot->segmateSync = 0;
sharedSnapshotArray->numSlots -= 1;
......@@ -672,7 +672,8 @@ dumpSharedLocalSnapshot_forCursor(void)
pDump->segment = segment;
pDump->handle = dsm_segment_handle(segment);
pDump->segmateSync = src->segmateSync;
pDump->xid = src->QDxid;
pDump->distributedXid = src->distributedXid;
pDump->localXid = src->xid;
elog(LOG, "Dump syncmate : %u snapshot to slot %d", src->segmateSync, id);
......@@ -725,7 +726,7 @@ readSharedLocalSnapshot_forCursor(Snapshot snapshot, DtxContext distributedTrans
search_iter = SNAPSHOTDUMPARRAYSZ - 1;
if(src->dump[search_iter].segmateSync == QEDtxContextInfo.segmateSync &&
src->dump[search_iter].xid == QEDtxContextInfo.distributedXid)
src->dump[search_iter].distributedXid == QEDtxContextInfo.distributedXid)
{
pDump = &src->dump[search_iter];
found = true;
......@@ -751,7 +752,7 @@ readSharedLocalSnapshot_forCursor(Snapshot snapshot, DtxContext distributedTrans
char *ptr = dsm_segment_address(segment);
entry->snapshot = RestoreSnapshot(ptr);
entry->localXid = pDump->xid;
entry->localXid = pDump->localXid;
dsm_detach(segment);
......
......@@ -20,7 +20,8 @@
typedef struct SnapshotDump
{
uint32 segmateSync;
TransactionId xid;
DistributedTransactionId distributedXid;
TransactionId localXid;
dsm_handle handle;
dsm_segment *segment;
} SnapshotDump;
......@@ -34,7 +35,10 @@ typedef struct SharedSnapshotSlot
int32 slotid;
PGPROC *writer_proc;
PGXACT *writer_xact;
volatile TransactionId QDxid;
/* only used by cursor dump identification, dose not always set */
volatile DistributedTransactionId distributedXid;
volatile bool ready;
volatile uint32 segmateSync;
SnapshotData snapshot;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册