diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 9c2fdf587b16e3148ba99ebe75dbe102c97e38a2..6500d3d1831e817c497406e574b721594e63e209 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -320,6 +320,9 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode); int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen); int32_t nodesStringToList(const char* pStr, SNodeList** pList); +int32_t nodesNodeToMsg(const SNode* pNode, char** pMsg, int32_t* pLen); +int32_t nodesMsgToNode(const char* pStr, int32_t len, SNode** pNode); + int32_t nodesNodeToSQL(SNode* pNode, char* buf, int32_t bufSize, int32_t* len); char* nodesGetNameFromColumnNode(SNode* pNode); int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 8976daadbdada2365dbb2e3bf2af4f807c4ac26f..a41462e3fd879a73baeb37ee829724fda560719c 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -362,18 +362,14 @@ static int32_t jsonToTableComInfo(const SJson* pJson, void* pObj) { int32_t code; tjsonGetNumberValue(pJson, jkTableComInfoNumOfTags, pNode->numOfTags, code); - ; if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableComInfoPrecision, pNode->precision, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableComInfoNumOfColumns, pNode->numOfColumns, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableComInfoRowSize, pNode->rowSize, code); - ; } return code; @@ -406,14 +402,11 @@ static int32_t jsonToSchema(const SJson* pJson, void* pObj) { int32_t code; tjsonGetNumberValue(pJson, jkSchemaType, pNode->type, code); - ; if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkSchemaColId, pNode->colId, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkSchemaBytes, pNode->bytes, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkSchemaName, pNode->name); @@ -466,26 +459,20 @@ static int32_t jsonToTableMeta(const SJson* pJson, void* pObj) { int32_t code; tjsonGetNumberValue(pJson, jkTableMetaVgId, pNode->vgId, code); - ; if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaTableType, pNode->tableType, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaUid, pNode->uid, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaSuid, pNode->suid, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaSversion, pNode->sversion, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkTableMetaTversion, pNode->tversion, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkTableMetaComInfo, jsonToTableComInfo, &pNode->tableInfo); @@ -926,7 +913,6 @@ static int32_t jsonToLogicFillNode(const SJson* pJson, void* pObj) { int32_t code = jsonToLogicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkFillLogicPlanMode, pNode->mode, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillLogicPlanWStartTs, &pNode->pWStartTs); @@ -2815,7 +2801,6 @@ static int32_t jsonToColumnNode(const SJson* pJson, void* pObj) { } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkColumnColType, pNode->colType, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkColumnDbName, pNode->dbName); @@ -3118,7 +3103,6 @@ static int32_t jsonToOperatorNode(const SJson* pJson, void* pObj) { int32_t code = jsonToExprNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkOperatorType, pNode->opType, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkOperatorLeft, &pNode->pLeft); @@ -3153,7 +3137,6 @@ static int32_t jsonToLogicConditionNode(const SJson* pJson, void* pObj) { int32_t code = jsonToExprNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkLogicCondType, pNode->condType, code); - ; } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkLogicCondParameters, &pNode->pParameterList); @@ -3442,11 +3425,9 @@ static int32_t jsonToOrderByExprNode(const SJson* pJson, void* pObj) { int32_t code = jsonToNodeObject(pJson, jkOrderByExprExpr, &pNode->pExpr); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkOrderByExprOrder, pNode->order, code); - ; } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkOrderByExprNullOrder, pNode->nullOrder, code); - ; } return code; @@ -3624,7 +3605,6 @@ static int32_t jsonToFillNode(const SJson* pJson, void* pObj) { int32_t code; tjsonGetNumberValue(pJson, jkFillMode, pNode->mode, code); - ; if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkFillValues, &pNode->pValues); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c new file mode 100644 index 0000000000000000000000000000000000000000..5fe31ed78e80cf19a4e107d6f1366609bd257520 --- /dev/null +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -0,0 +1,3108 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "nodesUtil.h" +#include "plannodes.h" +#include "tdatablock.h" + +#define NODES_MSG_DEFAULT_LEN 1024 + +#define tlvForEach(pDecoder, pTlv, code) \ + while (TSDB_CODE_SUCCESS == code && TSDB_CODE_SUCCESS == (code = tlvGetNextTlv(pDecoder, &pTlv)) && NULL != pTlv) + +typedef struct STlv { + int16_t type; + int16_t len; + char value[0]; +} STlv; + +typedef struct STlvEncoder { + int32_t allocSize; + int32_t offset; + char* pBuf; + int32_t tlvCount; +} STlvEncoder; + +typedef struct STlvDecoder { + int32_t bufSize; + int32_t offset; + const char* pBuf; +} STlvDecoder; + +typedef int32_t (*FToMsg)(const void* pObj, STlvEncoder* pEncoder); +typedef int32_t (*FToObject)(STlvDecoder* pDecoder, void* pObj); +typedef void* (*FMakeObject)(int16_t type); +typedef int32_t (*FSetObject)(STlv* pTlv, void* pObj); + +static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder); +static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder); +static int32_t msgToNode(STlvDecoder* pDecoder, void** pObj); +static int32_t msgToNodeFromTlv(STlv* pTlv, void** pObj); +static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj); +static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj); + +static int32_t initTlvEncoder(STlvEncoder* pEncoder) { + pEncoder->allocSize = NODES_MSG_DEFAULT_LEN; + pEncoder->offset = 0; + pEncoder->tlvCount = 0; + pEncoder->pBuf = taosMemoryMalloc(pEncoder->allocSize); + return NULL == pEncoder->pBuf ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; +} + +static void clearTlvEncoder(STlvEncoder* pEncoder) { taosMemoryFree(pEncoder->pBuf); } + +static void endTlvEncode(STlvEncoder* pEncoder, char** pMsg, int32_t* pLen) { + *pMsg = pEncoder->pBuf; + pEncoder->pBuf = NULL; + *pLen = pEncoder->offset; + // nodesWarn("encode tlv count = %d, tl size = %d", pEncoder->tlvCount, sizeof(STlv) * pEncoder->tlvCount); +} + +static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int16_t len) { + int32_t tlvLen = sizeof(STlv) + len; + if (pEncoder->offset + tlvLen > pEncoder->allocSize) { + void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize * 2); + if (NULL == pNewBuf) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pEncoder->pBuf = pNewBuf; + pEncoder->allocSize = pEncoder->allocSize * 2; + } + STlv* pTlv = (STlv*)(pEncoder->pBuf + pEncoder->offset); + pTlv->type = type; + pTlv->len = len; + memcpy(pTlv->value, pValue, len); + pEncoder->offset += tlvLen; + ++(pEncoder->tlvCount); + return TSDB_CODE_SUCCESS; +} + +static int32_t tlvEncodeI8(STlvEncoder* pEncoder, int16_t type, int8_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeI16(STlvEncoder* pEncoder, int16_t type, int16_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeI64(STlvEncoder* pEncoder, int16_t type, int64_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeU8(STlvEncoder* pEncoder, int16_t type, uint8_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeU16(STlvEncoder* pEncoder, int16_t type, uint16_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeDouble(STlvEncoder* pEncoder, int16_t type, double value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeEnum(STlvEncoder* pEncoder, int16_t type, int32_t value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeBool(STlvEncoder* pEncoder, int16_t type, bool value) { + return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); +} + +static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pValue) { + return tlvEncodeImpl(pEncoder, type, pValue, strlen(pValue)); +} + +static int32_t tlvEncodeBinary(STlvEncoder* pEncoder, int16_t type, const void* pValue, int32_t len) { + return tlvEncodeImpl(pEncoder, type, pValue, len); +} + +static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, const void* pObj) { + if (NULL == pObj) { + return TSDB_CODE_SUCCESS; + } + + int32_t start = pEncoder->offset; + pEncoder->offset += sizeof(STlv); + int32_t code = func(pObj, pEncoder); + if (TSDB_CODE_SUCCESS == code) { + STlv* pTlv = (STlv*)(pEncoder->pBuf + start); + pTlv->type = type; + pTlv->len = pEncoder->offset - start - sizeof(STlv); + } + return code; +} + +static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg func, const void* pArray, int32_t itemSize, + int32_t num) { + int32_t code = TSDB_CODE_SUCCESS; + if (num > 0) { + int32_t start = pEncoder->offset; + pEncoder->offset += sizeof(STlv); + for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) { + code = tlvEncodeObj(pEncoder, 0, func, (const char*)pArray + i * itemSize); + } + if (TSDB_CODE_SUCCESS == code) { + STlv* pTlv = (STlv*)(pEncoder->pBuf + start); + pTlv->type = type; + pTlv->len = pEncoder->offset - start - sizeof(STlv); + } + } + return code; +} + +static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { + if (pDecoder->offset == pDecoder->bufSize) { + *pTlv = NULL; + return TSDB_CODE_SUCCESS; + } + + *pTlv = (STlv*)(pDecoder->pBuf + pDecoder->offset); + if ((*pTlv)->len + pDecoder->offset > pDecoder->bufSize) { + return TSDB_CODE_FAILED; + } + pDecoder->offset += sizeof(STlv) + (*pTlv)->len; + return TSDB_CODE_SUCCESS; +} + +static bool tlvDecodeEnd(STlvDecoder* pDecoder) { return pDecoder->offset == pDecoder->bufSize; } + +static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int16_t len) { + if (pTlv->len != len) { + return TSDB_CODE_FAILED; + } + memcpy(pValue, pTlv->value, len); + return TSDB_CODE_SUCCESS; +} + +static int32_t tlvDecodeI8(STlv* pTlv, int8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeBool(STlv* pTlv, bool* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } + +static int32_t tlvDecodeEnum(STlv* pTlv, void* pValue, int16_t len) { + int32_t value = 0; + memcpy(&value, pTlv->value, pTlv->len); + switch (len) { + case 1: + *(int8_t*)pValue = value; + break; + case 2: + *(int16_t*)pValue = value; + break; + case 4: + *(int32_t*)pValue = value; + break; + default: + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t tlvDecodeCStr(STlv* pTlv, char* pValue) { + memcpy(pValue, pTlv->value, pTlv->len); + return TSDB_CODE_SUCCESS; +} + +static int32_t tlvDecodeDynBinary(STlv* pTlv, void** pValue) { + *pValue = taosMemoryMalloc(pTlv->len); + if (NULL == *pValue) { + return TSDB_CODE_OUT_OF_MEMORY; + } + memcpy(*pValue, pTlv->value, pTlv->len); + return TSDB_CODE_SUCCESS; +} + +static int32_t tlvDecodeObjFromTlv(STlv* pTlv, FToObject func, void* pObj) { + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + return func(&decoder, pObj); +} + +static int32_t tlvDecodeObj(STlvDecoder* pDecoder, FToObject func, void* pObj) { + STlv* pTlv = NULL; + int32_t code = tlvGetNextTlv(pDecoder, &pTlv); + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeObjFromTlv(pTlv, func, pObj); + } + return code; +} + +static int32_t tlvDecodeObjArray(STlvDecoder* pDecoder, FToObject func, void* pArray, int32_t itemSize) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t i = 0; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { code = tlvDecodeObjFromTlv(pTlv, func, (char*)pArray + itemSize * i++); } + return code; +} + +static int32_t tlvDecodeObjArrayFromTlv(STlv* pTlv, FToObject func, void* pArray, int32_t itemSize) { + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + return tlvDecodeObjArray(&decoder, func, pArray, itemSize); +} + +static int32_t tlvDecodeDynObjFromTlv(STlv* pTlv, FMakeObject makeFunc, FToObject toFunc, void** pObj) { + *pObj = makeFunc(pTlv->type); + if (NULL == *pObj) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return tlvDecodeObjFromTlv(pTlv, toFunc, *pObj); +} + +static int32_t tlvDecodeDynObj(STlvDecoder* pDecoder, FMakeObject makeFunc, FToObject toFunc, void** pObj) { + STlv* pTlv = NULL; + int32_t code = tlvGetNextTlv(pDecoder, &pTlv); + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeDynObjFromTlv(pTlv, makeFunc, toFunc, pObj); + } + return code; +} + +enum { DATA_TYPE_CODE_TYPE = 1, DATA_TYPE_CODE_PRECISION, DATA_TYPE_CODE_SCALE, DATA_TYPE_CODE_BYTES }; + +static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDataType* pNode = (const SDataType*)pObj; + + int32_t code = tlvEncodeI8(pEncoder, DATA_TYPE_CODE_TYPE, pNode->type); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, DATA_TYPE_CODE_PRECISION, pNode->precision); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, DATA_TYPE_CODE_SCALE, pNode->scale); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, DATA_TYPE_CODE_BYTES, pNode->bytes); + } + + return code; +} + +static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) { + SDataType* pNode = (SDataType*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case DATA_TYPE_CODE_TYPE: + code = tlvDecodeI8(pTlv, &pNode->type); + break; + case DATA_TYPE_CODE_PRECISION: + code = tlvDecodeU8(pTlv, &pNode->precision); + break; + case DATA_TYPE_CODE_SCALE: + code = tlvDecodeU8(pTlv, &pNode->scale); + break; + case DATA_TYPE_CODE_BYTES: + code = tlvDecodeI32(pTlv, &pNode->bytes); + break; + default: + break; + } + } + + return code; +} + +enum { EXPR_CODE_RES_TYPE = 1 }; + +static int32_t exprNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SExprNode* pNode = (const SExprNode*)pObj; + return tlvEncodeObj(pEncoder, EXPR_CODE_RES_TYPE, dataTypeToMsg, &pNode->resType); +} + +static int32_t msgToExprNode(STlvDecoder* pDecoder, void* pObj) { + SExprNode* pNode = (SExprNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case EXPR_CODE_RES_TYPE: + code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->resType); + break; + default: + break; + } + } + + return code; +} + +enum { + COLUMN_CODE_EXPR_BASE = 1, + COLUMN_CODE_TABLE_ID, + COLUMN_CODE_TABLE_TYPE, + COLUMN_CODE_COLUMN_ID, + COLUMN_CODE_COLUMN_TYPE, + COLUMN_CODE_DATABLOCK_ID, + COLUMN_CODE_SLOT_ID +}; + +static int32_t columnNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SColumnNode* pNode = (const SColumnNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, COLUMN_CODE_EXPR_BASE, exprNodeToMsg, pNode); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, COLUMN_CODE_TABLE_ID, pNode->tableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, COLUMN_CODE_TABLE_TYPE, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI16(pEncoder, COLUMN_CODE_COLUMN_ID, pNode->colId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, COLUMN_CODE_COLUMN_TYPE, pNode->colType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI16(pEncoder, COLUMN_CODE_DATABLOCK_ID, pNode->dataBlockId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI16(pEncoder, COLUMN_CODE_SLOT_ID, pNode->slotId); + } + + return code; +} + +static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) { + SColumnNode* pNode = (SColumnNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case COLUMN_CODE_EXPR_BASE: + code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); + break; + case COLUMN_CODE_TABLE_ID: + code = tlvDecodeU64(pTlv, &pNode->tableId); + break; + case COLUMN_CODE_TABLE_TYPE: + code = tlvDecodeI8(pTlv, &pNode->tableType); + break; + case COLUMN_CODE_COLUMN_ID: + code = tlvDecodeI16(pTlv, &pNode->colId); + break; + case COLUMN_CODE_COLUMN_TYPE: + code = tlvDecodeEnum(pTlv, &pNode->colType, sizeof(pNode->colType)); + break; + case COLUMN_CODE_DATABLOCK_ID: + code = tlvDecodeI16(pTlv, &pNode->dataBlockId); + break; + case COLUMN_CODE_SLOT_ID: + code = tlvDecodeI16(pTlv, &pNode->slotId); + break; + default: + break; + } + } + + return code; +} + +enum { VALUE_CODE_EXPR_BASE = 1, VALUE_CODE_IS_NULL, VALUE_CODE_DATUM }; + +static int32_t datumToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SValueNode* pNode = (const SValueNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + switch (pNode->node.resType.type) { + case TSDB_DATA_TYPE_NULL: + break; + case TSDB_DATA_TYPE_BOOL: + code = tlvEncodeBool(pEncoder, VALUE_CODE_DATUM, pNode->datum.b); + break; + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: + code = tlvEncodeI64(pEncoder, VALUE_CODE_DATUM, pNode->datum.i); + break; + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: + code = tlvEncodeU64(pEncoder, VALUE_CODE_DATUM, pNode->datum.u); + break; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + code = tlvEncodeDouble(pEncoder, VALUE_CODE_DATUM, pNode->datum.d); + break; + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: + case TSDB_DATA_TYPE_NCHAR: + code = tlvEncodeBinary(pEncoder, VALUE_CODE_DATUM, pNode->datum.p, varDataTLen(pNode->datum.p)); + break; + case TSDB_DATA_TYPE_JSON: + code = tlvEncodeBinary(pEncoder, VALUE_CODE_DATUM, pNode->datum.p, getJsonValueLen(pNode->datum.p)); + break; + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + // todo + default: + break; + } + + return code; +} + +static int32_t valueNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SValueNode* pNode = (const SValueNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, VALUE_CODE_EXPR_BASE, exprNodeToMsg, pNode); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, VALUE_CODE_IS_NULL, pNode->isNull); + } + if (TSDB_CODE_SUCCESS == code) { + code = datumToMsg(pNode, pEncoder); + } + + return code; +} + +static int32_t msgToDatum(STlv* pTlv, void* pObj) { + SValueNode* pNode = (SValueNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + switch (pNode->node.resType.type) { + case TSDB_DATA_TYPE_NULL: + break; + case TSDB_DATA_TYPE_BOOL: + code = tlvDecodeBool(pTlv, &pNode->datum.b); + *(bool*)&pNode->typeData = pNode->datum.b; + break; + case TSDB_DATA_TYPE_TINYINT: + code = tlvDecodeI64(pTlv, &pNode->datum.i); + *(int8_t*)&pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_SMALLINT: + code = tlvDecodeI64(pTlv, &pNode->datum.i); + *(int16_t*)&pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_INT: + code = tlvDecodeI64(pTlv, &pNode->datum.i); + *(int32_t*)&pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_BIGINT: + code = tlvDecodeI64(pTlv, &pNode->datum.i); + *(int64_t*)&pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_TIMESTAMP: + code = tlvDecodeI64(pTlv, &pNode->datum.i); + *(int64_t*)&pNode->typeData = pNode->datum.i; + break; + case TSDB_DATA_TYPE_UTINYINT: + code = tlvDecodeU64(pTlv, &pNode->datum.u); + *(uint8_t*)&pNode->typeData = pNode->datum.u; + break; + case TSDB_DATA_TYPE_USMALLINT: + code = tlvDecodeU64(pTlv, &pNode->datum.u); + *(uint16_t*)&pNode->typeData = pNode->datum.u; + break; + case TSDB_DATA_TYPE_UINT: + code = tlvDecodeU64(pTlv, &pNode->datum.u); + *(uint32_t*)&pNode->typeData = pNode->datum.u; + break; + case TSDB_DATA_TYPE_UBIGINT: + code = tlvDecodeU64(pTlv, &pNode->datum.u); + *(uint64_t*)&pNode->typeData = pNode->datum.u; + break; + case TSDB_DATA_TYPE_FLOAT: + code = tlvDecodeDouble(pTlv, &pNode->datum.d); + *(float*)&pNode->typeData = pNode->datum.d; + break; + case TSDB_DATA_TYPE_DOUBLE: + code = tlvDecodeDouble(pTlv, &pNode->datum.d); + *(double*)&pNode->typeData = pNode->datum.d; + break; + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: + code = tlvDecodeDynBinary(pTlv, (void**)&pNode->datum.p); + if (TSDB_CODE_SUCCESS == code) { + varDataSetLen(pNode->datum.p, pNode->node.resType.bytes - VARSTR_HEADER_SIZE); + } + break; + case TSDB_DATA_TYPE_JSON: + code = tlvDecodeDynBinary(pTlv, (void**)&pNode->datum.p); + break; + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + // todo + default: + break; + } + + return code; +} + +static int32_t msgToValueNode(STlvDecoder* pDecoder, void* pObj) { + SValueNode* pNode = (SValueNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case VALUE_CODE_EXPR_BASE: + code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); + break; + case VALUE_CODE_IS_NULL: + code = tlvDecodeBool(pTlv, &pNode->isNull); + break; + case VALUE_CODE_DATUM: + code = msgToDatum(pTlv, pNode); + break; + default: + break; + } + } + + return code; +} + +enum { OPERATOR_CODE_EXPR_BASE = 1, OPERATOR_CODE_OP_TYPE, OPERATOR_CODE_LEFT, OPERATOR_CODE_RIGHT }; + +static int32_t operatorNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SOperatorNode* pNode = (const SOperatorNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, OPERATOR_CODE_EXPR_BASE, exprNodeToMsg, pNode); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, OPERATOR_CODE_OP_TYPE, pNode->opType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, OPERATOR_CODE_LEFT, nodeToMsg, pNode->pLeft); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, OPERATOR_CODE_RIGHT, nodeToMsg, pNode->pRight); + } + + return code; +} + +static int32_t msgToOperatorNode(STlvDecoder* pDecoder, void* pObj) { + SOperatorNode* pNode = (SOperatorNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case OPERATOR_CODE_EXPR_BASE: + code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); + break; + case OPERATOR_CODE_OP_TYPE: + code = tlvDecodeEnum(pTlv, &pNode->opType, sizeof(pNode->opType)); + break; + case OPERATOR_CODE_LEFT: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pLeft); + break; + case OPERATOR_CODE_RIGHT: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pRight); + break; + default: + break; + } + } + + return code; +} + +enum { LOGIC_COND_CODE_EXPR_BASE = 1, LOGIC_COND_CODE_COND_TYPE, LOGIC_COND_CODE_PARAMETERS }; + +static int32_t logicConditionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SLogicConditionNode* pNode = (const SLogicConditionNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, LOGIC_COND_CODE_EXPR_BASE, exprNodeToMsg, pNode); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, LOGIC_COND_CODE_COND_TYPE, pNode->condType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, LOGIC_COND_CODE_PARAMETERS, nodeListToMsg, pNode->pParameterList); + } + + return code; +} + +static int32_t msgToLogicConditionNode(STlvDecoder* pDecoder, void* pObj) { + SLogicConditionNode* pNode = (SLogicConditionNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case LOGIC_COND_CODE_EXPR_BASE: + code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); + break; + case LOGIC_COND_CODE_COND_TYPE: + code = tlvDecodeEnum(pTlv, &pNode->condType, sizeof(pNode->condType)); + break; + case LOGIC_COND_CODE_PARAMETERS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pParameterList); + break; + default: + break; + } + } + + return code; +} + +enum { + FUNCTION_CODE_EXPR_BASE = 1, + FUNCTION_CODE_FUNCTION_ID, + FUNCTION_CODE_FUNCTION_TYPE, + FUNCTION_CODE_PARAMETERS, + FUNCTION_CODE_UDF_BUF_SIZE +}; + +static int32_t functionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SFunctionNode* pNode = (const SFunctionNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, FUNCTION_CODE_EXPR_BASE, exprNodeToMsg, pNode); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, FUNCTION_CODE_FUNCTION_ID, pNode->funcId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, FUNCTION_CODE_FUNCTION_TYPE, pNode->funcType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, FUNCTION_CODE_PARAMETERS, nodeListToMsg, pNode->pParameterList); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, FUNCTION_CODE_UDF_BUF_SIZE, pNode->udfBufSize); + } + + return code; +} + +static int32_t msgToFunctionNode(STlvDecoder* pDecoder, void* pObj) { + SFunctionNode* pNode = (SFunctionNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case FUNCTION_CODE_EXPR_BASE: + code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); + break; + case FUNCTION_CODE_FUNCTION_ID: + code = tlvDecodeI32(pTlv, &pNode->funcId); + break; + case FUNCTION_CODE_FUNCTION_TYPE: + code = tlvDecodeI32(pTlv, &pNode->funcType); + break; + case FUNCTION_CODE_PARAMETERS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pParameterList); + break; + case FUNCTION_CODE_UDF_BUF_SIZE: + code = tlvDecodeI32(pTlv, &pNode->udfBufSize); + break; + default: + break; + } + } + + return code; +} + +enum { ORDER_BY_EXPR_CODE_EXPR = 1, ORDER_BY_EXPR_CODE_ORDER, ORDER_BY_EXPR_CODE_NULL_ORDER }; + +static int32_t orderByExprNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SOrderByExprNode* pNode = (const SOrderByExprNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, ORDER_BY_EXPR_CODE_EXPR, nodeToMsg, pNode->pExpr); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, ORDER_BY_EXPR_CODE_ORDER, pNode->order); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, ORDER_BY_EXPR_CODE_NULL_ORDER, pNode->nullOrder); + } + + return code; +} + +static int32_t msgToOrderByExprNode(STlvDecoder* pDecoder, void* pObj) { + SOrderByExprNode* pNode = (SOrderByExprNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case ORDER_BY_EXPR_CODE_EXPR: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pExpr); + break; + case ORDER_BY_EXPR_CODE_ORDER: + code = tlvDecodeEnum(pTlv, &pNode->order, sizeof(pNode->order)); + break; + case ORDER_BY_EXPR_CODE_NULL_ORDER: + code = tlvDecodeEnum(pTlv, &pNode->nullOrder, sizeof(pNode->nullOrder)); + break; + default: + break; + } + } + + return code; +} + +enum { LIMIT_CODE_LIMIT = 1, LIMIT_CODE_OFFSET }; + +static int32_t limitNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SLimitNode* pNode = (const SLimitNode*)pObj; + + int32_t code = tlvEncodeI64(pEncoder, LIMIT_CODE_LIMIT, pNode->limit); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, LIMIT_CODE_OFFSET, pNode->offset); + } + + return code; +} + +static int32_t msgToLimitNode(STlvDecoder* pDecoder, void* pObj) { + SLimitNode* pNode = (SLimitNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case LIMIT_CODE_LIMIT: + code = tlvDecodeI64(pTlv, &pNode->limit); + break; + case LIMIT_CODE_OFFSET: + code = tlvDecodeI64(pTlv, &pNode->offset); + break; + default: + break; + } + } + + return code; +} + +enum { NAME_CODE_TYPE = 1, NAME_CODE_ACCT_ID, NAME_CODE_DB_NAME, NAME_CODE_TABLE_NAME }; + +static int32_t nameToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SName* pNode = (const SName*)pObj; + + int32_t code = tlvEncodeU8(pEncoder, NAME_CODE_TYPE, pNode->type); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, NAME_CODE_ACCT_ID, pNode->acctId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, NAME_CODE_DB_NAME, pNode->dbname); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, NAME_CODE_TABLE_NAME, pNode->tname); + } + + return code; +} + +static int32_t msgToName(STlvDecoder* pDecoder, void* pObj) { + SName* pNode = (SName*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case NAME_CODE_TYPE: + code = tlvDecodeU8(pTlv, &pNode->type); + break; + case NAME_CODE_ACCT_ID: + code = tlvDecodeI32(pTlv, &pNode->acctId); + break; + case NAME_CODE_DB_NAME: + code = tlvDecodeCStr(pTlv, pNode->dbname); + break; + case NAME_CODE_TABLE_NAME: + code = tlvDecodeCStr(pTlv, pNode->tname); + break; + default: + break; + } + } + + return code; +} + +enum { TIME_WINDOW_CODE_START_KEY = 1, TIME_WINDOW_CODE_END_KEY }; + +static int32_t timeWindowToMsg(const void* pObj, STlvEncoder* pEncoder) { + const STimeWindow* pNode = (const STimeWindow*)pObj; + + int32_t code = tlvEncodeI64(pEncoder, TIME_WINDOW_CODE_START_KEY, pNode->skey); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, TIME_WINDOW_CODE_END_KEY, pNode->ekey); + } + + return code; +} + +static int32_t msgToTimeWindow(STlvDecoder* pDecoder, void* pObj) { + STimeWindow* pNode = (STimeWindow*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case TIME_WINDOW_CODE_START_KEY: + code = tlvDecodeI64(pTlv, &pNode->skey); + break; + case TIME_WINDOW_CODE_END_KEY: + code = tlvDecodeI64(pTlv, &pNode->ekey); + break; + default: + break; + } + } + + return code; +} + +enum { NODE_LIST_CODE_DATA_TYPE = 1, NODE_LIST_CODE_NODE_LIST }; + +static int32_t nodeListNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SNodeListNode* pNode = (const SNodeListNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, NODE_LIST_CODE_DATA_TYPE, dataTypeToMsg, &pNode->dataType); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, NODE_LIST_CODE_NODE_LIST, nodeListToMsg, pNode->pNodeList); + } + + return code; +} + +static int32_t msgToNodeListNode(STlvDecoder* pDecoder, void* pObj) { + SNodeListNode* pNode = (SNodeListNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case NODE_LIST_CODE_DATA_TYPE: + code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->dataType); + break; + case NODE_LIST_CODE_NODE_LIST: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pNodeList); + break; + default: + break; + } + } + + return code; +} + +enum { TARGET_CODE_DATA_BLOCK_ID = 1, TARGET_CODE_SLOT_ID, TARGET_CODE_EXPR }; + +static int32_t targetNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const STargetNode* pNode = (const STargetNode*)pObj; + + int32_t code = tlvEncodeI16(pEncoder, TARGET_CODE_DATA_BLOCK_ID, pNode->dataBlockId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI16(pEncoder, TARGET_CODE_SLOT_ID, pNode->slotId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, TARGET_CODE_EXPR, nodeToMsg, pNode->pExpr); + } + + return code; +} + +static int32_t msgToTargetNode(STlvDecoder* pDecoder, void* pObj) { + STargetNode* pNode = (STargetNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case TARGET_CODE_DATA_BLOCK_ID: + code = tlvDecodeI16(pTlv, &pNode->dataBlockId); + break; + case TARGET_CODE_SLOT_ID: + code = tlvDecodeI16(pTlv, &pNode->slotId); + break; + case TARGET_CODE_EXPR: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pExpr); + break; + default: + break; + } + } + + return code; +} + +enum { + DATA_BLOCK_DESC_CODE_DATA_BLOCK_ID = 1, + DATA_BLOCK_DESC_CODE_SLOTS, + DATA_BLOCK_DESC_CODE_TOTAL_ROW_SIZE, + DATA_BLOCK_DESC_CODE_OUTPUT_ROW_SIZE, + DATA_BLOCK_DESC_CODE_PRECISION +}; + +static int32_t dataBlockDescNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDataBlockDescNode* pNode = (const SDataBlockDescNode*)pObj; + + int32_t code = tlvEncodeI16(pEncoder, DATA_BLOCK_DESC_CODE_DATA_BLOCK_ID, pNode->dataBlockId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, DATA_BLOCK_DESC_CODE_SLOTS, nodeListToMsg, pNode->pSlots); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, DATA_BLOCK_DESC_CODE_TOTAL_ROW_SIZE, pNode->totalRowSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, DATA_BLOCK_DESC_CODE_OUTPUT_ROW_SIZE, pNode->outputRowSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, DATA_BLOCK_DESC_CODE_PRECISION, pNode->precision); + } + + return code; +} + +static int32_t msgToDataBlockDescNode(STlvDecoder* pDecoder, void* pObj) { + SDataBlockDescNode* pNode = (SDataBlockDescNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case DATA_BLOCK_DESC_CODE_DATA_BLOCK_ID: + code = tlvDecodeI16(pTlv, &pNode->dataBlockId); + break; + case DATA_BLOCK_DESC_CODE_SLOTS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSlots); + break; + case DATA_BLOCK_DESC_CODE_TOTAL_ROW_SIZE: + code = tlvDecodeI32(pTlv, &pNode->totalRowSize); + break; + case DATA_BLOCK_DESC_CODE_OUTPUT_ROW_SIZE: + code = tlvDecodeI32(pTlv, &pNode->outputRowSize); + break; + case DATA_BLOCK_DESC_CODE_PRECISION: + code = tlvDecodeU8(pTlv, &pNode->precision); + break; + default: + break; + } + } + + return code; +} + +enum { + SLOT_DESC_CODE_SLOT_ID = 1, + SLOT_DESC_CODE_DATA_TYPE, + SLOT_DESC_CODE_RESERVE, + SLOT_DESC_CODE_OUTPUT, + SLOT_DESC_CODE_TAG +}; + +static int32_t slotDescNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSlotDescNode* pNode = (const SSlotDescNode*)pObj; + + int32_t code = tlvEncodeI16(pEncoder, SLOT_DESC_CODE_SLOT_ID, pNode->slotId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SLOT_DESC_CODE_DATA_TYPE, dataTypeToMsg, &pNode->dataType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, SLOT_DESC_CODE_RESERVE, pNode->reserve); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, SLOT_DESC_CODE_OUTPUT, pNode->output); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, SLOT_DESC_CODE_TAG, pNode->tag); + } + + return code; +} + +static int32_t msgToSlotDescNode(STlvDecoder* pDecoder, void* pObj) { + SSlotDescNode* pNode = (SSlotDescNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case SLOT_DESC_CODE_SLOT_ID: + code = tlvDecodeI16(pTlv, &pNode->slotId); + break; + case SLOT_DESC_CODE_DATA_TYPE: + code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->dataType); + break; + case SLOT_DESC_CODE_RESERVE: + code = tlvDecodeBool(pTlv, &pNode->reserve); + break; + case SLOT_DESC_CODE_OUTPUT: + code = tlvDecodeBool(pTlv, &pNode->output); + break; + case SLOT_DESC_CODE_TAG: + code = tlvDecodeBool(pTlv, &pNode->tag); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_NODE_CODE_OUTPUT_DESC = 1, + PHY_NODE_CODE_CONDITIONS, + PHY_NODE_CODE_CHILDREN, + PHY_NODE_CODE_LIMIT, + PHY_NODE_CODE_SLIMIT +}; + +static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SPhysiNode* pNode = (const SPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_OUTPUT_DESC, nodeToMsg, pNode->pOutputDataBlockDesc); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_CONDITIONS, nodeToMsg, pNode->pConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_CHILDREN, nodeListToMsg, pNode->pChildren); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_LIMIT, nodeToMsg, pNode->pLimit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_NODE_CODE_SLIMIT, nodeToMsg, pNode->pSlimit); + } + + return code; +} + +static int32_t msgToPhysiNode(STlvDecoder* pDecoder, void* pObj) { + SPhysiNode* pNode = (SPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_NODE_CODE_OUTPUT_DESC: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pOutputDataBlockDesc); + break; + case PHY_NODE_CODE_CONDITIONS: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pConditions); + break; + case PHY_NODE_CODE_CHILDREN: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pChildren); + break; + case PHY_NODE_CODE_LIMIT: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pLimit); + break; + case PHY_NODE_CODE_SLIMIT: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pSlimit); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_SCAN_CODE_BASE_NODE = 1, + PHY_SCAN_CODE_SCAN_COLS, + PHY_SCAN_CODE_SCAN_PSEUDO_COLS, + PHY_SCAN_CODE_BASE_UID, + PHY_SCAN_CODE_BASE_SUID, + PHY_SCAN_CODE_BASE_TABLE_TYPE, + PHY_SCAN_CODE_BASE_TABLE_NAME +}; + +static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SScanPhysiNode* pNode = (const SScanPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_SCAN_COLS, nodeListToMsg, pNode->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_SCAN_PSEUDO_COLS, nodeListToMsg, pNode->pScanPseudoCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, PHY_SCAN_CODE_BASE_UID, pNode->uid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, PHY_SCAN_CODE_BASE_SUID, pNode->suid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_SCAN_CODE_BASE_TABLE_TYPE, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName); + } + + return code; +} + +static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) { + SScanPhysiNode* pNode = (SScanPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_SCAN_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_SCAN_CODE_SCAN_COLS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanCols); + break; + case PHY_SCAN_CODE_SCAN_PSEUDO_COLS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanPseudoCols); + break; + case PHY_SCAN_CODE_BASE_UID: + code = tlvDecodeU64(pTlv, &pNode->uid); + break; + case PHY_SCAN_CODE_BASE_SUID: + code = tlvDecodeU64(pTlv, &pNode->suid); + break; + case PHY_SCAN_CODE_BASE_TABLE_TYPE: + code = tlvDecodeI8(pTlv, &pNode->tableType); + break; + case PHY_SCAN_CODE_BASE_TABLE_NAME: + code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT }; + +static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, nodeListToMsg, pNode->pGroupTags); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, pNode->groupSort); + } + + return code; +} + +static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) { + SLastRowScanPhysiNode* pNode = (SLastRowScanPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_LAST_ROW_SCAN_CODE_SCAN: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan); + break; + case PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupTags); + break; + case PHY_LAST_ROW_SCAN_CODE_GROUP_SORT: + code = tlvDecodeBool(pTlv, &pNode->groupSort); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_TABLE_SCAN_CODE_SCAN = 1, + PHY_TABLE_SCAN_CODE_SCAN_COUNT, + PHY_TABLE_SCAN_CODE_REVERSE_SCAN_COUNT, + PHY_TABLE_SCAN_CODE_SCAN_RANGE, + PHY_TABLE_SCAN_CODE_RATIO, + PHY_TABLE_SCAN_CODE_DATA_REQUIRED, + PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS, + PHY_TABLE_SCAN_CODE_GROUP_TAGS, + PHY_TABLE_SCAN_CODE_GROUP_SORT, + PHY_TABLE_SCAN_CODE_INTERVAL, + PHY_TABLE_SCAN_CODE_OFFSET, + PHY_TABLE_SCAN_CODE_SLIDING, + PHY_TABLE_SCAN_CODE_INTERVAL_UNIT, + PHY_TABLE_SCAN_CODE_SLIDING_UNIT, + PHY_TABLE_SCAN_CODE_TRIGGER_TYPE, + PHY_TABLE_SCAN_CODE_WATERMARK, + PHY_TABLE_SCAN_CODE_IG_EXPIRED, + PHY_TABLE_SCAN_CODE_ASSIGN_BLOCK_UID, +}; + +static int32_t physiTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, PHY_TABLE_SCAN_CODE_SCAN_COUNT, pNode->scanSeq[0]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU8(pEncoder, PHY_TABLE_SCAN_CODE_REVERSE_SCAN_COUNT, pNode->scanSeq[1]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_SCAN_RANGE, timeWindowToMsg, &pNode->scanRange); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeDouble(pEncoder, PHY_TABLE_SCAN_CODE_RATIO, pNode->ratio); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_TABLE_SCAN_CODE_DATA_REQUIRED, pNode->dataRequired); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS, nodeListToMsg, pNode->pDynamicScanFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_GROUP_TAGS, nodeListToMsg, pNode->pGroupTags); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_TABLE_SCAN_CODE_GROUP_SORT, pNode->groupSort); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_INTERVAL, pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_OFFSET, pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_SLIDING, pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_INTERVAL_UNIT, pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_SLIDING_UNIT, pNode->slidingUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_TRIGGER_TYPE, pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_WATERMARK, pNode->watermark); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_IG_EXPIRED, pNode->igExpired); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_TABLE_SCAN_CODE_ASSIGN_BLOCK_UID, pNode->assignBlockUid); + } + + return code; +} + +static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) { + STableScanPhysiNode* pNode = (STableScanPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_TABLE_SCAN_CODE_SCAN: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan); + break; + case PHY_TABLE_SCAN_CODE_SCAN_COUNT: + code = tlvDecodeU8(pTlv, pNode->scanSeq); + break; + case PHY_TABLE_SCAN_CODE_REVERSE_SCAN_COUNT: + code = tlvDecodeU8(pTlv, pNode->scanSeq + 1); + break; + case PHY_TABLE_SCAN_CODE_SCAN_RANGE: + code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, &pNode->scanRange); + break; + case PHY_TABLE_SCAN_CODE_RATIO: + code = tlvDecodeDouble(pTlv, &pNode->ratio); + break; + case PHY_TABLE_SCAN_CODE_DATA_REQUIRED: + code = tlvDecodeI32(pTlv, &pNode->dataRequired); + break; + case PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pDynamicScanFuncs); + break; + case PHY_TABLE_SCAN_CODE_GROUP_TAGS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupTags); + break; + case PHY_TABLE_SCAN_CODE_GROUP_SORT: + code = tlvDecodeBool(pTlv, &pNode->groupSort); + break; + case PHY_TABLE_SCAN_CODE_INTERVAL: + code = tlvDecodeI64(pTlv, &pNode->interval); + break; + case PHY_TABLE_SCAN_CODE_OFFSET: + code = tlvDecodeI64(pTlv, &pNode->offset); + break; + case PHY_TABLE_SCAN_CODE_SLIDING: + code = tlvDecodeI64(pTlv, &pNode->sliding); + break; + case PHY_TABLE_SCAN_CODE_INTERVAL_UNIT: + code = tlvDecodeI8(pTlv, &pNode->intervalUnit); + break; + case PHY_TABLE_SCAN_CODE_SLIDING_UNIT: + code = tlvDecodeI8(pTlv, &pNode->slidingUnit); + break; + case PHY_TABLE_SCAN_CODE_TRIGGER_TYPE: + code = tlvDecodeI8(pTlv, &pNode->triggerType); + break; + case PHY_TABLE_SCAN_CODE_WATERMARK: + code = tlvDecodeI64(pTlv, &pNode->watermark); + break; + case PHY_TABLE_SCAN_CODE_IG_EXPIRED: + code = tlvDecodeI8(pTlv, &pNode->igExpired); + break; + case PHY_TABLE_SCAN_CODE_ASSIGN_BLOCK_UID: + code = tlvDecodeBool(pTlv, &pNode->assignBlockUid); + break; + default: + break; + } + } + + return code; +} + +enum { EP_CODE_FQDN = 1, EP_CODE_port }; + +static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SEp* pNode = (const SEp*)pObj; + + int32_t code = tlvEncodeCStr(pEncoder, EP_CODE_FQDN, pNode->fqdn); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU16(pEncoder, EP_CODE_port, pNode->port); + } + + return code; +} + +static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) { + SEp* pNode = (SEp*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case EP_CODE_FQDN: + code = tlvDecodeCStr(pTlv, pNode->fqdn); + break; + case EP_CODE_port: + code = tlvDecodeU16(pTlv, &pNode->port); + break; + default: + break; + } + } + + return code; +} + +enum { EP_SET_CODE_IN_USE = 1, EP_SET_CODE_NUM_OF_EPS, EP_SET_CODE_EPS }; + +static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SEpSet* pNode = (const SEpSet*)pObj; + + int32_t code = tlvEncodeI8(pEncoder, EP_SET_CODE_IN_USE, pNode->inUse); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, EP_SET_CODE_NUM_OF_EPS, pNode->numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObjArray(pEncoder, EP_SET_CODE_EPS, epToMsg, pNode->eps, sizeof(SEp), pNode->numOfEps); + } + + return code; +} + +static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) { + SEpSet* pNode = (SEpSet*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case EP_SET_CODE_IN_USE: + code = tlvDecodeI8(pTlv, &pNode->inUse); + break; + case EP_SET_CODE_NUM_OF_EPS: + code = tlvDecodeI8(pTlv, &pNode->numOfEps); + break; + case EP_SET_CODE_EPS: + code = tlvDecodeObjArrayFromTlv(pTlv, msgToEp, pNode->eps, sizeof(SEp)); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_SYSTABLE_SCAN_CODE_SCAN = 1, + PHY_SYSTABLE_SCAN_CODE_MGMT_EP_SET, + PHY_SYSTABLE_SCAN_CODE_SHOW_REWRITE, + PHY_SYSTABLE_SCAN_CODE_ACCOUNT_ID, + PHY_SYSTABLE_SCAN_CODE_SYS_INFO +}; + +static int32_t physiSysTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSystemTableScanPhysiNode* pNode = (const SSystemTableScanPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_SYSTABLE_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SYSTABLE_SCAN_CODE_MGMT_EP_SET, epSetToMsg, &pNode->mgmtEpSet); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_SYSTABLE_SCAN_CODE_SHOW_REWRITE, pNode->showRewrite); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_SYSTABLE_SCAN_CODE_ACCOUNT_ID, pNode->accountId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_SYSTABLE_SCAN_CODE_SYS_INFO, pNode->sysInfo); + } + + return code; +} + +static int32_t msgToPhysiSysTableScanNode(STlvDecoder* pDecoder, void* pObj) { + SSystemTableScanPhysiNode* pNode = (SSystemTableScanPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_SYSTABLE_SCAN_CODE_SCAN: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan); + break; + case PHY_SYSTABLE_SCAN_CODE_MGMT_EP_SET: + code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->mgmtEpSet); + break; + case PHY_SYSTABLE_SCAN_CODE_SHOW_REWRITE: + code = tlvDecodeBool(pTlv, &pNode->showRewrite); + break; + case PHY_SYSTABLE_SCAN_CODE_ACCOUNT_ID: + code = tlvDecodeI32(pTlv, &pNode->accountId); + break; + case PHY_SYSTABLE_SCAN_CODE_SYS_INFO: + code = tlvDecodeBool(pTlv, &pNode->sysInfo); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_PROJECT_CODE_BASE_NODE = 1, + PHY_PROJECT_CODE_PROJECTIONS, + PHY_PROJECT_CODE_MERGE_DATA_BLOCK, + PHY_PROJECT_CODE_IGNORE_GROUP_ID +}; + +static int32_t physiProjectNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_PROJECT_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_PROJECT_CODE_PROJECTIONS, nodeListToMsg, pNode->pProjections); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_PROJECT_CODE_MERGE_DATA_BLOCK, pNode->mergeDataBlock); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_PROJECT_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId); + } + + return code; +} + +static int32_t msgToPhysiProjectNode(STlvDecoder* pDecoder, void* pObj) { + SProjectPhysiNode* pNode = (SProjectPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_PROJECT_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_PROJECT_CODE_PROJECTIONS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pProjections); + break; + case PHY_PROJECT_CODE_MERGE_DATA_BLOCK: + code = tlvDecodeBool(pTlv, &pNode->mergeDataBlock); + break; + case PHY_PROJECT_CODE_IGNORE_GROUP_ID: + code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_SORT_MERGE_JOIN_CODE_BASE_NODE = 1, + PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, + PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION, + PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, + PHY_SORT_MERGE_JOIN_CODE_TARGETS, + PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER +}; + +static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE, pNode->joinType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION, nodeToMsg, pNode->pMergeCondition); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pOnConditions); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder); + } + + return code; +} + +static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { + SSortMergeJoinPhysiNode* pNode = (SSortMergeJoinPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_SORT_MERGE_JOIN_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_SORT_MERGE_JOIN_CODE_JOIN_TYPE: + code = tlvDecodeEnum(pTlv, &pNode->joinType, sizeof(pNode->joinType)); + break; + case PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pMergeCondition); + break; + case PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pOnConditions); + break; + case PHY_SORT_MERGE_JOIN_CODE_TARGETS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); + break; + case PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER: + code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder)); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_AGG_CODE_BASE_NODE = 1, + PHY_AGG_CODE_EXPR, + PHY_AGG_CODE_GROUP_KEYS, + PHY_AGG_CODE_AGG_FUNCS, + PHY_AGG_CODE_MERGE_DATA_BLOCK +}; + +static int32_t physiAggNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SAggPhysiNode* pNode = (const SAggPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_AGG_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_AGG_CODE_EXPR, nodeListToMsg, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_AGG_CODE_GROUP_KEYS, nodeListToMsg, pNode->pGroupKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_AGG_CODE_AGG_FUNCS, nodeListToMsg, pNode->pAggFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_AGG_CODE_MERGE_DATA_BLOCK, pNode->mergeDataBlock); + } + + return code; +} + +static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) { + SAggPhysiNode* pNode = (SAggPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_AGG_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_AGG_CODE_EXPR: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs); + break; + case PHY_AGG_CODE_GROUP_KEYS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupKeys); + break; + case PHY_AGG_CODE_AGG_FUNCS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pAggFuncs); + break; + case PHY_AGG_CODE_MERGE_DATA_BLOCK: + code = tlvDecodeBool(pTlv, &pNode->mergeDataBlock); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_EXCHANGE_CODE_BASE_NODE = 1, + PHY_EXCHANGE_CODE_SRC_GROUP_ID, + PHY_EXCHANGE_CODE_SINGLE_CHANNEL, + PHY_EXCHANGE_CODE_SRC_ENDPOINTS +}; + +static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SExchangePhysiNode* pNode = (const SExchangePhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_GROUP_ID, pNode->srcGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SINGLE_CHANNEL, pNode->singleChannel); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_SRC_ENDPOINTS, nodeListToMsg, pNode->pSrcEndPoints); + } + + return code; +} + +static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) { + SExchangePhysiNode* pNode = (SExchangePhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_EXCHANGE_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_EXCHANGE_CODE_SRC_GROUP_ID: + code = tlvDecodeI32(pTlv, &pNode->srcGroupId); + break; + case PHY_EXCHANGE_CODE_SINGLE_CHANNEL: + code = tlvDecodeBool(pTlv, &pNode->singleChannel); + break; + case PHY_EXCHANGE_CODE_SRC_ENDPOINTS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSrcEndPoints); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_MERGE_CODE_BASE_NODE = 1, + PHY_MERGE_CODE_MERGE_KEYS, + PHY_MERGE_CODE_TARGETS, + PHY_MERGE_CODE_NUM_OF_CHANNELS, + PHY_MERGE_CODE_SRC_GROUP_ID, + PHY_MERGE_CODE_GROUP_SORT +}; + +static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_MERGE_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_MERGE_CODE_MERGE_KEYS, nodeListToMsg, pNode->pMergeKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_MERGE_CODE_TARGETS, nodeListToMsg, pNode->pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_MERGE_CODE_NUM_OF_CHANNELS, pNode->numOfChannels); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_MERGE_CODE_SRC_GROUP_ID, pNode->srcGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_GROUP_SORT, pNode->groupSort); + } + + return code; +} + +static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) { + SMergePhysiNode* pNode = (SMergePhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_MERGE_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_MERGE_CODE_MERGE_KEYS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pMergeKeys); + break; + case PHY_MERGE_CODE_TARGETS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); + break; + case PHY_MERGE_CODE_NUM_OF_CHANNELS: + code = tlvDecodeI32(pTlv, &pNode->numOfChannels); + break; + case PHY_MERGE_CODE_SRC_GROUP_ID: + code = tlvDecodeI32(pTlv, &pNode->srcGroupId); + break; + case PHY_MERGE_CODE_GROUP_SORT: + code = tlvDecodeBool(pTlv, &pNode->groupSort); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS }; + +static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_EXPR, nodeListToMsg, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_SORT_KEYS, nodeListToMsg, pNode->pSortKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets); + } + + return code; +} + +static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) { + SSortPhysiNode* pNode = (SSortPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_SORT_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_SORT_CODE_EXPR: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs); + break; + case PHY_SORT_CODE_SORT_KEYS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSortKeys); + break; + case PHY_SORT_CODE_TARGETS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_WINDOW_CODE_BASE_NODE = 1, + PHY_WINDOW_CODE_EXPR, + PHY_WINDOW_CODE_FUNCS, + PHY_WINDOW_CODE_TS_PK, + PHY_WINDOW_CODE_TS_END, + PHY_WINDOW_CODE_TRIGGER_TYPE, + PHY_WINDOW_CODE_WATERMARK, + PHY_WINDOW_CODE_IG_EXPIRED, + PHY_WINDOW_CODE_INPUT_TS_ORDER, + PHY_WINDOW_CODE_OUTPUT_TS_ORDER, + PHY_WINDOW_CODE_MERGE_DATA_BLOCK +}; + +static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_WINDOW_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_WINDOW_CODE_EXPR, nodeListToMsg, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_WINDOW_CODE_FUNCS, nodeListToMsg, pNode->pFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_WINDOW_CODE_TS_PK, nodeToMsg, pNode->pTspk); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_WINDOW_CODE_TS_END, nodeToMsg, pNode->pTsEnd); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_TRIGGER_TYPE, pNode->triggerType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_WINDOW_CODE_WATERMARK, pNode->watermark); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_IG_EXPIRED, pNode->igExpired); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_WINDOW_CODE_INPUT_TS_ORDER, pNode->inputTsOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_WINDOW_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_WINDOW_CODE_MERGE_DATA_BLOCK, pNode->mergeDataBlock); + } + + return code; +} + +static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) { + SWinodwPhysiNode* pNode = (SWinodwPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_WINDOW_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_WINDOW_CODE_EXPR: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs); + break; + case PHY_WINDOW_CODE_FUNCS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFuncs); + break; + case PHY_WINDOW_CODE_TS_PK: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTspk); + break; + case PHY_WINDOW_CODE_TS_END: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTsEnd); + break; + case PHY_WINDOW_CODE_TRIGGER_TYPE: + code = tlvDecodeI8(pTlv, &pNode->triggerType); + break; + case PHY_WINDOW_CODE_WATERMARK: + code = tlvDecodeI64(pTlv, &pNode->watermark); + break; + case PHY_WINDOW_CODE_IG_EXPIRED: + code = tlvDecodeI8(pTlv, &pNode->igExpired); + break; + case PHY_WINDOW_CODE_INPUT_TS_ORDER: + code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder)); + break; + case PHY_WINDOW_CODE_OUTPUT_TS_ORDER: + code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder)); + break; + case PHY_WINDOW_CODE_MERGE_DATA_BLOCK: + code = tlvDecodeBool(pTlv, &pNode->mergeDataBlock); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_INTERVAL_CODE_WINDOW = 1, + PHY_INTERVAL_CODE_INTERVAL, + PHY_INTERVAL_CODE_OFFSET, + PHY_INTERVAL_CODE_SLIDING, + PHY_INTERVAL_CODE_INTERVAL_UNIT, + PHY_INTERVAL_CODE_SLIDING_UNIT +}; + +static int32_t physiIntervalNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_INTERVAL_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_INTERVAL, pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_OFFSET, pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_INTERVAL_CODE_SLIDING, pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_INTERVAL_CODE_INTERVAL_UNIT, pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_INTERVAL_CODE_SLIDING_UNIT, pNode->slidingUnit); + } + + return code; +} + +static int32_t msgToPhysiIntervalNode(STlvDecoder* pDecoder, void* pObj) { + SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_INTERVAL_CODE_WINDOW: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window); + break; + case PHY_INTERVAL_CODE_INTERVAL: + code = tlvDecodeI64(pTlv, &pNode->interval); + break; + case PHY_INTERVAL_CODE_OFFSET: + code = tlvDecodeI64(pTlv, &pNode->offset); + break; + case PHY_INTERVAL_CODE_SLIDING: + code = tlvDecodeI64(pTlv, &pNode->sliding); + break; + case PHY_INTERVAL_CODE_INTERVAL_UNIT: + code = tlvDecodeI8(pTlv, &pNode->intervalUnit); + break; + case PHY_INTERVAL_CODE_SLIDING_UNIT: + code = tlvDecodeI8(pTlv, &pNode->slidingUnit); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_FILL_CODE_BASE_NODE = 1, + PHY_FILL_CODE_MODE, + PHY_FILL_CODE_FILL_EXPRS, + PHY_FILL_CODE_NOT_FILL_EXPRS, + PHY_FILL_CODE_WSTART, + PHY_FILL_CODE_VALUES, + PHY_FILL_CODE_TIME_RANGE, + PHY_FILL_CODE_INPUT_TS_ORDER +}; + +static int32_t physiFillNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SFillPhysiNode* pNode = (const SFillPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_FILL_CODE_MODE, pNode->mode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_FILL_EXPRS, nodeListToMsg, pNode->pFillExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_NOT_FILL_EXPRS, nodeListToMsg, pNode->pNotFillExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_WSTART, nodeToMsg, pNode->pWStartTs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_VALUES, nodeToMsg, pNode->pValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_FILL_CODE_TIME_RANGE, timeWindowToMsg, &pNode->timeRange); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_FILL_CODE_INPUT_TS_ORDER, pNode->inputTsOrder); + } + + return code; +} + +static int32_t msgToPhysiFillNode(STlvDecoder* pDecoder, void* pObj) { + SFillPhysiNode* pNode = (SFillPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_FILL_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_FILL_CODE_MODE: + code = tlvDecodeEnum(pTlv, &pNode->mode, sizeof(pNode->mode)); + break; + case PHY_FILL_CODE_FILL_EXPRS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFillExprs); + break; + case PHY_FILL_CODE_NOT_FILL_EXPRS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pNotFillExprs); + break; + case PHY_FILL_CODE_WSTART: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pWStartTs); + break; + case PHY_FILL_CODE_VALUES: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pValues); + break; + case PHY_FILL_CODE_TIME_RANGE: + code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, (void**)&pNode->timeRange); + break; + case PHY_FILL_CODE_INPUT_TS_ORDER: + code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder)); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_SESSION_CODE_WINDOW = 1, PHY_SESSION_CODE_GAP }; + +static int32_t physiSessionWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSessionWinodwPhysiNode* pNode = (const SSessionWinodwPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_SESSION_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_SESSION_CODE_GAP, pNode->gap); + } + + return code; +} + +static int32_t msgToPhysiSessionWindowNode(STlvDecoder* pDecoder, void* pObj) { + SSessionWinodwPhysiNode* pNode = (SSessionWinodwPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_SESSION_CODE_WINDOW: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window); + break; + case PHY_SESSION_CODE_GAP: + code = tlvDecodeI64(pTlv, &pNode->gap); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_STATE_CODE_WINDOW = 1, PHY_STATE_CODE_KEY }; + +static int32_t physiStateWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SStateWinodwPhysiNode* pNode = (const SStateWinodwPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_STATE_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_STATE_CODE_KEY, nodeToMsg, pNode->pStateKey); + } + + return code; +} + +static int32_t msgToPhysiStateWindowNode(STlvDecoder* pDecoder, void* pObj) { + SStateWinodwPhysiNode* pNode = (SStateWinodwPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_STATE_CODE_WINDOW: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window); + break; + case PHY_STATE_CODE_KEY: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStateKey); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, PHY_PARTITION_CODE_KEYS, PHY_PARTITION_CODE_TARGETS }; + +static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_PARTITION_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_PARTITION_CODE_EXPR, nodeListToMsg, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_PARTITION_CODE_KEYS, nodeListToMsg, pNode->pPartitionKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_PARTITION_CODE_TARGETS, nodeListToMsg, pNode->pTargets); + } + + return code; +} + +static int32_t msgToPhysiPartitionNode(STlvDecoder* pDecoder, void* pObj) { + SPartitionPhysiNode* pNode = (SPartitionPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_PARTITION_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_PARTITION_CODE_EXPR: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs); + break; + case PHY_PARTITION_CODE_KEYS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pPartitionKeys); + break; + case PHY_PARTITION_CODE_TARGETS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_INDEF_ROWS_FUNC_CODE_BASE_NODE = 1, PHY_INDEF_ROWS_FUNC_CODE_EXPRS, PHY_INDEF_ROWS_FUNC_CODE_FUNCS }; + +static int32_t physiIndefRowsFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SIndefRowsFuncPhysiNode* pNode = (const SIndefRowsFuncPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_INDEF_ROWS_FUNC_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_INDEF_ROWS_FUNC_CODE_EXPRS, nodeListToMsg, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_INDEF_ROWS_FUNC_CODE_FUNCS, nodeListToMsg, pNode->pFuncs); + } + + return code; +} + +static int32_t msgToPhysiIndefRowsFuncNode(STlvDecoder* pDecoder, void* pObj) { + SIndefRowsFuncPhysiNode* pNode = (SIndefRowsFuncPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_INDEF_ROWS_FUNC_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_INDEF_ROWS_FUNC_CODE_EXPRS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs); + break; + case PHY_INDEF_ROWS_FUNC_CODE_FUNCS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFuncs); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_INERP_FUNC_CODE_BASE_NODE = 1, + PHY_INERP_FUNC_CODE_EXPR, + PHY_INERP_FUNC_CODE_FUNCS, + PHY_INERP_FUNC_CODE_TIME_RANGE, + PHY_INERP_FUNC_CODE_INTERVAL, + PHY_INERP_FUNC_CODE_INTERVAL_UNIT, + PHY_INERP_FUNC_CODE_FILL_MODE, + PHY_INERP_FUNC_CODE_FILL_VALUES, + PHY_INERP_FUNC_CODE_TIME_SERIES +}; + +static int32_t physiInterpFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_INERP_FUNC_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_INERP_FUNC_CODE_EXPR, nodeListToMsg, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_INERP_FUNC_CODE_FUNCS, nodeListToMsg, pNode->pFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_INERP_FUNC_CODE_TIME_RANGE, timeWindowToMsg, &pNode->timeRange); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_INERP_FUNC_CODE_INTERVAL, pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_INERP_FUNC_CODE_INTERVAL_UNIT, pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_INERP_FUNC_CODE_FILL_MODE, pNode->fillMode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_INERP_FUNC_CODE_FILL_VALUES, nodeToMsg, pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_INERP_FUNC_CODE_TIME_SERIES, nodeToMsg, pNode->pTimeSeries); + } + + return code; +} + +static int32_t msgToPhysiInterpFuncNode(STlvDecoder* pDecoder, void* pObj) { + SInterpFuncPhysiNode* pNode = (SInterpFuncPhysiNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_INERP_FUNC_CODE_BASE_NODE: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); + break; + case PHY_INERP_FUNC_CODE_EXPR: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs); + break; + case PHY_INERP_FUNC_CODE_FUNCS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFuncs); + break; + case PHY_INERP_FUNC_CODE_TIME_RANGE: + code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, &pNode->timeRange); + break; + case PHY_INERP_FUNC_CODE_INTERVAL: + code = tlvDecodeI64(pTlv, &pNode->interval); + break; + case PHY_INERP_FUNC_CODE_INTERVAL_UNIT: + code = tlvDecodeI8(pTlv, &pNode->intervalUnit); + break; + case PHY_INERP_FUNC_CODE_FILL_MODE: + code = tlvDecodeEnum(pTlv, &pNode->fillMode, sizeof(pNode->fillMode)); + break; + case PHY_INERP_FUNC_CODE_FILL_VALUES: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFillValues); + break; + case PHY_INERP_FUNC_CODE_TIME_SERIES: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTimeSeries); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_DATA_SINK_CODE_INPUT_DESC = 1 }; + +static int32_t physicDataSinkNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDataSinkNode* pNode = (const SDataSinkNode*)pObj; + return tlvEncodeObj(pEncoder, PHY_DATA_SINK_CODE_INPUT_DESC, nodeToMsg, pNode->pInputDataBlockDesc); +} + +static int32_t msgToPhysicDataSinkNode(STlvDecoder* pDecoder, void* pObj) { + SDataSinkNode* pNode = (SDataSinkNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_DATA_SINK_CODE_INPUT_DESC: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pInputDataBlockDesc); + break; + default: + break; + } + } + + return code; +} + +enum { PHY_DISPATCH_CODE_SINK = 1 }; + +static int32_t physiDispatchNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDataDispatcherNode* pNode = (const SDataDispatcherNode*)pObj; + return tlvEncodeObj(pEncoder, PHY_DISPATCH_CODE_SINK, physicDataSinkNodeToMsg, &pNode->sink); +} + +static int32_t msgToPhysiDispatchNode(STlvDecoder* pDecoder, void* pObj) { + SDataDispatcherNode* pNode = (SDataDispatcherNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_DISPATCH_CODE_SINK: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysicDataSinkNode, &pNode->sink); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_QUERY_INSERT_CODE_SINK = 1, + PHY_QUERY_INSERT_CODE_COLS, + PHY_QUERY_INSERT_CODE_TABLE_ID, + PHY_QUERY_INSERT_CODE_STABLE_ID, + PHY_QUERY_INSERT_CODE_TABLE_TYPE, + PHY_QUERY_INSERT_CODE_TABLE_NAME, + PHY_QUERY_INSERT_CODE_VG_ID, + PHY_QUERY_INSERT_CODE_EP_SET +}; + +static int32_t physiQueryInsertNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_QUERY_INSERT_CODE_SINK, physicDataSinkNodeToMsg, &pNode->sink); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_QUERY_INSERT_CODE_COLS, nodeListToMsg, pNode->pCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, PHY_QUERY_INSERT_CODE_TABLE_ID, pNode->tableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, PHY_QUERY_INSERT_CODE_STABLE_ID, pNode->stableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_QUERY_INSERT_CODE_TABLE_TYPE, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, PHY_QUERY_INSERT_CODE_TABLE_NAME, pNode->tableName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_QUERY_INSERT_CODE_VG_ID, pNode->vgId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_QUERY_INSERT_CODE_EP_SET, epSetToMsg, &pNode->epSet); + } + + return code; +} + +static int32_t msgToPhysiQueryInsertNode(STlvDecoder* pDecoder, void* pObj) { + SQueryInserterNode* pNode = (SQueryInserterNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_QUERY_INSERT_CODE_SINK: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysicDataSinkNode, &pNode->sink); + break; + case PHY_QUERY_INSERT_CODE_COLS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pCols); + break; + case PHY_QUERY_INSERT_CODE_TABLE_ID: + code = tlvDecodeU64(pTlv, &pNode->tableId); + break; + case PHY_QUERY_INSERT_CODE_STABLE_ID: + code = tlvDecodeU64(pTlv, &pNode->stableId); + break; + case PHY_QUERY_INSERT_CODE_TABLE_TYPE: + code = tlvDecodeI8(pTlv, &pNode->tableType); + break; + case PHY_QUERY_INSERT_CODE_TABLE_NAME: + code = tlvDecodeCStr(pTlv, pNode->tableName); + break; + case PHY_QUERY_INSERT_CODE_VG_ID: + code = tlvDecodeI32(pTlv, &pNode->vgId); + break; + case PHY_QUERY_INSERT_CODE_EP_SET: + code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet); + break; + default: + break; + } + } + + return code; +} + +enum { + PHY_DELETER_CODE_SINK = 1, + PHY_DELETER_CODE_TABLE_ID, + PHY_DELETER_CODE_TABLE_TYPE, + PHY_DELETER_CODE_TABLE_FNAME, + PHY_DELETER_CODE_TS_COL_NAME, + PHY_DELETER_CODE_DELETE_TIME_RANGE, + PHY_DELETER_CODE_AFFECTED_ROWS +}; + +static int32_t physiDeleteNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDataDeleterNode* pNode = (const SDataDeleterNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_SINK, physicDataSinkNodeToMsg, &pNode->sink); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, PHY_DELETER_CODE_TABLE_ID, pNode->tableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, PHY_DELETER_CODE_TABLE_TYPE, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, PHY_DELETER_CODE_TABLE_FNAME, pNode->tableFName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, PHY_DELETER_CODE_TS_COL_NAME, pNode->tsColName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_DELETE_TIME_RANGE, timeWindowToMsg, &pNode->deleteTimeRange); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_DELETER_CODE_AFFECTED_ROWS, nodeToMsg, pNode->pAffectedRows); + } + + return code; +} + +static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) { + SDataDeleterNode* pNode = (SDataDeleterNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case PHY_DELETER_CODE_SINK: + code = tlvDecodeObjFromTlv(pTlv, msgToPhysicDataSinkNode, &pNode->sink); + break; + case PHY_DELETER_CODE_TABLE_ID: + code = tlvDecodeU64(pTlv, &pNode->tableId); + break; + case PHY_DELETER_CODE_TABLE_TYPE: + code = tlvDecodeI8(pTlv, &pNode->tableType); + break; + case PHY_DELETER_CODE_TABLE_FNAME: + code = tlvDecodeCStr(pTlv, pNode->tableFName); + break; + case PHY_DELETER_CODE_TS_COL_NAME: + code = tlvDecodeCStr(pTlv, pNode->tsColName); + break; + case PHY_DELETER_CODE_DELETE_TIME_RANGE: + code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, &pNode->deleteTimeRange); + break; + case PHY_DELETER_CODE_AFFECTED_ROWS: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pAffectedRows); + break; + default: + break; + } + } + + return code; +} + +enum { SUBPLAN_ID_CODE_QUERY_ID = 1, SUBPLAN_ID_CODE_GROUP_ID, SUBPLAN_ID_CODE_SUBPLAN_ID }; + +static int32_t subplanIdToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSubplanId* pNode = (const SSubplanId*)pObj; + + int32_t code = tlvEncodeU64(pEncoder, SUBPLAN_ID_CODE_QUERY_ID, pNode->queryId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SUBPLAN_ID_CODE_GROUP_ID, pNode->groupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SUBPLAN_ID_CODE_SUBPLAN_ID, pNode->subplanId); + } + + return code; +} + +static int32_t msgToSubplanId(STlvDecoder* pDecoder, void* pObj) { + SSubplanId* pNode = (SSubplanId*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case SUBPLAN_ID_CODE_QUERY_ID: + code = tlvDecodeU64(pTlv, &pNode->queryId); + break; + case SUBPLAN_ID_CODE_GROUP_ID: + code = tlvDecodeI32(pTlv, &pNode->groupId); + break; + case SUBPLAN_ID_CODE_SUBPLAN_ID: + code = tlvDecodeI32(pTlv, &pNode->subplanId); + break; + default: + break; + } + } + + return code; +} + +enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_EP_SET }; + +static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj; + + int32_t code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NODE_ID, pNode->nodeId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, QUERY_NODE_ADDR_CODE_EP_SET, epSetToMsg, &pNode->epSet); + } + + return code; +} + +static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) { + SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case QUERY_NODE_ADDR_CODE_NODE_ID: + code = tlvDecodeI32(pTlv, &pNode->nodeId); + break; + case QUERY_NODE_ADDR_CODE_EP_SET: + code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet); + break; + } + } + + return code; +} + +enum { + SUBPLAN_CODE_SUBPLAN_ID = 1, + SUBPLAN_CODE_SUBPLAN_TYPE, + SUBPLAN_CODE_MSG_TYPE, + SUBPLAN_CODE_LEVEL, + SUBPLAN_CODE_DBFNAME, + SUBPLAN_CODE_USER, + SUBPLAN_CODE_EXECNODE, + SUBPLAN_CODE_ROOT_NODE, + SUBPLAN_CODE_DATA_SINK, + SUBPLAN_CODE_TAG_COND, + SUBPLAN_CODE_TAG_INDEX_COND +}; + +static int32_t subplanToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SSubplan* pNode = (const SSubplan*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_SUBPLAN_ID, subplanIdToMsg, &pNode->id); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, SUBPLAN_CODE_SUBPLAN_TYPE, pNode->subplanType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SUBPLAN_CODE_MSG_TYPE, pNode->msgType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SUBPLAN_CODE_LEVEL, pNode->level); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, SUBPLAN_CODE_DBFNAME, pNode->dbFName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, SUBPLAN_CODE_USER, pNode->user); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_EXECNODE, queryNodeAddrToMsg, &pNode->execNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_ROOT_NODE, nodeToMsg, pNode->pNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_DATA_SINK, nodeToMsg, pNode->pDataSink); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_TAG_COND, nodeToMsg, pNode->pTagCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_TAG_INDEX_COND, nodeToMsg, pNode->pTagIndexCond); + } + + return code; +} + +static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) { + SSubplan* pNode = (SSubplan*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case SUBPLAN_CODE_SUBPLAN_ID: + code = tlvDecodeObjFromTlv(pTlv, msgToSubplanId, &pNode->id); + break; + case SUBPLAN_CODE_SUBPLAN_TYPE: + code = tlvDecodeEnum(pTlv, &pNode->subplanType, sizeof(pNode->subplanType)); + break; + case SUBPLAN_CODE_MSG_TYPE: + code = tlvDecodeI32(pTlv, &pNode->msgType); + break; + case SUBPLAN_CODE_LEVEL: + code = tlvDecodeI32(pTlv, &pNode->level); + break; + case SUBPLAN_CODE_DBFNAME: + code = tlvDecodeCStr(pTlv, pNode->dbFName); + break; + case SUBPLAN_CODE_USER: + code = tlvDecodeCStr(pTlv, pNode->user); + break; + case SUBPLAN_CODE_EXECNODE: + code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->execNode); + break; + case SUBPLAN_CODE_ROOT_NODE: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pNode); + break; + case SUBPLAN_CODE_DATA_SINK: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pDataSink); + break; + case SUBPLAN_CODE_TAG_COND: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagCond); + break; + case SUBPLAN_CODE_TAG_INDEX_COND: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagIndexCond); + break; + default: + break; + } + } + + return code; +} + +enum { QUERY_PLAN_CODE_QUERY_ID = 1, QUERY_PLAN_CODE_NUM_OF_SUBPLANS, QUERY_PLAN_CODE_SUBPLANS }; + +static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SQueryPlan* pNode = (const SQueryPlan*)pObj; + + int32_t code = tlvEncodeU64(pEncoder, QUERY_PLAN_CODE_QUERY_ID, pNode->queryId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, QUERY_PLAN_CODE_NUM_OF_SUBPLANS, pNode->numOfSubplans); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, QUERY_PLAN_CODE_SUBPLANS, nodeListToMsg, pNode->pSubplans); + } + + return code; +} + +static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) { + SQueryPlan* pNode = (SQueryPlan*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case QUERY_PLAN_CODE_QUERY_ID: + code = tlvDecodeU64(pTlv, &pNode->queryId); + break; + case QUERY_PLAN_CODE_NUM_OF_SUBPLANS: + code = tlvDecodeI32(pTlv, &pNode->numOfSubplans); + break; + case QUERY_PLAN_CODE_SUBPLANS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSubplans); + break; + default: + break; + } + } + + return code; +} + +static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + int32_t code = TSDB_CODE_SUCCESS; + switch (nodeType(pObj)) { + case QUERY_NODE_COLUMN: + code = columnNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_VALUE: + code = valueNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_OPERATOR: + code = operatorNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_LOGIC_CONDITION: + code = logicConditionNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_FUNCTION: + code = functionNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_ORDER_BY_EXPR: + code = orderByExprNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_LIMIT: + code = limitNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_NODE_LIST: + code = nodeListNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_TARGET: + code = targetNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_DATABLOCK_DESC: + code = dataBlockDescNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_SLOT_DESC: + code = slotDescNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_LEFT_VALUE: + break; + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: + code = physiScanNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: + code = physiLastRowScanNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: + code = physiTableScanNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + code = physiSysTableScanNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + code = physiProjectNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: + code = physiJoinNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: + code = physiAggNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: + code = physiExchangeNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE: + code = physiMergeNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_SORT: + case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: + code = physiSortNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: + code = physiIntervalNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_FILL: + code = physiFillNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: + code = physiSessionWindowNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: + code = physiStateWindowNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: + code = physiPartitionNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: + code = physiIndefRowsFuncNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: + code = physiInterpFuncNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + code = physiDispatchNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + code = physiQueryInsertNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN_DELETE: + code = physiDeleteNodeToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_SUBPLAN: + code = subplanToMsg(pObj, pEncoder); + break; + case QUERY_NODE_PHYSICAL_PLAN: + code = queryPlanToMsg(pObj, pEncoder); + break; + default: + nodesWarn("specificNodeToMsg unknown node = %s", nodesNodeName(nodeType(pObj))); + break; + } + if (TSDB_CODE_SUCCESS != code) { + nodesError("specificNodeToMsg error node = %s", nodesNodeName(nodeType(pObj))); + } + return code; +} + +static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { + int32_t code = TSDB_CODE_SUCCESS; + switch (nodeType(pObj)) { + case QUERY_NODE_COLUMN: + code = msgToColumnNode(pDecoder, pObj); + break; + case QUERY_NODE_VALUE: + code = msgToValueNode(pDecoder, pObj); + break; + case QUERY_NODE_OPERATOR: + code = msgToOperatorNode(pDecoder, pObj); + break; + case QUERY_NODE_LOGIC_CONDITION: + code = msgToLogicConditionNode(pDecoder, pObj); + break; + case QUERY_NODE_FUNCTION: + code = msgToFunctionNode(pDecoder, pObj); + break; + case QUERY_NODE_ORDER_BY_EXPR: + code = msgToOrderByExprNode(pDecoder, pObj); + break; + case QUERY_NODE_LIMIT: + code = msgToLimitNode(pDecoder, pObj); + break; + case QUERY_NODE_NODE_LIST: + code = msgToNodeListNode(pDecoder, pObj); + break; + case QUERY_NODE_TARGET: + code = msgToTargetNode(pDecoder, pObj); + break; + case QUERY_NODE_DATABLOCK_DESC: + code = msgToDataBlockDescNode(pDecoder, pObj); + break; + case QUERY_NODE_SLOT_DESC: + code = msgToSlotDescNode(pDecoder, pObj); + break; + case QUERY_NODE_LEFT_VALUE: + break; + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: + code = msgToPhysiScanNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: + code = msgToPhysiLastRowScanNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: + code = msgToPhysiTableScanNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + code = msgToPhysiSysTableScanNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + code = msgToPhysiProjectNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: + code = msgToPhysiJoinNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: + code = msgToPhysiAggNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: + code = msgToPhysiExchangeNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE: + code = msgToPhysiMergeNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_SORT: + case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: + code = msgToPhysiSortNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: + code = msgToPhysiIntervalNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_FILL: + code = msgToPhysiFillNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: + code = msgToPhysiSessionWindowNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: + code = msgToPhysiStateWindowNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: + code = msgToPhysiPartitionNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: + code = msgToPhysiIndefRowsFuncNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: + code = msgToPhysiInterpFuncNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + code = msgToPhysiDispatchNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + code = msgToPhysiQueryInsertNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN_DELETE: + code = msgToPhysiDeleteNode(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_SUBPLAN: + code = msgToSubplan(pDecoder, pObj); + break; + case QUERY_NODE_PHYSICAL_PLAN: + code = msgToQueryPlan(pDecoder, pObj); + break; + default: + nodesWarn("msgToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj))); + break; + } + if (TSDB_CODE_SUCCESS != code) { + nodesError("msgToSpecificNode error node = %s", nodesNodeName(nodeType(pObj))); + } + return code; +} + +static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + return tlvEncodeObj(pEncoder, nodeType(pObj), specificNodeToMsg, pObj); +} + +static int32_t msgToNode(STlvDecoder* pDecoder, void** pObj) { + return tlvDecodeDynObj(pDecoder, (FMakeObject)nodesMakeNode, msgToSpecificNode, pObj); +} + +static int32_t msgToNodeFromTlv(STlv* pTlv, void** pObj) { + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + return msgToNode(&decoder, pObj); +} + +static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SNodeList* pList = (const SNodeList*)pObj; + + SNode* pNode = NULL; + FOREACH(pNode, pList) { + int32_t code = nodeToMsg(pNode, pEncoder); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj) { + SNodeList* pList = nodesMakeList(); + + int32_t code = TSDB_CODE_SUCCESS; + while (TSDB_CODE_SUCCESS == code && !tlvDecodeEnd(pDecoder)) { + SNode* pNode = NULL; + code = msgToNode(pDecoder, (void**)&pNode); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListAppend(pList, pNode); + } + } + if (TSDB_CODE_SUCCESS == code) { + *pObj = pList; + } else { + nodesDestroyList(pList); + } + return code; +} + +static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj) { + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + return msgToNodeList(&decoder, pObj); +} + +int32_t nodesNodeToMsg(const SNode* pNode, char** pMsg, int32_t* pLen) { + if (NULL == pNode || NULL == pMsg || NULL == pLen) { + terrno = TSDB_CODE_FAILED; + return TSDB_CODE_FAILED; + } + + STlvEncoder encoder; + int32_t code = initTlvEncoder(&encoder); + if (TSDB_CODE_SUCCESS == code) { + code = nodeToMsg(pNode, &encoder); + } + if (TSDB_CODE_SUCCESS == code) { + endTlvEncode(&encoder, pMsg, pLen); + } + clearTlvEncoder(&encoder); + + terrno = code; + return code; +} + +int32_t nodesMsgToNode(const char* pMsg, int32_t len, SNode** pNode) { + if (NULL == pMsg || NULL == pNode) { + return TSDB_CODE_SUCCESS; + } + + STlvDecoder decoder = {.bufSize = len, .offset = 0, .pBuf = pMsg}; + int32_t code = msgToNode(&decoder, (void**)pNode); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(*pNode); + *pNode = NULL; + } + + terrno = code; + return code; +} diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 96f7d29230bafc94639be35fcc56550c029ffbac..b280b32a94f7d824bcd475573f3f62744c3e3d26 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -19,6 +19,7 @@ #include #include +#include #include "cmdnodes.h" #include "mockCatalogService.h" @@ -251,6 +252,7 @@ class PlannerTestBaseImpl { string splitLogicPlan_; string scaledLogicPlan_; string physiPlan_; + string physiPlanMsg_; vector physiSubplans_; }; @@ -274,6 +276,7 @@ class PlannerTestBaseImpl { res_.splitLogicPlan_.clear(); res_.scaledLogicPlan_.clear(); res_.physiPlan_.clear(); + res_.physiPlanMsg_.clear(); res_.physiSubplans_.clear(); } @@ -408,6 +411,8 @@ class PlannerTestBaseImpl { SNode* pSubplan; FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { res_.physiSubplans_.push_back(toString(pSubplan)); } } + res_.physiPlanMsg_ = toMsg((SNode*)(*pPlan)); + cout << "json len: " << res_.physiPlan_.length() << ", msg len: " << res_.physiPlanMsg_.length() << endl; } void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { @@ -446,12 +451,45 @@ class PlannerTestBaseImpl { string toString(const SNode* pRoot) { char* pStr = NULL; int32_t len = 0; + + auto start = chrono::steady_clock::now(); DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len) + if (QUERY_NODE_PHYSICAL_PLAN == nodeType(pRoot)) { + cout << "nodesNodeToString: " + << chrono::duration_cast(chrono::steady_clock::now() - start).count() << "us" << endl; + } + string str(pStr); taosMemoryFreeClear(pStr); return str; } + string toMsg(const SNode* pRoot) { + char* pStr = NULL; + int32_t len = 0; + + auto start = chrono::steady_clock::now(); + DO_WITH_THROW(nodesNodeToMsg, pRoot, &pStr, &len) + cout << "nodesNodeToMsg: " + << chrono::duration_cast(chrono::steady_clock::now() - start).count() << "us" << endl; + + SNode* pNode = NULL; + char* pNewStr = NULL; + int32_t newlen = 0; + DO_WITH_THROW(nodesMsgToNode, pStr, len, &pNode) + DO_WITH_THROW(nodesNodeToMsg, pNode, &pNewStr, &newlen) + if (newlen != len || 0 != memcmp(pStr, pNewStr, len)) { + cout << "nodesNodeToMsg error!!!!!!!!!!!!!! len = " << len << ", newlen = " << newlen << endl; + DO_WITH_THROW(nodesNodeToString, pNode, false, &pNewStr, &newlen) + cout << "nodesNodeToString " << pNewStr << endl; + } + taosMemoryFreeClear(pNewStr); + + string str(pStr, len); + taosMemoryFreeClear(pStr); + return str; + } + caseEnv caseEnv_; stmtEnv stmtEnv_; stmtRes res_; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 9db7d6c4554e3231399abd5ab470ea71f8207a37..5c437e6f7aeb942e02e79314e4140af0cdfe4323 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -168,6 +168,9 @@ static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) { } pReader->pIdxFile = pIdxFile; + + pReader->curFileFirstVer = fileFirstVer; + return 0; } @@ -372,7 +375,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int64_t code; -// ASSERT(pRead->curVersion == pHead->head.version); + // ASSERT(pRead->curVersion == pHead->head.version); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); if (code < 0) { @@ -415,7 +418,8 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { } if (walValidBodyCksum(*ppHead) != 0) { - wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); + wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, + ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1;