提交 c0b60c3f 编写于 作者: G Gang Xiong 提交者: xiong-gang

Refactor TMGXACT

- some members of 'MyTmGxact' is only accessed locally, extract them to local
  variable 'MyTmGxactLocal'
- get rid of gid in MyTmGxact and form it with timestamp and gxid if needed.
- get rid of 'currentGxact' and check 'MyTmGxactLocal->state' to see if the
  distributed transaction is started or not.
上级 cae7abe9
......@@ -1421,8 +1421,8 @@ RecordTransactionCommit(void)
bool RelcacheInitFileInval = false;
bool wrote_xlog;
bool isDtxPrepared = 0;
bool isOnePhaseQE = (Gp_role == GP_ROLE_EXECUTE && MyTmGxact->isOnePhaseCommit);
TMGXACT_LOG gxact_log;
bool isOnePhaseQE = (Gp_role == GP_ROLE_EXECUTE && MyTmGxactLocal->isOnePhaseCommit);
XLogRecPtr recptr = InvalidXLogRecPtr;
/* Like in CommitTransaction(), treat a QE reader as if there was no XID */
if (DistributedTransactionContext == DTX_CONTEXT_QE_ENTRY_DB_SINGLETON ||
......@@ -1517,7 +1517,7 @@ RecordTransactionCommit(void)
* checkpoint process fails to record this transaction in the
* checkpoint. Crash recovery will never see the commit record for
* this transaction and the second phase of 2PC will never happen. The
* inCommit flag avoids this situation by blocking checkpointer until a
* delayChkpt flag avoids this situation by blocking checkpointer until a
* backend has finished updating the state.
*/
START_CRIT_SECTION();
......@@ -1529,7 +1529,8 @@ RecordTransactionCommit(void)
if (isDtxPrepared)
{
getDtxLogInfo(&gxact_log);
char gid[TMGIDSIZE];
dtxFormGID(gid, MyTmGxact->distribTimeStamp, MyTmGxact->gxid);
insertingDistributedCommitted();
XactLogCommitRecord(xactStopTimestamp,
......@@ -1539,7 +1540,7 @@ RecordTransactionCommit(void)
ndeldbs, deldbs,
RelcacheInitFileInval, forceSyncCommit,
InvalidTransactionId /* plain commit */,
gxact_log.gid /* distributed commit */);
gid /* distributed commit */);
insertedDistributedCommitted();
}
else
......@@ -1637,7 +1638,7 @@ RecordTransactionCommit(void)
DistributedTransactionId distribXid;
dtxCrackOpenGid(MyTmGxact->gid, &distribTimeStamp, &distribXid);
DistributedLog_SetCommittedTree(xid, nchildren, children,
distribTimeStamp, distribXid,
MyTmGxact->distribTimeStamp, MyTmGxact->gxid,
/* isRedo */ false);
}
......@@ -2360,6 +2361,8 @@ StartTransaction(void)
case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE:
{
/*
* FIXME: get rid of currentDistribXid and use MyTmGxact->gxid
*
* Generate the distributed transaction ID and save it.
* it's not really needed by a select-only implicit transaction, but
* currently gpfdist and pxf is using it.
......
......@@ -102,7 +102,7 @@ DtxContextInfo_CreateOnMaster(DtxContextInfo *dtxContextInfo,
memcpy(gid, "<empty>", 8);
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"DtxContextInfo_CreateOnMaster Gp_role is DISPATCH and have currentGxact = %s, gxid = %u --> have distributed snapshot",
"DtxContextInfo_CreateOnMaster Gp_role is DISPATCH and have gid = %s, gxid = %u --> have distributed snapshot",
gid,
getDistributedTransactionId());
elog((Debug_print_full_dtm ? LOG : DEBUG5),
......
此差异已折叠。
......@@ -1042,7 +1042,7 @@ PG_TRY();
{
if (shouldDispatch)
{
needDtxTwoPhase = isCurrentDtxTwoPhase();
needDtxTwoPhase = isCurrentDtxTwoPhaseActivated();
/*
* This call returns after launching the threads that send the
......
......@@ -90,7 +90,7 @@ test__ExecSetParamPlan__Check_Dispatch_Results(void **state)
Gp_role = GP_ROLE_DISPATCH;
((SubPlan*)(plan->xprstate.expr))->initPlanParallel = true;
will_be_called(isCurrentDtxTwoPhase);
will_be_called(isCurrentDtxTwoPhaseActivated);
expect_any(CdbDispatchPlan,queryDesc);
expect_any(CdbDispatchPlan,planRequiresTxn);
......
......@@ -405,7 +405,7 @@ ProcArrayEndGxact(void)
if (InvalidDistributedTransactionId != gxid &&
TransactionIdPrecedes(ShmemVariableCache->latestCompletedDxid, gxid))
ShmemVariableCache->latestCompletedDxid = gxid;
initGxact(MyTmGxact, true);
resetGxact();
}
/*
......@@ -490,8 +490,8 @@ ProcArrayEndTransaction(PGPROC *proc, TransactionId latestXid, bool lockHeld)
}
/* Clear distributed transaction status for one-phase commit transaction */
if (Gp_role == GP_ROLE_EXECUTE && MyTmGxact->isOnePhaseCommit)
initGxact(MyTmGxact, false);
if (Gp_role == GP_ROLE_EXECUTE && MyTmGxactLocal->isOnePhaseCommit)
resetGxact();
}
......@@ -1746,9 +1746,8 @@ getAllDistributedXactStatus(TMGALLXACTSTATUS **allDistributedXactStatus)
TMGXACT *gxact = &allTmGxact[arrayP->pgprocnos[i]];
all->statusArray[i].gxid = gxact->gxid;
Assert(strlen(gxact->gid) < TMGIDSIZE);
memcpy(all->statusArray[i].gid, gxact->gid, TMGIDSIZE);
all->statusArray[i].state = gxact->state;
dtxFormGID(all->statusArray[i].gid, gxact->distribTimeStamp, gxact->gxid);
all->statusArray[i].state = 0; /* deprecate this field */
all->statusArray[i].sessionId = gxact->sessionId;
all->statusArray[i].xminDistributedSnapshot = gxact->xminDistributedSnapshot;
}
......@@ -1802,9 +1801,8 @@ getDtxCheckPointInfo(char **result, int *result_size)
/*
* If a transaction inserted 'commit' record logically before the checkpoint
* REDO pointer, and it hasn't inserted the 'forget' record. we will see its
* 'TMGXACT->state' is between 'DTX_STATE_INSERTED_COMMITTED' and
* 'DTX_STATE_INSERTING_FORGET_COMMITTED'. such transactions should be included
* REDO pointer, and it hasn't inserted the 'forget' record. we will see
* needIncludedInCkpt is true. such transactions should be included
* in the checkpoint record so that the second phase of 2PC can be executed
* during crash recovery.
*
......@@ -1819,35 +1817,17 @@ getDtxCheckPointInfo(char **result, int *result_size)
for (i = 0; i < arrayP->numProcs; i++)
{
TMGXACT_LOG *gxact_log;
/*
* Note no 'volatile' is used to describe 'gxact'. We will check
* gxact->state first before memcpy gxact->gid. And the allowed state
* are:
* DTX_STATE_INSERTED_COMMITTED,
* DTX_STATE_FORCED_COMMITTED,
* DTX_STATE_NOTIFYING_COMMIT_PREPARED,
* DTX_STATE_INSERTING_FORGET_COMMITTED,
* DTX_STATE_RETRY_COMMIT_PREPARED.
*
* So this will not contend with setCurrentGxact, as it sets
* gxact->state to DTX_STATE_ACTIVE_NOT_DISTRIBUTED after settling down
* gxact->gid.
*/
TMGXACT *gxact = &allTmGxact[arrayP->pgprocnos[i]];
if (!includeInCheckpointIsNeeded(gxact))
if (!gxact->needIncludedInCkpt)
continue;
gxact_log = &gxact_log_array[actual];
if (strlen(gxact->gid) >= TMGIDSIZE)
elog(PANIC, "Distribute transaction identifier too long (%d)",
(int) strlen(gxact->gid));
memcpy(gxact_log->gid, gxact->gid, TMGIDSIZE);
dtxFormGID(gxact_log->gid, gxact->distribTimeStamp, gxact->gxid);
gxact_log->gxid = gxact->gxid;
elog((Debug_print_full_dtm ? LOG : DEBUG5),
"Add DTM checkpoint entry gid = %s.", gxact->gid);
"Add DTM checkpoint entry gid = %s.", gxact_log->gid);
actual++;
}
......@@ -1931,8 +1911,6 @@ CreateDistributedSnapshot(DistributedSnapshot *ds)
if (gxid == InvalidDistributedTransactionId)
continue;
Assert(gxact_candidate->state != DTX_STATE_NONE);
/*
* Include the current distributed transaction in the min/max
* calculation.
......
......@@ -56,7 +56,6 @@ test__CreateDistributedSnapshot(void **state)
/* This is going to act as our gxact */
allTmGxact[procArray->pgprocnos[0]].gxid = 20;
allTmGxact[procArray->pgprocnos[0]].state = DTX_STATE_ACTIVE_DISTRIBUTED;
allTmGxact[procArray->pgprocnos[0]].xminDistributedSnapshot = InvalidDistributedTransactionId;
procArray->numProcs = 1;
......@@ -85,11 +84,9 @@ test__CreateDistributedSnapshot(void **state)
allTmGxact[procArray->pgprocnos[0]].xminDistributedSnapshot = InvalidDistributedTransactionId;
allTmGxact[procArray->pgprocnos[1]].gxid = 10;
allTmGxact[procArray->pgprocnos[1]].state = DTX_STATE_ACTIVE_DISTRIBUTED;
allTmGxact[procArray->pgprocnos[1]].xminDistributedSnapshot = 5;
allTmGxact[procArray->pgprocnos[2]].gxid = 30;
allTmGxact[procArray->pgprocnos[2]].state = DTX_STATE_ACTIVE_DISTRIBUTED;
allTmGxact[procArray->pgprocnos[2]].xminDistributedSnapshot = 20;
procArray->numProcs = 3;
......@@ -113,11 +110,9 @@ test__CreateDistributedSnapshot(void **state)
allTmGxact[procArray->pgprocnos[0]].xminDistributedSnapshot = InvalidDistributedTransactionId;
allTmGxact[procArray->pgprocnos[3]].gxid = 15;
allTmGxact[procArray->pgprocnos[3]].state = DTX_STATE_ACTIVE_DISTRIBUTED;
allTmGxact[procArray->pgprocnos[3]].xminDistributedSnapshot = 12;
allTmGxact[procArray->pgprocnos[4]].gxid = 7;
allTmGxact[procArray->pgprocnos[4]].state = DTX_STATE_ACTIVE_DISTRIBUTED;
allTmGxact[procArray->pgprocnos[4]].xminDistributedSnapshot = 7;
procArray->numProcs = 5;
......
......@@ -82,6 +82,7 @@ bool log_lock_waits = false;
PGPROC *MyProc = NULL;
PGXACT *MyPgXact = NULL;
TMGXACT *MyTmGxact = NULL;
TMGXACTLOCAL *MyTmGxactLocal = NULL;
/* Special for MPP reader gangs */
PGPROC *lockHolderProcPtr;
......@@ -378,6 +379,9 @@ InitProcess(void)
}
MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno];
MyTmGxact = &ProcGlobal->allTmGxact[MyProc->pgprocno];
MyTmGxactLocal = (TMGXACTLOCAL*)MemoryContextAlloc(TopMemoryContext, sizeof(TMGXACTLOCAL));
if (MyTmGxactLocal == NULL)
elog(FATAL, "allocating TMGXACTLOCAL failed");
if (gp_debug_pgproc)
{
......@@ -501,7 +505,7 @@ InitProcess(void)
MyProc->queryCommandId = -1;
/* Init gxact */
initGxact(MyTmGxact, true);
resetGxact();
/*
* Arrange to clean up at backend exit.
......@@ -608,6 +612,9 @@ InitAuxiliaryProcess(void)
lockHolderProcPtr = auxproc;
MyPgXact = &ProcGlobal->allPgXact[auxproc->pgprocno];
MyTmGxact = &ProcGlobal->allTmGxact[auxproc->pgprocno];
MyTmGxactLocal = (TMGXACTLOCAL*)MemoryContextAlloc(TopMemoryContext, sizeof(TMGXACTLOCAL));
if (MyTmGxactLocal == NULL)
elog(FATAL, "allocating TMGXACTLOCAL failed");
SpinLockRelease(ProcStructLock);
......
......@@ -131,7 +131,7 @@ typedef enum
* transaction. Whether or not the transaction has been started on a QE
* is not a part of the QD state -- that is tracked by assigning one of the
* DTX_CONTEXT_QE* values on the QE process, and by updating the state field of the
* currentGxact.
* MyTmGxactLocal.
*/
DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE,
......@@ -202,13 +202,7 @@ typedef struct TMGXACT_UTILITY_MODE_REDO
typedef struct TMGXACT
{
/*
* These fields will be recorded in the log. They are the same
* as those in the TMGXACT_LOG struct. We will be copying the
* fields individually, so they dont have to match the same order,
* but it a good idea.
*/
char gid[TMGIDSIZE];
DistributedTransactionTimeStamp distribTimeStamp;
/*
* Like PGPROC->xid to local transaction, gxid is set if distributed
......@@ -218,30 +212,34 @@ typedef struct TMGXACT
DistributedTransactionId gxid;
/*
* Memory only fields.
* This is similar to xmin of PROC, stores lowest dxid on first snapshot
* by process with this as MyTmGxact.
*/
DtxState state;
DistributedTransactionId xminDistributedSnapshot;
bool needIncludedInCkpt;
int sessionId;
} TMGXACT;
typedef struct TMGXACTLOCAL
{
/*
* Memory only fields.
*/
DtxState state;
bool explicitBeginRemembered;
/* Used on QE, indicates the transaction applies one-phase commit protocol */
bool isOnePhaseCommit;
/*
* This is similar to xmin of PROC, stores lowest dxid on first snapshot
* by process with this as currentGXact.
*/
DistributedTransactionId xminDistributedSnapshot;
bool badPrepareGangs;
bool writerGangLost;
Bitmapset *twophaseSegmentsMap;
List *twophaseSegments;
} TMGXACT;
} TMGXACTLOCAL;
typedef struct TMGXACTSTATUS
{
......@@ -306,16 +304,13 @@ extern void dtxFormGID(char *gid,
extern DistributedTransactionId getDistributedTransactionId(void);
extern bool getDistributedTransactionIdentifier(char *id);
extern void initGxact(TMGXACT *gxact, bool resetXid);
extern void activeCurrentGxact(void);
extern void resetGxact(void);
extern void prepareDtxTransaction(void);
extern bool isPreparedDtxTransaction(void);
extern void getDtxLogInfo(TMGXACT_LOG *gxact_log);
extern bool notifyCommittedDtxTransactionIsNeeded(void);
extern void notifyCommittedDtxTransaction(void);
extern void rollbackDtxTransaction(void);
extern bool includeInCheckpointIsNeeded(TMGXACT *gxact);
extern void insertingDistributedCommitted(void);
extern void insertedDistributedCommitted(void);
......@@ -326,6 +321,7 @@ extern void redoDistributedForgetCommitRecord(TMGXACT_LOG *gxact_log);
extern void setupTwoPhaseTransaction(void);
extern bool isCurrentDtxTwoPhase(void);
extern DtxState getCurrentDtxState(void);
extern bool isCurrentDtxTwoPhaseActivated(void);
extern void sendDtxExplicitBegin(void);
extern bool isDtxExplicitBegin(void);
......
......@@ -101,7 +101,7 @@ struct PGPROC
/*
* Distributed transaction information. This is only maintained on QE's
* and accessed by the backend itself, so this doesn't need to be
* protected by any lock. On QD currentGXact provides this info, hence
* protected by any lock. On QD MyTmGxact provides this info, hence
* redundant info is not maintained here for QD. In fact, it could be just
* a global variable in backend-private memory, but it seems useful to
* have this information available for debugging purposes.
......@@ -207,6 +207,7 @@ struct PGPROC
extern PGDLLIMPORT PGPROC *MyProc;
extern PGDLLIMPORT struct PGXACT *MyPgXact;
extern PGDLLIMPORT struct TMGXACT *MyTmGxact;
extern PGDLLIMPORT struct TMGXACTLOCAL *MyTmGxactLocal;
/* Special for MPP reader gangs */
extern PGDLLIMPORT PGPROC *lockHolderProcPtr;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册