diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 9c2fdf587b16e3148ba99ebe75dbe102c97e38a2..6500d3d1831e817c497406e574b721594e63e209 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -320,6 +320,9 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode); int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen); int32_t nodesStringToList(const char* pStr, SNodeList** pList); +int32_t nodesNodeToMsg(const SNode* pNode, char** pMsg, int32_t* pLen); +int32_t nodesMsgToNode(const char* pStr, int32_t len, SNode** pNode); + int32_t nodesNodeToSQL(SNode* pNode, char* buf, int32_t bufSize, int32_t* len); char* nodesGetNameFromColumnNode(SNode* pNode); int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 8976daadbdada2365dbb2e3bf2af4f807c4ac26f..a41462e3fd879a73baeb37ee829724fda560719c 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -362,18 +362,14 @@ static int32_t jsonToTableComInfo(const SJson* pJson, void* pObj) { int32_t code; tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags, code); - ; if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize, code); - ; } return code; @@ -406,14 +402,11 @@ static int32_t jsonToSchema(const SJson* pJson, void* pObj) { int32_t code; tjsonGetNumberValue(pJson, jkSchemaType, pNode->type, code); - ; if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name); @@ -466,26 +459,20 @@ static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) { int32_t code; tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId, code); - ; if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkTableMetaComInfo, jsonToTableComInfo, &pNode->tableInfo); @@ -926,7 +913,6 @@ static int32_t jsonToLogicFillNode(const SJson* pJson, void* pObj) { int32_t code = jsonToLogicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkFillLogicPlanMode, pNode->mode, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillLogicPlanWStartTs, &pNode->pWStartTs); @@ -2815,7 +2801,6 @@ static int32_t jsonToColumnNode(const SJson* pJson, void* pObj) { } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkColumnColType, pNode->colType, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkColumnDbName, pNode->dbName); @@ -3118,7 +3103,6 @@ static int32_t jsonToOperatorNode(const SJson* pJson, void* pObj) { int32_t code = jsonToExprNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkOperatorType, pNode->opType, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkOperatorLeft, &pNode->pLeft); @@ -3153,7 +3137,6 @@ static int32_t jsonToLogicConditionNode(const SJson* pJson, void* pObj) { int32_t code = jsonToExprNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkLogicCondType, pNode->condType, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkLogicCondParameters, &pNode->pParameterList); @@ -3442,11 +3425,9 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) { int32_t code = jsonToNodeObject(pJson, jkOrderByExprExpr, &pNode->pExpr); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder, code); - ; } return code; @@ -3624,7 +3605,6 @@ static int32_t jsonToFillNode(const SJson* pJson, void* pObj) { int32_t code; tjsonGetNumberValue(pJson, jkFillMode, pNode->mode, code); - ; if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillValues, &pNode->pValues); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c new file mode 100644 index 0000000000000000000000000000000000000000..f5d5bd60cccb2d94dc3186046c3877e8e815be6c --- /dev/null +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -0,0 +1,498 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "nodesUtil.h" +#include "plannodes.h" + +#define NODES_MSG_DEFAULT_LEN 1024 + +#define tlvDecodeEnum(pTlv, value) tlvDecodeImpl(pTlv, &(value), sizeof(value)) + +typedef struct STlv { + int16_t type; + int16_t len; + char value[0]; +} STlv; + +typedef struct STlvEncoder { + int32_t allocSize; + int32_t offset; + char* pBuf; +} STlvEncoder; + +typedef struct STlvDecoder { + int32_t bufSize; + int32_t offset; + const char* pBuf; +} STlvDecoder; + +typedef int32_t (*FToMsg)(const void* pObj, STlvEncoder* pEncoder); +typedef int32_t (*FToObject)(STlvDecoder* pDecoder, void* pObj); +typedef int32_t (*FSetObject)(STlv* pTlv, void* pObj); + +static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder); +static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder); + +static int32_t initTlvEncoder(STlvEncoder* pEncoder) { + pEncoder->allocSize = NODES_MSG_DEFAULT_LEN; + pEncoder->offset = 0; + pEncoder->pBuf = taosMemoryMalloc(pEncoder->allocSize); + return NULL == pEncoder->pBuf ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; +} + +static void clearTlvEncoder(STlvEncoder* pEncoder) { taosMemoryFree(pEncoder->pBuf); } + +static void endTlvEncode(STlvEncoder* pEncoder, char** pMsg, int32_t* pLen) { + *pMsg = pEncoder->pBuf; + pEncoder->pBuf = NULL; + *pLen = pEncoder->offset; +} + +static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int16_t len) { + if (pEncoder->offset + len > pEncoder->allocSize) { + void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize * 2); + if (NULL == pNewBuf) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pEncoder->pBuf = pNewBuf; + pEncoder->allocSize = pEncoder->allocSize * 2; + } + STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset); + pTlv->type = type; + pTlv->len = len; + memcpy(pTlv->value, pValue, len); + pEncoder->offset += sizeof(STlv) + pTlv->len; + return TSDB_CODE_SUCCESS; +} + +static int32_t tlvEncodeI8(STlvEncoder* pEncoder, int16_t type, int8_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeI16(STlvEncoder* pEncoder, int16_t type, int16_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeU8(STlvEncoder* pEncoder, int16_t type, uint8_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeU16(STlvEncoder* pEncoder, int16_t type, uint16_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pValue) { + return tlvEncodeImpl(pEncoder, type, pValue, strlen(pValue)); +} + +static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg func, const void* pArray, int32_t itemSize, + int32_t num) { + int32_t code = TSDB_CODE_SUCCESS; + if (num > 0) { + int32_t start = pEncoder->offset; + pEncoder->offset += sizeof(STlv); + for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) { + code = func(pArray + i * itemSize, pEncoder); + } + if (TSDB_CODE_SUCCESS == code) { + STlv* pTlv = (STlv*)(pEncoder->pBuf + start); + pTlv->type = type; + pTlv->len = pEncoder->offset - start - sizeof(STlv); + } + } + return code; +} + +static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, const void* pObj) { + int32_t start = pEncoder->offset; + pEncoder->offset += sizeof(STlv); + int32_t code = func(pObj, pEncoder); + if (TSDB_CODE_SUCCESS == code) { + STlv* pTlv = (STlv*)(pEncoder->pBuf + start); + pTlv->type = type; + pTlv->len = pEncoder->offset - start - sizeof(STlv); + } + return code; +} + +static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { + *pTlv = (STlv*)(pDecoder->pBuf + pDecoder->offset); + if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) { + return TSDB_CODE_FAILED; + } + pDecoder->offset += (*pTlv)->len; + return TSDB_CODE_SUCCESS; +} + +static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int16_t len) { + if (pTlv->len != len) { + return TSDB_CODE_FAILED; + } + memcpy(pValue, pTlv->value, len); + return TSDB_CODE_SUCCESS; +} + +static int32_t tlvDecodeI8(STlv* pTlv, int8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeObjFromTlv(STlv* pTlv, FToObject func, void* pObj) { + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + return func(&decoder, pObj); +} + +static int32_t tlvDecodeObj(STlvDecoder* pDecoder, FToObject func, void* pObj) { + STlv* pTlv = NULL; + int32_t code = tlvGetNextTlv(pDecoder, &pTlv); + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeObjFromTlv(pTlv, func, pObj); + } + return code; +} + +static int32_t tlvDecodeNodeObj(STlvDecoder* pDecoder, FToObject func, void** pObj) { + STlv* pTlv = NULL; + int32_t code = tlvGetNextTlv(pDecoder, &pTlv); + if (TSDB_CODE_SUCCESS == code) { + *pObj = nodesMakeNode(pTlv->type); + if (NULL == *pObj) { + return TSDB_CODE_OUT_OF_MEMORY; + } + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + code = func(&decoder, *pObj); + } + return code; +} + +static int32_t msgToObject(STlvDecoder* pDecoder, FSetObject func, void* pObj) { + STlv* pTlv; + int32_t code = tlvGetNextTlv(pDecoder, &pTlv); + while (TSDB_CODE_SUCCESS == code && pTlv->len > 0) { + code = func(pTlv, pObj); + } + return code; +} + +enum { DATA_TYPE_CODE_TYPE = 1, DATA_TYPE_CODE_PRECISION, DATA_TYPE_CODE_SCALE, DATA_TYPE_CODE_BYTES }; + +static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDataType* pNode = (const SDataType*)pObj; + + int32_t code = tlvEncodeI8(pEncoder, DATA_TYPE_CODE_TYPE, pNode->type); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, DATA_TYPE_CODE_PRECISION, pNode->precision); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, DATA_TYPE_CODE_SCALE, pNode->scale); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, DATA_TYPE_CODE_BYTES, pNode->bytes); + } + + return code; +} + +enum { EXPR_CODE_RES_TYPE = 1 }; + +static int32_t exprNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SExprNode* pNode = (const SExprNode*)pObj; + return tlvEncodeObj(pEncoder, EXPR_CODE_RES_TYPE, dataTypeToMsg, &pNode->resType); +} + +static int32_t msgToExprNode(STlvDecoder* pDecoder, void* pObj) { + // todo + return TSDB_CODE_SUCCESS; +} + +enum { + COLUMN_CODE_EXPR_BASE = 1, + COLUMN_CODE_TABLE_ID, + COLUMN_CODE_TABLE_TYPE, + COLUMN_CODE_COLUMN_ID, + COLUMN_CODE_COLUMN_TYPE, + COLUMN_CODE_DATABLOCK_ID, + COLUMN_CODE_SLOT_ID +}; + +static int32_t columnNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SColumnNode* pNode = (const SColumnNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, COLUMN_CODE_EXPR_BASE, exprNodeToMsg, pNode); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, COLUMN_CODE_TABLE_ID, pNode->tableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, COLUMN_CODE_TABLE_TYPE, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI16(pEncoder, COLUMN_CODE_COLUMN_ID, pNode->colId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, COLUMN_CODE_COLUMN_TYPE, pNode->colType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI16(pEncoder, COLUMN_CODE_DATABLOCK_ID, pNode->dataBlockId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI16(pEncoder, COLUMN_CODE_SLOT_ID, pNode->slotId); + } + + return code; +} + +static int32_t setColumnNodeFromTlv(STlv* pTlv, void* pObj) { + SColumnNode* pNode = (SColumnNode*)pObj; + + switch (pTlv->type) { + case COLUMN_CODE_EXPR_BASE: + return tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); + case COLUMN_CODE_TABLE_ID: + return tlvDecodeU64(pTlv, &pNode->tableId); + case COLUMN_CODE_TABLE_TYPE: + return tlvDecodeI8(pTlv, &pNode->tableType); + case COLUMN_CODE_COLUMN_ID: + return tlvDecodeI16(pTlv, &pNode->colId); + case COLUMN_CODE_COLUMN_TYPE: + return tlvDecodeEnum(pTlv, pNode->colType); + case COLUMN_CODE_DATABLOCK_ID: + return tlvDecodeI16(pTlv, &pNode->dataBlockId); + case COLUMN_CODE_SLOT_ID: + return tlvDecodeI16(pTlv, &pNode->slotId); + default: + break; + } + + return TSDB_CODE_FAILED; +} + +static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) { + return msgToObject(pDecoder, setColumnNodeFromTlv, pObj); +} + +enum { SUBPLAN_ID_CODE_QUERY_ID = 1, SUBPLAN_ID_CODE_GROUP_ID, SUBPLAN_ID_CODE_SUBPLAN_ID }; + +static int32_t subplanIdToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSubplanId* pNode = (const SSubplanId*)pObj; + + int32_t code = tlvEncodeU64(pEncoder, SUBPLAN_ID_CODE_QUERY_ID, pNode->queryId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SUBPLAN_ID_CODE_GROUP_ID, pNode->groupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SUBPLAN_ID_CODE_SUBPLAN_ID, pNode->subplanId); + } + + return code; +} + +enum { EP_CODE_FQDN = 1, EP_CODE_port }; + +static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SEp* pNode = (const SEp*)pObj; + + int32_t code = tlvEncodeCStr(pEncoder, EP_CODE_FQDN, pNode->fqdn); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU16(pEncoder, EP_CODE_port, pNode->port); + } + + return code; +} + +enum { + QUERY_NODE_ADDR_CODE_NODE_ID = 1, + QUERY_NODE_ADDR_CODE_IN_USE, + QUERY_NODE_ADDR_CODE_NUM_OF_EPS, + QUERY_NODE_ADDR_CODE_EPS +}; + +static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj; + + int32_t code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NODE_ID, pNode->nodeId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_IN_USE, pNode->epSet.inUse); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NUM_OF_EPS, pNode->epSet.numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObjArray(pEncoder, QUERY_NODE_ADDR_CODE_EPS, epToMsg, pNode->epSet.eps, sizeof(SEp), + pNode->epSet.numOfEps); + } + + return code; +} + +enum { + SUBPLAN_CODE_SUBPLAN_ID = 1, + SUBPLAN_CODE_SUBPLAN_TYPE, + SUBPLAN_CODE_MSG_TYPE, + SUBPLAN_CODE_LEVEL, + SUBPLAN_CODE_DBFNAME, + SUBPLAN_CODE_USER, + SUBPLAN_CODE_EXECNODE, + SUBPLAN_CODE_ROOT_NODE, + SUBPLAN_CODE_DATA_SINK, + SUBPLAN_CODE_TAG_COND, + SUBPLAN_CODE_TAG_INDEX_COND +}; + +static int32_t subplanToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSubplan* pNode = (const SSubplan*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_SUBPLAN_ID, subplanIdToMsg, &pNode->id); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, SUBPLAN_CODE_SUBPLAN_TYPE, pNode->subplanType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SUBPLAN_CODE_MSG_TYPE, pNode->msgType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SUBPLAN_CODE_LEVEL, pNode->level); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, SUBPLAN_CODE_DBFNAME, pNode->dbFName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, SUBPLAN_CODE_USER, pNode->user); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_EXECNODE, queryNodeAddrToMsg, &pNode->execNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_ROOT_NODE, nodeToMsg, pNode->pNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_DATA_SINK, nodeToMsg, pNode->pDataSink); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_TAG_COND, nodeToMsg, pNode->pTagCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_TAG_INDEX_COND, nodeToMsg, pNode->pTagIndexCond); + } + + return code; +} + +enum { QUERY_PLAN_CODE_QUERY_ID = 1, QUERY_PLAN_CODE_NUM_OF_SUBPLANS, QUERY_PLAN_CODE_SUBPLANS }; + +static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SQueryPlan* pNode = (const SQueryPlan*)pObj; + + int32_t code = tlvEncodeU64(pEncoder, QUERY_PLAN_CODE_QUERY_ID, pNode->queryId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, QUERY_PLAN_CODE_NUM_OF_SUBPLANS, pNode->numOfSubplans); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, QUERY_PLAN_CODE_SUBPLANS, nodeListToMsg, pNode->pSubplans); + } + + return code; +} + +static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + switch (nodeType(pObj)) { + case QUERY_NODE_COLUMN: + return columnNodeToMsg(pObj, pEncoder); + case QUERY_NODE_PHYSICAL_SUBPLAN: + return subplanToMsg(pObj, pEncoder); + case QUERY_NODE_PHYSICAL_PLAN: + return queryPlanToMsg(pObj, pEncoder); + default: + break; + }; + nodesWarn("specificNodeToMsg unknown node = %s", nodesNodeName(nodeType(pObj))); + return TSDB_CODE_SUCCESS; +} + +static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { + switch (nodeType(pObj)) { + case QUERY_NODE_COLUMN: + return msgToColumnNode(pDecoder, pObj); + default: + break; + }; + nodesWarn("msgToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj))); + return TSDB_CODE_SUCCESS; +} + +static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + return tlvEncodeObj(pEncoder, nodeType(pObj), specificNodeToMsg, pObj); +} + +static int32_t msgToNode(STlvDecoder* pDecoder, void** pObj) { + return tlvDecodeNodeObj(pDecoder, msgToSpecificNode, pObj); +} + +static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SNodeList* pList = (const SNodeList*)pObj; + + SNode* pNode = NULL; + FOREACH(pNode, pList) { + int32_t code = nodeToMsg(pNode, pEncoder); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t nodesNodeToMsg(const SNode* pNode, char** pMsg, int32_t* pLen) { + if (NULL == pNode || NULL == pMsg || NULL == pLen) { + terrno = TSDB_CODE_FAILED; + return TSDB_CODE_FAILED; + } + + STlvEncoder encoder; + int32_t code = initTlvEncoder(&encoder); + if (TSDB_CODE_SUCCESS == code) { + code = nodeToMsg(pNode, &encoder); + } + if (TSDB_CODE_SUCCESS == code) { + endTlvEncode(&encoder, pMsg, pLen); + } + clearTlvEncoder(&encoder); + + terrno = code; + return code; +} + +int32_t nodesMsgToNode(const char* pMsg, int32_t len, SNode** pNode) { + if (NULL == pMsg || NULL == pNode) { + return TSDB_CODE_SUCCESS; + } + + STlvDecoder decoder = {.bufSize = len, .offset = 0, .pBuf = pMsg}; + int32_t code = msgToNode(&decoder, (void**)pNode); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(*pNode); + *pNode = NULL; + } + + terrno = code; + return code; +}