提交 5cfb023d 编写于 作者: X Xiaoyu Wang

enh: add binary serialization method to node structure

上级 27a687ab
......@@ -20,6 +20,9 @@
#define tlvDecodeEnum(pTlv, value) tlvDecodeImpl(pTlv, &(value), sizeof(value))
#define tlvForEach(pDecoder, pTlv, code) \
while (TSDB_CODE_SUCCESS == (code = tlvGetNextTlv(pDecoder, &pTlv)) && NULL != pTlv)
typedef struct STlv {
int16_t type;
int16_t len;
......@@ -40,10 +43,15 @@ typedef struct STlvDecoder {
typedef int32_t (*FToMsg)(const void* pObj, STlvEncoder* pEncoder);
typedef int32_t (*FToObject)(STlvDecoder* pDecoder, void* pObj);
typedef void* (*FMakeObject)(int16_t type);
typedef int32_t (*FSetObject)(STlv* pTlv, void* pObj);
static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder);
static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder);
static int32_t msgToNode(STlvDecoder* pDecoder, void** pObj);
static int32_t msgToNodeFromTlv(STlv* pTlv, void** pObj);
static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj);
static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj);
static int32_t initTlvEncoder(STlvEncoder* pEncoder) {
pEncoder->allocSize = NODES_MSG_DEFAULT_LEN;
......@@ -136,6 +144,11 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co
}
static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) {
if (pDecoder->offset == pDecoder->bufSize) {
*pTlv = NULL;
return TSDB_CODE_SUCCESS;
}
*pTlv = (STlv*)(pDecoder->pBuf + pDecoder->offset);
if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) {
return TSDB_CODE_FAILED;
......@@ -144,6 +157,8 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) {
return TSDB_CODE_SUCCESS;
}
static bool tlvDecodeEnd(STlvDecoder* pDecoder) { return pDecoder->offset == pDecoder->bufSize; }
static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int16_t len) {
if (pTlv->len != len) {
return TSDB_CODE_FAILED;
......@@ -156,8 +171,17 @@ static int32_t tlvDecodeI8(STlv* pTlv, int8_t* pValue) { return tlvDecodeImpl(pT
static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeCStr(STlv* pTlv, char* pValue) {
memcpy(pValue, pTlv->value, pTlv->len);
return TSDB_CODE_SUCCESS;
}
static int32_t tlvDecodeObjFromTlv(STlv* pTlv, FToObject func, void* pObj) {
STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value};
return func(&decoder, pObj);
......@@ -172,29 +196,25 @@ static int32_t tlvDecodeObj(STlvDecoder* pDecoder, FToObject func, void* pObj) {
return code;
}
static int32_t tlvDecodeNodeObj(STlvDecoder* pDecoder, FToObject func, void** pObj) {
STlv* pTlv = NULL;
int32_t code = tlvGetNextTlv(pDecoder, &pTlv);
if (TSDB_CODE_SUCCESS == code) {
*pObj = nodesMakeNode(pTlv->type);
if (NULL == *pObj) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value};
code = func(&decoder, *pObj);
static int32_t tlvDecodeDynObjFromTlv(STlv* pTlv, FMakeObject makeFunc, FToObject toFunc, void** pObj) {
*pObj = makeFunc(pTlv->type);
if (NULL == *pObj) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return code;
return tlvDecodeObjFromTlv(pTlv, toFunc, *pObj);
}
static int32_t msgToObject(STlvDecoder* pDecoder, FSetObject func, void* pObj) {
STlv* pTlv;
static int32_t tlvDecodeDynObj(STlvDecoder* pDecoder, FMakeObject makeFunc, FToObject toFunc, void** pObj) {
STlv* pTlv = NULL;
int32_t code = tlvGetNextTlv(pDecoder, &pTlv);
while (TSDB_CODE_SUCCESS == code && pTlv->len > 0) {
code = func(pTlv, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeDynObjFromTlv(pTlv, makeFunc, toFunc, pObj);
}
return code;
}
static void* makeNodeList(int16_t type) { return nodesMakeList(); }
enum { DATA_TYPE_CODE_TYPE = 1, DATA_TYPE_CODE_PRECISION, DATA_TYPE_CODE_SCALE, DATA_TYPE_CODE_BYTES };
static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) {
......@@ -214,6 +234,34 @@ static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code;
}
static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) {
SDataType* pNode = (SDataType*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case DATA_TYPE_CODE_TYPE:
code = tlvDecodeI8(pTlv, &pNode->type);
break;
case DATA_TYPE_CODE_PRECISION:
code = tlvDecodeU8(pTlv, &pNode->precision);
break;
case DATA_TYPE_CODE_SCALE:
code = tlvDecodeU8(pTlv, &pNode->scale);
break;
case DATA_TYPE_CODE_BYTES:
code = tlvDecodeI32(pTlv, &pNode->bytes);
break;
default:
code = TSDB_CODE_FAILED;
break;
}
}
return code;
}
enum { EXPR_CODE_RES_TYPE = 1 };
static int32_t exprNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
......@@ -222,8 +270,22 @@ static int32_t exprNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
}
static int32_t msgToExprNode(STlvDecoder* pDecoder, void* pObj) {
// todo
return TSDB_CODE_SUCCESS;
SExprNode* pNode = (SExprNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case EXPR_CODE_RES_TYPE:
code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->resType);
break;
default:
code = TSDB_CODE_FAILED;
break;
}
}
return code;
}
enum {
......@@ -262,33 +324,41 @@ static int32_t columnNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code;
}
static int32_t setColumnNodeFromTlv(STlv* pTlv, void* pObj) {
static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) {
SColumnNode* pNode = (SColumnNode*)pObj;
switch (pTlv->type) {
case COLUMN_CODE_EXPR_BASE:
return tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node);
case COLUMN_CODE_TABLE_ID:
return tlvDecodeU64(pTlv, &pNode->tableId);
case COLUMN_CODE_TABLE_TYPE:
return tlvDecodeI8(pTlv, &pNode->tableType);
case COLUMN_CODE_COLUMN_ID:
return tlvDecodeI16(pTlv, &pNode->colId);
case COLUMN_CODE_COLUMN_TYPE:
return tlvDecodeEnum(pTlv, pNode->colType);
case COLUMN_CODE_DATABLOCK_ID:
return tlvDecodeI16(pTlv, &pNode->dataBlockId);
case COLUMN_CODE_SLOT_ID:
return tlvDecodeI16(pTlv, &pNode->slotId);
default:
break;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case COLUMN_CODE_EXPR_BASE:
code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node);
break;
case COLUMN_CODE_TABLE_ID:
code = tlvDecodeU64(pTlv, &pNode->tableId);
break;
case COLUMN_CODE_TABLE_TYPE:
code = tlvDecodeI8(pTlv, &pNode->tableType);
break;
case COLUMN_CODE_COLUMN_ID:
code = tlvDecodeI16(pTlv, &pNode->colId);
break;
case COLUMN_CODE_COLUMN_TYPE:
code = tlvDecodeEnum(pTlv, pNode->colType);
break;
case COLUMN_CODE_DATABLOCK_ID:
code = tlvDecodeI16(pTlv, &pNode->dataBlockId);
break;
case COLUMN_CODE_SLOT_ID:
code = tlvDecodeI16(pTlv, &pNode->slotId);
break;
default:
code = TSDB_CODE_FAILED;
break;
}
}
return TSDB_CODE_FAILED;
}
static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) {
return msgToObject(pDecoder, setColumnNodeFromTlv, pObj);
return code;
}
enum { SUBPLAN_ID_CODE_QUERY_ID = 1, SUBPLAN_ID_CODE_GROUP_ID, SUBPLAN_ID_CODE_SUBPLAN_ID };
......@@ -397,6 +467,55 @@ static int32_t subplanToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code;
}
static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) {
SSubplan* pNode = (SSubplan*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case SUBPLAN_CODE_SUBPLAN_ID:
// code = tlvDecodeObjFromTlv(pTlv, msgToSubplanId, &pNode->id);
break;
case SUBPLAN_CODE_SUBPLAN_TYPE:
code = tlvDecodeEnum(pTlv, pNode->subplanType);
break;
case SUBPLAN_CODE_MSG_TYPE:
code = tlvDecodeI32(pTlv, &pNode->msgType);
break;
case SUBPLAN_CODE_LEVEL:
code = tlvDecodeI32(pTlv, &pNode->level);
break;
case SUBPLAN_CODE_DBFNAME:
code = tlvDecodeCStr(pTlv, pNode->dbFName);
break;
case SUBPLAN_CODE_USER:
code = tlvDecodeCStr(pTlv, pNode->user);
break;
case SUBPLAN_CODE_EXECNODE:
// code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->execNode);
break;
case SUBPLAN_CODE_ROOT_NODE:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pNode);
break;
case SUBPLAN_CODE_DATA_SINK:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pDataSink);
break;
case SUBPLAN_CODE_TAG_COND:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagCond);
break;
case SUBPLAN_CODE_TAG_INDEX_COND:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagIndexCond);
break;
default:
code = TSDB_CODE_FAILED;
break;
}
}
return code;
}
enum { QUERY_PLAN_CODE_QUERY_ID = 1, QUERY_PLAN_CODE_NUM_OF_SUBPLANS, QUERY_PLAN_CODE_SUBPLANS };
static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) {
......@@ -413,6 +532,31 @@ static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code;
}
static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) {
SQueryPlan* pNode = (SQueryPlan*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case QUERY_PLAN_CODE_QUERY_ID:
code = tlvDecodeU64(pTlv, &pNode->queryId);
break;
case QUERY_PLAN_CODE_NUM_OF_SUBPLANS:
code = tlvDecodeI32(pTlv, &pNode->numOfSubplans);
break;
case QUERY_PLAN_CODE_SUBPLANS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSubplans);
break;
default:
code = TSDB_CODE_FAILED;
break;
}
}
return code;
}
static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
switch (nodeType(pObj)) {
case QUERY_NODE_COLUMN:
......@@ -432,6 +576,10 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
switch (nodeType(pObj)) {
case QUERY_NODE_COLUMN:
return msgToColumnNode(pDecoder, pObj);
case QUERY_NODE_PHYSICAL_SUBPLAN:
return msgToSubplan(pDecoder, pObj);
case QUERY_NODE_PHYSICAL_PLAN:
return msgToQueryPlan(pDecoder, pObj);
default:
break;
};
......@@ -444,7 +592,12 @@ static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
}
static int32_t msgToNode(STlvDecoder* pDecoder, void** pObj) {
return tlvDecodeNodeObj(pDecoder, msgToSpecificNode, pObj);
return tlvDecodeDynObj(pDecoder, (FMakeObject)nodesMakeNode, msgToSpecificNode, pObj);
}
static int32_t msgToNodeFromTlv(STlv* pTlv, void** pObj) {
STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value};
return msgToNode(&decoder, pObj);
}
static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder) {
......@@ -461,6 +614,29 @@ static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder) {
return TSDB_CODE_SUCCESS;
}
static int32_t msgToNodeListImpl(STlvDecoder* pDecoder, void* pObj) {
SNodeList* pList = (SNodeList*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
while (TSDB_CODE_SUCCESS == code && !tlvDecodeEnd(pDecoder)) {
SNode* pNode = NULL;
code = msgToNode(pDecoder, (void**)&pNode);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListAppend(pList, pNode);
}
}
return code;
}
static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj) {
return tlvDecodeDynObj(pDecoder, makeNodeList, msgToNodeListImpl, pObj);
}
static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj) {
STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value};
return msgToNodeList(&decoder, pObj);
}
int32_t nodesNodeToMsg(const SNode* pNode, char** pMsg, int32_t* pLen) {
if (NULL == pNode || NULL == pMsg || NULL == pLen) {
terrno = TSDB_CODE_FAILED;
......
......@@ -251,6 +251,7 @@ class PlannerTestBaseImpl {
string splitLogicPlan_;
string scaledLogicPlan_;
string physiPlan_;
string physiPlanMsg_;
vector<string> physiSubplans_;
};
......@@ -274,6 +275,7 @@ class PlannerTestBaseImpl {
res_.splitLogicPlan_.clear();
res_.scaledLogicPlan_.clear();
res_.physiPlan_.clear();
res_.physiPlanMsg_.clear();
res_.physiSubplans_.clear();
}
......@@ -321,6 +323,7 @@ class PlannerTestBaseImpl {
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PHYSICAL == module) {
cout << "+++++++++++++++++++++physical plan : " << endl;
cout << res_.physiPlan_ << endl;
cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl;
}
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SUBPLAN == module) {
......@@ -408,6 +411,7 @@ class PlannerTestBaseImpl {
SNode* pSubplan;
FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); }
}
res_.physiPlanMsg_ = toMsg((SNode*)(*pPlan));
}
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
......@@ -452,6 +456,15 @@ class PlannerTestBaseImpl {
return str;
}
string toMsg(const SNode* pRoot) {
char* pStr = NULL;
int32_t len = 0;
DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len)
string str(pStr, len);
taosMemoryFreeClear(pStr);
return str;
}
caseEnv caseEnv_;
stmtEnv stmtEnv_;
stmtRes res_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册