diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 05caa7a7bb56617ef34c03e3646f85ac98f65a56..e03ac3811a11b3927531a6250f5b41fb876c0f1c 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -52,10 +52,14 @@ int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstrea void qClearSubplanExecutionNode(SSubplan* pSubplan); -// Convert to subplan to string for the scheduler to send to the executor +// Convert to subplan to display string for the scheduler to send to the executor int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen); int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan); +// Convert to subplan to msg for the scheduler to send to the executor +int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen); +int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan); + char* qQueryPlanToString(const SQueryPlan* pPlan); SQueryPlan* qStringToQueryPlan(const char* pStr); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cbd6bc3ed7e62a9edf9d4baac18e0c7057a2ae10..1a41f809070ce7f0cca98fb7fddbc20065ec9cdf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4723,9 +4723,8 @@ int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1; if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1; if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1; - if (tEncodeU32(&encoder, pReq->phyLen) < 0) return -1; if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->msg) < 0) return -1; + if (tEncodeBinary(&encoder, pReq->msg, pReq->phyLen) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -4755,13 +4754,12 @@ int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) { if (tDecodeU64(&decoder, &pReq->queryId) < 0) return -1; if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1; - if (tDecodeU32(&decoder, &pReq->phyLen) < 0) return -1; pReq->sql = taosMemoryCalloc(1, pReq->sqlLen + 1); if (NULL == pReq->sql) return -1; - pReq->msg = taosMemoryCalloc(1, pReq->phyLen + 1); - if (NULL == pReq->msg) return -1; if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->msg) < 0) return -1; + uint64_t msgLen = 0; + if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, &msgLen) < 0) return -1; + pReq->phyLen = msgLen; tEndDecode(&decoder); diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 5fe31ed78e80cf19a4e107d6f1366609bd257520..efe820fee211af67ce1b168c5a5cddc821b0eeba 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -24,7 +24,7 @@ typedef struct STlv { int16_t type; - int16_t len; + int32_t len; char value[0]; } STlv; @@ -70,7 +70,7 @@ static void endTlvEncode(STlvEncoder* pEncoder, char** pMsg, int32_t* pLen) { // nodesWarn("encode tlv count = %d, tl size = %d", pEncoder->tlvCount, sizeof(STlv) * pEncoder->tlvCount); } -static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int16_t len) { +static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int32_t len) { int32_t tlvLen = sizeof(STlv) + len; if (pEncoder->offset + tlvLen > pEncoder->allocSize) { void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize * 2); @@ -130,6 +130,9 @@ static int32_t tlvEncodeBool(STlvEncoder* pEncoder, int16_t type, bool value) { } static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pValue) { + if (NULL == pValue) { + return TSDB_CODE_SUCCESS; + } return tlvEncodeImpl(pEncoder, type, pValue, strlen(pValue)); } @@ -187,7 +190,7 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) { static bool tlvDecodeEnd(STlvDecoder* pDecoder) { return pDecoder->offset == pDecoder->bufSize; } -static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int16_t len) { +static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int32_t len) { if (pTlv->len != len) { return TSDB_CODE_FAILED; } @@ -237,6 +240,11 @@ static int32_t tlvDecodeCStr(STlv* pTlv, char* pValue) { return TSDB_CODE_SUCCESS; } +static int32_t tlvDecodeCStrP(STlv* pTlv, char** pValue) { + *pValue = strndup(pTlv->value, pTlv->len); + return NULL == *pValue ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS; +} + static int32_t tlvDecodeDynBinary(STlv* pTlv, void** pValue) { *pValue = taosMemoryMalloc(pTlv->len); if (NULL == *pValue) { @@ -246,6 +254,11 @@ static int32_t tlvDecodeDynBinary(STlv* pTlv, void** pValue) { return TSDB_CODE_SUCCESS; } +static int32_t tlvDecodeBinary(STlv* pTlv, void* pValue) { + memcpy(pValue, pTlv->value, pTlv->len); + return TSDB_CODE_SUCCESS; +} + static int32_t tlvDecodeObjFromTlv(STlv* pTlv, FToObject func, void* pObj) { STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; return func(&decoder, pObj); @@ -367,6 +380,10 @@ enum { COLUMN_CODE_TABLE_TYPE, COLUMN_CODE_COLUMN_ID, COLUMN_CODE_COLUMN_TYPE, + COLUMN_CODE_DB_NAME, + COLUMN_CODE_TABLE_NAME, + COLUMN_CODE_TABLE_ALIAS, + COLUMN_CODE_COL_NAME, COLUMN_CODE_DATABLOCK_ID, COLUMN_CODE_SLOT_ID }; @@ -387,6 +404,18 @@ static int32_t columnNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, COLUMN_CODE_COLUMN_TYPE, pNode->colType); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, COLUMN_CODE_DB_NAME, pNode->dbName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, COLUMN_CODE_TABLE_NAME, pNode->tableName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, COLUMN_CODE_TABLE_ALIAS, pNode->tableAlias); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, COLUMN_CODE_COL_NAME, pNode->colName); + } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeI16(pEncoder, COLUMN_CODE_DATABLOCK_ID, pNode->dataBlockId); } @@ -419,6 +448,18 @@ static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) { case COLUMN_CODE_COLUMN_TYPE: code = tlvDecodeEnum(pTlv, &pNode->colType, sizeof(pNode->colType)); break; + case COLUMN_CODE_DB_NAME: + code = tlvDecodeCStr(pTlv, pNode->dbName); + break; + case COLUMN_CODE_TABLE_NAME: + code = tlvDecodeCStr(pTlv, pNode->tableName); + break; + case COLUMN_CODE_TABLE_ALIAS: + code = tlvDecodeCStr(pTlv, pNode->tableAlias); + break; + case COLUMN_CODE_COL_NAME: + code = tlvDecodeCStr(pTlv, pNode->colName); + break; case COLUMN_CODE_DATABLOCK_ID: code = tlvDecodeI16(pTlv, &pNode->dataBlockId); break; @@ -433,7 +474,15 @@ static int32_t msgToColumnNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { VALUE_CODE_EXPR_BASE = 1, VALUE_CODE_IS_NULL, VALUE_CODE_DATUM }; +enum { + VALUE_CODE_EXPR_BASE = 1, + VALUE_CODE_LITERAL, + VALUE_CODE_IS_DURATION, + VALUE_CODE_TRANSLATE, + VALUE_CODE_NOT_RESERVED, + VALUE_CODE_IS_NULL, + VALUE_CODE_DATUM +}; static int32_t datumToMsg(const void* pObj, STlvEncoder* pEncoder) { const SValueNode* pNode = (const SValueNode*)pObj; @@ -485,9 +534,21 @@ static int32_t valueNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { int32_t code = tlvEncodeObj(pEncoder, VALUE_CODE_EXPR_BASE, exprNodeToMsg, pNode); if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeBool(pEncoder, VALUE_CODE_IS_NULL, pNode->isNull); + code = tlvEncodeCStr(pEncoder, VALUE_CODE_LITERAL, pNode->literal); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, VALUE_CODE_IS_DURATION, pNode->isDuration); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, VALUE_CODE_TRANSLATE, pNode->translate); } if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, VALUE_CODE_NOT_RESERVED, pNode->notReserved); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, VALUE_CODE_IS_NULL, pNode->isNull); + } + if (TSDB_CODE_SUCCESS == code && !pNode->isNull) { code = datumToMsg(pNode, pEncoder); } @@ -551,12 +612,18 @@ static int32_t msgToDatum(STlv* pTlv, void* pObj) { break; case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARCHAR: - case TSDB_DATA_TYPE_VARBINARY: - code = tlvDecodeDynBinary(pTlv, (void**)&pNode->datum.p); + case TSDB_DATA_TYPE_VARBINARY: { + pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1); + if (NULL == pNode->datum.p) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + code = tlvDecodeBinary(pTlv, pNode->datum.p); if (TSDB_CODE_SUCCESS == code) { - varDataSetLen(pNode->datum.p, pNode->node.resType.bytes - VARSTR_HEADER_SIZE); + varDataSetLen(pNode->datum.p, pTlv->len - VARSTR_HEADER_SIZE); } break; + } case TSDB_DATA_TYPE_JSON: code = tlvDecodeDynBinary(pTlv, (void**)&pNode->datum.p); break; @@ -580,6 +647,18 @@ static int32_t msgToValueNode(STlvDecoder* pDecoder, void* pObj) { case VALUE_CODE_EXPR_BASE: code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); break; + case VALUE_CODE_LITERAL: + code = tlvDecodeCStrP(pTlv, &pNode->literal); + break; + case VALUE_CODE_IS_DURATION: + code = tlvDecodeBool(pTlv, &pNode->isDuration); + break; + case VALUE_CODE_TRANSLATE: + code = tlvDecodeBool(pTlv, &pNode->translate); + break; + case VALUE_CODE_NOT_RESERVED: + code = tlvDecodeBool(pTlv, &pNode->notReserved); + break; case VALUE_CODE_IS_NULL: code = tlvDecodeBool(pTlv, &pNode->isNull); break; @@ -682,6 +761,7 @@ static int32_t msgToLogicConditionNode(STlvDecoder* pDecoder, void* pObj) { enum { FUNCTION_CODE_EXPR_BASE = 1, + FUNCTION_CODE_FUNCTION_NAME, FUNCTION_CODE_FUNCTION_ID, FUNCTION_CODE_FUNCTION_TYPE, FUNCTION_CODE_PARAMETERS, @@ -692,6 +772,9 @@ static int32_t functionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SFunctionNode* pNode = (const SFunctionNode*)pObj; int32_t code = tlvEncodeObj(pEncoder, FUNCTION_CODE_EXPR_BASE, exprNodeToMsg, pNode); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeCStr(pEncoder, FUNCTION_CODE_FUNCTION_NAME, pNode->functionName); + } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeI32(pEncoder, FUNCTION_CODE_FUNCTION_ID, pNode->funcId); } @@ -718,6 +801,9 @@ static int32_t msgToFunctionNode(STlvDecoder* pDecoder, void* pObj) { case FUNCTION_CODE_EXPR_BASE: code = tlvDecodeObjFromTlv(pTlv, msgToExprNode, &pNode->node); break; + case FUNCTION_CODE_FUNCTION_NAME: + code = tlvDecodeCStr(pTlv, pNode->functionName); + break; case FUNCTION_CODE_FUNCTION_ID: code = tlvDecodeI32(pTlv, &pNode->funcId); break; @@ -1082,6 +1168,170 @@ static int32_t msgToSlotDescNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { EP_CODE_FQDN = 1, EP_CODE_port }; + +static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SEp* pNode = (const SEp*)pObj; + + int32_t code = tlvEncodeCStr(pEncoder, EP_CODE_FQDN, pNode->fqdn); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU16(pEncoder, EP_CODE_port, pNode->port); + } + + return code; +} + +static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) { + SEp* pNode = (SEp*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case EP_CODE_FQDN: + code = tlvDecodeCStr(pTlv, pNode->fqdn); + break; + case EP_CODE_port: + code = tlvDecodeU16(pTlv, &pNode->port); + break; + default: + break; + } + } + + return code; +} + +enum { EP_SET_CODE_IN_USE = 1, EP_SET_CODE_NUM_OF_EPS, EP_SET_CODE_EPS }; + +static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SEpSet* pNode = (const SEpSet*)pObj; + + int32_t code = tlvEncodeI8(pEncoder, EP_SET_CODE_IN_USE, pNode->inUse); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI8(pEncoder, EP_SET_CODE_NUM_OF_EPS, pNode->numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObjArray(pEncoder, EP_SET_CODE_EPS, epToMsg, pNode->eps, sizeof(SEp), pNode->numOfEps); + } + + return code; +} + +static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) { + SEpSet* pNode = (SEpSet*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case EP_SET_CODE_IN_USE: + code = tlvDecodeI8(pTlv, &pNode->inUse); + break; + case EP_SET_CODE_NUM_OF_EPS: + code = tlvDecodeI8(pTlv, &pNode->numOfEps); + break; + case EP_SET_CODE_EPS: + code = tlvDecodeObjArrayFromTlv(pTlv, msgToEp, pNode->eps, sizeof(SEp)); + break; + default: + break; + } + } + + return code; +} + +enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_EP_SET }; + +static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj; + + int32_t code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NODE_ID, pNode->nodeId); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, QUERY_NODE_ADDR_CODE_EP_SET, epSetToMsg, &pNode->epSet); + } + + return code; +} + +static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) { + SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case QUERY_NODE_ADDR_CODE_NODE_ID: + code = tlvDecodeI32(pTlv, &pNode->nodeId); + break; + case QUERY_NODE_ADDR_CODE_EP_SET: + code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet); + break; + } + } + + return code; +} + +enum { + DOWNSTREAM_SOURCE_CODE_ADDR = 1, + DOWNSTREAM_SOURCE_CODE_TASK_ID, + DOWNSTREAM_SOURCE_CODE_SCHED_ID, + DOWNSTREAM_SOURCE_CODE_EXEC_ID, + DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE +}; + +static int32_t downstreamSourceNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, DOWNSTREAM_SOURCE_CODE_ADDR, queryNodeAddrToMsg, &pNode->addr); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, DOWNSTREAM_SOURCE_CODE_TASK_ID, pNode->taskId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeU64(pEncoder, DOWNSTREAM_SOURCE_CODE_SCHED_ID, pNode->schedId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, DOWNSTREAM_SOURCE_CODE_EXEC_ID, pNode->execId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE, pNode->fetchMsgType); + } + + return code; +} + +static int32_t msgToDownstreamSourceNode(STlvDecoder* pDecoder, void* pObj) { + SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case DOWNSTREAM_SOURCE_CODE_ADDR: + code = tlvDecodeObjFromTlv(pTlv, msgToQueryNodeAddr, &pNode->addr); + break; + case DOWNSTREAM_SOURCE_CODE_TASK_ID: + code = tlvDecodeU64(pTlv, &pNode->taskId); + break; + case DOWNSTREAM_SOURCE_CODE_SCHED_ID: + code = tlvDecodeU64(pTlv, &pNode->schedId); + break; + case DOWNSTREAM_SOURCE_CODE_EXEC_ID: + code = tlvDecodeI32(pTlv, &pNode->execId); + break; + case DOWNSTREAM_SOURCE_CODE_FETCH_MSG_TYPE: + code = tlvDecodeI32(pTlv, &pNode->fetchMsgType); + break; + default: + break; + } + } + + return code; +} + enum { PHY_NODE_CODE_OUTPUT_DESC = 1, PHY_NODE_CODE_CONDITIONS, @@ -1401,80 +1651,6 @@ static int32_t msgToPhysiTableScanNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { EP_CODE_FQDN = 1, EP_CODE_port }; - -static int32_t epToMsg(const void* pObj, STlvEncoder* pEncoder) { - const SEp* pNode = (const SEp*)pObj; - - int32_t code = tlvEncodeCStr(pEncoder, EP_CODE_FQDN, pNode->fqdn); - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeU16(pEncoder, EP_CODE_port, pNode->port); - } - - return code; -} - -static int32_t msgToEp(STlvDecoder* pDecoder, void* pObj) { - SEp* pNode = (SEp*)pObj; - - int32_t code = TSDB_CODE_SUCCESS; - STlv* pTlv = NULL; - tlvForEach(pDecoder, pTlv, code) { - switch (pTlv->type) { - case EP_CODE_FQDN: - code = tlvDecodeCStr(pTlv, pNode->fqdn); - break; - case EP_CODE_port: - code = tlvDecodeU16(pTlv, &pNode->port); - break; - default: - break; - } - } - - return code; -} - -enum { EP_SET_CODE_IN_USE = 1, EP_SET_CODE_NUM_OF_EPS, EP_SET_CODE_EPS }; - -static int32_t epSetToMsg(const void* pObj, STlvEncoder* pEncoder) { - const SEpSet* pNode = (const SEpSet*)pObj; - - int32_t code = tlvEncodeI8(pEncoder, EP_SET_CODE_IN_USE, pNode->inUse); - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI8(pEncoder, EP_SET_CODE_NUM_OF_EPS, pNode->numOfEps); - } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObjArray(pEncoder, EP_SET_CODE_EPS, epToMsg, pNode->eps, sizeof(SEp), pNode->numOfEps); - } - - return code; -} - -static int32_t msgToEpSet(STlvDecoder* pDecoder, void* pObj) { - SEpSet* pNode = (SEpSet*)pObj; - - int32_t code = TSDB_CODE_SUCCESS; - STlv* pTlv = NULL; - tlvForEach(pDecoder, pTlv, code) { - switch (pTlv->type) { - case EP_SET_CODE_IN_USE: - code = tlvDecodeI8(pTlv, &pNode->inUse); - break; - case EP_SET_CODE_NUM_OF_EPS: - code = tlvDecodeI8(pTlv, &pNode->numOfEps); - break; - case EP_SET_CODE_EPS: - code = tlvDecodeObjArrayFromTlv(pTlv, msgToEp, pNode->eps, sizeof(SEp)); - break; - default: - break; - } - } - - return code; -} - enum { PHY_SYSTABLE_SCAN_CODE_SCAN = 1, PHY_SYSTABLE_SCAN_CODE_MGMT_EP_SET, @@ -2594,38 +2770,6 @@ static int32_t msgToSubplanId(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { QUERY_NODE_ADDR_CODE_NODE_ID = 1, QUERY_NODE_ADDR_CODE_EP_SET }; - -static int32_t queryNodeAddrToMsg(const void* pObj, STlvEncoder* pEncoder) { - const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj; - - int32_t code = tlvEncodeI32(pEncoder, QUERY_NODE_ADDR_CODE_NODE_ID, pNode->nodeId); - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeObj(pEncoder, QUERY_NODE_ADDR_CODE_EP_SET, epSetToMsg, &pNode->epSet); - } - - return code; -} - -static int32_t msgToQueryNodeAddr(STlvDecoder* pDecoder, void* pObj) { - SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj; - - int32_t code = TSDB_CODE_SUCCESS; - STlv* pTlv = NULL; - tlvForEach(pDecoder, pTlv, code) { - switch (pTlv->type) { - case QUERY_NODE_ADDR_CODE_NODE_ID: - code = tlvDecodeI32(pTlv, &pNode->nodeId); - break; - case QUERY_NODE_ADDR_CODE_EP_SET: - code = tlvDecodeObjFromTlv(pTlv, msgToEpSet, &pNode->epSet); - break; - } - } - - return code; -} - enum { SUBPLAN_CODE_SUBPLAN_ID = 1, SUBPLAN_CODE_SUBPLAN_TYPE, @@ -2802,6 +2946,8 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { case QUERY_NODE_SLOT_DESC: code = slotDescNodeToMsg(pObj, pEncoder); break; + case QUERY_NODE_DOWNSTREAM_SOURCE: + return downstreamSourceNodeToMsg(pObj, pEncoder); case QUERY_NODE_LEFT_VALUE: break; case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: @@ -2929,6 +3075,8 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { case QUERY_NODE_SLOT_DESC: code = msgToSlotDescNode(pDecoder, pObj); break; + case QUERY_NODE_DOWNSTREAM_SOURCE: + return msgToDownstreamSourceNode(pDecoder, pObj); case QUERY_NODE_LEFT_VALUE: break; case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index baa1d1074c7d4bea0df280649777db4a659247cb..35903d45b16175c1f9c21904b96ced434178ec51 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -123,6 +123,21 @@ int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) { return nodesStringToNode(pStr, (SNode**)pSubplan); } +int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { + if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) { + SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink; + *pLen = insert->size; + *pStr = insert->pData; + insert->pData = NULL; + return TSDB_CODE_SUCCESS; + } + return nodesNodeToMsg((const SNode*)pSubplan, pStr, pLen); +} + +int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan) { + return nodesMsgToNode(pStr, len, (SNode**)pSubplan); +} + char* qQueryPlanToString(const SQueryPlan* pPlan) { char* pStr = NULL; int32_t len = 0; diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index b280b32a94f7d824bcd475573f3f62744c3e3d26..bf19c7a2221ea36d565f9a2031a51a2a3b6dbfff 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -480,9 +480,14 @@ class PlannerTestBaseImpl { DO_WITH_THROW(nodesNodeToMsg, pNode, &pNewStr, &newlen) if (newlen != len || 0 != memcmp(pStr, pNewStr, len)) { cout << "nodesNodeToMsg error!!!!!!!!!!!!!! len = " << len << ", newlen = " << newlen << endl; + taosMemoryFreeClear(pNewStr); + DO_WITH_THROW(nodesNodeToString, pRoot, false, &pNewStr, &newlen) + cout << "orac node: " << pNewStr << endl; + taosMemoryFreeClear(pNewStr); DO_WITH_THROW(nodesNodeToString, pNode, false, &pNewStr, &newlen) - cout << "nodesNodeToString " << pNewStr << endl; + cout << "new node: " << pNewStr << endl; } + nodesDestroyNode(pNode); taosMemoryFreeClear(pNewStr); string str(pStr, len); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f006096ce20a45e18a5b9d990c9c63b621638ac5..61c38f59db2d08e960100e1b615bdf8f0a104127 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -559,7 +559,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); - code = qStringToSubplan(qwMsg->msg, &plan); + code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); if (TSDB_CODE_SUCCESS != code) { code = TSDB_CODE_INVALID_MSG; QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code)); @@ -968,7 +968,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { DataSinkHandle sinkHandle = NULL; SQWTaskCtx ctx = {0}; - code = qStringToSubplan(qwMsg->msg, &plan); + code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); if (TSDB_CODE_SUCCESS != code) { code = TSDB_CODE_INVALID_MSG; QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code)); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index c5f161b66a8312a8c5919efca8fe8b1b2d61308c..969c6fc8a6aab8d140feb13b43975e8db4573536 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -860,7 +860,7 @@ int32_t schLaunchTaskImpl(void *param) { SSubplan *plan = pTask->plan; if (NULL == pTask->msg) { // TODO add more detailed reason for failure - code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); + code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen); if (TSDB_CODE_SUCCESS != code) { SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen);