未验证 提交 6263c8fc 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16781 from taosdata/enh/3.0_planner_optimize

enh: add binary serialization method to node structure
......@@ -52,10 +52,14 @@ int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstrea
void qClearSubplanExecutionNode(SSubplan* pSubplan);
// Convert to subplan to string for the scheduler to send to the executor
// Convert to subplan to display string for the scheduler to send to the executor
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan);
// Convert to subplan to msg for the scheduler to send to the executor
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan);
char* qQueryPlanToString(const SQueryPlan* pPlan);
SQueryPlan* qStringToQueryPlan(const char* pStr);
......
......@@ -4723,9 +4723,8 @@ int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1;
if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1;
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
if (tEncodeU32(&encoder, pReq->phyLen) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->msg) < 0) return -1;
if (tEncodeBinary(&encoder, pReq->msg, pReq->phyLen) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -4755,13 +4754,12 @@ int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->phyLen) < 0) return -1;
pReq->sql = taosMemoryCalloc(1, pReq->sqlLen + 1);
if (NULL == pReq->sql) return -1;
pReq->msg = taosMemoryCalloc(1, pReq->phyLen + 1);
if (NULL == pReq->msg) return -1;
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->msg) < 0) return -1;
uint64_t msgLen = 0;
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, &msgLen) < 0) return -1;
pReq->phyLen = msgLen;
tEndDecode(&decoder);
......
......@@ -24,7 +24,7 @@
typedef struct STlv {
int16_t type;
int16_t len;
int32_t len;
char value[0];
} STlv;
......@@ -70,7 +70,7 @@ static void endTlvEncode(STlvEncoder* pEncoder, char** pMsg, int32_t* pLen) {
// nodesWarn("encode tlv count = %d, tl size = %d", pEncoder->tlvCount, sizeof(STlv) * pEncoder->tlvCount);
}
static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int16_t len) {
static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int32_t len) {
int32_t tlvLen = sizeof(STlv) + len;
if (pEncoder->offset + tlvLen > pEncoder->allocSize) {
void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize * 2);
......@@ -130,6 +130,9 @@ static int32_t tlvEncodeBool(STlvEncoder* pEncoder, int16_t type, bool value) {
}
static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pValue) {
if (NULL == pValue) {
return TSDB_CODE_SUCCESS;
}
return tlvEncodeImpl(pEncoder, type, pValue, strlen(pValue));
}
......@@ -187,7 +190,7 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) {
static bool tlvDecodeEnd(STlvDecoder* pDecoder) { return pDecoder->offset == pDecoder->bufSize; }
static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int16_t len) {
static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int32_t len) {
if (pTlv->len != len) {
return TSDB_CODE_FAILED;
}
......@@ -237,6 +240,11 @@ static int32_t tlvDecodeCStr(STlv* pTlv, char* pValue) {
return TSDB_CODE_SUCCESS;
}
static int32_t tlvDecodeCStrP(STlv* pTlv, char** pValue) {
*pValue = strndup(pTlv->value, pTlv->len);
return NULL == *pValue ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
}
static int32_t tlvDecodeDynBinary(STlv* pTlv, void** pValue) {
*pValue = taosMemoryMalloc(pTlv->len);
if (NULL == *pValue) {
......@@ -246,6 +254,11 @@ static int32_t tlvDecodeDynBinary(STlv* pTlv, void** pValue) {
return TSDB_CODE_SUCCESS;
}
static int32_t tlvDecodeBinary(STlv* pTlv, void* pValue) {
memcpy(pValue, pTlv->value, pTlv->len);
return TSDB_CODE_SUCCESS;
}
static int32_t tlvDecodeObjFromTlv(STlv* pTlv, FToObject func, void* pObj) {
STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value};
return func(&decoder, pObj);
......@@ -367,6 +380,10 @@ enum {
COLUMN_CODE_TABLE_TYPE,
COLUMN_CODE_COLUMN_ID,
COLUMN_CODE_COLUMN_TYPE,
COLUMN_CODE_DB_NAME,
COLUMN_CODE_TABLE_NAME,
COLUMN_CODE_TABLE_ALIAS,
COLUMN_CODE_COL_NAME,
COLUMN_CODE_DATABLOCK_ID,
COLUMN_CODE_SLOT_ID
};
......@@ -387,6 +404,18 @@ static int32_t columnNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, COLUMN_CODE_COLUMN_TYPE, pNode->colType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, COLUMN_CODE_DB_NAME, pNode->dbName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, COLUMN_CODE_TABLE_NAME, pNode->tableName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, COLUMN_CODE_TABLE_ALIAS, pNode->tableAlias);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, COLUMN_CODE_COL_NAME, pNode->colName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI16(pEncoder, COLUMN_CODE_DATABLOCK_ID, pNode->dataBlockId);
}
......@@ -419,6 +448,18 @@ static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) {
case COLUMN_CODE_COLUMN_TYPE:
code = tlvDecodeEnum(pTlv, &pNode->colType, sizeof(pNode->colType));
break;
case COLUMN_CODE_DB_NAME:
code = tlvDecodeCStr(pTlv, pNode->dbName);
break;
case COLUMN_CODE_TABLE_NAME:
code = tlvDecodeCStr(pTlv, pNode->tableName);
break;
case COLUMN_CODE_TABLE_ALIAS:
code = tlvDecodeCStr(pTlv, pNode->tableAlias);
break;
case COLUMN_CODE_COL_NAME:
code = tlvDecodeCStr(pTlv, pNode->colName);
break;
case COLUMN_CODE_DATABLOCK_ID:
code = tlvDecodeI16(pTlv, &pNode->dataBlockId);
break;
......@@ -433,7 +474,15 @@ static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { VALUE_CODE_EXPR_BASE = 1, VALUE_CODE_IS_NULL, VALUE_CODE_DATUM };
enum {
VALUE_CODE_EXPR_BASE = 1,
VALUE_CODE_LITERAL,
VALUE_CODE_IS_DURATION,
VALUE_CODE_TRANSLATE,
VALUE_CODE_NOT_RESERVED,
VALUE_CODE_IS_NULL,
VALUE_CODE_DATUM
};
static int32_t datumToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SValueNode* pNode = (const SValueNode*)pObj;
......@@ -485,9 +534,21 @@ static int32_t valueNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
int32_t code = tlvEncodeObj(pEncoder, VALUE_CODE_EXPR_BASE, exprNodeToMsg, pNode);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, VALUE_CODE_IS_NULL, pNode->isNull);
code = tlvEncodeCStr(pEncoder, VALUE_CODE_LITERAL, pNode->literal);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, VALUE_CODE_IS_DURATION, pNode->isDuration);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, VALUE_CODE_TRANSLATE, pNode->translate);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, VALUE_CODE_NOT_RESERVED, pNode->notReserved);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, VALUE_CODE_IS_NULL, pNode->isNull);
}
if (TSDB_CODE_SUCCESS == code && !pNode->isNull) {
code = datumToMsg(pNode, pEncoder);
}
......@@ -551,12 +612,18 @@ static int32_t msgToDatum(STlv* pTlv, void* pObj) {
break;
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
code = tlvDecodeDynBinary(pTlv, (void**)&pNode->datum.p);
case TSDB_DATA_TYPE_VARBINARY: {
pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pNode->datum.p) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
code = tlvDecodeBinary(pTlv, pNode->datum.p);
if (TSDB_CODE_SUCCESS == code) {
varDataSetLen(pNode->datum.p, pNode->node.resType.bytes - VARSTR_HEADER_SIZE);
varDataSetLen(pNode->datum.p, pTlv->len - VARSTR_HEADER_SIZE);
}
break;
}
case TSDB_DATA_TYPE_JSON:
code = tlvDecodeDynBinary(pTlv, (void**)&pNode->datum.p);
break;
......@@ -580,6 +647,18 @@ static int32_t msgToValueNode(STlvDecoder* pDecoder, void* pObj) {
case VALUE_CODE_EXPR_BASE:
code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node);
break;
case VALUE_CODE_LITERAL:
code = tlvDecodeCStrP(pTlv, &pNode->literal);
break;
case VALUE_CODE_IS_DURATION:
code = tlvDecodeBool(pTlv, &pNode->isDuration);
break;
case VALUE_CODE_TRANSLATE:
code = tlvDecodeBool(pTlv, &pNode->translate);
break;
case VALUE_CODE_NOT_RESERVED:
code = tlvDecodeBool(pTlv, &pNode->notReserved);
break;
case VALUE_CODE_IS_NULL:
code = tlvDecodeBool(pTlv, &pNode->isNull);
break;
......@@ -682,6 +761,7 @@ static int32_t msgToLogicConditionNode(STlvDecoder* pDecoder, void* pObj) {
enum {
FUNCTION_CODE_EXPR_BASE = 1,
FUNCTION_CODE_FUNCTION_NAME,
FUNCTION_CODE_FUNCTION_ID,
FUNCTION_CODE_FUNCTION_TYPE,
FUNCTION_CODE_PARAMETERS,
......@@ -692,6 +772,9 @@ static int32_t functionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SFunctionNode* pNode = (const SFunctionNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, FUNCTION_CODE_EXPR_BASE, exprNodeToMsg, pNode);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, FUNCTION_CODE_FUNCTION_NAME, pNode->functionName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, FUNCTION_CODE_FUNCTION_ID, pNode->funcId);
}
......@@ -718,6 +801,9 @@ static int32_t msgToFunctionNode(STlvDecoder* pDecoder, void* pObj) {
case FUNCTION_CODE_EXPR_BASE:
code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node);
break;
case FUNCTION_CODE_FUNCTION_NAME:
code = tlvDecodeCStr(pTlv, pNode->functionName);
break;
case FUNCTION_CODE_FUNCTION_ID:
code = tlvDecodeI32(pTlv, &pNode->funcId);
break;
......@@ -1082,6 +1168,170 @@ static int32_t msgToSlotDescNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { EP_CODE_FQDN = 1, EP_CODE_port };
static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEp* pNode = (const SEp*)pObj;
int32_t code = tlvEncodeCStr(pEncoder, EP_CODE_FQDN, pNode->fqdn);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU16(pEncoder, EP_CODE_port, pNode->port);
}
return code;
}
static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) {
SEp* pNode = (SEp*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case EP_CODE_FQDN:
code = tlvDecodeCStr(pTlv, pNode->fqdn);
break;
case EP_CODE_port:
code = tlvDecodeU16(pTlv, &pNode->port);
break;
default:
break;
}
}
return code;
}
enum { EP_SET_CODE_IN_USE = 1, EP_SET_CODE_NUM_OF_EPS, EP_SET_CODE_EPS };
static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEpSet* pNode = (const SEpSet*)pObj;
int32_t code = tlvEncodeI8(pEncoder, EP_SET_CODE_IN_USE, pNode->inUse);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, EP_SET_CODE_NUM_OF_EPS, pNode->numOfEps);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObjArray(pEncoder, EP_SET_CODE_EPS, epToMsg, pNode->eps, sizeof(SEp), pNode->numOfEps);
}
return code;
}
static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) {
SEpSet* pNode = (SEpSet*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case EP_SET_CODE_IN_USE:
code = tlvDecodeI8(pTlv, &pNode->inUse);
break;
case EP_SET_CODE_NUM_OF_EPS:
code = tlvDecodeI8(pTlv, &pNode->numOfEps);
break;
case EP_SET_CODE_EPS:
code = tlvDecodeObjArrayFromTlv(pTlv, msgToEp, pNode->eps, sizeof(SEp));
break;
default:
break;
}
}
return code;
}
enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_EP_SET };
static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj;
int32_t code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NODE_ID, pNode->nodeId);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, QUERY_NODE_ADDR_CODE_EP_SET, epSetToMsg, &pNode->epSet);
}
return code;
}
static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) {
SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case QUERY_NODE_ADDR_CODE_NODE_ID:
code = tlvDecodeI32(pTlv, &pNode->nodeId);
break;
case QUERY_NODE_ADDR_CODE_EP_SET:
code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet);
break;
}
}
return code;
}
enum {
DOWNSTREAM_SOURCE_CODE_ADDR = 1,
DOWNSTREAM_SOURCE_CODE_TASK_ID,
DOWNSTREAM_SOURCE_CODE_SCHED_ID,
DOWNSTREAM_SOURCE_CODE_EXEC_ID,
DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE
};
static int32_t downstreamSourceNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, DOWNSTREAM_SOURCE_CODE_ADDR, queryNodeAddrToMsg, &pNode->addr);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU64(pEncoder, DOWNSTREAM_SOURCE_CODE_TASK_ID, pNode->taskId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU64(pEncoder, DOWNSTREAM_SOURCE_CODE_SCHED_ID, pNode->schedId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, DOWNSTREAM_SOURCE_CODE_EXEC_ID, pNode->execId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE, pNode->fetchMsgType);
}
return code;
}
static int32_t msgToDownstreamSourceNode(STlvDecoder* pDecoder, void* pObj) {
SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case DOWNSTREAM_SOURCE_CODE_ADDR:
code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->addr);
break;
case DOWNSTREAM_SOURCE_CODE_TASK_ID:
code = tlvDecodeU64(pTlv, &pNode->taskId);
break;
case DOWNSTREAM_SOURCE_CODE_SCHED_ID:
code = tlvDecodeU64(pTlv, &pNode->schedId);
break;
case DOWNSTREAM_SOURCE_CODE_EXEC_ID:
code = tlvDecodeI32(pTlv, &pNode->execId);
break;
case DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE:
code = tlvDecodeI32(pTlv, &pNode->fetchMsgType);
break;
default:
break;
}
}
return code;
}
enum {
PHY_NODE_CODE_OUTPUT_DESC = 1,
PHY_NODE_CODE_CONDITIONS,
......@@ -1401,80 +1651,6 @@ static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { EP_CODE_FQDN = 1, EP_CODE_port };
static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEp* pNode = (const SEp*)pObj;
int32_t code = tlvEncodeCStr(pEncoder, EP_CODE_FQDN, pNode->fqdn);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU16(pEncoder, EP_CODE_port, pNode->port);
}
return code;
}
static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) {
SEp* pNode = (SEp*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case EP_CODE_FQDN:
code = tlvDecodeCStr(pTlv, pNode->fqdn);
break;
case EP_CODE_port:
code = tlvDecodeU16(pTlv, &pNode->port);
break;
default:
break;
}
}
return code;
}
enum { EP_SET_CODE_IN_USE = 1, EP_SET_CODE_NUM_OF_EPS, EP_SET_CODE_EPS };
static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SEpSet* pNode = (const SEpSet*)pObj;
int32_t code = tlvEncodeI8(pEncoder, EP_SET_CODE_IN_USE, pNode->inUse);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, EP_SET_CODE_NUM_OF_EPS, pNode->numOfEps);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObjArray(pEncoder, EP_SET_CODE_EPS, epToMsg, pNode->eps, sizeof(SEp), pNode->numOfEps);
}
return code;
}
static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) {
SEpSet* pNode = (SEpSet*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case EP_SET_CODE_IN_USE:
code = tlvDecodeI8(pTlv, &pNode->inUse);
break;
case EP_SET_CODE_NUM_OF_EPS:
code = tlvDecodeI8(pTlv, &pNode->numOfEps);
break;
case EP_SET_CODE_EPS:
code = tlvDecodeObjArrayFromTlv(pTlv, msgToEp, pNode->eps, sizeof(SEp));
break;
default:
break;
}
}
return code;
}
enum {
PHY_SYSTABLE_SCAN_CODE_SCAN = 1,
PHY_SYSTABLE_SCAN_CODE_MGMT_EP_SET,
......@@ -2594,38 +2770,6 @@ static int32_t msgToSubplanId(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_EP_SET };
static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj;
int32_t code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NODE_ID, pNode->nodeId);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, QUERY_NODE_ADDR_CODE_EP_SET, epSetToMsg, &pNode->epSet);
}
return code;
}
static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) {
SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case QUERY_NODE_ADDR_CODE_NODE_ID:
code = tlvDecodeI32(pTlv, &pNode->nodeId);
break;
case QUERY_NODE_ADDR_CODE_EP_SET:
code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet);
break;
}
}
return code;
}
enum {
SUBPLAN_CODE_SUBPLAN_ID = 1,
SUBPLAN_CODE_SUBPLAN_TYPE,
......@@ -2802,6 +2946,8 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case QUERY_NODE_SLOT_DESC:
code = slotDescNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_DOWNSTREAM_SOURCE:
return downstreamSourceNodeToMsg(pObj, pEncoder);
case QUERY_NODE_LEFT_VALUE:
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
......@@ -2929,6 +3075,8 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case QUERY_NODE_SLOT_DESC:
code = msgToSlotDescNode(pDecoder, pObj);
break;
case QUERY_NODE_DOWNSTREAM_SOURCE:
return msgToDownstreamSourceNode(pDecoder, pObj);
case QUERY_NODE_LEFT_VALUE:
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
......
......@@ -123,6 +123,21 @@ int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) { return nodesStringToNode(pStr, (SNode**)pSubplan); }
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
*pLen = insert->size;
*pStr = insert->pData;
insert->pData = NULL;
return TSDB_CODE_SUCCESS;
}
return nodesNodeToMsg((const SNode*)pSubplan, pStr, pLen);
}
int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan) {
return nodesMsgToNode(pStr, len, (SNode**)pSubplan);
}
char* qQueryPlanToString(const SQueryPlan* pPlan) {
char* pStr = NULL;
int32_t len = 0;
......
......@@ -480,9 +480,14 @@ class PlannerTestBaseImpl {
DO_WITH_THROW(nodesNodeToMsg, pNode, &pNewStr, &newlen)
if (newlen != len || 0 != memcmp(pStr, pNewStr, len)) {
cout << "nodesNodeToMsg error!!!!!!!!!!!!!! len = " << len << ", newlen = " << newlen << endl;
taosMemoryFreeClear(pNewStr);
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pNewStr, &newlen)
cout << "orac node: " << pNewStr << endl;
taosMemoryFreeClear(pNewStr);
DO_WITH_THROW(nodesNodeToString, pNode, false, &pNewStr, &newlen)
cout << "nodesNodeToString " << pNewStr << endl;
cout << "new node: " << pNewStr << endl;
}
nodesDestroyNode(pNode);
taosMemoryFreeClear(pNewStr);
string str(pStr, len);
......
......@@ -559,7 +559,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
// QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
code = qStringToSubplan(qwMsg->msg, &plan);
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
......@@ -968,7 +968,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx ctx = {0};
code = qStringToSubplan(qwMsg->msg, &plan);
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
......
......@@ -860,7 +860,7 @@ int32_t schLaunchTaskImpl(void *param) {
SSubplan *plan = pTask->plan;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册