提交 f24b9ab5 编写于 作者: H Heikki Linnakangas

Fix dispatching of queries with record-type parameters.

This fixes the "ERROR:  record type has not been registered" error, when
a record-type variable is used in a query inside a PL/pgSQL function.
This is essentially the same problem we battled with in Motion nodes
in GPDB 5, and added the whole tuple remapper to deal with it. Only this
time, the problem is with record Datums being dispatched from QD to QE,
as Params, rather than with record Datums being transferred across a
Motion.

To fix, send the transient record type cache along with the query
parameters, if there are any of the parameters are transient record types.
This is a bit inefficient, as the transient record type cache can be quite
large. A more fine-grained approach would be to send only those record
types that are actually used in the parameters, but more code would be
required to figure that out. This will do for now.

Refactor the serialization and deserialization of the query parameters, to
leverage the outfast/readfast functions.

Backport to 5X_STABLE. This changes the wire format of query parameters, so
this requires the QD and QE to be on the same minor version. But this does
not change the on-disk format, or the numbering of existing Node tags.

Fixes github issue #4444.
上级 ae07664c
......@@ -23,6 +23,7 @@
#include "cdb/cdbvars.h"
#include "cdb/cdbmutate.h"
#include "cdb/cdbsrlz.h"
#include "cdb/tupleremap.h"
#include "tcop/tcopprot.h"
#include "utils/datum.h"
#include "utils/guc.h"
......@@ -32,6 +33,7 @@
#include "utils/resgroup.h"
#include "utils/resource_manager.h"
#include "utils/session_state.h"
#include "utils/typcache.h"
#include "miscadmin.h"
#include "cdb/cdbdisp.h"
......@@ -143,6 +145,8 @@ cdbdisp_dispatchX(DispatchCommandQueryParms *pQueryParms,
static int *
buildSliceIndexGangIdMap(SliceVec *sliceVec, int numSlices, int numTotalSlices);
static char *serializeParamListInfo(ParamListInfo paramLI, int *len_p);
/*
* Compose and dispatch the MPPEXEC commands corresponding to a plan tree
* within a complete parallel plan. (A plan tree will correspond either
......@@ -621,74 +625,7 @@ cdbdisp_buildPlanQueryParms(struct QueryDesc *queryDesc,
if (queryDesc->params != NULL && queryDesc->params->numParams > 0)
{
ParamListInfoData *pli;
ParamExternData *pxd;
StringInfoData parambuf;
Size length;
int32 iparam;
/*
* Allocate buffer for params
*/
initStringInfo(&parambuf);
/*
* Copy ParamListInfoData header and ParamExternData array
*/
pli = queryDesc->params;
length = (char *) &pli->params[pli->numParams] - (char *) pli;
appendBinaryStringInfo(&parambuf, pli, length);
/*
* Copy pass-by-reference param values.
*/
for (iparam = 0; iparam < queryDesc->params->numParams; iparam++)
{
int16 typlen;
bool typbyval;
/*
* Recompute pli each time in case parambuf.data is repalloc'ed
*/
pli = (ParamListInfoData *) parambuf.data;
pxd = &pli->params[iparam];
if (pxd->ptype == InvalidOid)
continue;
/*
* Does pxd->value contain the value itself, or a pointer?
*/
get_typlenbyval(pxd->ptype, &typlen, &typbyval);
if (!typbyval)
{
char *s = DatumGetPointer(pxd->value);
if (pxd->isnull || !PointerIsValid(s))
{
pxd->isnull = true;
pxd->value = 0;
}
else
{
length = datumGetSize(pxd->value, typbyval, typlen);
/*
* We *must* set this before we
* append. Appending may realloc, which will
* invalidate our pxd ptr. (obviously we could
* append first if we recalculate pxd from the new
* base address)
*/
pxd->value = Int32GetDatum(length);
appendBinaryStringInfo(&parambuf, &iparam, sizeof(iparam));
appendBinaryStringInfo(&parambuf, s, length);
}
}
}
sparams = parambuf.data;
sparams_len = parambuf.len;
sparams = serializeParamListInfo(queryDesc->params, &sparams_len);
}
else
{
......@@ -1529,3 +1466,161 @@ buildSliceIndexGangIdMap(SliceVec *sliceVec, int numSlices, int numTotalSlices)
return sliceIndexGangIdMap;
}
/*
* Serialization of query parameters (ParamListInfos).
*
* When a query is dispatched from QD to QE, we also need to dispatch any
* query parameters, contained in the ParamListInfo struct. We need to
* serialize ParamListInfo, but there are a few complications:
*
* - ParamListInfo is not a Node type, so we cannot use the usual
* nodeToStringBinary() function directly. We turn the array of
* ParamExternDatas into a List of SerializedParamExternData nodes,
* which we can then pass to nodeToStringBinary().
*
* - In order to deserialize correctly, the receiver needs the typlen and
* typbyval information for each datatype. The receiver has access to the
* catalogs, so it could look them up, but for the sake of simplicity and
* robustness in the receiver, we include that information in
* SerializedParamExternData.
*
* - RECORD types. Type information of transient record is kept only in
* backend private memory, indexed by typmod. The recipient will not know
* what a record type's typmod means. And record types can also be nested.
* Because of that, if there are any RECORD, we include a copy of the whole
* transient record type cache.
*
* If there are no record types involved, we dispatch a list of
* SerializedParamListInfos, i.e.
*
* List<SerializedParamListInfo>
*
* With record types, we dispatch:
*
* List(List<TupleDescNode>, List<SerializedParamListInfo>)
*
* XXX: Sending *all* record types can be quite bulky, but ATM there is no
* easy way to extract just the needed record types.
*/
static char *
serializeParamListInfo(ParamListInfo paramLI, int *len_p)
{
int i;
List *sparams;
bool found_records = false;
/* Construct a list of SerializedParamExternData */
sparams = NIL;
for (i = 0; i < paramLI->numParams; i++)
{
ParamExternData *prm = &paramLI->params[i];
SerializedParamExternData *sprm;
sprm = makeNode(SerializedParamExternData);
sprm->value = prm->value;
sprm->isnull = prm->isnull;
sprm->pflags = prm->pflags;
sprm->ptype = prm->ptype;
if (OidIsValid(prm->ptype))
{
get_typlenbyval(prm->ptype, &sprm->plen, &sprm->pbyval);
if (prm->ptype == RECORDOID && !prm->isnull)
{
/*
* Note: We don't want to use lookup_rowtype_tupdesc_copy here, because
* it copies defaults and constraints too. We don't want those.
*/
found_records = true;
}
}
else
{
sprm->plen = 0;
sprm->pbyval = true;
}
sparams = lappend(sparams, sprm);
}
/*
* If there were any record types, include the transient record type cache.
*/
if (found_records)
sparams = lcons(build_tuple_node_list(0), sparams);
return nodeToBinaryStringFast(sparams, len_p);
}
ParamListInfo
deserializeParamListInfo(const char *str, int slen)
{
List *sparams;
ListCell *lc;
TupleRemapper *remapper;
ParamListInfo paramLI;
int numParams;
int iparam;
sparams = (List *) readNodeFromBinaryString(str, slen);
if (!IsA(sparams, List))
elog(ERROR, "could not deserialize query parameters");
if (!sparams)
return NULL;;
/*
* If a transient record type cache was included, load it into
* a TupleRemapper.
*/
if (IsA(linitial(sparams), List))
{
List *typelist = (List *) linitial(sparams);
sparams = list_delete_first(sparams);
remapper = CreateTupleRemapper();
TRHandleTypeLists(remapper, typelist);
}
else
remapper = NULL;
/*
* Build a new ParamListInfo.
*/
numParams = list_length(sparams);
paramLI = palloc(offsetof(ParamListInfoData, params) + numParams * sizeof(ParamExternData));
/* this clears the callback fields, among others */
memset(paramLI, 0, offsetof(ParamListInfoData, params));
paramLI->numParams = numParams;
/*
* Read the ParamExternDatas
*/
iparam = 0;
foreach(lc, sparams)
{
SerializedParamExternData *sprm = (SerializedParamExternData *) lfirst(lc);
ParamExternData *prm = &paramLI->params[iparam];
if (!IsA(sprm, SerializedParamExternData))
elog(ERROR, "could not deserialize query parameters");
prm->ptype = sprm->ptype;
prm->isnull = sprm->isnull;
prm->pflags = sprm->pflags;
/* If remapping record types is needed, do it. */
if (remapper && prm->ptype != InvalidOid)
prm->value = TRRemapDatum(remapper, sprm->ptype, sprm->value);
else
prm->value = sprm->value;
iparam++;
}
return paramLI;
}
......@@ -257,6 +257,29 @@ TRHandleTypeLists(TupleRemapper *remapper, List *typelist)
}
}
/*
* Remap a single Datum, which can be a RECORD datum using the remote system's
* typmods.
*/
Datum
TRRemapDatum(TupleRemapper *remapper, Oid typeid, Datum value)
{
TupleRemapInfo *remapinfo;
bool changed;
remapinfo = BuildTupleRemapInfo(typeid, remapper->mycontext);
if (!remapinfo)
return value;
value = TRRemap(remapper, remapinfo, value, &changed);
pfree(remapinfo);
return value;
}
/*
* Copy the given tuple, remapping any transient typmods contained in it.
*/
......
......@@ -33,6 +33,7 @@
#include <ctype.h>
#include "lib/stringinfo.h"
#include "nodes/params.h"
#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "nodes/relation.h"
......@@ -1155,6 +1156,12 @@ _outAlterResourceGroupStmt(StringInfo str, AlterResourceGroupStmt *node)
WRITE_NODE_FIELD(options); /* List of DefElem nodes */
}
/*
* Support for serializing TupleDescs and ParamListInfos.
*
* TupleDescs and ParamListInfos are not Nodes as such, but if you wrap them
* in TupleDescNode and ParamListInfoNode structs, we allow serializing them.
*/
static void
_outTupleDescNode(StringInfo str, TupleDescNode *node)
{
......@@ -1177,6 +1184,22 @@ _outTupleDescNode(StringInfo str, TupleDescNode *node)
WRITE_INT_FIELD(tuple->tdrefcount);
}
static void
_outSerializedParamExternData(StringInfo str, SerializedParamExternData *node)
{
WRITE_NODE_TYPE("SERIALIZEDPARAMEXTERNDATA");
WRITE_BOOL_FIELD(isnull);
WRITE_INT16_FIELD(pflags);
WRITE_OID_FIELD(ptype);
WRITE_INT16_FIELD(plen);
WRITE_BOOL_FIELD(pbyval);
if (!node->isnull)
_outDatum(str, node->value, node->plen, node->pbyval);
}
/*
* _outNode -
* converts a Node into binary string and append it to 'str'
......@@ -2045,9 +2068,13 @@ _outNode(StringInfo str, void *obj)
case T_AlterExtensionContentsStmt:
_outAlterExtensionContentsStmt(str, obj);
break;
case T_TupleDescNode:
_outTupleDescNode(str, obj);
break;
case T_SerializedParamExternData:
_outSerializedParamExternData(str, obj);
break;
case T_AlterTSConfigurationStmt:
_outAlterTSConfigurationStmt(str, obj);
......@@ -2069,7 +2096,7 @@ _outNode(StringInfo str, void *obj)
* returns a binary representation of the Node as a palloc'd string
*/
char *
nodeToBinaryStringFast(void *obj, int * length)
nodeToBinaryStringFast(void *obj, int *length)
{
StringInfoData str;
int16 tg = (int16) 0xDEAD;
......@@ -2085,4 +2112,3 @@ nodeToBinaryStringFast(void *obj, int * length)
*length = str.len;
return str.data;
}
......@@ -2591,6 +2591,23 @@ _readTupleDescNode(void)
READ_DONE();
}
static SerializedParamExternData *
_readSerializedParamExternData(void)
{
READ_LOCALS(SerializedParamExternData);
READ_BOOL_FIELD(isnull);
READ_INT16_FIELD(pflags);
READ_OID_FIELD(ptype);
READ_INT16_FIELD(plen);
READ_BOOL_FIELD(pbyval);
if (!local_node->isnull)
local_node->value = readDatum(local_node->pbyval);
READ_DONE();
}
static AlterExtensionStmt *
_readAlterExtensionStmt(void)
{
......@@ -3491,9 +3508,13 @@ readNodeBinary(void)
case T_AlterExtensionContentsStmt:
return_value = _readAlterExtensionContentsStmt();
break;
case T_TupleDescNode:
return_value = _readTupleDescNode();
break;
case T_SerializedParamExternData:
return_value = _readSerializedParamExternData();
break;
case T_AlterTSConfigurationStmt:
return_value = _readAlterTSConfigurationStmt();
......
......@@ -1194,60 +1194,10 @@ exec_mpp_query(const char *query_string,
/*
* Get (possibly 0) parameters.
*/
paramLI = NULL;
if (serializedParams != NULL && serializedParamslen > 0)
{
ParamListInfoData paramhdr;
Size length;
const char *cpos;
const char *epos;
/* Peek at header using an aligned workarea. */
length = offsetof(ParamListInfoData, params);
Insist(length <= serializedParamslen);
memcpy(&paramhdr, serializedParams, length);
/* Get ParamListInfoData header and ParamExternData array. */
length += paramhdr.numParams * sizeof(paramhdr.params[0]);
Insist(paramhdr.numParams > 0 &&
length <= serializedParamslen);
paramLI = palloc(length);
memcpy(paramLI, serializedParams, length);
/* Get pass-by-reference data. */
cpos = serializedParams + length;
epos = serializedParams + serializedParamslen;
while (cpos < epos)
{
ParamExternData *pxd;
int32 iparam;
/* param index */
memcpy(&iparam, cpos, sizeof(iparam));
cpos += sizeof(iparam);
Insist(cpos <= epos &&
iparam >= 0 &&
iparam < paramhdr.numParams);
/* length */
pxd = &paramLI->params[iparam];
length = DatumGetInt32(pxd->value);
/* value */
Insist((int)length >= 0 &&
length <= epos - cpos);
if (length > 0)
{
char *v = (char *)palloc(length);
pxd->value = PointerGetDatum(v);
memcpy(v, cpos, length);
cpos += length;
}
}
Insist(cpos == epos);
}
paramLI = deserializeParamListInfo(serializedParams, serializedParamslen);
else
paramLI = NULL;
/*
* Switch back to transaction context to enter the loop.
......
......@@ -106,4 +106,7 @@ CdbDispatchUtilityStatement(struct Node *stmt,
List *oid_assignments,
struct CdbPgResults* cdb_pgresults);
extern ParamListInfo deserializeParamListInfo(const char *str, int slen);
#endif /* CDBDISP_QUERY_H */
......@@ -22,5 +22,6 @@ extern TupleRemapper *CreateTupleRemapper(void);
extern void DestroyTupleRemapper(TupleRemapper *remapper);
extern GenericTuple TRCheckAndRemap(TupleRemapper *remapper, TupleDesc tupledesc, GenericTuple tuple);
extern void TRHandleTypeLists(TupleRemapper *remapper, List *typelist);
extern Datum TRRemapDatum(TupleRemapper *remapper, Oid typeid, Datum value);
#endif /* TUPLEREMAP_H */
......@@ -164,7 +164,13 @@ typedef enum NodeTag
T_RowTriggerState,
T_AssertOpState,
T_PartitionSelectorState,
/*
* TupleDesc and ParamListInfo are not Nodes as such, but you can wrap
* them in TupleDescNode and SerializedParamExternData structs for serialization.
*/
T_TupleDescNode,
T_SerializedParamExternData,
/*
* TAGS FOR PRIMITIVE NODES (primnodes.h)
......@@ -601,9 +607,10 @@ extern char *nodeToString(void *obj);
* nodes/outfast.c. This special version of nodeToString is only used by serializeNode.
* It's a quick hack that allocates 8K buffer for StringInfo struct through initStringIinfoSizeOf
*/
extern char *nodeToBinaryStringFast(void *obj, int * size);
extern char *nodeToBinaryStringFast(void *obj, int *length);
extern Node *readNodeFromBinaryString(const char *str, int len);
/*
* nodes/{readfuncs.c,read.c}
*/
......
......@@ -54,6 +54,26 @@ typedef struct ParamListInfoData
typedef ParamListInfoData *ParamListInfo;
/*
* Serialized form of ParamExternData. This is used when query parameters
* are serialized, when dispatching a query from QD to QEs.
*/
typedef struct SerializedParamExternData
{
NodeTag type;
/* Fields from ParamExternData */
Datum value; /* parameter value */
bool isnull; /* is it NULL? */
uint16 pflags; /* flag bits, see above */
Oid ptype; /* parameter's datatype, or 0 */
/* Extra information about the type */
int16 plen;
bool pbyval;
} SerializedParamExternData;
/* ----------------
* ParamExecData
*
......
......@@ -194,4 +194,43 @@ SELECT foo() FROM t_nomap;
DROP TABLE t_nomap;
DROP FUNCTION foo();
--
-- Use a transient record type in a parameter of a query that's dispatched to segments
--
-- For these tests, the QD needs to serialize a transient record type to the QE, while
-- all the previous tests were for the other direction, in Motion nodes.
--
drop table if exists t;
create table t as select i as id from generate_series(1,8) i;
insert into t values (100000);
do $$
declare
r record;
nestedr record;
result record;
begin
select 1 as i, 2 AS j into r;
select r as nestedrec, 'foo' AS foo into nestedr;
raise notice 'r: %', r;
raise notice 'nestedr: %', nestedr;
-- Reference one field of a 'record' variable.
select * into result from t where id=r.i;
raise notice 'result 1: %', result.id;
-- Same with a nested 'record' variable
select * into result from t where id > length(nestedr.nestedrec::text) + 1000;
raise notice 'result 2: %', result.id;
-- Reference the record variable itself.
select * into result from t where id > length(nestedr::text) + 1000;
raise notice 'result 3: %', result.id;
end;
$$;
drop table if exists t;
drop schema transient_types;
......@@ -447,4 +447,45 @@ SELECT foo() FROM t_nomap;
DROP TABLE t_nomap;
DROP FUNCTION foo();
--
-- Use a transient record type in a parameter of a query that's dispatched to segments
--
-- For these tests, the QD needs to serialize a transient record type to the QE, while
-- all the previous tests were for the other direction, in Motion nodes.
--
drop table if exists t;
create table t as select i as id from generate_series(1,8) i;
insert into t values (100000);
do $$
declare
r record;
nestedr record;
result record;
begin
select 1 as i, 2 AS j into r;
select r as nestedrec, 'foo' AS foo into nestedr;
raise notice 'r: %', r;
raise notice 'nestedr: %', nestedr;
-- Reference one field of a 'record' variable.
select * into result from t where id=r.i;
raise notice 'result 1: %', result.id;
-- Same with a nested 'record' variable
select * into result from t where id > length(nestedr.nestedrec::text) + 1000;
raise notice 'result 2: %', result.id;
-- Reference the record variable itself.
select * into result from t where id > length(nestedr::text) + 1000;
raise notice 'result 3: %', result.id;
end;
$$;
NOTICE: r: (1,2)
NOTICE: nestedr: ("(1,2)",foo)
NOTICE: result 1: 1
NOTICE: result 2: 100000
NOTICE: result 3: 100000
drop table if exists t;
drop schema transient_types;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册