提交 4b360942 编写于 作者: K Kenan Yao

Dispatch exactly same text string for all slices.

Include a map from sliceIndex to gang_id in the dispatched string,
and remove the localSlice field; each QE should now obtain its
localSlice from that map. In this way, we avoid duplicating and
modifying the dispatch text string slice by slice, and every QE of a
sliced dispatch now receives identical contents.

The extra space cost is sizeof(int) * SliceNumber bytes, and the extra
computing cost is iterating over the SliceNumber-sized array. Compared
with the memcpy of the whole text string for each slice in the previous
implementation, this approach is much cheaper, because SliceNumber is
much smaller than the size of the dispatch text string. Also, since
SliceNumber is so small, we simply use an array for the map instead of
a hash table.

Also, clean up some dead code in the dispatcher, including:
(1) Remove the primary_gang_id field of the Slice struct and the
DispatchCommandDtxProtocolParms struct, since the dispatch agent is
deprecated now;
(2) Remove redundant logic in cdbdisp_dispatchX;
(3) Clean up buildGpDtxProtocolCommand.
上级 2cda812c
......@@ -49,6 +49,12 @@
#define MAX_CACHED_1_GANGS 1
/*
* Which gang this QE belongs to; this would be used in PostgresMain to find out
* the slice this QE should execute
*/
int qe_gang_id = 0;
/*
* Points to the result of getCdbComponentDatabases()
*/
......@@ -94,6 +100,8 @@ typedef struct DoConnectParms
/* type of gang. */
GangType type;
int gangId;
/* connect options. GUC etc. */
StringInfo connectOptions;
......@@ -103,14 +111,15 @@ typedef struct DoConnectParms
static Gang *buildGangDefinition(GangType type, int gang_id, int size,
int content);
static DoConnectParms* makeConnectParms(int parmsCount, GangType type);
static DoConnectParms* makeConnectParms(int parmsCount, GangType type, int gangId);
static void destroyConnectParms(DoConnectParms *doConnectParmsAr, int count);
static void checkConnectionStatus(Gang* gp, int* countInRecovery,
int* countSuccessful);
static bool isPrimaryWriterGangAlive(void);
static void *thread_DoConnect(void *arg);
static void build_gpqeid_param(char *buf, int bufsz, int segIndex,
bool is_writer);
static void
build_gpqeid_param(char *buf, int bufsz, int segIndex,
bool is_writer, int gangId);
static Gang *createGang(GangType type, int gang_id, int size, int content);
static void disconnectAndDestroyGang(Gang *gp);
static void disconnectAndDestroyAllReaderGangs(bool destroyAllocated);
......@@ -364,7 +373,7 @@ create_gang_retry:
Assert(threadCount > 0);
/* initialize connect parameters */
doConnectParmsAr = makeConnectParms(threadCount, type);
doConnectParmsAr = makeConnectParms(threadCount, type, gang_id);
for (i = 0; i < size; i++)
{
parmIndex = i / gp_connections_per_thread;
......@@ -537,14 +546,17 @@ thread_DoConnect(void *arg)
* early enough now some locks are taken before command line options
* are recognized.
*/
build_gpqeid_param(gpqeid, sizeof(gpqeid), segdbDesc->segindex, pParms->type == GANGTYPE_PRIMARY_WRITER);
build_gpqeid_param(gpqeid, sizeof(gpqeid),
segdbDesc->segindex,
pParms->type == GANGTYPE_PRIMARY_WRITER,
pParms->gangId);
/* check the result in createGang */
cdbconn_doConnect(segdbDesc, gpqeid, pParms->connectOptions->data);
}
return (NULL);
} /* thread_DoConnect */
}
/*
* Test if the connections of the primary writer gang are alive.
......@@ -975,7 +987,7 @@ static void addOptions(StringInfo string, bool iswriter)
*
* Including initialize the connect option string.
*/
static DoConnectParms* makeConnectParms(int parmsCount, GangType type)
static DoConnectParms* makeConnectParms(int parmsCount, GangType type, int gangId)
{
DoConnectParms *doConnectParmsAr = (DoConnectParms*) palloc0(
parmsCount * sizeof(DoConnectParms));
......@@ -995,6 +1007,7 @@ static DoConnectParms* makeConnectParms(int parmsCount, GangType type)
pParms->db_count = 0;
pParms->type = type;
pParms->connectOptions = pOptions;
pParms->gangId = gangId;
}
return doConnectParmsAr;
}
......@@ -1027,14 +1040,15 @@ static void destroyConnectParms(DoConnectParms *doConnectParmsAr, int count)
}
/*
* build_gpqeid_params
* build_gpqeid_param
*
* Called from the qDisp process to create the "gpqeid" parameter string
* to be passed to a qExec that is being started. NB: Can be called in a
* thread, so mustn't use palloc/elog/ereport/etc.
*/
static void build_gpqeid_param(char *buf, int bufsz, int segIndex,
bool is_writer)
static void
build_gpqeid_param(char *buf, int bufsz, int segIndex,
bool is_writer, int gangId)
{
#ifdef HAVE_INT64_TIMESTAMP
#define TIMESTAMP_FORMAT INT64_FORMAT
......@@ -1046,9 +1060,10 @@ static void build_gpqeid_param(char *buf, int bufsz, int segIndex,
#endif
#endif
snprintf(buf, bufsz, "%d;%d;" TIMESTAMP_FORMAT ";%s", gp_session_id,
segIndex, PgStartTime, (is_writer ? "true" : "false"));
} /* build_gpqeid_params */
snprintf(buf, bufsz, "%d;%d;" TIMESTAMP_FORMAT ";%s;%d",
gp_session_id, segIndex, PgStartTime,
(is_writer ? "true" : "false"), gangId);
}
static bool gpqeid_next_param(char **cpp, char **npp)
{
......@@ -1075,8 +1090,9 @@ static bool gpqeid_next_param(char **cpp, char **npp)
* command line options have not been processed; GUCs have the settings
* inherited from the postmaster; etc; so don't try to do too much in here.
*/
void cdbgang_parse_gpqeid_params(struct Port * port __attribute__((unused)),
const char *gpqeid_value)
void
cdbgang_parse_gpqeid_params(struct Port * port __attribute__((unused)),
const char *gpqeid_value)
{
char *gpqeid = pstrdup(gpqeid_value);
char *cp;
......@@ -1108,11 +1124,16 @@ void cdbgang_parse_gpqeid_params(struct Port * port __attribute__((unused)),
if (gpqeid_next_param(&cp, &np))
SetConfigOption("gp_is_writer", cp, PGC_POSTMASTER, PGC_S_OVERRIDE);
if (gpqeid_next_param(&cp, &np))
{
qe_gang_id = (int) strtol(cp, NULL, 10);
}
/* Too few items, or too many? */
if (!cp || np)
goto bad;
if (gp_session_id <= 0 || PgStartTime <= 0)
if (gp_session_id <= 0 || PgStartTime <= 0 || qe_gang_id <=0)
goto bad;
pfree(gpqeid);
......@@ -1120,7 +1141,7 @@ void cdbgang_parse_gpqeid_params(struct Port * port __attribute__((unused)),
bad:
elog(FATAL, "Segment dispatched with invalid option: 'gpqeid=%s'", gpqeid_value);
} /* cdbgang_parse_gpqeid_params */
}
/*
* TODO: Dead code: remove it.
......
......@@ -141,7 +141,7 @@ static void dumpRMOnlyDtx(HTAB *htab, StringInfoData *buff);
static bool doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags,
char* gid, DistributedTransactionId gxid,
bool *badGangs, bool raiseError, CdbDispatchDirectDesc *direct,
char *serializedArgument, int serializedArgumentLen);
char *serializedDtxContextInfo, int serializedDtxContextInfoLen);
static void doPrepareTransaction(void);
static void doInsertForgetCommitted(void);
static void doNotifyingCommitPrepared(void);
......@@ -2225,15 +2225,15 @@ doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags,
char* gid, DistributedTransactionId gxid,
bool *badGangs, bool raiseError,
CdbDispatchDirectDesc *direct,
char *serializedArgument,
int serializedArgumentLen)
char *serializedDtxContextInfo,
int serializedDtxContextInfoLen)
{
int i, resultCount, numOfFailed = 0;
int i, resultCount, numOfFailed = 0;
char *dtxProtocolCommandStr = 0;
char *dtxProtocolCommandStr = 0;
struct pg_result **results = NULL;
StringInfoData errbuf;
struct pg_result **results = NULL;
StringInfoData errbuf;
dtxProtocolCommandStr = DtxProtocolCommandToString(dtxProtocolCommand);
......@@ -2253,7 +2253,7 @@ doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags,
dtxProtocolCommandStr,
gid, gxid,
&errbuf, &resultCount, badGangs, direct,
serializedArgument, serializedArgumentLen);
serializedDtxContextInfo, serializedDtxContextInfoLen);
if (errbuf.len > 0)
{
......
......@@ -37,9 +37,8 @@ typedef struct DispatchCommandDtxProtocolParms
char *dtxProtocolCommandLoggingStr;
char gid[TMGIDSIZE];
DistributedTransactionId gxid;
int primary_gang_id;
char *argument;
int argumentLength;
char *serializedDtxContextInfo;
int serializedDtxContextInfoLen;
} DispatchCommandDtxProtocolParms;
/*
......@@ -54,9 +53,9 @@ static void cdbdisp_dtxParmsInit(struct CdbDispatcherState *ds,
DispatchCommandDtxProtocolParms *pDtxProtocolParms);
static char *
PQbuildGpDtxProtocolCommand(MemoryContext cxt,
DispatchCommandDtxProtocolParms * pDtxProtocolParms,
int *finalLen);
buildGpDtxProtocolCommand(MemoryContext cxt,
DispatchCommandDtxProtocolParms * pDtxProtocolParms,
int *finalLen);
/*
* cdbdisp_dispatchDtxProtocolCommand:
......@@ -80,7 +79,8 @@ cdbdisp_dispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand,
int *numresults,
bool *badGangs,
CdbDispatchDirectDesc * direct,
char *argument, int argumentLength)
char *serializedDtxContextInfo,
int serializedDtxContextInfoLen)
{
CdbDispatcherState ds = {NULL, NULL, NULL};
......@@ -106,8 +106,8 @@ cdbdisp_dispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand,
elog(PANIC, "Distribute transaction identifier too long (%d)", (int) strlen(gid));
memcpy(dtxProtocolParms.gid, gid, TMGIDSIZE);
dtxProtocolParms.gxid = gxid;
dtxProtocolParms.argument = argument;
dtxProtocolParms.argumentLength = argumentLength;
dtxProtocolParms.serializedDtxContextInfo = serializedDtxContextInfo;
dtxProtocolParms.serializedDtxContextInfoLen = serializedDtxContextInfoLen;
/*
* Allocate a primary QE for every available segDB in the system.
......@@ -122,8 +122,6 @@ cdbdisp_dispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand,
primaryGang->dispatcherActive = false;
}
dtxProtocolParms.primary_gang_id = primaryGang->gang_id;
/*
* Dispatch the command.
*/
......@@ -328,9 +326,7 @@ cdbdisp_dtxParmsInit(struct CdbDispatcherState *ds,
oldContext = MemoryContextSwitchTo(ds->dispatchStateContext);
char *queryText =
PQbuildGpDtxProtocolCommand(ds->dispatchStateContext,
pDtxProtocolParms, &len);
char *queryText = buildGpDtxProtocolCommand(ds->dispatchStateContext, pDtxProtocolParms, &len);
MemoryContextSwitchTo(oldContext);
......@@ -346,72 +342,76 @@ cdbdisp_dtxParmsInit(struct CdbDispatcherState *ds,
* Build a dtx protocol command string to be dispatched to QE.
*/
static char *
PQbuildGpDtxProtocolCommand(MemoryContext cxt,
DispatchCommandDtxProtocolParms *
pDtxProtocolParms, int *finalLen)
buildGpDtxProtocolCommand(MemoryContext cxt,
DispatchCommandDtxProtocolParms * pDtxProtocolParms,
int *finalLen)
{
int dtxProtocolCommand = (int) pDtxProtocolParms->dtxProtocolCommand;
int flags = pDtxProtocolParms->flags;
char *dtxProtocolCommandLoggingStr = pDtxProtocolParms->dtxProtocolCommandLoggingStr;
char *gid = pDtxProtocolParms->gid;
int gxid = pDtxProtocolParms->gxid;
int primary_gang_id = pDtxProtocolParms->primary_gang_id;
char *argument = pDtxProtocolParms->argument;
int argumentLength = pDtxProtocolParms->argumentLength;
char *serializedDtxContextInfo = pDtxProtocolParms->serializedDtxContextInfo;
int serializedDtxContextInfoLen = pDtxProtocolParms->serializedDtxContextInfoLen;
int tmp = 0;
int len = 0;
int loggingStrLen = strlen(dtxProtocolCommandLoggingStr) + 1;
int gidLen = strlen(gid) + 1;
int total_query_len =
5 /* overhead */ + 4 /* dtxProtocolCommand */ +
4 /*flags */ + 8 /* lengths */ +
loggingStrLen + gidLen + 4 /* gxid */ + 8 /* gang ids */ +
argumentLength + 4 /* argumentLength field */ ;
int total_query_len = 1 /* 'T' */ +
sizeof(len) +
sizeof(dtxProtocolCommand) +
sizeof(flags) +
sizeof(loggingStrLen) +
loggingStrLen +
sizeof(gidLen) +
gidLen +
sizeof(gxid) +
sizeof(serializedDtxContextInfoLen) +
serializedDtxContextInfoLen;
char *shared_query = MemoryContextAlloc(cxt, total_query_len);
char *pos = shared_query;
*pos++ = 'T';
pos += 4; /* placeholder for message length */
pos += sizeof(len); /* placeholder for message length */
tmp = htonl(dtxProtocolCommand);
memcpy(pos, &tmp, 4);
pos += 4;
memcpy(pos, &tmp, sizeof(tmp));
pos += sizeof(tmp);
tmp = htonl(flags);
memcpy(pos, &tmp, 4);
pos += 4;
memcpy(pos, &tmp, sizeof(tmp));
pos += sizeof(tmp);
tmp = htonl(loggingStrLen);
memcpy(pos, &tmp, 4);
pos += 4;
memcpy(pos, &tmp, sizeof(tmp));
pos += sizeof(tmp);
memcpy(pos, dtxProtocolCommandLoggingStr, loggingStrLen);
pos += loggingStrLen;
tmp = htonl(gidLen);
memcpy(pos, &tmp, 4);
pos += 4;
memcpy(pos, &tmp, sizeof(tmp));
pos += sizeof(tmp);
memcpy(pos, gid, gidLen);
pos += gidLen;
tmp = htonl(gxid);
memcpy(pos, &tmp, 4);
pos += 4;
memcpy(pos, &tmp, sizeof(tmp));
pos += sizeof(tmp);
tmp = htonl(primary_gang_id);
memcpy(pos, &tmp, 4);
pos += 4;
tmp = htonl(serializedDtxContextInfoLen);
memcpy(pos, &tmp, sizeof(tmp));
pos += sizeof(tmp);
tmp = htonl(argumentLength);
memcpy(pos, &tmp, 4);
pos += 4;
if (argumentLength > 0)
memcpy(pos, argument, argumentLength);
pos += argumentLength;
if (serializedDtxContextInfoLen > 0)
{
memcpy(pos, serializedDtxContextInfo, serializedDtxContextInfoLen);
pos += serializedDtxContextInfoLen;
}
len = pos - shared_query - 1;
......@@ -419,7 +419,7 @@ PQbuildGpDtxProtocolCommand(MemoryContext cxt,
* fill in length placeholder
*/
tmp = htonl(len);
memcpy(shared_query + 1, &tmp, 4);
memcpy(shared_query + 1, &tmp, sizeof(tmp));
if (finalLen)
*finalLen = len + 1;
......
......@@ -64,11 +64,6 @@ dispatchCommand(CdbDispatchResult * dispatchResult,
/* returns true if command complete */
static bool processResults(CdbDispatchResult * dispatchResult);
static char *
dupQueryTextAndSetSliceId(MemoryContext cxt,
char *queryText,
int len, int sliceId);
static DispatchWaitMode
cdbdisp_signalQE(SegmentDatabaseDescriptor * segdbDesc,
DispatchWaitMode waitMode);
......@@ -102,7 +97,6 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
newThreads = 0;
int gangSize = 0;
SegmentDatabaseDescriptor *db_descriptors;
char *newQueryText = NULL;
DispatchCommandParms *pParms = NULL;
gangSize = gp->size;
......@@ -128,10 +122,6 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
ds->dispatchThreads->threadCount + max_threads);
}
pParms = &ds->dispatchThreads->dispatchCommandParmsAr[0];
newQueryText =
dupQueryTextAndSetSliceId(ds->dispatchStateContext, pParms->query_text,
pParms->query_text_len, sliceIndex);
/*
* Create the thread parms structures based targetSet parameter.
* This will add the segdbDesc pointers appropriate to the
......@@ -177,12 +167,8 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
cdbdisp_mergeConnectionErrors(qeResult, segdbDesc);
parmsIndex = gp_connections_per_thread == 0 ? 0 : segdbs_in_thread_pool / gp_connections_per_thread;
pParms =
ds->dispatchThreads->dispatchCommandParmsAr +
ds->dispatchThreads->threadCount + parmsIndex;
pParms = ds->dispatchThreads->dispatchCommandParmsAr + ds->dispatchThreads->threadCount + parmsIndex;
pParms->dispatchResultPtrArray[pParms->db_count++] = qeResult;
if (newQueryText != NULL)
pParms->query_text = newQueryText;
/*
* This CdbDispatchResult/SegmentDatabaseDescriptor pair will be
......@@ -206,17 +192,14 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
else if (gp_connections_per_thread == 0)
newThreads = 1;
else
newThreads = 1
+ (segdbs_in_thread_pool - 1) / gp_connections_per_thread;
newThreads = 1 + (segdbs_in_thread_pool - 1) / gp_connections_per_thread;
/*
* Create the threads. (which also starts the dispatching).
*/
for (i = 0; i < newThreads; i++)
{
DispatchCommandParms *pParms =
&(ds->dispatchThreads->dispatchCommandParmsAr +
ds->dispatchThreads->threadCount)[i];
DispatchCommandParms *pParms = &(ds->dispatchThreads->dispatchCommandParmsAr + ds->dispatchThreads->threadCount)[i];
Assert(pParms != NULL);
......@@ -230,9 +213,7 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds,
int pthread_err = 0;
pParms->thread_valid = true;
pthread_err =
gp_pthread_create(&pParms->thread, thread_DispatchCommand,
pParms, "dispatchToGang");
pthread_err = gp_pthread_create(&pParms->thread, thread_DispatchCommand, pParms, "dispatchToGang");
if (pthread_err != 0)
{
......@@ -1737,35 +1718,6 @@ CollectQEWriterTransactionInformation(SegmentDatabaseDescriptor * segdbDesc,
}
}
/*
* Set slice in query text
*
* Make a new copy of query text and set the slice id in the right place.
*
*/
static char *
dupQueryTextAndSetSliceId(MemoryContext cxt, char *queryText,
int len, int sliceId)
{
/*
* DTX command and RM command don't need slice id
*/
if (sliceId < 0)
return NULL;
int tmp = htonl(sliceId);
char *newQuery = MemoryContextAlloc(cxt, len);
memcpy(newQuery, queryText, len);
/*
* the first byte is 'M' and followed by the length, which is an integer.
* see function PQbuildGpQueryString.
*/
memcpy(newQuery + 1 + sizeof(int), &tmp, sizeof(tmp));
return newQuery;
}
/*
* Send cancel/finish signal to still-running QE through libpq.
* waitMode is either CANCEL or FINISH. Returns true if we successfully
......
......@@ -42,15 +42,10 @@ test__cdbdisp_dispatchPlan__Overflow_plan_size_in_kb(void **state)
* Set max plan to a value that will require handling INT32
* overflow of the current plan size
*/
gp_max_plan_size = INT_MAX;
gp_max_plan_size = 1024;
queryDesc->plannedstmt->planTree = (struct Plan *)palloc0(sizeof(struct Plan));
/*
* Set num_slices and uncompressed_size to be INT_MAX-1 to force overflow
*/
queryDesc->plannedstmt->planTree->nMotionNodes = INT_MAX-1;
will_assign_value(serializeNode, uncompressed_size_out, INT_MAX-1);
expect_any(serializeNode, node);
expect_any(serializeNode, size);
......@@ -74,11 +69,11 @@ test__cdbdisp_dispatchPlan__Overflow_plan_size_in_kb(void **state)
StringInfo message = makeStringInfo();
appendStringInfo(message,
"Query plan size limit exceeded, current size: " UINT64_FORMAT "KB, max allowed size: %dKB",
((INT_MAX-1)*(INT_MAX-1)/(uint64)1024), INT_MAX);
"Query plan size limit exceeded, current size: " UINT64_FORMAT "KB, max allowed size: 1024KB",
((INT_MAX-1)/(uint64)1024));
if (edata->elevel == ERROR &&
strncmp(edata->message, message->data, message->len))
strncmp(edata->message, message->data, message->len) == 0)
{
success = true;
}
......
......@@ -1618,7 +1618,6 @@ InitSliceTable(EState *estate, int nMotions, int nSubplans)
slice->directDispatch.isDirectDispatch = false;
slice->directDispatch.contentIds = NIL;
slice->primaryGang = NULL;
slice->primary_gang_id = 0;
slice->parentIndex = -1;
slice->children = NIL;
slice->primaryProcesses = NIL;
......@@ -2049,7 +2048,6 @@ AssociateSlicesToProcesses(Slice ** sliceMap, int sliceIndex, SliceReq * req)
Assert(slice->gangSize == 1);
slice->primaryGang = req->vec1gangs_entrydb_reader[req->nxt1gang_entrydb_reader++];
Assert(slice->primaryGang != NULL);
slice->primary_gang_id = slice->primaryGang->gang_id;
slice->primaryProcesses = getCdbProcessList(slice->primaryGang,
slice->sliceIndex,
NULL);
......@@ -2063,7 +2061,6 @@ AssociateSlicesToProcesses(Slice ** sliceMap, int sliceIndex, SliceReq * req)
slice->primaryGang = req->vecNgangs[req->nxtNgang++];
Assert(slice->primaryGang != NULL);
slice->primary_gang_id = slice->primaryGang->gang_id;
slice->primaryProcesses = getCdbProcessList(slice->primaryGang, slice->sliceIndex, &slice->directDispatch);
break;
......@@ -2071,7 +2068,6 @@ AssociateSlicesToProcesses(Slice ** sliceMap, int sliceIndex, SliceReq * req)
Assert(slice->gangSize == 1);
slice->primaryGang = req->vec1gangs_primary_reader[req->nxt1gang_primary_reader++];
Assert(slice->primaryGang != NULL);
slice->primary_gang_id = slice->primaryGang->gang_id;
slice->primaryProcesses = getCdbProcessList(slice->primaryGang,
slice->sliceIndex,
&slice->directDispatch);
......@@ -2082,7 +2078,6 @@ AssociateSlicesToProcesses(Slice ** sliceMap, int sliceIndex, SliceReq * req)
Assert(slice->gangSize == getgpsegmentCount());
slice->primaryGang = req->vecNgangs[req->nxtNgang++];
Assert(slice->primaryGang != NULL);
slice->primary_gang_id = slice->primaryGang->gang_id;
slice->primaryProcesses = getCdbProcessList(slice->primaryGang,
slice->sliceIndex,
&slice->directDispatch);
......
......@@ -4221,7 +4221,6 @@ _copySlice(Slice *from)
COPY_NODE_FIELD(directDispatch.contentIds);
newnode->primaryGang = from->primaryGang;
COPY_SCALAR_FIELD(primary_gang_id);
COPY_SCALAR_FIELD(parentIndex);
COPY_NODE_FIELD(children);
COPY_NODE_FIELD(primaryProcesses);
......
......@@ -4069,7 +4069,6 @@ _outSlice(StringInfo str, Slice *node)
WRITE_BOOL_FIELD(directDispatch.isDirectDispatch);
WRITE_NODE_FIELD(directDispatch.contentIds); /* List of int */
WRITE_DUMMY_FIELD(primaryGang);
WRITE_INT_FIELD(primary_gang_id);
WRITE_INT_FIELD(parentIndex); /* List of int index */
WRITE_NODE_FIELD(children); /* List of int index */
WRITE_NODE_FIELD(primaryProcesses); /* List of (CDBProcess *) */
......
......@@ -2270,7 +2270,6 @@ _readSlice(void)
READ_BOOL_FIELD(directDispatch.isDirectDispatch);
READ_NODE_FIELD(directDispatch.contentIds); /* List of int index */
READ_DUMMY_FIELD(primaryGang, NULL);
READ_INT_FIELD(primary_gang_id);
READ_INT_FIELD(parentIndex); /* List of int index */
READ_NODE_FIELD(children); /* List of int index */
READ_NODE_FIELD(primaryProcesses); /* List of (CDBProcess *) */
......
......@@ -2860,7 +2860,6 @@ _readSlice(void)
READ_BOOL_FIELD(directDispatch.isDirectDispatch);
READ_NODE_FIELD(directDispatch.contentIds); /* List of int index */
READ_DUMMY_FIELD(primaryGang, NULL);
READ_INT_FIELD(primary_gang_id);
READ_INT_FIELD(parentIndex); /* List of int index */
READ_NODE_FIELD(children); /* List of int index */
READ_NODE_FIELD(primaryProcesses); /* List of (CDBProcess *) */
......
......@@ -4729,7 +4729,7 @@ PostgresMain(int argc, char *argv[],
send_ready_for_query = true;
}
break;
case 'M': /* MPP dispatched stmt from QD */
case 'M': /* MPP dispatched stmt from QD */
{
/* This is exactly like 'Q' above except we peel off and
* set the snapshot information right away.
......@@ -4755,24 +4755,21 @@ PostgresMain(int argc, char *argv[],
int serializedQueryDispatchDesclen = 0;
int seqServerHostlen = 0;
int seqServerPort = -1;
int localSlice;
int rootIdx;
int primary_gang_id;
int localSlice = -1, i;
int rootIdx;
int numSlices = 0;
TimestampTz statementStart;
Oid suid;
Oid ouid;
Oid cuid;
bool suid_is_super = false;
bool ouid_is_super = false;
Oid suid;
Oid ouid;
Oid cuid;
bool suid_is_super = false;
bool ouid_is_super = false;
int unusedFlags;
/* Set statement_timestamp() */
SetCurrentStatementStartTimestamp();
/* get the slice number# */
localSlice = pq_getmsgint(&input_message, 4);
/* get the client command serial# */
gp_command_count = pq_getmsgint(&input_message, 4);
......@@ -4791,8 +4788,6 @@ PostgresMain(int argc, char *argv[],
rootIdx = pq_getmsgint(&input_message, 4);
primary_gang_id = pq_getmsgint(&input_message, 4);
statementStart = pq_getmsgint64(&input_message);
/*
* Should we set the CurrentStatementStartTimestamp to the
......@@ -4802,8 +4797,6 @@ PostgresMain(int argc, char *argv[],
*
* Or both?
*/
//SetCurrentStatementStartTimestampToMaster(statementStart);
/* read ser string lengths */
query_string_len = pq_getmsgint(&input_message, 4);
serializedQuerytreelen = pq_getmsgint(&input_message, 4);
......@@ -4847,8 +4840,26 @@ PostgresMain(int argc, char *argv[],
if (seqServerHostlen > 0)
seqServerHost = pq_getmsgbytes(&input_message, seqServerHostlen);
numSlices = pq_getmsgint(&input_message, 4);
Assert(qe_gang_id > 0);
for (i = 0; i < numSlices; ++i)
{
if (qe_gang_id == pq_getmsgint(&input_message, 4))
{
localSlice = i;
}
}
if (localSlice == -1 && numSlices > 0)
{
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("QE cannot find slice to execute")));
}
pq_getmsgend(&input_message);
elog((Debug_print_full_dtm ? LOG : DEBUG5), "MPP dispatched stmt from QD: %s.",query_string);
if (suid > 0)
......@@ -4919,7 +4930,6 @@ PostgresMain(int argc, char *argv[],
const char *gid;
DistributedTransactionId gxid;
int primary_gang_id;
int serializedSnapshotlen;
const char *serializedSnapshot;
......@@ -4947,8 +4957,6 @@ PostgresMain(int argc, char *argv[],
/* get the distributed transaction id */
gxid = (DistributedTransactionId) pq_getmsgint(&input_message, 4);
primary_gang_id = pq_getmsgint(&input_message, 4);
serializedSnapshotlen = pq_getmsgint(&input_message, 4);
/* read in the snapshot info/ DtxContext */
......@@ -4960,18 +4968,14 @@ PostgresMain(int argc, char *argv[],
/*
* This is for debugging. Otherwise we don't need to deserialize this
*/
DtxContextInfo_Deserialize(
serializedSnapshot, serializedSnapshotlen,
&TempDtxContextInfo);
DtxContextInfo_Deserialize(serializedSnapshot, serializedSnapshotlen, &TempDtxContextInfo);
pq_getmsgend(&input_message);
// Do not touch DTX context.
exec_mpp_dtx_protocol_command(dtxProtocolCommand, flags, loggingStr, gid, gxid, &TempDtxContextInfo);
send_ready_for_query = true;
}
break;
......
......@@ -51,6 +51,9 @@ typedef struct Gang
MemoryContext perGangContext;
} Gang;
extern int qe_gang_id;
extern Gang *allocateReaderGang(GangType type, char *portal_name);
extern Gang *allocateWriterGang(void);
......
......@@ -102,9 +102,6 @@ typedef struct Slice
struct Gang *primaryGang;
/* tell dispatch agents which gang we're talking about. */
int primary_gang_id;
/*
* A list of CDBProcess nodes corresponding to the worker processes
* allocated to implement this plan slice.
......
--start_ignore
drop table if exists plineitem cascade;
NOTICE: table "plineitem" does not exist, skipping
drop table if exists times;
NOTICE: table "times" does not exist, skipping
--end_ignore
-- Ignoring error messages that might cause diff when plan size changes for test queries
-- start_matchsubs
-- m/ERROR: Query plan size limit exceeded.*/
-- s/ERROR: Query plan size limit exceeded.*/ERROR_MESSAGE/
-- end_matchsubs
-- create partitioned lineitem, partitioned by an integer column
CREATE TABLE plineitem
(
L_ID INTEGER ,
L_ORDERKEY INTEGER ,
L_PARTKEY INTEGER ,
L_SUPPKEY INTEGER ,
L_LINENUMBER INTEGER ,
L_QUANTITY DECIMAL(15,2) ,
L_EXTENDEDPRICE DECIMAL(15,2) ,
L_DISCOUNT DECIMAL(15,2) ,
L_TAX DECIMAL(15,2) ,
L_RETURNFLAG CHAR(1) ,
L_LINESTATUS CHAR(1) ,
L_SHIPDATE DATE ,
L_COMMITDATE DATE ,
L_RECEIPTDATE DATE ,
L_SHIPINSTRUCT CHAR(25) ,
L_SHIPMODE CHAR(10) ,
L_COMMENT VARCHAR(44)
) distributed by (l_orderkey)
partition by range(L_ID)
(
start (0) inclusive end(3000) every(100)
);
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_1" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_2" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_3" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_4" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_5" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_6" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_7" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_8" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_9" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_10" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_11" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_12" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_13" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_14" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_15" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_16" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_17" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_18" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_19" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_20" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_21" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_22" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_23" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_24" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_25" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_26" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_27" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_28" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_29" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_30" for table "plineitem"
create table times
(
id int,
date_id date,
date_text_id varchar(20),
day_name varchar(20),
day_number_in_week int,
day_number_in_month int,
week_number_in_month int,
week_number_in_year int,
week_start_date date,
week_end_date date,
month_number int,
month_name varchar(20),
year_number int,
quarter_number int,
quarter_desc varchar(20),
leap boolean,
days_in_year int
) distributed by (date_id);
create or replace function f() returns setof plineitem as $$
select * from plineitem;
$$
language SQL;
create or replace function f2() returns bigint as $$
select count(*) from plineitem;
$$
language SQL;
set gp_max_plan_size =20;
-- Following queries should error out for plan size 20KB
-- simple select
select * from plineitem;
ERROR: Query plan size limit exceeded, current size: 85KB, max allowed size: 20KB
HINT: Size controlled by gp_max_plan_size
-- UDFs
select * from f();
ERROR: Query plan size limit exceeded, current size: 85KB, max allowed size: 20KB
HINT: Size controlled by gp_max_plan_size
CONTEXT: SQL function "f" statement 1
select * from f2();
ERROR: Query plan size limit exceeded, current size: 27KB, max allowed size: 20KB
HINT: Size controlled by gp_max_plan_size
CONTEXT: SQL function "f2" statement 1
-- Joins
select l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
plineitem,times
where
plineitem.l_shipdate = times.date_id
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
ERROR: Query plan size limit exceeded, current size: 123KB, max allowed size: 20KB
HINT: Size controlled by gp_max_plan_size
select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
count(*) as count_order
from
plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date)
group by
l_orderkey
order by
l_orderkey
LIMIT 10;
ERROR: Query plan size limit exceeded, current size: 81KB, max allowed size: 20KB
HINT: Size controlled by gp_max_plan_size
-- Init plans
select p1.l_orderkey,
sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue,
count(*) as count_order
from
plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date)
join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem))
group by
p1.l_orderkey
order by
p1.l_orderkey
LIMIT 10;
ERROR: Query plan size limit exceeded, current size: 307KB, max allowed size: 20KB
HINT: Size controlled by gp_max_plan_size
set gp_max_plan_size ='700kB';
-- Following queries should be dispatched for plan size 700kB
-- simple select
select * from plineitem;
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
-- UDFs
select * from f();
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
select * from f2();
f2
----
0
(1 row)
-- Joins
select l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
plineitem,times
where
plineitem.l_shipdate = times.date_id
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order
--------------+--------------+---------+----------------+----------------+------------+---------+-----------+----------+-------------
(0 rows)
select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
count(*) as count_order
from
plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date)
group by
l_orderkey
order by
l_orderkey
LIMIT 10;
l_orderkey | revenue | count_order
------------+---------+-------------
(0 rows)
-- Init plans
select p1.l_orderkey,
sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue,
count(*) as count_order
from
plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date)
join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem))
group by
p1.l_orderkey
order by
p1.l_orderkey
LIMIT 10;
l_orderkey | revenue | count_order
------------+---------+-------------
(0 rows)
set gp_max_plan_size ='10MB';
-- Following queries should success for plan size 10MB
-- simple select
select * from plineitem;
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
-- UDFs
select * from f();
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
select * from f2();
f2
----
0
(1 row)
-- Joins
select l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
plineitem,times
where
plineitem.l_shipdate = times.date_id
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order
--------------+--------------+---------+----------------+----------------+------------+---------+-----------+----------+-------------
(0 rows)
select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
count(*) as count_order
from
plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date)
group by
l_orderkey
order by
l_orderkey
LIMIT 10;
l_orderkey | revenue | count_order
------------+---------+-------------
(0 rows)
-- Init plans
select p1.l_orderkey,
sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue,
count(*) as count_order
from
plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date)
join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem))
group by
p1.l_orderkey
order by
p1.l_orderkey
LIMIT 10;
l_orderkey | revenue | count_order
------------+---------+-------------
(0 rows)
--start_ignore
drop table if exists plineitem cascade;
NOTICE: table "plineitem" does not exist, skipping
drop table if exists times;
NOTICE: table "times" does not exist, skipping
--end_ignore
-- Ignoring error messages that might cause diff when plan size changes for test queries
-- start_matchsubs
-- m/ERROR: Query plan size limit exceeded.*/
-- s/ERROR: Query plan size limit exceeded.*/ERROR_MESSAGE/
-- end_matchsubs
-- create partitioned lineitem, partitioned by an integer column
CREATE TABLE plineitem
(
L_ID INTEGER ,
L_ORDERKEY INTEGER ,
L_PARTKEY INTEGER ,
L_SUPPKEY INTEGER ,
L_LINENUMBER INTEGER ,
L_QUANTITY DECIMAL(15,2) ,
L_EXTENDEDPRICE DECIMAL(15,2) ,
L_DISCOUNT DECIMAL(15,2) ,
L_TAX DECIMAL(15,2) ,
L_RETURNFLAG CHAR(1) ,
L_LINESTATUS CHAR(1) ,
L_SHIPDATE DATE ,
L_COMMITDATE DATE ,
L_RECEIPTDATE DATE ,
L_SHIPINSTRUCT CHAR(25) ,
L_SHIPMODE CHAR(10) ,
L_COMMENT VARCHAR(44)
) distributed by (l_orderkey)
partition by range(L_ID)
(
start (0) inclusive end(3000) every(100)
);
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_1" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_2" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_3" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_4" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_5" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_6" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_7" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_8" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_9" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_10" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_11" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_12" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_13" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_14" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_15" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_16" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_17" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_18" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_19" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_20" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_21" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_22" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_23" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_24" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_25" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_26" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_27" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_28" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_29" for table "plineitem"
NOTICE: CREATE TABLE will create partition "plineitem_1_prt_30" for table "plineitem"
create table times
(
id int,
date_id date,
date_text_id varchar(20),
day_name varchar(20),
day_number_in_week int,
day_number_in_month int,
week_number_in_month int,
week_number_in_year int,
week_start_date date,
week_end_date date,
month_number int,
month_name varchar(20),
year_number int,
quarter_number int,
quarter_desc varchar(20),
leap boolean,
days_in_year int
) distributed by (date_id);
create or replace function f() returns setof plineitem as $$
select * from plineitem;
$$
language SQL;
create or replace function f2() returns bigint as $$
select count(*) from plineitem;
$$
language SQL;
set gp_max_plan_size =20;
-- Following queries should error out for plan size 20KB
-- simple select
select * from plineitem;
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
-- UDFs
select * from f();
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
select * from f2();
f2
----
0
(1 row)
-- Joins
select l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
plineitem,times
where
plineitem.l_shipdate = times.date_id
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
ERROR: Query plan size limit exceeded, current size: 39KB, max allowed size: 30KB
HINT: Size controlled by gp_max_plan_size
select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
count(*) as count_order
from
plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date)
group by
l_orderkey
order by
l_orderkey
LIMIT 10;
ERROR: Query plan size limit exceeded, current size: 23KB, max allowed size: 20KB
HINT: Size controlled by gp_max_plan_size
-- Init plans
select p1.l_orderkey,
sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue,
count(*) as count_order
from
plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date)
join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem))
group by
p1.l_orderkey
order by
p1.l_orderkey
LIMIT 10;
ERROR: Query plan size limit exceeded, current size: 133KB, max allowed size: 30KB
HINT: Size controlled by gp_max_plan_size
set gp_max_plan_size ='700kB';
-- Following queries should be dispatched for plan size 700kB
-- simple select
select * from plineitem;
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
-- UDFs
select * from f();
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
select * from f2();
f2
----
0
(1 row)
-- Joins
select l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
plineitem,times
where
plineitem.l_shipdate = times.date_id
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order
--------------+--------------+---------+----------------+----------------+------------+---------+-----------+----------+-------------
(0 rows)
select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
count(*) as count_order
from
plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date)
group by
l_orderkey
order by
l_orderkey
LIMIT 10;
l_orderkey | revenue | count_order
------------+---------+-------------
(0 rows)
-- Init plans
select p1.l_orderkey,
sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue,
count(*) as count_order
from
plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date)
join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem))
group by
p1.l_orderkey
order by
p1.l_orderkey
LIMIT 10;
l_orderkey | revenue | count_order
------------+---------+-------------
(0 rows)
set gp_max_plan_size ='10MB';
-- Following queries should success for plan size 10MB
-- simple select
select * from plineitem;
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
-- UDFs
select * from f();
l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment
------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------
(0 rows)
select * from f2();
f2
----
0
(1 row)
-- Joins
select l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
plineitem,times
where
plineitem.l_shipdate = times.date_id
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order
--------------+--------------+---------+----------------+----------------+------------+---------+-----------+----------+-------------
(0 rows)
select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
count(*) as count_order
from
plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date)
group by
l_orderkey
order by
l_orderkey
LIMIT 10;
l_orderkey | revenue | count_order
------------+---------+-------------
(0 rows)
-- Init plans
select p1.l_orderkey,
sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue,
count(*) as count_order
from
plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date)
join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem))
group by
p1.l_orderkey
order by
p1.l_orderkey
LIMIT 10;
l_orderkey | revenue | count_order
------------+---------+-------------
(0 rows)
......@@ -68,7 +68,7 @@ test: resource_queue
# vacuum from removing dead tuples.
test: gp_toolkit
test: filespace trig auth_constraint role rle portals_updatable plpgsql_cache timeseries resource_queue_function pg_stat_last_operation gp_numeric_agg plan_size partindex_test direct_dispatch partition_pruning_with_fn dsp
test: filespace trig auth_constraint role rle portals_updatable plpgsql_cache timeseries resource_queue_function pg_stat_last_operation gp_numeric_agg partindex_test direct_dispatch partition_pruning_with_fn dsp
# direct dispatch tests
test: bfv_dd bfv_dd_multicolumn bfv_dd_types
......
--start_ignore
drop table if exists plineitem cascade;
drop table if exists times;
--end_ignore
-- Ignoring error messages that might cause diff when plan size changes for test queries
-- start_matchsubs
-- m/ERROR: Query plan size limit exceeded.*/
-- s/ERROR: Query plan size limit exceeded.*/ERROR_MESSAGE/
-- end_matchsubs
-- create partitioned lineitem, partitioned by an integer column
CREATE TABLE plineitem
(
L_ID INTEGER ,
L_ORDERKEY INTEGER ,
L_PARTKEY INTEGER ,
L_SUPPKEY INTEGER ,
L_LINENUMBER INTEGER ,
L_QUANTITY DECIMAL(15,2) ,
L_EXTENDEDPRICE DECIMAL(15,2) ,
L_DISCOUNT DECIMAL(15,2) ,
L_TAX DECIMAL(15,2) ,
L_RETURNFLAG CHAR(1) ,
L_LINESTATUS CHAR(1) ,
L_SHIPDATE DATE ,
L_COMMITDATE DATE ,
L_RECEIPTDATE DATE ,
L_SHIPINSTRUCT CHAR(25) ,
L_SHIPMODE CHAR(10) ,
L_COMMENT VARCHAR(44)
) distributed by (l_orderkey)
partition by range(L_ID)
(
start (0) inclusive end(3000) every(100)
);
create table times
(
id int,
date_id date,
date_text_id varchar(20),
day_name varchar(20),
day_number_in_week int,
day_number_in_month int,
week_number_in_month int,
week_number_in_year int,
week_start_date date,
week_end_date date,
month_number int,
month_name varchar(20),
year_number int,
quarter_number int,
quarter_desc varchar(20),
leap boolean,
days_in_year int
) distributed by (date_id);
create or replace function f() returns setof plineitem as $$
select * from plineitem;
$$
language SQL;
create or replace function f2() returns bigint as $$
select count(*) from plineitem;
$$
language SQL;
set gp_max_plan_size =20;
-- Following queries should error out for plan size 20KB
-- simple select
select * from plineitem;
-- UDFs
select * from f();
select * from f2();
-- Joins
select l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
plineitem,times
where
plineitem.l_shipdate = times.date_id
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
count(*) as count_order
from
plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date)
group by
l_orderkey
order by
l_orderkey
LIMIT 10;
-- Init plans
select p1.l_orderkey,
sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue,
count(*) as count_order
from
plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date)
join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem))
group by
p1.l_orderkey
order by
p1.l_orderkey
LIMIT 10;
set gp_max_plan_size ='700kB';
-- Following queries should be dispatched for plan size 700kB
-- simple select
select * from plineitem;
-- UDFs
select * from f();
select * from f2();
-- Joins
select l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
plineitem,times
where
plineitem.l_shipdate = times.date_id
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
count(*) as count_order
from
plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date)
group by
l_orderkey
order by
l_orderkey
LIMIT 10;
-- Init plans
select p1.l_orderkey,
sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue,
count(*) as count_order
from
plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date)
join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem))
group by
p1.l_orderkey
order by
p1.l_orderkey
LIMIT 10;
set gp_max_plan_size ='10MB';
-- Following queries should success for plan size 10MB
-- simple select
select * from plineitem;
-- UDFs
select * from f();
select * from f2();
-- Joins
select l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
plineitem,times
where
plineitem.l_shipdate = times.date_id
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
select l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
count(*) as count_order
from
plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date)
group by
l_orderkey
order by
l_orderkey
LIMIT 10;
-- Init plans
select p1.l_orderkey,
sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue,
count(*) as count_order
from
plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date)
join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem))
group by
p1.l_orderkey
order by
p1.l_orderkey
LIMIT 10;
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册