Revert "Remap transient typmods on receivers instead of on senders."

This reverts commit ab4398dd.
[#142986717]
Parent: 828b99c4
......@@ -87,6 +87,7 @@ CreateTemplateTupleDesc(int natts, bool hasoid)
desc->constr = NULL;
desc->tdtypeid = RECORDOID;
desc->tdtypmod = -1;
desc->tdqdtypmod = -1;
desc->tdhasoid = hasoid;
desc->tdrefcount = -1; /* assume not reference-counted */
......@@ -120,6 +121,7 @@ CreateTupleDesc(int natts, bool hasoid, Form_pg_attribute *attrs)
desc->constr = NULL;
desc->tdtypeid = RECORDOID;
desc->tdtypmod = -1;
desc->tdqdtypmod = -1;
desc->tdhasoid = hasoid;
desc->tdrefcount = -1; /* assume not reference-counted */
......@@ -150,6 +152,7 @@ CreateTupleDescCopy(TupleDesc tupdesc)
desc->tdtypeid = tupdesc->tdtypeid;
desc->tdtypmod = tupdesc->tdtypmod;
desc->tdqdtypmod = tupdesc->tdqdtypmod;
return desc;
}
......@@ -208,6 +211,7 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
desc->tdtypeid = tupdesc->tdtypeid;
desc->tdtypmod = tupdesc->tdtypmod;
desc->tdqdtypmod = tupdesc->tdqdtypmod;
return desc;
}
......
......@@ -425,6 +425,7 @@ Relation DirectOpen_Open(
direct->descData.constr = &direct->constrData;
direct->descData.tdtypeid = pgClass->reltype;
direct->descData.tdtypmod = -1;
direct->descData.tdqdtypmod = -1;
direct->descData.tdhasoid = relHasOid;
direct->descData.tdrefcount = 1;
......
......@@ -13,6 +13,6 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(top_srcdir)/src/backend/gp_libpq_fe $(CPPFLAGS)
OBJS = cdbmotion.o tupchunklist.o tupser.o \
ic_common.o ic_udpifc.o htupfifo.o tupleremap.o
ic_common.o ic_udpifc.o htupfifo.o
include $(top_srcdir)/src/backend/common.mk
......@@ -22,7 +22,6 @@
#include "cdb/tupser.h"
#include "libpq/pqformat.h"
#include "utils/memutils.h"
#include "utils/typcache.h"
/*
......@@ -47,7 +46,7 @@ static TupleChunkListItem s_eos_chunk_data = (TupleChunkListItem)s_eos_buffer;
static ChunkSorterEntry *getChunkSorterEntry(MotionLayerState *mlStates,
MotionNodeEntry * motNodeEntry,
int16 srcRoute);
static void addChunkToSorter(MotionLayerState *mlStates,
static bool addChunkToSorter(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
MotionNodeEntry * pMNEntry,
TupleChunkListItem tcItem,
......@@ -60,7 +59,7 @@ static void processIncomingChunks(MotionLayerState *mlStates,
int16 motNodeID,
int16 srcRoute);
static inline void reconstructTuple(MotionNodeEntry * pMNEntry, ChunkSorterEntry * pCSEntry, TupleRemapper *remapper);
static inline void reconstructTuple(MotionNodeEntry * pMNEntry, ChunkSorterEntry * pCSEntry);
/* Stats-function declarations. */
static void statSendTuple(MotionLayerState *mlStates, MotionNodeEntry * pMNEntry, TupleChunkList tcList);
......@@ -70,8 +69,6 @@ static void statNewTupleArrived(MotionNodeEntry * pMNEntry, ChunkSorterEntry * p
static void statRecvTuple(MotionNodeEntry * pMNEntry,
ChunkSorterEntry * pCSEntry,
ReceiveReturnCode recvRC);
static bool ShouldSendRecordCache(MotionConn *conn, SerTupInfo *pSerInfo);
static void UpdateSentRecordCache(MotionConn *conn);
......@@ -81,21 +78,15 @@ static void UpdateSentRecordCache(MotionConn *conn);
* tuple-chunk list, and recording statistics about the newly formed tuple.
*/
static inline void
reconstructTuple(MotionNodeEntry * pMNEntry, ChunkSorterEntry * pCSEntry, TupleRemapper *remapper)
reconstructTuple(MotionNodeEntry * pMNEntry, ChunkSorterEntry * pCSEntry)
{
HeapTuple htup;
SerTupInfo *pSerInfo = &pMNEntry->ser_tup_info;
/*
* Convert the list of chunks into a tuple, then stow it away. This frees
* our TCList as a side-effect
*/
htup = CvtChunksToHeapTup(&pCSEntry->chunk_list, pSerInfo, remapper);
if (!htup)
return;
htup = TRCheckAndRemap(remapper, pSerInfo->tupdesc, htup);
htup = CvtChunksToHeapTup(&pCSEntry->chunk_list, &pMNEntry->ser_tup_info);
htfifo_addtuple(pCSEntry->ready_tuples, htup);
......@@ -391,83 +382,6 @@ SendStopMessage(MotionLayerState *mlStates,
transportStates->doSendStopMessage(transportStates, motNodeID);
}
/*
 * CheckAndSendRecordCache
 *		Serialize the local record-type cache and push it down one motion
 *		connection, unless that connection is already up to date.
 *
 * mlStates and transportStates are the motion-layer and interconnect state,
 * motNodeID names the motion node, and targetRoute is the destination route
 * (BROADCAST_SEGIDX for a broadcast).
 *
 * Returns without sending anything when the current slice is masked out by
 * gp_motion_slice_noop, or when ShouldSendRecordCache() reports that there
 * is nothing new for this connection.
 */
void
CheckAndSendRecordCache(MotionLayerState *mlStates,
						ChunkTransportState *transportStates,
						int16 motNodeID,
						int16 targetRoute)
{
	ChunkTransportStateEntry *transportEntry = NULL;
	MotionNodeEntry *motionEntry;
	MotionConn *conn;
	TupleChunkListData chunks;
	MemoryContext savedCtxt;
	bool		sent;

	getChunkTransportState(transportStates, motNodeID, &transportEntry);

	/*
	 * A broadcast tracks its sent_record_typmod bookkeeping on connection 0
	 * only; every other route uses its own connection.
	 */
	conn = &transportEntry->conns[(targetRoute == BROADCAST_SEGIDX) ? 0 : targetRoute];

	/* Analyze tools: slices named in the no-op bit mask send nothing. */
	if (gp_motion_slice_noop != 0 &&
		(gp_motion_slice_noop & (1 << currentSliceId)) != 0)
	{
		return;
	}

	/* Fetch the motion node's entry; it owns the serialization state. */
	motionEntry = getMotionNodeEntry(mlStates, motNodeID, "SendRecordCache");

	if (!ShouldSendRecordCache(conn, &motionEntry->ser_tup_info))
	{
		return;
	}

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG5, "Serializing RecordCache for sending.");
#endif

	/* Build the serialized chunks inside the motion layer's memory context. */
	savedCtxt = MemoryContextSwitchTo(mlStates->motion_layer_mctx);
	SerializeRecordCacheIntoChunks(&motionEntry->ser_tup_info, &chunks, conn);
	MemoryContextSwitchTo(savedCtxt);

#ifdef AMS_VERBOSE_LOGGING
	elog(DEBUG5, "Serialized RecordCache for sending:\n"
		 "\ttarget-route %d \n"
		 "\t%d bytes in serial form\n"
		 "\tbroken into %d chunks",
		 targetRoute,
		 chunks.serialized_data_length,
		 chunks.num_chunks);
#endif

	/* Hand the chunks to the transport; a failed send stops the node. */
	sent = SendTupleChunkToAMS(mlStates, transportStates, motNodeID,
							   targetRoute, chunks.p_first);
	if (sent)
	{
		/* update stats */
		statSendTuple(mlStates, motionEntry, &chunks);
	}
	else
	{
		motionEntry->stopped = true;
	}

	/* Give the chunks back to the cache and remember what has been sent. */
	clearTCList(&motionEntry->ser_tup_info.chunkCache, &chunks);
	UpdateSentRecordCache(conn);
}
/*
* Function: SendTuple - Sends a portion or whole tuple to the AMS layer.
*/
......@@ -1065,7 +979,7 @@ materializeChunk(TupleChunkListItem * tcItem)
* true - if another HeapTuple is completed by this chunk.
* false - if the chunk does not complete a HeapTuple.
*/
static void
static bool
addChunkToSorter(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
MotionNodeEntry * pMNEntry,
......@@ -1075,9 +989,8 @@ addChunkToSorter(MotionLayerState *mlStates,
{
MemoryContext oldCtxt;
ChunkSorterEntry *chunkSorterEntry;
bool tupleCompleted = false;
TupleChunkType tcType;
ChunkTransportStateEntry *pEntry = NULL;
MotionConn *conn = NULL;
AssertArg(tcItem != NULL);
......@@ -1085,9 +998,6 @@ addChunkToSorter(MotionLayerState *mlStates,
chunkSorterEntry = getChunkSorterEntry(mlStates, pMNEntry, srcRoute);
getChunkTransportState(transportStates, motNodeID, &pEntry);
conn = pEntry->conns + srcRoute;
/* Look at the chunk's type, to figure out what to do with it. */
GetChunkType(tcItem, &tcType);
......@@ -1106,7 +1016,8 @@ addChunkToSorter(MotionLayerState *mlStates,
/* Put this chunk into the list, then turn it into a HeapTuple! */
appendChunkToTCList(&chunkSorterEntry->chunk_list, tcItem);
reconstructTuple(pMNEntry, chunkSorterEntry, conn->remapper);
reconstructTuple(pMNEntry, chunkSorterEntry);
tupleCompleted = true;
break;
......@@ -1164,7 +1075,8 @@ addChunkToSorter(MotionLayerState *mlStates,
/* Put this chunk into the list, then turn it into a HeapTuple! */
appendChunkToTCList(&chunkSorterEntry->chunk_list, tcItem);
reconstructTuple(pMNEntry, chunkSorterEntry, conn->remapper);
reconstructTuple(pMNEntry, chunkSorterEntry);
tupleCompleted = true;
break;
......@@ -1213,6 +1125,8 @@ addChunkToSorter(MotionLayerState *mlStates,
}
MemoryContextSwitchTo(oldCtxt);
return tupleCompleted;
}
......@@ -1336,23 +1250,3 @@ statRecvTuple(MotionNodeEntry * pMNEntry, ChunkSorterEntry * pCSEntry,
}
}
/*
 * ShouldSendRecordCache
 *		Decide whether the record-type cache has to be sent on this connection.
 *
 * True only when the tuple descriptor was flagged as possibly containing
 * record types, at least one record typmod has been assigned locally, and
 * the newest local typmod counter is beyond what this connection has
 * already been sent.
 */
static bool
ShouldSendRecordCache(MotionConn *conn, SerTupInfo *pSerInfo)
{
	if (!pSerInfo->has_record_types)
		return false;

	if (NextRecordTypmod <= 0)
		return false;

	return NextRecordTypmod > conn->sent_record_typmod;
}
/*
 * UpdateSentRecordCache
 *		Record on the connection that every record type assigned so far
 *		(i.e. up to the current NextRecordTypmod) has been sent.
 */
static void
UpdateSentRecordCache(MotionConn *conn)
{
	const int32 sentUpTo = NextRecordTypmod;

	conn->sent_record_typmod = sentUpTo;
}
......@@ -706,8 +706,6 @@ createChunkTransportState(ChunkTransportState *transportStates,
conn->stillActive = false;
conn->stopRequested = false;
conn->cdbProc = NULL;
conn->sent_record_typmod = 0;
conn->remapper = NULL;
}
return pEntry;
......
......@@ -3099,7 +3099,6 @@ SetupUDPIFCInterconnect_Internal(EState *estate)
conn->conn_info.seq = 1;
conn->stillActive = true;
conn->remapper = CreateTupleRemapper();
incoming_count++;
......@@ -3488,10 +3487,6 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
/* free up the packet queue */
pfree(conn->pkt_q);
conn->pkt_q = NULL;
/* free up the tuple remapper */
if (conn->remapper)
DestroyTupleRemapper(conn->remapper);
}
pfree(pEntry->conns);
pEntry->conns = NULL;
......
This diff is collapsed.
......@@ -13,7 +13,6 @@
#include "access/htup.h"
#include "catalog/pg_type.h"
#include "cdb/cdbmotion.h"
#include "cdb/cdbsrlz.h"
#include "cdb/tupser.h"
#include "cdb/cdbvars.h"
#include "libpq/pqformat.h"
......@@ -24,22 +23,9 @@
#include "utils/memutils.h"
#include "utils/builtins.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
#include "access/memtup.h"
/*
* The transient record types table is sent upstream via a specially constructed
* tuple, on receiving side it can distinguish it from real tuples by checking
* below magic attributes in the header:
*
* - tuplen has MEMTUP_LEAD_BIT unset, so it's considered as a heap tuple;
* - natts is set to RECORD_CACHE_MAGIC_NATTS;
* - infomask is set to RECORD_CACHE_MAGIC_INFOMASK.
*/
#define RECORD_CACHE_MAGIC_NATTS 0xffff
#define RECORD_CACHE_MAGIC_INFOMASK 0xffff
/* A MemoryContext used within the tuple serialize code, so that freeing of
* space is SUPAFAST. It is initialized in the first call to InitSerTupInfo()
* since that must be called before any tuple serialization or deserialization
......@@ -151,8 +137,6 @@ InitSerTupInfo(TupleDesc tupdesc, SerTupInfo * pSerInfo)
pSerInfo->chunkCache.len = 0;
pSerInfo->chunkCache.items = NULL;
pSerInfo->has_record_types = false;
/*
* If we have some attributes, go ahead and prepare the information for
* each attribute in the descriptor. Otherwise, we can return right away.
......@@ -195,10 +179,6 @@ InitSerTupInfo(TupleDesc tupdesc, SerTupInfo * pSerInfo)
elog(ERROR, "cache lookup failed for type %u", attrInfo->atttypid);
pt = (Form_pg_type) GETSTRUCT(typeTuple);
/* Consider any non-basic types as potential containers of record types */
if (pt->typtype != TYPTYPE_BASE)
pSerInfo->has_record_types = true;
if (!pt->typisdefined)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
......@@ -366,104 +346,6 @@ typedef struct TupSerHeader
uint16 infomask; /* various flag bits */
} TupSerHeader;
/*
 * SerializeRecordCacheIntoChunks
 *		Turn the record cache into a byte stream and store it straight into a
 *		chunk list ready for transmission.
 *
 * pSerInfo supplies the chunk cache, tcList is the (re-initialized) output
 * chunk list, and conn provides sent_record_typmod, the starting point
 * passed to build_tuple_node_list().
 *
 * This code is based on the printtup_internal_20() function in printtup.c.
 */
void
SerializeRecordCacheIntoChunks(SerTupInfo *pSerInfo,
							   TupleChunkList tcList,
							   MotionConn *conn)
{
	TupleChunkListItem headChunk = NULL;
	MemoryContext savedCtxt;
	TupSerHeader header;
	List	   *recordTypes = NULL;
	int			payloadLen = -1;
	char	   *payload = NULL;

	AssertArg(tcList != NULL);
	AssertArg(pSerInfo != NULL);

	/* Reset the output list to an empty state. */
	tcList->p_first = NULL;
	tcList->p_last = NULL;
	tcList->num_chunks = 0;
	tcList->serialized_data_length = 0;
	tcList->max_chunk_length = Gp_max_tuple_chunk_size;

	headChunk = getChunkFromCache(&pSerInfo->chunkCache);
	if (headChunk == NULL)
	{
		ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY),
						errmsg("Could not allocate space for first chunk item in new chunk list.")));
	}

	/* Optimistically mark the first chunk as holding the whole payload. */
	SetChunkType(headChunk->chunk_data, TC_WHOLE);
	headChunk->chunk_length = TUPLE_CHUNK_HEADER_SIZE;
	appendChunkToTCList(tcList, headChunk);

	AssertState(s_tupSerMemCtxt != NULL);

	/*
	 * Sender and receiver of one motion must agree on the record cache, so
	 * the serialized cache goes out ahead of the first tuple.  The receiver
	 * then registers the records in its own cache and remaps the typmods of
	 * the tuples it gets from this sender.
	 */
	savedCtxt = MemoryContextSwitchTo(s_tupSerMemCtxt);
	recordTypes = build_tuple_node_list(conn->sent_record_typmod);
	payload = serializeNode((Node *) recordTypes, &payloadLen, NULL);
	MemoryContextSwitchTo(savedCtxt);

	/*
	 * natts == 0xffff together with infomask == 0xffff marks this as the
	 * special tuple that carries the serialized record cache table.
	 */
	header.natts = RECORD_CACHE_MAGIC_NATTS;
	header.infomask = RECORD_CACHE_MAGIC_INFOMASK;
	header.tuplen = sizeof(TupSerHeader) + payloadLen;

	addByteStringToChunkList(tcList,
							 (char *) &header,
							 sizeof(TupSerHeader),
							 &pSerInfo->chunkCache);
	addByteStringToChunkList(tcList, payload, payloadLen, &pSerInfo->chunkCache);
	addPadding(tcList, &pSerInfo->chunkCache, payloadLen);

	/*
	 * When the payload spilled into several chunks, retag the two ends; per
	 * the original note, chunks in between are already TC_PARTIAL_MID when
	 * allocated.
	 */
	if (tcList->num_chunks > 1)
	{
		Assert(tcList->p_first != NULL);
		Assert(tcList->p_first != tcList->p_last);
		Assert(tcList->p_last != NULL);

		SetChunkType(tcList->p_first->chunk_data, TC_PARTIAL_START);
		SetChunkType(tcList->p_last->chunk_data, TC_PARTIAL_END);
	}
}
/*
* Convert a HeapTuple into a byte-sequence, and store it directly
* into a chunklist for transmission.
......@@ -1134,7 +1016,7 @@ DeserializeTuple(SerTupInfo * pSerInfo, StringInfo serialTup)
}
HeapTuple
CvtChunksToHeapTup(TupleChunkList tcList, SerTupInfo * pSerInfo, TupleRemapper *remapper)
CvtChunksToHeapTup(TupleChunkList tcList, SerTupInfo * pSerInfo)
{
StringInfoData serData;
TupleChunkListItem tcItem;
......@@ -1251,23 +1133,6 @@ CvtChunksToHeapTup(TupleChunkList tcList, SerTupInfo * pSerInfo, TupleRemapper *
tshp = (TupSerHeader *)pos;
if (!(tshp->tuplen & MEMTUP_LEAD_BIT) &&
tshp->natts == RECORD_CACHE_MAGIC_NATTS &&
tshp->infomask == RECORD_CACHE_MAGIC_INFOMASK)
{
uint32 tuplen = tshp->tuplen & ~MEMTUP_LEAD_BIT;
/* a special tuple with record type cache */
List * typelist = (List *) deserializeNode(pos + sizeof(TupSerHeader),
tuplen - sizeof(TupSerHeader));
TRHandleTypeLists(remapper, typelist);
/* Free up memory we used. */
pfree(serData.data);
return NULL;
}
if ((tshp->tuplen & MEMTUP_LEAD_BIT) != 0)
{
uint32 tuplen = memtuple_size_from_uint32(tshp->tuplen);
......
......@@ -59,20 +59,14 @@
* An optional resultSlot can be passed as well.
*/
JunkFilter *
ExecInitJunkFilter(List *targetList, bool hasoid, TupleTableSlot *slot)
ExecInitJunkFilter(List *targetList, TupleDesc cleanTupType, TupleTableSlot *slot)
{
JunkFilter *junkfilter;
TupleDesc cleanTupType;
int cleanLength;
AttrNumber *cleanMap;
ListCell *t;
AttrNumber cleanResno;
/*
* Compute the tuple descriptor for the cleaned tuple.
*/
cleanTupType = ExecCleanTypeFromTL(targetList, hasoid);
/*
* Use the given slot, or make a new slot if we weren't given one.
*/
......
......@@ -392,6 +392,17 @@ ExecutorStart(QueryDesc *queryDesc, int eflags)
/* Initialize per-query resource (diskspace) tracking */
WorkfileQueryspace_InitEntry(gp_session_id, gp_command_count);
if (Gp_role != GP_ROLE_DISPATCH && queryDesc->ddesc &&
queryDesc->ddesc->transientTypeRecords != NULL)
{
ListCell *cell;
foreach(cell, queryDesc->ddesc->transientTypeRecords)
{
TupleDescNode *tmp = lfirst(cell);
assign_record_type_typmod(tmp->tuple);
}
}
/*
* Handling of the Slice table depends on context.
*/
......@@ -633,6 +644,8 @@ ExecutorStart(QueryDesc *queryDesc, int eflags)
queryDesc->ddesc->sliceTable = estate->es_sliceTable;
build_tuple_node_list(&queryDesc->ddesc->transientTypeRecords);
queryDesc->ddesc->oidAssignments = GetAssignedOidsForDispatch();
/*
......@@ -1909,8 +1922,10 @@ InitPlan(QueryDesc *queryDesc, int eflags)
ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc,
subplan->targetlist);
TupleDesc cleanTupType = ExecCleanTypeFromTL(subplan->targetlist,
resultRelInfo->ri_RelationDesc->rd_att->tdhasoid);
j = ExecInitJunkFilter(subplan->targetlist,
resultRelInfo->ri_RelationDesc->rd_att->tdhasoid,
cleanTupType,
ExecInitExtraTupleSlot(estate));
/*
* Since it must be UPDATE/DELETE, there had better be a
......@@ -1953,8 +1968,11 @@ InitPlan(QueryDesc *queryDesc, int eflags)
ExecCheckPlanOutput(estate->es_result_relation_info->ri_RelationDesc,
planstate->plan->targetlist);
TupleDesc cleanTupType = ExecCleanTypeFromTL(planstate->plan->targetlist,
tupType->tdhasoid);
j = ExecInitJunkFilter(planstate->plan->targetlist,
tupType->tdhasoid,
cleanTupType,
ExecInitExtraTupleSlot(estate));
estate->es_junkFilter = j;
if (estate->es_result_relation_info)
......
......@@ -900,7 +900,7 @@ ExecEvalWholeRowVar(WholeRowVarExprState *wrvstate, ExprContext *econtext,
oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_query_memory);
wrvstate->wrv_junkFilter =
ExecInitJunkFilter(subplan->plan->targetlist,
ExecGetResultType(subplan)->tdhasoid,
ExecGetResultType(subplan),
NULL);
MemoryContextSwitchTo(oldcontext);
}
......
......@@ -1197,7 +1197,10 @@ check_sql_fn_retval(Oid func_id, Oid rettype, List *queryTreeList,
* what the caller expects will happen at runtime.
*/
if (junkFilter)
*junkFilter = ExecInitJunkFilter(tlist, false, NULL);
{
TupleDesc cleanTupType = ExecCleanTypeFromTL(tlist, false /* hasoid */);
*junkFilter = ExecInitJunkFilter(tlist, cleanTupType, NULL);
}
return true;
}
Assert(tupdesc);
......
......@@ -168,8 +168,9 @@ ExecInitDML(DML *node, EState *estate, int eflags)
* Both input and output of the junk filter include dropped attributes, so
* the junk filter doesn't need to do anything special there about them
*/
TupleDesc cleanTupType = CreateTupleDescCopy(dmlstate->ps.state->es_result_relation_info->ri_RelationDesc->rd_att);
dmlstate->junkfilter = ExecInitJunkFilter(node->plan.targetlist,
false,
cleanTupType,
dmlstate->cleanedUpSlot);
if (estate->es_instrument)
......
......@@ -160,6 +160,17 @@ bool isMotionGather(const Motion *m)
&& m->numOutputSegs == 1);
}
/**
* Is it a gather motion to master?
*/
static bool
isMotionGatherToMaster(const Motion *m)
{
return (m->motionType == MOTIONTYPE_FIXED
&& m->numOutputSegs == 1
&& m->outputSegIdx[0] == -1);
}
/*
* Set the statistic info in gpmon packet.
*/
......@@ -1409,6 +1420,55 @@ doSendEndOfStream(Motion * motion, MotionState * node)
node->sentEndOfStream = true;
}
/*
 * mapTransientTypeMod
 *		Change segment typmod to QD typmod for transient record types.
 *
 * Walks the attributes of the slot's tuple descriptor.  For every non-null
 * attribute declared as RECORDOID, the record's typmod is looked up in the
 * local record cache; if the cached descriptor carries a QD typmod
 * (tdqdtypmod) that is set and differs from the local typmod, the record
 * header is stamped with the QD typmod instead.
 *
 * Only top-level RECORDOID attributes are examined; attributes of any other
 * type are skipped.
 */
static void
mapTransientTypeMod(TupleTableSlot *slot)
{
	TupleDesc	typeinfo = slot->tts_tupleDescriptor;
	int			natts = typeinfo->natts;
	int			i;

	for (i = 0; i < natts; ++i)
	{
		bool		orignull;
		Datum		origattr;
		Datum		attr;
		Form_pg_attribute attrData = typeinfo->attrs[i];
		HeapTupleHeader rec;
		TupleDesc	tupleDesc;

		if (attrData->atttypid != RECORDOID)
		{
			continue;
		}

		origattr = slot_getattr(slot, i + 1, &orignull);
		if (orignull)
		{
			continue;
		}

		/*
		 * NOTE(review): if detoasting produces a fresh copy, the typmod
		 * written below lands on that copy rather than on the datum held by
		 * the slot -- confirm record datums reach here untoasted.
		 */
		attr = PointerGetDatum(PG_DETOAST_DATUM(origattr));
		rec = DatumGetHeapTupleHeader(attr);

		tupleDesc = lookup_rowtype_tupdesc_noerror(RECORDOID,
												   HeapTupleHeaderGetTypMod(rec),
												   true);

		/*
		 * The lookup is made in no-error mode, so an unregistered typmod
		 * comes back as NULL instead of raising an error.  There is nothing
		 * to map in that case; leave the record untouched rather than
		 * dereferencing a NULL descriptor.
		 */
		if (tupleDesc == NULL)
		{
			continue;
		}

		/* Remap only when a distinct QD typmod is known for this type. */
		if (tupleDesc->tdqdtypmod != -1 &&
			tupleDesc->tdqdtypmod != tupleDesc->tdtypmod)
		{
			HeapTupleHeaderSetTypMod(rec, tupleDesc->tdqdtypmod);
		}

		ReleaseTupleDesc(tupleDesc);
	}
}
/*
* A crufty confusing part of the current code is how contentId is used within
* the motion structures and then how that gets translated to targetRoutes by
......@@ -1505,13 +1565,15 @@ doSendTuple(Motion * motion, MotionState * node, TupleTableSlot *outerTupleSlot)
Assert(!is_null);
}
tuple = ExecFetchSlotGenericTuple(outerTupleSlot, true);
CheckAndSendRecordCache(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
motion->motionID,
targetRoute);
/* If it's a gather motion sending tuple to master, need to change segment typmod to
* qd typmod for transient type */
if (isMotionGatherToMaster(motion))
{
mapTransientTypeMod(outerTupleSlot);
}
tuple = ExecFetchSlotGenericTuple(outerTupleSlot, true);
/* send the tuple out. */
sendRC = SendTuple(node->ps.state->motionlayer_context,
node->ps.state->interconnect_context,
......
......@@ -434,9 +434,12 @@ ExecInitTableFunction(TableFunctionScan *node, EState *estate, int eflags)
scanstate->inputscan->subdesc = inputdesc;
/* Determine projection information for subplan */
TupleDesc cleanTupType = ExecCleanTypeFromTL(subplan->plan->targetlist,
false /* hasoid */);
scanstate->inputscan->junkfilter =
ExecInitJunkFilter(subplan->plan->targetlist,
false,
cleanTupType,
NULL /* slot */);
BlessTupleDesc(scanstate->inputscan->junkfilter->jf_cleanTupType);
......
......@@ -1144,6 +1144,7 @@ _outTupleDescNode(StringInfo str, TupleDescNode *node)
WRITE_OID_FIELD(tuple->tdtypeid);
WRITE_INT_FIELD(tuple->tdtypmod);
WRITE_INT_FIELD(tuple->tdqdtypmod);
WRITE_BOOL_FIELD(tuple->tdhasoid);
WRITE_INT_FIELD(tuple->tdrefcount);
}
......
......@@ -327,6 +327,7 @@ _outQueryDispatchDesc(StringInfo str, QueryDispatchDesc *node)
{
WRITE_NODE_TYPE("QUERYDISPATCHDESC");
WRITE_NODE_FIELD(transientTypeRecords);
WRITE_STRING_FIELD(intoTableSpaceName);
WRITE_NODE_FIELD(oidAssignments);
WRITE_NODE_FIELD(sliceTable);
......@@ -4292,6 +4293,7 @@ _outTupleDescNode(StringInfo str, TupleDescNode *node)
WRITE_OID_FIELD(tuple->tdtypeid);
WRITE_INT_FIELD(tuple->tdtypmod);
WRITE_INT_FIELD(tuple->tdqdtypmod);
WRITE_BOOL_FIELD(tuple->tdhasoid);
WRITE_INT_FIELD(tuple->tdrefcount);
}
......
......@@ -1501,6 +1501,7 @@ _readQueryDispatchDesc(void)
{
READ_LOCALS(QueryDispatchDesc);
READ_NODE_FIELD(transientTypeRecords);
READ_STRING_FIELD(intoTableSpaceName);
READ_NODE_FIELD(oidAssignments);
READ_NODE_FIELD(sliceTable);
......@@ -2566,6 +2567,7 @@ _readTupleDescNode(void)
READ_OID_FIELD(tuple->tdtypeid);
READ_INT_FIELD(tuple->tdtypmod);
READ_INT_FIELD(tuple->tdqdtypmod);
READ_BOOL_FIELD(tuple->tdhasoid);
READ_INT_FIELD(tuple->tdrefcount);
......
......@@ -85,7 +85,7 @@ static HTAB *RecordCacheHash = NULL;
static TupleDesc *RecordCacheArray = NULL;
static int32 RecordCacheArrayLen = 0; /* allocated length of array */
int32 NextRecordTypmod = 0; /* number of entries used */
static int32 NextRecordTypmod = 0; /* number of entries used */
static void TypeCacheRelCallback(Datum arg, Oid relid);
......@@ -466,6 +466,19 @@ assign_record_type_typmod(TupleDesc tupDesc)
{
tupDesc->tdtypmod = entDesc->tdtypmod;
if (entDesc->tdqdtypmod != -1 && tupDesc->tdqdtypmod != -1)
{
Assert(tupDesc->tdqdtypmod == entDesc->tdqdtypmod);
}
else if (entDesc->tdqdtypmod != -1)
{
tupDesc->tdqdtypmod = entDesc->tdqdtypmod;
}
else if (tupDesc->tdqdtypmod != -1)
{
entDesc->tdqdtypmod = tupDesc->tdqdtypmod;
}
return;
}
}
......@@ -559,14 +572,13 @@ TypeCacheRelCallback(Datum arg, Oid relid)
*
* Wrap TupleDesc with TupleDescNode. Return all record type in record cache.
*/
List *
build_tuple_node_list(int start)
void
build_tuple_node_list(List **transientTypeList)
{
List *transientTypeList = NIL;
int i = start;
int i = 0;
if (NextRecordTypmod == 0)
return transientTypeList;
return;
for (; i < NextRecordTypmod; i++)
{
......@@ -576,8 +588,7 @@ build_tuple_node_list(int start)
node->type = T_TupleDescNode;
node->natts = tmp->natts;
node->tuple = CreateTupleDescCopy(tmp);
transientTypeList = lappend(transientTypeList, node);
node->tuple->tdqdtypmod = tmp->tdtypmod;
*transientTypeList = lappend(*transientTypeList, node);
}
return transientTypeList;
}
......@@ -1163,6 +1163,7 @@ TypeGetTupleDesc(Oid typeoid, List *colaliases)
/* The tuple type is now an anonymous record type */
tupdesc->tdtypeid = RECORDOID;
tupdesc->tdtypmod = -1;
tupdesc->tdqdtypmod = -1;
}
}
else if (functypclass == TYPEFUNC_SCALAR)
......
......@@ -74,6 +74,7 @@ typedef struct tupleDesc
TupleConstr *constr; /* constraints, or NULL if none */
Oid tdtypeid; /* composite type ID for tuple type */
int32 tdtypmod; /* typmod for tuple type */
int32 tdqdtypmod; /* typmod for tuple type on Master */
bool tdhasoid; /* tuple has oid attribute in its header */
int tdrefcount; /* reference count, or -1 if not counting */
} *TupleDesc;
......
......@@ -18,7 +18,6 @@
#include "cdb/tupser.h"
#include "cdb/tupchunk.h"
#include "cdb/tupchunklist.h"
#include "cdb/tupleremap.h"
struct CdbProcess; /* #include "nodes/execnodes.h" */
struct Slice; /* #include "nodes/execnodes.h" */
......@@ -270,21 +269,6 @@ struct MotionConn
/* Indicate whether an EOS is received and acked. */
bool eosAcked;
/*
* used by the sender.
*
* the typmod of last sent record type in current connection,
* if the connection is for broadcasting then we only check
* and update this attribute on connection 0.
*/
int32 sent_record_typmod;
/*
* used by the receiver.
*
* all the remap information.
*/
TupleRemapper *remapper;
};
/*
......
......@@ -71,10 +71,6 @@ extern void EndMotionLayerNode(MotionLayerState *mlStates, int16 motNodeID, bool
* or error-cleanup). */
extern void RemoveMotionLayer(MotionLayerState *ml_states, bool flushCommLayer __attribute__((unused)) );
extern void CheckAndSendRecordCache(MotionLayerState *mlStates,
ChunkTransportState *transportStates,
int16 motNodeID,
int16 targetRoute);
/* non-blocking operation that may perform only part (or none) of the
* send before returning. The TupleSendContext is used to help keep track
......
/*-------------------------------------------------------------------------
*
* tupleremap.h
*	  Public interface of the tuple remapper: an opaque object with
*	  create/destroy calls, a call that consumes a list of types, and a
*	  call that checks (and possibly rewrites) a tuple.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/cdb/tupleremap.h
*
*-------------------------------------------------------------------------
*/
#ifndef TUPLEREMAP_H
#define TUPLEREMAP_H
#include "tcop/dest.h"
/* Opaque struct, only known inside tupleremap.c. */
typedef struct TupleRemapper TupleRemapper;
/* Allocate a new remapper; pair with DestroyTupleRemapper(). */
extern TupleRemapper *CreateTupleRemapper(void);
/* Release a remapper obtained from CreateTupleRemapper(). */
extern void DestroyTupleRemapper(TupleRemapper *remapper);
/*
 * Check a tuple described by tupledesc and return the tuple to use in its
 * place.  NOTE(review): presumably returns the input tuple unchanged when
 * no remapping is needed -- confirm in tupleremap.c.
 */
extern HeapTuple TRCheckAndRemap(TupleRemapper *remapper, TupleDesc tupledesc, HeapTuple tuple);
/*
 * Feed a list of types to the remapper.  NOTE(review): presumably a list of
 * TupleDescNode entries received from the sending side -- confirm against
 * the caller.
 */
extern void TRHandleTypeLists(TupleRemapper *remapper, List *typelist);
#endif /* TUPLEREMAP_H */
......@@ -13,7 +13,6 @@
#include "cdb/tupchunklist.h"
#include "lib/stringinfo.h"
#include "utils/lsyscache.h"
#include "cdb/tupleremap.h"
/* Define this to pack the NULLs-mask into the minimum number of bytes
......@@ -79,9 +78,6 @@ typedef struct SerTupInfo
/* Preallocated space for deformtuple and formtuple. */
Datum *values;
bool *nulls;
/* true if tupdesc contains record types */
bool has_record_types;
} SerTupInfo;
/*
......@@ -98,11 +94,6 @@ extern void InitSerTupInfo(TupleDesc tupdesc, SerTupInfo *pSerInfo);
/* Free up storage in a previously initialized SerTupInfo struct. */
extern void CleanupSerTupInfo(SerTupInfo *pSerInfo);
/* Convert RecordCache into chunks ready to send out, in one pass */
extern void SerializeRecordCacheIntoChunks(SerTupInfo *pSerInfo,
TupleChunkList tcList,
MotionConn *conn);
/* Convert a HeapTuple into chunks ready to send out, in one pass */
extern void SerializeTupleIntoChunks(HeapTuple tuple, SerTupInfo *pSerInfo, TupleChunkList tcList);
......@@ -115,6 +106,6 @@ extern HeapTuple DeserializeTuple(SerTupInfo * pSerInfo, StringInfo serialTup);
/* Convert a sequence of chunks containing serialized tuple data into a
* HeapTuple.
*/
extern HeapTuple CvtChunksToHeapTup(TupleChunkList tclist, SerTupInfo * pSerInfo, TupleRemapper *remapper);
extern HeapTuple CvtChunksToHeapTup(TupleChunkList tclist, SerTupInfo * pSerInfo);
#endif /* TUPSER_H */
......@@ -164,6 +164,12 @@ typedef struct QueryDispatchDesc
{
NodeTag type;
/*
* List of TupleDescNodes, one for each transient record type currently
* assigned.
*/
List *transientTypeRecords;
/*
* For a SELECT INTO statement, this stores the tablespace to use for the
* new table and related auxiliary tables.
......
......@@ -139,7 +139,7 @@ extern TupleHashEntry FindTupleHashEntry(TupleHashTable hashtable,
/*
* prototypes from functions in execJunk.c
*/
extern JunkFilter *ExecInitJunkFilter(List *targetList, bool hasoid,
extern JunkFilter *ExecInitJunkFilter(List *targetList, TupleDesc cleanTupType,
TupleTableSlot *slot);
extern JunkFilter *ExecInitJunkFilterConversion(List *targetList,
TupleDesc cleanTupType,
......
......@@ -75,8 +75,6 @@ typedef struct TypeCacheEntry
#define TYPECACHE_TUPDESC 0x0040
#define TYPECACHE_BTREE_OPFAMILY 0x0080
extern int32 NextRecordTypmod;
extern TypeCacheEntry *lookup_type_cache(Oid type_id, int flags);
extern TupleDesc lookup_rowtype_tupdesc(Oid type_id, int32 typmod);
......@@ -88,6 +86,6 @@ extern TupleDesc lookup_rowtype_tupdesc_copy(Oid type_id, int32 typmod);
extern void assign_record_type_typmod(TupleDesc tupDesc);
extern List *build_tuple_node_list(int start);
extern void build_tuple_node_list(List **transientTypeList);
#endif /* TYPCACHE_H */
......@@ -540,16 +540,14 @@ group by f1,f2,fs;
-- Check that whole-row Vars reading the result of a subselect don't include
-- any junk columns therein
--
select q from (select max(f1) as max from int4_tbl group by f1 order by f1) q order by max;
q
---------------
(-2147483647)
(-123456)
(0)
(123456)
(2147483647)
(5 rows)
-- This test works in upstream PostgreSQL but triggers a problem in GPDB; in
-- Greenplum the typmods must be the same between all segments and the master
-- but PostgreSQL executor defers typmod assignment to ExecEvalWholeRowVar()
-- which is too late for Greenplum since the plan has already been dispatched.
-- This should be fixed but requires new infrastructure.
--
select q from (select max(f1) from int4_tbl group by f1 order by f1) q;
ERROR: record type has not been registered
--
-- Test case for sublinks pushed down into subselects via join alias expansion
--
......
......@@ -551,16 +551,14 @@ group by f1,f2,fs;
-- Check that whole-row Vars reading the result of a subselect don't include
-- any junk columns therein
--
select q from (select max(f1) as max from int4_tbl group by f1 order by f1) q order by max;
q
---------------
(-2147483647)
(-123456)
(0)
(123456)
(2147483647)
(5 rows)
-- This test works in upstream PostgreSQL but triggers a problem in GPDB; in
-- Greenplum the typmods must be the same between all segments and the master
-- but PostgreSQL executor defers typmod assignment to ExecEvalWholeRowVar()
-- which is too late for Greenplum since the plan has already been dispatched.
-- This should be fixed but requires new infrastructure.
--
select q from (select max(f1) from int4_tbl group by f1 order by f1) q;
ERROR: record type has not been registered
--
-- Test case for sublinks pushed down into subselects via join alias expansion
--
......
......@@ -297,107 +297,4 @@ select foo();
-- cleanup
drop function foo();
drop table t;
-- This test case use UDF assign_new_record() to generate a new record type in cache
-- for each tuple, these types should be sync to receiver.
-- test function
create or replace function assign_new_record()
returns SETOF record as '@abs_builddir@/regress@DLSUFFIX@',
'assign_new_record' LANGUAGE C VOLATILE;
-- transfer record types via motion incrementally
select assign_new_record() from gp_dist_random('gp_id');
assign_new_record
-------------------
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(1)
(30 rows)
-- cleanup
drop function assign_new_record();
-- Test cases for record type nested in array
drop table if exists t_array_record;
NOTICE: table "t_array_record" does not exist, skipping
create table t_array_record as select i as c1, i * 2 as c2 from generate_series(1,10) i;
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'c1' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
select c1, array[row(c1,array[row(2,3),row(c2,5,array[c1],'abc')])] from t_array_record order by c1;
c1 | array
----+------------------------------------------------------
1 | {"(1,\"{\"\"(2,3)\"\",\"\"(2,5,{1},abc)\"\"}\")"}
2 | {"(2,\"{\"\"(2,3)\"\",\"\"(4,5,{2},abc)\"\"}\")"}
3 | {"(3,\"{\"\"(2,3)\"\",\"\"(6,5,{3},abc)\"\"}\")"}
4 | {"(4,\"{\"\"(2,3)\"\",\"\"(8,5,{4},abc)\"\"}\")"}
5 | {"(5,\"{\"\"(2,3)\"\",\"\"(10,5,{5},abc)\"\"}\")"}
6 | {"(6,\"{\"\"(2,3)\"\",\"\"(12,5,{6},abc)\"\"}\")"}
7 | {"(7,\"{\"\"(2,3)\"\",\"\"(14,5,{7},abc)\"\"}\")"}
8 | {"(8,\"{\"\"(2,3)\"\",\"\"(16,5,{8},abc)\"\"}\")"}
9 | {"(9,\"{\"\"(2,3)\"\",\"\"(18,5,{9},abc)\"\"}\")"}
10 | {"(10,\"{\"\"(2,3)\"\",\"\"(20,5,{10},abc)\"\"}\")"}
(10 rows)
drop table t_array_record;
-- Test the case that new record type is created during execution stage, which makes
-- QE have more transient types than QD.
drop table if exists t_record_qe;
NOTICE: table "t_record_qe" does not exist, skipping
create table t_record_qe as select f1 from generate_series(1,10) f1;
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'f1' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
select q from (select max(f1) as max from t_record_qe group by f1 order by f1) q order by max;
q
------
(1)
(2)
(3)
(4)
(5)
(6)
(7)
(8)
(9)
(10)
(10 rows)
drop table t_record_qe;
-- Customer reported case which used to segment fault.
DROP TABLE IF EXISTS typemod_init ;
NOTICE: table "typemod_init" does not exist, skipping
CREATE TABLE typemod_init ( varchar_col character varying(7));
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'varchar_col' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
INSERT INTO typemod_init VALUES ('A');
SELECT C FROM ( SELECT B.varchar_col FROM ( SELECT A.varchar_col FROM typemod_init A) B GROUP BY B.varchar_col) C;
c
-----
(A)
(1 row)
DROP TABLE typemod_init ;
drop schema transient_types;
......@@ -49,7 +49,6 @@ extern Datum int44in(PG_FUNCTION_ARGS);
extern Datum int44out(PG_FUNCTION_ARGS);
extern Datum gp_str2bytea(PG_FUNCTION_ARGS);
extern Datum check_auth_time_constraints(PG_FUNCTION_ARGS);
extern Datum assign_new_record(PG_FUNCTION_ARGS);
/* table_functions test */
extern Datum multiset_example(PG_FUNCTION_ARGS);
......@@ -2770,60 +2769,3 @@ udf_unsetenv(PG_FUNCTION_ARGS)
int ret = unsetenv(name);
PG_RETURN_BOOL(ret == 0);
}
PG_FUNCTION_INFO_V1(assign_new_record);
Datum
assign_new_record(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx = NULL;
if (SRF_IS_FIRSTCALL())
{
funcctx = SRF_FIRSTCALL_INIT();
TupleDesc tupdesc;
tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "c", INT4OID, -1, 0);
BlessTupleDesc(tupdesc);
funcctx->tuple_desc = tupdesc;
/* dummy output */
funcctx->max_calls = 10;
}
if (Gp_role == GP_ROLE_DISPATCH)
SRF_RETURN_DONE(funcctx);
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
if (funcctx->call_cntr < funcctx->max_calls)
{
TupleDesc tupdesc;
HeapTuple tuple;
Datum dummy_values[1];
bool dummy_nulls[1];
int i;
tupdesc = CreateTemplateTupleDesc(funcctx->call_cntr, false);
dummy_values[0] = Int32GetDatum(1);
dummy_nulls[0] = false;
for (i = 1; i <= funcctx->call_cntr; i++)
TupleDescInitEntry(tupdesc, (AttrNumber) i, "c", INT4OID, -1, 0);
BlessTupleDesc(tupdesc);
tuple = heap_form_tuple(funcctx->tuple_desc, dummy_values, dummy_nulls);
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
}
else
{
/* nothing left */
SRF_RETURN_DONE(funcctx);
}
}
......@@ -324,8 +324,14 @@ group by f1,f2,fs;
-- Check that whole-row Vars reading the result of a subselect don't include
-- any junk columns therein
--
-- This test works in upstream PostgreSQL but triggers a problem in GPDB; in
-- Greenplum the typmods must be the same between all segments and the master
-- but PostgreSQL executor defers typmod assignment to ExecEvalWholeRowVar()
-- which is too late for Greenplum since the plan has already been dispatched.
-- This should be fixed but requires new infrastructure.
--
select q from (select max(f1) as max from int4_tbl group by f1 order by f1) q order by max;
select q from (select max(f1) from int4_tbl group by f1 order by f1) q;
--
-- Test case for sublinks pushed down into subselects via join alias expansion
......
......@@ -143,38 +143,4 @@ select foo();
drop function foo();
drop table t;
-- This test case use UDF assign_new_record() to generate a new record type in cache
-- for each tuple, these types should be sync to receiver.
-- test function
create or replace function assign_new_record()
returns SETOF record as '@abs_builddir@/regress@DLSUFFIX@',
'assign_new_record' LANGUAGE C VOLATILE;
-- transfer record types via motion incrementally
select assign_new_record() from gp_dist_random('gp_id');
-- cleanup
drop function assign_new_record();
-- Test cases for record type nested in array
drop table if exists t_array_record;
create table t_array_record as select i as c1, i * 2 as c2 from generate_series(1,10) i;
select c1, array[row(c1,array[row(2,3),row(c2,5,array[c1],'abc')])] from t_array_record order by c1;
drop table t_array_record;
-- Test the case that new record type is created during execution stage, which makes
-- QE have more transient types than QD.
drop table if exists t_record_qe;
create table t_record_qe as select f1 from generate_series(1,10) f1;
select q from (select max(f1) as max from t_record_qe group by f1 order by f1) q order by max;
drop table t_record_qe;
-- Customer reported case which used to segment fault.
DROP TABLE IF EXISTS typemod_init ;
CREATE TABLE typemod_init ( varchar_col character varying(7));
INSERT INTO typemod_init VALUES ('A');
SELECT C FROM ( SELECT B.varchar_col FROM ( SELECT A.varchar_col FROM typemod_init A) B GROUP BY B.varchar_col) C;
DROP TABLE typemod_init ;
drop schema transient_types;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册