diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a7fae403edbb608945840da7237080258aa37e1c..e15708e357973d707a4312a6b21911c0f586654e 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -155,7 +155,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes); +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/); int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index faee6cc2facc9ddcc4cdcabf2c8b554ac9a673b7..cc4b6a289033bab81a47acc835a2ed13926a5026 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -16,65 +16,9 @@ #define _DEFAULT_SOURCE #include "tdatablock.h" #include "tcompare.h" -#include "tglobal.h" #include "tlog.h" #include "tname.h" -int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) { - pEp->port = 0; - strcpy(pEp->fqdn, ep); - - char* temp = strchr(pEp->fqdn, ':'); - if (temp) { - *temp = 0; - pEp->port = atoi(temp + 1); - } - - if (pEp->port == 0) { - pEp->port = tsServerPort; - } - - return 0; -} - -void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port) { - if (pEpSet == NULL || fqdn == NULL || strlen(fqdn) == 0) { - return; - } - - int32_t index = pEpSet->numOfEps; - tstrncpy(pEpSet->eps[index].fqdn, fqdn, tListLen(pEpSet->eps[index].fqdn)); - pEpSet->eps[index].port = port; - pEpSet->numOfEps += 1; -} - -bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) { - if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { - return false; - } - - for (int32_t i = 0; i < s1->numOfEps; i++) { - if (s1->eps[i].port != s2->eps[i].port || strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0) - return false; - } - return true; -} - -void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) { - taosCorBeginWrite(&pEpSet->version); - pEpSet->epSet = *pNewEpSet; - taosCorEndWrite(&pEpSet->version); -} - -SEpSet getEpSet_s(SCorEpSet* pEpSet) { - SEpSet ep = {0}; - taosCorBeginRead(&pEpSet->version); - ep = pEpSet->epSet; - taosCorEndRead(&pEpSet->version); - - return ep; -} - int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { ASSERT(pColumnInfoData != NULL); if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { diff --git a/source/common/src/tname.c b/source/common/src/tname.c index c5bebf363062b9a9a434da4f220110f9e9db2a6d..b6f49a7219394c2bb1f2a9f3e7901dc5911cb4ae 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -20,34 +20,6 @@ #define VALID_NAME_TYPE(x) ((x) == TSDB_DB_NAME_T || (x) == TSDB_TABLE_NAME_T) -bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; } - -#if 0 -// TODO refactor -SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) { - if (numOfFilters == 0 || src == NULL) { - assert(src == NULL); - return NULL; - } - - SColumnFilterInfo* pFilter = taosMemoryCalloc(1, numOfFilters * sizeof(SColumnFilterInfo)); - - memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters); - for (int32_t j = 0; j < numOfFilters; ++j) { - if (pFilter[j].filterstr) { - size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE; - pFilter[j].pz = (int64_t) taosMemoryCalloc(1, len); - - memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t) pFilter[j].len); - } - } - - assert(src->filterstr == 0 || src->filterstr == 1); - assert(!(src->lowerRelOptr == 0 && src->upperRelOptr == 0)); - - return pFilter; -} -#endif #if 0 int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) { if (slidingTime == 0) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 10b84ba7b17bcdacf8a944d4a6dd1d7c9ffc982e..a70e7cd1dd23b31b79376eeb65f9486f5502239b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -855,7 +855,6 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList); -STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); @@ -986,9 +985,8 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, const char* sql, EOPTR_EXEC_MODEL model); -int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); -int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, - int32_t* resNum); +int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); +int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result); int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a7d13da52bbe258e8d662aa1402e3276726fca43..8f3ecafbea1e9574d718a521c190c811cff02419 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -496,11 +496,9 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) { doDestroyTask(pTaskInfo); } -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) { +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - int32_t capacity = 0; - - return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum); + return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList); } int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 022089e06ac85e1660cba35bc749ef2a89a4eb7f..084fbbea7cfafa7b7d0be5fb3fa0a19af640c69d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4613,42 +4613,29 @@ void releaseQueryBuf(size_t numOfTables) { atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); } -int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, - int32_t* resNum) { - if (*resNum >= *capacity) { - *capacity += 10; - - *pRes = taosMemoryRealloc(*pRes, (*capacity) * sizeof(SExplainExecInfo)); - if (NULL == *pRes) { - qError("malloc %d failed", (*capacity) * (int32_t)sizeof(SExplainExecInfo)); - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - } - - SExplainExecInfo* pInfo = &(*pRes)[*resNum]; +int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { + SExplainExecInfo execInfo = {0}; + SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); - pInfo->numOfRows = operatorInfo->resultInfo.totalRows; - pInfo->startupCost = operatorInfo->cost.openCost; - pInfo->totalCost = operatorInfo->cost.totalCost; + pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows; + pExplainInfo->startupCost = operatorInfo->cost.openCost; + pExplainInfo->totalCost = operatorInfo->cost.totalCost; + pExplainInfo->verboseLen = 0; + pExplainInfo->verboseInfo = NULL; if (operatorInfo->fpSet.getExplainFn) { - int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen); + int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen); if (code) { qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code)); return code; } - } else { - pInfo->verboseLen = 0; - pInfo->verboseInfo = NULL; } - ++(*resNum); - int32_t code = 0; for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) { - code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum); - if (code) { - taosMemoryFreeClear(*pRes); + code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList); + if (code != TSDB_CODE_SUCCESS) { +// taosMemoryFreeClear(*pRes); return TSDB_CODE_QRY_OUT_OF_MEMORY; } } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5c37cf01e8d29db4453b5e177ec0dabe79f4ac4a..5690389302c46a520ca76f8ce15020ff9175c1f5 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -31,14 +31,21 @@ static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup); +static void freeGroupKey(void* param) { + SGroupKeys* pKey = (SGroupKeys*) param; + taosMemoryFree(pKey->pData); +} + static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->keyBuf); taosArrayDestroy(pInfo->pGroupCols); - taosArrayDestroy(pInfo->pGroupColVals); + taosArrayDestroyEx(pInfo->pGroupColVals, freeGroupKey); cleanupExprSupp(&pInfo->scalarSup); - + + cleanupGroupResInfo(&pInfo->groupResInfo); + cleanupAggSup(&pInfo->aggSup); taosMemoryFreeClear(param); } @@ -414,8 +421,6 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 53789343433289e10c52c207e3ce9ca7bd6ef0bb..3ee870ef96235c1be598c600ccce0e51b4f97b21 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -39,7 +39,7 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx); -int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); +int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList); int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 7901fa64cb2c2ce198cc9996edfbc0eecb3e1cce..e8ffd98153a2e7ac32e4e7500cb0e1a5d14da0e4 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -82,8 +82,9 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num) { - SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; +int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList) { + SExplainExecInfo* pInfo = taosArrayGet(pExecList, 0); + SExplainRsp rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo}; int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); void * pRsp = rpcMallocCont(contLen); @@ -96,10 +97,9 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn .code = 0, .info = *pConn, }; - rpcRsp.info.ahandle = NULL; + rpcRsp.info.ahandle = NULL; tmsgSendRsp(&rpcRsp); - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index d1f8a50dab556067dbc2bfa143a75c8ee2dda9c2..36d85f1f123cc8864d8678d07f174308a38ba729 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -44,18 +44,24 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re QW_RET(TSDB_CODE_SUCCESS); } +static void freeItem(void* param) { + SExplainExecInfo* pInfo = param; + taosMemoryFree(pInfo->verboseInfo); +} + int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { qTaskInfo_t taskHandle = ctx->taskHandle; if (TASK_TYPE_TEMP == ctx->taskType && taskHandle) { if (ctx->explain) { - SExplainExecInfo *execInfo = NULL; - int32_t resNum = 0; - QW_ERR_RET(qGetExplainExecInfo(taskHandle, &resNum, &execInfo)); + SArray* execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); + QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); SRpcHandleInfo connInfo = ctx->ctrlConnInfo; connInfo.ahandle = NULL; - QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum)); + int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); + taosArrayDestroyEx(execInfoList, freeItem); + QW_ERR_RET(code); } if (!ctx->needFetch) {