未验证 提交 471653e7 编写于 作者: W Weinan WANG 提交者: GitHub

Dump combocid message to dynamic shared memory instead of BufFile

For write-gang and read-gang combocid synchronization, I remove
BufFile code and replace it with dynamic shared memory.

- Remove the combocids array from SharedLocalSnapshotSlot. Always rely on
  the array sharing mechanism in combocid.c. 

- Revert comboCids array to the way it is in the upstream; remove 'xmin'
  field.

- Remove the changes to assertions referring to MPP-8317. As far as I can
  see, QE reader processed need to always have a correct view of all their 
  "current" transactions, whether or not you're running a cursor. We
  were not consistent with relaxing those assertions, we had relaxed the
  one in HeapTupleHeaderGetCmax(), for example, but not the one in
  HeapTupleHeaderGetCmin(). The relaxations must have become obsolete 
  somewhere along the line.

- In the DSM segment, use the same array format as in the backend-private
  comboCids array. Rename and move things around to make it more explicit
  that the shared array is a copy of the backend-private comboCids array.

- Improve the dsm_attach() retry logic. We can detect the case that
  dsm_attach() fails because the QE writer process reallocated a new DSM
  segment, so check for that explicitly, remove the sleeps, and throw an
  error on other failures.
Co-authored-by: NHeikki Linnakangas <hlinnakangas@pivotal.io>
上级 fe4bcafa
......@@ -2819,6 +2819,9 @@ CommitTransaction(void)
RESOURCE_RELEASE_BEFORE_LOCKS,
true, true);
/* detach combocid dsm */
AtEOXact_ComboCid_Dsm_Detach();
/* Check we've released all buffer pins */
AtEOXact_Buffers(true);
......@@ -3152,6 +3155,8 @@ PrepareTransaction(void)
RESOURCE_RELEASE_BEFORE_LOCKS,
true, true);
/* detach combocid dsm */
AtEOXact_ComboCid_Dsm_Detach();
/* Check we've released all buffer pins */
AtEOXact_Buffers(true);
......@@ -3405,6 +3410,7 @@ AbortTransaction(void)
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_BEFORE_LOCKS,
false, true);
AtEOXact_ComboCid_Dsm_Detach();
AtEOXact_Buffers(false);
AtEOXact_RelationCache(false);
AtEOXact_Inval(false);
......
......@@ -1576,8 +1576,6 @@ updateSharedLocalSnapshot(DtxContextInfo *dtxContextInfo,
Snapshot snapshot,
char *debugCaller)
{
int combocidSize;
Assert(SharedLocalSnapshotSlot != NULL);
Assert(snapshot != NULL);
......@@ -1607,18 +1605,11 @@ updateSharedLocalSnapshot(DtxContextInfo *dtxContextInfo,
memcpy(SharedLocalSnapshotSlot->snapshot.xip, snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
}
/* combocid stuff */
combocidSize = ((usedComboCids < MaxComboCids) ? usedComboCids : MaxComboCids );
SharedLocalSnapshotSlot->combocidcnt = combocidSize;
memcpy((void *)SharedLocalSnapshotSlot->combocids, comboCids,
combocidSize * sizeof(ComboCidKeyData));
SharedLocalSnapshotSlot->snapshot.curcid = snapshot->curcid;
ereport((Debug_print_full_dtm ? LOG : DEBUG5),
(errmsg("updateSharedLocalSnapshot: combocidsize is now %d max %d segmateSync %d->%d",
combocidSize, MaxComboCids, SharedLocalSnapshotSlot->segmateSync, dtxContextInfo->segmateSync)));
(errmsg("updateSharedLocalSnapshot: segmateSync %d->%d",
SharedLocalSnapshotSlot->segmateSync, dtxContextInfo->segmateSync)));
SetSharedTransactionId_writer(distributedTransactionContext);
......@@ -1688,17 +1679,6 @@ copyLocalSnapshot(Snapshot snapshot)
snapshot->curcid = SharedLocalSnapshotSlot->snapshot.curcid;
snapshot->subxcnt = -1;
/* combocid */
usedComboCids = SharedLocalSnapshotSlot->combocidcnt;
Assert(usedComboCids <= MaxComboCids);
if (usedComboCids > 0)
{
if (comboCids == NULL)
comboCids = MemoryContextAlloc(TopTransactionContext, MaxComboCids * sizeof(ComboCidKeyData));
memcpy(comboCids, (char *)SharedLocalSnapshotSlot->combocids, usedComboCids * sizeof(ComboCidKeyData));
}
if (TransactionIdPrecedes(snapshot->xmin, TransactionXmin))
TransactionXmin = snapshot->xmin;
......@@ -3146,8 +3126,6 @@ UpdateSerializableCommandId(CommandId curcid)
SharedLocalSnapshotSlot != NULL &&
FirstSnapshotSet)
{
int combocidSize;
LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);
if (SharedLocalSnapshotSlot->QDxid != QEDtxContextInfo.distributedXid)
......@@ -3171,16 +3149,6 @@ UpdateSerializableCommandId(CommandId curcid)
getDistributedTransactionId(),
DtxContextToString(DistributedTransactionContext))));
ereport((Debug_print_snapshot_dtm ? LOG : DEBUG5),
(errmsg("serializable writer updating combocid: used combocids %d shared %d",
usedComboCids, SharedLocalSnapshotSlot->combocidcnt)));
combocidSize = ((usedComboCids < MaxComboCids) ? usedComboCids : MaxComboCids );
SharedLocalSnapshotSlot->combocidcnt = combocidSize;
memcpy((void *)SharedLocalSnapshotSlot->combocids, comboCids,
combocidSize * sizeof(ComboCidKeyData));
SharedLocalSnapshotSlot->snapshot.curcid = curcid;
SharedLocalSnapshotSlot->segmateSync = QEDtxContextInfo.segmateSync;
......
此差异已折叠。
......@@ -707,8 +707,6 @@ dumpSharedLocalSnapshot_forCursor(void)
FileWriteFieldWithCount(count, f, src->xid);
FileWriteFieldWithCount(count, f, src->startTimestamp);
FileWriteFieldWithCount(count, f, src->combocidcnt);
FileWriteFieldWithCount(count, f, src->combocids);
FileWriteFieldWithCount(count, f, src->snapshot.xmin);
FileWriteFieldWithCount(count, f, src->snapshot.xmax);
FileWriteFieldWithCount(count, f, src->snapshot.xcnt);
......@@ -757,9 +755,6 @@ readSharedLocalSnapshot_forCursor(Snapshot snapshot, DtxContext distributedTrans
TransactionId localXid;
TimestampTz localXactStartTimestamp;
uint32 combocidcnt;
ComboCidKeyData tmp_combocids[MaxComboCids];
Assert(Gp_role == GP_ROLE_EXECUTE);
Assert(!Gp_is_writer);
Assert(SharedLocalSnapshotSlot != NULL);
......@@ -818,27 +813,6 @@ readSharedLocalSnapshot_forCursor(Snapshot snapshot, DtxContext distributedTrans
memcpy(&localXactStartTimestamp, p, sizeof(localXactStartTimestamp));
p += sizeof(localXactStartTimestamp);
memcpy(&combocidcnt, p, sizeof(combocidcnt));
p += sizeof(combocidcnt);
memcpy(tmp_combocids, p, sizeof(tmp_combocids));
p += sizeof(tmp_combocids);
/* handle the combocid stuff (same as in GetSnapshotData()) */
if (usedComboCids != combocidcnt)
{
if (usedComboCids == 0)
{
MemoryContext oldCtx = MemoryContextSwitchTo(TopTransactionContext);
comboCids = palloc(combocidcnt * sizeof(ComboCidKeyData));
MemoryContextSwitchTo(oldCtx);
}
else
repalloc(comboCids, combocidcnt * sizeof(ComboCidKeyData));
}
memcpy(comboCids, tmp_combocids, combocidcnt * sizeof(ComboCidKeyData));
usedComboCids = ((combocidcnt < MaxComboCids) ? combocidcnt : MaxComboCids);
memcpy(&snapshot->xmin, p, sizeof(snapshot->xmin));
p += sizeof(snapshot->xmin);
......
......@@ -44,7 +44,6 @@ test_write_read_shared_snapshot_for_cursor(void **state)
slot.QDxid = 10;
slot.ready = true;
slot.segmateSync = 1;
slot.combocidcnt = 0;
slot.snapshot.xmin = 99;
slot.snapshot.xmax = 110;
slot.snapshot.xcnt = XCNT;
......@@ -59,11 +58,11 @@ test_write_read_shared_snapshot_for_cursor(void **state)
expect_any(LWLockAcquire, mode);
will_be_called(LWLockAcquire);
expect_any_count(FaultInjector_InjectFaultIfSet, faultName, 11);
expect_any_count(FaultInjector_InjectFaultIfSet, ddlStatement, 11);
expect_any_count(FaultInjector_InjectFaultIfSet, databaseName, 11);
expect_any_count(FaultInjector_InjectFaultIfSet, tableName, 11);
will_be_called_count(FaultInjector_InjectFaultIfSet, 11);
expect_any_count(FaultInjector_InjectFaultIfSet, faultName, 9);
expect_any_count(FaultInjector_InjectFaultIfSet, ddlStatement, 9);
expect_any_count(FaultInjector_InjectFaultIfSet, databaseName, 9);
expect_any_count(FaultInjector_InjectFaultIfSet, tableName, 9);
will_be_called_count(FaultInjector_InjectFaultIfSet, 9);
expect_any(LWLockRelease, lock);
will_be_called(LWLockRelease);
......
......@@ -1145,12 +1145,6 @@ HeapTupleSatisfiesMVCC(Relation relation, HeapTuple htup, Snapshot snapshot,
return true;
}
/*
* MPP-8317: cursors can't always *tell* that this is the current transaction.
*/
Assert(QEDtxContextInfo.cursorContext ||
TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)));
if (HeapTupleHeaderGetCmax(tuple) >= snapshot->curcid)
return true; /* deleted after scan started */
else
......
......@@ -27,6 +27,7 @@
#include "cdb/cdblocaldistribxact.h" /* LocalDistribXactData */
#include "cdb/cdbtm.h" /* TMGXACT */
#include "dsm.h"
/*
* Each backend advertises up to PGPROC_MAX_CACHED_SUBXIDS TransactionIds
......@@ -172,9 +173,10 @@ struct PGPROC
uint32 waitPortalId; /* portal id we are waiting on */
/*
* Information for our combocid-map (populated in writer/dispatcher backends only)
* Handle for our shared comboCids array (populated in writer/dispatcher
* backends only)
*/
uint32 combocid_map_count; /* how many entries in the map ? */
dsm_handle comboCidsHandle;
/*
* Current command_id for the running query
......
......@@ -19,33 +19,9 @@
* are in access/htup.h, because that's where the macro definitions that
* those functions replaced used to be.
*/
/* CDB TODO: This is the max number of combo cids that we copy into the Shared
* snapshot to the reader gangs. As a result, transactions that have more than
* 128 update/delete statements within them can result in the reader gangs
* reading a tuple whose visibility they cannot determine because of not having
* the entire combocids table. Those statements will result in the reader gangs
* throwing an error. Ultimately, we need to find another way of serializing
* this information that can support the arbitrary size of the combocids
* datastructure.
*/
#define MaxComboCids 256
/* Key and entry structures for the hash table */
typedef struct
{
CommandId cmin;
CommandId cmax;
TransactionId xmin;
} ComboCidKeyData;
typedef ComboCidKeyData *ComboCidKey;
extern volatile ComboCidKey comboCids;
extern volatile int usedComboCids;
extern volatile int sizeComboCids;
extern void AtEOXact_ComboCid(void);
extern void AtEOXact_ComboCid_Dsm_Detach(void);
extern void RestoreComboCIDState(char *comboCIDstate);
extern void SerializeComboCIDState(Size maxsize, char *start_address);
extern Size EstimateComboCIDStateSpace(void);
......
......@@ -27,8 +27,6 @@ typedef struct SharedSnapshotSlot
volatile TransactionId QDxid;
volatile bool ready;
volatile uint32 segmateSync;
uint32 combocidcnt;
ComboCidKeyData combocids[MaxComboCids];
SnapshotData snapshot;
LWLock *slotLock;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册