From 4b3609423ae8e1d269bf0b02238fd05af7ca495d Mon Sep 17 00:00:00 2001
From: Kenan Yao
Date: Mon, 6 Jun 2016 19:12:06 -0400
Subject: [PATCH] Dispatch exactly the same text string for all slices.

Include a map from sliceIndex to gang_id in the dispatched string, and
remove the localSlice field; each QE now looks up its localSlice in the
map. This way we avoid duplicating and modifying the dispatch text
string slice by slice, and every QE of a sliced dispatch now receives
identical contents. The extra space cost is sizeof(int) * SliceNumber
bytes, and the extra computing cost is iterating over the
SliceNumber-sized array. Compared with the memcpy of the text string
for each slice in the previous implementation, this is much cheaper,
because SliceNumber is much smaller than the size of the dispatch text
string. Also, since SliceNumber is so small, we simply use an array for
the map instead of a hash table.

Also, clean up some dead code in the dispatcher, including:
(1) Remove the primary_gang_id field of the Slice struct and the
    DispatchCommandDtxProtocolParms struct, since the dispatch agent is
    deprecated now;
(2) Remove redundant logic in cdbdisp_dispatchX;
(3) Clean up buildGpDtxProtocolCommand.
---
 src/backend/cdb/cdbgang.c                    |  55 ++-
 src/backend/cdb/cdbtm.c                      |  16 +-
 src/backend/cdb/dispatcher/cdbdisp_dtx.c     |  94 ++--
 src/backend/cdb/dispatcher/cdbdisp_query.c   | 415 ++++++++----------
 src/backend/cdb/dispatcher/cdbdisp_thread.c  |  56 +--
 .../cdb/dispatcher/test/cdbdisp_query_test.c |  13 +-
 src/backend/executor/execUtils.c             |   5 -
 src/backend/nodes/copyfuncs.c                |   1 -
 src/backend/nodes/outfuncs.c                 |   1 -
 src/backend/nodes/readfast.c                 |   1 -
 src/backend/nodes/readfuncs.c                |   1 -
 src/backend/tcop/postgres.c                  |  56 +--
 src/include/cdb/cdbgang.h                    |   3 +
 src/include/executor/execdesc.h              |   3 -
 src/test/regress/expected/plan_size.out      | 308 -------------
 .../regress/expected/plan_size_optimizer.out | 313 -------------
 src/test/regress/greenplum_schedule          |   2 +-
 src/test/regress/sql/plan_size.sql           | 248 -----------
 18 files changed, 318 insertions(+), 1273 deletions(-)
 delete mode 100644 src/test/regress/expected/plan_size.out
 delete mode 100644 src/test/regress/expected/plan_size_optimizer.out
 delete mode 100644 src/test/regress/sql/plan_size.sql

diff --git a/src/backend/cdb/cdbgang.c b/src/backend/cdb/cdbgang.c index f98678c293..a3b29ab936 100644 --- a/src/backend/cdb/cdbgang.c +++ b/src/backend/cdb/cdbgang.c @@ -49,6 +49,12 @@ #define MAX_CACHED_1_GANGS 1 +/* + * Which gang this QE belongs to; this would be used in PostgresMain to find out + * the slice this QE should execute + */ +int qe_gang_id = 0; + /* * Points to the result of getCdbComponentDatabases() */ @@ -94,6 +100,8 @@ typedef struct DoConnectParms /* type of gang. */ GangType type; + int gangId; + /* connect options. GUC etc.
*/ StringInfo connectOptions; @@ -103,14 +111,15 @@ typedef struct DoConnectParms static Gang *buildGangDefinition(GangType type, int gang_id, int size, int content); -static DoConnectParms* makeConnectParms(int parmsCount, GangType type); +static DoConnectParms* makeConnectParms(int parmsCount, GangType type, int gangId); static void destroyConnectParms(DoConnectParms *doConnectParmsAr, int count); static void checkConnectionStatus(Gang* gp, int* countInRecovery, int* countSuccessful); static bool isPrimaryWriterGangAlive(void); static void *thread_DoConnect(void *arg); -static void build_gpqeid_param(char *buf, int bufsz, int segIndex, - bool is_writer); +static void +build_gpqeid_param(char *buf, int bufsz, int segIndex, + bool is_writer, int gangId); static Gang *createGang(GangType type, int gang_id, int size, int content); static void disconnectAndDestroyGang(Gang *gp); static void disconnectAndDestroyAllReaderGangs(bool destroyAllocated); @@ -364,7 +373,7 @@ create_gang_retry: Assert(threadCount > 0); /* initialize connect parameters */ - doConnectParmsAr = makeConnectParms(threadCount, type); + doConnectParmsAr = makeConnectParms(threadCount, type, gang_id); for (i = 0; i < size; i++) { parmIndex = i / gp_connections_per_thread; @@ -537,14 +546,17 @@ thread_DoConnect(void *arg) * early enough now some locks are taken before command line options * are recognized. */ - build_gpqeid_param(gpqeid, sizeof(gpqeid), segdbDesc->segindex, pParms->type == GANGTYPE_PRIMARY_WRITER); + build_gpqeid_param(gpqeid, sizeof(gpqeid), + segdbDesc->segindex, + pParms->type == GANGTYPE_PRIMARY_WRITER, + pParms->gangId); /* check the result in createGang */ cdbconn_doConnect(segdbDesc, gpqeid, pParms->connectOptions->data); } return (NULL); -} /* thread_DoConnect */ +} /* * Test if the connections of the primary writer gang are alive. @@ -975,7 +987,7 @@ static void addOptions(StringInfo string, bool iswriter) * * Including initialize the connect option string. */ -static DoConnectParms* makeConnectParms(int parmsCount, GangType type) +static DoConnectParms* makeConnectParms(int parmsCount, GangType type, int gangId) { DoConnectParms *doConnectParmsAr = (DoConnectParms*) palloc0( parmsCount * sizeof(DoConnectParms)); @@ -995,6 +1007,7 @@ static DoConnectParms* makeConnectParms(int parmsCount, GangType type) pParms->db_count = 0; pParms->type = type; pParms->connectOptions = pOptions; + pParms->gangId = gangId; } return doConnectParmsAr; } @@ -1027,14 +1040,15 @@ static void destroyConnectParms(DoConnectParms *doConnectParmsAr, int count) } /* - * build_gpqeid_params + * build_gpqeid_param * * Called from the qDisp process to create the "gpqeid" parameter string * to be passed to a qExec that is being started. NB: Can be called in a * thread, so mustn't use palloc/elog/ereport/etc. */ -static void build_gpqeid_param(char *buf, int bufsz, int segIndex, - bool is_writer) +static void +build_gpqeid_param(char *buf, int bufsz, int segIndex, + bool is_writer, int gangId) { #ifdef HAVE_INT64_TIMESTAMP #define TIMESTAMP_FORMAT INT64_FORMAT @@ -1046,9 +1060,10 @@ static void build_gpqeid_param(char *buf, int bufsz, int segIndex, #endif #endif - snprintf(buf, bufsz, "%d;%d;" TIMESTAMP_FORMAT ";%s", gp_session_id, - segIndex, PgStartTime, (is_writer ? "true" : "false")); -} /* build_gpqeid_params */ + snprintf(buf, bufsz, "%d;%d;" TIMESTAMP_FORMAT ";%s;%d", + gp_session_id, segIndex, PgStartTime, + (is_writer ? 
"true" : "false"), gangId); +} static bool gpqeid_next_param(char **cpp, char **npp) { @@ -1075,8 +1090,9 @@ static bool gpqeid_next_param(char **cpp, char **npp) * command line options have not been processed; GUCs have the settings * inherited from the postmaster; etc; so don't try to do too much in here. */ -void cdbgang_parse_gpqeid_params(struct Port * port __attribute__((unused)), - const char *gpqeid_value) +void +cdbgang_parse_gpqeid_params(struct Port * port __attribute__((unused)), + const char *gpqeid_value) { char *gpqeid = pstrdup(gpqeid_value); char *cp; @@ -1108,11 +1124,16 @@ void cdbgang_parse_gpqeid_params(struct Port * port __attribute__((unused)), if (gpqeid_next_param(&cp, &np)) SetConfigOption("gp_is_writer", cp, PGC_POSTMASTER, PGC_S_OVERRIDE); + if (gpqeid_next_param(&cp, &np)) + { + qe_gang_id = (int) strtol(cp, NULL, 10); + } + /* Too few items, or too many? */ if (!cp || np) goto bad; - if (gp_session_id <= 0 || PgStartTime <= 0) + if (gp_session_id <= 0 || PgStartTime <= 0 || qe_gang_id <=0) goto bad; pfree(gpqeid); @@ -1120,7 +1141,7 @@ void cdbgang_parse_gpqeid_params(struct Port * port __attribute__((unused)), bad: elog(FATAL, "Segment dispatched with invalid option: 'gpqeid=%s'", gpqeid_value); -} /* cdbgang_parse_gpqeid_params */ +} /* * TODO: Dead code: remove it. diff --git a/src/backend/cdb/cdbtm.c b/src/backend/cdb/cdbtm.c index 3664875a8d..4807a9c28c 100644 --- a/src/backend/cdb/cdbtm.c +++ b/src/backend/cdb/cdbtm.c @@ -141,7 +141,7 @@ static void dumpRMOnlyDtx(HTAB *htab, StringInfoData *buff); static bool doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags, char* gid, DistributedTransactionId gxid, bool *badGangs, bool raiseError, CdbDispatchDirectDesc *direct, - char *serializedArgument, int serializedArgumentLen); + char *serializedDtxContextInfo, int serializedDtxContextInfoLen); static void doPrepareTransaction(void); static void doInsertForgetCommitted(void); static void doNotifyingCommitPrepared(void); @@ -2225,15 +2225,15 @@ doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags, char* gid, DistributedTransactionId gxid, bool *badGangs, bool raiseError, CdbDispatchDirectDesc *direct, - char *serializedArgument, - int serializedArgumentLen) + char *serializedDtxContextInfo, + int serializedDtxContextInfoLen) { - int i, resultCount, numOfFailed = 0; + int i, resultCount, numOfFailed = 0; - char *dtxProtocolCommandStr = 0; + char *dtxProtocolCommandStr = 0; - struct pg_result **results = NULL; - StringInfoData errbuf; + struct pg_result **results = NULL; + StringInfoData errbuf; dtxProtocolCommandStr = DtxProtocolCommandToString(dtxProtocolCommand); @@ -2253,7 +2253,7 @@ doDispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int flags, dtxProtocolCommandStr, gid, gxid, &errbuf, &resultCount, badGangs, direct, - serializedArgument, serializedArgumentLen); + serializedDtxContextInfo, serializedDtxContextInfoLen); if (errbuf.len > 0) { diff --git a/src/backend/cdb/dispatcher/cdbdisp_dtx.c b/src/backend/cdb/dispatcher/cdbdisp_dtx.c index af6ca5ef7b..4cd3331106 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_dtx.c +++ b/src/backend/cdb/dispatcher/cdbdisp_dtx.c @@ -37,9 +37,8 @@ typedef struct DispatchCommandDtxProtocolParms char *dtxProtocolCommandLoggingStr; char gid[TMGIDSIZE]; DistributedTransactionId gxid; - int primary_gang_id; - char *argument; - int argumentLength; + char *serializedDtxContextInfo; + int serializedDtxContextInfoLen; } DispatchCommandDtxProtocolParms; /* @@ -54,9 
+53,9 @@ static void cdbdisp_dtxParmsInit(struct CdbDispatcherState *ds, DispatchCommandDtxProtocolParms *pDtxProtocolParms); static char * -PQbuildGpDtxProtocolCommand(MemoryContext cxt, - DispatchCommandDtxProtocolParms * pDtxProtocolParms, - int *finalLen); +buildGpDtxProtocolCommand(MemoryContext cxt, + DispatchCommandDtxProtocolParms * pDtxProtocolParms, + int *finalLen); /* * cdbdisp_dispatchDtxProtocolCommand: @@ -80,7 +79,8 @@ cdbdisp_dispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, int *numresults, bool *badGangs, CdbDispatchDirectDesc * direct, - char *argument, int argumentLength) + char *serializedDtxContextInfo, + int serializedDtxContextInfoLen) { CdbDispatcherState ds = {NULL, NULL, NULL}; @@ -106,8 +106,8 @@ cdbdisp_dispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, elog(PANIC, "Distribute transaction identifier too long (%d)", (int) strlen(gid)); memcpy(dtxProtocolParms.gid, gid, TMGIDSIZE); dtxProtocolParms.gxid = gxid; - dtxProtocolParms.argument = argument; - dtxProtocolParms.argumentLength = argumentLength; + dtxProtocolParms.serializedDtxContextInfo = serializedDtxContextInfo; + dtxProtocolParms.serializedDtxContextInfoLen = serializedDtxContextInfoLen; /* * Allocate a primary QE for every available segDB in the system. @@ -122,8 +122,6 @@ cdbdisp_dispatchDtxProtocolCommand(DtxProtocolCommand dtxProtocolCommand, primaryGang->dispatcherActive = false; } - dtxProtocolParms.primary_gang_id = primaryGang->gang_id; - /* * Dispatch the command. */ @@ -328,9 +326,7 @@ cdbdisp_dtxParmsInit(struct CdbDispatcherState *ds, oldContext = MemoryContextSwitchTo(ds->dispatchStateContext); - char *queryText = - PQbuildGpDtxProtocolCommand(ds->dispatchStateContext, - pDtxProtocolParms, &len); + char *queryText = buildGpDtxProtocolCommand(ds->dispatchStateContext, pDtxProtocolParms, &len); MemoryContextSwitchTo(oldContext); @@ -346,72 +342,76 @@ cdbdisp_dtxParmsInit(struct CdbDispatcherState *ds, * Build a dtx protocol command string to be dispatched to QE. 
*/ static char * -PQbuildGpDtxProtocolCommand(MemoryContext cxt, - DispatchCommandDtxProtocolParms * - pDtxProtocolParms, int *finalLen) +buildGpDtxProtocolCommand(MemoryContext cxt, + DispatchCommandDtxProtocolParms * pDtxProtocolParms, + int *finalLen) { int dtxProtocolCommand = (int) pDtxProtocolParms->dtxProtocolCommand; int flags = pDtxProtocolParms->flags; char *dtxProtocolCommandLoggingStr = pDtxProtocolParms->dtxProtocolCommandLoggingStr; char *gid = pDtxProtocolParms->gid; int gxid = pDtxProtocolParms->gxid; - int primary_gang_id = pDtxProtocolParms->primary_gang_id; - char *argument = pDtxProtocolParms->argument; - int argumentLength = pDtxProtocolParms->argumentLength; + char *serializedDtxContextInfo = pDtxProtocolParms->serializedDtxContextInfo; + int serializedDtxContextInfoLen = pDtxProtocolParms->serializedDtxContextInfoLen; int tmp = 0; int len = 0; int loggingStrLen = strlen(dtxProtocolCommandLoggingStr) + 1; int gidLen = strlen(gid) + 1; - int total_query_len = - 5 /* overhead */ + 4 /* dtxProtocolCommand */ + - 4 /*flags */ + 8 /* lengths */ + - loggingStrLen + gidLen + 4 /* gxid */ + 8 /* gang ids */ + - argumentLength + 4 /* argumentLength field */ ; + int total_query_len = 1 /* 'T' */ + + sizeof(len) + + sizeof(dtxProtocolCommand) + + sizeof(flags) + + sizeof(loggingStrLen) + + loggingStrLen + + sizeof(gidLen) + + gidLen + + sizeof(gxid) + + sizeof(serializedDtxContextInfoLen) + + serializedDtxContextInfoLen; + char *shared_query = MemoryContextAlloc(cxt, total_query_len); char *pos = shared_query; *pos++ = 'T'; - pos += 4; /* placeholder for message length */ + pos += sizeof(len); /* placeholder for message length */ tmp = htonl(dtxProtocolCommand); - memcpy(pos, &tmp, 4); - pos += 4; + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); tmp = htonl(flags); - memcpy(pos, &tmp, 4); - pos += 4; + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); tmp = htonl(loggingStrLen); - memcpy(pos, &tmp, 4); - pos += 4; + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); memcpy(pos, dtxProtocolCommandLoggingStr, loggingStrLen); pos += loggingStrLen; tmp = htonl(gidLen); - memcpy(pos, &tmp, 4); - pos += 4; + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); memcpy(pos, gid, gidLen); pos += gidLen; tmp = htonl(gxid); - memcpy(pos, &tmp, 4); - pos += 4; + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); - tmp = htonl(primary_gang_id); - memcpy(pos, &tmp, 4); - pos += 4; + tmp = htonl(serializedDtxContextInfoLen); + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); - tmp = htonl(argumentLength); - memcpy(pos, &tmp, 4); - pos += 4; - - if (argumentLength > 0) - memcpy(pos, argument, argumentLength); - pos += argumentLength; + if (serializedDtxContextInfoLen > 0) + { + memcpy(pos, serializedDtxContextInfo, serializedDtxContextInfoLen); + pos += serializedDtxContextInfoLen; + } len = pos - shared_query - 1; @@ -419,7 +419,7 @@ PQbuildGpDtxProtocolCommand(MemoryContext cxt, * fill in length placeholder */ tmp = htonl(len); - memcpy(shared_query + 1, &tmp, 4); + memcpy(shared_query + 1, &tmp, sizeof(tmp)); if (finalLen) *finalLen = len + 1; diff --git a/src/backend/cdb/dispatcher/cdbdisp_query.c b/src/backend/cdb/dispatcher/cdbdisp_query.c index b16ced2410..a778e74c50 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_query.c +++ b/src/backend/cdb/dispatcher/cdbdisp_query.c @@ -39,10 +39,10 @@ extern bool Test_print_direct_dispatch_info; */ typedef struct { - int sliceIndex; - int children; - Slice *slice; -} sliceVec; + int sliceIndex; + int children; + Slice *slice; 
+} SliceVec; /* * Parameter structure for Greenplum Database Queries @@ -78,10 +78,9 @@ typedef struct DispatchCommandQueryParms int seqServerHostlen; int seqServerPort; /* If seqServerHost non-null, sequence server port. */ - /* - * Used by dispatch agent if NOT using sliced execution - */ - int primary_gang_id; + /* the map from sliceIndex to gang_id, in array form */ + int numSlices; + int *sliceIndexGangIdMap; } DispatchCommandQueryParms; static void @@ -102,14 +101,14 @@ cdbdisp_dispatchSetCommandToAllGangs(const char *strCommand, static int fillSliceVector(SliceTable *sliceTable, int sliceIndex, - sliceVec *sliceVector, + SliceVec *sliceVector, int len); static char * -PQbuildGpQueryString(MemoryContext cxt, - DispatchCommandParms * pParms, - DispatchCommandQueryParms * pQueryParms, - int *finalLen); +buildGpQueryString(MemoryContext cxt, + DispatchCommandParms * pParms, + DispatchCommandQueryParms * pQueryParms, + int *finalLen); static void cdbdisp_queryParmsInit(struct CdbDispatcherState *ds, @@ -121,9 +120,12 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, struct SliceTable *sliceTbl, struct CdbDispatcherState *ds); +static int * +buildSliceIndexGangIdMap(SliceVec *sliceVec, int numSlices, int numTotalSlices); + /* * Compose and dispatch the MPPEXEC commands corresponding to a plan tree - * within a complete parallel plan. (A plan tree will correspond either + * within a complete parallel plan. (A plan tree will correspond either * to an initPlan or to the main plan.) * * If cancelOnError is true, then any dispatching error, a cancellation @@ -131,7 +133,7 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, * may cause the unfinished portion of the plan to be abandoned or canceled; * and in the event this occurs before all gangs have been dispatched, this * function does not return, but waits for all QEs to stop and exits to - * the caller's error catcher via ereport(ERROR,...). Otherwise this + * the caller's error catcher via ereport(ERROR,...).Otherwise this * function returns normally and errors are not reported until later. * * If cancelOnError is false, the plan is to be dispatched as fully as @@ -139,11 +141,11 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, * requests, errors or connection failures from other QEs, etc. * * The CdbDispatchResults objects allocated for the plan are returned - * in *pPrimaryResults. The caller, after calling + * in *pPrimaryResults. The caller, after calling * CdbCheckDispatchResult(), can examine the CdbDispatchResults * objects, can keep them as long as needed, and ultimately must free * them with cdbdisp_destroyDispatcherState() prior to deallocation of - * the caller's memory context. Callers should use PG_TRY/PG_CATCH to + * the caller's memory context. Callers should use PG_TRY/PG_CATCH to * ensure proper cleanup. * * To wait for completion, check for errors, and clean up, it is @@ -203,7 +205,7 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc, && rootIdx <= sliceTbl->nMotions + sliceTbl->nInitPlans)); /* - * Keep old value so we can restore it. We use this field as a parameter. + * Keep old value so we can restore it. We use this field as a parameter. */ oldLocalSlice = sliceTbl->localSlice; @@ -288,21 +290,10 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc, * slice tree (corresponding to an initPlan or the main plan), so the * parameters are fixed and we can include them in the prefix. 
*/ - splan = - serializeNode((Node *) queryDesc->plannedstmt, &splan_len, - &splan_len_uncompressed); - - /* - * compute the total uncompressed size of the query plan for all slices - */ - int num_slices = - queryDesc->plannedstmt->planTree->nMotionNodes + 1; - uint64 plan_size_in_kb = - ((uint64) splan_len_uncompressed * (uint64) num_slices) / (uint64) 1024; - - elog(((gp_log_gang >= GPVARS_VERBOSITY_VERBOSE) ? LOG : DEBUG1), - "Query plan size to dispatch: " UINT64_FORMAT "KB", plan_size_in_kb); + splan = serializeNode((Node *) queryDesc->plannedstmt, + &splan_len, &splan_len_uncompressed); + uint64 plan_size_in_kb = ((uint64) splan_len_uncompressed) / (uint64) 1024; if (0 < gp_max_plan_size && plan_size_in_kb > gp_max_plan_size) { ereport(ERROR, @@ -320,9 +311,9 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc, ParamListInfoData *pli; ParamExternData *pxd; StringInfoData parambuf; - Size length; - int plioff; - int32 iparam; + Size length; + int plioff; + int32 iparam; /* * Allocate buffer for params @@ -343,8 +334,8 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc, */ for (iparam = 0; iparam < queryDesc->params->numParams; iparam++) { - int16 typlen; - bool typbyval; + int16 typlen; + bool typbyval; /* * Recompute pli each time in case parambuf.data is repalloc'ed @@ -361,7 +352,7 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc, get_typlenbyval(pxd->ptype, &typlen, &typbyval); if (!typbyval) { - char *s = DatumGetPointer(pxd->value); + char *s = DatumGetPointer(pxd->value); if (pxd->isnull || !PointerIsValid(s)) { @@ -395,9 +386,7 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc, sparams_len = 0; } - sddesc = - serializeNode((Node *) queryDesc->ddesc, &sddesc_len, - NULL /*uncompressed_size */ ); + sddesc = serializeNode((Node *) queryDesc->ddesc, &sddesc_len, NULL /*uncompressed_size */ ); MemSet(&queryParms, 0, sizeof(queryParms)); queryParms.strCommand = queryDesc->sourceText; @@ -420,8 +409,6 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc, queryParms.seqServerHostlen = strlen(qdinfo->hostip) + 1; queryParms.seqServerPort = seqServerCtl->seqServerPort; - queryParms.primary_gang_id = 0; /* We are relying on the slice table to provide gang ids */ - /* * serialized a version of our snapshot */ @@ -438,9 +425,6 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc, mppTxnOptions(planRequiresTxn), "cdbdisp_dispatchPlan"); - Assert(sliceTbl); - Assert(sliceTbl->slices != NIL); - cdbdisp_dispatchX(&queryParms, cancelOnError, sliceTbl, ds); sliceTbl->localSlice = oldLocalSlice; @@ -552,8 +536,6 @@ cdbdisp_dispatchCommand(const char *strCommand, Assert(primaryGang); - queryParms.primary_gang_id = primaryGang->gang_id; - /* * Serialize a version of our DTX Context Info */ @@ -783,9 +765,6 @@ CdbDispatchUtilityStatement_NoTwoPhase(struct Node *stmt, * Allocate query text in memory context, initialize it and assign it to * all DispatchCommandQueryParms in this dispatcher state. * - * For now, there's only one field (localSlice) which is different to each - * dispatcher thread, we set it later. - * * Also, we free the DispatchCommandQueryParms memory. 
*/ static void @@ -800,9 +779,8 @@ cdbdisp_queryParmsInit(struct CdbDispatcherState *ds, Assert(pQueryParms->strCommand != NULL); - char *queryText = - PQbuildGpQueryString(ds->dispatchStateContext, pParms, pQueryParms, - &len); + char *queryText = buildGpQueryString(ds->dispatchStateContext, + pParms, pQueryParms, &len); if (pQueryParms->serializedQuerytree != NULL) { @@ -840,6 +818,12 @@ cdbdisp_queryParmsInit(struct CdbDispatcherState *ds, pQueryParms->seqServerHost = NULL; } + if (pQueryParms->sliceIndexGangIdMap != NULL) + { + pfree(pQueryParms->sliceIndexGangIdMap); + pQueryParms->sliceIndexGangIdMap = NULL; + } + for (i = 0; i < dThreads->dispatchCommandParmsArSize; i++) { pParms = &dThreads->dispatchCommandParmsAr[i]; @@ -849,7 +833,7 @@ cdbdisp_queryParmsInit(struct CdbDispatcherState *ds, } /* - * Three Helper functions for CdbDispatchPlan: + * Three Helper functions for cdbdisp_dispatchX: * * Used to figure out the dispatch order for the sliceTable by * counting the number of dependent child slices for each slice; and @@ -869,21 +853,40 @@ cdbdisp_queryParmsInit(struct CdbDispatcherState *ds, static int compare_slice_order(const void *aa, const void *bb) { - sliceVec *a = (sliceVec *) aa; - sliceVec *b = (sliceVec *) bb; + SliceVec *a = (SliceVec *) aa; + SliceVec *b = (SliceVec *) bb; if (a->slice == NULL) return 1; if (b->slice == NULL) return -1; + /* + * Put the slice not going to dispatch in the last + */ + if (a->slice->primaryGang == NULL) + { + Assert(a->slice->gangType == GANGTYPE_UNALLOCATED); + return 1; + } + if (b->slice->primaryGang == NULL) + { + Assert(b->slice->gangType == GANGTYPE_UNALLOCATED); + return -1; + } + /* * sort the writer gang slice first, because he sets the shared snapshot */ - if (a->slice->primary_gang_id == 1 && b->slice->primary_gang_id != 1) + if (a->slice->primaryGang->gang_id == 1) + { + Assert(b->slice->primaryGang->gang_id != 1); return -1; - else if (b->slice->primary_gang_id == 1 && a->slice->primary_gang_id != 1) + } + if (b->slice->primaryGang->gang_id == 1) + { return 1; + } if (a->children == b->children) return 0; @@ -940,8 +943,8 @@ count_bits(char *bits, int nbyte) * tree and add up number of children, which will return too big number. */ static int -markbit_dep_children(SliceTable * sliceTable, int sliceIdx, - sliceVec * sliceVec, int bitmasklen, char *bits) +markbit_dep_children(SliceTable *sliceTable, int sliceIdx, + SliceVec *sliceVec, int bitmasklen, char *bits) { ListCell *sublist; Slice *slice = (Slice *) list_nth(sliceTable->slices, sliceIdx); @@ -969,15 +972,14 @@ markbit_dep_children(SliceTable * sliceTable, int sliceIdx, * Count how many dependent childrens and fill in the sliceVector of dependent childrens. 
*/ static int -count_dependent_children(SliceTable * sliceTable, int sliceIndex, - sliceVec * sliceVector, int len) +count_dependent_children(SliceTable *sliceTable, int sliceIndex, + SliceVec *sliceVector, int len) { int ret = 0; int bitmasklen = (len + 7) >> 3; char *bitmask = palloc0(bitmasklen); - ret = markbit_dep_children(sliceTable, sliceIndex, - sliceVector, bitmasklen, bitmask); + ret = markbit_dep_children(sliceTable, sliceIndex, sliceVector, bitmasklen, bitmask); pfree(bitmask); return ret; @@ -985,7 +987,7 @@ count_dependent_children(SliceTable * sliceTable, int sliceIndex, static int fillSliceVector(SliceTable *sliceTbl, int rootIdx, - sliceVec *sliceVector, int sliceLim) + SliceVec *sliceVector, int nTotalSlices) { int top_count; @@ -993,10 +995,9 @@ fillSliceVector(SliceTable *sliceTbl, int rootIdx, * count doesn't include top slice add 1, note that sliceVector would be * modified in place by count_dependent_children. */ - top_count = - 1 + count_dependent_children(sliceTbl, rootIdx, sliceVector, sliceLim); + top_count = 1 + count_dependent_children(sliceTbl, rootIdx, sliceVector, nTotalSlices); - qsort(sliceVector, sliceLim, sizeof(sliceVec), compare_slice_order); + qsort(sliceVector, nTotalSlices, sizeof(SliceVec), compare_slice_order); return top_count; } @@ -1005,8 +1006,8 @@ fillSliceVector(SliceTable *sliceTbl, int rootIdx, * Build a query string to be dispatched to QE. */ static char * -PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, - DispatchCommandQueryParms * pQueryParms, int *finalLen) +buildGpQueryString(MemoryContext cxt, DispatchCommandParms *pParms, + DispatchCommandQueryParms *pQueryParms, int *finalLen) { const char *command = pQueryParms->strCommand; int command_len = strlen(pQueryParms->strCommand) + 1; @@ -1021,12 +1022,12 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, const char *snapshotInfo = pQueryParms->serializedDtxContextInfo; int snapshotInfo_len = pQueryParms->serializedDtxContextInfolen; int flags = 0; /* unused flags */ - int localSlice = 0; /* localSlice; placeholder; set later in dupQueryTextAndSetSliceId */ int rootIdx = pQueryParms->rootIdx; const char *seqServerHost = pQueryParms->seqServerHost; int seqServerHostlen = pQueryParms->seqServerHostlen; int seqServerPort = pQueryParms->seqServerPort; - int primary_gang_id = pQueryParms->primary_gang_id; + int numSlices = pQueryParms->numSlices; + int *sliceIndexGangIdMap = pQueryParms->sliceIndexGangIdMap; int64 currentStatementStartTimestamp = GetCurrentStatementStartTimestamp(); Oid sessionUserId = GetSessionUserId(); Oid outerUserId = GetOuterUserId(); @@ -1034,8 +1035,7 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, bool sessionUserIsSuper = superuser_arg(GetSessionUserId()); bool outerUserIsSuper = superuser_arg(GetSessionUserId()); - int tmp, - len; + int tmp, len, i; uint32 n32; int total_query_len; char *shared_query, @@ -1049,9 +1049,7 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, sizeof(sessionUserId) + 1 /* sessionUserIsSuper */ + sizeof(outerUserId) + 1 /* outerUserIsSuper */ + sizeof(currentUserId) + - sizeof(localSlice) + sizeof(rootIdx) + - sizeof(primary_gang_id) + sizeof(n32) * 2 /* currentStatementStartTimestamp */ + sizeof(command_len) + sizeof(querytree_len) + @@ -1065,7 +1063,12 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, sizeof(seqServerPort) + command_len + querytree_len + - plantree_len + params_len + sddesc_len + seqServerHostlen; + 
plantree_len + + params_len + + sddesc_len + + seqServerHostlen + + sizeof(numSlices) + + sizeof(int) * numSlices; shared_query = MemoryContextAlloc(cxt, total_query_len); @@ -1075,10 +1078,6 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, pos += 4; /* placeholder for message length */ - tmp = htonl(localSlice); - memcpy(pos, &tmp, sizeof(localSlice)); - pos += sizeof(localSlice); - tmp = htonl(gp_command_count); memcpy(pos, &tmp, sizeof(gp_command_count)); pos += sizeof(gp_command_count); @@ -1110,10 +1109,6 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, memcpy(pos, &tmp, sizeof(rootIdx)); pos += sizeof(rootIdx); - tmp = htonl(primary_gang_id); - memcpy(pos, &tmp, sizeof(primary_gang_id)); - pos += sizeof(primary_gang_id); - /* * High order half first, since we're doing MSB-first */ @@ -1172,11 +1167,8 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, memcpy(pos, &tmp, sizeof(tmp)); pos += sizeof(tmp); - if (command_len > 0) - { - memcpy(pos, command, command_len); - pos += command_len; - } + memcpy(pos, command, command_len); + pos += command_len; if (querytree_len > 0) { @@ -1208,6 +1200,20 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, pos += seqServerHostlen; } + tmp = htonl(numSlices); + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); + + if (numSlices > 0) + { + for (i = 0; i < numSlices; ++i) + { + tmp = htonl(sliceIndexGangIdMap[i]); + memcpy(pos, &tmp, sizeof(tmp)); + pos += sizeof(tmp); + } + } + len = pos - shared_query - 1; /* @@ -1225,10 +1231,7 @@ PQbuildGpQueryString(MemoryContext cxt, DispatchCommandParms * pParms, } /* - * This code was refactored out of cdbdisp_dispatchPlan. It's - * used both for dispatching plans when we are using normal gangs, - * and for dispatching all statements from Query Dispatch Agents - * when we are using dispatch agents. + * This function is used for dispatching sliced plans */ static void cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, @@ -1236,38 +1239,31 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, struct SliceTable *sliceTbl, struct CdbDispatcherState *ds) { - int oldLocalSlice = 0; - sliceVec *sliceVector = NULL; - int nSlices = 1; - int sliceLim = 1; - int iSlice; - int rootIdx = pQueryParms->rootIdx; + SliceVec *sliceVector = NULL; + int nSlices = 1; /* slices this dispatch cares about */ + int nTotalSlices = 1; /* total slices in sliceTbl*/ + int iSlice; + int rootIdx = pQueryParms->rootIdx; if (log_dispatch_stats) ResetUsage(); - Assert(Gp_role == GP_ROLE_DISPATCH); - - if (sliceTbl) - { - Assert(rootIdx == 0 || - (rootIdx > sliceTbl->nMotions && - rootIdx <= sliceTbl->nMotions + sliceTbl->nInitPlans)); + Assert(sliceTbl != NULL); + Assert(rootIdx == 0 || + (rootIdx > sliceTbl->nMotions && + rootIdx <= sliceTbl->nMotions + sliceTbl->nInitPlans)); - /* - * Keep old value so we can restore it. We use this field as a parameter. - */ - oldLocalSlice = sliceTbl->localSlice; + /* + * Traverse the slice tree in sliceTbl rooted at rootIdx and build a + * vector of slice indexes specifying the order of [potential] dispatch. + */ + nTotalSlices = list_length(sliceTbl->slices); + sliceVector = palloc0(nTotalSlices * sizeof(SliceVec)); - /* - * Traverse the slice tree in sliceTbl rooted at rootIdx and build a - * vector of slice indexes specifying the order of [potential] dispatch. 
- */ - sliceLim = list_length(sliceTbl->slices); - sliceVector = palloc0(sliceLim * sizeof(sliceVec)); + nSlices = fillSliceVector(sliceTbl, rootIdx, sliceVector, nTotalSlices); - nSlices = fillSliceVector(sliceTbl, rootIdx, sliceVector, sliceLim); - } + pQueryParms->numSlices = nTotalSlices; + pQueryParms->sliceIndexGangIdMap = buildSliceIndexGangIdMap(sliceVector, nSlices, nTotalSlices); /* * Allocate result array with enough slots for QEs of primary gangs. @@ -1275,8 +1271,7 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, ds->primaryResults = NULL; ds->dispatchThreads = NULL; - cdbdisp_makeDispatcherState(ds, nSlices * largestGangsize(), sliceLim, - cancelOnError); + cdbdisp_makeDispatcherState(ds, nSlices * largestGangsize(), nSlices, cancelOnError); cdbdisp_queryParmsInit(ds, pQueryParms); cdb_total_plans++; @@ -1284,22 +1279,16 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, if (nSlices > cdb_max_slices) cdb_max_slices = nSlices; - /* - * must have somebody to dispatch to. - */ - Assert(sliceTbl != NULL || pQueryParms->primary_gang_id > 0); - if (DEBUG1 >= log_min_messages) { - char msec_str[32]; + char msec_str[32]; switch (check_log_duration(msec_str, false)) { case 1: case 2: ereport(LOG, - (errmsg - ("duration to start of dispatch send (root %d): %s ms", + (errmsg("duration to start of dispatch send (root %d): %s ms", pQueryParms->rootIdx, msec_str))); break; } @@ -1308,81 +1297,46 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, for (iSlice = 0; iSlice < nSlices; iSlice++) { CdbDispatchDirectDesc direct; - Gang *primaryGang = NULL; - Slice *slice = NULL; - int si = -1; + Gang *primaryGang = NULL; + Slice *slice = NULL; + int si = -1; + + Assert(sliceVector != NULL); + + slice = sliceVector[iSlice].slice; + si = slice->sliceIndex; - if (sliceVector) + /* + * Is this a slice we should dispatch? + */ + if (slice && slice->gangType == GANGTYPE_UNALLOCATED) { + Assert(slice->primaryGang == NULL); /* - * Sliced dispatch, and we are either the dispatch agent, or we are - * the QD and are not using Dispatch Agents. - * - * So, dispatch to each slice. + * Most slices are dispatched, however, in many cases the + * root runs only on the QD and is not dispatched to the QEs. */ - slice = sliceVector[iSlice].slice; - si = slice->sliceIndex; + continue; + } - /* - * Is this a slice we should dispatch? - */ - if (slice && slice->gangType == GANGTYPE_UNALLOCATED) - { - /* - * Most slices are dispatched, however, in many cases the - * root runs only on the QD and is not dispatched to the QEs. - */ - continue; - } + primaryGang = slice->primaryGang; + Assert(primaryGang != NULL); - primaryGang = slice->primaryGang; + if (slice->directDispatch.isDirectDispatch) + { + direct.directed_dispatch = true; + Assert(list_length(slice->directDispatch.contentIds) == 1); + direct.count = 1; /* - * If we are on the dispatch agent, the gang pointers aren't filled in. - * We must look them up by ID + * We only support single content right now. 
If this changes then we need to change from + * a list to another structure to avoid n^2 cases */ - if (primaryGang == NULL) - { - elog(DEBUG2, "Dispatch %d, Gangs are %d, type=%d", iSlice, - slice->primary_gang_id, slice->gangType); - primaryGang = findGangById(slice->primary_gang_id); - - Assert(primaryGang != NULL); - if (primaryGang != NULL) - Assert(primaryGang->type == slice->gangType - || primaryGang->type == GANGTYPE_PRIMARY_WRITER); - - if (primaryGang == NULL) - continue; - } + direct.content[0] = linitial_int(slice->directDispatch.contentIds); - if (slice->directDispatch.isDirectDispatch) - { - direct.directed_dispatch = true; - direct.count = list_length(slice->directDispatch.contentIds); - - /* - * only support single content right now. If this changes then we need to change from - * a list to another structure to avoid n^2 cases - */ - Assert(direct.count == 1); - direct.content[0] = - linitial_int(slice->directDispatch.contentIds); - - if (Test_print_direct_dispatch_info) - { - elog(INFO, "Dispatch command to SINGLE content"); - } - } - else + if (Test_print_direct_dispatch_info) { - direct.directed_dispatch = false; - direct.count = 0; - - if (Test_print_direct_dispatch_info) - { - elog(INFO, "Dispatch command to ALL contents"); - } + elog(INFO, "Dispatch command to SINGLE content"); } } else @@ -1394,22 +1348,6 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, { elog(INFO, "Dispatch command to ALL contents"); } - - /* - * Non-sliced, used specified gangs - */ - elog(DEBUG2, "primary %d", pQueryParms->primary_gang_id); - if (pQueryParms->primary_gang_id > 0) - primaryGang = findGangById(pQueryParms->primary_gang_id); - } - - /* - * Must have a gang to dispatch to - */ - Assert(primaryGang != NULL); - if (primaryGang) - { - Assert(ds->primaryResults && ds->primaryResults->resultArray); } /* @@ -1423,24 +1361,13 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, break; } - /* - * Dispatch the plan to our primaryGang. - * Doesn't wait for it to finish. - */ - if (primaryGang != NULL) - { - if (primaryGang->type == GANGTYPE_PRIMARY_WRITER) - ds->primaryResults->writer_gang = primaryGang; + if (primaryGang->type == GANGTYPE_PRIMARY_WRITER) + ds->primaryResults->writer_gang = primaryGang; - cdbdisp_dispatchToGang(ds, primaryGang, si, &direct); - } + cdbdisp_dispatchToGang(ds, primaryGang, si, &direct); } - if (sliceVector) - pfree(sliceVector); - - if (sliceTbl) - sliceTbl->localSlice = oldLocalSlice; + pfree(sliceVector); /* * If bailed before completely dispatched, stop QEs and throw error. @@ -1448,8 +1375,8 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, if (iSlice < nSlices) { elog(Debug_cancel_print ? LOG : DEBUG2, - "Plan dispatch canceled; dispatched %d of %d slices", iSlice, - nSlices); + "Plan dispatch canceled; dispatched %d of %d slices", + iSlice, nSlices); /* * Cancel any QEs still running, and wait for them to terminate. 
@@ -1476,15 +1403,14 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms, if (DEBUG1 >= log_min_messages) { - char msec_str[32]; + char msec_str[32]; switch (check_log_duration(msec_str, false)) { case 1: case 2: ereport(LOG, - (errmsg - ("duration to dispatch out (root %d): %s ms", + (errmsg("duration to dispatch out (root %d): %s ms", pQueryParms->rootIdx, msec_str))); break; } @@ -1531,8 +1457,6 @@ cdbdisp_dispatchSetCommandToAllGangs(const char *strCommand, Assert(primaryGang); - queryParms.primary_gang_id = primaryGang->gang_id; - /* * serialized a version of our snapshot */ @@ -1629,3 +1553,30 @@ CdbDispatchUtilityStatement_Internal(struct Node *stmt, } PG_END_TRY(); } + +static int * +buildSliceIndexGangIdMap(SliceVec *sliceVec, int numSlices, int numTotalSlices) +{ + Assert(sliceVec != NULL && numSlices > 0 && numTotalSlices > 0); + + /* would be freed in buildGpQueryString */ + int *sliceIndexGangIdMap = palloc0(numTotalSlices * sizeof(int)); + + if (sliceIndexGangIdMap == NULL) + return NULL; + + Slice *slice = NULL; + int index; + + for (index = 0; index < numSlices; ++index) + { + slice = sliceVec[index].slice; + + if (slice->primaryGang == NULL) + continue; + + sliceIndexGangIdMap[slice->sliceIndex] = slice->primaryGang->gang_id; + } + + return sliceIndexGangIdMap; +} diff --git a/src/backend/cdb/dispatcher/cdbdisp_thread.c b/src/backend/cdb/dispatcher/cdbdisp_thread.c index 0462515fe2..b287c2d428 100644 --- a/src/backend/cdb/dispatcher/cdbdisp_thread.c +++ b/src/backend/cdb/dispatcher/cdbdisp_thread.c @@ -64,11 +64,6 @@ dispatchCommand(CdbDispatchResult * dispatchResult, /* returns true if command complete */ static bool processResults(CdbDispatchResult * dispatchResult); -static char * -dupQueryTextAndSetSliceId(MemoryContext cxt, - char *queryText, - int len, int sliceId); - static DispatchWaitMode cdbdisp_signalQE(SegmentDatabaseDescriptor * segdbDesc, DispatchWaitMode waitMode); @@ -102,7 +97,6 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds, newThreads = 0; int gangSize = 0; SegmentDatabaseDescriptor *db_descriptors; - char *newQueryText = NULL; DispatchCommandParms *pParms = NULL; gangSize = gp->size; @@ -128,10 +122,6 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds, ds->dispatchThreads->threadCount + max_threads); } - pParms = &ds->dispatchThreads->dispatchCommandParmsAr[0]; - newQueryText = - dupQueryTextAndSetSliceId(ds->dispatchStateContext, pParms->query_text, - pParms->query_text_len, sliceIndex); /* * Create the thread parms structures based targetSet parameter. * This will add the segdbDesc pointers appropriate to the @@ -177,12 +167,8 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds, cdbdisp_mergeConnectionErrors(qeResult, segdbDesc); parmsIndex = gp_connections_per_thread == 0 ? 
0 : segdbs_in_thread_pool / gp_connections_per_thread; - pParms = - ds->dispatchThreads->dispatchCommandParmsAr + - ds->dispatchThreads->threadCount + parmsIndex; + pParms = ds->dispatchThreads->dispatchCommandParmsAr + ds->dispatchThreads->threadCount + parmsIndex; pParms->dispatchResultPtrArray[pParms->db_count++] = qeResult; - if (newQueryText != NULL) - pParms->query_text = newQueryText; /* * This CdbDispatchResult/SegmentDatabaseDescriptor pair will be @@ -206,17 +192,14 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds, else if (gp_connections_per_thread == 0) newThreads = 1; else - newThreads = 1 - + (segdbs_in_thread_pool - 1) / gp_connections_per_thread; + newThreads = 1 + (segdbs_in_thread_pool - 1) / gp_connections_per_thread; /* * Create the threads. (which also starts the dispatching). */ for (i = 0; i < newThreads; i++) { - DispatchCommandParms *pParms = - &(ds->dispatchThreads->dispatchCommandParmsAr + - ds->dispatchThreads->threadCount)[i]; + DispatchCommandParms *pParms = &(ds->dispatchThreads->dispatchCommandParmsAr + ds->dispatchThreads->threadCount)[i]; Assert(pParms != NULL); @@ -230,9 +213,7 @@ cdbdisp_dispatchToGang_internal(struct CdbDispatcherState *ds, int pthread_err = 0; pParms->thread_valid = true; - pthread_err = - gp_pthread_create(&pParms->thread, thread_DispatchCommand, - pParms, "dispatchToGang"); + pthread_err = gp_pthread_create(&pParms->thread, thread_DispatchCommand, pParms, "dispatchToGang"); if (pthread_err != 0) { @@ -1737,35 +1718,6 @@ CollectQEWriterTransactionInformation(SegmentDatabaseDescriptor * segdbDesc, } } -/* - * Set slice in query text - * - * Make a new copy of query text and set the slice id in the right place. - * - */ -static char * -dupQueryTextAndSetSliceId(MemoryContext cxt, char *queryText, - int len, int sliceId) -{ - /* - * DTX command and RM command don't need slice id - */ - if (sliceId < 0) - return NULL; - - int tmp = htonl(sliceId); - char *newQuery = MemoryContextAlloc(cxt, len); - - memcpy(newQuery, queryText, len); - - /* - * the first byte is 'M' and followed by the length, which is an integer. - * see function PQbuildGpQueryString. - */ - memcpy(newQuery + 1 + sizeof(int), &tmp, sizeof(tmp)); - return newQuery; -} - /* * Send cancel/finish signal to still-running QE through libpq. * waitMode is either CANCEL or FINISH. 
Returns true if we successfully diff --git a/src/backend/cdb/dispatcher/test/cdbdisp_query_test.c b/src/backend/cdb/dispatcher/test/cdbdisp_query_test.c index 86ff406793..9a10e44245 100644 --- a/src/backend/cdb/dispatcher/test/cdbdisp_query_test.c +++ b/src/backend/cdb/dispatcher/test/cdbdisp_query_test.c @@ -42,15 +42,10 @@ test__cdbdisp_dispatchPlan__Overflow_plan_size_in_kb(void **state) * Set max plan to a value that will require handling INT32 * overflow of the current plan size */ - gp_max_plan_size = INT_MAX; + gp_max_plan_size = 1024; queryDesc->plannedstmt->planTree = (struct Plan *)palloc0(sizeof(struct Plan)); - /* - * Set num_slices and uncompressed_size to be INT_MAX-1 to force overflow - */ - queryDesc->plannedstmt->planTree->nMotionNodes = INT_MAX-1; - will_assign_value(serializeNode, uncompressed_size_out, INT_MAX-1); expect_any(serializeNode, node); expect_any(serializeNode, size); @@ -74,11 +69,11 @@ test__cdbdisp_dispatchPlan__Overflow_plan_size_in_kb(void **state) StringInfo message = makeStringInfo(); appendStringInfo(message, - "Query plan size limit exceeded, current size: " UINT64_FORMAT "KB, max allowed size: %dKB", - ((INT_MAX-1)*(INT_MAX-1)/(uint64)1024), INT_MAX); + "Query plan size limit exceeded, current size: " UINT64_FORMAT "KB, max allowed size: 1024KB", + ((INT_MAX-1)/(uint64)1024)); if (edata->elevel == ERROR && - strncmp(edata->message, message->data, message->len)) + strncmp(edata->message, message->data, message->len) == 0) { success = true; } diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index 22c378e9d3..4b984a42e2 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -1618,7 +1618,6 @@ InitSliceTable(EState *estate, int nMotions, int nSubplans) slice->directDispatch.isDirectDispatch = false; slice->directDispatch.contentIds = NIL; slice->primaryGang = NULL; - slice->primary_gang_id = 0; slice->parentIndex = -1; slice->children = NIL; slice->primaryProcesses = NIL; @@ -2049,7 +2048,6 @@ AssociateSlicesToProcesses(Slice ** sliceMap, int sliceIndex, SliceReq * req) Assert(slice->gangSize == 1); slice->primaryGang = req->vec1gangs_entrydb_reader[req->nxt1gang_entrydb_reader++]; Assert(slice->primaryGang != NULL); - slice->primary_gang_id = slice->primaryGang->gang_id; slice->primaryProcesses = getCdbProcessList(slice->primaryGang, slice->sliceIndex, NULL); @@ -2063,7 +2061,6 @@ AssociateSlicesToProcesses(Slice ** sliceMap, int sliceIndex, SliceReq * req) slice->primaryGang = req->vecNgangs[req->nxtNgang++]; Assert(slice->primaryGang != NULL); - slice->primary_gang_id = slice->primaryGang->gang_id; slice->primaryProcesses = getCdbProcessList(slice->primaryGang, slice->sliceIndex, &slice->directDispatch); break; @@ -2071,7 +2068,6 @@ AssociateSlicesToProcesses(Slice ** sliceMap, int sliceIndex, SliceReq * req) Assert(slice->gangSize == 1); slice->primaryGang = req->vec1gangs_primary_reader[req->nxt1gang_primary_reader++]; Assert(slice->primaryGang != NULL); - slice->primary_gang_id = slice->primaryGang->gang_id; slice->primaryProcesses = getCdbProcessList(slice->primaryGang, slice->sliceIndex, &slice->directDispatch); @@ -2082,7 +2078,6 @@ AssociateSlicesToProcesses(Slice ** sliceMap, int sliceIndex, SliceReq * req) Assert(slice->gangSize == getgpsegmentCount()); slice->primaryGang = req->vecNgangs[req->nxtNgang++]; Assert(slice->primaryGang != NULL); - slice->primary_gang_id = slice->primaryGang->gang_id; slice->primaryProcesses = getCdbProcessList(slice->primaryGang, slice->sliceIndex, 
&slice->directDispatch); diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 9a0c528a65..39e78ccaf6 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4221,7 +4221,6 @@ _copySlice(Slice *from) COPY_NODE_FIELD(directDispatch.contentIds); newnode->primaryGang = from->primaryGang; - COPY_SCALAR_FIELD(primary_gang_id); COPY_SCALAR_FIELD(parentIndex); COPY_NODE_FIELD(children); COPY_NODE_FIELD(primaryProcesses); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 45db7bda66..19cd5da54f 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -4069,7 +4069,6 @@ _outSlice(StringInfo str, Slice *node) WRITE_BOOL_FIELD(directDispatch.isDirectDispatch); WRITE_NODE_FIELD(directDispatch.contentIds); /* List of int */ WRITE_DUMMY_FIELD(primaryGang); - WRITE_INT_FIELD(primary_gang_id); WRITE_INT_FIELD(parentIndex); /* List of int index */ WRITE_NODE_FIELD(children); /* List of int index */ WRITE_NODE_FIELD(primaryProcesses); /* List of (CDBProcess *) */ diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c index 378de4bc3b..c0db7c7248 100644 --- a/src/backend/nodes/readfast.c +++ b/src/backend/nodes/readfast.c @@ -2270,7 +2270,6 @@ _readSlice(void) READ_BOOL_FIELD(directDispatch.isDirectDispatch); READ_NODE_FIELD(directDispatch.contentIds); /* List of int index */ READ_DUMMY_FIELD(primaryGang, NULL); - READ_INT_FIELD(primary_gang_id); READ_INT_FIELD(parentIndex); /* List of int index */ READ_NODE_FIELD(children); /* List of int index */ READ_NODE_FIELD(primaryProcesses); /* List of (CDBProcess *) */ diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 9f3d569c01..f613c233ec 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2860,7 +2860,6 @@ _readSlice(void) READ_BOOL_FIELD(directDispatch.isDirectDispatch); READ_NODE_FIELD(directDispatch.contentIds); /* List of int index */ READ_DUMMY_FIELD(primaryGang, NULL); - READ_INT_FIELD(primary_gang_id); READ_INT_FIELD(parentIndex); /* List of int index */ READ_NODE_FIELD(children); /* List of int index */ READ_NODE_FIELD(primaryProcesses); /* List of (CDBProcess *) */ diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index f5498c9f54..c09f8ffc5b 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4729,7 +4729,7 @@ PostgresMain(int argc, char *argv[], send_ready_for_query = true; } break; - case 'M': /* MPP dispatched stmt from QD */ + case 'M': /* MPP dispatched stmt from QD */ { /* This is exactly like 'Q' above except we peel off and * set the snapshot information right away. 
@@ -4755,24 +4755,21 @@ PostgresMain(int argc, char *argv[], int serializedQueryDispatchDesclen = 0; int seqServerHostlen = 0; int seqServerPort = -1; - - int localSlice; - int rootIdx; - int primary_gang_id; + + int localSlice = -1, i; + int rootIdx; + int numSlices = 0; TimestampTz statementStart; - Oid suid; - Oid ouid; - Oid cuid; - bool suid_is_super = false; - bool ouid_is_super = false; + Oid suid; + Oid ouid; + Oid cuid; + bool suid_is_super = false; + bool ouid_is_super = false; int unusedFlags; /* Set statement_timestamp() */ SetCurrentStatementStartTimestamp(); - - /* get the slice number# */ - localSlice = pq_getmsgint(&input_message, 4); /* get the client command serial# */ gp_command_count = pq_getmsgint(&input_message, 4); @@ -4791,8 +4788,6 @@ PostgresMain(int argc, char *argv[], rootIdx = pq_getmsgint(&input_message, 4); - primary_gang_id = pq_getmsgint(&input_message, 4); - statementStart = pq_getmsgint64(&input_message); /* * Should we set the CurrentStatementStartTimestamp to the @@ -4802,8 +4797,6 @@ PostgresMain(int argc, char *argv[], * * Or both? */ - //SetCurrentStatementStartTimestampToMaster(statementStart); - /* read ser string lengths */ query_string_len = pq_getmsgint(&input_message, 4); serializedQuerytreelen = pq_getmsgint(&input_message, 4); @@ -4847,8 +4840,26 @@ PostgresMain(int argc, char *argv[], if (seqServerHostlen > 0) seqServerHost = pq_getmsgbytes(&input_message, seqServerHostlen); + numSlices = pq_getmsgint(&input_message, 4); + + Assert(qe_gang_id > 0); + for (i = 0; i < numSlices; ++i) + { + if (qe_gang_id == pq_getmsgint(&input_message, 4)) + { + localSlice = i; + } + } + + if (localSlice == -1 && numSlices > 0) + { + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("QE cannot find slice to execute"))); + } + pq_getmsgend(&input_message); - + elog((Debug_print_full_dtm ? LOG : DEBUG5), "MPP dispatched stmt from QD: %s.",query_string); if (suid > 0) @@ -4919,7 +4930,6 @@ PostgresMain(int argc, char *argv[], const char *gid; DistributedTransactionId gxid; - int primary_gang_id; int serializedSnapshotlen; const char *serializedSnapshot; @@ -4947,8 +4957,6 @@ PostgresMain(int argc, char *argv[], /* get the distributed transaction id */ gxid = (DistributedTransactionId) pq_getmsgint(&input_message, 4); - primary_gang_id = pq_getmsgint(&input_message, 4); - serializedSnapshotlen = pq_getmsgint(&input_message, 4); /* read in the snapshot info/ DtxContext */ @@ -4960,18 +4968,14 @@ PostgresMain(int argc, char *argv[], /* * This is for debugging. Otherwise we don't need to deserialize this */ - DtxContextInfo_Deserialize( - serializedSnapshot, serializedSnapshotlen, - &TempDtxContextInfo); + DtxContextInfo_Deserialize(serializedSnapshot, serializedSnapshotlen, &TempDtxContextInfo); pq_getmsgend(&input_message); // Do not touch DTX context. 
- exec_mpp_dtx_protocol_command(dtxProtocolCommand, flags, loggingStr, gid, gxid, &TempDtxContextInfo); send_ready_for_query = true; - } break; diff --git a/src/include/cdb/cdbgang.h b/src/include/cdb/cdbgang.h index 1c514efd69..d62ac60cc1 100644 --- a/src/include/cdb/cdbgang.h +++ b/src/include/cdb/cdbgang.h @@ -51,6 +51,9 @@ typedef struct Gang MemoryContext perGangContext; } Gang; +extern int qe_gang_id; + + extern Gang *allocateReaderGang(GangType type, char *portal_name); extern Gang *allocateWriterGang(void); diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index a47200ce98..a1ed50e364 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -102,9 +102,6 @@ typedef struct Slice struct Gang *primaryGang; - /* tell dispatch agents which gang we're talking about. */ - int primary_gang_id; - /* * A list of CDBProcess nodes corresponding to the worker processes * allocated to implement this plan slice. diff --git a/src/test/regress/expected/plan_size.out b/src/test/regress/expected/plan_size.out deleted file mode 100644 index 2e227f24b1..0000000000 --- a/src/test/regress/expected/plan_size.out +++ /dev/null @@ -1,308 +0,0 @@ ---start_ignore -drop table if exists plineitem cascade; -NOTICE: table "plineitem" does not exist, skipping -drop table if exists times; -NOTICE: table "times" does not exist, skipping ---end_ignore --- Ignoring error messages that might cause diff when plan size changes for test queries --- start_matchsubs --- m/ERROR: Query plan size limit exceeded.*/ --- s/ERROR: Query plan size limit exceeded.*/ERROR_MESSAGE/ --- end_matchsubs --- create partitioned lineitem, partitioned by an integer column -CREATE TABLE plineitem -( - L_ID INTEGER , - L_ORDERKEY INTEGER , - L_PARTKEY INTEGER , - L_SUPPKEY INTEGER , - L_LINENUMBER INTEGER , - L_QUANTITY DECIMAL(15,2) , - L_EXTENDEDPRICE DECIMAL(15,2) , - L_DISCOUNT DECIMAL(15,2) , - L_TAX DECIMAL(15,2) , - L_RETURNFLAG CHAR(1) , - L_LINESTATUS CHAR(1) , - L_SHIPDATE DATE , - L_COMMITDATE DATE , - L_RECEIPTDATE DATE , - L_SHIPINSTRUCT CHAR(25) , - L_SHIPMODE CHAR(10) , - L_COMMENT VARCHAR(44) - ) distributed by (l_orderkey) -partition by range(L_ID) -( -start (0) inclusive end(3000) every(100) -); -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_1" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_2" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_3" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_4" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_5" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_6" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_7" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_8" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_9" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_10" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_11" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_12" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_13" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_14" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_15" for table "plineitem" 
-NOTICE: CREATE TABLE will create partition "plineitem_1_prt_16" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_17" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_18" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_19" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_20" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_21" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_22" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_23" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_24" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_25" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_26" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_27" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_28" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_29" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_30" for table "plineitem" -create table times -( -id int, -date_id date, -date_text_id varchar(20), -day_name varchar(20), -day_number_in_week int, -day_number_in_month int, -week_number_in_month int, -week_number_in_year int, -week_start_date date, -week_end_date date, -month_number int, -month_name varchar(20), -year_number int, -quarter_number int, -quarter_desc varchar(20), -leap boolean, -days_in_year int -) distributed by (date_id); -create or replace function f() returns setof plineitem as $$ -select * from plineitem; -$$ -language SQL; -create or replace function f2() returns bigint as $$ -select count(*) from plineitem; -$$ -language SQL; -set gp_max_plan_size =20; --- Following queries should error out for plan size 20KB --- simple select -select * from plineitem; -ERROR: Query plan size limit exceeded, current size: 85KB, max allowed size: 20KB -HINT: Size controlled by gp_max_plan_size --- UDFs -select * from f(); -ERROR: Query plan size limit exceeded, current size: 85KB, max allowed size: 20KB -HINT: Size controlled by gp_max_plan_size -CONTEXT: SQL function "f" statement 1 -select * from f2(); -ERROR: Query plan size limit exceeded, current size: 27KB, max allowed size: 20KB -HINT: Size controlled by gp_max_plan_size -CONTEXT: SQL function "f2" statement 1 --- Joins -select l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order -from - plineitem,times -where - plineitem.l_shipdate = times.date_id -group by - l_returnflag, - l_linestatus -order by - l_returnflag, - l_linestatus; -ERROR: Query plan size limit exceeded, current size: 123KB, max allowed size: 20KB -HINT: Size controlled by gp_max_plan_size -select l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - count(*) as count_order -from - plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date) -group by - l_orderkey -order by - l_orderkey -LIMIT 10; -ERROR: Query plan size limit exceeded, current 
size: 81KB, max allowed size: 20KB -HINT: Size controlled by gp_max_plan_size --- Init plans -select p1.l_orderkey, - sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue, - count(*) as count_order -from - plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date) - join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem)) -group by - p1.l_orderkey -order by - p1.l_orderkey -LIMIT 10; -ERROR: Query plan size limit exceeded, current size: 307KB, max allowed size: 20KB -HINT: Size controlled by gp_max_plan_size -set gp_max_plan_size ='700kB'; --- Following queries should be dispatched for plan size 700kB --- simple select -select * from plineitem; - l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - --- UDFs -select * from f(); - l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - -select * from f2(); - f2 ----- - 0 -(1 row) - --- Joins -select l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order -from - plineitem,times -where - plineitem.l_shipdate = times.date_id -group by - l_returnflag, - l_linestatus -order by - l_returnflag, - l_linestatus; - l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order ---------------+--------------+---------+----------------+----------------+------------+---------+-----------+----------+------------- -(0 rows) - -select l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - count(*) as count_order -from - plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date) -group by - l_orderkey -order by - l_orderkey -LIMIT 10; - l_orderkey | revenue | count_order -------------+---------+------------- -(0 rows) - --- Init plans -select p1.l_orderkey, - sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue, - count(*) as count_order -from - plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date) - join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem)) -group by - p1.l_orderkey -order by - p1.l_orderkey -LIMIT 10; - l_orderkey | revenue | count_order -------------+---------+------------- -(0 rows) - -set gp_max_plan_size ='10MB'; --- Following queries should success for plan size 10MB --- simple select -select * from plineitem; - l_id | l_orderkey | l_partkey | l_suppkey 
| l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - --- UDFs -select * from f(); - l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - -select * from f2(); - f2 ----- - 0 -(1 row) - --- Joins -select l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order -from - plineitem,times -where - plineitem.l_shipdate = times.date_id -group by - l_returnflag, - l_linestatus -order by - l_returnflag, - l_linestatus; - l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order ---------------+--------------+---------+----------------+----------------+------------+---------+-----------+----------+------------- -(0 rows) - -select l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - count(*) as count_order -from - plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date) -group by - l_orderkey -order by - l_orderkey -LIMIT 10; - l_orderkey | revenue | count_order -------------+---------+------------- -(0 rows) - --- Init plans -select p1.l_orderkey, - sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue, - count(*) as count_order -from - plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date) - join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem)) -group by - p1.l_orderkey -order by - p1.l_orderkey -LIMIT 10; - l_orderkey | revenue | count_order -------------+---------+------------- -(0 rows) - diff --git a/src/test/regress/expected/plan_size_optimizer.out b/src/test/regress/expected/plan_size_optimizer.out deleted file mode 100644 index f8a7c7a6d5..0000000000 --- a/src/test/regress/expected/plan_size_optimizer.out +++ /dev/null @@ -1,313 +0,0 @@ ---start_ignore -drop table if exists plineitem cascade; -NOTICE: table "plineitem" does not exist, skipping -drop table if exists times; -NOTICE: table "times" does not exist, skipping ---end_ignore --- Ignoring error messages that might cause diff when plan size changes for test queries --- start_matchsubs --- m/ERROR: Query plan size limit exceeded.*/ --- s/ERROR: Query plan size limit exceeded.*/ERROR_MESSAGE/ --- end_matchsubs --- create partitioned lineitem, partitioned by an integer column -CREATE TABLE plineitem -( - L_ID INTEGER , - L_ORDERKEY INTEGER , - L_PARTKEY INTEGER , - L_SUPPKEY INTEGER , - L_LINENUMBER INTEGER , - L_QUANTITY DECIMAL(15,2) , - L_EXTENDEDPRICE 
DECIMAL(15,2) , - L_DISCOUNT DECIMAL(15,2) , - L_TAX DECIMAL(15,2) , - L_RETURNFLAG CHAR(1) , - L_LINESTATUS CHAR(1) , - L_SHIPDATE DATE , - L_COMMITDATE DATE , - L_RECEIPTDATE DATE , - L_SHIPINSTRUCT CHAR(25) , - L_SHIPMODE CHAR(10) , - L_COMMENT VARCHAR(44) - ) distributed by (l_orderkey) -partition by range(L_ID) -( -start (0) inclusive end(3000) every(100) -); -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_1" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_2" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_3" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_4" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_5" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_6" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_7" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_8" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_9" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_10" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_11" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_12" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_13" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_14" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_15" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_16" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_17" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_18" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_19" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_20" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_21" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_22" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_23" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_24" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_25" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_26" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_27" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_28" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_29" for table "plineitem" -NOTICE: CREATE TABLE will create partition "plineitem_1_prt_30" for table "plineitem" -create table times -( -id int, -date_id date, -date_text_id varchar(20), -day_name varchar(20), -day_number_in_week int, -day_number_in_month int, -week_number_in_month int, -week_number_in_year int, -week_start_date date, -week_end_date date, -month_number int, -month_name varchar(20), -year_number int, -quarter_number int, -quarter_desc varchar(20), -leap boolean, -days_in_year int -) distributed by (date_id); -create or replace function f() returns setof plineitem as $$ -select * from plineitem; -$$ -language SQL; -create or replace function f2() returns bigint as $$ -select 
count(*) from plineitem; -$$ -language SQL; -set gp_max_plan_size =20; --- Following queries should error out for plan size 20KB --- simple select -select * from plineitem; - l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - --- UDFs -select * from f(); - l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - -select * from f2(); - f2 ----- - 0 -(1 row) - --- Joins -select l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order -from - plineitem,times -where - plineitem.l_shipdate = times.date_id -group by - l_returnflag, - l_linestatus -order by - l_returnflag, - l_linestatus; -ERROR: Query plan size limit exceeded, current size: 39KB, max allowed size: 30KB -HINT: Size controlled by gp_max_plan_size -select l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - count(*) as count_order -from - plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date) -group by - l_orderkey -order by - l_orderkey -LIMIT 10; -ERROR: Query plan size limit exceeded, current size: 23KB, max allowed size: 20KB -HINT: Size controlled by gp_max_plan_size --- Init plans -select p1.l_orderkey, - sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue, - count(*) as count_order -from - plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date) - join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem)) -group by - p1.l_orderkey -order by - p1.l_orderkey -LIMIT 10; -ERROR: Query plan size limit exceeded, current size: 133KB, max allowed size: 30KB -HINT: Size controlled by gp_max_plan_size -set gp_max_plan_size ='700kB'; --- Following queries should be dispatched for plan size 700kB --- simple select -select * from plineitem; - l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - --- UDFs -select * from f(); - l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate 
| l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - -select * from f2(); - f2 ----- - 0 -(1 row) - --- Joins -select l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order -from - plineitem,times -where - plineitem.l_shipdate = times.date_id -group by - l_returnflag, - l_linestatus -order by - l_returnflag, - l_linestatus; - l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order ---------------+--------------+---------+----------------+----------------+------------+---------+-----------+----------+------------- -(0 rows) - -select l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - count(*) as count_order -from - plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date) -group by - l_orderkey -order by - l_orderkey -LIMIT 10; - l_orderkey | revenue | count_order -------------+---------+------------- -(0 rows) - --- Init plans -select p1.l_orderkey, - sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue, - count(*) as count_order -from - plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date) - join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem)) -group by - p1.l_orderkey -order by - p1.l_orderkey -LIMIT 10; - l_orderkey | revenue | count_order -------------+---------+------------- -(0 rows) - -set gp_max_plan_size ='10MB'; --- Following queries should success for plan size 10MB --- simple select -select * from plineitem; - l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - --- UDFs -select * from f(); - l_id | l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment -------+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+--------------+--------------+------------+--------------+---------------+----------------+------------+----------- -(0 rows) - -select * from f2(); - f2 ----- - 0 -(1 row) - --- Joins -select l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order -from - plineitem,times -where - 
plineitem.l_shipdate = times.date_id -group by - l_returnflag, - l_linestatus -order by - l_returnflag, - l_linestatus; - l_returnflag | l_linestatus | sum_qty | sum_base_price | sum_disc_price | sum_charge | avg_qty | avg_price | avg_disc | count_order ---------------+--------------+---------+----------------+----------------+------------+---------+-----------+----------+------------- -(0 rows) - -select l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - count(*) as count_order -from - plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date) -group by - l_orderkey -order by - l_orderkey -LIMIT 10; - l_orderkey | revenue | count_order -------------+---------+------------- -(0 rows) - --- Init plans -select p1.l_orderkey, - sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue, - count(*) as count_order -from - plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date) - join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem)) -group by - p1.l_orderkey -order by - p1.l_orderkey -LIMIT 10; - l_orderkey | revenue | count_order -------------+---------+------------- -(0 rows) - diff --git a/src/test/regress/greenplum_schedule b/src/test/regress/greenplum_schedule index 765f03de00..41b1c9603c 100755 --- a/src/test/regress/greenplum_schedule +++ b/src/test/regress/greenplum_schedule @@ -68,7 +68,7 @@ test: resource_queue # vacuum from removing dead tuples. test: gp_toolkit -test: filespace trig auth_constraint role rle portals_updatable plpgsql_cache timeseries resource_queue_function pg_stat_last_operation gp_numeric_agg plan_size partindex_test direct_dispatch partition_pruning_with_fn dsp +test: filespace trig auth_constraint role rle portals_updatable plpgsql_cache timeseries resource_queue_function pg_stat_last_operation gp_numeric_agg partindex_test direct_dispatch partition_pruning_with_fn dsp # direct dispatch tests test: bfv_dd bfv_dd_multicolumn bfv_dd_types diff --git a/src/test/regress/sql/plan_size.sql b/src/test/regress/sql/plan_size.sql deleted file mode 100644 index 13da723cbe..0000000000 --- a/src/test/regress/sql/plan_size.sql +++ /dev/null @@ -1,248 +0,0 @@ ---start_ignore -drop table if exists plineitem cascade; -drop table if exists times; ---end_ignore - --- Ignoring error messages that might cause diff when plan size changes for test queries - --- start_matchsubs --- m/ERROR: Query plan size limit exceeded.*/ --- s/ERROR: Query plan size limit exceeded.*/ERROR_MESSAGE/ --- end_matchsubs - --- create partitioned lineitem, partitioned by an integer column -CREATE TABLE plineitem -( - L_ID INTEGER , - L_ORDERKEY INTEGER , - L_PARTKEY INTEGER , - L_SUPPKEY INTEGER , - L_LINENUMBER INTEGER , - L_QUANTITY DECIMAL(15,2) , - L_EXTENDEDPRICE DECIMAL(15,2) , - L_DISCOUNT DECIMAL(15,2) , - L_TAX DECIMAL(15,2) , - L_RETURNFLAG CHAR(1) , - L_LINESTATUS CHAR(1) , - L_SHIPDATE DATE , - L_COMMITDATE DATE , - L_RECEIPTDATE DATE , - L_SHIPINSTRUCT CHAR(25) , - L_SHIPMODE CHAR(10) , - L_COMMENT VARCHAR(44) - ) distributed by (l_orderkey) -partition by range(L_ID) -( -start (0) inclusive end(3000) every(100) -); - -create table times -( -id int, -date_id date, -date_text_id varchar(20), -day_name varchar(20), -day_number_in_week int, -day_number_in_month int, -week_number_in_month int, -week_number_in_year int, -week_start_date date, -week_end_date date, -month_number int, -month_name varchar(20), -year_number int, -quarter_number int, -quarter_desc 
varchar(20), -leap boolean, -days_in_year int -) distributed by (date_id); - - - -create or replace function f() returns setof plineitem as $$ -select * from plineitem; -$$ -language SQL; - - -create or replace function f2() returns bigint as $$ -select count(*) from plineitem; -$$ -language SQL; - - -set gp_max_plan_size =20; - --- Following queries should error out for plan size 20KB --- simple select -select * from plineitem; - --- UDFs -select * from f(); - -select * from f2(); - --- Joins -select l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order -from - plineitem,times -where - plineitem.l_shipdate = times.date_id -group by - l_returnflag, - l_linestatus -order by - l_returnflag, - l_linestatus; - -select l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - count(*) as count_order -from - plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date) -group by - l_orderkey -order by - l_orderkey -LIMIT 10; - --- Init plans -select p1.l_orderkey, - sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue, - count(*) as count_order -from - plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date) - join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem)) -group by - p1.l_orderkey -order by - p1.l_orderkey -LIMIT 10; - - - -set gp_max_plan_size ='700kB'; - --- Following queries should be dispatched for plan size 700kB - --- simple select -select * from plineitem; - --- UDFs -select * from f(); - -select * from f2(); - --- Joins -select l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order -from - plineitem,times -where - plineitem.l_shipdate = times.date_id -group by - l_returnflag, - l_linestatus -order by - l_returnflag, - l_linestatus; - -select l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - count(*) as count_order -from - plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date) -group by - l_orderkey -order by - l_orderkey -LIMIT 10; - --- Init plans -select p1.l_orderkey, - sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue, - count(*) as count_order -from - plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date) - join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem)) -group by - p1.l_orderkey -order by - p1.l_orderkey -LIMIT 10; - -set gp_max_plan_size ='10MB'; - - --- Following queries should success for plan size 10MB - --- simple select -select * from plineitem; - --- UDFs -select * from f(); - -select * from f2(); - --- Joins -select l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as 
avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order -from - plineitem,times -where - plineitem.l_shipdate = times.date_id -group by - l_returnflag, - l_linestatus -order by - l_returnflag, - l_linestatus; - -select l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - count(*) as count_order -from - plineitem join times on (times.date_id = plineitem.l_shipdate and plineitem.l_receiptdate < times.week_end_date) -group by - l_orderkey -order by - l_orderkey -LIMIT 10; - --- Init plans -select p1.l_orderkey, - sum(p1.l_extendedprice * (1 - p1.l_discount)) as revenue, - count(*) as count_order -from - plineitem p1 join times on (times.date_id = p1.l_shipdate and p1.l_receiptdate < times.week_end_date) - join plineitem p2 on ( p2.l_shipdate < (select max(l_receiptdate) from plineitem)) -group by - p1.l_orderkey -order by - p1.l_orderkey -LIMIT 10; \ No newline at end of file -- GitLab
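
Note on the header changes above: cdbgang.h now exports "extern int qe_gang_id" while execdesc.h drops the per-slice "primary_gang_id" field, which suggests a QE identifies the slice it should run by matching its own gang id against per-slice gang ids carried with the dispatched plan. The sketch below only illustrates that lookup and is not the patch's actual code; the SliceGangMap struct, its field names, and getLocalSliceIndex() are hypothetical stand-ins for whatever the serialized plan really carries.

/*
 * Illustrative sketch only -- not GPDB source. Assumes the dispatched
 * plan carries an array mapping slice index to gang id.
 */
#include <stdio.h>

typedef struct SliceGangMap
{
	int			numSlices;			/* number of slices in the plan */
	const int  *sliceGangIdMap;		/* sliceGangIdMap[i] = gang id that runs slice i */
} SliceGangMap;

/*
 * Resolve which slice this QE should execute by scanning the map for
 * its own gang id.  Returns -1 if no slice is assigned to this gang.
 */
static int
getLocalSliceIndex(const SliceGangMap *map, int gangId)
{
	int			i;

	for (i = 0; i < map->numSlices; i++)
	{
		if (map->sliceGangIdMap[i] == gangId)
			return i;
	}
	return -1;
}

int
main(void)
{
	/* Example: three slices dispatched to gangs 1, 3 and 5. */
	int			gangIds[] = {1, 3, 5};
	SliceGangMap map = {3, gangIds};
	int			qeGangId = 3;	/* stand-in for qe_gang_id, assigned at connect time */

	printf("QE in gang %d executes slice %d\n",
		   qeGangId, getLocalSliceIndex(&map, qeGangId));
	return 0;
}

In the real server code the array would come from the deserialized plan and the gang id from the QE's startup parameters; a linear scan suffices because a plan has only a handful of slices.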