提交 e80b0212 编写于 作者: G Gang Xiong 提交者: xiong-gang

The XactLogCommitRecord logical is broken when merge 9.5, doing minor refactor

to fix it.
上级 9471a04d
......@@ -1997,8 +1997,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
ninvalmsgs, invalmsgs,
ndeldbs, deldbs,
initfileinval, false,
xid,
NULL/* commit prepared */);
xid);
/*
* We don't currently try to sleep before flush here ... nor is there any
......
......@@ -1422,7 +1422,6 @@ RecordTransactionCommit(void)
bool wrote_xlog;
bool isDtxPrepared = 0;
bool isOnePhaseQE = (Gp_role == GP_ROLE_EXECUTE && MyTmGxactLocal->isOnePhaseCommit);
XLogRecPtr recptr = InvalidXLogRecPtr;
/* Like in CommitTransaction(), treat a QE reader as if there was no XID */
if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON ||
......@@ -1527,31 +1526,13 @@ RecordTransactionCommit(void)
SIMPLE_FAULT_INJECTOR("onephase_transaction_commit");
if (isDtxPrepared)
{
char gid[TMGIDSIZE];
dtxFormGID(gid, MyTmGxact->distribTimeStamp, MyTmGxact->gxid);
insertingDistributedCommitted();
XactLogCommitRecord(xactStopTimestamp,
GetPendingTablespaceForDeletionForCommit(),
nchildren, children, nrels, rels,
nmsgs, invalMessages,
ndeldbs, deldbs,
RelcacheInitFileInval, forceSyncCommit,
InvalidTransactionId /* plain commit */,
gid /* distributed commit */);
insertedDistributedCommitted();
}
else
XactLogCommitRecord(xactStopTimestamp,
GetPendingTablespaceForDeletionForCommit(),
nchildren, children, nrels, rels,
nmsgs, invalMessages,
ndeldbs, deldbs,
RelcacheInitFileInval, forceSyncCommit,
InvalidTransactionId /* plain commit */,
NULL);
XactLogCommitRecord(xactStopTimestamp,
GetPendingTablespaceForDeletionForCommit(),
nchildren, children, nrels, rels,
nmsgs, invalMessages,
ndeldbs, deldbs,
RelcacheInitFileInval, forceSyncCommit,
InvalidTransactionId /* plain commit */);
/*
* Record plain commit ts if not replaying remote actions, or if no
......@@ -1633,14 +1614,9 @@ RecordTransactionCommit(void)
getDistributedTransactionId(),
/* isRedo */ false);
else if (isOnePhaseQE)
{
DistributedTransactionTimeStamp distribTimeStamp;
DistributedTransactionId distribXid;
dtxCrackOpenGid(MyTmGxact->gid, &distribTimeStamp, &distribXid);
DistributedLog_SetCommittedTree(xid, nchildren, children,
MyTmGxact->distribTimeStamp, MyTmGxact->gxid,
/* isRedo */ false);
}
TransactionIdCommitTree(xid, nchildren, children);
}
......@@ -6423,8 +6399,7 @@ XactLogCommitRecord(TimestampTz commit_time,
int nmsgs, SharedInvalidationMessage *msgs,
int ndeldbs, DbDirNode *deldbs,
bool relcacheInval, bool forceSync,
TransactionId twophase_xid,
const char *gid)
TransactionId twophase_xid)
{
xl_xact_commit xlrec;
xl_xact_xinfo xl_xinfo;
......@@ -6436,6 +6411,9 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_origin xl_origin;
xl_xact_distrib xl_distrib;
xl_xact_deldbs xl_deldbs;
XLogRecPtr recptr;
bool isOnePhaseQE = (Gp_role == GP_ROLE_EXECUTE && MyTmGxactLocal->isOnePhaseCommit);
bool isDtxPrepared = isPreparedDtxTransaction();
uint8 info;
......@@ -6443,11 +6421,8 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xinfo.xinfo = 0;
/* cannot log commit prepared and distributed commit record at the same time */
Assert(!(TransactionIdIsValid(twophase_xid) && gid != NULL));
/* decide between a plain and 2pc commit */
if (gid)
if (isDtxPrepared)
info = XLOG_XACT_DISTRIBUTED_COMMIT;
else if (!TransactionIdIsValid(twophase_xid))
info = XLOG_XACT_COMMIT;
......@@ -6514,19 +6489,11 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_origin.origin_timestamp = replorigin_sesssion_origin_timestamp;
}
if (gid)
if (isDtxPrepared || isOnePhaseQE)
{
DistributedTransactionTimeStamp distrib_timestamp;
DistributedTransactionId distrib_xid;
dtxCrackOpenGid(gid, &distrib_timestamp, &distrib_xid);
if (Gp_role == GP_ROLE_EXECUTE && MyTmGxact->isOnePhaseCommit)
xl_xinfo.xinfo |= XLOG_XACT_COMMIT;
else
xl_xinfo.xinfo |= XACT_XINFO_HAS_DISTRIB;
xl_distrib.distrib_xid = distrib_xid;
xl_distrib.distrib_timestamp = distrib_timestamp;
xl_xinfo.xinfo |= XACT_XINFO_HAS_DISTRIB;
xl_distrib.distrib_timestamp = MyTmGxact->distribTimeStamp;
xl_distrib.distrib_xid = MyTmGxact->gxid;
}
if (xl_xinfo.xinfo != 0)
......@@ -6586,7 +6553,15 @@ XactLogCommitRecord(TimestampTz commit_time,
/* we allow filtering by xacts */
XLogIncludeOrigin();
return XLogInsert(RM_XACT_ID, info);
if (isDtxPrepared)
insertingDistributedCommitted();
recptr = XLogInsert(RM_XACT_ID, info);
if (isDtxPrepared)
insertedDistributedCommitted();
return recptr;
}
/*
......
......@@ -430,8 +430,7 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
int nmsgs, SharedInvalidationMessage *msgs,
int ndeldbs, DbDirNode *deldbs,
bool relcacheInval, bool forceSync,
TransactionId twophase_xid,
const char *gid);
TransactionId twophase_xid);
extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
Oid tablespace_oid_to_abort,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册