提交 a6eea210 编写于 作者: A Asim R P 提交者: Asim RP

Make use of transaction options dispatched by QD on QE

To enforce consistent isolation level within a distributed transaction, local
transactions on QEs should assume the same isolation level as the transaction
on QD.  This was previously achieved by dispatching isolation level and
read-only property from QD to QE as command line options.  In case of explicit
BEGIN, the isolation level was dispatched as flags in DtxContextInfo.  This
patch makes it consistent such that QEs (readers as well as writers) read the
transaction options from DtxContextInfo.

The poblem with setting transaction isolation level as command line opiton is
command line options are processed during process initialization, when a
transaction is already open and a snapshot is taken.  Changing isolation level
after taking a snapshot is not correct.

This patch allows merging with the check for transaction_isolation GUC as it
stands in 9.1, without any Greenplum-specific changes.
Co-authored-by: NJacob Champion <pchampion@pivotal.io>
上级 ced70000
......@@ -2115,6 +2115,13 @@ StartTransaction(void)
AtStart_Memory();
AtStart_ResourceOwner();
/*
* Transactions may be started while recovery is in progress, if
* hot standby is enabled. This mode is not supported in
* Greenplum yet.
*/
AssertImply(DistributedTransactionContext != DTX_CONTEXT_LOCAL_ONLY,
!s->startedInRecovery);
/*
* MPP Modification
*
......@@ -2177,6 +2184,18 @@ StartTransaction(void)
DtxContextToString(DistributedTransactionContext));
}
/*
* Snapshot must not be created before setting transaction
* isolation level.
*/
Assert(!FirstSnapshotSet);
/* Assume transaction characteristics as sent by QD */
XactIsoLevel = mppTxOptions_IsoLevel(
QEDtxContextInfo.distributedTxnOptions);
XactReadOnly = isMppTxOptions_ReadOnly(
QEDtxContextInfo.distributedTxnOptions);
/*
* MPP: we're a QE Writer.
*
......@@ -2247,6 +2266,18 @@ StartTransaction(void)
Assert (SharedLocalSnapshotSlot != NULL);
currentDistribXid = QEDtxContextInfo.distributedXid;
/*
* Snapshot must not be created before setting transaction
* isolation level.
*/
Assert(!FirstSnapshotSet);
/* Assume transaction characteristics as sent by QD */
XactIsoLevel = mppTxOptions_IsoLevel(
QEDtxContextInfo.distributedTxnOptions);
XactReadOnly = isMppTxOptions_ReadOnly(
QEDtxContextInfo.distributedTxnOptions);
ereportif(Debug_print_full_dtm, LOG,
(errmsg("qExec reader: distributedXid %d currcid %d "
"gxid = %u DtxContext '%s' sharedsnapshots: %s",
......@@ -2360,8 +2391,10 @@ StartTransaction(void)
ShowTransactionState("StartTransaction");
ereportif(Debug_print_full_dtm, LOG,
(errmsg("StartTransaction in DTX Context = '%s', %s",
(errmsg("StartTransaction in DTX Context = '%s', "
"isolation level %s, read-only = %d, %s",
DtxContextToString(DistributedTransactionContext),
IsoLevelAsUpperString(XactIsoLevel), XactReadOnly,
LocalDistribXact_DisplayString(MyProc))));
}
......
......@@ -96,20 +96,18 @@ typedef struct InDoubtDtx
* PQsendGpQuery
*/
/* bit 1 is for statement wants DTX transaction
*
* bits 2-3 for iso level 00 read-committed
* 01 read-uncommitted
* 10 repeatable-read
* 11 serializable
* bit 4 is for read-only
/*
* bit 1 is for statement wants DTX transaction
* bits 2-4 for iso level
* bit 5 is for read-only
*/
#define GP_OPT_NEED_TWO_PHASE 0x0001
#define GP_OPT_READ_COMMITTED 0x0002
#define GP_OPT_READ_UNCOMMITTED 0x0004
#define GP_OPT_REPEATABLE_READ 0x0006
#define GP_OPT_SERIALIZABLE 0x0008
#define GP_OPT_ISOLATION_LEVEL_MASK 0x000E
#define GP_OPT_READ_UNCOMMITTED (1 << 1)
#define GP_OPT_READ_COMMITTED (2 << 1)
#define GP_OPT_REPEATABLE_READ (3 << 1)
#define GP_OPT_SERIALIZABLE (4 << 1)
#define GP_OPT_READ_ONLY 0x0010
......@@ -141,7 +139,7 @@ static void doNotifyingAbort(void);
static void retryAbortPrepared(void);
static bool doNotifyCommittedInDoubt(char *gid);
static void doAbortInDoubt(char *gid);
static void doQEDistributedExplicitBegin(int txnOptions);
static void doQEDistributedExplicitBegin();
static bool isDtxQueryDispatcher(void);
static void UtilityModeSaveRedo(bool committed, TMGXACT_LOG *gxact_log);
......@@ -1622,6 +1620,8 @@ mppTxnOptions(bool needTwoPhase)
options |= GP_OPT_REPEATABLE_READ;
else if (XactIsoLevel == XACT_SERIALIZABLE)
options |= GP_OPT_SERIALIZABLE;
else if (XactIsoLevel == XACT_READ_UNCOMMITTED)
options |= GP_OPT_READ_UNCOMMITTED;
if (XactReadOnly)
options |= GP_OPT_READ_ONLY;
......@@ -1642,14 +1642,16 @@ mppTxnOptions(bool needTwoPhase)
int
mppTxOptions_IsoLevel(int txnOptions)
{
if ((txnOptions & GP_OPT_SERIALIZABLE) == GP_OPT_SERIALIZABLE)
if ((txnOptions & GP_OPT_ISOLATION_LEVEL_MASK) == GP_OPT_SERIALIZABLE)
return XACT_SERIALIZABLE;
else if ((txnOptions & GP_OPT_REPEATABLE_READ) == GP_OPT_REPEATABLE_READ)
else if ((txnOptions & GP_OPT_ISOLATION_LEVEL_MASK) == GP_OPT_REPEATABLE_READ)
return XACT_REPEATABLE_READ;
else if ((txnOptions & GP_OPT_READ_COMMITTED) == GP_OPT_READ_COMMITTED)
else if ((txnOptions & GP_OPT_ISOLATION_LEVEL_MASK) == GP_OPT_READ_COMMITTED)
return XACT_READ_COMMITTED;
else
else if ((txnOptions & GP_OPT_ISOLATION_LEVEL_MASK) == GP_OPT_READ_UNCOMMITTED)
return XACT_READ_UNCOMMITTED;
/* QD must set transaction isolation level */
elog(ERROR, "transaction options from QD did not include isolation level");
}
bool
......@@ -1658,24 +1660,6 @@ isMppTxOptions_ReadOnly(int txnOptions)
return ((txnOptions & GP_OPT_READ_ONLY) != 0);
}
/* unpackMppTxnOptions:
* Unpack an int containing the appropriate flags to direct the remote
* segdb QE process to perform any needed transaction commands before or
* after the statement.
*/
void
unpackMppTxnOptions(int txnOptions, int *isoLevel, bool *readOnly)
{
*isoLevel = mppTxOptions_IsoLevel(txnOptions);
*readOnly = isMppTxOptions_ReadOnly(txnOptions);
}
/* isMppTxOptions_StatementWantsDtxTransaction:
* Return the NeedTwoPhase flag.
*/
bool
isMppTxOptions_NeedTwoPhase(int txnOptions)
{
......@@ -2641,11 +2625,8 @@ assign_gp_write_shared_snapshot(bool newval, bool doit, GucSource source __attri
}
static void
doQEDistributedExplicitBegin(int txnOptions)
doQEDistributedExplicitBegin()
{
int ExplicitIsoLevel;
bool ExplicitReadOnly;
/*
* Start a command.
*/
......@@ -2654,21 +2635,11 @@ doQEDistributedExplicitBegin(int txnOptions)
/* Here is the explicit BEGIN. */
BeginTransactionBlock();
unpackMppTxnOptions(txnOptions,
&ExplicitIsoLevel, &ExplicitReadOnly);
XactIsoLevel = ExplicitIsoLevel;
XactReadOnly = ExplicitReadOnly;
elog(DTM_DEBUG5, "doQEDistributedExplicitBegin setting XactIsoLevel = %s and XactReadOnly = %s",
IsoLevelAsUpperString(XactIsoLevel), (XactReadOnly ? "true" : "false"));
/*
* Finish the BEGIN command. It will leave the explict transaction
* in-progress.
*/
CommitTransactionCommand();
}
static bool
......@@ -2919,7 +2890,7 @@ setupQEDtxContext(DtxContextInfo *dtxContextInfo)
*/
setDistributedTransactionContext(DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER);
doQEDistributedExplicitBegin(txnOptions);
doQEDistributedExplicitBegin();
}
else
{
......
......@@ -650,34 +650,6 @@ makeOptions(void)
appendStringInfo(&string, " -c gp_qd_hostname=%s", qdinfo->hostip);
appendStringInfo(&string, " -c gp_qd_port=%d", qdinfo->port);
/*
* Transactions are tricky. Here is the copy and pasted code, and we know
* they are working. The problem, is that QE may ends up with different
* iso level, but postgres really does not have read uncommited and
* repeated read. (is this true?) and they are mapped.
*
* Put these two gucs in the generic framework works (pass make
* installcheck-good) if we make assign_defaultxactisolevel and
* assign_XactIsoLevel correct take string "readcommitted" etc. (space
* stripped). However, I do not want to change this piece of code unless
* I know it is broken.
*/
if (DefaultXactIsoLevel != XACT_READ_COMMITTED)
{
if (DefaultXactIsoLevel == XACT_REPEATABLE_READ)
appendStringInfo(&string, " -c default_transaction_isolation=repeatable\\ read");
else if (DefaultXactIsoLevel == XACT_SERIALIZABLE)
appendStringInfo(&string, " -c default_transaction_isolation=serializable");
}
if (XactIsoLevel != XACT_READ_COMMITTED)
{
if (XactIsoLevel == XACT_REPEATABLE_READ)
appendStringInfo(&string, " -c transaction_isolation=repeatable\\ read");
else if (XactIsoLevel == XACT_SERIALIZABLE)
appendStringInfo(&string, " -c transaction_isolation=serializable");
}
for (i = 0; i < ngucs; ++i)
{
struct config_generic *guc = gucs[i];
......
......@@ -332,7 +332,6 @@ extern void verify_shared_snapshot_ready(void);
int mppTxnOptions(bool needTwoPhase);
int mppTxOptions_IsoLevel(int txnOptions);
bool isMppTxOptions_ReadOnly(int txnOptions);
void unpackMppTxnOptions(int txnOptions, int *isoLevel, bool *readOnly);
bool isMppTxOptions_NeedTwoPhase(int txnOptions);
bool isMppTxOptions_ExplicitBegin(int txnOptions);
......
......@@ -342,6 +342,21 @@ select gp_inject_fault('after_one_slice_dispatched', 'reset', 1);
select * from gp_dist_random('gp_id')
where gpname > (select * from repeat('sssss', 10000000));
-- Cover all transaction isolation levels to ensure that a gang can be
-- created. Connect again so that existing gangs are destroyed.
\connect
begin isolation level serializable;
end;
\connect
begin isolation level repeatable read;
end;
\connect
begin isolation level read committed;
end;
\connect
begin isolation level read uncommitted;
end;
-- resume FTS probes
select gp_inject_fault('fts_probe', 'reset', 1);
......@@ -637,6 +637,20 @@ select * from gp_dist_random('gp_id')
--------+-------------+------+---------
(0 rows)
-- Cover all transaction isolation levels to ensure that a gang can be
-- created. Connect again so that existing gangs are destroyed.
\connect
begin isolation level serializable;
end;
\connect
begin isolation level repeatable read;
end;
\connect
begin isolation level read committed;
end;
\connect
begin isolation level read uncommitted;
end;
-- resume FTS probes
select gp_inject_fault('fts_probe', 'reset', 1);
NOTICE: Success:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册