From 96eee800635a1a66e584253f7bb6567f77ec98bc Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 26 Mar 2022 15:03:22 +0800 Subject: [PATCH] feature/qnode --- include/libs/nodes/plannodes.h | 3 + include/libs/parser/parser.h | 1 + include/libs/planner/planner.h | 1 + include/libs/transport/trpc.h | 2 +- source/client/src/clientImpl.c | 3 +- source/dnode/mnode/impl/src/mndShow.c | 7 ++ source/dnode/mnode/impl/src/mndStb.c | 2 +- source/libs/executor/inc/executorimpl.h | 4 +- source/libs/executor/src/executorimpl.c | 90 +++++++++++++++++-- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 14 +++ source/libs/parser/src/parTranslater.c | 1 + source/libs/planner/src/planLogicCreater.c | 1 + source/libs/planner/src/planPhysiCreater.c | 8 +- source/libs/qworker/src/qworker.c | 2 +- source/libs/scheduler/src/scheduler.c | 73 +++++++-------- source/libs/scheduler/test/schedulerTests.cpp | 2 +- tests/script/tsim/db/basic1.sim | 9 +- tests/script/tsim/db/basic6.sim | 11 ++- tests/script/tsim/db/error1.sim | 1 + tests/script/tsim/dnode/basic1.sim | 8 +- tests/script/tsim/tmq/basic.sim | 6 -- tests/script/tsim/user/basic1.sim | 8 +- 23 files changed, 170 insertions(+), 88 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 794e0ca85a..4d025eb9b7 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -48,6 +48,7 @@ typedef struct SScanLogicNode { uint8_t scanFlag; // denotes reversed scan of data or not STimeWindow scanRange; SName tableName; + bool showRewrite; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -171,6 +172,8 @@ typedef SScanPhysiNode SStreamScanPhysiNode; typedef struct SSystemTableScanPhysiNode { SScanPhysiNode scan; SEpSet mgmtEpSet; + bool showRewrite; + int32_t accountId; } SSystemTableScanPhysiNode; typedef struct STableScanPhysiNode { diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 2254298e5c..0747534721 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -54,6 +54,7 @@ typedef struct SQuery { int32_t msgType; SArray* pDbList; SArray* pTableList; + bool showRewrite; } SQuery; int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery); diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 38b30ec01e..8db78fccf5 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -29,6 +29,7 @@ typedef struct SPlanContext { SNode* pAstRoot; bool topicQuery; bool streamQuery; + bool showRewrite; } SPlanContext; // Create the physical plan for the query, according to the AST. diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index e5286e65f2..8125de7647 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -90,7 +90,7 @@ typedef struct { int32_t msgType; void *val; int32_t (*clone)(void *src, void **dst); - void (*free)(void *arg); + void (*freeFunc)(const void *arg); } SRpcBrokenlinkVal; typedef struct { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c99c7050dc..6e65a4267f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -199,7 +199,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra .queryId = pRequest->requestId, .acctId = pRequest->pTscObj->acctId, .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), - .pAstRoot = pQuery->pRoot + .pAstRoot = pQuery->pRoot, + .showRewrite = pQuery->showRewrite }; int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList); if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index a830ee7104..dff918f135 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -357,6 +357,13 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { // if free flag is set, client wants to clean the resources if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead); + if (rowsRead < 0) { + terrno = rowsRead; + rpcFreeCont(pRsp); + mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id); + mndReleaseShowObj((SShowObj*) pShow, true); + return -1; + } } mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index acf45a31d9..fdd03be710 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1613,7 +1613,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32 SDbObj* pDb = NULL; if (strlen(pShow->db) > 0) { pDb = mndAcquireDb(pMnode, pShow->db); - if (pDb == NULL) return 0; + if (pDb == NULL) return terrno; } while (numOfRows < rows) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c582873315..ead830394e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -449,6 +449,8 @@ typedef struct SSysTableScanInfo { SEpSet epSet; tsem_t ready; + int32_t accountId; + bool showRewrite; SNode* pCondition; // db_name filter condition, to discard data that are not in current database void *pCur; // cursor for iterate the local table meta store. SArray *scanCols; // SArray scan column id list @@ -655,7 +657,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, - SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo); + SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2ce4549776..4f7f3d2d7e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5489,7 +5489,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* } static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) { - SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*) param; + SOperatorInfo* operator = (SOperatorInfo *)param; + SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo *)operator->info; if (TSDB_CODE_SUCCESS == code) { pScanResInfo->pRsp = pMsg->pData; @@ -5498,6 +5499,8 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t pRsp->useconds = htobe64(pRsp->useconds); pRsp->handle = htobe64(pRsp->handle); pRsp->compLen = htonl(pRsp->compLen); + } else { + operator->pTaskInfo->code = code; } tsem_post(&pScanResInfo->ready); @@ -5544,6 +5547,64 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes; } +EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { + int32_t code = TSDB_CODE_SUCCESS; + ENodeType nType = nodeType(pNode); + + switch (nType) { + case QUERY_NODE_OPERATOR: { + SOperatorNode *node = (SOperatorNode *)pNode; + + if (OP_TYPE_EQUAL == node->opType) { + *(int32_t *)pContext = 1; + return DEAL_RES_CONTINUE; + } + + *(int32_t *)pContext = 0; + + return DEAL_RES_IGNORE_CHILD; + } + case QUERY_NODE_COLUMN: { + if (1 != *(int32_t *)pContext) { + return DEAL_RES_CONTINUE; + } + + SColumnNode *node = (SColumnNode *)pNode; + if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) { + *(int32_t *)pContext = 2; + return DEAL_RES_CONTINUE; + } + + *(int32_t *)pContext = 0; + return DEAL_RES_CONTINUE; + } + case QUERY_NODE_VALUE: { + if (2 != *(int32_t *)pContext) { + return DEAL_RES_CONTINUE; + } + + SValueNode *node = (SValueNode *)pNode; + char *dbName = nodesGetValueFromNode(node); + strncpy(pContext, varDataVal(dbName), varDataLen(dbName)); + *((char *)pContext + varDataLen(dbName)) = 0; + return DEAL_RES_ERROR; // stop walk + } + default: + break; + } + + return DEAL_RES_CONTINUE; +} + + +void getDBNameFromCondition(SNode *pCondition, char *dbName) { + if (NULL == pCondition) { + return; + } + + nodesWalkNode(pCondition, getDBNameFromConditionWalker, dbName); +} + static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { // build message and send to mnode to fetch the content of system tables. SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -5600,7 +5661,11 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { pInfo->req.type = pInfo->type; strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); - getFullDBNameFromCondition(pInfo->pCondition, pInfo->req.db)); + if (pInfo->showRewrite) { + char dbName[TSDB_DB_NAME_LEN] = {0}; + getDBNameFromCondition(pInfo->pCondition, dbName); + sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); + } int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); char* buf1 = taosMemoryCalloc(1, contLen); @@ -5614,7 +5679,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { return NULL; } - pMsgSendInfo->param = pInfo; + pMsgSendInfo->param = pOperator; pMsgSendInfo->msgInfo.pData = buf1; pMsgSendInfo->msgInfo.len = contLen; pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; @@ -5624,6 +5689,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); tsem_wait(&pInfo->ready); + if (pTaskInfo->code) { + return NULL; + } + SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; pInfo->req.showId = pRsp->handle; @@ -5645,7 +5714,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) { } SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, - SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo) { + SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) { SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5655,10 +5724,12 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB return NULL; } - pInfo->pRes = pResBlock; - pInfo->capacity = 4096; - pInfo->pCondition = pCondition; - pInfo->scanCols = colList; + pInfo->accountId = accountId; + pInfo->showRewrite = showRewrite; + pInfo->pRes = pResBlock; + pInfo->capacity = 4096; + pInfo->pCondition = pCondition; + pInfo->scanCols = colList; // TODO remove it int32_t tableType = 0; @@ -8531,7 +8602,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SArray* colList = extractScanColumnId(pScanNode->pScanCols); SOperatorInfo* pOperator = createSysTableScanOperatorInfo(pHandle->meta, pResBlock, &pScanNode->tableName, - pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, pTaskInfo); + pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, + colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId); return pOperator; } else { ASSERT(0); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 078f3ab3e6..a9660c8573 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -237,6 +237,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { COPY_SCALAR_FIELD(scanFlag); COPY_SCALAR_FIELD(scanRange); COPY_SCALAR_FIELD(tableName); + COPY_SCALAR_FIELD(showRewrite); return (SNode*)pDst; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 1acc3c51c5..d7caf6e511 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -764,6 +764,8 @@ static int32_t jsonToEpSet(const SJson* pJson, void* pObj) { } static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet"; +static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite"; +static const char* jkSysTableScanPhysiPlanAccountId = "AccountId"; static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) { const SSystemTableScanPhysiNode* pNode = (const SSystemTableScanPhysiNode*)pObj; @@ -772,6 +774,12 @@ static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, epSetToJson, &pNode->mgmtEpSet); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSysTableScanPhysiPlanShowRewrite, pNode->showRewrite); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId); + } return code; } @@ -783,6 +791,12 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, jsonToEpSet, &pNode->mgmtEpSet); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSysTableScanPhysiPlanShowRewrite, &pNode->showRewrite); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetNumberValue(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId); + } return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 2f722bb82b..1c721f3caf 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1962,6 +1962,7 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) { code = createShowCondition((SShowStmt*)pQuery->pRoot, pStmt); } if (TSDB_CODE_SUCCESS == code) { + pQuery->showRewrite = true; nodesDestroyNode(pQuery->pRoot); pQuery->pRoot = (SNode*)pStmt; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 84fa52a070..6ea476a334 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -161,6 +161,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pScan->tableName.acctId = pCxt->pPlanCxt->acctId; strcpy(pScan->tableName.dbname, pRealTable->table.dbName); strcpy(pScan->tableName.tname, pRealTable->table.tableName); + pScan->showRewrite = pCxt->pPlanCxt->showRewrite; // set columns to scan SNodeList* pCols = NULL; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 4b000da4e5..d8d090d280 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -331,18 +331,14 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* return TSDB_CODE_OUT_OF_MEMORY; } + pScan->showRewrite = pScanLogicNode->showRewrite; + pScan->accountId = pCxt->pPlanCxt->acctId; if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) { vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); } else { SQueryNodeAddr addr = { .nodeId = MND_VGID, .epSet = pCxt->pPlanCxt->mgmtEpSet }; taosArrayPush(pCxt->pExecNodeList, &addr); - - //for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) { - // SQueryNodeAddr addr; - // vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr); - // taosArrayPush(pCxt->pExecNodeList, &addr); - //} } pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet; tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index e7681116e0..70ca0f736b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -917,7 +917,7 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } - if (TSDB_CODE_SUCCESS == code && readyConnection) { + if (readyConnection) { qwBuildAndSendReadyRsp(readyConnection, code); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 38196f485f..5af13d97ca 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -74,15 +74,15 @@ void schFreeRpcCtx(SRpcCtx *pCtx) { while (pIter) { SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter; - (*ctxVal->free)(ctxVal->val); + (*ctxVal->freeFunc)(ctxVal->val); pIter = taosHashIterate(pCtx->args, pIter); } taosHashCleanup(pCtx->args); - if (pCtx->brokenVal.free) { - (*pCtx->brokenVal.free)(pCtx->brokenVal.val); + if (pCtx->brokenVal.freeFunc) { + (*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val); } } @@ -1254,18 +1254,18 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { return TSDB_CODE_SUCCESS; } -void schFreeRpcCtxVal(void *arg) { +void schFreeRpcCtxVal(const void *arg) { if (NULL == arg) { return; } - SMsgSendInfo* pMsgSendInfo = arg; - tfree(pMsgSendInfo->param); - tfree(pMsgSendInfo); + SMsgSendInfo* pMsgSendInfo = (SMsgSendInfo *)arg; + taosMemoryFreeClear(pMsgSendInfo->param); + taosMemoryFreeClear(pMsgSendInfo); } int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { - SSchTaskCallbackParam *param = calloc(1, sizeof(SSchTaskCallbackParam)); + SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam)); if (NULL == param) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1282,7 +1282,7 @@ int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) } int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { - SSchHbCallbackParam *param = calloc(1, sizeof(SSchHbCallbackParam)); + SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam)); if (NULL == param) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1306,7 +1306,7 @@ int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal * int32_t code = 0; SMsgSendInfo* pMsgSendInfo = NULL; - pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1327,14 +1327,14 @@ int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal * brokenVal->msgType = msgType; brokenVal->val = pMsgSendInfo; brokenVal->clone = schCloneSMsgSendInfo; - brokenVal->free = schFreeRpcCtxVal; + brokenVal->freeFunc = schFreeRpcCtxVal; return TSDB_CODE_SUCCESS; _return: - tfree(pMsgSendInfo->param); - tfree(pMsgSendInfo); + taosMemoryFreeClear(pMsgSendInfo->param); + taosMemoryFreeClear(pMsgSendInfo); SCH_RET(code); } @@ -1350,13 +1350,13 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - param = calloc(1, sizeof(SSchTaskCallbackParam)); + param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam)); if (NULL == param) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam)); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1374,7 +1374,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; - SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .free = schFreeRpcCtxVal}; + SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal}; if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1387,8 +1387,8 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { _return: taosHashCleanup(pCtx->args); - tfree(param); - tfree(pMsgSendInfo); + taosMemoryFreeClear(param); + taosMemoryFreeClear(pMsgSendInfo); SCH_RET(code); } @@ -1409,13 +1409,13 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - param = calloc(1, sizeof(SSchHbCallbackParam)); + param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam)); if (NULL == param) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam)); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1431,7 +1431,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; - SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .free = schFreeRpcCtxVal}; + SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal}; if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1444,8 +1444,8 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { _return: taosHashCleanup(pCtx->args); - tfree(param); - tfree(pMsgSendInfo); + taosMemoryFreeClear(param); + taosMemoryFreeClear(pMsgSendInfo); SCH_RET(code); } @@ -1479,7 +1479,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId * int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) { if (pSrc->isHbParam) { - SSchHbCallbackParam *dst = malloc(sizeof(SSchHbCallbackParam)); + SSchHbCallbackParam *dst = taosMemoryMalloc(sizeof(SSchHbCallbackParam)); if (NULL == dst) { qError("malloc SSchHbCallbackParam failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1491,7 +1491,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe return TSDB_CODE_SUCCESS; } - SSchTaskCallbackParam *dst = malloc(sizeof(SSchTaskCallbackParam)); + SSchTaskCallbackParam *dst = taosMemoryMalloc(sizeof(SSchTaskCallbackParam)); if (NULL == dst) { qError("malloc SSchTaskCallbackParam failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1506,7 +1506,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe int32_t schCloneSMsgSendInfo(void *src, void **dst) { SMsgSendInfo *pSrc = src; int32_t code = 0; - SMsgSendInfo *pDst = malloc(sizeof(*pSrc)); + SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc)); if (NULL == pDst) { qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1523,7 +1523,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) { _return: - tfree(pDst); + taosMemoryFreeClear(pDst); SCH_RET(code); } @@ -1553,7 +1553,7 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) { if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) { qError("taosHashPut msg %d to rpcCtx failed", *msgType); - (*dst.free)(dst.val); + (*dst.freeFunc)(dst.val); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1601,8 +1601,9 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet* pMsgSendInfo->msgType = msgType; pMsgSendInfo->fp = fp; - qDebug("start to send %s msg, refId:%" PRIx64 "instance:%p, handle:%p", - TMSG_INFO(msgType), pJob->refId, trans->transInst, trans->transHandle); + qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", + TMSG_INFO(msgType), ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, + pJob->refId, trans->transInst, trans->transHandle); int64_t transporterId = 0; code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx); @@ -1649,7 +1650,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - void *msg = calloc(1, msgSize); + void *msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { qError("calloc hb req %d failed", msgSize); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1660,13 +1661,13 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + SMsgSendInfo *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { qError("calloc SMsgSendInfo failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SSchTaskCallbackParam *param = calloc(1, sizeof(SSchTaskCallbackParam)); + SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam)); if (NULL == param) { qError("calloc SSchTaskCallbackParam failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1702,9 +1703,9 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) { _return: - tfree(msg); - tfree(param); - tfree(pMsgSendInfo); + taosMemoryFreeClear(msg); + taosMemoryFreeClear(param); + taosMemoryFreeClear(pMsgSendInfo); schFreeRpcCtx(&rpcCtx); SCH_RET(code); } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 445d95132b..cf04b06579 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -805,7 +805,7 @@ TEST(queryTest, readyFirstCase) { SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; ASSERT_EQ(pRsp->completed, 1); ASSERT_EQ(pRsp->numOfRows, 10); - tfree(data); + taosMemoryFreeClear(data); data = NULL; code = schedulerFetchRows(job, &data); diff --git a/tests/script/tsim/db/basic1.sim b/tests/script/tsim/db/basic1.sim index 7877bfc3a7..c07ebd0400 100644 --- a/tests/script/tsim/db/basic1.sim +++ b/tests/script/tsim/db/basic1.sim @@ -39,11 +39,10 @@ endi print =============== drop database sql drop database d1 -# todo release -#sql show databases -#if $rows != 1 then -# return -1 -#endi +sql show databases +if $rows != 1 then + return -1 +endi print =============== more databases sql create database d2 vgroups 2 diff --git a/tests/script/tsim/db/basic6.sim b/tests/script/tsim/db/basic6.sim index 48b3fccd47..7e57fe8f1b 100644 --- a/tests/script/tsim/db/basic6.sim +++ b/tests/script/tsim/db/basic6.sim @@ -58,11 +58,10 @@ endi print =============== step3 sql drop database $db -# todo release -#sql show databases -#if $rows != 1 then -# return -1 -#endi +sql show databases +if $rows != 1 then + return -1 +endi print =============== step4 sql_error drop database $db @@ -319,4 +318,4 @@ if $rows != 0 then return -1 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim index 73d9e6fab6..6f62228ae7 100644 --- a/tests/script/tsim/db/error1.sim +++ b/tests/script/tsim/db/error1.sim @@ -61,6 +61,7 @@ endi print ========== stop dnode2 system sh/exec.sh -n dnode2 -s stop -x SIGKILL +sleep 1000 print =============== create database sql_error drop database d1 diff --git a/tests/script/tsim/dnode/basic1.sim b/tests/script/tsim/dnode/basic1.sim index c5b83aa3a3..6f0d5f88b8 100644 --- a/tests/script/tsim/dnode/basic1.sim +++ b/tests/script/tsim/dnode/basic1.sim @@ -5,9 +5,6 @@ system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode2 -s start sql connect -# todo remove -sql create database useless_db - print =============== show dnodes sql show dnodes; if $rows != 1 then @@ -83,9 +80,6 @@ if $data02 != master then return -1 endi -# todo remove -sql drop database useless_db - print =============== create database sql create database d1 vgroups 4; sql create database d2; @@ -202,4 +196,4 @@ if $data00 != 1 then endi system sh/exec.sh -n dnode1 -s stop -x SIGINT -system sh/exec.sh -n dnode2 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode2 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/basic.sim b/tests/script/tsim/tmq/basic.sim index 3e42c2cbd7..876cf7e266 100644 --- a/tests/script/tsim/tmq/basic.sim +++ b/tests/script/tsim/tmq/basic.sim @@ -6,9 +6,6 @@ system sh/exec.sh -n dnode1 -s start sleep 500 sql connect -# todo remove -sql create database useless_db - $loop_cnt = 0 check_dnode_ready: $loop_cnt = $loop_cnt + 1 @@ -26,9 +23,6 @@ if $data04 != ready then goto check_dnode_ready endi -# todo remove -sql drop database useless_db - #root@trd02 /data2/dnode $ tmq_demo --help #Used to tmq_demo # -c Configuration directory, default is diff --git a/tests/script/tsim/user/basic1.sim b/tests/script/tsim/user/basic1.sim index e14aa3af2f..7af5ba8d00 100644 --- a/tests/script/tsim/user/basic1.sim +++ b/tests/script/tsim/user/basic1.sim @@ -3,9 +3,6 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start sql connect -# todo remove -sql create database useless_db - print =============== show users sql show users if $rows != 1 then @@ -74,7 +71,4 @@ print $data10 $data11 $data22 print $data20 $data11 $data22 print $data30 $data31 $data32 -# todo remove -sql drop database useless_db - -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT -- GitLab