未验证 提交 079087dc 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #17417 from taosdata/fix/coverity.pw

fix: fix coverity check issues
...@@ -239,6 +239,7 @@ typedef enum ELogicConditionType { ...@@ -239,6 +239,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_TAGS 128 #define TSDB_MAX_TAGS 128
#define TSDB_MAX_TAG_CONDITIONS 1024 #define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_MAX_COL_TAG_NUM (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)
#define TSDB_MAX_JSON_TAG_LEN 16384 #define TSDB_MAX_JSON_TAG_LEN 16384
#define TSDB_MAX_JSON_KEY_LEN 256 #define TSDB_MAX_JSON_KEY_LEN 256
...@@ -496,6 +497,8 @@ enum { ...@@ -496,6 +497,8 @@ enum {
#define MAX_NUM_STR_SIZE 40 #define MAX_NUM_STR_SIZE 40
#define MAX_META_MSG_IN_BATCH 1048576
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -440,6 +440,7 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S ...@@ -440,6 +440,7 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
} }
if (userNum <= 0) { if (userNum <= 0) {
taosMemoryFree(users);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -476,6 +477,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl ...@@ -476,6 +477,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
} }
if (dbNum <= 0) { if (dbNum <= 0) {
taosMemoryFree(dbs);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -514,6 +516,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC ...@@ -514,6 +516,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
} }
if (stbNum <= 0) { if (stbNum <= 0) {
taosMemoryFree(stbs);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
extern "C" { extern "C" {
#endif #endif
#define MAX_META_MSG_IN_BATCH 1048576
int32_t mndInitQuery(SMnode *pMnode); int32_t mndInitQuery(SMnode *pMnode);
void mndCleanupQuery(SMnode *pMnode); void mndCleanupQuery(SMnode *pMnode);
......
...@@ -369,7 +369,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) { ...@@ -369,7 +369,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
taosCacheRelease(pMgmt->appCache, (void **)&pApp, false); taosCacheRelease(pMgmt->appCache, (void **)&pApp, false);
} }
void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) { SAppObj *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
SAppObj *pApp = NULL; SAppObj *pApp = NULL;
bool hasNext = taosCacheIterNext(pIter); bool hasNext = taosCacheIterNext(pIter);
if (hasNext) { if (hasNext) {
......
...@@ -77,6 +77,12 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { ...@@ -77,6 +77,12 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
void *pRsp = NULL; void *pRsp = NULL;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG;
mError("too many msgs %d in mnode batch meta req", msgNum);
goto _exit;
}
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
if (NULL == batchRsp) { if (NULL == batchRsp) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -106,6 +112,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { ...@@ -106,6 +112,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
if (fp == NULL) { if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
taosArrayDestroy(batchRsp);
return -1; return -1;
} }
......
...@@ -272,6 +272,12 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -272,6 +272,12 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
SRpcMsg rspMsg = {0}; SRpcMsg rspMsg = {0};
void *pRsp = NULL; void *pRsp = NULL;
if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG;
qError("too many msgs %d in vnode batch meta req", msgNum);
goto _exit;
}
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp)); SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
if (NULL == batchRsp) { if (NULL == batchRsp) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
extern "C" { extern "C" {
#endif #endif
#define CTG_MAX_REQ_IN_BATCH 1048576
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -173,14 +173,18 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ...@@ -173,14 +173,18 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx*
CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput)); CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
} }
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncReq)); code = ctgUpdateTbMetaEnqueue(pCtg, output, syncReq);
output = NULL;
CTG_ERR_JRET(code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
if (output) {
taosMemoryFreeClear(output->tbMeta); taosMemoryFreeClear(output->tbMeta);
taosMemoryFreeClear(output); taosMemoryFreeClear(output);
}
CTG_RET(code); CTG_RET(code);
} }
...@@ -290,7 +294,9 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) { ...@@ -290,7 +294,9 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta)); CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta));
} }
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncOp)); code = ctgUpdateTbMetaEnqueue(pCtg, output, syncOp);
output = NULL;
CTG_ERR_JRET(code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -71,7 +71,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) { ...@@ -71,7 +71,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -202,7 +202,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) { ...@@ -202,7 +202,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1056,7 +1056,6 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1056,7 +1056,6 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
default: default:
ctgError("invalid reqType %d", reqType); ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
} }
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
...@@ -1223,7 +1222,6 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1223,7 +1222,6 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
default: default:
ctgError("invalid reqType %d", reqType); ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
} }
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
...@@ -1309,7 +1307,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* ...@@ -1309,7 +1307,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
default: default:
ctgError("invalid reqType %d", reqType); ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
} }
_return: _return:
...@@ -1346,7 +1343,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1346,7 +1343,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
default: default:
ctgError("invalid reqType %d", reqType); ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
} }
_return: _return:
...@@ -1382,7 +1378,6 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1382,7 +1378,6 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
default: default:
ctgError("invalid reqType %d", reqType); ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
} }
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
...@@ -1672,7 +1667,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) { ...@@ -1672,7 +1667,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
int32_t baseResIdx = 0; int32_t baseResIdx = 0;
for (int32_t i = 0; i < dbNum; ++i) { for (int32_t i = 0; i < dbNum; ++i) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, i); STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, taosArrayGetSize(pReq->pTables)); ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, (int32_t)taosArrayGetSize(pReq->pTables));
CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables)); CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
baseResIdx += taosArrayGetSize(pReq->pTables); baseResIdx += taosArrayGetSize(pReq->pTables);
} }
...@@ -2010,10 +2005,6 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) { ...@@ -2010,10 +2005,6 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) {
_return: _return:
if (dbCache) {
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code); CTG_RET(code);
} }
......
...@@ -597,6 +597,8 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { ...@@ -597,6 +597,8 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode)); SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == node) { if (NULL == node) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
taosMemoryFree(operation->data);
taosMemoryFree(operation);
CTG_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -648,6 +650,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) ...@@ -648,6 +650,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -668,7 +671,6 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) ...@@ -668,7 +671,6 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
_return: _return:
taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
...@@ -681,6 +683,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) ...@@ -681,6 +683,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -700,7 +703,6 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) ...@@ -700,7 +703,6 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
_return: _return:
taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
...@@ -714,6 +716,7 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, ...@@ -714,6 +716,7 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -731,7 +734,6 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, ...@@ -731,7 +734,6 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
_return: _return:
taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
...@@ -744,6 +746,7 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, ...@@ -744,6 +746,7 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -760,7 +763,6 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, ...@@ -760,7 +763,6 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
_return: _return:
taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
...@@ -773,6 +775,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId ...@@ -773,6 +775,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg)); SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
taosMemoryFree(op);
ctgFreeVgInfo(dbInfo); ctgFreeVgInfo(dbInfo);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -796,8 +799,6 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId ...@@ -796,8 +799,6 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
_return: _return:
ctgFreeVgInfo(dbInfo); ctgFreeVgInfo(dbInfo);
taosMemoryFreeClear(op->data);
taosMemoryFreeClear(op);
CTG_RET(code); CTG_RET(code);
} }
...@@ -810,6 +811,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy ...@@ -810,6 +811,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg)); SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -834,8 +836,6 @@ _return: ...@@ -834,8 +836,6 @@ _return:
taosMemoryFree(output); taosMemoryFree(output);
} }
taosMemoryFreeClear(msg);
CTG_RET(code); CTG_RET(code);
} }
...@@ -847,6 +847,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp ...@@ -847,6 +847,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg)); SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -863,8 +864,6 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp ...@@ -863,8 +864,6 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
_return: _return:
taosMemoryFreeClear(msg);
CTG_RET(code); CTG_RET(code);
} }
...@@ -877,6 +876,7 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp ...@@ -877,6 +876,7 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -892,7 +892,6 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp ...@@ -892,7 +892,6 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp
_return: _return:
tFreeSGetUserAuthRsp(pAuth); tFreeSGetUserAuthRsp(pAuth);
taosMemoryFreeClear(msg);
CTG_RET(code); CTG_RET(code);
} }
...@@ -906,6 +905,7 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncO ...@@ -906,6 +905,7 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncO
SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg)); SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -923,7 +923,6 @@ _return: ...@@ -923,7 +923,6 @@ _return:
taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo); taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(*pIndex); taosMemoryFreeClear(*pIndex);
taosMemoryFreeClear(msg);
CTG_RET(code); CTG_RET(code);
} }
...@@ -937,6 +936,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) { ...@@ -937,6 +936,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg)); SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -952,8 +952,6 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) { ...@@ -952,8 +952,6 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
_return: _return:
taosMemoryFreeClear(msg);
CTG_RET(code); CTG_RET(code);
} }
...@@ -968,6 +966,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool ...@@ -968,6 +966,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg)); SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -981,8 +980,6 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool ...@@ -981,8 +980,6 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
_return: _return:
taosMemoryFreeClear(msg);
CTG_RET(code); CTG_RET(code);
} }
...@@ -1483,7 +1480,9 @@ int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool sync ...@@ -1483,7 +1480,9 @@ int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool sync
int32_t code = 0; int32_t code = 0;
CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput)); CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput));
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq)); code = ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq);
pOutput = NULL;
CTG_ERR_JRET(code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tname.h" #include "tname.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "ctgRemote.h"
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) { int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
...@@ -579,6 +580,11 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) { ...@@ -579,6 +580,11 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
int32_t num = taosArrayGetSize(pBatch->pMsgs); int32_t num = taosArrayGetSize(pBatch->pMsgs);
SBatchReq* pBatchReq = (SBatchReq*)(*msg); SBatchReq* pBatchReq = (SBatchReq*)(*msg);
if (num >= CTG_MAX_REQ_IN_BATCH) {
qError("too many msgs %d in one batch request", num);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
pBatchReq->header.vgId = htonl(vgId); pBatchReq->header.vgId = htonl(vgId);
pBatchReq->msgNum = htonl(num); pBatchReq->msgNum = htonl(num);
offset += sizeof(SBatchReq); offset += sizeof(SBatchReq);
......
...@@ -89,8 +89,8 @@ extern "C" { ...@@ -89,8 +89,8 @@ extern "C" {
#define EXPLAIN_STRING_TYPE_FORMAT "%s" #define EXPLAIN_STRING_TYPE_FORMAT "%s"
#define EXPLAIN_INPUT_ORDER_FORMAT "input_order=%s" #define EXPLAIN_INPUT_ORDER_FORMAT "input_order=%s"
#define EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT "output_order=%s" #define EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT "output_order=%s"
#define EXPLAIN_OFFSET_FORMAT "offset=%d" #define EXPLAIN_OFFSET_FORMAT "offset=%" PRId64
#define EXPLAIN_SOFFSET_FORMAT "soffset=%d" #define EXPLAIN_SOFFSET_FORMAT "soffset=%" PRId64
#define EXPLAIN_PARTITIONS_FORMAT "partitions=%d" #define EXPLAIN_PARTITIONS_FORMAT "partitions=%d"
#define COMMAND_RESET_LOG "resetLog" #define COMMAND_RESET_LOG "resetLog"
......
...@@ -485,6 +485,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs ...@@ -485,6 +485,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs
SSDataBlock* pBlock = buildCreateTbResultDataBlock(); SSDataBlock* pBlock = buildCreateTbResultDataBlock();
int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->pDbCfg, pStmt->tableName, pStmt->pTableCfg); int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->pDbCfg, pStmt->tableName, pStmt->pTableCfg);
if (code) { if (code) {
blockDataDestroy(pBlock);
return code; return code;
} }
return buildRetrieveTableRsp(pBlock, SHOW_CREATE_TB_RESULT_COLS, pRsp); return buildRetrieveTableRsp(pBlock, SHOW_CREATE_TB_RESULT_COLS, pRsp);
......
...@@ -1598,6 +1598,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { ...@@ -1598,6 +1598,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) { if (NULL == rsp) {
qError("malloc SRetrieveTableRsp failed, size:%d", rspSize); qError("malloc SRetrieveTableRsp failed, size:%d", rspSize);
blockDataDestroy(pBlock);
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
......
...@@ -267,6 +267,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData ...@@ -267,6 +267,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
taosThreadMutexInit(&deleter->mutex, NULL); taosThreadMutexInit(&deleter->mutex, NULL);
if (NULL == deleter->pDataBlocks) { if (NULL == deleter->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
destroyDataSinker((SDataSinkHandle*)deleter);
taosMemoryFree(deleter);
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
*pHandle = deleter; *pHandle = deleter;
......
...@@ -323,6 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat ...@@ -323,6 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
int32_t code = int32_t code =
tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
if (code) { if (code) {
destroyDataSinker((SDataSinkHandle*)pInserterNode);
return code; return code;
} }
......
...@@ -424,6 +424,12 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { ...@@ -424,6 +424,12 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if ((pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) > TSDB_MAX_COL_TAG_NUM) {
*pDst = NULL;
qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
return TSDB_CODE_INVALID_PARA;
}
int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema); int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
*pDst = taosMemoryMalloc(metaSize); *pDst = taosMemoryMalloc(metaSize);
if (NULL == *pDst) { if (NULL == *pDst) {
......
...@@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t ...@@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
SBuildTableInput *pInput = input; SBuildTableInput *pInput = input;
STableCfgReq cfgReq = {0}; STableCfgReq cfgReq = {0};
cfgReq.header.vgId = pInput->vgId; cfgReq.header.vgId = pInput->vgId;
strcpy(cfgReq.dbFName, pInput->dbFName); strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName));
strcpy(cfgReq.tbName, pInput->tbName); strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName));
int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq); int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
void *pBuf = (*mallcFp)(bufLen); void *pBuf = (*mallcFp)(bufLen);
...@@ -615,6 +615,8 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) { ...@@ -615,6 +615,8 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp)); STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp));
if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) { if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) {
qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize); qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize);
tFreeSTableCfgRsp(out);
taosMemoryFree(out);
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
......
...@@ -258,7 +258,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, ...@@ -258,7 +258,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
if (len < 0) { if (len < 0) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -292,7 +292,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, ...@@ -292,7 +292,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
} }
// Got data from sink // Got data from sink
QW_TASK_DLOG("there are data in sink, dataLength:%d", len); QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64, len);
*dataLen += len; *dataLen += len;
...@@ -408,7 +408,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -408,7 +408,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
break;
} }
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
......
...@@ -122,7 +122,6 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { ...@@ -122,7 +122,6 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break; break;
case JOB_TASK_STATUS_DROP: case JOB_TASK_STATUS_DROP:
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
break;
default: default:
SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus)); SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
......
...@@ -169,7 +169,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -169,7 +169,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
SCH_LOCK(SCH_WRITE, &pJob->resLock); SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (NULL == pJob->execRes.res) { if (NULL == pJob->execRes.res) {
pJob->execRes.res = taosArrayInit(batchRsp.nRsps, POINTER_BYTES); pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE; pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
} }
...@@ -386,7 +386,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa ...@@ -386,7 +386,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
// NEVER REACH HERE // NEVER REACH HERE
SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId); SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
} }
case TDMT_SCH_LINK_BROKEN: case TDMT_SCH_LINK_BROKEN:
SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode)); SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
...@@ -981,7 +980,7 @@ _return: ...@@ -981,7 +980,7 @@ _return:
} }
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) { int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
uint32_t msgSize = 0; int32_t msgSize = 0;
void *msg = NULL; void *msg = NULL;
int32_t code = 0; int32_t code = 0;
bool isCandidateAddr = false; bool isCandidateAddr = false;
...@@ -1133,12 +1132,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1133,12 +1132,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
default: default:
SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType); SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
} }
#if 1 #if 1
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL; msg = NULL;
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
......
...@@ -904,7 +904,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -904,7 +904,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
} else if (tsQueryPlannerTrace) { } else if (tsQueryPlannerTrace) {
char *msg = NULL; char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
qSubPlanToString(plan, &msg, &msgLen); SCH_ERR_RET(qSubPlanToString(plan, &msg, &msgLen));
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg); SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
taosMemoryFree(msg); taosMemoryFree(msg);
} }
...@@ -926,6 +926,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -926,6 +926,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
} }
SArray *explainRes = NULL; SArray *explainRes = NULL;
int32_t code = 0;
SQWMsg qwMsg = {0}; SQWMsg qwMsg = {0};
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP; qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob); qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
...@@ -938,14 +939,21 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -938,14 +939,21 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
} }
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
pTask->execId, &qwMsg, explainRes)); pTask->execId, &qwMsg, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) { if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes)); SCH_ERR_RET(schHandleExplainRes(explainRes));
explainRes = NULL;
} }
SCH_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
_return:
taosArrayDestroy(explainRes);
SCH_RET(code);
} }
int32_t schLaunchTaskImpl(void *param) { int32_t schLaunchTaskImpl(void *param) {
...@@ -1097,22 +1105,28 @@ int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { ...@@ -1097,22 +1105,28 @@ int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
void *pRsp = NULL; void *pRsp = NULL;
int32_t code = 0;
SArray *explainRes = NULL; SArray *explainRes = NULL;
if (SCH_IS_EXPLAIN_JOB(pJob)) { if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
} }
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
pTask->execId, &pRsp, explainRes)); pTask->execId, &pRsp, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) { if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes)); SCH_ERR_RET(schHandleExplainRes(explainRes));
explainRes = NULL;
} }
SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS)); SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
return TSDB_CODE_SUCCESS; _return:
taosArrayDestroy(explainRes);
SCH_RET(code);
} }
// Note: no more error processing, handled in function internal // Note: no more error processing, handled in function internal
......
...@@ -242,7 +242,7 @@ uint64_t schGenUUID(void) { ...@@ -242,7 +242,7 @@ uint64_t schGenUUID(void) {
if (hashId == 0) { if (hashId == 0) {
char uid[64] = {0}; char uid[64] = {0};
int32_t code = taosGetSystemUUID(uid, tListLen(uid)); int32_t code = taosGetSystemUUID(uid, tListLen(uid) - 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册