diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 8270db44eaf921f0dfe93677d7fa8b1ca8374b70..18ac69325bf6bf3ed96868e35b7ed2883ee548e4 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1421,8 +1421,8 @@ RecordTransactionCommit(void) bool RelcacheInitFileInval = false; bool wrote_xlog; bool isDtxPrepared = 0; - bool isOnePhaseQE = (Gp_role == GP_ROLE_EXECUTE && MyTmGxact->isOnePhaseCommit); - TMGXACT_LOG gxact_log; + bool isOnePhaseQE = (Gp_role == GP_ROLE_EXECUTE && MyTmGxactLocal->isOnePhaseCommit); + XLogRecPtr recptr = InvalidXLogRecPtr; /* Like in CommitTransaction(), treat a QE reader as if there was no XID */ if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON || @@ -1517,7 +1517,7 @@ RecordTransactionCommit(void) * checkpoint process fails to record this transaction in the * checkpoint. Crash recovery will never see the commit record for * this transaction and the second phase of 2PC will never happen. The - * inCommit flag avoids this situation by blocking checkpointer until a + * delayChkpt flag avoids this situation by blocking checkpointer until a * backend has finished updating the state. */ START_CRIT_SECTION(); @@ -1529,7 +1529,8 @@ RecordTransactionCommit(void) if (isDtxPrepared) { - getDtxLogInfo(&gxact_log); + char gid[TMGIDSIZE]; + dtxFormGID(gid, MyTmGxact->distribTimeStamp, MyTmGxact->gxid); insertingDistributedCommitted(); XactLogCommitRecord(xactStopTimestamp, @@ -1539,7 +1540,7 @@ RecordTransactionCommit(void) ndeldbs, deldbs, RelcacheInitFileInval, forceSyncCommit, InvalidTransactionId /* plain commit */, - gxact_log.gid /* distributed commit */); + gid /* distributed commit */); insertedDistributedCommitted(); } else @@ -1637,7 +1638,7 @@ RecordTransactionCommit(void) DistributedTransactionId distribXid; dtxCrackOpenGid(MyTmGxact->gid, &distribTimeStamp, &distribXid); DistributedLog_SetCommittedTree(xid, nchildren, children, - distribTimeStamp, distribXid, + MyTmGxact->distribTimeStamp, MyTmGxact->gxid, /* isRedo */ false); } @@ -2360,6 +2361,8 @@ StartTransaction(void) case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE: { /* + * FIXME: get rid of currentDistribXid and use MyTmGxact->gxid + * * Generate the distributed transaction ID and save it. * it's not really needed by a select-only implicit transaction, but * currently gpfdist and pxf is using it. diff --git a/src/backend/cdb/cdbdtxcontextinfo.c b/src/backend/cdb/cdbdtxcontextinfo.c index f5616b9398fbe77007c79baa6824f96dbea0dec1..0aaee84d27e32b574600e60d41a73f2fe9f79571 100644 --- a/src/backend/cdb/cdbdtxcontextinfo.c +++ b/src/backend/cdb/cdbdtxcontextinfo.c @@ -102,7 +102,7 @@ DtxContextInfo_CreateOnMaster(DtxContextInfo *dtxContextInfo, memcpy(gid, "", 8); elog((Debug_print_full_dtm ? LOG : DEBUG5), - "DtxContextInfo_CreateOnMaster Gp_role is DISPATCH and have currentGxact = %s, gxid = %u --> have distributed snapshot", + "DtxContextInfo_CreateOnMaster Gp_role is DISPATCH and have gid = %s, gxid = %u --> have distributed snapshot", gid, getDistributedTransactionId()); elog((Debug_print_full_dtm ? LOG : DEBUG5), diff --git a/src/backend/cdb/cdbtm.c b/src/backend/cdb/cdbtm.c index e7d19e286f7759b77a0f04be20d271af3a747c0b..6e230bbdee27aeb07ca9daf3a30df3452c023210 100644 --- a/src/backend/cdb/cdbtm.c +++ b/src/backend/cdb/cdbtm.c @@ -61,11 +61,6 @@ volatile DistributedTransactionId *shmGIDSeq; uint32 *shmNextSnapshotId; -/** - * This pointer into shared memory is on the QD, and represents the current open transaction. - */ -static TMGXACT *currentGxact; - int max_tm_gxacts = 100; @@ -94,7 +89,6 @@ int max_tm_gxacts = 100; * FUNCTIONS PROTOTYPES */ static void clearAndResetGxact(void); -static void resetCurrentGxact(void); static void doPrepareTransaction(void); static void doInsertForgetCommitted(void); static void doNotifyingOnePhaseCommit(void); @@ -103,6 +97,8 @@ static void doNotifyingCommitNotPrepared(void); static void doNotifyingAbort(void); static void retryAbortPrepared(void); static void doQEDistributedExplicitBegin(); +static void currentDtxActivateTwoPhase(void); +static void setCurrentDtxState(DtxState state); static bool isDtxQueryDispatcher(void); static void performDtxProtocolCommitPrepared(const char *gid, bool raiseErrorIfNotFound); @@ -137,32 +133,6 @@ requireDistributedTransactionContext(DtxContext requiredCurrentContext) } } -static void -setGxactState(TMGXACT *transaction, DtxState state) -{ - Assert(transaction != NULL); - - /* - * elog(INFO, "Setting transaction state to '%s'", - * DtxStateToString(state)); - */ - transaction->state = state; -} - -/** - * All assignments of currentGxact->state should go through this function - * (so we can add logging here to see all assignments) - * - * This should only be called when currentGxact is non-NULL - * - * @param state the new value for currentGxact->state - */ -static void -setCurrentGxactState(DtxState state) -{ - setGxactState(currentGxact, state); -} - /** * Does DistributedTransactionContext indicate that this is acting as a QD? */ @@ -258,25 +228,60 @@ getDistributedTransactionIdentifier(char *id) bool isPreparedDtxTransaction(void) { - if (Gp_role != GP_ROLE_DISPATCH || - DistributedTransactionContext != DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE || - currentGxact == NULL) - return false; + AssertImply(MyTmGxactLocal->state == DTX_STATE_PREPARED, + (Gp_role == GP_ROLE_DISPATCH && + DistributedTransactionContext == DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE)); - return (currentGxact->state == DTX_STATE_PREPARED); + return (MyTmGxactLocal->state == DTX_STATE_PREPARED); } -void -getDtxLogInfo(TMGXACT_LOG *gxact_log) +/* + * The executor can avoid starting a distributed transaction if it knows that + * the current dtx is clean and we aren't in a user-started global transaction. + */ +bool +isCurrentDtxTwoPhaseActivated(void) +{ + return MyTmGxactLocal->state != DTX_STATE_NONE; +} + +static void +currentDtxActivateTwoPhase(void) { - if (currentGxact == NULL) + DistributedTransactionId gxid; + + gxid = GetCurrentDistributedTransactionId(); + if (gxid == InvalidDistributedTransactionId) { - elog(FATAL, "getDtxLogInfo found current distributed transaction is NULL"); + gxid = generateGID(); + SetCurrentDistributedTransactionId(gxid); + Assert(gxid != InvalidDistributedTransactionId); } - Assert(strlen(currentGxact->gid) < TMGIDSIZE); - memcpy(gxact_log->gid, currentGxact->gid, TMGIDSIZE); - gxact_log->gxid = currentGxact->gxid; + MyTmGxact->distribTimeStamp = getDtxStartTime(); + MyTmGxact->sessionId = gp_session_id; + setCurrentDtxState(DTX_STATE_ACTIVE_DISTRIBUTED); + + /* + * As addressed in access/transam/README, we have to hold ProcArrayLock + * when cleaning MyPgXact->xid as well as MyTmGxact->gxid to make sure + * someone else can take a consistent snapshot at the same time. But it's + * fine to not hold ProcArrayLock when setting xid and gxid, it won't + * affect the correctness of snapshot. + */ + MyTmGxact->gxid = gxid; +} + +static void +setCurrentDtxState(DtxState state) +{ + MyTmGxactLocal->state = state; +} + +DtxState +getCurrentDtxState(void) +{ + return MyTmGxactLocal ? MyTmGxactLocal->state : DTX_STATE_NONE; } bool @@ -289,23 +294,15 @@ notifyCommittedDtxTransactionIsNeeded(void) return false; } - if (currentGxact == NULL) + if (!isCurrentDtxTwoPhaseActivated()) { - elog(DTM_DEBUG5, "notifyCommittedDtxTransaction nothing to do (currentGxact == NULL)"); + elog(DTM_DEBUG5, "notifyCommittedDtxTransaction nothing to do (two phase not activated)"); return false; } return true; } -bool -includeInCheckpointIsNeeded(TMGXACT *gxact) -{ - volatile DtxState state = gxact->state; - return ((state >= DTX_STATE_INSERTED_COMMITTED && - state < DTX_STATE_INSERTED_FORGET_COMMITTED) || - state == DTX_STATE_RETRY_COMMIT_PREPARED); -} /* * Notify commited a global transaction, called by user commit * or by CommitTransaction @@ -315,12 +312,12 @@ notifyCommittedDtxTransaction(void) { Assert(Gp_role == GP_ROLE_DISPATCH); Assert(DistributedTransactionContext == DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE); - Assert(currentGxact != NULL); + Assert(isCurrentDtxTwoPhaseActivated()); - if (currentGxact->state == DTX_STATE_PREPARED || - currentGxact->state == DTX_STATE_INSERTED_COMMITTED) + if (MyTmGxactLocal->state == DTX_STATE_PREPARED || + MyTmGxactLocal->state == DTX_STATE_INSERTED_COMMITTED) doNotifyingCommitPrepared(); - else if (currentGxact->state == DTX_STATE_ONE_PHASE_COMMIT) + else if (MyTmGxactLocal->state == DTX_STATE_ONE_PHASE_COMMIT) doNotifyingOnePhaseCommit(); else doNotifyingCommitNotPrepared(); @@ -332,11 +329,11 @@ setupTwoPhaseTransaction(void) if (!IsTransactionState()) elog(ERROR, "DTM transaction is not active"); - if (currentGxact == NULL) - activeCurrentGxact(); + if (!isCurrentDtxTwoPhaseActivated()) + currentDtxActivateTwoPhase(); - if (currentGxact->state != DTX_STATE_ACTIVE_DISTRIBUTED) - elog(ERROR, "DTM transaction state (%s) is invalid", DtxStateToString(currentGxact->state)); + if (MyTmGxactLocal->state != DTX_STATE_ACTIVE_DISTRIBUTED) + elog(ERROR, "DTM transaction state (%s) is invalid", DtxStateToString(MyTmGxactLocal->state)); } @@ -361,9 +358,9 @@ doDispatchSubtransactionInternalCmd(DtxProtocolCommand cmdType) } if (cmdType == DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL && - currentGxact == NULL) + !isCurrentDtxTwoPhaseActivated()) { - activeCurrentGxact(); + currentDtxActivateTwoPhase(); } serializedDtxContextInfo = qdSerializeDtxContextInfo( @@ -391,32 +388,6 @@ doDispatchSubtransactionInternalCmd(DtxProtocolCommand cmdType) return succeeded; } -/* - * The executor can avoid starting a distributed transaction if it knows that - * the current dtx is clean and we aren't in a user-started global transaction. - */ -bool -isCurrentDtxTwoPhase(void) -{ - if (currentGxact == NULL) - { - return false; - } - else - { - return currentGxact->state == DTX_STATE_ACTIVE_DISTRIBUTED; - } -} - -DtxState -getCurrentDtxState(void) -{ - if (currentGxact != NULL) - return currentGxact->state; - - return DTX_STATE_NONE; -} - static void doPrepareTransaction(void) { @@ -425,7 +396,7 @@ doPrepareTransaction(void) CHECK_FOR_INTERRUPTS(); elog(DTM_DEBUG5, "doPrepareTransaction entering in state = %s", - DtxStateToString(currentGxact->state)); + DtxStateToString(MyTmGxactLocal->state)); /* * Don't allow a cancel while we're dispatching our prepare (we wrap our @@ -433,10 +404,10 @@ doPrepareTransaction(void) */ HOLD_INTERRUPTS(); - Assert(currentGxact->state == DTX_STATE_ACTIVE_DISTRIBUTED); - setCurrentGxactState(DTX_STATE_PREPARING); + Assert(MyTmGxactLocal->state == DTX_STATE_ACTIVE_DISTRIBUTED); + setCurrentDtxState(DTX_STATE_PREPARING); - elog(DTM_DEBUG5, "doPrepareTransaction moved to state = %s", DtxStateToString(currentGxact->state)); + elog(DTM_DEBUG5, "doPrepareTransaction moved to state = %s", DtxStateToString(MyTmGxactLocal->state)); Assert(currentGxact->twophaseSegments != NIL); succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_PREPARE, /* flags */ 0, @@ -460,12 +431,12 @@ doPrepareTransaction(void) elog(DTM_DEBUG5, "The distributed transaction 'Prepare' broadcast succeeded to the segments for gid = %s.", currentGxact->gid); - Assert(currentGxact->state == DTX_STATE_PREPARING); - setCurrentGxactState(DTX_STATE_PREPARED); + Assert(MyTmGxactLocal->state == DTX_STATE_PREPARING); + setCurrentDtxState(DTX_STATE_PREPARED); SIMPLE_FAULT_INJECTOR("dtm_broadcast_prepare"); - elog(DTM_DEBUG5, "doPrepareTransaction leaving in state = %s", DtxStateToString(currentGxact->state)); + elog(DTM_DEBUG5, "doPrepareTransaction leaving in state = %s", DtxStateToString(MyTmGxactLocal->state)); } /* @@ -476,18 +447,17 @@ doInsertForgetCommitted(void) { TMGXACT_LOG gxact_log; - elog(DTM_DEBUG5, "doInsertForgetCommitted entering in state = %s", DtxStateToString(currentGxact->state)); + elog(DTM_DEBUG5, "doInsertForgetCommitted entering in state = %s", DtxStateToString(MyTmGxactLocal->state)); - setCurrentGxactState(DTX_STATE_INSERTING_FORGET_COMMITTED); + setCurrentDtxState(DTX_STATE_INSERTING_FORGET_COMMITTED); - Assert(strlen(currentGxact->gid) < TMGIDSIZE); - - memcpy(&gxact_log.gid, currentGxact->gid, TMGIDSIZE); - gxact_log.gxid = currentGxact->gxid; + dtxFormGID(gxact_log.gid, MyTmGxact->distribTimeStamp, MyTmGxact->gxid); + gxact_log.gxid = MyTmGxact->gxid; RecordDistributedForgetCommitted(&gxact_log); - setCurrentGxactState(DTX_STATE_INSERTED_FORGET_COMMITTED); + setCurrentDtxState(DTX_STATE_INSERTED_FORGET_COMMITTED); + MyTmGxact->needIncludedInCkpt = false; } void @@ -527,7 +497,6 @@ ClearTransactionState(TransactionId latestXid) ProcArrayEndTransaction(MyProc, latestXid, true); ProcArrayEndGxact(); LWLockRelease(ProcArrayLock); - resetCurrentGxact(); } static void @@ -538,13 +507,9 @@ doNotifyingCommitNotPrepared(void) volatile int savedInterruptHoldoffCount; MemoryContext oldcontext = CurrentMemoryContext;; - if (currentGxact->twophaseSegments == NULL) + if (MyTmGxactLocal->twophaseSegments == NULL) return; - if (strlen(currentGxact->gid) >= TMGIDSIZE) - elog(PANIC, "Distribute transaction identifier too long (%d)", - (int) strlen(currentGxact->gid)); - savedInterruptHoldoffCount = InterruptHoldoffCount; PG_TRY(); @@ -581,20 +546,16 @@ doNotifyingOnePhaseCommit(void) bool badGangs; volatile int savedInterruptHoldoffCount; - Assert(list_length(currentGxact->twophaseSegments) <= 1); + Assert(list_length(MyTmGxactLocal->twophaseSegments) <= 1); - if (strlen(currentGxact->gid) >= TMGIDSIZE) - elog(PANIC, "Distributed transaction identifier too long (%d)", - (int) strlen(currentGxact->gid)); + elog(DTM_DEBUG5, "doNotifyingOnePhaseCommit entering in state = %s", DtxStateToString(MyTmGxactLocal->state)); - elog(DTM_DEBUG5, "doNotifyingOnePhaseCommit entering in state = %s", DtxStateToString(currentGxact->state)); - - Assert(currentGxact->state == DTX_STATE_ONE_PHASE_COMMIT); - setCurrentGxactState(DTX_STATE_PERFORMING_ONE_PHASE_COMMIT); + Assert(MyTmGxactLocal->state == DTX_STATE_ONE_PHASE_COMMIT); + setCurrentDtxState(DTX_STATE_PERFORMING_ONE_PHASE_COMMIT); savedInterruptHoldoffCount = InterruptHoldoffCount; - Assert(currentGxact->twophaseSegments != NIL); + Assert(MyTmGxactLocal->twophaseSegments != NIL); succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_COMMIT_ONEPHASE, /* flags */ 0, currentGxact->gid, currentGxact->gxid, @@ -602,7 +563,7 @@ doNotifyingOnePhaseCommit(void) currentGxact->twophaseSegments, NULL, 0); if (!succeeded) { - Assert(currentGxact->state == DTX_STATE_PERFORMING_ONE_PHASE_COMMIT); + Assert(MyTmGxactLocal->state == DTX_STATE_PERFORMING_ONE_PHASE_COMMIT); elog(ERROR, "one phase commit failed"); } } @@ -616,16 +577,15 @@ doNotifyingCommitPrepared(void) volatile int savedInterruptHoldoffCount; MemoryContext oldcontext = CurrentMemoryContext;; - elog(DTM_DEBUG5, "doNotifyingCommitPrepared entering in state = %s", DtxStateToString(currentGxact->state)); + elog(DTM_DEBUG5, "doNotifyingCommitPrepared entering in state = %s", DtxStateToString(MyTmGxactLocal->state)); - Assert(currentGxact->state == DTX_STATE_INSERTED_COMMITTED); - setCurrentGxactState(DTX_STATE_NOTIFYING_COMMIT_PREPARED); - Assert(strlen(currentGxact->gid) < TMGIDSIZE); + Assert(MyTmGxactLocal->state == DTX_STATE_INSERTED_COMMITTED); + setCurrentDtxState(DTX_STATE_NOTIFYING_COMMIT_PREPARED); SIMPLE_FAULT_INJECTOR("dtm_broadcast_commit_prepared"); savedInterruptHoldoffCount = InterruptHoldoffCount; - Assert(currentGxact->twophaseSegments != NIL); + Assert(MyTmGxactLocal->twophaseSegments != NIL); PG_TRY(); { succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_COMMIT_PREPARED, /* flags */ 0, @@ -647,11 +607,13 @@ doNotifyingCommitPrepared(void) if (!succeeded) { - Assert(currentGxact->state == DTX_STATE_NOTIFYING_COMMIT_PREPARED); - elog(DTM_DEBUG5, "marking retry needed for distributed transaction" - " 'Commit Prepared' broadcast to the segments for gid = %s.", - currentGxact->gid); - setCurrentGxactState(DTX_STATE_RETRY_COMMIT_PREPARED); + Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_COMMIT_PREPARED); + ereport(DTM_DEBUG5, + (errmsg("marking retry needed for distributed transaction " + "'Commit Prepared' broadcast to the segments"), + TM_ERRDETAIL)); + + setCurrentDtxState(DTX_STATE_RETRY_COMMIT_PREPARED); setDistributedTransactionContext(DTX_CONTEXT_QD_RETRY_PHASE_2); } @@ -790,20 +752,20 @@ doNotifyingAbort(void) volatile int savedInterruptHoldoffCount; MemoryContext oldcontext = CurrentMemoryContext; - elog(DTM_DEBUG5, "doNotifyingAborted entering in state = %s", DtxStateToString(currentGxact->state)); + elog(DTM_DEBUG5, "doNotifyingAborted entering in state = %s", DtxStateToString(MyTmGxactLocal->state)); - Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED || - currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || - currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); + Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED || + MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || + MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); - if (currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED) + if (MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED) { /* * In some cases, dtmPreCommand said two phase commit is needed, but some errors * occur before the command is actually dispatched, no need to dispatch DTX for * such cases. */ - if (!currentGxact->writerGangLost && currentGxact->twophaseSegments) + if (!MyTmGxactLocal->writerGangLost && MyTmGxactLocal->twophaseSegments) { succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED, /* flags */ 0, currentGxact->gid, currentGxact->gxid, @@ -847,10 +809,10 @@ doNotifyingAbort(void) char *abortString; int retry = 0; - Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || - currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); + Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || + MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); - if (currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED) + if (MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED) { dtxProtocolCommand = DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED; abortString = "Abort [Prepared]"; @@ -888,7 +850,7 @@ doNotifyingAbort(void) " to one or more segments for gid = %s. Retrying ... try %d", abortString, currentGxact->gid, retry); - setCurrentGxactState(DTX_STATE_RETRY_ABORT_PREPARED); + setCurrentDtxState(DTX_STATE_RETRY_ABORT_PREPARED); setDistributedTransactionContext(DTX_CONTEXT_QD_RETRY_PHASE_2); retryAbortPrepared(); } @@ -896,11 +858,10 @@ doNotifyingAbort(void) SIMPLE_FAULT_INJECTOR("dtm_broadcast_abort_prepared"); - Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED || - currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || - currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED || - currentGxact->state == DTX_STATE_RETRY_ABORT_PREPARED); - elog(DTM_DEBUG5, "doNotifyingAbort called resetCurrentGxact"); + Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED || + MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || + MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_PREPARED || + MyTmGxactLocal->state == DTX_STATE_RETRY_ABORT_PREPARED); } /* @@ -920,11 +881,11 @@ prepareDtxTransaction(void) return; } - if (currentGxact == NULL) + if (!isCurrentDtxTwoPhaseActivated()) { Assert(MyTmGxact->gxid == InvalidDistributedTransactionId); - Assert(MyTmGxact->state == DTX_STATE_NONE); - initGxact(MyTmGxact, false); + Assert(MyTmGxactLocal->state == DTX_STATE_NONE); + resetGxact(); return; } @@ -937,19 +898,18 @@ prepareDtxTransaction(void) * on that one segment. Otherwise, broadcast PREPARE TRANSACTION to the * segments. */ - if (!markXidCommitted && list_length(currentGxact->twophaseSegments) < 2) + if (!markXidCommitted && list_length(MyTmGxactLocal->twophaseSegments) < 2) { - setCurrentGxactState(DTX_STATE_ONE_PHASE_COMMIT); + setCurrentDtxState(DTX_STATE_ONE_PHASE_COMMIT); return; } elog(DTM_DEBUG5, "prepareDtxTransaction called with state = %s", - DtxStateToString(currentGxact->state)); + DtxStateToString(MyTmGxactLocal->state)); - Assert(currentGxact->state == DTX_STATE_ACTIVE_DISTRIBUTED); - Assert(currentGxact->gxid > FirstDistributedTransactionId); - Assert(strlen(currentGxact->gid) > 0); + Assert(MyTmGxactLocal->state == DTX_STATE_ACTIVE_DISTRIBUTED); + Assert(MyTmGxact->gxid > FirstDistributedTransactionId); doPrepareTransaction(); } @@ -967,25 +927,25 @@ rollbackDtxTransaction(void) DtxContextToString(DistributedTransactionContext)); return; } - if (currentGxact == NULL) + if (!isCurrentDtxTwoPhaseActivated()) { - elog(DTM_DEBUG5, "rollbackDtxTransaction nothing to do (currentGxact == NULL)"); + elog(DTM_DEBUG5, "rollbackDtxTransaction nothing to do (two phase not activate)"); return; } elog(DTM_DEBUG5, "rollbackDtxTransaction called with state = %s, gid = %s", DtxStateToString(currentGxact->state), currentGxact->gid); - switch (currentGxact->state) + switch (MyTmGxactLocal->state) { case DTX_STATE_ACTIVE_DISTRIBUTED: - setCurrentGxactState(DTX_STATE_NOTIFYING_ABORT_NO_PREPARED); + setCurrentDtxState(DTX_STATE_NOTIFYING_ABORT_NO_PREPARED); break; case DTX_STATE_PREPARING: - if (currentGxact->badPrepareGangs) + if (MyTmGxactLocal->badPrepareGangs) { - setCurrentGxactState(DTX_STATE_RETRY_ABORT_PREPARED); + setCurrentDtxState(DTX_STATE_RETRY_ABORT_PREPARED); /* * DisconnectAndDestroyAllGangs and ResetSession happens @@ -995,16 +955,16 @@ rollbackDtxTransaction(void) clearAndResetGxact(); return; } - setCurrentGxactState(DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED); + setCurrentDtxState(DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED); break; case DTX_STATE_PREPARED: - setCurrentGxactState(DTX_STATE_NOTIFYING_ABORT_PREPARED); + setCurrentDtxState(DTX_STATE_NOTIFYING_ABORT_PREPARED); break; case DTX_STATE_ONE_PHASE_COMMIT: case DTX_STATE_PERFORMING_ONE_PHASE_COMMIT: - setCurrentGxactState(DTX_STATE_NOTIFYING_ABORT_NO_PREPARED); + setCurrentDtxState(DTX_STATE_NOTIFYING_ABORT_NO_PREPARED); break; case DTX_STATE_NOTIFYING_ABORT_NO_PREPARED: @@ -1040,20 +1000,20 @@ rollbackDtxTransaction(void) case DTX_STATE_RETRY_COMMIT_PREPARED: case DTX_STATE_RETRY_ABORT_PREPARED: elog(DTM_DEBUG5, "rollbackDtxTransaction dtx state \"%s\" not expected here", - DtxStateToString(currentGxact->state)); + DtxStateToString(MyTmGxactLocal->state)); clearAndResetGxact(); return; default: elog(PANIC, "Unrecognized dtx state: %d", - (int) currentGxact->state); + (int) MyTmGxactLocal->state); break; } - Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED || - currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || - currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); + Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED || + MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || + MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); /* * if the process is in the middle of blowing up... then we don't do @@ -1067,14 +1027,14 @@ rollbackDtxTransaction(void) * Unable to complete distributed abort broadcast with possible * prepared transactions... */ - if (currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || - currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED) + if (MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || + MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_PREPARED) { elog(FATAL, "Unable to complete the 'Abort Prepared' broadcast for gid '%s'", currentGxact->gid); } - Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED); + Assert(MyTmGxactLocal->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED); /* * By deallocating the gang, we will force a new gang to connect to @@ -1198,7 +1158,7 @@ mppTxnOptions(bool needTwoPhase) if (XactReadOnly) options |= GP_OPT_READ_ONLY; - if (currentGxact != NULL && currentGxact->explicitBeginRemembered) + if (isCurrentDtxTwoPhaseActivated() && MyTmGxactLocal->explicitBeginRemembered) options |= GP_OPT_EXPLICT_BEGIN; elog(DTM_DEBUG5, @@ -1421,28 +1381,31 @@ dispatchDtxCommand(const char *cmd) return (numOfFailed == 0); } -/* initialize a global transaction context */ +/* reset global transaction context */ void -initGxact(TMGXACT *gxact, bool resetXid) +resetGxact() { - if (resetXid) - { - MemSet(gxact->gid, 0, TMGIDSIZE); - gxact->gxid = InvalidDistributedTransactionId; - setGxactState(gxact, DTX_STATE_NONE); - } - - /* - * Memory only fields. - */ - gxact->sessionId = gp_session_id; - gxact->explicitBeginRemembered = false; - gxact->xminDistributedSnapshot = InvalidDistributedTransactionId; - gxact->badPrepareGangs = false; - gxact->writerGangLost = false; - gxact->twophaseSegmentsMap = NULL; - gxact->twophaseSegments = NIL; - gxact->isOnePhaseCommit = false; + if (Gp_role == GP_ROLE_DISPATCH && MyTmGxact->gxid != InvalidDistributedTransactionId) + { + /* As QE doesn't take distributed snapshot, we don't have to hold ProcArrayLock to clear gxid */ + Assert(LWLockHeldByMe(ProcArrayLock)); + MyTmGxact->gxid = InvalidDistributedTransactionId; + } + else if (Gp_role == GP_ROLE_EXECUTE) + MyTmGxact->gxid = InvalidDistributedTransactionId; + + MyTmGxact->distribTimeStamp = 0; + MyTmGxact->xminDistributedSnapshot = InvalidDistributedTransactionId; + MyTmGxact->needIncludedInCkpt = false; + MyTmGxact->sessionId = 0; + + MyTmGxactLocal->explicitBeginRemembered = false; + MyTmGxactLocal->badPrepareGangs = false; + MyTmGxactLocal->writerGangLost = false; + MyTmGxactLocal->twophaseSegmentsMap = NULL; + MyTmGxactLocal->twophaseSegments = NIL; + MyTmGxactLocal->isOnePhaseCommit = false; + setCurrentDtxState(DTX_STATE_NONE); } bool @@ -1459,44 +1422,14 @@ getNextDistributedXactStatus(TMGALLXACTSTATUS *allDistributedXactStatus, TMGXACT return true; } -void -activeCurrentGxact(void) -{ - DistributedTransactionId gxid; - currentGxact = MyTmGxact; - - gxid = GetCurrentDistributedTransactionId(); - if (gxid == InvalidDistributedTransactionId) - { - gxid = generateGID(); - SetCurrentDistributedTransactionId(gxid); - Assert(gxid != InvalidDistributedTransactionId); - } - - dtxFormGID(currentGxact->gid, getDtxStartTime(), gxid); - setCurrentGxactState(DTX_STATE_ACTIVE_DISTRIBUTED); - - currentGxact->gxid = gxid; -} - -static void -resetCurrentGxact(void) -{ - Assert (currentGxact != NULL); - Assert (currentGxact->gxid == InvalidDistributedTransactionId); - currentGxact = NULL; -} - static void clearAndResetGxact(void) { - Assert(currentGxact != NULL); + Assert(isCurrentDtxTwoPhaseActivated()); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); ProcArrayEndGxact(); LWLockRelease(ProcArrayLock); - - resetCurrentGxact(); } /* @@ -1508,10 +1441,10 @@ insertingDistributedCommitted(void) { elog(DTM_DEBUG5, "insertingDistributedCommitted entering in state = %s", - DtxStateToString(currentGxact->state)); + DtxStateToString(MyTmGxactLocal->state)); - Assert(currentGxact->state == DTX_STATE_PREPARED); - setCurrentGxactState(DTX_STATE_INSERTING_COMMITTED); + Assert(MyTmGxactLocal->state == DTX_STATE_PREPARED); + setCurrentDtxState(DTX_STATE_INSERTING_COMMITTED); } /* @@ -1524,8 +1457,16 @@ insertedDistributedCommitted(void) "insertedDistributedCommitted entering in state = %s for gid = %s", DtxStateToString(currentGxact->state), currentGxact->gid); - Assert(currentGxact->state == DTX_STATE_INSERTING_COMMITTED); - setCurrentGxactState(DTX_STATE_INSERTED_COMMITTED); + Assert(MyTmGxactLocal->state == DTX_STATE_INSERTING_COMMITTED); + setCurrentDtxState(DTX_STATE_INSERTED_COMMITTED); + + /* + * We don't have to hold ProcArrayLock here because needIncludedInCkpt is used + * during creating checkpoint and we already set delayChkpt before we got here. + */ + Assert(MyPgXact->delayChkpt); + if (IS_QUERY_DISPATCHER()) + MyTmGxact->needIncludedInCkpt = true; } /* generate global transaction id */ @@ -1984,9 +1925,9 @@ finishDistributedTransactionContext(char *debugCaller, bool aborted) * We let the 2 retry states go up to PostgresMain.c, otherwise everything * MUST be complete. */ - if (currentGxact != NULL && - (currentGxact->state != DTX_STATE_RETRY_COMMIT_PREPARED && - currentGxact->state != DTX_STATE_RETRY_ABORT_PREPARED)) + if (isCurrentDtxTwoPhaseActivated() && + (MyTmGxactLocal->state != DTX_STATE_RETRY_COMMIT_PREPARED && + MyTmGxactLocal->state != DTX_STATE_RETRY_ABORT_PREPARED)) { elog(FATAL, "Expected currentGxact to be NULL at this point. Found gid =%s, gxid = %u (state = %s, caller = %s)", currentGxact->gid, currentGxact->gxid, DtxStateToString(currentGxact->state), debugCaller); @@ -2009,25 +1950,27 @@ finishDistributedTransactionContext(char *debugCaller, bool aborted) static void rememberDtxExplicitBegin(void) { - Assert (currentGxact != NULL); + Assert (isCurrentDtxTwoPhaseActivated()); - if (!currentGxact->explicitBeginRemembered) + if (!MyTmGxactLocal->explicitBeginRemembered) { - elog(DTM_DEBUG5, "rememberDtxExplicitBegin explicit BEGIN for gid = %s", - currentGxact->gid); - currentGxact->explicitBeginRemembered = true; + ereport(DTM_DEBUG5, + (errmsg("rememberDtxExplicitBegin explicit BEGIN"), + TM_ERRDETAIL)); + MyTmGxactLocal->explicitBeginRemembered = true; } else { - elog(DTM_DEBUG5, "rememberDtxExplicitBegin already an explicit BEGIN for gid = %s", - currentGxact->gid); + ereport(DTM_DEBUG5, + (errmsg("rememberDtxExplicitBegin already an explicit BEGIN"), + TM_ERRDETAIL)); } } bool isDtxExplicitBegin(void) { - return (currentGxact && currentGxact->explicitBeginRemembered); + return (isCurrentDtxTwoPhaseActivated() && MyTmGxactLocal->explicitBeginRemembered); } /* @@ -2081,8 +2024,8 @@ performDtxProtocolCommitOnePhase(const char *gid) "performDtxProtocolCommitOnePhase going to call CommitTransaction for distributed transaction %s", gid); /* MyTmGxact is now not used on QE for one-phase commit */ - memcpy(MyTmGxact->gid, gid, TMGIDSIZE); - MyTmGxact->isOnePhaseCommit = true; + dtxCrackOpenGid(gid, &MyTmGxact->distribTimeStamp, &MyTmGxact->gxid); + MyTmGxactLocal->isOnePhaseCommit = true; StartTransactionCommand(); @@ -2381,13 +2324,13 @@ performDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, void markCurrentGxactWriterGangLost(void) { - MyTmGxact->writerGangLost = true; + MyTmGxactLocal->writerGangLost = true; } bool currentGxactWriterGangLost(void) { - return MyTmGxact->writerGangLost; + return MyTmGxactLocal->writerGangLost; } /* @@ -2401,13 +2344,11 @@ addToGxactTwophaseSegments(Gang *gang) int segindex; int i; - if (!currentGxact) - return; - - if (list_length(currentGxact->twophaseSegments) >= getgpsegmentCount()) + if (!isCurrentDtxTwoPhaseActivated()) return; - if (currentGxact->state != DTX_STATE_ACTIVE_DISTRIBUTED) + /* skip if all segdbs are in the list */ + if (list_length(MyTmGxactLocal->twophaseSegments) >= getgpsegmentCount()) return; oldContext = MemoryContextSwitchTo(TopTransactionContext); @@ -2422,14 +2363,14 @@ addToGxactTwophaseSegments(Gang *gang) continue; /* skip if record already */ - if (bms_is_member(segindex, currentGxact->twophaseSegmentsMap)) + if (bms_is_member(segindex, MyTmGxactLocal->twophaseSegmentsMap)) continue; - currentGxact->twophaseSegmentsMap = - bms_add_member(currentGxact->twophaseSegmentsMap, segindex); + MyTmGxactLocal->twophaseSegmentsMap = + bms_add_member(MyTmGxactLocal->twophaseSegmentsMap, segindex); - currentGxact->twophaseSegments = - lappend_int(currentGxact->twophaseSegments, segindex); + MyTmGxactLocal->twophaseSegments = + lappend_int(MyTmGxactLocal->twophaseSegments, segindex); } MemoryContextSwitchTo(oldContext); } diff --git a/src/backend/executor/nodeSubplan.c b/src/backend/executor/nodeSubplan.c index 27f08eefb3a43bc205af6fa42142d4b58ed96267..364674253f47bcc869c3ec38ef39d4771290e91b 100644 --- a/src/backend/executor/nodeSubplan.c +++ b/src/backend/executor/nodeSubplan.c @@ -1042,7 +1042,7 @@ PG_TRY(); { if (shouldDispatch) { - needDtxTwoPhase = isCurrentDtxTwoPhase(); + needDtxTwoPhase = isCurrentDtxTwoPhaseActivated(); /* * This call returns after launching the threads that send the diff --git a/src/backend/executor/test/nodeSubplan_test.c b/src/backend/executor/test/nodeSubplan_test.c index 15bf20c23cc9fead046ae88cecbfe7d19c1c3457..41bfcee53a277031e4ccd5ad0093b6d107b84689 100644 --- a/src/backend/executor/test/nodeSubplan_test.c +++ b/src/backend/executor/test/nodeSubplan_test.c @@ -90,7 +90,7 @@ test__ExecSetParamPlan__Check_Dispatch_Results(void **state) Gp_role = GP_ROLE_DISPATCH; ((SubPlan*)(plan->xprstate.expr))->initPlanParallel = true; - will_be_called(isCurrentDtxTwoPhase); + will_be_called(isCurrentDtxTwoPhaseActivated); expect_any(CdbDispatchPlan,queryDesc); expect_any(CdbDispatchPlan,planRequiresTxn); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 50c9748dcd1fddc7825477fc556eb287f21a88ae..a2d95cf8660c0fbc74084f8ac95e05c1fe5ac23d 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -405,7 +405,7 @@ ProcArrayEndGxact(void) if (InvalidDistributedTransactionId != gxid && TransactionIdPrecedes(ShmemVariableCache->latestCompletedDxid, gxid)) ShmemVariableCache->latestCompletedDxid = gxid; - initGxact(MyTmGxact, true); + resetGxact(); } /* @@ -490,8 +490,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid, bool lockHeld) } /* Clear distributed transaction status for one-phase commit transaction */ - if (Gp_role == GP_ROLE_EXECUTE && MyTmGxact->isOnePhaseCommit) - initGxact(MyTmGxact, false); + if (Gp_role == GP_ROLE_EXECUTE && MyTmGxactLocal->isOnePhaseCommit) + resetGxact(); } @@ -1746,9 +1746,8 @@ getAllDistributedXactStatus(TMGALLXACTSTATUS **allDistributedXactStatus) TMGXACT *gxact = &allTmGxact[arrayP->pgprocnos[i]]; all->statusArray[i].gxid = gxact->gxid; - Assert(strlen(gxact->gid) < TMGIDSIZE); - memcpy(all->statusArray[i].gid, gxact->gid, TMGIDSIZE); - all->statusArray[i].state = gxact->state; + dtxFormGID(all->statusArray[i].gid, gxact->distribTimeStamp, gxact->gxid); + all->statusArray[i].state = 0; /* deprecate this field */ all->statusArray[i].sessionId = gxact->sessionId; all->statusArray[i].xminDistributedSnapshot = gxact->xminDistributedSnapshot; } @@ -1802,9 +1801,8 @@ getDtxCheckPointInfo(char **result, int *result_size) /* * If a transaction inserted 'commit' record logically before the checkpoint - * REDO pointer, and it hasn't inserted the 'forget' record. we will see its - * 'TMGXACT->state' is between 'DTX_STATE_INSERTED_COMMITTED' and - * 'DTX_STATE_INSERTING_FORGET_COMMITTED'. such transactions should be included + * REDO pointer, and it hasn't inserted the 'forget' record. we will see + * needIncludedInCkpt is true. such transactions should be included * in the checkpoint record so that the second phase of 2PC can be executed * during crash recovery. * @@ -1819,35 +1817,17 @@ getDtxCheckPointInfo(char **result, int *result_size) for (i = 0; i < arrayP->numProcs; i++) { TMGXACT_LOG *gxact_log; - - /* - * Note no 'volatile' is used to describe 'gxact'. We will check - * gxact->state first before memcpy gxact->gid. And the allowed state - * are: - * DTX_STATE_INSERTED_COMMITTED, - * DTX_STATE_FORCED_COMMITTED, - * DTX_STATE_NOTIFYING_COMMIT_PREPARED, - * DTX_STATE_INSERTING_FORGET_COMMITTED, - * DTX_STATE_RETRY_COMMIT_PREPARED. - * - * So this will not contend with setCurrentGxact, as it sets - * gxact->state to DTX_STATE_ACTIVE_NOT_DISTRIBUTED after settling down - * gxact->gid. - */ TMGXACT *gxact = &allTmGxact[arrayP->pgprocnos[i]]; - if (!includeInCheckpointIsNeeded(gxact)) + if (!gxact->needIncludedInCkpt) continue; gxact_log = &gxact_log_array[actual]; - if (strlen(gxact->gid) >= TMGIDSIZE) - elog(PANIC, "Distribute transaction identifier too long (%d)", - (int) strlen(gxact->gid)); - memcpy(gxact_log->gid, gxact->gid, TMGIDSIZE); + dtxFormGID(gxact_log->gid, gxact->distribTimeStamp, gxact->gxid); gxact_log->gxid = gxact->gxid; elog((Debug_print_full_dtm ? LOG : DEBUG5), - "Add DTM checkpoint entry gid = %s.", gxact->gid); + "Add DTM checkpoint entry gid = %s.", gxact_log->gid); actual++; } @@ -1931,8 +1911,6 @@ CreateDistributedSnapshot(DistributedSnapshot *ds) if (gxid == InvalidDistributedTransactionId) continue; - Assert(gxact_candidate->state != DTX_STATE_NONE); - /* * Include the current distributed transaction in the min/max * calculation. diff --git a/src/backend/storage/ipc/test/procarray_test.c b/src/backend/storage/ipc/test/procarray_test.c index d23423a9a62a2f94bfdde9dcb704b7f4c99d9880..86146138ff85e13baf972ccec22e3d49f4975aa6 100644 --- a/src/backend/storage/ipc/test/procarray_test.c +++ b/src/backend/storage/ipc/test/procarray_test.c @@ -56,7 +56,6 @@ test__CreateDistributedSnapshot(void **state) /* This is going to act as our gxact */ allTmGxact[procArray->pgprocnos[0]].gxid = 20; - allTmGxact[procArray->pgprocnos[0]].state = DTX_STATE_ACTIVE_DISTRIBUTED; allTmGxact[procArray->pgprocnos[0]].xminDistributedSnapshot = InvalidDistributedTransactionId; procArray->numProcs = 1; @@ -85,11 +84,9 @@ test__CreateDistributedSnapshot(void **state) allTmGxact[procArray->pgprocnos[0]].xminDistributedSnapshot = InvalidDistributedTransactionId; allTmGxact[procArray->pgprocnos[1]].gxid = 10; - allTmGxact[procArray->pgprocnos[1]].state = DTX_STATE_ACTIVE_DISTRIBUTED; allTmGxact[procArray->pgprocnos[1]].xminDistributedSnapshot = 5; allTmGxact[procArray->pgprocnos[2]].gxid = 30; - allTmGxact[procArray->pgprocnos[2]].state = DTX_STATE_ACTIVE_DISTRIBUTED; allTmGxact[procArray->pgprocnos[2]].xminDistributedSnapshot = 20; procArray->numProcs = 3; @@ -113,11 +110,9 @@ test__CreateDistributedSnapshot(void **state) allTmGxact[procArray->pgprocnos[0]].xminDistributedSnapshot = InvalidDistributedTransactionId; allTmGxact[procArray->pgprocnos[3]].gxid = 15; - allTmGxact[procArray->pgprocnos[3]].state = DTX_STATE_ACTIVE_DISTRIBUTED; allTmGxact[procArray->pgprocnos[3]].xminDistributedSnapshot = 12; allTmGxact[procArray->pgprocnos[4]].gxid = 7; - allTmGxact[procArray->pgprocnos[4]].state = DTX_STATE_ACTIVE_DISTRIBUTED; allTmGxact[procArray->pgprocnos[4]].xminDistributedSnapshot = 7; procArray->numProcs = 5; diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 1224217bc224fe534470ea061e3c1551cf07d855..905d44e7deaaead0d853cfe9fb6320280130144a 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -82,6 +82,7 @@ bool log_lock_waits = false; PGPROC *MyProc = NULL; PGXACT *MyPgXact = NULL; TMGXACT *MyTmGxact = NULL; +TMGXACTLOCAL *MyTmGxactLocal = NULL; /* Special for MPP reader gangs */ PGPROC *lockHolderProcPtr; @@ -378,6 +379,9 @@ InitProcess(void) } MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno]; MyTmGxact = &ProcGlobal->allTmGxact[MyProc->pgprocno]; + MyTmGxactLocal = (TMGXACTLOCAL*)MemoryContextAlloc(TopMemoryContext, sizeof(TMGXACTLOCAL)); + if (MyTmGxactLocal == NULL) + elog(FATAL, "allocating TMGXACTLOCAL failed"); if (gp_debug_pgproc) { @@ -501,7 +505,7 @@ InitProcess(void) MyProc->queryCommandId = -1; /* Init gxact */ - initGxact(MyTmGxact, true); + resetGxact(); /* * Arrange to clean up at backend exit. @@ -608,6 +612,9 @@ InitAuxiliaryProcess(void) lockHolderProcPtr = auxproc; MyPgXact = &ProcGlobal->allPgXact[auxproc->pgprocno]; MyTmGxact = &ProcGlobal->allTmGxact[auxproc->pgprocno]; + MyTmGxactLocal = (TMGXACTLOCAL*)MemoryContextAlloc(TopMemoryContext, sizeof(TMGXACTLOCAL)); + if (MyTmGxactLocal == NULL) + elog(FATAL, "allocating TMGXACTLOCAL failed"); SpinLockRelease(ProcStructLock); diff --git a/src/include/cdb/cdbtm.h b/src/include/cdb/cdbtm.h index 5a56712069efe79036e08813904e93c70eaea010..df75d2f4e39588aa588027f8378e0cd316e3c39e 100644 --- a/src/include/cdb/cdbtm.h +++ b/src/include/cdb/cdbtm.h @@ -131,7 +131,7 @@ typedef enum * transaction. Whether or not the transaction has been started on a QE * is not a part of the QD state -- that is tracked by assigning one of the * DTX_CONTEXT_QE* values on the QE process, and by updating the state field of the - * currentGxact. + * MyTmGxactLocal. */ DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE, @@ -202,13 +202,7 @@ typedef struct TMGXACT_UTILITY_MODE_REDO typedef struct TMGXACT { - /* - * These fields will be recorded in the log. They are the same - * as those in the TMGXACT_LOG struct. We will be copying the - * fields individually, so they dont have to match the same order, - * but it a good idea. - */ - char gid[TMGIDSIZE]; + DistributedTransactionTimeStamp distribTimeStamp; /* * Like PGPROC->xid to local transaction, gxid is set if distributed @@ -218,30 +212,34 @@ typedef struct TMGXACT DistributedTransactionId gxid; /* - * Memory only fields. + * This is similar to xmin of PROC, stores lowest dxid on first snapshot + * by process with this as MyTmGxact. */ - DtxState state; + DistributedTransactionId xminDistributedSnapshot; + bool needIncludedInCkpt; int sessionId; +} TMGXACT; + +typedef struct TMGXACTLOCAL +{ + /* + * Memory only fields. + */ + DtxState state; bool explicitBeginRemembered; /* Used on QE, indicates the transaction applies one-phase commit protocol */ bool isOnePhaseCommit; - /* - * This is similar to xmin of PROC, stores lowest dxid on first snapshot - * by process with this as currentGXact. - */ - DistributedTransactionId xminDistributedSnapshot; - bool badPrepareGangs; bool writerGangLost; Bitmapset *twophaseSegmentsMap; List *twophaseSegments; -} TMGXACT; +} TMGXACTLOCAL; typedef struct TMGXACTSTATUS { @@ -306,16 +304,13 @@ extern void dtxFormGID(char *gid, extern DistributedTransactionId getDistributedTransactionId(void); extern bool getDistributedTransactionIdentifier(char *id); -extern void initGxact(TMGXACT *gxact, bool resetXid); -extern void activeCurrentGxact(void); +extern void resetGxact(void); extern void prepareDtxTransaction(void); extern bool isPreparedDtxTransaction(void); -extern void getDtxLogInfo(TMGXACT_LOG *gxact_log); extern bool notifyCommittedDtxTransactionIsNeeded(void); extern void notifyCommittedDtxTransaction(void); extern void rollbackDtxTransaction(void); -extern bool includeInCheckpointIsNeeded(TMGXACT *gxact); extern void insertingDistributedCommitted(void); extern void insertedDistributedCommitted(void); @@ -326,6 +321,7 @@ extern void redoDistributedForgetCommitRecord(TMGXACT_LOG *gxact_log); extern void setupTwoPhaseTransaction(void); extern bool isCurrentDtxTwoPhase(void); extern DtxState getCurrentDtxState(void); +extern bool isCurrentDtxTwoPhaseActivated(void); extern void sendDtxExplicitBegin(void); extern bool isDtxExplicitBegin(void); diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 41f87d4902fa6eb42ac4df348b593a4d3eade49d..ef941336c3248563e88d67b1c26ed7f22e7a2f07 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -101,7 +101,7 @@ struct PGPROC /* * Distributed transaction information. This is only maintained on QE's * and accessed by the backend itself, so this doesn't need to be - * protected by any lock. On QD currentGXact provides this info, hence + * protected by any lock. On QD MyTmGxact provides this info, hence * redundant info is not maintained here for QD. In fact, it could be just * a global variable in backend-private memory, but it seems useful to * have this information available for debugging purposes. @@ -207,6 +207,7 @@ struct PGPROC extern PGDLLIMPORT PGPROC *MyProc; extern PGDLLIMPORT struct PGXACT *MyPgXact; extern PGDLLIMPORT struct TMGXACT *MyTmGxact; +extern PGDLLIMPORT struct TMGXACTLOCAL *MyTmGxactLocal; /* Special for MPP reader gangs */ extern PGDLLIMPORT PGPROC *lockHolderProcPtr;