/*------------------------------------------------------------------------- * * cdbtm.c * Provides routines for performing distributed transaction * * Copyright (c) 2005-2009, Greenplum inc * *------------------------------------------------------------------------- */ #include "postgres.h" #include #include #include #include #include #include "cdb/cdbtm.h" #include "libpq/libpq-be.h" #include "miscadmin.h" #include "storage/shmem.h" #include "storage/ipc.h" #include "cdb/cdbdisp.h" #include "cdb/cdbdtxcontextinfo.h" #include "cdb/cdbvars.h" #include "gp-libpq-fe.h" #include "access/transam.h" #include "access/xact.h" #include "cdb/cdbfts.h" #include "lib/stringinfo.h" #include "access/twophase.h" #include "access/distributedlog.h" #include "postmaster/postmaster.h" #include "cdb/cdbpersistentrecovery.h" #include "cdb/cdbpersistentcheck.h" #include "cdb/cdbllize.h" #include "utils/faultinjector.h" #include "utils/fmgroids.h" #include "utils/memutils.h" extern bool Test_print_direct_dispatch_info; extern struct Port *MyProcPort; #define DTM_DEBUG3 (Debug_print_full_dtm ? LOG : DEBUG3) #define DTM_DEBUG5 (Debug_print_full_dtm ? LOG : DEBUG5) /* * Directory where Utility Mode DTM REDO file reside within PGDATA */ #define UTILITYMODEDTMREDO_DIR "pg_utilitymodedtmredo" /* * File name for Utility Mode DTM REDO */ #define UTILITYMODEDTMREDO_FILE "savedtmredo.file" static LWLockId shmControlLock; static volatile bool *shmTmRecoverred; static volatile DistributedTransactionTimeStamp *shmDistribTimeStamp; static volatile DistributedTransactionId *shmGIDSeq; static volatile int *shmNumGxacts; static int *shmCurrentPhase1Count; static int ControlLockCount = 0; volatile int *shmSegmentCount; volatile int *shmSegmentStatesByteLen; DistributedTransactionId *shmXminAllDistributedSnapshots; uint32 *shmNextSnapshotId; volatile bool *shmDtmStarted; volatile bool *shmDtmRecoveryDeferred; /* when starting in readonly mode */ /* global transaction array */ static TMGXACT **shmGxactArray; /** * This pointer into shared memory is on the QD, and represents the current open transaction. */ static TMGXACT *currentGxact; static bool duringRecovery = false; static int max_tm_gxacts = 100; static int redoFileFD = -1; static int redoFileOffset; typedef struct InDoubtDtx { char gid[TMGIDSIZE]; } InDoubtDtx; /* here are some flag options relationed to the txnOptions field of * PQsendGpQuery */ /* bit 1 is for statement wants DTX transaction * * bits 2-3 for iso level 00 read-committed * 01 read-uncommitted * 10 repeatable-read * 11 serializable * bit 4 is for read-only */ #define GP_OPT_NEED_TWO_PHASE 0x0001 #define GP_OPT_READ_COMMITTED 0x0002 #define GP_OPT_READ_UNCOMMITTED 0x0004 #define GP_OPT_REPEATABLE_READ 0x0006 #define GP_OPT_SERIALIZABLE 0x0008 #define GP_OPT_READ_ONLY 0x0010 #define GP_OPT_EXPLICT_BEGIN 0x0020 /*========================================================================= * FUNCTIONS PROTOTYPES */ static void initGxact(TMGXACT * gxact); static void releaseGxact_UnderLocks(void); static void releaseGxact(void); static void generateGID(char *gid, DistributedTransactionId *gxid); static void recoverTM(void); static bool recoverInDoubtTransactions(void); static HTAB *gatherRMInDoubtTransactions(void); static void abortRMInDoubtTransactions(HTAB *htab); static void cleanRMResultSets(PGresult **pgresultSets); static void dumpAllDtx(void); /* static void resolveInDoubtDtx(void); */ static void dumpRMOnlyDtx(HTAB *htab, StringInfoData *buff); static bool doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags, char* gid, DistributedTransactionId gxid, bool *badGangs, bool raiseError, CdbDispatchDirectDesc *direct, char *serializedArgument, int serializedArgumentLen); static void doPrepareTransaction(void); static void doInsertForgetCommitted(void); static void doNotifyingCommitPrepared(void); static void doNotifyingAbort(void); static bool doNotifyCommittedInDoubt(char *gid); static void doAbortInDoubt(char *gid); static void doQEDistributedExplicitBegin(int txnOptions); static bool isDtxQueryDispatcher(void); static void UtilityModeSaveRedo(bool committed, TMGXACT_LOG * gxact_log); static void ReplayRedoFromUtilityMode(void); static void RemoveRedoUtilityModeFile(void); static void performDtxProtocolCommitPrepared(const char *gid, bool raiseErrorIfNotFound); static void performDtxProtocolAbortPrepared(const char *gid, bool raiseErrorIfNotFound); static DistributedTransactionId determineSegmentMaxDistributedXid(void); extern void resetSessionForPrimaryGangLoss(void); extern void CheckForResetSession(void); /** * All assignments of the global DistributedTransactionContext should go through this function * (so we can add logging here to see all assignments) * * @param context the new value for DistributedTransactionContext */ static void setDistributedTransactionContext(DtxContext context) { // elog(INFO, "Setting DistributedTransactionContext to '%s'", DtxContextToString(context)); DistributedTransactionContext = context; } static void requireDistributedTransactionContext(DtxContext requiredCurrentContext) { if (DistributedTransactionContext != requiredCurrentContext) { elog(FATAL, "Expected segment distributed transaction context to be '%s', found '%s'", DtxContextToString(requiredCurrentContext), DtxContextToString(DistributedTransactionContext)); } } static void setGxactState(TMGXACT *transaction, DtxState state) { Assert(transaction != NULL); // elog(INFO, "Setting transaction state to '%s'", DtxStateToString(state)); transaction->state = state; } /** * All assignments of currentGxact->state should go through this function * (so we can add logging here to see all assignments) * * This should only be called when currentGxact is non-NULL * * @param state the new value for currentGxact->state */ static void setCurrentGxactState(DtxState state) { setGxactState(currentGxact, state); } /** * Does DistributedTransactionContext indicate that this is acting as a QD? */ static bool isQDContext(void) { switch (DistributedTransactionContext) { case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE: case DTX_CONTEXT_QD_RETRY_PHASE_2: return true; default: return false; } } /** * Does DistributedTransactionContext indicate that this is acting as a QE? */ static bool isQEContext() { switch(DistributedTransactionContext) { case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON: case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT: case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER: case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER: case DTX_CONTEXT_QE_READER: return true; default: return false; } } /*========================================================================= * VISIBLE FUNCTIONS */ DistributedTransactionTimeStamp getDtxStartTime(void) { if (shmDistribTimeStamp != NULL) return *shmDistribTimeStamp; else return 0; } DistributedTransactionId getDistributedTransactionId(void) { if ( isQDContext()) { return currentGxact == NULL ? InvalidDistributedTransactionId : currentGxact->gxid; } else if ( isQEContext()) { return QEDtxContextInfo.distributedXid; } else { return InvalidDistributedTransactionId; } } bool getDistributedTransactionIdentifier(char *id) { if ( isQDContext()) { if (currentGxact != NULL) { /* * The length check here requires the identifer have a trailing NUL character. */ if (strlen(currentGxact->gid) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(currentGxact->gid)); memcpy(id, currentGxact->gid, TMGIDSIZE); return true; } } else if ( isQEContext()) { if (QEDtxContextInfo.distributedXid != InvalidDistributedTransactionId) { if (strlen(QEDtxContextInfo.distributedId) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(QEDtxContextInfo.distributedId)); memcpy(id, QEDtxContextInfo.distributedId, TMGIDSIZE); return true; } } MemSet(id, 0, TMGIDSIZE); return false; } bool isPreparedDtxTransaction(void) { if (Gp_role != GP_ROLE_DISPATCH || DistributedTransactionContext != DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE || currentGxact == NULL) return false; return (currentGxact->state == DTX_STATE_PREPARED); } void getDtxLogInfo(TMGXACT_LOG *gxact_log) { if (currentGxact == NULL) { elog(FATAL, "getDtxLogInfo found current distributed transaction is NULL"); } if (strlen(currentGxact->gid) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(currentGxact->gid)); memcpy(gxact_log->gid, currentGxact->gid, TMGIDSIZE); gxact_log->gxid = currentGxact->gxid; } bool notifyCommittedDtxTransactionIsNeeded(void) { if (DistributedTransactionContext != DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE) { elog(DTM_DEBUG5, "notifyCommittedDtxTransaction nothing to do (DistributedTransactionContext = '%s')", DtxContextToString(DistributedTransactionContext)); return false; } if (currentGxact == NULL) { elog(DTM_DEBUG5, "notifyCommittedDtxTransaction nothing to do (currentGxact == NULL)"); return false; } return true; } /* * Notify commited a global transaction, called by user commit * or by CommitTransaction */ void notifyCommittedDtxTransaction(void) { Assert (DistributedTransactionContext == DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE); Assert (currentGxact != NULL); doNotifyingCommitPrepared(); } static inline void copyDirectDispatchFromTransaction(CdbDispatchDirectDesc *dOut) { if (currentGxact->directTransaction) { dOut->directed_dispatch = true; dOut->count = 1; dOut->content[0] = currentGxact->directTransactionContentId; } else { dOut->directed_dispatch = false; } } static bool GetRootNodeIsDirectDispatch(PlannedStmt *stmt) { if ( stmt == NULL ) return false; if ( stmt->planTree == NULL) return false; return stmt->planTree->directDispatch.isDirectDispatch; } /** * note that the ability to look at the root node of a plan in order to determine * direct dispatch overall depends on the way we assign direct dispatch. Parent slices are * never more directed than child slices. This could be fixed with an iteration over all slices and * combine from every slice. * * return true IFF the directDispatch data stored in n should be applied to the transaction */ static bool GetPlannedStmtDirectDispatch_AndUsingNodeIsSufficient(PlannedStmt *stmt) { if ( ! GetRootNodeIsDirectDispatch(stmt)) return false; /* * now look at number initplans .. we do something simple. ANY initPlans means we don't do directDispatch * at the dtm level. It's technically possible that the initPlan and the node share the same * direct dispatch set but we don't bother right now. */ if ( stmt->nInitPlans > 0 ) return false; return true; } /* * @param needsTwoPhaseCommit if true then marks the current Distributed Transaction as needing to use the * 2 phase commit protocol. */ void dtmPreCommand(const char *debugCaller, const char *debugDetail, PlannedStmt *stmt, bool needsTwoPhaseCommit, bool wantSnapshot, bool inCursor ) { bool needsPromotionFromDirectDispatch = false; const bool rootNodeIsDirectDispatch = GetRootNodeIsDirectDispatch(stmt); const bool nodeSaysDirectDispatch = GetPlannedStmtDirectDispatch_AndUsingNodeIsSufficient(stmt); Assert(debugCaller != NULL); Assert(debugDetail != NULL); /** * update the information about what segments are participating in the transaction */ if (currentGxact == NULL) { /* no open transaction so don't do anything */ } else if (currentGxact->state == DTX_STATE_ACTIVE_NOT_DISTRIBUTED) { /* Can we direct this transaction to a single content-id ? */ if (nodeSaysDirectDispatch) { currentGxact->directTransaction = true; currentGxact->directTransactionContentId = linitial_int(stmt->planTree->directDispatch.contentIds); elog(DTM_DEBUG5, "dtmPreCommand going distributed (to content %d) for gid = %s (%s, detail = '%s')", currentGxact->directTransactionContentId, currentGxact->gid, debugCaller, debugDetail); } else { currentGxact->directTransaction = false; if ( rootNodeIsDirectDispatch ) { /* implicit write on the root, but some initPlan was to all contents...so send explicit start */ needsPromotionFromDirectDispatch = true; } elog(DTM_DEBUG5, "dtmPreCommand going distributed (all gangs) for gid = %s (%s, detail = '%s')", currentGxact->gid, debugCaller, debugDetail); } } else if (currentGxact->state == DTX_STATE_ACTIVE_DISTRIBUTED) { bool wasDirected = currentGxact->directTransaction; int wasPromotedFromDirectDispatchContentId = wasDirected ? currentGxact->directTransactionContentId : -1; /* Can we still direct this transaction to a single content-id ? */ if ( currentGxact->directTransaction) { currentGxact->directTransaction = false; // turn off, but may be restored below if (nodeSaysDirectDispatch) { int contentId = linitial_int(stmt->planTree->directDispatch.contentIds); if ( contentId == currentGxact->directTransactionContentId) { // it was the same content! Stay in a single direct transaction currentGxact->directTransaction = true; } } } if ( currentGxact->directTransaction) { /** was not actually promoted */ wasPromotedFromDirectDispatchContentId = -1; } if ( wasPromotedFromDirectDispatchContentId != -1 ) needsPromotionFromDirectDispatch = true; elog(DTM_DEBUG5, "dtmPreCommand gid = %s is already distributed (%s, detail = '%s'), (was %s : now %s)", currentGxact->gid, debugCaller, debugDetail, wasDirected ? "directed" : "all gangs", currentGxact->directTransaction ? "directed" : "all gangs" ); } /** * If two-phase commit then begin transaction. */ if ( needsTwoPhaseCommit ) { if (currentGxact == NULL) { elog(ERROR, "DTM transaction is not active (%s, detail = '%s')", debugCaller, debugDetail); } else if (currentGxact->state == DTX_STATE_ACTIVE_NOT_DISTRIBUTED) { setCurrentGxactState( DTX_STATE_ACTIVE_DISTRIBUTED ); } else if (currentGxact->state == DTX_STATE_ACTIVE_DISTRIBUTED) { /* already distributed, no need to change */ } else { elog(ERROR, "DTM transaction is not active (state = %s, %s, detail = '%s')", DtxStateToString(currentGxact->state), debugCaller, debugDetail); } } /** * If promotion from direct-dispatch to whole-cluster dispatch was done then tell about it. * * FUTURE: note that this is only needed if the query we are going to run would not itself * do this (that is, if the query we are going to run is a read-only one) */ if ( currentGxact && currentGxact->state == DTX_STATE_ACTIVE_DISTRIBUTED && needsPromotionFromDirectDispatch) { CdbDispatchDirectDesc direct = default_dispatch_direct_desc; char *serializedDtxContextInfo; int serializedDtxContextInfoLen; bool badGangs, succeeded; serializedDtxContextInfo = qdSerializeDtxContextInfo(&serializedDtxContextInfoLen, wantSnapshot, inCursor, mppTxnOptions(true), "promoteTransactionIn_dtmPreCommand"); succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_STAY_AT_OR_BECOME_IMPLIED_WRITER, /* flags */ 0, currentGxact->gid, currentGxact->gxid, &badGangs, /* raiseError */ false, &direct, serializedDtxContextInfo, serializedDtxContextInfoLen); /* send a DTM command to others to tell them about the transaction */ if (!succeeded) { ereport(ERROR, (errmsg("Global transaction upgrade from single segment to entire cluster failed for gid = \"%s\" due to error", currentGxact->gid))); } } } /* * Routine to dispatch internal sub-transaction calls from UDFs to segments. * The calls are BeginInternalSubTransaction, ReleaseCurrentSubTransaction and * RollbackAndReleaseCurrentSubTransaction. */ bool doDispatchSubtransactionInternalCmd(DtxProtocolCommand cmdType) { CdbDispatchDirectDesc direct = default_dispatch_direct_desc; char *serializedDtxContextInfo = NULL; int serializedDtxContextInfoLen = 0; bool badGangs, succeeded = false; if (cmdType == DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL) { getTmLock(); if (currentGxact->state == DTX_STATE_ACTIVE_NOT_DISTRIBUTED) { setCurrentGxactState(DTX_STATE_ACTIVE_DISTRIBUTED); } releaseTmLock(); } serializedDtxContextInfo = qdSerializeDtxContextInfo( &serializedDtxContextInfoLen, false /* wantSnapshot */, false /* inCursor */, mppTxnOptions(true), "doDispatchSubtransactionInternalCmd"); succeeded = doDispatchDtxProtocolCommand( cmdType, /* flags */ 0, currentGxact->gid, currentGxact->gxid, &badGangs, /* raiseError */ true, &direct, serializedDtxContextInfo, serializedDtxContextInfoLen); /* send a DTM command to others to tell them about the transaction */ if (!succeeded) { ereport(ERROR, (errmsg( "dispatching subtransaction internal command failed for gid = \"%s\" due to error", currentGxact->gid))); } return succeeded; } /* * The executor can avoid starting a distributed transaction if it knows that * the current dtx is clean and we aren't in a user-started global transaction. */ bool isCurrentDtxTwoPhase(void) { if (currentGxact == NULL) { return false; } else { return currentGxact->state == DTX_STATE_ACTIVE_DISTRIBUTED; } } /* * When cleaning up FTS needs to know if there is a transaction active. */ bool isCurrentDtxActive(void) { return (currentGxact != NULL); } DtxState getCurrentDtxState(void) { if (currentGxact != NULL) return currentGxact->state; return DTX_STATE_NONE; } static void doPrepareTransaction(void) { bool succeeded; CdbDispatchDirectDesc direct=default_dispatch_direct_desc; CHECK_FOR_INTERRUPTS(); elog(DTM_DEBUG5, "doPrepareTransaction entering in state = %s", DtxStateToString(currentGxact->state)); /* Don't allow a cancel while we're dispatching our prepare (we wrap our * state change as well; for good measure. */ HOLD_INTERRUPTS(); copyDirectDispatchFromTransaction(&direct); getTmLock(); Assert(currentGxact->state == DTX_STATE_ACTIVE_DISTRIBUTED); setCurrentGxactState( DTX_STATE_PREPARING ); /* * We keep a current count of the number of DTX transactions between * 'PREPARING' and confirmed 'COMMIT' or 'ABORT'. * * To simplify bookkeeping, we turn on a flag in the gxact object. */ (*shmCurrentPhase1Count)++; currentGxact->bumpedPhase1Count = true; releaseTmLock(); elog(DTM_DEBUG5, "doPrepareTransaction moved to state = %s", DtxStateToString(currentGxact->state)); succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_PREPARE, /* flags */ 0, currentGxact->gid, currentGxact->gxid, ¤tGxact->badPrepareGangs, /* raiseError */ false, &direct, NULL, 0); /* Now we've cleaned up our dispatched statement, cancels are allowed again. */ RESUME_INTERRUPTS(); if (!succeeded) { elog(DTM_DEBUG5, "doPrepareTransaction error finds badPrimaryGangs = %s", (currentGxact->badPrepareGangs ? "true" : "false")); elog(ERROR, "The distributed transaction 'Prepare' broadcast failed to one or more segments for gid = %s.", currentGxact->gid); } elog(DTM_DEBUG5, "The distributed transaction 'Prepare' broadcast succeeded to the segments for gid = %s.", currentGxact->gid); getTmLock(); Assert(currentGxact->state == DTX_STATE_PREPARING); setCurrentGxactState( DTX_STATE_PREPARED ); releaseTmLock(); #ifdef FAULT_INJECTOR FaultInjector_InjectFaultIfSet( DtmBroadcastPrepare, DDLNotSpecified, "", // databaseName ""); // tableName #endif elog(DTM_DEBUG5, "doPrepareTransaction leaving in state = %s", DtxStateToString(currentGxact->state)); } /* * Insert FORGET COMMITTED into the xlog. * Call with both ProcArrayLock and DTM lock already held. */ static void doInsertForgetCommitted(void) { TMGXACT_LOG gxact_log; elog(DTM_DEBUG5, "doInsertForgetCommitted entering in state = %s", DtxStateToString(currentGxact->state)); setCurrentGxactState( DTX_STATE_INSERTING_FORGET_COMMITTED ); if (strlen(currentGxact->gid) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(currentGxact->gid)); memcpy(&gxact_log.gid, currentGxact->gid, TMGIDSIZE); gxact_log.gxid = currentGxact->gxid; RecordDistributedForgetCommitted(&gxact_log); setCurrentGxactState( DTX_STATE_INSERTED_FORGET_COMMITTED ); /* * These two actions must be performed for a distributed transaction under the same * locking of ProceArrayLock so the visibility of the transaction changes for local * master readers (e.g. those using SnapshotNow for reading) the same as for * distributed transactions. */ ClearTransactionFromPgProc_UnderLock(); releaseGxact_UnderLocks(); elog(DTM_DEBUG5, "doInsertForgetCommitted called releaseGxact"); } static void doNotifyingCommitPrepared(void) { bool succeeded; bool badGangs; CdbDispatchDirectDesc direct=default_dispatch_direct_desc; elog(DTM_DEBUG5, "doNotifyingCommitPrepared entering in state = %s", DtxStateToString(currentGxact->state)); getTmLock(); copyDirectDispatchFromTransaction(&direct); Assert(currentGxact->state == DTX_STATE_FORCED_COMMITTED); setCurrentGxactState( DTX_STATE_NOTIFYING_COMMIT_PREPARED ); releaseTmLock(); if (strlen(currentGxact->gid) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(currentGxact->gid)); #ifdef FAULT_INJECTOR FaultInjector_InjectFaultIfSet( DtmBroadcastCommitPrepared, DDLNotSpecified, "", // databaseName ""); // tableName #endif succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_COMMIT_PREPARED, /* flags */ 0, currentGxact->gid, currentGxact->gxid, &badGangs, /* raiseError */ false, &direct, NULL, 0); if (!succeeded) { elog(WARNING, "The distributed transaction 'Commit Prepared' broadcast failed to one or more segments for gid = %s.", currentGxact->gid); /* * We must succeed in delivering the commit to all segment instances, or any failed * segment instances must be marked INVALID. */ /* * Reset the dispatch logic (i.e. deallocate gang) so later we can attempt a retry * at the top in PostgresMain. */ elog(NOTICE, "Releasing segworker group to retry broadcast."); disconnectAndDestroyAllGangs(); /* * This call will at a minimum change the session id so we will * not have SharedSnapshotAdd colissions. */ CheckForResetSession(); getTmLock(); Assert(currentGxact->state == DTX_STATE_NOTIFYING_COMMIT_PREPARED); setCurrentGxactState( DTX_STATE_RETRY_COMMIT_PREPARED ); releaseTmLock(); elog(DTM_DEBUG5, "Marking retry needed for distributed transaction 'Commit Prepared' broadcast to the segments for gid = %s.", currentGxact->gid); return; } else { elog(DTM_DEBUG5, "The distributed transaction 'Commit Prepared' broadcast succeeded to all the segments for gid = %s.", currentGxact->gid); } /* * Global locking order: ProcArrayLock then DTM lock since * calls doInsertForgetCommitted calls releaseGxact. */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); getTmLock(); Assert(currentGxact->state == DTX_STATE_NOTIFYING_COMMIT_PREPARED); doInsertForgetCommitted(); releaseTmLock(); LWLockRelease(ProcArrayLock); } static void doNotifyingAbort(void) { bool succeeded; bool badGangs; CdbDispatchDirectDesc direct=default_dispatch_direct_desc; elog(DTM_DEBUG5, "doNotifyingAborted entering in state = %s", DtxStateToString(currentGxact->state)); getTmLock(); Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED || currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); releaseTmLock(); copyDirectDispatchFromTransaction(&direct); if (currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED) { if (gangsExist()) { succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED, /* flags */ 0, currentGxact->gid, currentGxact->gxid, &badGangs, /* raiseError */ false, &direct, NULL, 0); if (!succeeded) { elog(WARNING, "The distributed transaction 'Abort' broadcast failed to one or more segments for gid = %s.", currentGxact->gid); /* * Reset the dispatch logic and disconnect from any segment that didn't respond to our abort. */ elog(NOTICE, "Releasing segworker groups to finish aborting the transaction."); disconnectAndDestroyAllGangs(); /* * This call will at a minimum change the session id so we will * not have SharedSnapshotAdd colissions. */ CheckForResetSession(); } else { elog(DTM_DEBUG5, "The distributed transaction 'Abort' broadcast succeeded to all the segments for gid = %s.", currentGxact->gid); } } else { elog(DTM_DEBUG5, "The distributed transaction 'Abort' broadcast was omitted (segworker group already dead) gid = %s.", currentGxact->gid); } } else { DtxProtocolCommand dtxProtocolCommand; char *abortString; Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); if (currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED) { dtxProtocolCommand = DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED; abortString = "Abort [Prepared]"; } else { dtxProtocolCommand = DTX_PROTOCOL_COMMAND_ABORT_PREPARED; abortString = "Abort Prepared"; } succeeded = doDispatchDtxProtocolCommand(dtxProtocolCommand, /* flags */ 0, currentGxact->gid, currentGxact->gxid, &badGangs, /* raiseError */ false, &direct, NULL, 0); if (!succeeded) { elog(WARNING, "The distributed transaction '%s' broadcast failed to one or more segments for gid = %s.", abortString, currentGxact->gid); /* * We must succeed in delivering the abort to all segment instances, or any failed * segment instances must be marked INVALID. */ /* * Reset the dispatch logic (i.e. deallocate gang) so we can attempt a retry. */ elog(NOTICE, "Releasing segworker groups to retry broadcast."); disconnectAndDestroyAllGangs(); /* * This call will at a minimum change the session id so we will * not have SharedSnapshotAdd colissions. */ CheckForResetSession(); getTmLock(); setCurrentGxactState( DTX_STATE_RETRY_ABORT_PREPARED ); releaseTmLock(); elog(DTM_DEBUG5, "Marking retry needed for distributed transaction for '%s' broadcast to the segments for gid = %s.", abortString, currentGxact->gid); return; } elog(DTM_DEBUG5, "The distributed transaction '%s' broadcast succeeded to all the segments for gid = %s.", abortString, currentGxact->gid); } #ifdef FAULT_INJECTOR FaultInjector_InjectFaultIfSet( DtmBroadcastAbortPrepared, DDLNotSpecified, "", // databaseName ""); // tableName #endif /* * Global locking order: ProcArrayLock then DTM lock. */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); getTmLock(); Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED || currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); releaseGxact_UnderLocks(); elog(DTM_DEBUG5, "doNotifyingAbort called releaseGxact"); releaseTmLock(); LWLockRelease(ProcArrayLock); } /* * Attempt to retry DTM messaging. */ void doDtxPhase2Retry(void) { bool succeeded; bool badGangs; CdbDispatchDirectDesc direct=default_dispatch_direct_desc; if (currentGxact == NULL) { elog(DTM_DEBUG5, "doDtxPhase2Retry found no work to do (currentGxact == NULL)"); return; } copyDirectDispatchFromTransaction(&direct); switch (currentGxact->state) { case DTX_STATE_RETRY_COMMIT_PREPARED: case DTX_STATE_RETRY_ABORT_PREPARED: if (currentGxact->retryPhase2RecursionStop) { /* * Take down GP Array. */ elog(PANIC, "Unable to complete '%s Prepared' broadcast for gid = %s", (currentGxact->state == DTX_STATE_RETRY_COMMIT_PREPARED ? "Commit" : "Abort"), currentGxact->gid); } PG_TRY(); { bool commit = (currentGxact->state == DTX_STATE_RETRY_COMMIT_PREPARED); DtxProtocolCommand dtxProtocolCommand; char *prepareKind; if (commit) { dtxProtocolCommand = DTX_PROTOCOL_COMMAND_RETRY_COMMIT_PREPARED; prepareKind = "Commit"; } else { dtxProtocolCommand = DTX_PROTOCOL_COMMAND_RETRY_ABORT_PREPARED; prepareKind = "Abort"; } /* * KLUDGE: FtsHandleGangConnectionFailure will need a special * transaction context to tell it not to raise an ERROR... */ setDistributedTransactionContext(DTX_CONTEXT_QD_RETRY_PHASE_2); elog(DTM_DEBUG5, "doDtxPhase2Retry setting DistributedTransactionContext to '%s' for retry of '%s Prepared'.", DtxContextToString(DistributedTransactionContext), prepareKind); StartTransactionCommand(); currentGxact->retryPhase2RecursionStop = true; succeeded = doDispatchDtxProtocolCommand(dtxProtocolCommand, /* flags */ 0, currentGxact->gid, currentGxact->gxid, &badGangs, /* raiseError */ false, &direct, NULL, 0); if (!succeeded) { elog(FATAL, "A retry of the distributed transaction '%s Prepared' broadcast failed to one or more segments for gid = %s.", prepareKind, currentGxact->gid); } elog(NOTICE, "Retry of the distributed transaction '%s Prepared' broadcast succeeded to the segments for gid = %s.", prepareKind, currentGxact->gid); /* * Global locking order: ProcArrayLock then DTM lock since * calls doInsertForgetCommitted calls releaseGxact. */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); getTmLock(); if (commit) { doInsertForgetCommitted(); } else { releaseGxact_UnderLocks(); } Assert(currentGxact == NULL); releaseTmLock(); LWLockRelease(ProcArrayLock); CommitTransactionCommand(); Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY); } PG_CATCH(); { setDistributedTransactionContext(DTX_CONTEXT_LOCAL_ONLY); PG_RE_THROW(); } PG_END_TRY(); break; case DTX_STATE_ACTIVE_NOT_DISTRIBUTED: case DTX_STATE_ACTIVE_DISTRIBUTED: case DTX_STATE_PREPARING: case DTX_STATE_PREPARED: case DTX_STATE_FORCED_COMMITTED: case DTX_STATE_NOTIFYING_COMMIT_PREPARED: case DTX_STATE_NOTIFYING_ABORT_PREPARED: case DTX_STATE_INSERTING_COMMITTED: case DTX_STATE_INSERTED_COMMITTED: case DTX_STATE_INSERTING_FORGET_COMMITTED: case DTX_STATE_INSERTED_FORGET_COMMITTED: case DTX_STATE_CRASH_COMMITTED: elog(DTM_DEBUG5, "doDtxPhase2Retry dtx state \"%s\" not applicable here", DtxStateToString(currentGxact->state)); break; default: elog(PANIC, "Unrecognized dtx state: %d", (int) currentGxact->state); break; } } static bool doNotifyCommittedInDoubt(char *gid) { bool succeeded; bool badGangs; CdbDispatchDirectDesc direct=default_dispatch_direct_desc; /* UNDONE: Pass real gxid instead of InvalidDistributedTransactionId. */ succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_RECOVERY_COMMIT_PREPARED, /* flags */ 0, gid, InvalidDistributedTransactionId, &badGangs, /* raiseError */ false, &direct, NULL, 0); if (!succeeded) { elog(FATAL, "Crash recovery broadcast of the distributed transaction 'Commit Prepared' broadcast failed to one or more segments for gid = %s.", gid); } else { elog(LOG, "Crash recovery broadcast of the distributed transaction 'Commit Prepared' broadcast succeeded for gid = %s.", gid); } return succeeded; } static void doAbortInDoubt(char *gid) { bool succeeded; bool badGangs; CdbDispatchDirectDesc direct=default_dispatch_direct_desc; /* UNDONE: Pass real gxid instead of InvalidDistributedTransactionId. */ succeeded = doDispatchDtxProtocolCommand(DTX_PROTOCOL_COMMAND_RECOVERY_ABORT_PREPARED, /* flags */ 0, gid, InvalidDistributedTransactionId, &badGangs, /* raiseError */ false, &direct, NULL, 0); if (!succeeded) { elog(FATAL, "Crash recovery retry of the distributed transaction 'Abort Prepared' broadcast failed to one or more segments for gid = %s. System will retry again later", gid); } else { elog(LOG, "Crash recovery broadcast of the distributed transaction 'Abort Prepared' broadcast succeeded for gid = %s", gid); } } /* * prepare a global transaction, called by user commit * or by CommitTransaction */ void prepareDtxTransaction(void) { if (DistributedTransactionContext != DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE) { elog(DTM_DEBUG5, "prepareDtxTransaction nothing to do (DistributedTransactionContext = '%s')", DtxContextToString(DistributedTransactionContext)); return; } if (currentGxact == NULL) { return; } if (currentGxact->state == DTX_STATE_ACTIVE_NOT_DISTRIBUTED) { /* * This transaction did not go distributed. */ elog(DTM_DEBUG5, "prepareDtxTransaction ignoring not distributed gid = %s", currentGxact->gid); releaseGxact(); return; } elog(DTM_DEBUG5, "prepareDtxTransaction called with state = %s", DtxStateToString(currentGxact->state)); Assert(currentGxact->state == DTX_STATE_ACTIVE_DISTRIBUTED); /* * Broadcast PREPARE TRANSACTION to segments. */ doPrepareTransaction(); } /* * rollback a global transaction, called by user rollback * or by AbortTransaction during Postgres automatic rollback */ void rollbackDtxTransaction(void) { if (DistributedTransactionContext != DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE) { elog(DTM_DEBUG5, "rollbackDtxTransaction nothing to do (DistributedTransactionContext = '%s')", DtxContextToString(DistributedTransactionContext)); return; } if (currentGxact == NULL) { elog(DTM_DEBUG5, "rollbackDtxTransaction nothing to do (currentGxact == NULL)"); return; } elog(DTM_DEBUG5, "rollbackDtxTransaction called with state = %s, gid = %s", DtxStateToString(currentGxact->state), currentGxact->gid); switch (currentGxact->state) { case DTX_STATE_ACTIVE_NOT_DISTRIBUTED: /* * Let go of these... */ releaseGxact(); return; case DTX_STATE_ACTIVE_DISTRIBUTED: setCurrentGxactState( DTX_STATE_NOTIFYING_ABORT_NO_PREPARED ); break; case DTX_STATE_PREPARING: if (currentGxact->badPrepareGangs) { /* * By deallocating the gang, we will force a new gang to connect to all the * segment instances. And, we will abort the transactions in the * segments. What's left are possibily prepared transactions. */ elog(WARNING, "Releasing segworker groups since one or more segment connections failed. This will abort the transactions in the segments that did not get prepared."); disconnectAndDestroyAllGangs(); /* * This call will at a minimum change the session id so we will * not have SharedSnapshotAdd colissions. */ CheckForResetSession(); setCurrentGxactState( DTX_STATE_RETRY_ABORT_PREPARED ); return; } setCurrentGxactState( DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED ); break; case DTX_STATE_PREPARED: setCurrentGxactState( DTX_STATE_NOTIFYING_ABORT_PREPARED ); break; case DTX_STATE_NOTIFYING_ABORT_NO_PREPARED: /* * By deallocating the gang, we will force a new gang to connect to all the * segment instances. And, we will abort the transactions in the * segments. */ elog(NOTICE, "Releasing segworker groups to finish aborting the transaction."); disconnectAndDestroyAllGangs(); /* * This call will at a minimum change the session id so we will * not have SharedSnapshotAdd colissions. */ CheckForResetSession(); releaseGxact(); return; case DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED: case DTX_STATE_NOTIFYING_ABORT_PREPARED: elog(FATAL, "Unable to complete the 'Abort Prepared' broadcast for gid '%s'", currentGxact->gid); break; case DTX_STATE_FORCED_COMMITTED: case DTX_STATE_NOTIFYING_COMMIT_PREPARED: case DTX_STATE_INSERTING_COMMITTED: case DTX_STATE_INSERTED_COMMITTED: case DTX_STATE_INSERTING_FORGET_COMMITTED: case DTX_STATE_INSERTED_FORGET_COMMITTED: case DTX_STATE_RETRY_COMMIT_PREPARED: case DTX_STATE_RETRY_ABORT_PREPARED: case DTX_STATE_CRASH_COMMITTED: elog(DTM_DEBUG5, "rollbackDtxTransaction dtx state \"%s\" not expected here", DtxStateToString(currentGxact->state)); releaseGxact(); return; default: elog(PANIC, "Unrecognized dtx state: %d", (int) currentGxact->state); break; } Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED || currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED); /* * if the process is in the middle of blowing up... then we don't do * anything here. we can resolve any in-doubt transactions later. * * We can't dispatch -- but we *do* need to free up shared-memory * entries. */ if (proc_exit_inprogress) { /* * Unable to complete distributed abort broadcast with possible * prepared transactions... */ if (currentGxact->state == DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED || currentGxact->state == DTX_STATE_NOTIFYING_ABORT_PREPARED) { elog(FATAL, "Unable to complete the 'Abort Prepared' broadcast for gid '%s'", currentGxact->gid); } Assert(currentGxact->state == DTX_STATE_NOTIFYING_ABORT_NO_PREPARED); /* * By deallocating the gang, we will force a new gang to connect to all the * segment instances. And, we will abort the transactions in the * segments. */ disconnectAndDestroyAllGangs(); /* * This call will at a minimum change the session id so we will * not have SharedSnapshotAdd colissions. */ CheckForResetSession(); releaseGxact(); return; } doNotifyingAbort(); return; } /* * Error handling in initTM() is our caller. * * recoverTM() may throw errors. */ static void initTM_recover_as_needed(void) { Assert(shmTmRecoverred != NULL); /* Need to recover ? */ if (!*shmTmRecoverred) { getTmLock(); /* Still need to recover? */ if (!*shmTmRecoverred) { volatile int savedInterruptHoldoffCount = InterruptHoldoffCount; /* * We have to catch errors here, otherwise the silly TmLock will * stay in the backend process until this process goes away. */ PG_TRY(); { recoverTM(); *shmTmRecoverred = true; /* * The in-doubt transactions are recovered. Perform * PersistentTable-Catalog non-database specific verifications, * if requested and if needed */ if (debug_persistent_ptcat_verification && Persistent_PostDTMRecv_PTCatVerificationNeeded()) { Persistent_PrintHash(); Persistent_PostDTMRecv_NonDBSpecificPTCatVerification(); } } PG_CATCH(); { /* * We can't simply use HOLD_INTERRUPTS as in LWLockRelease, * because at this point we don't know if other LWLocks have * been acquired by myself. Also, we don't know if releaseTmLock * actually releases the lock, depending on ControlLockCount. * Instead, restore the previous value, which is reset to 0 * in errfinish. */ InterruptHoldoffCount = savedInterruptHoldoffCount; releaseTmLock(); /* Assuming we have a catcher above... */ PG_RE_THROW(); } PG_END_TRY(); } releaseTmLock(); } } static char * getSuperuser(Oid *userOid) { char *suser = NULL; Relation auth_rel; HeapTuple auth_tup; HeapScanDesc auth_scan; ScanKeyData key[2]; bool isNull; ScanKeyInit(&key[0], Anum_pg_authid_rolsuper, BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true)); ScanKeyInit(&key[1], Anum_pg_authid_rolcanlogin, BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true)); auth_rel = heap_open(AuthIdRelationId, AccessShareLock); auth_scan = heap_beginscan(auth_rel, SnapshotNow, 2, key); while (HeapTupleIsValid(auth_tup = heap_getnext(auth_scan, ForwardScanDirection))) { Datum attrName; Datum attrNameOid; (void) heap_getattr(auth_tup, Anum_pg_authid_rolvaliduntil, auth_rel->rd_att, &isNull); /* we actually want it to be NULL, that means always valid */ if (!isNull) continue; attrName = heap_getattr(auth_tup, Anum_pg_authid_rolname, auth_rel->rd_att, &isNull); Assert(!isNull); suser = pstrdup(DatumGetCString(attrName)); attrNameOid = heap_getattr(auth_tup, ObjectIdAttributeNumber, auth_rel->rd_att, &isNull); Assert(!isNull); *userOid = DatumGetObjectId(attrNameOid); break; } heap_endscan(auth_scan); heap_close(auth_rel, AccessShareLock); return suser; } static char * ChangeToSuperuser() { char *olduser = NULL; char *newuser; Oid userOid = InvalidOid; MemoryContext oldcontext; if (!IsAuthenticatedUserSuperUser()) { oldcontext = MemoryContextSwitchTo(TopMemoryContext); newuser = getSuperuser(&userOid); MemoryContextSwitchTo(oldcontext); olduser = MyProcPort->user_name; SetSessionUserId(userOid, true); MyProcPort->user_name = newuser; } return olduser; } static void RestoreToUser(char *olduser) { MemoryContext oldcontext; if (!IsAuthenticatedUserSuperUser()) { oldcontext = MemoryContextSwitchTo(TopMemoryContext); pfree(MyProcPort->user_name); MemoryContextSwitchTo(oldcontext); MyProcPort->user_name = olduser; SetSessionUserId(GetAuthenticatedUserId(), false); } } /* * Initialize TM, called by cdb_setup() for each QD process. * * First call to this function will trigger tm recovery. * * MPP-9894: in 4.0, if we've been started with enough segments to * run, but without having them in the right "roles" (see * gp_segment_configuration), we need to prober to convert them -- our * first attempt to dispatch will fail, we've got to catch that! The * retry should be fine, if not we're in serious "FATAL" trouble. */ void initTM(void) { char *olduser = NULL; bool exists = false; MemoryContext oldcontext; bool succeeded, first; Assert(shmTmRecoverred != NULL); /* Need to recover ? */ if (!*shmTmRecoverred) { /* * DTM initialization should be done in the context * of the superuser, and not the user who initiated this * backend (MPP-13866). * Following code changes the context to superuser and * and then restores it back. */ olduser = ChangeToSuperuser(); #ifdef FAULT_INJECTOR FaultInjector_InjectFaultIfSet( DtmInit, DDLNotSpecified, "", //databaseName, ""); //tableName; #endif oldcontext = CurrentMemoryContext; succeeded = false; first = true; while (true) { /* * MPP-9894: during startup, we don't have a top-level * PG_TRY/PG_CATCH block yet, the dispatcher may throw errors: * we need to catch them. */ PG_TRY(); { /* * detectFailedConnections could throw ERROR, so * we should catch it if it happens. */ if (!first) detectFailedConnections(); initTM_recover_as_needed(); succeeded = true; } PG_CATCH(); { ErrorData *edata; MemoryContextSwitchTo(oldcontext); edata = CopyErrorData(); elog(LOG, "DTM initialization, caught exception: " "looking for failed segments."); ereport(LOG, (edata->message ? errmsg("%s", edata->message) : 0, edata->detail ? errdetail("%s", edata->detail) : 0, edata->hint ? errhint("%s", edata->hint) : 0)); FreeErrorData(edata); FlushErrorState(); /* * Keep going outside of PG_TRY block even if we want to * retry; don't jumping out of this block without PG_END_TRY. */ } PG_END_TRY(); if (!succeeded) { if (first) { first = false; continue; } else { elog(LOG, "DTM initialization, failed on retry."); elog(FATAL, "DTM initialization: failure during startup " "recovery, retry failed, check segment status"); } } /* * We are done with the recovery. */ break; } RestoreToUser(olduser); freeGangsForPortal(NULL); } else { PG_TRY(); { /* * Do Database-specific PTCat verification ? * Yes, * if 1- GUC is turned ON and * if 2- DTM recovery INDEED happened and * if 3- Current Database has not been verified before * (If the Database is present in the Hash Table) */ Assert(MyDatabaseId != InvalidOid); if (debug_persistent_ptcat_verification && Persistent_PostDTMRecv_PTCatVerificationNeeded()) { getTmLock(); Persistent_PostDTMRecv_LookupHashEntry(MyDatabaseId, &exists); if (exists) { olduser = ChangeToSuperuser(); Persistent_PostDTMRecv_DBSpecificPTCatVerification(); /* * The current database is now verified. Remove its entry from the * Hash table to avoid performing these verifications next time * a session connects to the current database */ Persistent_PostDTMRecv_RemoveHashEntry(MyDatabaseId); RestoreToUser(olduser); freeGangsForPortal(NULL); } releaseTmLock(); } } PG_CATCH(); { elog(FATAL, " Failure during DTM Post Recovery PersistentTables-Catalog" " DB-specific Verification"); } PG_END_TRY(); } } /* get tm share memory size */ int tmShmemSize(void) { if ((Gp_role != GP_ROLE_DISPATCH) && (Gp_role != GP_ROLE_UTILITY)) return 0; return MAXALIGN(TMCONTROLBLOCK_BYTES(max_tm_gxacts) + max_tm_gxacts * sizeof(TMGXACT)); } /* * tmShmemInit - should be called only once from postmaster and inherit by all * postgres processes */ void tmShmemInit(void) { bool found; TmControlBlock *shared; /* * max_prepared_xacts is a guc which is postmaster-startup setable * -- it can only be updated by restarting the system. Global * transactions will all use two-phase commit, so the number of * global transactions is bound to the number of prepared. */ max_tm_gxacts = max_prepared_xacts; if ((Gp_role != GP_ROLE_DISPATCH) && (Gp_role != GP_ROLE_UTILITY)) return; shared = (TmControlBlock *) ShmemInitStruct("Transaction manager", tmShmemSize(), &found); if (!shared) elog(FATAL, "could not initialize transaction manager share memory"); shmControlLock = shared->ControlLock; shmTmRecoverred = &shared->recoverred; shmDistribTimeStamp = &shared->distribTimeStamp; shmGIDSeq = &shared->seqno; /* Only initialize this if we are the creator of the shared memory */ if (!found) { time_t t = time(NULL); if (t == (time_t) -1) { elog(PANIC, "cannot generate global transaction id"); } *shmDistribTimeStamp = (DistributedTransactionTimeStamp)t; elog(DEBUG1, "DTM start timestamp %u", *shmDistribTimeStamp); *shmGIDSeq = FirstDistributedTransactionId; } shmDtmStarted = &shared->DtmStarted; shmDtmRecoveryDeferred = &shared->DtmDeferRecovery; shmSegmentCount = &shared->SegmentCount; shmSegmentStatesByteLen = &shared->SegmentsStatesByteLen; shmXminAllDistributedSnapshots = &shared->xminAllDistributedSnapshots; shmNextSnapshotId = &shared->NextSnapshotId; shmNumGxacts = &shared->num_active_xacts; shmCurrentPhase1Count = &shared->currentPhase1Count; shmGxactArray = shared->gxact_array; if (!IsUnderPostmaster) /* Initialize locks and shared memory area */ { int i = 0; TMGXACT *gxact = NULL; shared->ControlLock = LWLockAssign(); shmControlLock = shared->ControlLock; /* initialize gxact array */ gxact = (TMGXACT *) (shmGxactArray + max_tm_gxacts); for (i = 0; i < max_tm_gxacts; i++) { gxact->debugIndex = i; shmGxactArray[i] = gxact++; } } } /* * restore global transaction during tm log recovery */ void restoreGxact(TMGXACT_LOG * gxact_log, DtxState state) { Assert (gxact_log != NULL); initGxact(currentGxact); /* * Copy the log fields. * * The length check here requires the identifer have a trailing NUL character. */ if (strlen(gxact_log->gid) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(gxact_log->gid)); memcpy(currentGxact->gid, gxact_log->gid, TMGIDSIZE); currentGxact->gxid = gxact_log->gxid; setCurrentGxactState( state ); } /* mppTxnOptions: * Generates an int containing the appropriate flags to direct the remote * segdb QE process to perform any needed transaction commands before or * after the statement. */ int mppTxnOptions(bool needTwoPhase) { int options = 0; elog(DTM_DEBUG5, "mppTxnOptions DefaultXactIsoLevel = %s, DefaultXactReadOnly = %s, XactIsoLevel = %s, XactReadOnly = %s.", IsoLevelAsUpperString(DefaultXactIsoLevel), (DefaultXactReadOnly ? "true" : "false"), IsoLevelAsUpperString(XactIsoLevel), (XactReadOnly ? "true" : "false")); if (needTwoPhase) options |= GP_OPT_NEED_TWO_PHASE; if (XactIsoLevel == XACT_READ_COMMITTED) options |= GP_OPT_READ_COMMITTED; else if (XactIsoLevel == XACT_REPEATABLE_READ) options |= GP_OPT_REPEATABLE_READ; else if (XactIsoLevel == XACT_SERIALIZABLE) options |= GP_OPT_SERIALIZABLE; if (XactReadOnly) options |= GP_OPT_READ_ONLY; if (currentGxact != NULL && currentGxact->explicitBeginRemembered) options |= GP_OPT_EXPLICT_BEGIN; elog(DTM_DEBUG5, "mppTxnOptions txnOptions = 0x%x, needTwoPhase = %s, explicitBegin = %s, isoLevel = %s, readOnly = %s.", options, (isMppTxOptions_NeedTwoPhase(options) ? "true" : "false"), (isMppTxOptions_ExplicitBegin(options) ? "true" : "false"), IsoLevelAsUpperString(mppTxOptions_IsoLevel(options)), (isMppTxOptions_ReadOnly(options) ? "true" : "false")); return options; } int mppTxOptions_IsoLevel(int txnOptions) { if (txnOptions & GP_OPT_READ_COMMITTED) return XACT_READ_COMMITTED; else if (txnOptions & GP_OPT_SERIALIZABLE) return XACT_SERIALIZABLE; else if ( txnOptions & GP_OPT_REPEATABLE_READ) return XACT_REPEATABLE_READ; else return XACT_READ_UNCOMMITTED; } bool isMppTxOptions_ReadOnly(int txnOptions) { return ((txnOptions & GP_OPT_READ_ONLY) != 0); } /* unpackMppTxnOptions: * Unpack an int containing the appropriate flags to direct the remote * segdb QE process to perform any needed transaction commands before or * after the statement. */ void unpackMppTxnOptions(int txnOptions, int *isoLevel, bool *readOnly) { *isoLevel = mppTxOptions_IsoLevel(txnOptions); *readOnly = isMppTxOptions_ReadOnly(txnOptions); } /* isMppTxOptions_StatementWantsDtxTransaction: * Return the NeedTwoPhase flag. */ bool isMppTxOptions_NeedTwoPhase(int txnOptions) { return ((txnOptions & GP_OPT_NEED_TWO_PHASE) != 0); } /* isMppTxOptions_ExplicitBegin: * Return the ExplicitBegin flag. */ bool isMppTxOptions_ExplicitBegin(int txnOptions) { return ((txnOptions & GP_OPT_EXPLICT_BEGIN) != 0); } /* acquire tm lw lock */ void getTmLock(void) { if (ControlLockCount++ == 0) LWLockAcquire(shmControlLock, LW_EXCLUSIVE); } /* release tm lw lock */ void releaseTmLock(void) { if (--ControlLockCount == 0) LWLockRelease(shmControlLock); } bool isTMInRecovery(void) { return duringRecovery; } /* * Redo transaction commit log record. */ void redoDtxCheckPoint(TMGXACT_CHECKPOINT *gxact_checkpoint) { int committedCount; int segmentCount; int i; /* * For checkpoint same as REDO, lets add entries to file in utility and in-memory if Dispatch. */ committedCount = gxact_checkpoint->committedCount; segmentCount = gxact_checkpoint->segmentCount; elog(DTM_DEBUG5, "redoDtxCheckPoint has committedCount = %d, " "segmentCount = %d", committedCount, segmentCount); if (Debug_persistent_recovery_print) { elog(PersistentRecovery_DebugPrintLevel(), "redoDtxCheckPoint: committedCount = %d", committedCount); } for (i = 0; i < committedCount; i++) { redoDistributedCommitRecord(&gxact_checkpoint->committedGxactArray[i]); } } static void GetRedoFileName(char *path) { snprintf(path, MAXPGPATH, "%s/" UTILITYMODEDTMREDO_DIR "/" UTILITYMODEDTMREDO_FILE, DataDir); elog(DTM_DEBUG3, "Returning save DTM redo file path = %s",path); } void UtilityModeFindOrCreateDtmRedoFile(void) { char path[MAXPGPATH]; if (Gp_role != GP_ROLE_UTILITY) { elog(DTM_DEBUG3, "Not in Utility Mode (role = %s) -- skipping finding or creating DTM redo file", role_to_string(Gp_role)); return; } GetRedoFileName(path); redoFileFD = open(path, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); if (redoFileFD < 0) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not create save DTM redo file \"%s\"", path))); } redoFileOffset = lseek(redoFileFD, 0, SEEK_END); elog(DTM_DEBUG3, "Succesfully opened DTM redo file %s (end offset %d)", path, redoFileOffset); } /* * */ static void UtilityModeSaveRedo(bool committed, TMGXACT_LOG * gxact_log) { TMGXACT_UTILITY_MODE_REDO utilityModeRedo; int write_len; utilityModeRedo.committed = committed; memcpy(&utilityModeRedo.gxact_log, gxact_log, sizeof(TMGXACT_LOG)); elog(DTM_DEBUG5, "Writing {committed = %s, gid = %s, gxid = %u} to DTM redo file", (utilityModeRedo.committed ? "true" : "false"), utilityModeRedo.gxact_log.gid, utilityModeRedo.gxact_log.gxid); write_len = write(redoFileFD, &utilityModeRedo, sizeof(TMGXACT_UTILITY_MODE_REDO)); if (write_len != sizeof(TMGXACT_UTILITY_MODE_REDO)) { ereport(ERROR, (errcode_for_file_access(), errmsg("could not write save DTM redo file : %m"))); } } void UtilityModeCloseDtmRedoFile(void) { if (Gp_role != GP_ROLE_UTILITY) { elog(DTM_DEBUG3, "Not in Utility Mode (role = %s)-- skipping closing DTM redo file", role_to_string(Gp_role)); return; } elog(DTM_DEBUG3, "Closing DTM redo file"); close(redoFileFD); } static void ReplayRedoFromUtilityMode(void) { TMGXACT_UTILITY_MODE_REDO utilityModeRedo; int fd; int read_len; int errno; char path[MAXPGPATH]; int entries; entries = 0; GetRedoFileName(path); fd = open(path, O_RDONLY, 0); if (fd < 0) { /* UNDONE: Distinquish "not found" from other errors. */ elog(DTM_DEBUG3, "Could not open DTM redo file %s for reading", path); return; } elog(DTM_DEBUG3, "Succesfully opened DTM redo file %s for reading", path); while(true) { errno = 0; read_len = read(fd, &utilityModeRedo, sizeof(TMGXACT_UTILITY_MODE_REDO)); if (read_len == 0) break; else if (read_len != sizeof(TMGXACT_UTILITY_MODE_REDO) && errno == 0) elog(ERROR, "Bad redo length (expected %d and found %d)", (int)sizeof(TMGXACT_UTILITY_MODE_REDO), read_len); else if (errno != 0) { close(fd); ereport(ERROR, (errcode_for_file_access(), errmsg("error reading DTM redo file: %m"))); } elog(DTM_DEBUG5, "Read {committed = %s, gid = %s, gxid = %u} from DTM redo file", (utilityModeRedo.committed ? "true" : "false"), utilityModeRedo.gxact_log.gid, utilityModeRedo.gxact_log.gxid); if (utilityModeRedo.committed) { redoDistributedCommitRecord(&utilityModeRedo.gxact_log); } else { redoDistributedForgetCommitRecord(&utilityModeRedo.gxact_log); } entries++; } elog(DTM_DEBUG5, "Processed %d entries from DTM redo file", entries); close(fd); } static void RemoveRedoUtilityModeFile(void) { char path[MAXPGPATH]; bool removed; GetRedoFileName(path); removed = (unlink(path) == 0); elog(DTM_DEBUG5, "Removed DTM redo file %s (%s)", path, (removed ? "true" : "false")); } /* * Redo transaction commit log record. */ void redoDistributedCommitRecord(TMGXACT_LOG * gxact_log) { int i; /* * The length check here requires the identifer have a trailing NUL character. */ if (strlen(gxact_log->gid) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(gxact_log->gid)); if (Gp_role == GP_ROLE_UTILITY) { elog(DTM_DEBUG3, "DB in Utility mode. Save DTM distributed commit until later."); UtilityModeSaveRedo(true, gxact_log); return; } for (i = 0; i < *shmNumGxacts; i++) { if (strcmp(gxact_log->gid, shmGxactArray[i]->gid) == 0) { /* found an active global transaction */ currentGxact = shmGxactArray[i]; break; } } if (i == *shmNumGxacts) { /* * Transaction not found, this is the first log of this transaction. */ if (*shmNumGxacts >= max_tm_gxacts) { ereport(FATAL, (errmsg("the limit of %d distributed transactions has been reached.", max_tm_gxacts), errdetail("The global user configuration (GUC) server parameter max_prepared_transactions controls this limit."))); } currentGxact = shmGxactArray[(*shmNumGxacts)++]; } restoreGxact(gxact_log, DTX_STATE_CRASH_COMMITTED); elog((Debug_print_full_dtm ? LOG : DEBUG5), "Crash recovery redo added committed distributed transaction gid = %s",currentGxact->gid); elog((Debug_print_full_dtm ? INFO : DEBUG5), "Crash recovery redo added committed distributed transaction gid = %s",currentGxact->gid); /* * Don't leave the currentGxact point in-use. */ currentGxact = NULL; } /* * Redo transaction forget commit log record. */ void redoDistributedForgetCommitRecord(TMGXACT_LOG * gxact_log) { int i; if (Gp_role == GP_ROLE_UTILITY) { elog(DTM_DEBUG3, "DB in Utility mode. Save DTM disributed forget until later."); UtilityModeSaveRedo(false, gxact_log); return; } for (i = 0; i < *shmNumGxacts; i++) { if (strcmp(gxact_log->gid, shmGxactArray[i]->gid) == 0) { /* found an active global transaction */ currentGxact = shmGxactArray[i]; elog((Debug_print_full_dtm ? INFO : DEBUG5), "Crash recovery redo removed committed distributed transaction gid = %s for forget", currentGxact->gid); releaseGxact(); return; } } elog((Debug_print_full_dtm ? WARNING : DEBUG5), "Crash recovery redo did not find committed distributed transaction gid = %s for forget", gxact_log->gid); } static void descGxactLog(StringInfo buf, TMGXACT_LOG * gxact_log) { appendStringInfo(buf, " gid = %s, gxid = %u", gxact_log->gid, gxact_log->gxid); } /* * Describe redo transaction commit log record. */ void descDistributedCommitRecord(StringInfo buf, TMGXACT_LOG * gxact_log) { descGxactLog(buf, gxact_log); } /* * Describe redo transaction forget commit log record. */ void descDistributedForgetCommitRecord(StringInfo buf, TMGXACT_LOG * gxact_log) { descGxactLog(buf, gxact_log); } /*========================================================================= * HELPER FUNCTIONS */ static bool doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags, char* gid, DistributedTransactionId gxid, bool *badGangs, bool raiseError, CdbDispatchDirectDesc *direct, char *serializedArgument, int serializedArgumentLen) { int i, resultCount, numOfFailed = 0; char *dtxProtocolCommandStr = 0; struct pg_result **results = NULL; StringInfoData errbuf; dtxProtocolCommandStr = DtxProtocolCommandToString(dtxProtocolCommand); if ( Test_print_direct_dispatch_info ) { if ( direct->directed_dispatch) elog(INFO, "Distributed transaction command '%s' to SINGLE content", dtxProtocolCommandStr); else elog(INFO, "Distributed transaction command '%s' to ALL contents", dtxProtocolCommandStr); } elog(DTM_DEBUG5, "dispatchDtxProtocolCommand: %d ('%s'), direct content #: %d", dtxProtocolCommand, dtxProtocolCommandStr, direct->directed_dispatch ? direct->content[0] : -1); initStringInfo(&errbuf); results = cdbdisp_dispatchDtxProtocolCommand(dtxProtocolCommand, flags, dtxProtocolCommandStr, gid, gxid, &errbuf, &resultCount, badGangs, direct, serializedArgument, serializedArgumentLen); if (errbuf.len > 0) { ereport((raiseError ? ERROR : LOG), (errmsg("DTM error (gathered %d results from cmd '%s')", resultCount, dtxProtocolCommandStr), errdetail("%s", errbuf.data))); return false; } Assert(results != NULL); if (results == NULL) { numOfFailed++; /* If we got no results, we need to treat it as an error! */ } for (i = 0; i < resultCount; i++) { char *cmdStatus; ExecStatusType resultStatus; /* note: PQresultStatus() is smart enough to deal with results[i] == NULL */ resultStatus = PQresultStatus(results[i]); if (resultStatus != PGRES_COMMAND_OK && resultStatus != PGRES_TUPLES_OK) { numOfFailed++; } else { /* * success ? If an error happened during a transaction which * hasn't already been caught when we try a prepare we'll get a * rollback from our prepare ON ONE SEGMENT: so we go look at the * status, otherwise we could issue a COMMIT when we don't want * to! */ cmdStatus = PQcmdStatus(results[i]); elog(DEBUG3, "DTM: status message cmd '%s' [%d] result '%s'", dtxProtocolCommandStr, i, cmdStatus); if (strncmp(cmdStatus, dtxProtocolCommandStr, strlen(cmdStatus)) != 0) { /* failed */ numOfFailed++; } } } /* discard the errbuf text */ pfree(errbuf.data); /* Now we clean up the results array. */ cleanRMResultSets(results); return (numOfFailed == 0); } bool dispatchDtxCommand(const char *cmd, bool withSnapshot, bool raiseError) { int i, resultCount, numOfFailed = 0; struct pg_result **results = NULL; StringInfoData errbuf; elog(DTM_DEBUG5, "dispatchDtxCommand: '%s'", cmd); initStringInfo(&errbuf); results = cdbdisp_dispatchRMCommand(cmd, withSnapshot, &errbuf, &resultCount); if (errbuf.len > 0) { ereport((raiseError ? ERROR : WARNING), (errmsg("DTM error (gathered %d results from cmd '%s')", resultCount, cmd), errdetail("%s", errbuf.data))); return false; } Assert(results != NULL); if (results == NULL) { numOfFailed++; /* If we got no results, we need to treat it as an error! */ } for (i = 0; i < resultCount; i++) { char *cmdStatus; ExecStatusType resultStatus; /* note: PQresultStatus() is smart enough to deal with results[i] == NULL */ resultStatus = PQresultStatus(results[i]); if (resultStatus != PGRES_COMMAND_OK && resultStatus != PGRES_TUPLES_OK) { numOfFailed++; } else { /* * success ? If an error happened during a transaction which * hasn't already been caught when we try a prepare we'll get a * rollback from our prepare ON ONE SEGMENT: so we go look at the * status, otherwise we could issue a COMMIT when we don't want * to! */ cmdStatus = PQcmdStatus(results[i]); elog(DEBUG3, "DTM: status message cmd '%s' [%d] result '%s'", cmd, i, cmdStatus); if (strncmp(cmdStatus, cmd, strlen(cmdStatus)) != 0) { /* failed */ numOfFailed++; } } } /* discard the errbuf text */ pfree(errbuf.data); /* Now we clean up the results array. */ cleanRMResultSets(results); return (numOfFailed == 0); } /* initialize a global transaction context */ static void initGxact(TMGXACT * gxact) { MemSet(gxact->gid, 0, TMGIDSIZE); gxact->gxid = InvalidDistributedTransactionId; setGxactState(gxact, DTX_STATE_NONE); /* * Memory only fields. */ gxact->sessionId = gp_session_id; gxact->localXid = InvalidTransactionId; gxact->explicitBeginRemembered = false; LocalDistribXactRef_Init(&gxact->localDistribXactRef); gxact->xminDistributedSnapshot = 0; gxact->bumpedPhase1Count = false; gxact->badPrepareGangs = false; gxact->retryPhase2RecursionStop = false; gxact->directTransaction = false; gxact->directTransactionContentId = 0; } void getAllDistributedXactStatus(TMGALLXACTSTATUS **allDistributedXactStatus) { TMGALLXACTSTATUS *all; int count; all = palloc(sizeof(TMGALLXACTSTATUS)); all->next = 0; all->count = 0; all->statusArray = NULL; if (shmDtmStarted != NULL && *shmDtmStarted) { getTmLock(); count = *shmNumGxacts; if (count > 0) { int i; all->statusArray = palloc(MAXALIGN(count * sizeof(TMGXACTSTATUS))); for (i = 0; i < count; i++) { TMGXACT *gxact = shmGxactArray[i]; all->statusArray[i].gxid = gxact->gxid; if (strlen(gxact->gid) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(gxact->gid)); memcpy(all->statusArray[i].gid, gxact->gid, TMGIDSIZE); all->statusArray[i].state = gxact->state; all->statusArray[i].sessionId = gxact->sessionId; all->statusArray[i].xminDistributedSnapshot = gxact->xminDistributedSnapshot; } all->count = count; } releaseTmLock(); } *allDistributedXactStatus = all; } bool getNextDistributedXactStatus(TMGALLXACTSTATUS *allDistributedXactStatus, TMGXACTSTATUS **distributedXactStatus) { if (allDistributedXactStatus->next >= allDistributedXactStatus->count) { return false; } *distributedXactStatus = &allDistributedXactStatus->statusArray[allDistributedXactStatus->next]; allDistributedXactStatus->next++; return true; } /* * DistributedSnapshotMappedEntry_Compare: * A compare function for array of DistributedSnapshotMapEntry * for use with qsort. */ static int DistributedSnapshotMappedEntry_Compare(const void *p1, const void *p2) { const DistributedSnapshotMapEntry *entry1 = (DistributedSnapshotMapEntry *) p1; const DistributedSnapshotMapEntry *entry2 = (DistributedSnapshotMapEntry *) p2; if (entry1->distribXid == entry2->distribXid) return 0; else if (entry1->distribXid > entry2->distribXid) return 1; else return -1; } bool createDtxSnapshot( DistributedSnapshotWithLocalMapping *distribSnapshotWithLocalMapping) { int globalCount; int i; TMGXACT *gxact_candidate; DtxState state; int count; DistributedTransactionId xmin; DistributedTransactionId xmax; DistributedTransactionId inProgressXid; DistributedSnapshotId distribSnapshotId; DistributedTransactionId xminAllDistributedSnapshots; DistributedSnapshotMapEntry *inProgressEntryArray; if (currentGxact == NULL) { elog(DTM_DEBUG5, "createDtxSnapshot found currentGxact is NULL"); return false; } xmin = LastDistributedTransactionId; /* * This is analogous to the code in GetSnapshotData() (which calls * ReadNewTransactionId(), the distributed-xmax of a transaction * is the last distributed-xmax available */ xmax = LocalDistribXact_GetMaxDistributedXid(); count = 0; inProgressEntryArray = distribSnapshotWithLocalMapping->inProgressEntryArray; getTmLock(); /* * Gather up current in-progress global transactions for the * distributed snapshot. */ globalCount = *shmNumGxacts; for (i = 0; i < globalCount; i++) { gxact_candidate = shmGxactArray[i]; state = gxact_candidate->state; switch (state) { case DTX_STATE_ACTIVE_NOT_DISTRIBUTED: case DTX_STATE_ACTIVE_DISTRIBUTED: case DTX_STATE_PREPARING: case DTX_STATE_PREPARED: case DTX_STATE_INSERTED_COMMITTED: case DTX_STATE_FORCED_COMMITTED: case DTX_STATE_NOTIFYING_COMMIT_PREPARED: case DTX_STATE_NOTIFYING_ABORT_NO_PREPARED: case DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED: case DTX_STATE_NOTIFYING_ABORT_PREPARED: case DTX_STATE_RETRY_COMMIT_PREPARED: case DTX_STATE_RETRY_ABORT_PREPARED: /* * Active or commit/abort not complete. Keep this transaction * considered for distributed snapshots. */ break; case DTX_STATE_INSERTED_FORGET_COMMITTED: elog(FATAL, "Should not see this transitional state with TM lock held -- dtx state \"%s\" not expected here", DtxStateToString(state)); break; case DTX_STATE_INSERTING_COMMITTED: elog(FATAL, "Cannot also be inserting COMMITTED into log buffer from another process with TM lock held"); break; case DTX_STATE_INSERTING_FORGET_COMMITTED: elog(FATAL, "Cannot also be inserting FORGET COMMITTED into log buffer from another process with TM lock held"); break; case DTX_STATE_CRASH_COMMITTED: /* * From a previous system incarnation. */ continue; default: elog(PANIC, "Unexpected dtm state: %d", (int) state); break; } /* * Include the current distributed transaction in the min/max * calculation. */ inProgressXid = gxact_candidate->gxid; if (inProgressXid < xmin) { xmin = inProgressXid; } if (inProgressXid > xmax) { xmax = inProgressXid; } if (gxact_candidate == currentGxact) continue; if (count >= distribSnapshotWithLocalMapping->header.maxCount) elog(ERROR, "Too many distributed transactions for snapshot"); inProgressEntryArray[count].distribXid = inProgressXid; inProgressEntryArray[count].localXid = gxact_candidate->localXid; count++; elog(DTM_DEBUG5, "createDtxSnapshot added inProgressXid = %u (and local xid = %u) to distributed snapshot", inProgressEntryArray[count].distribXid, inProgressEntryArray[count].localXid); } distribSnapshotId = (*shmNextSnapshotId)++; xminAllDistributedSnapshots = *shmXminAllDistributedSnapshots; releaseTmLock(); /* * Sort the entry {distribXid, localXid} * to support the QEs doing culls on their DisribToLocalXact sorted lists. */ qsort( inProgressEntryArray, count, sizeof(DistributedSnapshotMapEntry), DistributedSnapshotMappedEntry_Compare); /* * Copy the information we just captured under lock and then sorted into * the distributed snapshot. */ distribSnapshotWithLocalMapping->header.distribTransactionTimeStamp = *shmDistribTimeStamp; distribSnapshotWithLocalMapping->header.xminAllDistributedSnapshots = xminAllDistributedSnapshots; distribSnapshotWithLocalMapping->header.distribSnapshotId = distribSnapshotId; distribSnapshotWithLocalMapping->header.xmin = xmin; distribSnapshotWithLocalMapping->header.xmax = xmax; distribSnapshotWithLocalMapping->header.count = count; if (xmin < currentGxact->xminDistributedSnapshot) currentGxact->xminDistributedSnapshot = xmin; elog(DTM_DEBUG5, "createDtxSnapshot distributed snapshot has xmin = %u, count = %u, xmax = %u.", xmin, count, xmax); elog((Debug_print_snapshot_dtm ? LOG : DEBUG5), "[Distributed Snapshot #%u] *Create* (gxid = %u, '%s')", distribSnapshotId, currentGxact->gxid, DtxContextToString(DistributedTransactionContext)); return true; } /* * Introduced by MPP-9829. * * Fail safe in regards to the shmControlLock and ProcArrayLock. * * Generally speaking, we defer to LWLockReleaseAll to clean up our held_lwlocks. In * the case of an ERROR, this results from AbortTransaction. And in the case of a FATAL, * this results from a ProcKill towards the end of proc_exit. * * Unfortunately, two subtleties are dealt with here. In the case of an ERROR where this * current process continues to exist, ControlLockCount must be reset to coincide with * the cleanup of shmControlLock. Also, during a FATAL, we'd like to release the ProcArrayLock; * otherwise, we'd PANIC when trying to reacquire the ProcArrayLock for the purposes * of the ProcArrayRemove nearing the end of proc_exit. */ static void createDtxErrorCallback(int code, Datum arg) { HOLD_INTERRUPTS(); ControlLockCount = 1; releaseTmLock(); HOLD_INTERRUPTS(); LWLockRelease(ProcArrayLock); } /* * Create a global transaction context from share memory. */ void createDtx(DistributedTransactionId *distribXid, TransactionId *localXid) { TMGXACT *gxact; MIRRORED_LOCK_DECLARE; /* * Global locking order: ProcArrayLock then DTM lock. */ MIRRORED_LOCK; LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); getTmLock(); if (*shmNumGxacts >= max_tm_gxacts) { dumpAllDtx(); releaseTmLock(); LWLockRelease(ProcArrayLock); ereport(FATAL, (errmsg("the limit of %d distributed transactions has been reached.", max_tm_gxacts), errdetail("The global user configuration (GUC) server parameter max_prepared_transactions controls this limit."))); } gxact = shmGxactArray[(*shmNumGxacts)++]; initGxact(gxact); generateGID(gxact->gid, &gxact->gxid); /* * MPP-9829: don't hold these if we're throwing an error. if * we *do* hold them, we'll just cause a PANIC later. */ PG_ENSURE_ERROR_CLEANUP(createDtxErrorCallback, 0); { /* * This routine requires the ProcArrayLock to already be held. */ LocalDistribXact_StartOnMaster( *shmDistribTimeStamp, gxact->gxid, &gxact->localXid, &gxact->localDistribXactRef); *distribXid = gxact->gxid; *localXid = gxact->localXid; } PG_END_ENSURE_ERROR_CLEANUP(createDtxErrorCallback, 0); /* * Until we get our first distributed snapshot, we use our * distributed transaction identifier for the minimum. */ gxact->xminDistributedSnapshot = gxact->gxid; setGxactState(gxact, DTX_STATE_ACTIVE_NOT_DISTRIBUTED); releaseTmLock(); LWLockRelease(ProcArrayLock); MIRRORED_UNLOCK; currentGxact = gxact; elog(DTM_DEBUG5, "createDtx created new distributed transaction gid = %s, gxid = %u.", currentGxact->gid, currentGxact->gxid); } /* * Release global transaction's shared memory. * Must already hold ProcArrayLock and the DTM lock. */ static void releaseGxact_UnderLocks(void) { int i; int curr; if (currentGxact == NULL) { elog(FATAL, "releaseGxact expected currentGxact to not be NULL"); } elog(DTM_DEBUG5, "releaseGxact called for gid = %s (index = %d)", currentGxact->gid, currentGxact->debugIndex); /* * Protected by ProcArrayLock. */ LocalDistribXactRef_ReleaseUnderLock( ¤tGxact->localDistribXactRef); /* find slot of current transaction */ /* * To maintain shmXminAllDistributedSnapshots we need to re-calculate, * so we must scan the whole array for that purpose. When we see * the current one, we note its index and exclude it from the xmin * calculation. */ curr = *shmNumGxacts; /* A bad value we can safely test. */ *shmXminAllDistributedSnapshots = InvalidDistributedTransactionId; for (i = 0; i < *shmNumGxacts; i++) { if (shmGxactArray[i] == currentGxact) { curr = i; } else { DistributedTransactionId xminDistributedSnapshot = shmGxactArray[i]->xminDistributedSnapshot; if (xminDistributedSnapshot != InvalidDistributedTransactionId) { /* * Check for minimum. */ if (*shmXminAllDistributedSnapshots == InvalidDistributedTransactionId) { /* * Always set first one other than current being released. */ *shmXminAllDistributedSnapshots = xminDistributedSnapshot; } else if (xminDistributedSnapshot < *shmXminAllDistributedSnapshots) { *shmXminAllDistributedSnapshots = xminDistributedSnapshot; } } } } elog(DTM_DEBUG5, "releaseGxact re-calculated all distributed snapshots xmin = %u for %d global transactions.", *shmXminAllDistributedSnapshots, *shmNumGxacts-1); /* move this to the next available slot */ (*shmNumGxacts)--; if (curr != *shmNumGxacts) { shmGxactArray[curr] = shmGxactArray[*shmNumGxacts]; shmGxactArray[*shmNumGxacts] = currentGxact; } if (currentGxact->bumpedPhase1Count) { (*shmCurrentPhase1Count)--; } currentGxact = NULL; } /* * Release global transaction's shared memory. */ static void releaseGxact(void) { /* * Global locking order: ProcArrayLock then DTM lock. */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); getTmLock(); releaseGxact_UnderLocks(); releaseTmLock(); LWLockRelease(ProcArrayLock); } /* * Get lock that serializes commits with DTM checkpoint info. * Change state to DTX_STATE_INSERTING_COMMITTED. */ void insertingDistributedCommitted(void) { elog(DTM_DEBUG5, "insertingDistributedCommitted entering in state = %s", DtxStateToString(currentGxact->state)); getTmLock(); Assert(currentGxact->state == DTX_STATE_PREPARED); setCurrentGxactState( DTX_STATE_INSERTING_COMMITTED ); } /* * Change state to DTX_STATE_INSERTED_COMMITTED. * Release lock. */ void insertedDistributedCommitted(void) { elog(DTM_DEBUG5, "insertedDistributedCommitted entering in state = %s for gid = %s", DtxStateToString(currentGxact->state), currentGxact->gid); Assert(currentGxact->state == DTX_STATE_INSERTING_COMMITTED); setCurrentGxactState( DTX_STATE_INSERTED_COMMITTED ); releaseTmLock(); } /* * Change state to DTX_STATE_FORCED_COMMITTED. */ void forcedDistributedCommitted(XLogRecPtr *recptr) { elog(DTM_DEBUG5, "forcedDistributedCommitted entering in state = %s for gid = %s (xlog record %X/%X)", DtxStateToString(currentGxact->state), currentGxact->gid, recptr->xlogid, recptr->xrecoff); getTmLock(); Assert(currentGxact->state == DTX_STATE_INSERTED_COMMITTED); setCurrentGxactState( DTX_STATE_FORCED_COMMITTED ); releaseTmLock(); } /* * Get check point information * * Whether DTM started or not, we must always store DTM information in * this checkpoint record. A possible case to consider is we might have * in-progress global transactions in shared memory after postmaster reset, * and shutting down without performing DTM recovery. The subsequent * recovery after this shutdown will read this checkpoint, so we would * lose the in-progress global transaction information if we didn't write it * here. Note we will certainly read this global transaction information * even if this is a clean shutdown (i.e. not performing multi-pass recovery.) */ void getDtxCheckPointInfoAndLock(char **result, int *result_size) { TMGXACT *gxact; DtxState state; int n; TMGXACT_CHECKPOINT *gxact_checkpoint; TMGXACT_LOG *gxact_log_array; int i; int actual; TMGXACT_LOG *gxact_log; getTmLock(); /* We will return with lock held below. */ n = *shmNumGxacts; gxact_checkpoint = palloc(TMGXACT_CHECKPOINT_BYTES(n)); gxact_log_array = &gxact_checkpoint->committedGxactArray[0]; actual = 0; for (i = 0; i < n; i++) { gxact = shmGxactArray[i]; gxact_log = &gxact_log_array[actual]; state = gxact->state; switch (state) { case DTX_STATE_ACTIVE_NOT_DISTRIBUTED: case DTX_STATE_ACTIVE_DISTRIBUTED: case DTX_STATE_PREPARING: case DTX_STATE_PREPARED: /* * Active or attempting to commit -- ignore. */ continue; case DTX_STATE_INSERTED_FORGET_COMMITTED: elog(FATAL, "Should not see this transitional state with TM lock held -- dtx state \"%s\" not expected here", DtxStateToString(state)); continue; case DTX_STATE_INSERTING_COMMITTED: elog(FATAL, "Cannot also be buffering COMMITTED from another process with TM lock held"); continue; case DTX_STATE_INSERTING_FORGET_COMMITTED: elog(FATAL, "Cannot also be buffering FORGET COMMITTED from another process with TM lock held"); continue; case DTX_STATE_INSERTED_COMMITTED: case DTX_STATE_FORCED_COMMITTED: case DTX_STATE_NOTIFYING_COMMIT_PREPARED: case DTX_STATE_RETRY_COMMIT_PREPARED: case DTX_STATE_CRASH_COMMITTED: break; case DTX_STATE_NOTIFYING_ABORT_NO_PREPARED: case DTX_STATE_NOTIFYING_ABORT_SOME_PREPARED: case DTX_STATE_NOTIFYING_ABORT_PREPARED: case DTX_STATE_RETRY_ABORT_PREPARED: /* * We don't checkpoint abort. */ continue; default: elog(PANIC, "Unexpected dtm state: %d", (int) state); break; } if (strlen(gxact->gid) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(gxact->gid)); memcpy(gxact_log->gid, gxact->gid, TMGIDSIZE); gxact_log->gxid = gxact->gxid; elog(DTM_DEBUG5, "Add DTM checkpoint entry gid = %s.", gxact->gid); if (Debug_persistent_recovery_print) { SUPPRESS_ERRCONTEXT_DECLARE; SUPPRESS_ERRCONTEXT_PUSH(); elog(PersistentRecovery_DebugPrintLevel(), "getDtxCheckPointInfoAndLock[%d]: distributed transaction identifier %s (distributed xid %u)", actual, gxact_log->gid, gxact_log->gxid); SUPPRESS_ERRCONTEXT_POP(); } actual++; } gxact_checkpoint->committedCount = actual; gxact_checkpoint->segmentCount = 0; /* UNDONE: Not used yet. */ *result = (char*)gxact_checkpoint; *result_size = TMGXACT_CHECKPOINT_BYTES(actual); /* Return with lock held. */ elog(DTM_DEBUG5, "Filled in DTM checkpoint information (count = %d).", actual); } void freeDtxCheckPointInfoAndUnlock(char *info, int info_size __attribute__((unused)) , XLogRecPtr *recptr) { pfree(info); releaseTmLock(); elog(DTM_DEBUG5, "Checkpoint with DTM information written at %X/%X.", recptr->xlogid, recptr->xrecoff); } /* generate global transaction id */ static void generateGID(char *gid, DistributedTransactionId *gxid) { /* tm lock acquired by caller */ if (*shmGIDSeq >= LastDistributedTransactionId) { releaseTmLock(); ereport(FATAL, (errmsg("reached limit of %u global transactions per start", LastDistributedTransactionId))); } *gxid = (*shmGIDSeq)++; sprintf(gid, "%u-%.10u", *shmDistribTimeStamp, (*gxid)); if (strlen(gid) >= TMGIDSIZE) elog(PANIC, "Distribute transaction identifier too long (%d)", (int)strlen(gid)); } /* * recoverTM: * perform TM recovery, this connects to all QE and resolve all in-doubt txn. * * This gets called when there is not any other DTM activity going on. * * First, we'll replay the dtm log and get our shmem as up to date as possible * in order to help resolve in-doubt transactions. Then we'll go through and * try and resolve in-doubt transactions based on information in the DTM log. * The remaining in-doubt transactions that remain (ones the DTM doesn't know * about) are all ABORTed. * * If we're in read-only mode; we need to get started, but we can't run the * full recovery. So we go get the highest distributed-xid, but don't run * the recovery. */ static void recoverTM(void) { DistributedTransactionId segmentMaxDistributedTransactionId = 0; /* intialize fts sync count */ verifyFtsSyncCount(); elog(DTM_DEBUG3, "Starting to Recover DTM..."); /* * do not do recovery if read only mode is set. * in this case, there may be in-doubt transaction in down segdb * , which will not be resolved at this time. */ if (isFtsReadOnlySet()) { elog(DTM_DEBUG3, "FTS is Read Only. Defer DTM recovery till later."); if (currentGxact != NULL) { elog(DTM_DEBUG5, "recoverTM setting currentGxact to NULL for gid = %s (index = %d)", currentGxact->gid, currentGxact->debugIndex); } currentGxact = NULL; *shmDtmRecoveryDeferred = true; } else *shmDtmRecoveryDeferred = false; if (Gp_role == GP_ROLE_UTILITY) { elog(DTM_DEBUG3, "DB in Utility mode. Defer DTM recovery till later."); return; } if (!*shmDtmRecoveryDeferred) { /* * Attempt to recover all in-doubt transactions. * * first resolve all in-doubt transactions from the DTM's perspective * and then resolve any remaining in-doubt transactions that the * RMs have. */ recoverInDoubtTransactions(); } else elog(LOG, "DTM starting in readonly-mode: deferring recovery"); /* finished recovery successfully. */ segmentMaxDistributedTransactionId = determineSegmentMaxDistributedXid(); if (segmentMaxDistributedTransactionId >= *shmGIDSeq) { *shmGIDSeq = segmentMaxDistributedTransactionId + 1; } elog(LOG, "Next distributed transaction id is %u", *shmGIDSeq); *shmDtmStarted = true; elog(LOG, "DTM Started"); } static bool deferredRecoveryActive=false; /* * When running in readonly-mode, we may wind up in a state where we've deferred * recovery waiting for a segment to come online. */ void cdbtm_performDeferredRecovery(void) { if (deferredRecoveryActive) return; if (*shmDtmRecoveryDeferred) { getTmLock(); if (*shmDtmRecoveryDeferred) { deferredRecoveryActive=true; PG_TRY(); { recoverInDoubtTransactions(); deferredRecoveryActive=false; } PG_CATCH(); { deferredRecoveryActive=false; PG_RE_THROW(); } PG_END_TRY(); *shmDtmRecoveryDeferred = false; elog(NOTICE, "Releasing segworker groups for deferred recovery."); disconnectAndDestroyAllGangs(); resetSessionForPrimaryGangLoss(); } releaseTmLock(); CheckForResetSession(); } } /* recoverInDoubtTransactions: * Go through all in-doubt transactions that the DTM knows about and * resolve them. */ static bool recoverInDoubtTransactions(void) { int i; HTAB *htab; TMGXACT *saved_currentGxact; elog(DTM_DEBUG3, "recover in-doubt distributed transactions"); ReplayRedoFromUtilityMode(); /* * For each committed transaction found in the redo pass that was not * matched by a forget committed record, change its state indicating * committed notification needed. Attempt a notification. */ elog(DTM_DEBUG5, "Going to retry commit notification for distributed transactions (count = %d)", *shmNumGxacts); dumpAllDtx(); saved_currentGxact = currentGxact; for (i = 0; i < *shmNumGxacts; i++) { TMGXACT *gxact = shmGxactArray[i]; /* * MPP-4867: if we are running deferred, skip any transaction we're already inside. */ if (saved_currentGxact != NULL && gxact == saved_currentGxact) { continue; } else if (gxact->state == DTX_STATE_ACTIVE_NOT_DISTRIBUTED) { /* should take care of other sessions. */ continue; } if (gxact->state != DTX_STATE_CRASH_COMMITTED) { dumpAllDtx(); elog(PANIC, "recoverInDoubtTransactions found transaction in '%s' state and was expecting it to be in '%s' state", DtxStateToString(gxact->state), DtxStateToString(DTX_STATE_CRASH_COMMITTED)); } elog(DTM_DEBUG5, "Recovering committed distributed transaction gid = %s", gxact->gid); /* * Can't start system if we cannot deliiver commits. */ doNotifyCommittedInDoubt(gxact->gid); /* * This means there was atleast one in-doubt transactions for * whom prepared commit was sent out. Hence, we've a chance to * perform PT-Catalog verification if requested. */ if (debug_persistent_ptcat_verification) Persistent_Set_PostDTMRecv_PTCatVerificationNeeded(); currentGxact = gxact; /* * Global locking order: ProcArrayLock then DTM lock since * calls doInsertForgetCommitted calls releaseGxact. */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); getTmLock(); doInsertForgetCommitted(); releaseTmLock(); LWLockRelease(ProcArrayLock); } currentGxact = saved_currentGxact; dumpAllDtx(); /* * UNDONE: Thus, any in-doubt transctions found will be for aborted transactions. * UNDONE: Gather in-boubt transactions and issue aborts. */ htab = gatherRMInDoubtTransactions(); /* * go through and resolve any remaining in-doubt transactions that the * RM's have AFTER recoverDTMInDoubtTransactions. ALL of these in doubt * transactions will be ABORT'd. The fact that the DTM doesn't know about * them means that something bad happened before everybody voted to * COMMIT. */ abortRMInDoubtTransactions(htab); /* * If there were any In-Doubt transactions collected from the segment * instances and if the verification checks are requested then make the flag * green so that the system is informed to run the verification * checks post DTM is recovered */ if (debug_persistent_ptcat_verification && htab) Persistent_Set_PostDTMRecv_PTCatVerificationNeeded(); /* get rid of the hashtable */ hash_destroy(htab); /* yes... we are paranoid and will double check */ htab = gatherRMInDoubtTransactions(); /* * Hmm. we still have some remaining indoubt transactions. For now we * dont have an automated way to clean this mess up. So we'll have to * rely on smart Admins to do the job manually. We'll error out of here * and try and provide as much info as possible. * * TODO: We really want to be able to say this particular segdb has these * remaining in-doubt transactions. */ if (htab != NULL) { StringInfoData indoubtBuff; initStringInfo(&indoubtBuff); dumpAllDtx(); dumpRMOnlyDtx(htab, &indoubtBuff); ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("DTM Log recovery failed. There are still unresolved " "in-doubt transactions on some of the segment databaes " "that were not able to be resolved for an unknown reason. "), errdetail("Here is a list of in-doubt transactions in the system: %s", indoubtBuff.data), errhint("Try restarting the Greenplum Database array. If the problem persists " " an Administrator will need to resolve these transactions " " manually."))); } RemoveRedoUtilityModeFile(); return true; } /* * determineSegmentMaxDistributedXid: * * Since we don't know whether the Segment Databases were left over from * an earlier start or left over from a postmaster system reset due to * a server process failure, we must go out and determine the maximum * distributed transaction id currently in-use. */ static DistributedTransactionId determineSegmentMaxDistributedXid(void) { DistributedTransactionId max = 0; int i; int resultCount = 0; struct pg_result **results = NULL; StringInfoData buffer; StringInfoData errbuf; #define FUNCTION_DOES_NOT_EXIST "function gp_max_distributed_xid() does not exist" initStringInfo(&buffer); appendStringInfo(&buffer, "select gp_max_distributed_xid()"); initStringInfo(&errbuf); results = cdbdisp_dispatchRMCommand(buffer.data, /* Snapshot */ false, &errbuf, &resultCount); if (errbuf.len > 0) { int cmplen = strlen(FUNCTION_DOES_NOT_EXIST); if (strlen(errbuf.data) >= cmplen && strncmp(errbuf.data, FUNCTION_DOES_NOT_EXIST, cmplen ) == 0) { elog(LOG, "The function gp_max_distributed_xid is missing on one or more segments. The beta 3.0 upgrade steps are necessary"); } else { ereport(ERROR, (errmsg("gp_max_distributed_xid error (gathered %d results from cmd '%s')", resultCount, buffer.data), errdetail("%s", errbuf.data))); } } else { elog(DTM_DEBUG5, "determineSegmentMaxDistributedXid resultCount = %d",resultCount); for (i = 0; i < resultCount; i++) { if (PQresultStatus(results[i]) != PGRES_TUPLES_OK) { elog(ERROR, "gp_max_distributed_xid: resultStatus not tuples_Ok"); } else { /* * Due to funkyness in the current dispatch agent code, instead of 1 result * per QE with 1 row each, we can get back 1 result per dispatch agent, with * one row per QE controlled by that agent. */ int j; for (j = 0; j < PQntuples(results[i]); j++) { DistributedTransactionId temp = 0; temp = atol(PQgetvalue(results[i], j, 0)); elog(DTM_DEBUG5, "determineSegmentMaxDistributedXid value = %d",temp); if (temp > max) max = temp; } } } } pfree(errbuf.data); for (i = 0; i < resultCount; i++) PQclear(results[i]); free(results); return max; } /* * gatherRMInDoubtTransactions: * Builds a hashtable of all of the in-doubt transactions that exist on the * segment databases. The hashtable basically just serves as a single list * without duplicates of all the in-doubt transactions. It does not keep track * of which seg db's have which transactions in-doubt. It currently doesn't * need to due to the way we handle this information later. */ static HTAB * gatherRMInDoubtTransactions(void) { PGresult **pgresultSets = NULL; StringInfoData errmsgbuf; const char *cmdbuf = "select gid from pg_prepared_xacts"; PGresult **rs; InDoubtDtx *lastDtx = NULL; HASHCTL hctl; HTAB *htab = NULL; int j, rows; bool found; initStringInfo(&errmsgbuf); /* call to all QE to get in-doubt transactions */ pgresultSets = cdbdisp_dispatchRMCommand(cmdbuf, /* withSnapshot */false, &errmsgbuf, NULL); /* display error messages */ if (errmsgbuf.len > 0) { ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), errmsg("Unable to collect list of prepared transactions " "from all segment databases."), errdetail("%s", errmsgbuf.data))); pfree(errmsgbuf.data); /* need recovery */ return NULL; } pfree(errmsgbuf.data); /* If any result set is nonempty, there are in-doubt transactions. */ for (rs = pgresultSets; *rs; ++rs) { rows = PQntuples(*rs); for (j = 0; j < rows; j++) { char *gid; /* * we dont setup our hashtable until we know we have at least one * in doubt transaction */ if (htab == NULL) { /* setup a hash table */ hctl.keysize = TMGIDSIZE; /* GID */ hctl.entrysize = sizeof(InDoubtDtx); htab = hash_create("InDoubtDtxHash", 10, &hctl, HASH_ELEM); if (htab == NULL) { ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("DTM could not allocate hash table for InDoubtDtxList."))); } } gid = PQgetvalue(*rs, j, 0); /* Now we can add entry to hash table */ lastDtx = (InDoubtDtx *) hash_search(htab, gid, HASH_ENTER, &found); /* * only need to bother doing work if there isn't already an entry * for our GID */ if (!found) { elog(DEBUG3, "Found in-doubt transaction with GID: %s on remote RM", gid); strcpy(lastDtx->gid, gid); } } } cleanRMResultSets(pgresultSets); return htab; } static bool FindRetryNotifyCommitted(char* gid, TMGXACT **gxact) { int i; for (i = 0; i < *shmNumGxacts; i++) { if (strcmp(gid, shmGxactArray[i]->gid) == 0) { /* found an retry notify committed */ *gxact = shmGxactArray[i]; return true; } } return false; } /* * abortRMInDoubtTransactions: * Goes through all the InDoubtDtx's in the provided htab and ABORTs them * across all of the QEs by sending a ROLLBACK PREPARED. * */ static void abortRMInDoubtTransactions(HTAB *htab) { HASH_SEQ_STATUS status; InDoubtDtx *entry = NULL; TMGXACT *gxact = NULL; if (htab == NULL) return; /* * now we have a nice hashtable full of in-doubt dtx's that we need to * resolve. so we'll use a nice big hammer to get this job done. instead * of keeping track of which QEs have a prepared txn to be aborted and * which ones don't. we just issue a ROLLBACK to all of them and ignore * any pesky errors. This is certainly not an elegant solution but is OK * for now. */ hash_seq_init(&status, htab); while ((entry = (InDoubtDtx *) hash_seq_search(&status)) != NULL) { if (FindRetryNotifyCommitted(entry->gid, &gxact)) { elog(DTM_DEBUG5, "Skipping in-doubt transaction gid = %s since it is \"%s\"", gxact->gid, DtxStateToString(gxact->state)); continue; } elog(DTM_DEBUG3, "Aborting in-doubt transaction with gid = %s", entry->gid); doAbortInDoubt(entry->gid); } } /* * cleanRMResultSets: * Goes the set of PGresults passed in and clears and frees them. */ static void cleanRMResultSets(PGresult **pgresultSets) { /* TODO: need to handle multiple dbs in a segment case */ /* int tolaldbs = CdbClusterDefinition->segment_count * CdbClusterDefinition->db_count; */ int i = 0; PGresult *rs = pgresultSets[0]; do { PQclear(rs); } while ((rs = pgresultSets[++i]) != NULL); free(pgresultSets); } /* * dumpAllDtx: * used to log the current state (according to the DTM) of all the * global distributed transactions that are still active (in shmem). */ static void dumpAllDtx(void) { int i; elog(LOG, "dumping all global transactions: "); for (i = 0; i < *shmNumGxacts; i++) { elog(LOG, "%d - GID: %s STATE: %s", i, shmGxactArray[i]->gid, DtxStateToString(shmGxactArray[i]->state)); } } static void dumpRMOnlyDtx(HTAB *htab, StringInfoData *buff) { HASH_SEQ_STATUS status; InDoubtDtx *entry = NULL; if (htab == NULL) return; hash_seq_init(&status, htab); appendStringInfo(buff, "List of In-doubt transactions remaining across the segdbs: ("); while ((entry = (InDoubtDtx *) hash_seq_search(&status)) != NULL) { appendStringInfo(buff, "\"%s\" , ", entry->gid); } appendStringInfo(buff, ")"); } /* * When called, a SET command is dispatched and the writer gang * writes the shared snapshot. This function actually does nothing * useful besides making sure that a writer gang is alive and has * set the shared snapshot so that the readers could access it. * * At this point this function is added as a helper for cursor * query execution since in MPP cursor queries don't use writer * gangs. However, it could be used for other purposes as well. * * See declaration of assign_gp_write_shared_snapshot(...) for more * information. */ void verify_shared_snapshot_ready(void) { if (Gp_role == GP_ROLE_DISPATCH) { CdbDoCommand("set gp_write_shared_snapshot=true", true, true); dumpSharedLocalSnapshot_forCursor(); /* * To keep our readers in sync (since they will be dispatched * separately) we need to rewind the segmate synchronizer. */ DtxContextInfo_RewindSegmateSync(); } } /* * Force the writer QE to write the shared snapshot. Will get called * after a "set gp_write_shared_snapshot=" is executed * in dispatch mode. * * See verify_shared_snapshot_ready(...) for additional information. */ bool assign_gp_write_shared_snapshot(bool newval, bool doit, GucSource source __attribute__((unused)) ) { #if FALSE elog(DEBUG1, "SET gp_write_shared_snapshot: %s, doit=%s", (newval ? "true" : "false"), (doit ? "true" : "false")); #endif /* * Make sure newval is "true". if it's "false" this could be a * part of a ROLLBACK so we don't want to set the snapshot then. */ if (doit && newval) { if (Gp_role == GP_ROLE_EXECUTE) { ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); if (Gp_is_writer) { dumpSharedLocalSnapshot_forCursor(); } } } return true; } static void doQEDistributedExplicitBegin(int txnOptions) { int ExplicitIsoLevel; bool ExplicitReadOnly; /* * Start a command. */ StartTransactionCommand(); /* Here is the explicit BEGIN. */ BeginTransactionBlock(); unpackMppTxnOptions(txnOptions, &ExplicitIsoLevel, &ExplicitReadOnly); XactIsoLevel = ExplicitIsoLevel; XactReadOnly = ExplicitReadOnly; elog(DTM_DEBUG5, "doQEDistributedExplicitBegin setting XactIsoLevel = %s and XactReadOnly = %s", IsoLevelAsUpperString(XactIsoLevel), (XactReadOnly ? "true" : "false")); /* * Finish the BEGIN command. It will leave the explict transaction * in-progress. */ CommitTransactionCommand(); } static bool isDtxQueryDispatcher(void) { bool isDtmStarted; bool isSharedLocalSnapshotSlotPresent; isDtmStarted = (shmDtmStarted != NULL && *shmDtmStarted); isSharedLocalSnapshotSlotPresent = (SharedLocalSnapshotSlot != NULL); return (Gp_role == GP_ROLE_DISPATCH && isDtmStarted && isSharedLocalSnapshotSlotPresent); } /* * Called prior to handling a requested that comes to the QD, or a utility request to a QE. * * Sets up the distributed transaction context value and does some basic error checking. * * Essentially: * if the DistributedTransactionContext is already QD_DISTRIBUTED_CAPABLE then leave it * else if the DistributedTransactionContext is already QE_TWO_PHASE_EXPLICIT_WRITER then leave it * else it MUST be a LOCAL_ONLY, and is converted to QD_DISTRIBUTED_CAPABLE if this process is acting * as a QE. */ void setupRegularDtxContext(void) { switch(DistributedTransactionContext) { case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE: /* Continue in this context. Do not touch QEDtxContextInfo, etc. */ break; case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER: /* Allow this for copy...??? Do not touch QEDtxContextInfo, etc. */ break; default: if (DistributedTransactionContext != DTX_CONTEXT_LOCAL_ONLY) { /* we must be one of: DTX_CONTEXT_QD_RETRY_PHASE_2, DTX_CONTEXT_QE_ENTRY_DB_SINGLETON, DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT, DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER, DTX_CONTEXT_QE_READER, DTX_CONTEXT_QE_PREPARED */ elog(ERROR, "setupRegularDtxContext finds unexpected DistributedTransactionContext = '%s'", DtxContextToString(DistributedTransactionContext)); } /* DistributedTransactionContext is DTX_CONTEXT_LOCAL_ONLY */ Assert(QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId); /* * Determine if we are strictly local or a distributed capable QD. */ Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY); if (isDtxQueryDispatcher()) { setDistributedTransactionContext(DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE); } break; } elog(DTM_DEBUG5, "setupRegularDtxContext leaving with DistributedTransactionContext = '%s'.", DtxContextToString(DistributedTransactionContext)); } /** * Called on the QE when a query to process has been received. * * This will set up all distributed transaction information and set the state appropriately. */ void setupQEDtxContext (DtxContextInfo *dtxContextInfo) { DistributedSnapshot *distributedSnapshot; int txnOptions; bool needTwoPhase; bool explicitBegin; bool haveDistributedSnapshot; bool isEntryDbSingleton = false; bool isReaderQE = false; bool isWriterQE = false; bool isSharedLocalSnapshotSlotPresent; Assert (dtxContextInfo != NULL); /* * DTX Context Info (even when empty) only comes in QE requests. */ distributedSnapshot = &dtxContextInfo->distributedSnapshot; txnOptions = dtxContextInfo->distributedTxnOptions; needTwoPhase = isMppTxOptions_NeedTwoPhase(txnOptions); explicitBegin = isMppTxOptions_ExplicitBegin(txnOptions); haveDistributedSnapshot = (dtxContextInfo->distributedXid!= InvalidDistributedTransactionId); isSharedLocalSnapshotSlotPresent = (SharedLocalSnapshotSlot != NULL); if (DEBUG5 >= log_min_messages || Debug_print_full_dtm) { elog(DTM_DEBUG5, "setupQEDtxContext inputs (part 1): Gp_role = %s, Gp_is_writer = %s, " "txnOptions = 0x%x, needTwoPhase = %s, explicitBegin = %s, isoLevel = %s, readOnly = %s, haveDistributedSnapshot = %s.", role_to_string(Gp_role), (Gp_is_writer ? "true" : "false"), txnOptions, (needTwoPhase ? "true" : "false"), (explicitBegin ? "true" : "false"), IsoLevelAsUpperString(mppTxOptions_IsoLevel(txnOptions)), (isMppTxOptions_ReadOnly(txnOptions) ? "true" : "false"), (haveDistributedSnapshot ? "true" : "false")); elog(DTM_DEBUG5, "setupQEDtxContext inputs (part 2): distributedXid = %u, isSharedLocalSnapshotSlotPresent = %s.", dtxContextInfo->distributedXid, (isSharedLocalSnapshotSlotPresent ? "true" : "false")); if (haveDistributedSnapshot) { elog(DTM_DEBUG5, "setupQEDtxContext inputs (part 2a): distributedXid = %u, " "distributedSnapshotData (xmin = %u, xmax = %u, xcnt = %u), distributedCommandId = %d", dtxContextInfo->distributedXid, distributedSnapshot->header.xmin, distributedSnapshot->header.xmax, distributedSnapshot->header.count, dtxContextInfo->curcid); } if (isSharedLocalSnapshotSlotPresent) { elog(DTM_DEBUG5, "setupQEDtxContext inputs (part 2b): shared local snapshot xid = %u " "(xmin: %u xmax: %u xcnt: %u) curcid: %d, QDxid = %u/%u, QDcid = %u", SharedLocalSnapshotSlot->xid, SharedLocalSnapshotSlot->snapshot.xmin, SharedLocalSnapshotSlot->snapshot.xmax, SharedLocalSnapshotSlot->snapshot.xcnt, SharedLocalSnapshotSlot->snapshot.curcid, SharedLocalSnapshotSlot->QDxid, SharedLocalSnapshotSlot->segmateSync, SharedLocalSnapshotSlot->QDcid); } } switch (Gp_role) { case GP_ROLE_EXECUTE: if (Gp_segment == -1 && !Gp_is_writer) { isEntryDbSingleton = true; } else { /* * NOTE: this is a bit hackish. It appears as though * StartTransaction() gets called during connection setup before * we even have time to setup our shared snapshot slot. */ if (SharedLocalSnapshotSlot == NULL) { if (explicitBegin || haveDistributedSnapshot) { elog(ERROR, "setupQEDtxContext not expecting distributed begin or snapshot when no Snapshot slot exists"); } } else { if (Gp_is_writer) { isWriterQE = true; } else { isReaderQE = true; } } } break; default: Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY); elog(DTM_DEBUG5, "setupQEDtxContext leaving context = 'Local Only' for Gp_role = %s", role_to_string(Gp_role) ); return; } elog(DTM_DEBUG5, "setupQEDtxContext intermediate result: isEntryDbSingleton = %s, isWriterQE = %s, isReaderQE = %s.", (isEntryDbSingleton ? "true" : "false"), (isWriterQE ? "true" : "false"), (isReaderQE ? "true" : "false")); /* * Copy to our QE global variable. */ DtxContextInfo_Copy(&QEDtxContextInfo, dtxContextInfo); switch (DistributedTransactionContext) { case DTX_CONTEXT_LOCAL_ONLY: if (isEntryDbSingleton && haveDistributedSnapshot) { /* * Later, in GetSnapshotData, we will adopt the QD's * transaction and snapshot information. */ setDistributedTransactionContext( DTX_CONTEXT_QE_ENTRY_DB_SINGLETON ); } else if (isReaderQE && haveDistributedSnapshot) { /* * Later, in GetSnapshotData, we will adopt the QE Writer's * transaction and snapshot information. */ setDistributedTransactionContext( DTX_CONTEXT_QE_READER ); } else if (isWriterQE && (explicitBegin || needTwoPhase)) { if (!haveDistributedSnapshot) { elog(DTM_DEBUG5, "setupQEDtxContext Segment Writer is involved in a distributed transaction without a distributed snapshot..."); } if (IsTransactionOrTransactionBlock()) { elog(ERROR, "Starting an explicit distributed transaction in segment -- cannot already be in a transaction"); } if (explicitBegin) { /* * We set the DistributedTransactionContext BEFORE we create the * transactions to influence the behavior of * StartTransaction. */ setDistributedTransactionContext( DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER ); doQEDistributedExplicitBegin(txnOptions); } else { Assert(needTwoPhase); setDistributedTransactionContext( DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER ); } } else if (haveDistributedSnapshot) { if (IsTransactionOrTransactionBlock()) { elog(ERROR, "Going to start a local implicit transaction in segment using a distribute " "snapshot -- cannot already be in a transaction"); } /* * Before executing the query, postgres.c make a standard call to * StartTransactionCommand which will begin a local transaction with * StartTransaction. This is fine. * * However, when the snapshot is created later, the state below will * tell GetSnapshotData to make the local snapshot from the * distributed snapshot. */ setDistributedTransactionContext( DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT ); } else { Assert (!haveDistributedSnapshot); /* * A local implicit transaction without reference to * a distributed snapshot. Stay in NONE state. */ Assert(DistributedTransactionContext == DTX_CONTEXT_LOCAL_ONLY); } break; case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER: /* elog(NOTICE, "We should have left this transition state '%s' at the end of the previous command...", DtxContextToString(DistributedTransactionContext)); */ Assert (IsTransactionOrTransactionBlock()); if (explicitBegin) { elog(ERROR, "Cannot have an explicit BEGIN statement..."); } break; case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT: elog(ERROR, "We should have left this transition state '%s' at the end of the previous command", DtxContextToString(DistributedTransactionContext)); break; case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER: Assert (IsTransactionOrTransactionBlock()); break; case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON: case DTX_CONTEXT_QE_READER: /* * We are playing games with the xact.c code, so we shouldn't test * with the IsTransactionOrTransactionBlock() routine. */ break; case DTX_CONTEXT_QE_PREPARED: case DTX_CONTEXT_QE_FINISH_PREPARED: elog(ERROR, "We should not be trying to execute a query in state '%s'", DtxContextToString(DistributedTransactionContext)); break; default: elog(PANIC, "Unexpected segment distribute transaction context value: %d", (int) DistributedTransactionContext); break; } elog(DTM_DEBUG5, "setupQEDtxContext final result: DistributedTransactionContext = '%s'.", DtxContextToString(DistributedTransactionContext)); if (haveDistributedSnapshot) { elog((Debug_print_snapshot_dtm ? LOG : DEBUG5), "[Distributed Snapshot #%u] *Set QE* currcid = %d (gxid = %u, '%s')", dtxContextInfo->distributedSnapshot.header.distribSnapshotId, dtxContextInfo->curcid, getDistributedTransactionId(), DtxContextToString(DistributedTransactionContext)); } } void finishDistributedTransactionContext (char* debugCaller, bool aborted) { DistributedTransactionId gxid; /* * We let the 2 retry states go up to PostgresMain.c, otherwise * everything MUST be complete. */ if (currentGxact != NULL && (currentGxact->state != DTX_STATE_RETRY_COMMIT_PREPARED && currentGxact->state != DTX_STATE_RETRY_ABORT_PREPARED)) { /* PleaseDebugMe("finishDistributedTransactionContext currentGxact == NULL"); */ elog(FATAL, "Expected currentGxact to be NULL at this point. Found gid =%s, gxid = %u (state = %s, caller = %s)", currentGxact->gid, currentGxact->gxid, DtxStateToString(currentGxact->state), debugCaller); } gxid = getDistributedTransactionId(); elog(DTM_DEBUG5, "finishDistributedTransactionContext called to change DistributedTransactionContext from %s to %s (caller = %s, gxid = %u)", DtxContextToString(DistributedTransactionContext), DtxContextToString(DTX_CONTEXT_LOCAL_ONLY), debugCaller, gxid); setDistributedTransactionContext( DTX_CONTEXT_LOCAL_ONLY ); DtxContextInfo_Reset(&QEDtxContextInfo); } static void rememberDtxExplicitBegin(void) { if (currentGxact == NULL) { return; } if (!currentGxact->explicitBeginRemembered) { elog(DTM_DEBUG5, "rememberDtxExplicitBegin explicit BEGIN for gid = %s", currentGxact->gid); currentGxact->explicitBeginRemembered = true; } else { elog(DTM_DEBUG5, "rememberDtxExplicitBegin already an explicit BEGIN for gid = %s", currentGxact->gid); } } /* * This is mostly here because * cdbcopy doesn't use cdbdisp's services. */ void sendDtxExplicitBegin(void) { char cmdbuf[100]; if (currentGxact == NULL) { return; } rememberDtxExplicitBegin(); dtmPreCommand("sendDtxExplicitBegin", "(none)", NULL, /* is two-phase */ true, /* withSnapshot */ true, /* inCursor */ false ); /* * Be explicit about both the isolation level and the access mode * since in MPP our QEs are in a another process. */ sprintf(cmdbuf, "BEGIN ISOLATION LEVEL %s, READ %s", IsoLevelAsUpperString(XactIsoLevel), (XactReadOnly ? "ONLY" : "WRITE")); /* * dispatch a DTX command, in the event of an error, this call * will either exit via elog()/ereport() or return false */ if (!dispatchDtxCommand(cmdbuf, false, /* raiseError */ false)) { ereport(ERROR, (errmsg("Global transaction BEGIN failed for gid = \"%s\" due to error", currentGxact->gid))); } } int dtxCurrentPhase1Count(void) { if (shmDtmStarted == NULL || !*shmDtmStarted) { return 0; } else { Assert(shmCurrentPhase1Count != NULL); return *shmCurrentPhase1Count; } } /** * On the QD, run the Prepare operation. */ static void performDtxProtocolPrepare(const char *gid) { StartTransactionCommand(); elog(DTM_DEBUG5, "performDtxProtocolCommand going to call PrepareTransactionBlock for distributed transaction (id = '%s')", gid); if (!PrepareTransactionBlock((char *)gid)) { elog(ERROR, "Prepare of distributed transaction %s failed", gid); return; } /* * Calling CommitTransactionCommand will cause the actual * COMMIT/PREPARE work to be performed. */ CommitTransactionCommand(); elog(DTM_DEBUG5, "Prepare of distributed transaction succeeded (id = '%s')", gid); setDistributedTransactionContext( DTX_CONTEXT_QE_PREPARED ); } /** * On the QD, run the Commit Prepared operation. */ static void performDtxProtocolCommitPrepared(const char *gid, bool raiseErrorIfNotFound) { elog(DTM_DEBUG5, "performDtxProtocolCommitPrepared going to call FinishPreparedTransaction for distributed transaction %s", gid); StartTransactionCommand(); /* * Since this call may fail, lets setup a handler. */ PG_TRY(); { FinishPreparedTransaction((char *)gid, /* isCommit */ true, raiseErrorIfNotFound); } PG_CATCH(); { finishDistributedTransactionContext("performDtxProtocolCommitPrepared -- Commit Prepared (error case)", false); PG_RE_THROW(); } PG_END_TRY(); /* * Calling CommitTransactionCommand will cause the actual * COMMIT/PREPARE work to be performed. */ CommitTransactionCommand(); finishDistributedTransactionContext("performDtxProtocolCommitPrepared -- Commit Prepared", false); } /** * On the QD, run the Abort Prepared operation. */ static void performDtxProtocolAbortPrepared(const char *gid, bool raiseErrorIfNotFound) { elog(DTM_DEBUG5, "performDtxProtocolAbortPrepared going to call FinishPreparedTransaction for distributed transaction %s", gid); StartTransactionCommand(); /* * Since this call may fail, lets setup a handler. */ PG_TRY(); { FinishPreparedTransaction((char *)gid, /* isCommit */ false, raiseErrorIfNotFound); } PG_CATCH(); { finishDistributedTransactionContext("performDtxProtocolAbortPrepared -- Commit Prepared (error case)", true); PG_RE_THROW(); } PG_END_TRY(); /* * Calling CommitTransactionCommand will cause the actual * COMMIT/PREPARE work to be performed. */ CommitTransactionCommand(); finishDistributedTransactionContext("performDtxProtocolAbortPrepared -- Commit Prepared", true); } /** * On the QE, handle a DtxProtocolCommand */ void performDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags __attribute__((unused)) , const char *loggingStr __attribute__((unused)) , const char *gid, DistributedTransactionId gxid __attribute__((unused)), DtxContextInfo *contextInfo) { elog(DTM_DEBUG5, "performDtxProtocolCommand called with DTX protocol = %s, segment distribute transaction context: '%s'", DtxProtocolCommandToString(dtxProtocolCommand), DtxContextToString(DistributedTransactionContext)); switch (dtxProtocolCommand) { case DTX_PROTOCOL_COMMAND_STAY_AT_OR_BECOME_IMPLIED_WRITER: switch(DistributedTransactionContext) { case DTX_CONTEXT_LOCAL_ONLY: /** convert to implicit_writer! */ setupQEDtxContext(contextInfo); StartTransactionCommand(); break; case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER: /** already the state we like */ break; default: if ( isQEContext() || isQDContext()) { elog(FATAL, "Unexpected segment distributed transaction context: '%s'", DtxContextToString(DistributedTransactionContext)); } else { elog(PANIC, "Unexpected segment distributed transaction context value: %d", (int) DistributedTransactionContext); } break; } break; case DTX_PROTOCOL_COMMAND_ABORT_NO_PREPARED: elog(DTM_DEBUG5, "performDtxProtocolCommand going to call AbortOutOfAnyTransaction for distributed transaction %s", gid); AbortOutOfAnyTransaction(); break; case DTX_PROTOCOL_COMMAND_PREPARE: /* * The QD has directed us to read-only commit or prepare an implicit or explicit * distributed transaction. */ switch (DistributedTransactionContext) { case DTX_CONTEXT_LOCAL_ONLY: /* * Spontaneously aborted while we were back at the QD? */ elog(ERROR, "Distributed transaction %s not found", gid); break; case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER: case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER: performDtxProtocolPrepare(gid); break; case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE: case DTX_CONTEXT_QD_RETRY_PHASE_2: case DTX_CONTEXT_QE_PREPARED: case DTX_CONTEXT_QE_FINISH_PREPARED: case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON: case DTX_CONTEXT_QE_READER: elog(FATAL, "Unexpected segment distribute transaction context: '%s'", DtxContextToString(DistributedTransactionContext)); default: elog(PANIC, "Unexpected segment distribute transaction context value: %d", (int) DistributedTransactionContext); break; } break; case DTX_PROTOCOL_COMMAND_ABORT_SOME_PREPARED: switch (DistributedTransactionContext) { case DTX_CONTEXT_LOCAL_ONLY: /* * Spontaneously aborted while we were back at the QD? */ elog(ERROR, "Distributed transaction %s not found", gid); break; case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER: case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER: AbortOutOfAnyTransaction(); break; case DTX_CONTEXT_QE_PREPARED: setDistributedTransactionContext( DTX_CONTEXT_QE_FINISH_PREPARED ); performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ true); break; case DTX_CONTEXT_QD_DISTRIBUTED_CAPABLE: case DTX_CONTEXT_QD_RETRY_PHASE_2: case DTX_CONTEXT_QE_ENTRY_DB_SINGLETON: case DTX_CONTEXT_QE_READER: elog(PANIC, "Unexpected segment distribute transaction context: '%s'", DtxContextToString(DistributedTransactionContext)); default: elog(PANIC, "Unexpected segment distribute transaction context value: %d", (int) DistributedTransactionContext); break; } break; case DTX_PROTOCOL_COMMAND_COMMIT_PREPARED: requireDistributedTransactionContext( DTX_CONTEXT_QE_PREPARED ); setDistributedTransactionContext( DTX_CONTEXT_QE_FINISH_PREPARED ); performDtxProtocolCommitPrepared(gid, /* raiseErrorIfNotFound */ true); break; case DTX_PROTOCOL_COMMAND_ABORT_PREPARED: requireDistributedTransactionContext( DTX_CONTEXT_QE_PREPARED ); setDistributedTransactionContext( DTX_CONTEXT_QE_FINISH_PREPARED ); performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ true); break; case DTX_PROTOCOL_COMMAND_RETRY_COMMIT_PREPARED: requireDistributedTransactionContext( DTX_CONTEXT_LOCAL_ONLY ); performDtxProtocolCommitPrepared(gid, /* raiseErrorIfNotFound */ false); break; case DTX_PROTOCOL_COMMAND_RETRY_ABORT_PREPARED: requireDistributedTransactionContext( DTX_CONTEXT_LOCAL_ONLY ); performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ false); break; case DTX_PROTOCOL_COMMAND_RECOVERY_COMMIT_PREPARED: requireDistributedTransactionContext( DTX_CONTEXT_LOCAL_ONLY ); performDtxProtocolCommitPrepared(gid, /* raiseErrorIfNotFound */ false); break; case DTX_PROTOCOL_COMMAND_RECOVERY_ABORT_PREPARED: requireDistributedTransactionContext( DTX_CONTEXT_LOCAL_ONLY ); performDtxProtocolAbortPrepared(gid, /* raiseErrorIfNotFound */ false); break; case DTX_PROTOCOL_COMMAND_SUBTRANSACTION_BEGIN_INTERNAL: switch(DistributedTransactionContext) { case DTX_CONTEXT_LOCAL_ONLY: /* * QE is not aware of DTX yet. * A typical case is SELECT foo(), where foo() opens internal subtransaction */ setupQEDtxContext(contextInfo); StartTransactionCommand(); break; case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER: /* We already marked this QE to be writer, and transaction is open. */ case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER: case DTX_CONTEXT_QE_READER: break; default: /* Lets flag this situation out, with explicit crash */ Assert (DistributedTransactionContext != DistributedTransactionContext); elog(DTM_DEBUG5, " SUBTRANSACTION_BEGIN_INTERNAL distributed transaction context invalid: %d", (int) DistributedTransactionContext); break; } BeginInternalSubTransaction(NULL); Assert(contextInfo->nestingLevel+1 == GetCurrentTransactionNestLevel()); break; case DTX_PROTOCOL_COMMAND_SUBTRANSACTION_RELEASE_INTERNAL: Assert(contextInfo->nestingLevel == GetCurrentTransactionNestLevel()); ReleaseCurrentSubTransaction(); break; case DTX_PROTOCOL_COMMAND_SUBTRANSACTION_ROLLBACK_INTERNAL: /* * Rollback performs work on master and then dispatches, * hence has nestingLevel its expecting post operation */ if ((contextInfo->nestingLevel + 1) > GetCurrentTransactionNestLevel()) { ereport(ERROR, (errmsg("transaction %s at level %d already processed (current level %d)", gid, contextInfo->nestingLevel, GetCurrentTransactionNestLevel()))); } unsigned int i = GetCurrentTransactionNestLevel() - contextInfo->nestingLevel; while (i > 0) { RollbackAndReleaseCurrentSubTransaction(); i--; } Assert(contextInfo->nestingLevel == GetCurrentTransactionNestLevel()); break; default: elog(ERROR, "Unrecognized dtx protocol command: %d", (int) dtxProtocolCommand); break; } elog(DTM_DEBUG5, "performDtxProtocolCommand successful return for distributed transaction %s", gid); }