diff --git a/include/util/tdef.h b/include/util/tdef.h index e6d28ae3414dc9388bcd62d022c0dbf3fd5189bf..d9bb558b74e099e852ed39d179492b6bef36fab4 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -239,6 +239,7 @@ typedef enum ELogicConditionType { #define TSDB_MAX_TAGS 128 #define TSDB_MAX_TAG_CONDITIONS 1024 +#define TSDB_MAX_COL_TAG_NUM (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS) #define TSDB_MAX_JSON_TAG_LEN 16384 #define TSDB_MAX_JSON_KEY_LEN 256 @@ -496,6 +497,8 @@ enum { #define MAX_NUM_STR_SIZE 40 +#define MAX_META_MSG_IN_BATCH 1048576 + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 3f24a08cf4a68a19407da670fc43fe2be3bea279..fa0e3212a34b27b49432e2affe079852dfe6a0f3 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -440,6 +440,7 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S } if (userNum <= 0) { + taosMemoryFree(users); return TSDB_CODE_SUCCESS; } @@ -476,6 +477,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl } if (dbNum <= 0) { + taosMemoryFree(dbs); return TSDB_CODE_SUCCESS; } @@ -514,6 +516,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC } if (stbNum <= 0) { + taosMemoryFree(stbs); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/inc/mndQuery.h b/source/dnode/mnode/impl/inc/mndQuery.h index 4b359c0a1408dc84688992cd5fd107dafe646087..69ec54f3302b3a24dc760870c909a2081354e980 100644 --- a/source/dnode/mnode/impl/inc/mndQuery.h +++ b/source/dnode/mnode/impl/inc/mndQuery.h @@ -22,6 +22,8 @@ extern "C" { #endif +#define MAX_META_MSG_IN_BATCH 1048576 + int32_t mndInitQuery(SMnode *pMnode); void mndCleanupQuery(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 1909b168bde380b735f843a6b2f58d2893399d78..d42b66455f2be95c160e9d39d8ea84a3cc10ac0a 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -369,7 +369,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) { taosCacheRelease(pMgmt->appCache, (void **)&pApp, false); } -void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) { +SAppObj *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) { SAppObj *pApp = NULL; bool hasNext = taosCacheIterNext(pIter); if (hasNext) { diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index f185cdeaec5305d638d95443cd5d773e01de7d81..2e01fadbae7cd0f957fc5d4af013bc57af7f3ba9 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -77,6 +77,12 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { void *pRsp = NULL; SMnode *pMnode = pMsg->info.node; + if (msgNum >= MAX_META_MSG_IN_BATCH) { + code = TSDB_CODE_INVALID_MSG; + mError("too many msgs %d in mnode batch meta req", msgNum); + goto _exit; + } + SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); if (NULL == batchRsp) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -106,6 +112,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { if (fp == NULL) { mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_MSG_NOT_PROCESSED; + taosArrayDestroy(batchRsp); return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index e18379fe8aa331a71ffc3378707170bb93da629c..8c1f858cbbfbe98e53662c46243364c72320f441 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -272,6 +272,12 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { SRpcMsg rspMsg = {0}; void *pRsp = NULL; + if (msgNum >= MAX_META_MSG_IN_BATCH) { + code = TSDB_CODE_INVALID_MSG; + qError("too many msgs %d in vnode batch meta req", msgNum); + goto _exit; + } + SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); if (NULL == batchRsp) { code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/catalog/inc/ctgRemote.h b/source/libs/catalog/inc/ctgRemote.h index 8edd3b3880994c60d9dc8e1ce27c64e3f271a26f..746344b7ed880ae6e1fb660865f7531b6449176e 100644 --- a/source/libs/catalog/inc/ctgRemote.h +++ b/source/libs/catalog/inc/ctgRemote.h @@ -20,6 +20,8 @@ extern "C" { #endif +#define CTG_MAX_REQ_IN_BATCH 1048576 + #ifdef __cplusplus } #endif diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index b027dbcfa5ac9a91a2656778810ae84ae0b72eeb..b960103d94a65e5508031c6b23fb57ab17f2068f 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -173,15 +173,19 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput)); } - CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncReq)); + code = ctgUpdateTbMetaEnqueue(pCtg, output, syncReq); + output = NULL; + CTG_ERR_JRET(code); return TSDB_CODE_SUCCESS; _return: - taosMemoryFreeClear(output->tbMeta); - taosMemoryFreeClear(output); - + if (output) { + taosMemoryFreeClear(output->tbMeta); + taosMemoryFreeClear(output); + } + CTG_RET(code); } @@ -290,7 +294,9 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) { CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta)); } - CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncOp)); + code = ctgUpdateTbMetaEnqueue(pCtg, output, syncOp); + output = NULL; + CTG_ERR_JRET(code); return TSDB_CODE_SUCCESS; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 1305540a5082e59317969b8fdd574f59a059aaf6..1d2e3640a1b739493e923d5bf250cbddc717dd23 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -71,7 +71,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx, - ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); + ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); return TSDB_CODE_SUCCESS; } @@ -202,7 +202,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx, - ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); + ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbHashNum); return TSDB_CODE_SUCCESS; } @@ -1056,7 +1056,6 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf default: ctgError("invalid reqType %d", reqType); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); - break; } STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; @@ -1223,7 +1222,6 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu default: ctgError("invalid reqType %d", reqType); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); - break; } STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; @@ -1309,7 +1307,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* default: ctgError("invalid reqType %d", reqType); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); - break; } _return: @@ -1346,7 +1343,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf default: ctgError("invalid reqType %d", reqType); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); - break; } _return: @@ -1382,7 +1378,6 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu default: ctgError("invalid reqType %d", reqType); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); - break; } if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { @@ -1672,7 +1667,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) { int32_t baseResIdx = 0; for (int32_t i = 0; i < dbNum; ++i) { STablesReq* pReq = taosArrayGet(pCtx->pNames, i); - ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, taosArrayGetSize(pReq->pTables)); + ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, (int32_t)taosArrayGetSize(pReq->pTables)); CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables)); baseResIdx += taosArrayGetSize(pReq->pTables); } @@ -2010,10 +2005,6 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) { _return: - if (dbCache) { - ctgReleaseVgInfoToCache(pCtg, dbCache); - } - CTG_RET(code); } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 3a774afdcce2fc21334686a39a65fcb83606bab2..89571ebf5573465b179001a38f55f954398ffebb 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -597,6 +597,8 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode)); if (NULL == node) { qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); + taosMemoryFree(operation->data); + taosMemoryFree(operation); CTG_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -648,6 +650,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); + taosMemoryFree(op); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -668,7 +671,6 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) _return: - taosMemoryFreeClear(op->data); CTG_RET(code); } @@ -681,6 +683,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); + taosMemoryFree(op); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -700,7 +703,6 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) _return: - taosMemoryFreeClear(op->data); CTG_RET(code); } @@ -714,6 +716,7 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); + taosMemoryFree(op); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -731,7 +734,6 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, _return: - taosMemoryFreeClear(op->data); CTG_RET(code); } @@ -744,6 +746,7 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); + taosMemoryFree(op); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -760,7 +763,6 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, _return: - taosMemoryFreeClear(op->data); CTG_RET(code); } @@ -773,6 +775,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); + taosMemoryFree(op); ctgFreeVgInfo(dbInfo); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -796,8 +799,6 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId _return: ctgFreeVgInfo(dbInfo); - taosMemoryFreeClear(op->data); - taosMemoryFreeClear(op); CTG_RET(code); } @@ -810,6 +811,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg)); + taosMemoryFree(op); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } @@ -834,8 +836,6 @@ _return: taosMemoryFree(output); } - taosMemoryFreeClear(msg); - CTG_RET(code); } @@ -847,6 +847,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg)); + taosMemoryFree(op); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -863,8 +864,6 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp _return: - taosMemoryFreeClear(msg); - CTG_RET(code); } @@ -877,6 +876,7 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); + taosMemoryFree(op); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -892,7 +892,6 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp _return: tFreeSGetUserAuthRsp(pAuth); - taosMemoryFreeClear(msg); CTG_RET(code); } @@ -906,6 +905,7 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncO SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg)); + taosMemoryFree(op); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } @@ -923,7 +923,6 @@ _return: taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo); taosMemoryFreeClear(*pIndex); - taosMemoryFreeClear(msg); CTG_RET(code); } @@ -937,6 +936,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) { SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg)); + taosMemoryFree(op); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -952,8 +952,6 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) { _return: - taosMemoryFreeClear(msg); - CTG_RET(code); } @@ -968,6 +966,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg)); + taosMemoryFree(op); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -981,8 +980,6 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool _return: - taosMemoryFreeClear(msg); - CTG_RET(code); } @@ -1483,7 +1480,9 @@ int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool sync int32_t code = 0; CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput)); - CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq)); + code = ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq); + pOutput = NULL; + CTG_ERR_JRET(code); return TSDB_CODE_SUCCESS; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 744d6eab0693293df5887e613539cc9935beba6d..23bccb0835eee85781c4fcbfe5f152119ce5acbf 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -19,6 +19,7 @@ #include "tname.h" #include "tref.h" #include "trpc.h" +#include "ctgRemote.h" int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) { int32_t code = 0; @@ -579,6 +580,11 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { int32_t num = taosArrayGetSize(pBatch->pMsgs); SBatchReq* pBatchReq = (SBatchReq*)(*msg); + if (num >= CTG_MAX_REQ_IN_BATCH) { + qError("too many msgs %d in one batch request", num); + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + pBatchReq->header.vgId = htonl(vgId); pBatchReq->msgNum = htonl(num); offset += sizeof(SBatchReq); diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index ad677a81ff304a3637c9a24c2c20b4f03cee3ac1..ce4c4cf09ba4c59c751f1d1c8c2dd028d60b7417 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -89,8 +89,8 @@ extern "C" { #define EXPLAIN_STRING_TYPE_FORMAT "%s" #define EXPLAIN_INPUT_ORDER_FORMAT "input_order=%s" #define EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT "output_order=%s" -#define EXPLAIN_OFFSET_FORMAT "offset=%d" -#define EXPLAIN_SOFFSET_FORMAT "soffset=%d" +#define EXPLAIN_OFFSET_FORMAT "offset=%" PRId64 +#define EXPLAIN_SOFFSET_FORMAT "soffset=%" PRId64 #define EXPLAIN_PARTITIONS_FORMAT "partitions=%d" #define COMMAND_RESET_LOG "resetLog" diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 18d839e1091e3fc5f1be2939a22345efe8ea8579..a9134de53752baa6fe40a2e9f51eb8bc7eb9391f 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -485,6 +485,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs SSDataBlock* pBlock = buildCreateTbResultDataBlock(); int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->pDbCfg, pStmt->tableName, pStmt->pTableCfg); if (code) { + blockDataDestroy(pBlock); return code; } return buildRetrieveTableRsp(pBlock, SHOW_CREATE_TB_RESULT_COLS, pRsp); diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index c5de44e1f67d3e4843a0357be88bbac730fc2d99..e0a4218243fa933d56de84f6189138421394efb7 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1598,6 +1598,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); if (NULL == rsp) { qError("malloc SRetrieveTableRsp failed, size:%d", rspSize); + blockDataDestroy(pBlock); QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 1ade4b89e63be0a53aa9a3b58e7c36d0631f7860..55978855d16100481a301ccf59ea852e68e30b64 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -267,6 +267,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData taosThreadMutexInit(&deleter->mutex, NULL); if (NULL == deleter->pDataBlocks) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + destroyDataSinker((SDataSinkHandle*)deleter); + taosMemoryFree(deleter); return TSDB_CODE_QRY_OUT_OF_MEMORY; } *pHandle = deleter; diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index ce0ff485ab41e97ac032b987e4558fec2d0c1f4a..4c4ba59fa9338fbd17cdf6b3685ee5daf3aa3ebc 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -323,6 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat int32_t code = tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); if (code) { + destroyDataSinker((SDataSinkHandle*)pInserterNode); return code; } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 8162b922cec8fd779907df851bc1d8b545cafc28..a917397d020a3431fccad206f731b9cf6dc0a817 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -424,6 +424,12 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { return TSDB_CODE_SUCCESS; } + if ((pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) > TSDB_MAX_COL_TAG_NUM) { + *pDst = NULL; + qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags); + return TSDB_CODE_INVALID_PARA; + } + int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema); *pDst = taosMemoryMalloc(metaSize); if (NULL == *pDst) { diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 80ba356e8f194f13c21af403bf9be378a0599134..a6f26088de29a1b458c00cc509f5d5e13feb9c76 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t SBuildTableInput *pInput = input; STableCfgReq cfgReq = {0}; cfgReq.header.vgId = pInput->vgId; - strcpy(cfgReq.dbFName, pInput->dbFName); - strcpy(cfgReq.tbName, pInput->tbName); + strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName)); + strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName)); int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq); void *pBuf = (*mallcFp)(bufLen); @@ -615,6 +615,8 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) { STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp)); if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) { qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize); + tFreeSTableCfgRsp(out); + taosMemoryFree(out); return TSDB_CODE_INVALID_MSG; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index d2c16a1cb5186ed4dd2e399f78f926c0238656d1..0ff40f3b9a7f34b76d23ad6a053d18ecfc62a9ab 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -258,7 +258,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); if (len < 0) { - QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); + QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -292,7 +292,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } // Got data from sink - QW_TASK_DLOG("there are data in sink, dataLength:%d", len); + QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64, len); *dataLen += len; @@ -408,7 +408,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); - break; } QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC)); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index bb8451ba37bf9dbc79e762f421cde624b440b9f7..8fcfdce2b2552cedb7da70d0e1abb72a17936b26 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -122,7 +122,6 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_DROP: SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); - break; default: SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus)); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 1b59c06140daad7370c7721c172b3da2a8163732..c26ae4e646298bd5cd83847f71ca59e39f22fcee 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -169,7 +169,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { SCH_LOCK(SCH_WRITE, &pJob->resLock); if (NULL == pJob->execRes.res) { - pJob->execRes.res = taosArrayInit(batchRsp.nRsps, POINTER_BYTES); + pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES); pJob->execRes.msgType = TDMT_VND_CREATE_TABLE; } @@ -386,7 +386,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa // NEVER REACH HERE SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); - break; } case TDMT_SCH_LINK_BROKEN: SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode)); @@ -981,7 +980,7 @@ _return: } int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) { - uint32_t msgSize = 0; + int32_t msgSize = 0; void *msg = NULL; int32_t code = 0; bool isCandidateAddr = false; @@ -1133,12 +1132,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, default: SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - break; } #if 1 SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; - code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); + code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); msg = NULL; SCH_ERR_JRET(code); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 33bf50d85313a6fa5f9709568f525bd98894c215..c641b88152a6d7d8c13ac26e1421d29dae9e734f 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -904,7 +904,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { } else if (tsQueryPlannerTrace) { char *msg = NULL; int32_t msgLen = 0; - qSubPlanToString(plan, &msg, &msgLen); + SCH_ERR_RET(qSubPlanToString(plan, &msg, &msgLen)); SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg); taosMemoryFree(msg); } @@ -926,6 +926,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { } SArray *explainRes = NULL; + int32_t code = 0; SQWMsg qwMsg = {0}; qwMsg.msgInfo.taskType = TASK_TYPE_TEMP; qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob); @@ -938,14 +939,21 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); } - SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, + SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes)); if (SCH_IS_EXPLAIN_JOB(pJob)) { SCH_ERR_RET(schHandleExplainRes(explainRes)); + explainRes = NULL; } - SCH_RET(schProcessOnTaskSuccess(pJob, pTask)); + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + +_return: + + taosArrayDestroy(explainRes); + + SCH_RET(code); } int32_t schLaunchTaskImpl(void *param) { @@ -1097,22 +1105,28 @@ int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { void *pRsp = NULL; + int32_t code = 0; SArray *explainRes = NULL; if (SCH_IS_EXPLAIN_JOB(pJob)) { explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); } - SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, + SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes)); if (SCH_IS_EXPLAIN_JOB(pJob)) { SCH_ERR_RET(schHandleExplainRes(explainRes)); + explainRes = NULL; } SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS)); - return TSDB_CODE_SUCCESS; +_return: + + taosArrayDestroy(explainRes); + + SCH_RET(code); } // Note: no more error processing, handled in function internal diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 7099dbb2c0c20682ee59bb4b3b0275c2684dedca..e20ed9d58e329967016dd0d05e13073c1cb5ea44 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -242,7 +242,7 @@ uint64_t schGenUUID(void) { if (hashId == 0) { char uid[64] = {0}; - int32_t code = taosGetSystemUUID(uid, tListLen(uid)); + int32_t code = taosGetSystemUUID(uid, tListLen(uid) - 1); if (code != TSDB_CODE_SUCCESS) { qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); } else {