提交 5d73d223 编写于 作者: K Kenan Yao

Wrap logic in cdbdisp_dispatchPlan for building DispatchCommandQueryParms

into a separate function, and change the DispatchCommandQueryParms allocation to
palloc.
上级 120bac7d
......@@ -110,6 +110,9 @@ buildGpQueryString(struct CdbDispatcherState *ds,
DispatchCommandQueryParms *pQueryParms,
int *finalLen);
static DispatchCommandQueryParms *
cdbdisp_buildPlanQueryParms(struct QueryDesc *queryDesc, bool planRequiresTxn);
static void
cdbdisp_destroyQueryParms(DispatchCommandQueryParms *pQueryParms);
......@@ -166,26 +169,11 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc,
bool planRequiresTxn,
bool cancelOnError, struct CdbDispatcherState *ds)
{
char *splan,
*sddesc,
*sparams;
int splan_len,
splan_len_uncompressed,
sddesc_len,
sparams_len;
SliceTable *sliceTbl;
int rootIdx;
int oldLocalSlice;
PlannedStmt *stmt;
bool is_SRI = false;
DispatchCommandQueryParms queryParms;
CdbComponentDatabaseInfo *qdinfo;
ds->primaryResults = NULL;
ds->dispatchParams = NULL;
DispatchCommandQueryParms *pQueryParms;
Assert(Gp_role == GP_ROLE_DISPATCH);
Assert(queryDesc != NULL && queryDesc->estate != NULL);
......@@ -196,17 +184,7 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc,
* locally and assert our expectations about it.
*/
sliceTbl = queryDesc->estate->es_sliceTable;
rootIdx = RootSliceIndex(queryDesc->estate);
Assert(sliceTbl != NULL);
Assert(rootIdx == 0 ||
(rootIdx > sliceTbl->nMotions
&& rootIdx <= sliceTbl->nMotions + sliceTbl->nInitPlans));
/*
* Keep old value so we can restore it. We use this field as a parameter.
*/
oldLocalSlice = sliceTbl->localSlice;
/*
* This function is called only for planned statements.
......@@ -214,6 +192,11 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc,
stmt = queryDesc->plannedstmt;
Assert(stmt);
/*
* Keep old value so we can restore it. We use this field as a parameter.
*/
oldLocalSlice = sliceTbl->localSlice;
/*
* Let's evaluate STABLE functions now, so we get consistent values on the QEs
*
......@@ -278,144 +261,14 @@ cdbdisp_dispatchPlan(struct QueryDesc *queryDesc,
verify_shared_snapshot_ready();
}
/*
* serialized plan tree. Note that we're called for a single
* slice tree (corresponding to an initPlan or the main plan), so the
* parameters are fixed and we can include them in the prefix.
*/
splan = serializeNode((Node *) queryDesc->plannedstmt,
&splan_len, &splan_len_uncompressed);
pQueryParms = cdbdisp_buildPlanQueryParms(queryDesc, planRequiresTxn);
uint64 plan_size_in_kb = ((uint64) splan_len_uncompressed) / (uint64) 1024;
if (0 < gp_max_plan_size && plan_size_in_kb > gp_max_plan_size)
{
ereport(ERROR,
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
(errmsg("Query plan size limit exceeded, current size: "
UINT64_FORMAT "KB, max allowed size: %dKB",
plan_size_in_kb, gp_max_plan_size),
errhint("Size controlled by gp_max_plan_size"))));
}
Assert(splan != NULL && splan_len > 0 && splan_len_uncompressed > 0);
if (queryDesc->params != NULL && queryDesc->params->numParams > 0)
{
ParamListInfoData *pli;
ParamExternData *pxd;
StringInfoData parambuf;
Size length;
int plioff;
int32 iparam;
/*
* Allocate buffer for params
*/
initStringInfo(&parambuf);
/*
* Copy ParamListInfoData header and ParamExternData array
*/
pli = queryDesc->params;
length = (char *) &pli->params[pli->numParams] - (char *) pli;
plioff = parambuf.len;
Assert(plioff == MAXALIGN(plioff));
appendBinaryStringInfo(&parambuf, pli, length);
/*
* Copy pass-by-reference param values.
*/
for (iparam = 0; iparam < queryDesc->params->numParams; iparam++)
{
int16 typlen;
bool typbyval;
/*
* Recompute pli each time in case parambuf.data is repalloc'ed
*/
pli = (ParamListInfoData *) (parambuf.data + plioff);
pxd = &pli->params[iparam];
if (pxd->ptype == InvalidOid)
continue;
/*
* Does pxd->value contain the value itself, or a pointer?
*/
get_typlenbyval(pxd->ptype, &typlen, &typbyval);
if (!typbyval)
{
char *s = DatumGetPointer(pxd->value);
if (pxd->isnull || !PointerIsValid(s))
{
pxd->isnull = true;
pxd->value = 0;
}
else
{
length = datumGetSize(pxd->value, typbyval, typlen);
/*
* We *must* set this before we
* append. Appending may realloc, which will
* invalidate our pxd ptr. (obviously we could
* append first if we recalculate pxd from the new
* base address)
*/
pxd->value = Int32GetDatum(length);
appendBinaryStringInfo(&parambuf, &iparam, sizeof(iparam));
appendBinaryStringInfo(&parambuf, s, length);
}
}
}
sparams = parambuf.data;
sparams_len = parambuf.len;
}
else
{
sparams = NULL;
sparams_len = 0;
}
sddesc = serializeNode((Node *) queryDesc->ddesc, &sddesc_len, NULL /*uncompressed_size */ );
ds->primaryResults = NULL;
ds->dispatchParams = NULL;
MemSet(&queryParms, 0, sizeof(queryParms));
queryParms.strCommand = queryDesc->sourceText;
queryParms.serializedQuerytree = NULL;
queryParms.serializedQuerytreelen = 0;
queryParms.serializedPlantree = splan;
queryParms.serializedPlantreelen = splan_len;
queryParms.serializedParams = sparams;
queryParms.serializedParamslen = sparams_len;
queryParms.serializedQueryDispatchDesc = sddesc;
queryParms.serializedQueryDispatchDesclen = sddesc_len;
queryParms.rootIdx = rootIdx;
cdbdisp_dispatchX(pQueryParms, cancelOnError, sliceTbl, ds);
/*
* sequence server info
*/
qdinfo = &(getComponentDatabases()->entry_db_info[0]);
Assert(qdinfo != NULL && qdinfo->hostip != NULL);
queryParms.seqServerHost = pstrdup(qdinfo->hostip);
queryParms.seqServerHostlen = strlen(qdinfo->hostip) + 1;
queryParms.seqServerPort = seqServerCtl->seqServerPort;
/*
* Serialize a version of our snapshot, and generate our transction
* isolations. We generally want Plan based dispatch to be in a global
* transaction. The executor gets to decide if the special circumstances
* exist which allow us to dispatch without starting a global xact.
*/
queryParms.serializedDtxContextInfo =
qdSerializeDtxContextInfo(&queryParms.serializedDtxContextInfolen,
true /* wantSnapshot */ ,
queryDesc->extended_query,
mppTxnOptions(planRequiresTxn),
"cdbdisp_dispatchPlan");
cdbdisp_dispatchX(&queryParms, cancelOnError, sliceTbl, ds);
cdbdisp_destroyQueryParms(pQueryParms);
sliceTbl->localSlice = oldLocalSlice;
}
......@@ -561,7 +414,7 @@ cdbdisp_dispatchCommandInternal(const char *strCommand,
CdbDispatchResults* dispatchresults = NULL;
StringInfoData qeErrorMsg;
DispatchCommandQueryParms queryParms;
DispatchCommandQueryParms *pQueryParms;
Gang *primaryGang;
CdbComponentDatabaseInfo *qdinfo;
char *queryText = NULL;
......@@ -586,10 +439,10 @@ cdbdisp_dispatchCommandInternal(const char *strCommand,
"cdbdisp_dispatchCommandOrSerializedQuerytree: %.50s (needTwoPhase = %s)", strCommand,
(needTwoPhase ? "true" : "false"));
MemSet(&queryParms, 0, sizeof(queryParms));
queryParms.strCommand = strCommand;
queryParms.serializedQuerytree = serializedQuerytree;
queryParms.serializedQuerytreelen = serializedQuerytreelen;
pQueryParms = palloc0(sizeof(*pQueryParms));
pQueryParms->strCommand = strCommand;
pQueryParms->serializedQuerytree = serializedQuerytree;
pQueryParms->serializedQuerytreelen = serializedQuerytreelen;
/*
* Allocate a primary QE for every available segDB in the system.
......@@ -601,8 +454,8 @@ cdbdisp_dispatchCommandInternal(const char *strCommand,
/*
* Serialize a version of our DTX Context Info
*/
queryParms.serializedDtxContextInfo =
qdSerializeDtxContextInfo(&queryParms.serializedDtxContextInfolen,
pQueryParms->serializedDtxContextInfo =
qdSerializeDtxContextInfo(&pQueryParms->serializedDtxContextInfolen,
withSnapshot, false,
mppTxnOptions(needTwoPhase),
"cdbdisp_dispatchCommandOrSerializedQuerytree");
......@@ -612,17 +465,17 @@ cdbdisp_dispatchCommandInternal(const char *strCommand,
*/
qdinfo = &(getComponentDatabases()->entry_db_info[0]);
Assert(qdinfo != NULL && qdinfo->hostip != NULL);
queryParms.seqServerHost = pstrdup(qdinfo->hostip);
queryParms.seqServerHostlen = strlen(qdinfo->hostip) + 1;
queryParms.seqServerPort = seqServerCtl->seqServerPort;
pQueryParms->seqServerHost = pstrdup(qdinfo->hostip);
pQueryParms->seqServerHostlen = strlen(qdinfo->hostip) + 1;
pQueryParms->seqServerPort = seqServerCtl->seqServerPort;
/*
* Dispatch the command.
*/
ds.primaryResults = NULL;
ds.dispatchParams = NULL;
queryText = buildGpQueryString(&ds, &queryParms, &queryTextLength);
cdbdisp_destroyQueryParms(&queryParms);
queryText = buildGpQueryString(&ds, pQueryParms, &queryTextLength);
cdbdisp_destroyQueryParms(pQueryParms);
cdbdisp_makeDispatcherState(&ds, /*slice count*/1, cancelOnError, queryText, queryTextLength);
ds.primaryResults->writer_gang = primaryGang;
......@@ -672,6 +525,168 @@ cdbdisp_dispatchCommandInternal(const char *strCommand,
cdbdisp_destroyDispatcherState(&ds);
}
static DispatchCommandQueryParms *
cdbdisp_buildPlanQueryParms(struct QueryDesc *queryDesc,
bool planRequiresTxn)
{
char *splan,
*sddesc,
*sparams;
int splan_len,
splan_len_uncompressed,
sddesc_len,
sparams_len,
rootIdx;
rootIdx = RootSliceIndex(queryDesc->estate);
#ifdef USE_ASSERT_CHECKING
SliceTable *sliceTbl = queryDesc->estate->es_sliceTable;
Assert(rootIdx == 0 ||
(rootIdx > sliceTbl->nMotions
&& rootIdx <= sliceTbl->nMotions + sliceTbl->nInitPlans));
#endif
CdbComponentDatabaseInfo *qdinfo;
DispatchCommandQueryParms *pQueryParms = (DispatchCommandQueryParms *) palloc0(sizeof(*pQueryParms));
/*
* serialized plan tree. Note that we're called for a single
* slice tree (corresponding to an initPlan or the main plan), so the
* parameters are fixed and we can include them in the prefix.
*/
splan = serializeNode((Node *) queryDesc->plannedstmt, &splan_len, &splan_len_uncompressed);
uint64 plan_size_in_kb = ((uint64) splan_len_uncompressed) / (uint64) 1024;
if (0 < gp_max_plan_size && plan_size_in_kb > gp_max_plan_size)
{
ereport(ERROR,
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
(errmsg("Query plan size limit exceeded, current size: "
UINT64_FORMAT "KB, max allowed size: %dKB",
plan_size_in_kb, gp_max_plan_size),
errhint("Size controlled by gp_max_plan_size"))));
}
Assert(splan != NULL && splan_len > 0 && splan_len_uncompressed > 0);
if (queryDesc->params != NULL && queryDesc->params->numParams > 0)
{
ParamListInfoData *pli;
ParamExternData *pxd;
StringInfoData parambuf;
Size length;
int32 iparam;
/*
* Allocate buffer for params
*/
initStringInfo(&parambuf);
/*
* Copy ParamListInfoData header and ParamExternData array
*/
pli = queryDesc->params;
length = (char *) &pli->params[pli->numParams] - (char *) pli;
appendBinaryStringInfo(&parambuf, pli, length);
/*
* Copy pass-by-reference param values.
*/
for (iparam = 0; iparam < queryDesc->params->numParams; iparam++)
{
int16 typlen;
bool typbyval;
/*
* Recompute pli each time in case parambuf.data is repalloc'ed
*/
pli = (ParamListInfoData *) parambuf.data;
pxd = &pli->params[iparam];
if (pxd->ptype == InvalidOid)
continue;
/*
* Does pxd->value contain the value itself, or a pointer?
*/
get_typlenbyval(pxd->ptype, &typlen, &typbyval);
if (!typbyval)
{
char *s = DatumGetPointer(pxd->value);
if (pxd->isnull || !PointerIsValid(s))
{
pxd->isnull = true;
pxd->value = 0;
}
else
{
length = datumGetSize(pxd->value, typbyval, typlen);
/*
* We *must* set this before we
* append. Appending may realloc, which will
* invalidate our pxd ptr. (obviously we could
* append first if we recalculate pxd from the new
* base address)
*/
pxd->value = Int32GetDatum(length);
appendBinaryStringInfo(&parambuf, &iparam, sizeof(iparam));
appendBinaryStringInfo(&parambuf, s, length);
}
}
}
sparams = parambuf.data;
sparams_len = parambuf.len;
}
else
{
sparams = NULL;
sparams_len = 0;
}
sddesc = serializeNode((Node *) queryDesc->ddesc, &sddesc_len, NULL /*uncompressed_size */ );
pQueryParms->strCommand = queryDesc->sourceText;
pQueryParms->serializedQuerytree = NULL;
pQueryParms->serializedQuerytreelen = 0;
pQueryParms->serializedPlantree = splan;
pQueryParms->serializedPlantreelen = splan_len;
pQueryParms->serializedParams = sparams;
pQueryParms->serializedParamslen = sparams_len;
pQueryParms->serializedQueryDispatchDesc = sddesc;
pQueryParms->serializedQueryDispatchDesclen = sddesc_len;
pQueryParms->rootIdx = rootIdx;
/*
* sequence server info
*/
qdinfo = &(getComponentDatabases()->entry_db_info[0]);
Assert(qdinfo != NULL && qdinfo->hostip != NULL);
pQueryParms->seqServerHost = pstrdup(qdinfo->hostip);
pQueryParms->seqServerHostlen = strlen(qdinfo->hostip) + 1;
pQueryParms->seqServerPort = seqServerCtl->seqServerPort;
/*
* Serialize a version of our snapshot, and generate our transction
* isolations. We generally want Plan based dispatch to be in a global
* transaction. The executor gets to decide if the special circumstances
* exist which allow us to dispatch without starting a global xact.
*/
pQueryParms->serializedDtxContextInfo =
qdSerializeDtxContextInfo(&pQueryParms->serializedDtxContextInfolen,
true /* wantSnapshot */ ,
queryDesc->extended_query,
mppTxnOptions(planRequiresTxn),
"cdbdisp_buildPlanQueryParms");
return pQueryParms;
}
/*
* Free memory allocated in DispatchCommandQueryParms
*/
......@@ -724,6 +739,8 @@ cdbdisp_destroyQueryParms(DispatchCommandQueryParms *pQueryParms)
pfree(pQueryParms->sliceIndexGangIdMap);
pQueryParms->sliceIndexGangIdMap = NULL;
}
pfree(pQueryParms);
}
/*
......@@ -1176,7 +1193,6 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms,
ds->primaryResults = NULL;
ds->dispatchParams = NULL;
queryText = buildGpQueryString(ds, pQueryParms, &queryTextLength);
cdbdisp_destroyQueryParms(pQueryParms);
cdbdisp_makeDispatcherState(ds, nSlices, cancelOnError, queryText, queryTextLength);
cdb_total_plans++;
......@@ -1339,7 +1355,7 @@ cdbdisp_dispatchSetCommandToAllGangs(const char *strCommand,
bool needTwoPhase,
struct CdbDispatcherState *ds)
{
DispatchCommandQueryParms queryParms;
DispatchCommandQueryParms *pQueryParms;
Gang *primaryGang;
List *idleReaderGangs;
......@@ -1349,12 +1365,12 @@ cdbdisp_dispatchSetCommandToAllGangs(const char *strCommand,
int queryTextLength;
int gangCount;
MemSet(&queryParms, 0, sizeof(queryParms));
queryParms.strCommand = strCommand;
queryParms.serializedQuerytree = serializedQuerytree;
queryParms.serializedQuerytreelen = serializedQuerytreelen;
queryParms.serializedPlantree = serializedPlantree;
queryParms.serializedPlantreelen = serializedPlantreelen;
pQueryParms = palloc0(sizeof(*pQueryParms));
pQueryParms->strCommand = strCommand;
pQueryParms->serializedQuerytree = serializedQuerytree;
pQueryParms->serializedQuerytreelen = serializedQuerytreelen;
pQueryParms->serializedPlantree = serializedPlantree;
pQueryParms->serializedPlantreelen = serializedPlantreelen;
/*
* Allocate a primary QE for every available segDB in the system.
......@@ -1366,8 +1382,8 @@ cdbdisp_dispatchSetCommandToAllGangs(const char *strCommand,
/*
* serialized a version of our snapshot
*/
queryParms.serializedDtxContextInfo =
qdSerializeDtxContextInfo(&queryParms.serializedDtxContextInfolen,
pQueryParms->serializedDtxContextInfo =
qdSerializeDtxContextInfo(&pQueryParms->serializedDtxContextInfolen,
true /* withSnapshot */ ,
false /* cursor */ ,
mppTxnOptions(needTwoPhase),
......@@ -1383,8 +1399,8 @@ cdbdisp_dispatchSetCommandToAllGangs(const char *strCommand,
ds->primaryResults = NULL;
ds->dispatchParams = NULL;
queryText = buildGpQueryString(ds, &queryParms, &queryTextLength);
cdbdisp_destroyQueryParms(&queryParms);
queryText = buildGpQueryString(ds, pQueryParms, &queryTextLength);
cdbdisp_destroyQueryParms(pQueryParms);
cdbdisp_makeDispatcherState(ds, gangCount, cancelOnError, queryText, queryTextLength);
ds->primaryResults->writer_gang = primaryGang;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册