未验证 提交 755a1751 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16874 from taosdata/enh/3.0_plan_msg

enh: plan serialization optimize
...@@ -18,16 +18,21 @@ ...@@ -18,16 +18,21 @@
#include "tdatablock.h" #include "tdatablock.h"
#define NODES_MSG_DEFAULT_LEN 1024 #define NODES_MSG_DEFAULT_LEN 1024
#define TLV_TYPE_ARRAY_ELEM 0
#define tlvForEach(pDecoder, pTlv, code) \ #define tlvForEach(pDecoder, pTlv, code) \
while (TSDB_CODE_SUCCESS == code && TSDB_CODE_SUCCESS == (code = tlvGetNextTlv(pDecoder, &pTlv)) && NULL != pTlv) while (TSDB_CODE_SUCCESS == code && TSDB_CODE_SUCCESS == (code = tlvGetNextTlv(pDecoder, &pTlv)) && NULL != pTlv)
#pragma pack(push, 1)
typedef struct STlv { typedef struct STlv {
int16_t type; int16_t type;
int32_t len; int32_t len;
char value[0]; char value[0];
} STlv; } STlv;
#pragma pack(pop)
typedef struct STlvEncoder { typedef struct STlvEncoder {
int32_t allocSize; int32_t allocSize;
int32_t offset; int32_t offset;
...@@ -89,46 +94,100 @@ static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pV ...@@ -89,46 +94,100 @@ static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pV
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tlvEncodeValueImpl(STlvEncoder* pEncoder, const void* pValue, int32_t len) {
if (pEncoder->offset + len > pEncoder->allocSize) {
void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize * 2);
if (NULL == pNewBuf) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pEncoder->pBuf = pNewBuf;
pEncoder->allocSize = pEncoder->allocSize * 2;
}
memcpy(pEncoder->pBuf + pEncoder->offset, pValue, len);
pEncoder->offset += len;
return TSDB_CODE_SUCCESS;
}
static int32_t tlvEncodeI8(STlvEncoder* pEncoder, int16_t type, int8_t value) { static int32_t tlvEncodeI8(STlvEncoder* pEncoder, int16_t type, int8_t value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI8(STlvEncoder* pEncoder, int8_t value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeI16(STlvEncoder* pEncoder, int16_t type, int16_t value) { static int32_t tlvEncodeI16(STlvEncoder* pEncoder, int16_t type, int16_t value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI16(STlvEncoder* pEncoder, int16_t value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) { static int32_t tlvEncodeI32(STlvEncoder* pEncoder, int16_t type, int32_t value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI32(STlvEncoder* pEncoder, int32_t value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeI64(STlvEncoder* pEncoder, int16_t type, int64_t value) { static int32_t tlvEncodeI64(STlvEncoder* pEncoder, int16_t type, int64_t value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueI64(STlvEncoder* pEncoder, int64_t value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeU8(STlvEncoder* pEncoder, int16_t type, uint8_t value) { static int32_t tlvEncodeU8(STlvEncoder* pEncoder, int16_t type, uint8_t value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueU8(STlvEncoder* pEncoder, uint8_t value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeU16(STlvEncoder* pEncoder, int16_t type, uint16_t value) { static int32_t tlvEncodeU16(STlvEncoder* pEncoder, int16_t type, uint16_t value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueU16(STlvEncoder* pEncoder, uint16_t value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) { static int32_t tlvEncodeU64(STlvEncoder* pEncoder, int16_t type, uint64_t value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueU64(STlvEncoder* pEncoder, uint64_t value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeDouble(STlvEncoder* pEncoder, int16_t type, double value) { static int32_t tlvEncodeDouble(STlvEncoder* pEncoder, int16_t type, double value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueDouble(STlvEncoder* pEncoder, double value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeEnum(STlvEncoder* pEncoder, int16_t type, int32_t value) { static int32_t tlvEncodeEnum(STlvEncoder* pEncoder, int16_t type, int32_t value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeBool(STlvEncoder* pEncoder, int16_t type, bool value) { static int32_t tlvEncodeValueEnum(STlvEncoder* pEncoder, int32_t value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeBool(STlvEncoder* pEncoder, int16_t type, int8_t value) {
return tlvEncodeImpl(pEncoder, type, &value, sizeof(value)); return tlvEncodeImpl(pEncoder, type, &value, sizeof(value));
} }
static int32_t tlvEncodeValueBool(STlvEncoder* pEncoder, int8_t value) {
return tlvEncodeValueImpl(pEncoder, &value, sizeof(value));
}
static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pValue) { static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pValue) {
if (NULL == pValue) { if (NULL == pValue) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -136,6 +195,15 @@ static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pV ...@@ -136,6 +195,15 @@ static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pV
return tlvEncodeImpl(pEncoder, type, pValue, strlen(pValue)); return tlvEncodeImpl(pEncoder, type, pValue, strlen(pValue));
} }
static int32_t tlvEncodeValueCStr(STlvEncoder* pEncoder, const char* pValue) {
int16_t len = strlen(pValue);
int32_t code = tlvEncodeValueImpl(pEncoder, &len, sizeof(len));
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueImpl(pEncoder, pValue, len);
}
return code;
}
static int32_t tlvEncodeBinary(STlvEncoder* pEncoder, int16_t type, const void* pValue, int32_t len) { static int32_t tlvEncodeBinary(STlvEncoder* pEncoder, int16_t type, const void* pValue, int32_t len) {
return tlvEncodeImpl(pEncoder, type, pValue, len); return tlvEncodeImpl(pEncoder, type, pValue, len);
} }
...@@ -153,6 +221,7 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co ...@@ -153,6 +221,7 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co
pTlv->type = type; pTlv->type = type;
pTlv->len = pEncoder->offset - start - sizeof(STlv); pTlv->len = pEncoder->offset - start - sizeof(STlv);
} }
++(pEncoder->tlvCount);
return code; return code;
} }
...@@ -163,7 +232,7 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun ...@@ -163,7 +232,7 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun
int32_t start = pEncoder->offset; int32_t start = pEncoder->offset;
pEncoder->offset += sizeof(STlv); pEncoder->offset += sizeof(STlv);
for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) { for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) {
code = tlvEncodeObj(pEncoder, 0, func, (const char*)pArray + i * itemSize); code = tlvEncodeObj(pEncoder, TLV_TYPE_ARRAY_ELEM, func, (const char*)pArray + i * itemSize);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
STlv* pTlv = (STlv*)(pEncoder->pBuf + start); STlv* pTlv = (STlv*)(pEncoder->pBuf + start);
...@@ -174,6 +243,15 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun ...@@ -174,6 +243,15 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun
return code; return code;
} }
static int32_t tlvEncodeValueArray(STlvEncoder* pEncoder, FToMsg func, const void* pArray, int32_t itemSize,
int32_t num) {
int32_t code = tlvEncodeValueI32(pEncoder, num);
for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < num; ++i) {
code = func((const char*)pArray + i * itemSize, pEncoder);
}
return code;
}
static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) {
if (pDecoder->offset == pDecoder->bufSize) { if (pDecoder->offset == pDecoder->bufSize) {
*pTlv = NULL; *pTlv = NULL;
...@@ -198,27 +276,64 @@ static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int32_t len) { ...@@ -198,27 +276,64 @@ static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int32_t len) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tlvDecodeValueImpl(STlvDecoder* pDecoder, void* pValue, int32_t len) {
if (pDecoder->offset + len > pDecoder->bufSize) {
return TSDB_CODE_FAILED;
}
memcpy(pValue, pDecoder->pBuf + pDecoder->offset, len);
pDecoder->offset += len;
return TSDB_CODE_SUCCESS;
}
static int32_t tlvDecodeI8(STlv* pTlv, int8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI8(STlv* pTlv, int8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeValueI8(STlvDecoder* pDecoder, int8_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
}
static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI16(STlv* pTlv, int16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeValueI16(STlvDecoder* pDecoder, int16_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
}
static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI32(STlv* pTlv, int32_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeValueI32(STlvDecoder* pDecoder, int32_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
}
static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeI64(STlv* pTlv, int64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeValueI64(STlvDecoder* pDecoder, int64_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
}
static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeU8(STlv* pTlv, uint8_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeValueU8(STlvDecoder* pDecoder, uint8_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
}
static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeU16(STlv* pTlv, uint16_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeValueU16(STlvDecoder* pDecoder, uint16_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
}
static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeU64(STlv* pTlv, uint64_t* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeValueU64(STlvDecoder* pDecoder, uint64_t* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
}
static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeDouble(STlv* pTlv, double* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); }
static int32_t tlvDecodeBool(STlv* pTlv, bool* pValue) { return tlvDecodeImpl(pTlv, pValue, sizeof(*pValue)); } static int32_t tlvDecodeValueDouble(STlvDecoder* pDecoder, double* pValue) {
return tlvDecodeValueImpl(pDecoder, pValue, sizeof(*pValue));
}
static int32_t tlvDecodeEnum(STlv* pTlv, void* pValue, int16_t len) { static int32_t convertIntegerType(int32_t value, void* pValue, int16_t len) {
int32_t value = 0;
memcpy(&value, pTlv->value, pTlv->len);
switch (len) { switch (len) {
case 1: case 1:
*(int8_t*)pValue = value; *(int8_t*)pValue = value;
...@@ -235,11 +350,56 @@ static int32_t tlvDecodeEnum(STlv* pTlv, void* pValue, int16_t len) { ...@@ -235,11 +350,56 @@ static int32_t tlvDecodeEnum(STlv* pTlv, void* pValue, int16_t len) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tlvDecodeBool(STlv* pTlv, bool* pValue) {
int8_t value = 0;
int32_t code = tlvDecodeI8(pTlv, &value);
if (TSDB_CODE_SUCCESS == code) {
code = convertIntegerType(value, pValue, sizeof(bool));
}
return code;
}
static int32_t tlvDecodeValueBool(STlvDecoder* pDecoder, bool* pValue) {
int8_t value = 0;
int32_t code = tlvDecodeValueI8(pDecoder, &value);
if (TSDB_CODE_SUCCESS == code) {
code = convertIntegerType(value, pValue, sizeof(bool));
}
return code;
}
static int32_t tlvDecodeEnum(STlv* pTlv, void* pValue, int16_t len) {
int32_t value = 0;
int32_t code = tlvDecodeI32(pTlv, &value);
if (TSDB_CODE_SUCCESS == code) {
code = convertIntegerType(value, pValue, len);
}
return code;
}
static int32_t tlvDecodeValueEnum(STlvDecoder* pDecoder, void* pValue, int16_t len) {
int32_t value = 0;
int32_t code = tlvDecodeValueI32(pDecoder, &value);
if (TSDB_CODE_SUCCESS == code) {
code = convertIntegerType(value, pValue, len);
}
return code;
}
static int32_t tlvDecodeCStr(STlv* pTlv, char* pValue) { static int32_t tlvDecodeCStr(STlv* pTlv, char* pValue) {
memcpy(pValue, pTlv->value, pTlv->len); memcpy(pValue, pTlv->value, pTlv->len);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tlvDecodeValueCStr(STlvDecoder* pDecoder, char* pValue) {
int16_t len = 0;
int32_t code = tlvDecodeValueI16(pDecoder, &len);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueImpl(pDecoder, pValue, len);
}
return code;
}
static int32_t tlvDecodeCStrP(STlv* pTlv, char** pValue) { static int32_t tlvDecodeCStrP(STlv* pTlv, char** pValue) {
*pValue = strndup(pTlv->value, pTlv->len); *pValue = strndup(pTlv->value, pTlv->len);
return NULL == *pValue ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; return NULL == *pValue ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
...@@ -281,6 +441,15 @@ static int32_t tlvDecodeObjArray(STlvDecoder* pDecoder, FToObject func, void* pA ...@@ -281,6 +441,15 @@ static int32_t tlvDecodeObjArray(STlvDecoder* pDecoder, FToObject func, void* pA
return code; return code;
} }
static int32_t tlvDecodeValueArray(STlvDecoder* pDecoder, FToObject func, void* pArray, int32_t itemSize,
int32_t* pNum) {
int32_t code = tlvDecodeValueI32(pDecoder, pNum);
for (size_t i = 0; TSDB_CODE_SUCCESS == code && i < *pNum; ++i) {
code = func(pDecoder, (char*)pArray + i * itemSize);
}
return code;
}
static int32_t tlvDecodeObjArrayFromTlv(STlv* pTlv, FToObject func, void* pArray, int32_t itemSize) { static int32_t tlvDecodeObjArrayFromTlv(STlv* pTlv, FToObject func, void* pArray, int32_t itemSize) {
STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value};
return tlvDecodeObjArray(&decoder, func, pArray, itemSize); return tlvDecodeObjArray(&decoder, func, pArray, itemSize);
...@@ -305,6 +474,23 @@ static int32_t tlvDecodeDynObj(STlvDecoder* pDecoder, FMakeObject makeFunc, FToO ...@@ -305,6 +474,23 @@ static int32_t tlvDecodeDynObj(STlvDecoder* pDecoder, FMakeObject makeFunc, FToO
enum { DATA_TYPE_CODE_TYPE = 1, DATA_TYPE_CODE_PRECISION, DATA_TYPE_CODE_SCALE, DATA_TYPE_CODE_BYTES }; enum { DATA_TYPE_CODE_TYPE = 1, DATA_TYPE_CODE_PRECISION, DATA_TYPE_CODE_SCALE, DATA_TYPE_CODE_BYTES };
static int32_t dataTypeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SDataType* pNode = (const SDataType*)pObj;
int32_t code = tlvEncodeValueI8(pEncoder, pNode->type);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueU8(pEncoder, pNode->precision);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueU8(pEncoder, pNode->scale);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI32(pEncoder, pNode->bytes);
}
return code;
}
static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SDataType* pNode = (const SDataType*)pObj; const SDataType* pNode = (const SDataType*)pObj;
...@@ -322,6 +508,23 @@ static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -322,6 +508,23 @@ static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code; return code;
} }
static int32_t msgToDataTypeInline(STlvDecoder* pDecoder, void* pObj) {
SDataType* pNode = (SDataType*)pObj;
int32_t code = tlvDecodeValueI8(pDecoder, &pNode->type);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueU8(pDecoder, &pNode->precision);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueU8(pDecoder, &pNode->scale);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI32(pDecoder, &pNode->bytes);
}
return code;
}
static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) { static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) {
SDataType* pNode = (SDataType*)pObj; SDataType* pNode = (SDataType*)pObj;
...@@ -374,53 +577,83 @@ static int32_t msgToExprNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -374,53 +577,83 @@ static int32_t msgToExprNode(STlvDecoder* pDecoder, void* pObj) {
return code; return code;
} }
enum { enum { COLUMN_CODE_INLINE_ATTRS = 1 };
COLUMN_CODE_EXPR_BASE = 1,
COLUMN_CODE_TABLE_ID,
COLUMN_CODE_TABLE_TYPE,
COLUMN_CODE_COLUMN_ID,
COLUMN_CODE_COLUMN_TYPE,
COLUMN_CODE_DB_NAME,
COLUMN_CODE_TABLE_NAME,
COLUMN_CODE_TABLE_ALIAS,
COLUMN_CODE_COL_NAME,
COLUMN_CODE_DATABLOCK_ID,
COLUMN_CODE_SLOT_ID
};
static int32_t columnNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t columnNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SColumnNode* pNode = (const SColumnNode*)pObj; const SColumnNode* pNode = (const SColumnNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, COLUMN_CODE_EXPR_BASE, exprNodeToMsg, pNode); int32_t code = dataTypeInlineToMsg(&pNode->node.resType, pEncoder);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueU64(pEncoder, pNode->tableId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI16(pEncoder, pNode->colId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueEnum(pEncoder, pNode->colType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueCStr(pEncoder, pNode->dbName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueCStr(pEncoder, pNode->tableName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueCStr(pEncoder, pNode->tableAlias);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueCStr(pEncoder, pNode->colName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI16(pEncoder, pNode->dataBlockId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI16(pEncoder, pNode->slotId);
}
return code;
}
static int32_t columnNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
return tlvEncodeObj(pEncoder, COLUMN_CODE_INLINE_ATTRS, columnNodeInlineToMsg, pObj);
}
static int32_t msgToColumnNodeInline(STlvDecoder* pDecoder, void* pObj) {
SColumnNode* pNode = (SColumnNode*)pObj;
int32_t code = msgToDataTypeInline(pDecoder, &pNode->node.resType);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU64(pEncoder, COLUMN_CODE_TABLE_ID, pNode->tableId); code = tlvDecodeValueU64(pDecoder, &pNode->tableId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, COLUMN_CODE_TABLE_TYPE, pNode->tableType); code = tlvDecodeValueI8(pDecoder, &pNode->tableType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI16(pEncoder, COLUMN_CODE_COLUMN_ID, pNode->colId); code = tlvDecodeValueI16(pDecoder, &pNode->colId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, COLUMN_CODE_COLUMN_TYPE, pNode->colType); code = tlvDecodeValueEnum(pDecoder, &pNode->colType, sizeof(pNode->colType));
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, COLUMN_CODE_DB_NAME, pNode->dbName); code = tlvDecodeValueCStr(pDecoder, pNode->dbName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, COLUMN_CODE_TABLE_NAME, pNode->tableName); code = tlvDecodeValueCStr(pDecoder, pNode->tableName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, COLUMN_CODE_TABLE_ALIAS, pNode->tableAlias); code = tlvDecodeValueCStr(pDecoder, pNode->tableAlias);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, COLUMN_CODE_COL_NAME, pNode->colName); code = tlvDecodeValueCStr(pDecoder, pNode->colName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI16(pEncoder, COLUMN_CODE_DATABLOCK_ID, pNode->dataBlockId); code = tlvDecodeValueI16(pDecoder, &pNode->dataBlockId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI16(pEncoder, COLUMN_CODE_SLOT_ID, pNode->slotId); code = tlvDecodeValueI16(pDecoder, &pNode->slotId);
} }
return code; return code;
...@@ -433,38 +666,8 @@ static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -433,38 +666,8 @@ static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) {
STlv* pTlv = NULL; STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) { tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) { switch (pTlv->type) {
case COLUMN_CODE_EXPR_BASE: case COLUMN_CODE_INLINE_ATTRS:
code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); code = tlvDecodeObjFromTlv(pTlv, msgToColumnNodeInline, pNode);
break;
case COLUMN_CODE_TABLE_ID:
code = tlvDecodeU64(pTlv, &pNode->tableId);
break;
case COLUMN_CODE_TABLE_TYPE:
code = tlvDecodeI8(pTlv, &pNode->tableType);
break;
case COLUMN_CODE_COLUMN_ID:
code = tlvDecodeI16(pTlv, &pNode->colId);
break;
case COLUMN_CODE_COLUMN_TYPE:
code = tlvDecodeEnum(pTlv, &pNode->colType, sizeof(pNode->colType));
break;
case COLUMN_CODE_DB_NAME:
code = tlvDecodeCStr(pTlv, pNode->dbName);
break;
case COLUMN_CODE_TABLE_NAME:
code = tlvDecodeCStr(pTlv, pNode->tableName);
break;
case COLUMN_CODE_TABLE_ALIAS:
code = tlvDecodeCStr(pTlv, pNode->tableAlias);
break;
case COLUMN_CODE_COL_NAME:
code = tlvDecodeCStr(pTlv, pNode->colName);
break;
case COLUMN_CODE_DATABLOCK_ID:
code = tlvDecodeI16(pTlv, &pNode->dataBlockId);
break;
case COLUMN_CODE_SLOT_ID:
code = tlvDecodeI16(pTlv, &pNode->slotId);
break; break;
default: default:
break; break;
...@@ -983,7 +1186,7 @@ enum { NODE_LIST_CODE_DATA_TYPE = 1, NODE_LIST_CODE_NODE_LIST }; ...@@ -983,7 +1186,7 @@ enum { NODE_LIST_CODE_DATA_TYPE = 1, NODE_LIST_CODE_NODE_LIST };
static int32_t nodeListNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t nodeListNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SNodeListNode* pNode = (const SNodeListNode*)pObj; const SNodeListNode* pNode = (const SNodeListNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, NODE_LIST_CODE_DATA_TYPE, dataTypeToMsg, &pNode->dataType); int32_t code = tlvEncodeObj(pEncoder, NODE_LIST_CODE_DATA_TYPE, dataTypeInlineToMsg, &pNode->dataType);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, NODE_LIST_CODE_NODE_LIST, nodeListToMsg, pNode->pNodeList); code = tlvEncodeObj(pEncoder, NODE_LIST_CODE_NODE_LIST, nodeListToMsg, pNode->pNodeList);
} }
...@@ -999,7 +1202,7 @@ static int32_t msgToNodeListNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -999,7 +1202,7 @@ static int32_t msgToNodeListNode(STlvDecoder* pDecoder, void* pObj) {
tlvForEach(pDecoder, pTlv, code) { tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) { switch (pTlv->type) {
case NODE_LIST_CODE_DATA_TYPE: case NODE_LIST_CODE_DATA_TYPE:
code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->dataType); code = tlvDecodeObjFromTlv(pTlv, msgToDataTypeInline, &pNode->dataType);
break; break;
case NODE_LIST_CODE_NODE_LIST: case NODE_LIST_CODE_NODE_LIST:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pNodeList); code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pNodeList);
...@@ -1012,15 +1215,23 @@ static int32_t msgToNodeListNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1012,15 +1215,23 @@ static int32_t msgToNodeListNode(STlvDecoder* pDecoder, void* pObj) {
return code; return code;
} }
enum { TARGET_CODE_DATA_BLOCK_ID = 1, TARGET_CODE_SLOT_ID, TARGET_CODE_EXPR }; enum { TARGET_CODE_INLINE_ATTRS = 1, TARGET_CODE_EXPR };
static int32_t targetNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t targetNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const STargetNode* pNode = (const STargetNode*)pObj; const STargetNode* pNode = (const STargetNode*)pObj;
int32_t code = tlvEncodeI16(pEncoder, TARGET_CODE_DATA_BLOCK_ID, pNode->dataBlockId); int32_t code = tlvEncodeValueI16(pEncoder, pNode->dataBlockId);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI16(pEncoder, TARGET_CODE_SLOT_ID, pNode->slotId); code = tlvEncodeValueI16(pEncoder, pNode->slotId);
} }
return code;
}
static int32_t targetNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const STargetNode* pNode = (const STargetNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, TARGET_CODE_INLINE_ATTRS, targetNodeInlineToMsg, pNode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, TARGET_CODE_EXPR, nodeToMsg, pNode->pExpr); code = tlvEncodeObj(pEncoder, TARGET_CODE_EXPR, nodeToMsg, pNode->pExpr);
} }
...@@ -1028,6 +1239,17 @@ static int32_t targetNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -1028,6 +1239,17 @@ static int32_t targetNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code; return code;
} }
static int32_t msgToTargetNodeInline(STlvDecoder* pDecoder, void* pObj) {
STargetNode* pNode = (STargetNode*)pObj;
int32_t code = tlvDecodeValueI16(pDecoder, &pNode->dataBlockId);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI16(pDecoder, &pNode->slotId);
}
return code;
}
static int32_t msgToTargetNode(STlvDecoder* pDecoder, void* pObj) { static int32_t msgToTargetNode(STlvDecoder* pDecoder, void* pObj) {
STargetNode* pNode = (STargetNode*)pObj; STargetNode* pNode = (STargetNode*)pObj;
...@@ -1035,11 +1257,8 @@ static int32_t msgToTargetNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1035,11 +1257,8 @@ static int32_t msgToTargetNode(STlvDecoder* pDecoder, void* pObj) {
STlv* pTlv = NULL; STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) { tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) { switch (pTlv->type) {
case TARGET_CODE_DATA_BLOCK_ID: case TARGET_CODE_INLINE_ATTRS:
code = tlvDecodeI16(pTlv, &pNode->dataBlockId); code = tlvDecodeObjFromTlv(pTlv, msgToTargetNodeInline, pNode);
break;
case TARGET_CODE_SLOT_ID:
code = tlvDecodeI16(pTlv, &pNode->slotId);
break; break;
case TARGET_CODE_EXPR: case TARGET_CODE_EXPR:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pExpr); code = msgToNodeFromTlv(pTlv, (void**)&pNode->pExpr);
...@@ -1052,29 +1271,48 @@ static int32_t msgToTargetNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1052,29 +1271,48 @@ static int32_t msgToTargetNode(STlvDecoder* pDecoder, void* pObj) {
return code; return code;
} }
enum { enum { DATA_BLOCK_DESC_CODE_INLINE_ATTRS = 1, DATA_BLOCK_DESC_CODE_SLOTS };
DATA_BLOCK_DESC_CODE_DATA_BLOCK_ID = 1,
DATA_BLOCK_DESC_CODE_SLOTS, static int32_t dataBlockDescNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
DATA_BLOCK_DESC_CODE_TOTAL_ROW_SIZE, const SDataBlockDescNode* pNode = (const SDataBlockDescNode*)pObj;
DATA_BLOCK_DESC_CODE_OUTPUT_ROW_SIZE,
DATA_BLOCK_DESC_CODE_PRECISION int32_t code = tlvEncodeValueI16(pEncoder, pNode->dataBlockId);
}; if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI32(pEncoder, pNode->totalRowSize);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI32(pEncoder, pNode->outputRowSize);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueU8(pEncoder, pNode->precision);
}
return code;
}
static int32_t dataBlockDescNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t dataBlockDescNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SDataBlockDescNode* pNode = (const SDataBlockDescNode*)pObj; const SDataBlockDescNode* pNode = (const SDataBlockDescNode*)pObj;
int32_t code = tlvEncodeI16(pEncoder, DATA_BLOCK_DESC_CODE_DATA_BLOCK_ID, pNode->dataBlockId); int32_t code = tlvEncodeObj(pEncoder, DATA_BLOCK_DESC_CODE_INLINE_ATTRS, dataBlockDescNodeInlineToMsg, pNode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, DATA_BLOCK_DESC_CODE_SLOTS, nodeListToMsg, pNode->pSlots); code = tlvEncodeObj(pEncoder, DATA_BLOCK_DESC_CODE_SLOTS, nodeListToMsg, pNode->pSlots);
} }
return code;
}
static int32_t msgToDataBlockDescNodeInline(STlvDecoder* pDecoder, void* pObj) {
SDataBlockDescNode* pNode = (SDataBlockDescNode*)pObj;
int32_t code = tlvDecodeValueI16(pDecoder, &pNode->dataBlockId);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, DATA_BLOCK_DESC_CODE_TOTAL_ROW_SIZE, pNode->totalRowSize); code = tlvDecodeValueI32(pDecoder, &pNode->totalRowSize);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, DATA_BLOCK_DESC_CODE_OUTPUT_ROW_SIZE, pNode->outputRowSize); code = tlvDecodeValueI32(pDecoder, &pNode->outputRowSize);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU8(pEncoder, DATA_BLOCK_DESC_CODE_PRECISION, pNode->precision); code = tlvDecodeValueU8(pDecoder, &pNode->precision);
} }
return code; return code;
...@@ -1087,21 +1325,12 @@ static int32_t msgToDataBlockDescNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1087,21 +1325,12 @@ static int32_t msgToDataBlockDescNode(STlvDecoder* pDecoder, void* pObj) {
STlv* pTlv = NULL; STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) { tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) { switch (pTlv->type) {
case DATA_BLOCK_DESC_CODE_DATA_BLOCK_ID: case DATA_BLOCK_DESC_CODE_INLINE_ATTRS:
code = tlvDecodeI16(pTlv, &pNode->dataBlockId); code = tlvDecodeObjFromTlv(pTlv, msgToDataBlockDescNodeInline, pNode);
break; break;
case DATA_BLOCK_DESC_CODE_SLOTS: case DATA_BLOCK_DESC_CODE_SLOTS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSlots); code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSlots);
break; break;
case DATA_BLOCK_DESC_CODE_TOTAL_ROW_SIZE:
code = tlvDecodeI32(pTlv, &pNode->totalRowSize);
break;
case DATA_BLOCK_DESC_CODE_OUTPUT_ROW_SIZE:
code = tlvDecodeI32(pTlv, &pNode->outputRowSize);
break;
case DATA_BLOCK_DESC_CODE_PRECISION:
code = tlvDecodeU8(pTlv, &pNode->precision);
break;
default: default:
break; break;
} }
...@@ -1110,29 +1339,47 @@ static int32_t msgToDataBlockDescNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1110,29 +1339,47 @@ static int32_t msgToDataBlockDescNode(STlvDecoder* pDecoder, void* pObj) {
return code; return code;
} }
enum { enum { SLOT_DESC_CODE_INLINE_ATTRS = 1 };
SLOT_DESC_CODE_SLOT_ID = 1,
SLOT_DESC_CODE_DATA_TYPE,
SLOT_DESC_CODE_RESERVE,
SLOT_DESC_CODE_OUTPUT,
SLOT_DESC_CODE_TAG
};
static int32_t slotDescNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t slotDescNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSlotDescNode* pNode = (const SSlotDescNode*)pObj; const SSlotDescNode* pNode = (const SSlotDescNode*)pObj;
int32_t code = tlvEncodeI16(pEncoder, SLOT_DESC_CODE_SLOT_ID, pNode->slotId); int32_t code = tlvEncodeValueI16(pEncoder, pNode->slotId);
if (TSDB_CODE_SUCCESS == code) {
code = dataTypeInlineToMsg(&pNode->dataType, pEncoder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->reserve);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->output);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->tag);
}
return code;
}
static int32_t slotDescNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
return tlvEncodeObj(pEncoder, SLOT_DESC_CODE_INLINE_ATTRS, slotDescNodeInlineToMsg, pObj);
}
static int32_t msgToSlotDescNodeInline(STlvDecoder* pDecoder, void* pObj) {
SSlotDescNode* pNode = (SSlotDescNode*)pObj;
int32_t code = tlvDecodeValueI16(pDecoder, &pNode->slotId);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, SLOT_DESC_CODE_DATA_TYPE, dataTypeToMsg, &pNode->dataType); code = msgToDataTypeInline(pDecoder, &pNode->dataType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, SLOT_DESC_CODE_RESERVE, pNode->reserve); code = tlvDecodeValueBool(pDecoder, &pNode->reserve);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, SLOT_DESC_CODE_OUTPUT, pNode->output); code = tlvDecodeValueBool(pDecoder, &pNode->output);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, SLOT_DESC_CODE_TAG, pNode->tag); code = tlvDecodeValueBool(pDecoder, &pNode->tag);
} }
return code; return code;
...@@ -1145,20 +1392,8 @@ static int32_t msgToSlotDescNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1145,20 +1392,8 @@ static int32_t msgToSlotDescNode(STlvDecoder* pDecoder, void* pObj) {
STlv* pTlv = NULL; STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) { tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) { switch (pTlv->type) {
case SLOT_DESC_CODE_SLOT_ID: case SLOT_DESC_CODE_INLINE_ATTRS:
code = tlvDecodeI16(pTlv, &pNode->slotId); code = tlvDecodeObjFromTlv(pTlv, msgToSlotDescNodeInline, pNode);
break;
case SLOT_DESC_CODE_DATA_TYPE:
code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->dataType);
break;
case SLOT_DESC_CODE_RESERVE:
code = tlvDecodeBool(pTlv, &pNode->reserve);
break;
case SLOT_DESC_CODE_OUTPUT:
code = tlvDecodeBool(pTlv, &pNode->output);
break;
case SLOT_DESC_CODE_TAG:
code = tlvDecodeBool(pTlv, &pNode->tag);
break; break;
default: default:
break; break;
...@@ -1170,6 +1405,17 @@ static int32_t msgToSlotDescNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1170,6 +1405,17 @@ static int32_t msgToSlotDescNode(STlvDecoder* pDecoder, void* pObj) {
enum { EP_CODE_FQDN = 1, EP_CODE_port }; enum { EP_CODE_FQDN = 1, EP_CODE_port };
static int32_t epInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEp* pNode = (const SEp*)pObj;
int32_t code = tlvEncodeValueCStr(pEncoder, pNode->fqdn);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueU16(pEncoder, pNode->port);
}
return code;
}
static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEp* pNode = (const SEp*)pObj; const SEp* pNode = (const SEp*)pObj;
...@@ -1181,6 +1427,17 @@ static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -1181,6 +1427,17 @@ static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code; return code;
} }
static int32_t msgToEpInline(STlvDecoder* pDecoder, void* pObj) {
SEp* pNode = (SEp*)pObj;
int32_t code = tlvDecodeValueCStr(pDecoder, pNode->fqdn);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueU16(pDecoder, &pNode->port);
}
return code;
}
static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) { static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) {
SEp* pNode = (SEp*)pObj; SEp* pNode = (SEp*)pObj;
...@@ -1204,6 +1461,17 @@ static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) { ...@@ -1204,6 +1461,17 @@ static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) {
enum { EP_SET_CODE_IN_USE = 1, EP_SET_CODE_NUM_OF_EPS, EP_SET_CODE_EPS }; enum { EP_SET_CODE_IN_USE = 1, EP_SET_CODE_NUM_OF_EPS, EP_SET_CODE_EPS };
static int32_t epSetInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEpSet* pNode = (const SEpSet*)pObj;
int32_t code = tlvEncodeValueI8(pEncoder, pNode->inUse);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueArray(pEncoder, epInlineToMsg, pNode->eps, sizeof(SEp), pNode->numOfEps);
}
return code;
}
static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEpSet* pNode = (const SEpSet*)pObj; const SEpSet* pNode = (const SEpSet*)pObj;
...@@ -1218,6 +1486,19 @@ static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -1218,6 +1486,19 @@ static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code; return code;
} }
static int32_t msgToEpSetInline(STlvDecoder* pDecoder, void* pObj) {
SEpSet* pNode = (SEpSet*)pObj;
int32_t code = tlvDecodeValueI8(pDecoder, &pNode->inUse);
if (TSDB_CODE_SUCCESS == code) {
int32_t numOfEps = 0;
code = tlvDecodeValueArray(pDecoder, msgToEpInline, pNode->eps, sizeof(SEp), &numOfEps);
pNode->numOfEps = numOfEps;
}
return code;
}
static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) { static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) {
SEpSet* pNode = (SEpSet*)pObj; SEpSet* pNode = (SEpSet*)pObj;
...@@ -1244,6 +1525,17 @@ static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) { ...@@ -1244,6 +1525,17 @@ static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) {
enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_EP_SET }; enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_EP_SET };
static int32_t queryNodeAddrInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj;
int32_t code = tlvEncodeValueI32(pEncoder, pNode->nodeId);
if (TSDB_CODE_SUCCESS == code) {
code = epSetInlineToMsg(&pNode->epSet, pEncoder);
}
return code;
}
static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj; const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj;
...@@ -1255,6 +1547,17 @@ static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -1255,6 +1547,17 @@ static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code; return code;
} }
static int32_t msgToQueryNodeAddrInline(STlvDecoder* pDecoder, void* pObj) {
SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj;
int32_t code = tlvDecodeValueI32(pDecoder, &pNode->nodeId);
if (TSDB_CODE_SUCCESS == code) {
code = msgToEpSetInline(pDecoder, &pNode->epSet);
}
return code;
}
static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) { static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) {
SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj; SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj;
...@@ -1274,29 +1577,47 @@ static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) { ...@@ -1274,29 +1577,47 @@ static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) {
return code; return code;
} }
enum { enum { DOWNSTREAM_SOURCE_CODE_INLINE_ATTRS = 1 };
DOWNSTREAM_SOURCE_CODE_ADDR = 1,
DOWNSTREAM_SOURCE_CODE_TASK_ID,
DOWNSTREAM_SOURCE_CODE_SCHED_ID,
DOWNSTREAM_SOURCE_CODE_EXEC_ID,
DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE
};
static int32_t downstreamSourceNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t downstreamSourceNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj; const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, DOWNSTREAM_SOURCE_CODE_ADDR, queryNodeAddrToMsg, &pNode->addr); int32_t code = queryNodeAddrInlineToMsg(&pNode->addr, pEncoder);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueU64(pEncoder, pNode->taskId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueU64(pEncoder, pNode->schedId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI32(pEncoder, pNode->execId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI32(pEncoder, pNode->fetchMsgType);
}
return code;
}
static int32_t downstreamSourceNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
return tlvEncodeObj(pEncoder, DOWNSTREAM_SOURCE_CODE_INLINE_ATTRS, downstreamSourceNodeInlineToMsg, pObj);
}
static int32_t msgToDownstreamSourceNodeInlineToMsg(STlvDecoder* pDecoder, void* pObj) {
SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)pObj;
int32_t code = msgToQueryNodeAddrInline(pDecoder, &pNode->addr);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU64(pEncoder, DOWNSTREAM_SOURCE_CODE_TASK_ID, pNode->taskId); code = tlvDecodeValueU64(pDecoder, &pNode->taskId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU64(pEncoder, DOWNSTREAM_SOURCE_CODE_SCHED_ID, pNode->schedId); code = tlvDecodeValueU64(pDecoder, &pNode->schedId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, DOWNSTREAM_SOURCE_CODE_EXEC_ID, pNode->execId); code = tlvDecodeValueI32(pDecoder, &pNode->execId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE, pNode->fetchMsgType); code = tlvDecodeValueI32(pDecoder, &pNode->fetchMsgType);
} }
return code; return code;
...@@ -1309,20 +1630,8 @@ static int32_t msgToDownstreamSourceNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1309,20 +1630,8 @@ static int32_t msgToDownstreamSourceNode(STlvDecoder* pDecoder, void* pObj) {
STlv* pTlv = NULL; STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) { tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) { switch (pTlv->type) {
case DOWNSTREAM_SOURCE_CODE_ADDR: case DOWNSTREAM_SOURCE_CODE_INLINE_ATTRS:
code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->addr); code = tlvDecodeObjFromTlv(pTlv, msgToDownstreamSourceNodeInlineToMsg, pNode);
break;
case DOWNSTREAM_SOURCE_CODE_TASK_ID:
code = tlvDecodeU64(pTlv, &pNode->taskId);
break;
case DOWNSTREAM_SOURCE_CODE_SCHED_ID:
code = tlvDecodeU64(pTlv, &pNode->schedId);
break;
case DOWNSTREAM_SOURCE_CODE_EXEC_ID:
code = tlvDecodeI32(pTlv, &pNode->execId);
break;
case DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE:
code = tlvDecodeI32(pTlv, &pNode->fetchMsgType);
break; break;
default: default:
break; break;
...@@ -1504,43 +1813,70 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1504,43 +1813,70 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) {
enum { enum {
PHY_TABLE_SCAN_CODE_SCAN = 1, PHY_TABLE_SCAN_CODE_SCAN = 1,
PHY_TABLE_SCAN_CODE_SCAN_COUNT, PHY_TABLE_SCAN_CODE_INLINE_ATTRS,
PHY_TABLE_SCAN_CODE_REVERSE_SCAN_COUNT,
PHY_TABLE_SCAN_CODE_SCAN_RANGE,
PHY_TABLE_SCAN_CODE_RATIO,
PHY_TABLE_SCAN_CODE_DATA_REQUIRED,
PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS, PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS,
PHY_TABLE_SCAN_CODE_GROUP_TAGS, PHY_TABLE_SCAN_CODE_GROUP_TAGS
PHY_TABLE_SCAN_CODE_GROUP_SORT,
PHY_TABLE_SCAN_CODE_INTERVAL,
PHY_TABLE_SCAN_CODE_OFFSET,
PHY_TABLE_SCAN_CODE_SLIDING,
PHY_TABLE_SCAN_CODE_INTERVAL_UNIT,
PHY_TABLE_SCAN_CODE_SLIDING_UNIT,
PHY_TABLE_SCAN_CODE_TRIGGER_TYPE,
PHY_TABLE_SCAN_CODE_WATERMARK,
PHY_TABLE_SCAN_CODE_IG_EXPIRED,
PHY_TABLE_SCAN_CODE_ASSIGN_BLOCK_UID,
}; };
static int32_t physiTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan); int32_t code = tlvEncodeValueU8(pEncoder, pNode->scanSeq[0]);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU8(pEncoder, PHY_TABLE_SCAN_CODE_SCAN_COUNT, pNode->scanSeq[0]); code = tlvEncodeValueU8(pEncoder, pNode->scanSeq[1]);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU8(pEncoder, PHY_TABLE_SCAN_CODE_REVERSE_SCAN_COUNT, pNode->scanSeq[1]); code = tlvEncodeValueI64(pEncoder, pNode->scanRange.skey);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_SCAN_RANGE, timeWindowToMsg, &pNode->scanRange); code = tlvEncodeValueI64(pEncoder, pNode->scanRange.ekey);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeDouble(pEncoder, PHY_TABLE_SCAN_CODE_RATIO, pNode->ratio); code = tlvEncodeValueDouble(pEncoder, pNode->ratio);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_TABLE_SCAN_CODE_DATA_REQUIRED, pNode->dataRequired); code = tlvEncodeValueI32(pEncoder, pNode->dataRequired);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->groupSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI64(pEncoder, pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI64(pEncoder, pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI64(pEncoder, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI64(pEncoder, pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->assignBlockUid);
}
return code;
}
static int32_t physiTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_INLINE_ATTRS, physiTableScanNodeInlineToMsg, pNode);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS, nodeListToMsg, pNode->pDynamicScanFuncs); code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS, nodeListToMsg, pNode->pDynamicScanFuncs);
...@@ -1548,35 +1884,58 @@ static int32_t physiTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) ...@@ -1548,35 +1884,58 @@ static int32_t physiTableScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_GROUP_TAGS, nodeListToMsg, pNode->pGroupTags); code = tlvEncodeObj(pEncoder, PHY_TABLE_SCAN_CODE_GROUP_TAGS, nodeListToMsg, pNode->pGroupTags);
} }
return code;
}
static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj) {
STableScanPhysiNode* pNode = (STableScanPhysiNode*)pObj;
int32_t code = tlvDecodeValueU8(pDecoder, pNode->scanSeq);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueU8(pDecoder, pNode->scanSeq + 1);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI64(pDecoder, &pNode->scanRange.skey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI64(pDecoder, &pNode->scanRange.ekey);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_TABLE_SCAN_CODE_GROUP_SORT, pNode->groupSort); code = tlvDecodeValueDouble(pDecoder, &pNode->ratio);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_INTERVAL, pNode->interval); code = tlvDecodeValueI32(pDecoder, &pNode->dataRequired);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_OFFSET, pNode->offset); code = tlvDecodeValueBool(pDecoder, &pNode->groupSort);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_SLIDING, pNode->sliding); code = tlvDecodeValueI64(pDecoder, &pNode->interval);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_INTERVAL_UNIT, pNode->intervalUnit); code = tlvDecodeValueI64(pDecoder, &pNode->offset);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_SLIDING_UNIT, pNode->slidingUnit); code = tlvDecodeValueI64(pDecoder, &pNode->sliding);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_TRIGGER_TYPE, pNode->triggerType); code = tlvDecodeValueI8(pDecoder, &pNode->intervalUnit);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_TABLE_SCAN_CODE_WATERMARK, pNode->watermark); code = tlvDecodeValueI8(pDecoder, &pNode->slidingUnit);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_TABLE_SCAN_CODE_IG_EXPIRED, pNode->igExpired); code = tlvDecodeValueI8(pDecoder, &pNode->triggerType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_TABLE_SCAN_CODE_ASSIGN_BLOCK_UID, pNode->assignBlockUid); code = tlvDecodeValueI64(pDecoder, &pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI8(pDecoder, &pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->assignBlockUid);
} }
return code; return code;
...@@ -1592,20 +1951,8 @@ static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1592,20 +1951,8 @@ static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_TABLE_SCAN_CODE_SCAN: case PHY_TABLE_SCAN_CODE_SCAN:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan); code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan);
break; break;
case PHY_TABLE_SCAN_CODE_SCAN_COUNT: case PHY_TABLE_SCAN_CODE_INLINE_ATTRS:
code = tlvDecodeU8(pTlv, pNode->scanSeq); code = tlvDecodeObjFromTlv(pTlv, msgToPhysiTableScanNodeInline, pNode);
break;
case PHY_TABLE_SCAN_CODE_REVERSE_SCAN_COUNT:
code = tlvDecodeU8(pTlv, pNode->scanSeq + 1);
break;
case PHY_TABLE_SCAN_CODE_SCAN_RANGE:
code = tlvDecodeObjFromTlv(pTlv, msgToTimeWindow, &pNode->scanRange);
break;
case PHY_TABLE_SCAN_CODE_RATIO:
code = tlvDecodeDouble(pTlv, &pNode->ratio);
break;
case PHY_TABLE_SCAN_CODE_DATA_REQUIRED:
code = tlvDecodeI32(pTlv, &pNode->dataRequired);
break; break;
case PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS: case PHY_TABLE_SCAN_CODE_DYN_SCAN_FUNCS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pDynamicScanFuncs); code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pDynamicScanFuncs);
...@@ -1613,36 +1960,6 @@ static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -1613,36 +1960,6 @@ static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_TABLE_SCAN_CODE_GROUP_TAGS: case PHY_TABLE_SCAN_CODE_GROUP_TAGS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupTags); code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupTags);
break; break;
case PHY_TABLE_SCAN_CODE_GROUP_SORT:
code = tlvDecodeBool(pTlv, &pNode->groupSort);
break;
case PHY_TABLE_SCAN_CODE_INTERVAL:
code = tlvDecodeI64(pTlv, &pNode->interval);
break;
case PHY_TABLE_SCAN_CODE_OFFSET:
code = tlvDecodeI64(pTlv, &pNode->offset);
break;
case PHY_TABLE_SCAN_CODE_SLIDING:
code = tlvDecodeI64(pTlv, &pNode->sliding);
break;
case PHY_TABLE_SCAN_CODE_INTERVAL_UNIT:
code = tlvDecodeI8(pTlv, &pNode->intervalUnit);
break;
case PHY_TABLE_SCAN_CODE_SLIDING_UNIT:
code = tlvDecodeI8(pTlv, &pNode->slidingUnit);
break;
case PHY_TABLE_SCAN_CODE_TRIGGER_TYPE:
code = tlvDecodeI8(pTlv, &pNode->triggerType);
break;
case PHY_TABLE_SCAN_CODE_WATERMARK:
code = tlvDecodeI64(pTlv, &pNode->watermark);
break;
case PHY_TABLE_SCAN_CODE_IG_EXPIRED:
code = tlvDecodeI8(pTlv, &pNode->igExpired);
break;
case PHY_TABLE_SCAN_CODE_ASSIGN_BLOCK_UID:
code = tlvDecodeBool(pTlv, &pNode->assignBlockUid);
break;
default: default:
break; break;
} }
...@@ -2746,6 +3063,20 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -2746,6 +3063,20 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) {
enum { SUBPLAN_ID_CODE_QUERY_ID = 1, SUBPLAN_ID_CODE_GROUP_ID, SUBPLAN_ID_CODE_SUBPLAN_ID }; enum { SUBPLAN_ID_CODE_QUERY_ID = 1, SUBPLAN_ID_CODE_GROUP_ID, SUBPLAN_ID_CODE_SUBPLAN_ID };
static int32_t subplanIdInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSubplanId* pNode = (const SSubplanId*)pObj;
int32_t code = tlvEncodeValueU64(pEncoder, pNode->queryId);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI32(pEncoder, pNode->groupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI32(pEncoder, pNode->subplanId);
}
return code;
}
static int32_t subplanIdToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t subplanIdToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSubplanId* pNode = (const SSubplanId*)pObj; const SSubplanId* pNode = (const SSubplanId*)pObj;
...@@ -2760,6 +3091,20 @@ static int32_t subplanIdToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -2760,6 +3091,20 @@ static int32_t subplanIdToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code; return code;
} }
static int32_t msgToSubplanIdInline(STlvDecoder* pDecoder, void* pObj) {
SSubplanId* pNode = (SSubplanId*)pObj;
int32_t code = tlvDecodeValueU64(pDecoder, &pNode->queryId);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI32(pDecoder, &pNode->groupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI32(pDecoder, &pNode->subplanId);
}
return code;
}
static int32_t msgToSubplanId(STlvDecoder* pDecoder, void* pObj) { static int32_t msgToSubplanId(STlvDecoder* pDecoder, void* pObj) {
SSubplanId* pNode = (SSubplanId*)pObj; SSubplanId* pNode = (SSubplanId*)pObj;
...@@ -2785,41 +3130,43 @@ static int32_t msgToSubplanId(STlvDecoder* pDecoder, void* pObj) { ...@@ -2785,41 +3130,43 @@ static int32_t msgToSubplanId(STlvDecoder* pDecoder, void* pObj) {
} }
enum { enum {
SUBPLAN_CODE_SUBPLAN_ID = 1, SUBPLAN_CODE_INLINE_ATTRS = 1,
SUBPLAN_CODE_SUBPLAN_TYPE,
SUBPLAN_CODE_MSG_TYPE,
SUBPLAN_CODE_LEVEL,
SUBPLAN_CODE_DBFNAME,
SUBPLAN_CODE_USER,
SUBPLAN_CODE_EXECNODE,
SUBPLAN_CODE_ROOT_NODE, SUBPLAN_CODE_ROOT_NODE,
SUBPLAN_CODE_DATA_SINK, SUBPLAN_CODE_DATA_SINK,
SUBPLAN_CODE_TAG_COND, SUBPLAN_CODE_TAG_COND,
SUBPLAN_CODE_TAG_INDEX_COND SUBPLAN_CODE_TAG_INDEX_COND
}; };
static int32_t subplanToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t subplanInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSubplan* pNode = (const SSubplan*)pObj; const SSubplan* pNode = (const SSubplan*)pObj;
int32_t code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_SUBPLAN_ID, subplanIdToMsg, &pNode->id); int32_t code = subplanIdInlineToMsg(&pNode->id, pEncoder);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, SUBPLAN_CODE_SUBPLAN_TYPE, pNode->subplanType); code = tlvEncodeValueEnum(pEncoder, pNode->subplanType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, SUBPLAN_CODE_MSG_TYPE, pNode->msgType); code = tlvEncodeValueI32(pEncoder, pNode->msgType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, SUBPLAN_CODE_LEVEL, pNode->level); code = tlvEncodeValueI32(pEncoder, pNode->level);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, SUBPLAN_CODE_DBFNAME, pNode->dbFName); code = tlvEncodeValueCStr(pEncoder, pNode->dbFName);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, SUBPLAN_CODE_USER, pNode->user); code = tlvEncodeValueCStr(pEncoder, pNode->user);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_EXECNODE, queryNodeAddrToMsg, &pNode->execNode); code = queryNodeAddrInlineToMsg(&pNode->execNode, pEncoder);
} }
return code;
}
static int32_t subplanToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSubplan* pNode = (const SSubplan*)pObj;
int32_t code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_INLINE_ATTRS, subplanInlineToMsg, pNode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_ROOT_NODE, nodeToMsg, pNode->pNode); code = tlvEncodeObj(pEncoder, SUBPLAN_CODE_ROOT_NODE, nodeToMsg, pNode->pNode);
} }
...@@ -2836,6 +3183,32 @@ static int32_t subplanToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -2836,6 +3183,32 @@ static int32_t subplanToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code; return code;
} }
static int32_t msgToSubplanInline(STlvDecoder* pDecoder, void* pObj) {
SSubplan* pNode = (SSubplan*)pObj;
int32_t code = msgToSubplanIdInline(pDecoder, &pNode->id);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueEnum(pDecoder, &pNode->subplanType, sizeof(pNode->subplanType));
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI32(pDecoder, &pNode->msgType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI32(pDecoder, &pNode->level);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueCStr(pDecoder, pNode->dbFName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueCStr(pDecoder, pNode->user);
}
if (TSDB_CODE_SUCCESS == code) {
code = msgToQueryNodeAddrInline(pDecoder, &pNode->execNode);
}
return code;
}
static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) { static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) {
SSubplan* pNode = (SSubplan*)pObj; SSubplan* pNode = (SSubplan*)pObj;
...@@ -2843,26 +3216,8 @@ static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) { ...@@ -2843,26 +3216,8 @@ static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) {
STlv* pTlv = NULL; STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) { tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) { switch (pTlv->type) {
case SUBPLAN_CODE_SUBPLAN_ID: case SUBPLAN_CODE_INLINE_ATTRS:
code = tlvDecodeObjFromTlv(pTlv, msgToSubplanId, &pNode->id); code = tlvDecodeObjFromTlv(pTlv, msgToSubplanInline, pNode);
break;
case SUBPLAN_CODE_SUBPLAN_TYPE:
code = tlvDecodeEnum(pTlv, &pNode->subplanType, sizeof(pNode->subplanType));
break;
case SUBPLAN_CODE_MSG_TYPE:
code = tlvDecodeI32(pTlv, &pNode->msgType);
break;
case SUBPLAN_CODE_LEVEL:
code = tlvDecodeI32(pTlv, &pNode->level);
break;
case SUBPLAN_CODE_DBFNAME:
code = tlvDecodeCStr(pTlv, pNode->dbFName);
break;
case SUBPLAN_CODE_USER:
code = tlvDecodeCStr(pTlv, pNode->user);
break;
case SUBPLAN_CODE_EXECNODE:
code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->execNode);
break; break;
case SUBPLAN_CODE_ROOT_NODE: case SUBPLAN_CODE_ROOT_NODE:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pNode); code = msgToNodeFromTlv(pTlv, (void**)&pNode->pNode);
...@@ -2884,15 +3239,23 @@ static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) { ...@@ -2884,15 +3239,23 @@ static int32_t msgToSubplan(STlvDecoder* pDecoder, void* pObj) {
return code; return code;
} }
enum { QUERY_PLAN_CODE_QUERY_ID = 1, QUERY_PLAN_CODE_NUM_OF_SUBPLANS, QUERY_PLAN_CODE_SUBPLANS }; enum { QUERY_PLAN_CODE_INLINE_ATTRS = 1, QUERY_PLAN_CODE_SUBPLANS };
static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t queryPlanInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SQueryPlan* pNode = (const SQueryPlan*)pObj; const SQueryPlan* pNode = (const SQueryPlan*)pObj;
int32_t code = tlvEncodeU64(pEncoder, QUERY_PLAN_CODE_QUERY_ID, pNode->queryId); int32_t code = tlvEncodeValueU64(pEncoder, pNode->queryId);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, QUERY_PLAN_CODE_NUM_OF_SUBPLANS, pNode->numOfSubplans); code = tlvEncodeValueI32(pEncoder, pNode->numOfSubplans);
} }
return code;
}
static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SQueryPlan* pNode = (const SQueryPlan*)pObj;
int32_t code = tlvEncodeObj(pEncoder, QUERY_PLAN_CODE_INLINE_ATTRS, queryPlanInlineToMsg, pNode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, QUERY_PLAN_CODE_SUBPLANS, nodeListToMsg, pNode->pSubplans); code = tlvEncodeObj(pEncoder, QUERY_PLAN_CODE_SUBPLANS, nodeListToMsg, pNode->pSubplans);
} }
...@@ -2900,6 +3263,17 @@ static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -2900,6 +3263,17 @@ static int32_t queryPlanToMsg(const void* pObj, STlvEncoder* pEncoder) {
return code; return code;
} }
static int32_t msgToQueryPlanInline(STlvDecoder* pDecoder, void* pObj) {
SQueryPlan* pNode = (SQueryPlan*)pObj;
int32_t code = tlvDecodeValueU64(pDecoder, &pNode->queryId);
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI32(pDecoder, &pNode->numOfSubplans);
}
return code;
}
static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) { static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) {
SQueryPlan* pNode = (SQueryPlan*)pObj; SQueryPlan* pNode = (SQueryPlan*)pObj;
...@@ -2907,11 +3281,8 @@ static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) { ...@@ -2907,11 +3281,8 @@ static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) {
STlv* pTlv = NULL; STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) { tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) { switch (pTlv->type) {
case QUERY_PLAN_CODE_QUERY_ID: case QUERY_PLAN_CODE_INLINE_ATTRS:
code = tlvDecodeU64(pTlv, &pNode->queryId); code = tlvDecodeObjFromTlv(pTlv, msgToQueryPlanInline, pNode);
break;
case QUERY_PLAN_CODE_NUM_OF_SUBPLANS:
code = tlvDecodeI32(pTlv, &pNode->numOfSubplans);
break; break;
case QUERY_PLAN_CODE_SUBPLANS: case QUERY_PLAN_CODE_SUBPLANS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSubplans); code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSubplans);
...@@ -2925,6 +3296,7 @@ static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) { ...@@ -2925,6 +3296,7 @@ static int32_t msgToQueryPlan(STlvDecoder* pDecoder, void* pObj) {
} }
static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
// nodesWarn("specificNodeToMsg node = %s, before tlv count = %d", nodesNodeName(nodeType(pObj)), pEncoder->tlvCount);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pObj)) { switch (nodeType(pObj)) {
case QUERY_NODE_COLUMN: case QUERY_NODE_COLUMN:
...@@ -3044,12 +3416,12 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -3044,12 +3416,12 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = queryPlanToMsg(pObj, pEncoder); code = queryPlanToMsg(pObj, pEncoder);
break; break;
default: default:
nodesWarn("specificNodeToMsg unknown node = %s", nodesNodeName(nodeType(pObj)));
break; break;
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
nodesError("specificNodeToMsg error node = %s", nodesNodeName(nodeType(pObj))); nodesError("specificNodeToMsg error node = %s", nodesNodeName(nodeType(pObj)));
} }
// nodesWarn("specificNodeToMsg node = %s, after tlv count = %d", nodesNodeName(nodeType(pObj)), pEncoder->tlvCount);
return code; return code;
} }
...@@ -3173,7 +3545,6 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -3173,7 +3545,6 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
code = msgToQueryPlan(pDecoder, pObj); code = msgToQueryPlan(pDecoder, pObj);
break; break;
default: default:
nodesWarn("msgToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj)));
break; break;
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册