提交 11e4aa66 编写于 作者: N Ning Yu 提交者: GitHub

Simplify tuple serialization in Motion nodes.

* Simplify tuple serialization in Motion nodes.

There is a fast-path for tuples that contain no toasted attributes,
which writes the raw tuple almost as is. However, the slow path is
significantly more complicated, calling each attribute's binary
send/receive functions (although there's a fast-path for a few
built-in datatypes). I don't see any need for calling I/O functions
here. We can just write the raw Datum on the wire. If that works
for tuples with no toasted attributes, it should work for all tuples,
if we just detoast any toasted attributes first.

This makes the code a lot simpler, and also fixes a bug with data
types that don't have a binary send/receive routines. We used to
call the regular (text) I/O functions in that case, but didn't handle
the resulting cstring correctly.

Diagnosis and test case by Foyzur Rahman.
Signed-off-by: NHaisheng Yuan <hyuan@pivotal.io>
Signed-off-by: NNing Yu <nyu@pivotal.io>
上级 79202d91
......@@ -182,13 +182,9 @@ InitSerTupInfo(TupleDesc tupdesc, SerTupInfo * pSerInfo)
attrInfo->atttypid = tupdesc->attrs[i]->atttypid;
/*
* Ok, we want the Binary input/output routines for the type if they exist,
* else we want the normal text input/output routines.
*
* User defined types might or might not have binary routines.
*
* getTypeBinaryOutputInfo throws an error if we try to call it to get
* the binary output routine and one doesn't exist, so let's not call that.
* Serialization will be performed at a high level abstraction,
* we only care about whether it's toasted or pass-by-value or
* a CString, so only track the high level type information.
*/
{
HeapTuple typeTuple;
......@@ -210,55 +206,11 @@ InitSerTupInfo(TupleDesc tupdesc, SerTupInfo * pSerInfo)
errmsg("type %s is only a shell",
format_type_be(attrInfo->atttypid))));
/* If we don't have both binary routines */
if (!OidIsValid(pt->typsend) || !OidIsValid(pt->typreceive))
{
/* Use the normal text routines (slower) */
if (!OidIsValid(pt->typoutput))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("no output function available for type %s",
format_type_be(attrInfo->atttypid))));
if (!OidIsValid(pt->typinput))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("no input function available for type %s",
format_type_be(attrInfo->atttypid))));
attrInfo->typsend = pt->typoutput;
attrInfo->send_typio_param = getTypeIOParam(typeTuple);
attrInfo->typisvarlena = (!pt->typbyval) && (pt->typlen == -1);
attrInfo->typrecv = pt->typinput;
attrInfo->recv_typio_param = getTypeIOParam(typeTuple);
}
else
{
/* Use binary routines */
attrInfo->typsend = pt->typsend;
attrInfo->send_typio_param = getTypeIOParam(typeTuple);
attrInfo->typisvarlena = (!pt->typbyval) && (pt->typlen == -1);
attrInfo->typrecv = pt->typreceive;
attrInfo->recv_typio_param = getTypeIOParam(typeTuple);
}
attrInfo->typlen = pt->typlen;
attrInfo->typbyval = pt->typbyval;
ReleaseSysCache(typeTuple);
}
fmgr_info(attrInfo->typsend, &attrInfo->send_finfo);
fmgr_info(attrInfo->typrecv, &attrInfo->recv_finfo);
#ifdef TUPSER_SCRATCH_SPACE
/*
* If the field is a varlena, allocate some space to use for
* deserializing it. If most of the values are smaller than this
* scratch-space then we save time on allocation and freeing.
*/
attrInfo->pv_varlen_scratch = palloc(VARLEN_SCRATCH_SIZE);
attrInfo->varlen_scratch_size = VARLEN_SCRATCH_SIZE;
#endif
}
}
......@@ -483,7 +435,6 @@ SerializeTupleIntoChunks(HeapTuple tuple, SerTupInfo * pSerInfo, TupleChunkList
TupleDesc tupdesc;
int i,
natts;
bool fHandled;
AssertArg(tcList != NULL);
AssertArg(tuple != NULL);
......@@ -596,164 +547,63 @@ SerializeTupleIntoChunks(HeapTuple tuple, SerTupInfo * pSerInfo, TupleChunkList
SerAttrInfo *attrInfo = pSerInfo->myinfo + i;
Datum origattr = pSerInfo->values[i],
attr;
bytea *outputbytes=0;
/* skip null attributes (already taken care of above) */
if (pSerInfo->nulls[i])
continue;
/*
* If we have a toasted datum, forcibly detoast it here to avoid
* memory leakage: we want to force the detoast allocation(s) to
* happen in our reset-able serialization context.
*/
if (attrInfo->typisvarlena)
if (attrInfo->typlen == -1)
{
int32 sz;
char *data;
/*
* If we have a toasted datum, forcibly detoast it here to avoid
* memory leakage: we want to force the detoast allocation(s) to
* happen in our reset-able serialization context.
*/
oldCtxt = MemoryContextSwitchTo(s_tupSerMemCtxt);
/* we want to detoast but leave compressed, if
* possible, but we have to handle varlena
* attributes (and others ?) differently than we
* currently do (first step is to use
* heap_tuple_fetch_attr() instead of
* PG_DETOAST_DATUM()). */
attr = PointerGetDatum(PG_DETOAST_DATUM(origattr));
attr = PointerGetDatum(PG_DETOAST_DATUM_PACKED(origattr));
MemoryContextSwitchTo(oldCtxt);
}
else
attr = origattr;
/*
* Assume that the data's output will be handled by the special IO
* code, and if not then we can handle it the slow way.
*/
fHandled = true;
switch (attrInfo->atttypid)
{
case INT4OID:
addInt32ToChunkList(tcList, DatumGetInt32(attr), &pSerInfo->chunkCache);
break;
case CHAROID:
addCharToChunkList(tcList, DatumGetChar(attr), &pSerInfo->chunkCache);
addPadding(tcList,&pSerInfo->chunkCache,1);
break;
case BPCHAROID:
case VARCHAROID:
case INT2VECTOROID: /* postgres serialization logic broken, use our own */
case OIDVECTOROID: /* postgres serialization logic broken, use our own */
case ANYARRAYOID:
{
text *pText = DatumGetTextP(attr);
int32 textSize = VARSIZE(pText) - VARHDRSZ;
addInt32ToChunkList(tcList, textSize, &pSerInfo->chunkCache);
addByteStringToChunkList(tcList, (char *) VARDATA(pText), textSize, &pSerInfo->chunkCache);
addPadding(tcList,&pSerInfo->chunkCache,textSize);
break;
}
case DATEOID:
{
DateADT date = DatumGetDateADT(attr);
addByteStringToChunkList(tcList, (char *) &date, sizeof(DateADT), &pSerInfo->chunkCache);
break;
}
case NUMERICOID:
{
/*
* Treat the numeric as a varlena variable, and just push
* the whole shebang to the output-buffer. We don't care
* about the guts of the numeric.
*/
Numeric num = DatumGetNumeric(attr);
int32 numSize = VARSIZE(num) - VARHDRSZ;
addInt32ToChunkList(tcList, numSize, &pSerInfo->chunkCache);
addByteStringToChunkList(tcList, (char *) VARDATA(num), numSize, &pSerInfo->chunkCache);
addPadding(tcList,&pSerInfo->chunkCache,numSize);
break;
}
case ACLITEMOID:
{
AclItem *aip = DatumGetAclItemP(attr);
char *outputstring;
int32 aclSize ;
outputstring = DatumGetCString(DirectFunctionCall1(aclitemout,
PointerGetDatum(aip)));
aclSize = strlen(outputstring);
addInt32ToChunkList(tcList, aclSize, &pSerInfo->chunkCache);
addByteStringToChunkList(tcList, outputstring,aclSize, &pSerInfo->chunkCache);
addPadding(tcList,&pSerInfo->chunkCache,aclSize);
break;
}
case 210: /* storage manager */
{
char *smgrstr;
int32 strsize;
smgrstr = DatumGetCString(DirectFunctionCall1(smgrout, 0));
strsize = strlen(smgrstr);
addInt32ToChunkList(tcList, strsize, &pSerInfo->chunkCache);
addByteStringToChunkList(tcList, smgrstr, strsize, &pSerInfo->chunkCache);
addPadding(tcList,&pSerInfo->chunkCache,strsize);
break;
}
default:
fHandled = false;
sz = VARSIZE_ANY_EXHDR(attr);
data = VARDATA_ANY(attr);
/* Send length first, then data */
addInt32ToChunkList(tcList, sz, &pSerInfo->chunkCache);
addByteStringToChunkList(tcList, data, sz, &pSerInfo->chunkCache);
addPadding(tcList, &pSerInfo->chunkCache, sz);
}
else if (attrInfo->typlen == -2)
{
int32 sz;
char *data;
if (fHandled)
continue;
/* CString, we would send the string with the terminating '\0' */
data = DatumGetCString(origattr);
sz = strlen(data) + 1;
/*
* the FunctionCall2 call into the send function may result in some
* allocations which we'd like to have contained by our reset-able
* context
*/
oldCtxt = MemoryContextSwitchTo(s_tupSerMemCtxt);
/* Call the attribute type's binary input converter. */
if (attrInfo->send_finfo.fn_nargs == 1)
outputbytes =
DatumGetByteaP(FunctionCall1(&attrInfo->send_finfo,
attr));
else if (attrInfo->send_finfo.fn_nargs == 2)
outputbytes =
DatumGetByteaP(FunctionCall2(&attrInfo->send_finfo,
attr,
ObjectIdGetDatum(attrInfo->send_typio_param)));
else if (attrInfo->send_finfo.fn_nargs == 3)
outputbytes =
DatumGetByteaP(FunctionCall3(&attrInfo->send_finfo,
attr,
ObjectIdGetDatum(attrInfo->send_typio_param),
Int32GetDatum(tupdesc->attrs[i]->atttypmod)));
else
/* Send length first, then data */
addInt32ToChunkList(tcList, sz, &pSerInfo->chunkCache);
addByteStringToChunkList(tcList, data, sz, &pSerInfo->chunkCache);
addPadding(tcList, &pSerInfo->chunkCache, sz);
}
else if (attrInfo->typbyval)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("Conversion function takes %d args",attrInfo->recv_finfo.fn_nargs)));
/*
* We send a full-width Datum for all pass-by-value types, regardless of
* the actual size.
*/
addByteStringToChunkList(tcList, (char *) &origattr, sizeof(Datum), &pSerInfo->chunkCache);
addPadding(tcList, &pSerInfo->chunkCache, sizeof(Datum));
}
MemoryContextSwitchTo(oldCtxt);
/* We assume the result will not have been toasted */
addInt32ToChunkList(tcList, VARSIZE(outputbytes) - VARHDRSZ, &pSerInfo->chunkCache);
addByteStringToChunkList(tcList, VARDATA(outputbytes),
VARSIZE(outputbytes) - VARHDRSZ, &pSerInfo->chunkCache);
addPadding(tcList,&pSerInfo->chunkCache,VARSIZE(outputbytes) - VARHDRSZ);
/*
* this was allocated in our reset-able context, but we *are* done
* with it; and for tuples with several large columns it'd be nice to
* free the memory back to the context
*/
pfree(outputbytes);
else
{
addByteStringToChunkList(tcList, DatumGetPointer(origattr), attrInfo->typlen, &pSerInfo->chunkCache);
addPadding(tcList, &pSerInfo->chunkCache, attrInfo->typlen);
attr = origattr;
}
}
MemoryContextReset(s_tupSerMemCtxt);
......@@ -912,11 +762,7 @@ DeserializeTuple(SerTupInfo * pSerInfo, StringInfo serialTup)
HeapTuple htup;
int natts;
SerAttrInfo *attrInfo;
uint32 attr_size;
int i;
StringInfoData attr_data;
bool fHandled;
AssertArg(pSerInfo != NULL);
AssertArg(serialTup != NULL);
......@@ -936,7 +782,6 @@ DeserializeTuple(SerTupInfo * pSerInfo, StringInfo serialTup)
skipPadding(serialTup);
/* Deserialize the non-NULL attributes of this tuple */
initStringInfo(&attr_data);
for (i = 0; i < natts; ++i)
{
attrInfo = pSerInfo->myinfo + i;
......@@ -947,180 +792,56 @@ DeserializeTuple(SerTupInfo * pSerInfo, StringInfo serialTup)
continue;
}
/*
* Assume that the data's output will be handled by the special IO
* code, and if not then we can handle it the slow way.
*/
fHandled = true;
switch (attrInfo->atttypid)
if (attrInfo->typlen == -1)
{
case INT4OID:
pSerInfo->values[i] = Int32GetDatum(stringInfoGetInt32(serialTup));
break;
case CHAROID:
pSerInfo->values[i] = CharGetDatum(pq_getmsgbyte(serialTup));
skipPadding(serialTup);
break;
case BPCHAROID:
case VARCHAROID:
case INT2VECTOROID: /* postgres serialization logic broken, use our own */
case OIDVECTOROID: /* postgres serialization logic broken, use our own */
case ANYARRAYOID:
{
text *pText;
int textSize;
int32 sz;
struct varlena *p;
textSize = stringInfoGetInt32(serialTup);
/* Read length first */
pq_copymsgbytes(serialTup, (char *) &sz, sizeof(int32));
if (sz < 0)
elog(ERROR, "invalid length received for a varlen Datum");
#ifdef TUPSER_SCRATCH_SPACE
if (textSize + VARHDRSZ <= attrInfo->varlen_scratch_size)
pText = (text *) attrInfo->pv_varlen_scratch;
else
pText = (text *) palloc(textSize + VARHDRSZ);
#else
pText = (text *) palloc(textSize + VARHDRSZ);
#endif
SET_VARSIZE(pText, textSize + VARHDRSZ);
pq_copymsgbytes(serialTup, VARDATA(pText), textSize);
skipPadding(serialTup);
pSerInfo->values[i] = PointerGetDatum(pText);
break;
}
p = palloc(sz + VARHDRSZ);
case DATEOID:
{
/*
* TODO: I would LIKE to do something more efficient, but
* DateADT is not strictly limited to 4 bytes by its
* definition.
*/
DateADT date;
pq_copymsgbytes(serialTup, (char *) &date, sizeof(DateADT));
skipPadding(serialTup);
pSerInfo->values[i] = DateADTGetDatum(date);
break;
}
pq_copymsgbytes(serialTup, VARDATA(p), sz);
SET_VARSIZE(p, sz + VARHDRSZ);
case NUMERICOID:
{
/*
* Treat the numeric as a varlena variable, and just push
* the whole shebang to the output-buffer. We don't care
* about the guts of the numeric.
*/
Numeric num;
int numSize;
numSize = stringInfoGetInt32(serialTup);
#ifdef TUPSER_SCRATCH_SPACE
if (numSize + VARHDRSZ <= attrInfo->varlen_scratch_size)
num = (Numeric) attrInfo->pv_varlen_scratch;
else
num = (Numeric) palloc(numSize + VARHDRSZ);
#else
num = (Numeric) palloc(numSize + VARHDRSZ);
#endif
SET_VARSIZE(num, numSize + VARHDRSZ);
pq_copymsgbytes(serialTup, VARDATA(num), numSize);
skipPadding(serialTup);
pSerInfo->values[i] = NumericGetDatum(num);
break;
}
pSerInfo->values[i] = PointerGetDatum(p);
}
else if (attrInfo->typlen == -2)
{
int32 sz;
char *p;
case ACLITEMOID:
{
int aclSize, k, cnt;
char *inputstring, *starsfree;
aclSize = stringInfoGetInt32(serialTup);
inputstring = (char*) palloc(aclSize + 1);
starsfree = (char*) palloc(aclSize + 1);
cnt = 0;
pq_copymsgbytes(serialTup, inputstring, aclSize);
skipPadding(serialTup);
inputstring[aclSize] = '\0';
for(k=0; k<aclSize; k++)
{
if( inputstring[k] != '*')
{
starsfree[cnt] = inputstring[k];
cnt++;
}
}
starsfree[cnt] = '\0';
/* CString, with terminating '\0' included */
pSerInfo->values[i] = DirectFunctionCall1(aclitemin, CStringGetDatum(starsfree));
pfree(inputstring);
break;
}
/* Read length first */
pq_copymsgbytes(serialTup, (char *) &sz, sizeof(int32));
if (sz < 0)
elog(ERROR, "invalid length received for a CString");
case 210:
{
int strsize;
char *smgrstr;
p = palloc(sz + VARHDRSZ);
strsize = stringInfoGetInt32(serialTup);
smgrstr = (char*) palloc(strsize + 1);
pq_copymsgbytes(serialTup, smgrstr, strsize);
skipPadding(serialTup);
smgrstr[strsize] = '\0';
/* Then data */
pq_copymsgbytes(serialTup, p, sz);
pSerInfo->values[i] = DirectFunctionCall1(smgrin, CStringGetDatum(smgrstr));
break;
}
default:
fHandled = false;
pSerInfo->values[i] = CStringGetDatum(p);
}
else if (attrInfo->typbyval)
{
/* Read a whole Datum */
if (fHandled)
continue;
attr_size = stringInfoGetInt32(serialTup);
/* reset attr_data to empty, and load raw data into it */
attr_data.len = 0;
attr_data.data[0] = '\0';
attr_data.cursor = 0;
appendBinaryStringInfo(&attr_data,
pq_getmsgbytes(serialTup, attr_size), attr_size);
skipPadding(serialTup);
/* Call the attribute type's binary input converter. */
if (attrInfo->recv_finfo.fn_nargs == 1)
pSerInfo->values[i] = FunctionCall1(&attrInfo->recv_finfo,
PointerGetDatum(&attr_data));
else if (attrInfo->recv_finfo.fn_nargs == 2)
pSerInfo->values[i] = FunctionCall2(&attrInfo->recv_finfo,
PointerGetDatum(&attr_data),
ObjectIdGetDatum(attrInfo->recv_typio_param));
else if (attrInfo->recv_finfo.fn_nargs == 3)
pSerInfo->values[i] = FunctionCall3(&attrInfo->recv_finfo,
PointerGetDatum(&attr_data),
ObjectIdGetDatum(attrInfo->recv_typio_param),
Int32GetDatum(tupdesc->attrs[i]->atttypmod) );
pq_copymsgbytes(serialTup, (char *) &(pSerInfo->values[i]), sizeof(Datum));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("Conversion function takes %d args",attrInfo->recv_finfo.fn_nargs)));
}
/* fixed width, pass-by-ref */
char *p = palloc(attrInfo->typlen);
/* Trouble if it didn't eat the whole buffer */
if (attr_data.cursor != attr_data.len)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("incorrect binary data format")));
pq_copymsgbytes(serialTup, p, attrInfo->typlen);
pSerInfo->values[i] = PointerGetDatum(p);
}
}
......@@ -1134,6 +855,12 @@ DeserializeTuple(SerTupInfo * pSerInfo, StringInfo serialTup)
MemoryContextReset(s_tupSerMemCtxt);
/* Trouble if it didn't eat the whole buffer */
if (serialTup->cursor != serialTup->len)
ereport(ERROR,
(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
errmsg("incorrect binary data format")));
/* All done. Return the result. */
return htup;
}
......
......@@ -21,19 +21,6 @@
#include "cdb/tupleremap.h"
/* Define this to pack the NULLs-mask into the minimum number of bytes
* possible. If undefined, the NULLs-sequence is sent as one character per
* attribute.
*/
#undef TUPSER_BITPACK_NULLMASK
/* Define this to allocate scratch-space for varlena attribute-values, so that
* tuple-deserialization doesn't have to allocate space if the varlena's value
* is smaller than the scratch size.
*/
#undef TUPSER_SCRATCH_SPACE
#define VARLEN_SCRATCH_SIZE 500
typedef struct MotionConn MotionConn;
/*
......@@ -53,21 +40,8 @@ typedef struct MotionConn MotionConn;
typedef struct SerAttrInfo
{
Oid atttypid; /* Oid of the attribute's data-type. */
bool typisvarlena; /* is type varlena (ie possibly toastable)? */
Oid typsend; /* Oid for the type's binary output fn */
Oid send_typio_param; /* param to pass to the output fn */
FmgrInfo send_finfo; /* Precomputed call info for output fn */
Oid typrecv; /* Oid for the type's binary input fn */
Oid recv_typio_param; /* param to pass to the input fn */
FmgrInfo recv_finfo; /* Precomputed call info for output fn */
#ifdef TUPSER_SCRATCH_SPACE
void *pv_varlen_scratch; /* For deserializing varlena
* attributes. */
int varlen_scratch_size; /* Size of varlena scratch space. */
#endif
int16 typlen;
bool typbyval;
} SerAttrInfo;
/* The information for sending and receiving tuples that match a particular
......
-- Check if motion layer correctly serialize & deserialize tuples in particular cases
CREATE TYPE incomplete_type;
CREATE FUNCTION incomplete_type_in(cstring)
RETURNS incomplete_type
AS 'textin'
LANGUAGE internal IMMUTABLE STRICT;
NOTICE: return type incomplete_type is only a shell
CREATE FUNCTION incomplete_type_out(incomplete_type)
RETURNS cstring
AS 'textout'
LANGUAGE internal IMMUTABLE STRICT;
NOTICE: argument type incomplete_type is only a shell
CREATE TYPE incomplete_type (
internallength = variable,
input = incomplete_type_in,
output = incomplete_type_out,
alignment = double,
storage = EXTENDED,
default = 'zippo'
);
CREATE TABLE table_with_incomplete_type (id int, incomplete incomplete_type);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
INSERT INTO table_with_incomplete_type(id, incomplete) VALUES(1, repeat('abcde', 1000000)::incomplete_type);
-- Turn off output temporarily as the output is quite large
\o /dev/null
SELECT * FROM table_with_incomplete_type;
\o
DROP TABLE table_with_incomplete_type;
DROP TYPE incomplete_type CASCADE;
NOTICE: drop cascades to function incomplete_type_out(incomplete_type)
NOTICE: drop cascades to function incomplete_type_in(cstring)
......@@ -105,6 +105,8 @@ test: aggregate_with_groupingsets
test: nested_case_null sort bb_mpph
test: bb_memory_quota memconsumption
test: tuple_serialization
# NOTE: The bfv_temp test assumes that there are no temporary tables in
# other sessions. Therefore the other tests in this group mustn't create
# temp tables
......
-- Check if motion layer correctly serialize & deserialize tuples in particular cases
CREATE TYPE incomplete_type;
CREATE FUNCTION incomplete_type_in(cstring)
RETURNS incomplete_type
AS 'textin'
LANGUAGE internal IMMUTABLE STRICT;
CREATE FUNCTION incomplete_type_out(incomplete_type)
RETURNS cstring
AS 'textout'
LANGUAGE internal IMMUTABLE STRICT;
CREATE TYPE incomplete_type (
internallength = variable,
input = incomplete_type_in,
output = incomplete_type_out,
alignment = double,
storage = EXTENDED,
default = 'zippo'
);
CREATE TABLE table_with_incomplete_type (id int, incomplete incomplete_type);
INSERT INTO table_with_incomplete_type(id, incomplete) VALUES(1, repeat('abcde', 1000000)::incomplete_type);
-- Turn off output temporarily as the output is quite large
\o /dev/null
SELECT * FROM table_with_incomplete_type;
\o
DROP TABLE table_with_incomplete_type;
DROP TYPE incomplete_type CASCADE;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册