From 5cfb023d018724b2a4e1ba66c30e06b45b72c787 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 6 Sep 2022 17:26:37 +0800 Subject: [PATCH] enh: add binary serialization method to node structure --- source/libs/nodes/src/nodesMsgFuncs.c | 258 ++++++++++++++++++---- source/libs/planner/test/planTestUtil.cpp | 13 ++ 2 files changed, 230 insertions(+), 41 deletions(-) diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index f5d5bd60cc..12c7c7a37a 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -20,6 +20,9 @@ #define tlvDecodeEnum(pTlv, value) tlvDecodeImpl(pTlv, &(value), sizeof(value)) +#define tlvForEach(pDecoder, pTlv, code) \ + while (TSDB_CODE_SUCCESS == (code = tlvGetNextTlv(pDecoder, &pTlv)) && NULL != pTlv) + typedef struct STlv { int16_t type; int16_t len; @@ -40,10 +43,15 @@ typedef struct STlvDecoder { typedef int32_t (*FToMsg)(const void* pObj, STlvEncoder* pEncoder); typedef int32_t (*FToObject)(STlvDecoder* pDecoder, void* pObj); +typedef void* (*FMakeObject)(int16_t type); typedef int32_t (*FSetObject)(STlv* pTlv, void* pObj); static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder); static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder); +static int32_t msgToNode(STlvDecoder* pDecoder, void** pObj); +static int32_t msgToNodeFromTlv(STlv* pTlv, void** pObj); +static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj); +static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj); static int32_t initTlvEncoder(STlvEncoder* pEncoder) { pEncoder->allocSize = NODES_MSG_DEFAULT_LEN; @@ -136,6 +144,11 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co } static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { + if (pDecoder->offset == pDecoder->bufSize) { + *pTlv = NULL; + return TSDB_CODE_SUCCESS; + } + *pTlv = (STlv*)(pDecoder->pBuf + pDecoder->offset); if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) { return TSDB_CODE_FAILED; @@ -144,6 +157,8 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { return TSDB_CODE_SUCCESS; } +static bool tlvDecodeEnd(STlvDecoder* pDecoder) { return pDecoder->offset == pDecoder->bufSize; } + static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int16_t len) { if (pTlv->len != len) { return TSDB_CODE_FAILED; @@ -156,8 +171,17 @@ static int32_t tlvDecodeI8(STlv* pTlv, int8_t* pValue) { return tlvDecodeImpl(pT static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } +static int32_t tlvDecodeCStr(STlv* pTlv, char* pValue) { + memcpy(pValue, pTlv->value, pTlv->len); + return TSDB_CODE_SUCCESS; +} + static int32_t tlvDecodeObjFromTlv(STlv* pTlv, FToObject func, void* pObj) { STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; return func(&decoder, pObj); @@ -172,29 +196,25 @@ static int32_t tlvDecodeObj(STlvDecoder* pDecoder, FToObject func, void* pObj) { return code; } -static int32_t tlvDecodeNodeObj(STlvDecoder* pDecoder, FToObject func, void** pObj) { - STlv* pTlv = NULL; - int32_t code = tlvGetNextTlv(pDecoder, &pTlv); - if (TSDB_CODE_SUCCESS == code) { - *pObj = nodesMakeNode(pTlv->type); - if (NULL == *pObj) { - return TSDB_CODE_OUT_OF_MEMORY; - } - STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; - code = func(&decoder, *pObj); +static int32_t tlvDecodeDynObjFromTlv(STlv* pTlv, FMakeObject makeFunc, FToObject toFunc, void** pObj) { + *pObj = makeFunc(pTlv->type); + if (NULL == *pObj) { + return TSDB_CODE_OUT_OF_MEMORY; } - return code; + return tlvDecodeObjFromTlv(pTlv, toFunc, *pObj); } -static int32_t msgToObject(STlvDecoder* pDecoder, FSetObject func, void* pObj) { - STlv* pTlv; +static int32_t tlvDecodeDynObj(STlvDecoder* pDecoder, FMakeObject makeFunc, FToObject toFunc, void** pObj) { + STlv* pTlv = NULL; int32_t code = tlvGetNextTlv(pDecoder, &pTlv); - while (TSDB_CODE_SUCCESS == code && pTlv->len > 0) { - code = func(pTlv, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeDynObjFromTlv(pTlv, makeFunc, toFunc, pObj); } return code; } +static void* makeNodeList(int16_t type) { return nodesMakeList(); } + enum { DATA_TYPE_CODE_TYPE = 1, DATA_TYPE_CODE_PRECISION, DATA_TYPE_CODE_SCALE, DATA_TYPE_CODE_BYTES }; static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -214,6 +234,34 @@ static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { return code; } +static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) { + SDataType* pNode = (SDataType*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case DATA_TYPE_CODE_TYPE: + code = tlvDecodeI8(pTlv, &pNode->type); + break; + case DATA_TYPE_CODE_PRECISION: + code = tlvDecodeU8(pTlv, &pNode->precision); + break; + case DATA_TYPE_CODE_SCALE: + code = tlvDecodeU8(pTlv, &pNode->scale); + break; + case DATA_TYPE_CODE_BYTES: + code = tlvDecodeI32(pTlv, &pNode->bytes); + break; + default: + code = TSDB_CODE_FAILED; + break; + } + } + + return code; +} + enum { EXPR_CODE_RES_TYPE = 1 }; static int32_t exprNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -222,8 +270,22 @@ static int32_t exprNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { } static int32_t msgToExprNode(STlvDecoder* pDecoder, void* pObj) { - // todo - return TSDB_CODE_SUCCESS; + SExprNode* pNode = (SExprNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case EXPR_CODE_RES_TYPE: + code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->resType); + break; + default: + code = TSDB_CODE_FAILED; + break; + } + } + + return code; } enum { @@ -262,33 +324,41 @@ static int32_t columnNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { return code; } -static int32_t setColumnNodeFromTlv(STlv* pTlv, void* pObj) { +static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) { SColumnNode* pNode = (SColumnNode*)pObj; - switch (pTlv->type) { - case COLUMN_CODE_EXPR_BASE: - return tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); - case COLUMN_CODE_TABLE_ID: - return tlvDecodeU64(pTlv, &pNode->tableId); - case COLUMN_CODE_TABLE_TYPE: - return tlvDecodeI8(pTlv, &pNode->tableType); - case COLUMN_CODE_COLUMN_ID: - return tlvDecodeI16(pTlv, &pNode->colId); - case COLUMN_CODE_COLUMN_TYPE: - return tlvDecodeEnum(pTlv, pNode->colType); - case COLUMN_CODE_DATABLOCK_ID: - return tlvDecodeI16(pTlv, &pNode->dataBlockId); - case COLUMN_CODE_SLOT_ID: - return tlvDecodeI16(pTlv, &pNode->slotId); - default: - break; + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case COLUMN_CODE_EXPR_BASE: + code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); + break; + case COLUMN_CODE_TABLE_ID: + code = tlvDecodeU64(pTlv, &pNode->tableId); + break; + case COLUMN_CODE_TABLE_TYPE: + code = tlvDecodeI8(pTlv, &pNode->tableType); + break; + case COLUMN_CODE_COLUMN_ID: + code = tlvDecodeI16(pTlv, &pNode->colId); + break; + case COLUMN_CODE_COLUMN_TYPE: + code = tlvDecodeEnum(pTlv, pNode->colType); + break; + case COLUMN_CODE_DATABLOCK_ID: + code = tlvDecodeI16(pTlv, &pNode->dataBlockId); + break; + case COLUMN_CODE_SLOT_ID: + code = tlvDecodeI16(pTlv, &pNode->slotId); + break; + default: + code = TSDB_CODE_FAILED; + break; + } } - return TSDB_CODE_FAILED; -} - -static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) { - return msgToObject(pDecoder, setColumnNodeFromTlv, pObj); + return code; } enum { SUBPLAN_ID_CODE_QUERY_ID = 1, SUBPLAN_ID_CODE_GROUP_ID, SUBPLAN_ID_CODE_SUBPLAN_ID }; @@ -397,6 +467,55 @@ static int32_t subplanToMsg(const void* pObj, STlvEncoder* pEncoder) { return code; } +static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) { + SSubplan* pNode = (SSubplan*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case SUBPLAN_CODE_SUBPLAN_ID: + // code = tlvDecodeObjFromTlv(pTlv, msgToSubplanId, &pNode->id); + break; + case SUBPLAN_CODE_SUBPLAN_TYPE: + code = tlvDecodeEnum(pTlv, pNode->subplanType); + break; + case SUBPLAN_CODE_MSG_TYPE: + code = tlvDecodeI32(pTlv, &pNode->msgType); + break; + case SUBPLAN_CODE_LEVEL: + code = tlvDecodeI32(pTlv, &pNode->level); + break; + case SUBPLAN_CODE_DBFNAME: + code = tlvDecodeCStr(pTlv, pNode->dbFName); + break; + case SUBPLAN_CODE_USER: + code = tlvDecodeCStr(pTlv, pNode->user); + break; + case SUBPLAN_CODE_EXECNODE: + // code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->execNode); + break; + case SUBPLAN_CODE_ROOT_NODE: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pNode); + break; + case SUBPLAN_CODE_DATA_SINK: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pDataSink); + break; + case SUBPLAN_CODE_TAG_COND: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagCond); + break; + case SUBPLAN_CODE_TAG_INDEX_COND: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagIndexCond); + break; + default: + code = TSDB_CODE_FAILED; + break; + } + } + + return code; +} + enum { QUERY_PLAN_CODE_QUERY_ID = 1, QUERY_PLAN_CODE_NUM_OF_SUBPLANS, QUERY_PLAN_CODE_SUBPLANS }; static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -413,6 +532,31 @@ static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) { return code; } +static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) { + SQueryPlan* pNode = (SQueryPlan*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case QUERY_PLAN_CODE_QUERY_ID: + code = tlvDecodeU64(pTlv, &pNode->queryId); + break; + case QUERY_PLAN_CODE_NUM_OF_SUBPLANS: + code = tlvDecodeI32(pTlv, &pNode->numOfSubplans); + break; + case QUERY_PLAN_CODE_SUBPLANS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSubplans); + break; + default: + code = TSDB_CODE_FAILED; + break; + } + } + + return code; +} + static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { switch (nodeType(pObj)) { case QUERY_NODE_COLUMN: @@ -432,6 +576,10 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { switch (nodeType(pObj)) { case QUERY_NODE_COLUMN: return msgToColumnNode(pDecoder, pObj); + case QUERY_NODE_PHYSICAL_SUBPLAN: + return msgToSubplan(pDecoder, pObj); + case QUERY_NODE_PHYSICAL_PLAN: + return msgToQueryPlan(pDecoder, pObj); default: break; }; @@ -444,7 +592,12 @@ static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder) { } static int32_t msgToNode(STlvDecoder* pDecoder, void** pObj) { - return tlvDecodeNodeObj(pDecoder, msgToSpecificNode, pObj); + return tlvDecodeDynObj(pDecoder, (FMakeObject)nodesMakeNode, msgToSpecificNode, pObj); +} + +static int32_t msgToNodeFromTlv(STlv* pTlv, void** pObj) { + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + return msgToNode(&decoder, pObj); } static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -461,6 +614,29 @@ static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder) { return TSDB_CODE_SUCCESS; } +static int32_t msgToNodeListImpl(STlvDecoder* pDecoder, void* pObj) { + SNodeList* pList = (SNodeList*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + while (TSDB_CODE_SUCCESS == code && !tlvDecodeEnd(pDecoder)) { + SNode* pNode = NULL; + code = msgToNode(pDecoder, (void**)&pNode); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListAppend(pList, pNode); + } + } + return code; +} + +static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj) { + return tlvDecodeDynObj(pDecoder, makeNodeList, msgToNodeListImpl, pObj); +} + +static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj) { + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + return msgToNodeList(&decoder, pObj); +} + int32_t nodesNodeToMsg(const SNode* pNode, char** pMsg, int32_t* pLen) { if (NULL == pNode || NULL == pMsg || NULL == pLen) { terrno = TSDB_CODE_FAILED; diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 96f7d29230..5824f042d0 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -251,6 +251,7 @@ class PlannerTestBaseImpl { string splitLogicPlan_; string scaledLogicPlan_; string physiPlan_; + string physiPlanMsg_; vector physiSubplans_; }; @@ -274,6 +275,7 @@ class PlannerTestBaseImpl { res_.splitLogicPlan_.clear(); res_.scaledLogicPlan_.clear(); res_.physiPlan_.clear(); + res_.physiPlanMsg_.clear(); res_.physiSubplans_.clear(); } @@ -321,6 +323,7 @@ class PlannerTestBaseImpl { if (DUMP_MODULE_ALL == module || DUMP_MODULE_PHYSICAL == module) { cout << "+++++++++++++++++++++physical plan : " << endl; cout << res_.physiPlan_ << endl; + cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl; } if (DUMP_MODULE_ALL == module || DUMP_MODULE_SUBPLAN == module) { @@ -408,6 +411,7 @@ class PlannerTestBaseImpl { SNode* pSubplan; FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); } } + res_.physiPlanMsg_ = toMsg((SNode*)(*pPlan)); } void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { @@ -452,6 +456,15 @@ class PlannerTestBaseImpl { return str; } + string toMsg(const SNode* pRoot) { + char* pStr = NULL; + int32_t len = 0; + DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len) + string str(pStr, len); + taosMemoryFreeClear(pStr); + return str; + } + caseEnv caseEnv_; stmtEnv stmtEnv_; stmtRes res_; -- GitLab