未验证 提交 679145dc 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16418 from taosdata/fix/TD-18457

fix(query): fix valgrind report uninitialized variable error
...@@ -1975,7 +1975,7 @@ _OVER: ...@@ -1975,7 +1975,7 @@ _OVER:
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str, int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
int32_t acctId, char* db) { int32_t acctId, char* db) {
SName name; SName name = {0};
if (len1 <= 0) { if (len1 <= 0) {
return -1; return -1;
......
...@@ -188,7 +188,7 @@ typedef struct SCtgTbCache { ...@@ -188,7 +188,7 @@ typedef struct SCtgTbCache {
typedef struct SCtgVgCache { typedef struct SCtgVgCache {
SRWLatch vgLock; SRWLatch vgLock;
SDBVgInfo *vgInfo; SDBVgInfo *vgInfo;
} SCtgVgCache; } SCtgVgCache;
typedef struct SCtgDBCache { typedef struct SCtgDBCache {
...@@ -224,7 +224,7 @@ typedef struct SCtgUserAuth { ...@@ -224,7 +224,7 @@ typedef struct SCtgUserAuth {
} SCtgUserAuth; } SCtgUserAuth;
typedef struct SCatalog { typedef struct SCatalog {
uint64_t clusterId; uint64_t clusterId;
SHashObj *userCache; //key:user, value:SCtgUserAuth SHashObj *userCache; //key:user, value:SCtgUserAuth
SHashObj *dbCache; //key:dbname, value:SCtgDBCache SHashObj *dbCache; //key:dbname, value:SCtgDBCache
SCtgRentMgmt dbRent; SCtgRentMgmt dbRent;
...@@ -253,9 +253,9 @@ typedef struct SCtgJob { ...@@ -253,9 +253,9 @@ typedef struct SCtgJob {
int32_t jobResCode; int32_t jobResCode;
int32_t taskIdx; int32_t taskIdx;
SRWLatch taskLock; SRWLatch taskLock;
uint64_t queryId; uint64_t queryId;
SCatalog* pCtg; SCatalog* pCtg;
SRequestConnInfo conn; SRequestConnInfo conn;
void* userParam; void* userParam;
catalogCallback userFp; catalogCallback userFp;
...@@ -279,7 +279,7 @@ typedef struct SCtgMsgCtx { ...@@ -279,7 +279,7 @@ typedef struct SCtgMsgCtx {
void* lastOut; void* lastOut;
void* out; void* out;
char* target; char* target;
SHashObj* pBatchs; SHashObj* pBatchs;
} SCtgMsgCtx; } SCtgMsgCtx;
...@@ -364,7 +364,7 @@ typedef struct SCtgCacheStat { ...@@ -364,7 +364,7 @@ typedef struct SCtgCacheStat {
uint64_t numOfMetaHit; uint64_t numOfMetaHit;
uint64_t numOfMetaMiss; uint64_t numOfMetaMiss;
uint64_t numOfIndexHit; uint64_t numOfIndexHit;
uint64_t numOfIndexMiss; uint64_t numOfIndexMiss;
uint64_t numOfUserHit; uint64_t numOfUserHit;
uint64_t numOfUserMiss; uint64_t numOfUserMiss;
uint64_t numOfClear; uint64_t numOfClear;
...@@ -451,7 +451,7 @@ typedef struct SCtgCacheOperation { ...@@ -451,7 +451,7 @@ typedef struct SCtgCacheOperation {
int32_t opId; int32_t opId;
void *data; void *data;
bool syncOp; bool syncOp;
tsem_t rspSem; tsem_t rspSem;
bool stopQueue; bool stopQueue;
bool unLocked; bool unLocked;
} SCtgCacheOperation; } SCtgCacheOperation;
...@@ -466,7 +466,7 @@ typedef struct SCtgQueue { ...@@ -466,7 +466,7 @@ typedef struct SCtgQueue {
bool stopQueue; bool stopQueue;
SCtgQNode *head; SCtgQNode *head;
SCtgQNode *tail; SCtgQNode *tail;
tsem_t reqSem; tsem_t reqSem;
uint64_t qRemainNum; uint64_t qRemainNum;
} SCtgQueue; } SCtgQueue;
...@@ -475,7 +475,7 @@ typedef struct SCatalogMgmt { ...@@ -475,7 +475,7 @@ typedef struct SCatalogMgmt {
int32_t jobPool; int32_t jobPool;
SRWLatch lock; SRWLatch lock;
SCtgQueue queue; SCtgQueue queue;
TdThread updateThread; TdThread updateThread;
SHashObj *pCluster; //key: clusterId, value: SCatalog* SHashObj *pCluster; //key: clusterId, value: SCatalog*
SCatalogStat stat; SCatalogStat stat;
SCatalogCfg cfg; SCatalogCfg cfg;
...@@ -528,8 +528,8 @@ typedef struct SCtgOperation { ...@@ -528,8 +528,8 @@ typedef struct SCtgOperation {
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST) #define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
#define CTG_DB_NOT_EXIST(code) (code == TSDB_CODE_MND_DB_NOT_EXIST) #define CTG_DB_NOT_EXIST(code) (code == TSDB_CODE_MND_DB_NOT_EXIST)
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__) #define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__)
...@@ -576,7 +576,7 @@ typedef struct SCtgOperation { ...@@ -576,7 +576,7 @@ typedef struct SCtgOperation {
} \ } \
} while (0) } while (0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
......
...@@ -39,7 +39,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -39,7 +39,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosMemoryFree(task.taskCtx); taosMemoryFree(task.taskCtx);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(ctx->pName, name, sizeof(*name)); memcpy(ctx->pName, name, sizeof(*name));
ctx->flag = CTG_FLAG_UNKNOWN_STB; ctx->flag = CTG_FLAG_UNKNOWN_STB;
...@@ -69,7 +69,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -69,7 +69,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d",
pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -89,7 +89,7 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -89,7 +89,7 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
} }
SCtgDbVgCtx* ctx = task.taskCtx; SCtgDbVgCtx* ctx = task.taskCtx;
memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName)); memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName));
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
...@@ -113,7 +113,7 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -113,7 +113,7 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
} }
SCtgDbCfgCtx* ctx = task.taskCtx; SCtgDbCfgCtx* ctx = task.taskCtx;
memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName)); memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName));
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
...@@ -137,7 +137,7 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -137,7 +137,7 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
} }
SCtgDbInfoCtx* ctx = task.taskCtx; SCtgDbInfoCtx* ctx = task.taskCtx;
memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName)); memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName));
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
...@@ -167,7 +167,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -167,7 +167,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosMemoryFree(task.taskCtx); taosMemoryFree(task.taskCtx);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(ctx->pName, name, sizeof(*name)); memcpy(ctx->pName, name, sizeof(*name));
tNameGetFullDbName(ctx->pName, ctx->dbFName); tNameGetFullDbName(ctx->pName, ctx->dbFName);
...@@ -197,7 +197,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -197,7 +197,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d",
pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -248,7 +248,7 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -248,7 +248,7 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
} }
SCtgIndexCtx* ctx = task.taskCtx; SCtgIndexCtx* ctx = task.taskCtx;
strcpy(ctx->indexFName, name); strcpy(ctx->indexFName, name);
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
...@@ -272,7 +272,7 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -272,7 +272,7 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
} }
SCtgUdfCtx* ctx = task.taskCtx; SCtgUdfCtx* ctx = task.taskCtx;
strcpy(ctx->udfName, name); strcpy(ctx->udfName, name);
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
...@@ -296,7 +296,7 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -296,7 +296,7 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
} }
SCtgUserCtx* ctx = task.taskCtx; SCtgUserCtx* ctx = task.taskCtx;
memcpy(&ctx->user, user, sizeof(*user)); memcpy(&ctx->user, user, sizeof(*user));
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
...@@ -339,7 +339,7 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -339,7 +339,7 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosMemoryFree(task.taskCtx); taosMemoryFree(task.taskCtx);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(ctx->pName, name, sizeof(*name)); memcpy(ctx->pName, name, sizeof(*name));
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
...@@ -368,7 +368,7 @@ int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) { ...@@ -368,7 +368,7 @@ int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
taosMemoryFree(task.taskCtx); taosMemoryFree(task.taskCtx);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(ctx->pName, name, sizeof(*name)); memcpy(ctx->pName, name, sizeof(*name));
taosArrayPush(pJob->pTasks, &task); taosArrayPush(pJob->pTasks, &task);
...@@ -387,7 +387,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con ...@@ -387,7 +387,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
taosHashCleanup(pTb); taosHashCleanup(pTb);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
for (int32_t i = 0; i < pJob->dbVgNum; ++i) { for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbVgroup, i); char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
...@@ -474,7 +474,7 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas ...@@ -474,7 +474,7 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas
if (taskId) { if (taskId) {
*taskId = tid; *taskId = tid;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -510,7 +510,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const ...@@ -510,7 +510,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
pJob->pCtg = pCtg; pJob->pCtg = pCtg;
pJob->conn = *pConn; pJob->conn = *pConn;
pJob->userParam = param; pJob->userParam = param;
pJob->tbMetaNum = tbMetaNum; pJob->tbMetaNum = tbMetaNum;
pJob->tbHashNum = tbHashNum; pJob->tbHashNum = tbHashNum;
pJob->qnodeNum = qnodeNum; pJob->qnodeNum = qnodeNum;
...@@ -844,20 +844,20 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) { ...@@ -844,20 +844,20 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) {
pJob->jobRes.pSvrVer->code = pTask->code; pJob->jobRes.pSvrVer->code = pTask->code;
pJob->jobRes.pSvrVer->pRes = pTask->res; pJob->jobRes.pSvrVer->pRes = pTask->res;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgCallSubCb(SCtgTask *pTask) { int32_t ctgCallSubCb(SCtgTask *pTask) {
int32_t code = 0; int32_t code = 0;
CTG_LOCK(CTG_WRITE, &pTask->lock); CTG_LOCK(CTG_WRITE, &pTask->lock);
int32_t parentNum = taosArrayGetSize(pTask->pParents); int32_t parentNum = taosArrayGetSize(pTask->pParents);
for (int32_t i = 0; i < parentNum; ++i) { for (int32_t i = 0; i < parentNum; ++i) {
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgTask* pParent = taosArrayGetP(pTask->pParents, i); SCtgTask* pParent = taosArrayGetP(pTask->pParents, i);
pParent->subRes.code = pTask->code; pParent->subRes.code = pTask->code;
if (TSDB_CODE_SUCCESS == pTask->code) { if (TSDB_CODE_SUCCESS == pTask->code) {
code = (*gCtgAsyncFps[pTask->type].cloneFp)(pTask, &pParent->subRes.res); code = (*gCtgAsyncFps[pTask->type].cloneFp)(pTask, &pParent->subRes.res);
...@@ -868,22 +868,22 @@ int32_t ctgCallSubCb(SCtgTask *pTask) { ...@@ -868,22 +868,22 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
SCtgMsgCtx *pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1); SCtgMsgCtx *pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1);
pParMsgCtx->pBatchs = pMsgCtx->pBatchs; pParMsgCtx->pBatchs = pMsgCtx->pBatchs;
CTG_ERR_JRET(pParent->subRes.fp(pParent)); CTG_ERR_JRET(pParent->subRes.fp(pParent));
} }
_return: _return:
CTG_UNLOCK(CTG_WRITE, &pTask->lock); CTG_UNLOCK(CTG_WRITE, &pTask->lock);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgCallUserCb(void* param) { int32_t ctgCallUserCb(void* param) {
SCtgJob* pJob = (SCtgJob*)param; SCtgJob* pJob = (SCtgJob*)param;
qDebug("QID:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); qDebug("QID:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode); (*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
qDebug("QID:0x%" PRIx64 " ctg end to call user cb", pJob->queryId); qDebug("QID:0x%" PRIx64 " ctg end to call user cb", pJob->queryId);
...@@ -922,9 +922,9 @@ _return: ...@@ -922,9 +922,9 @@ _return:
//taosSsleep(2); //taosSsleep(2);
//qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId); //qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
taosAsyncExec(ctgCallUserCb, pJob, NULL); taosAsyncExec(ctgCallUserCb, pJob, NULL);
CTG_RET(code); CTG_RET(code);
} }
...@@ -932,7 +932,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -932,7 +932,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
...@@ -958,38 +958,38 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -958,38 +958,38 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
} }
case TDMT_MND_TABLE_META: { case TDMT_MND_TABLE_META: {
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
if (CTG_IS_META_NULL(pOut->metaType)) { if (CTG_IS_META_NULL(pOut->metaType)) {
if (CTG_FLAG_IS_STB(flag)) { if (CTG_FLAG_IS_STB(flag)) {
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName); tNameGetFullDbName(pName, dbFName);
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
*vgId = vgInfo.vgId; *vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq)); CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
} else { } else {
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
tstrncpy(input.db, dbFName, tListLen(input.db)); tstrncpy(input.db, dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq)); CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName)); ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
ctgRemoveTbMetaFromCache(pCtg, pName, false); ctgRemoveTbMetaFromCache(pCtg, pName, false);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
} }
...@@ -998,12 +998,12 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -998,12 +998,12 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out; STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out;
TSWAP(pLastOut->tbMeta, pOut->tbMeta); TSWAP(pLastOut->tbMeta, pOut->tbMeta);
} }
break; break;
} }
case TDMT_VND_TABLE_META: { case TDMT_VND_TABLE_META: {
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
if (CTG_IS_META_NULL(pOut->metaType)) { if (CTG_IS_META_NULL(pOut->metaType)) {
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName)); ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
ctgRemoveTbMetaFromCache(pCtg, pName, false); ctgRemoveTbMetaFromCache(pCtg, pName, false);
...@@ -1013,12 +1013,12 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1013,12 +1013,12 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
if (CTG_FLAG_IS_STB(flag)) { if (CTG_FLAG_IS_STB(flag)) {
break; break;
} }
if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) { if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName)); ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
taosMemoryFreeClear(pOut->tbMeta); taosMemoryFreeClear(pOut->tbMeta);
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq)); CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
} else if (CTG_IS_META_BOTH(pOut->metaType)) { } else if (CTG_IS_META_BOTH(pOut->metaType)) {
int32_t exist = 0; int32_t exist = 0;
...@@ -1029,13 +1029,13 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1029,13 +1029,13 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
stbCtx.flag = flag; stbCtx.flag = flag;
stbCtx.pName = &stbName; stbCtx.pName = &stbName;
taosMemoryFreeClear(pOut->tbMeta); taosMemoryFreeClear(pOut->tbMeta);
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
if (pOut->tbMeta) { if (pOut->tbMeta) {
exist = 1; exist = 1;
} }
} }
if (0 == exist) { if (0 == exist) {
TSWAP(pMsgCtx->lastOut, pMsgCtx->out); TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq)); CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq));
...@@ -1056,7 +1056,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1056,7 +1056,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
if (CTG_IS_META_BOTH(pOut->metaType)) { if (CTG_IS_META_BOTH(pOut->metaType)) {
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
} }
/* /*
else if (CTG_IS_META_CTABLE(pOut->metaType)) { else if (CTG_IS_META_CTABLE(pOut->metaType)) {
SName stbName = *pName; SName stbName = *pName;
...@@ -1064,7 +1064,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1064,7 +1064,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
SCtgTbMetaCtx stbCtx = {0}; SCtgTbMetaCtx stbCtx = {0};
stbCtx.flag = flag; stbCtx.flag = flag;
stbCtx.pName = &stbName; stbCtx.pName = &stbName;
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
if (NULL == pOut->tbMeta) { if (NULL == pOut->tbMeta) {
ctgDebug("stb no longer exist, stbName:%s", stbName.tname); ctgDebug("stb no longer exist, stbName:%s", stbName.tname);
...@@ -1088,7 +1088,7 @@ _return: ...@@ -1088,7 +1088,7 @@ _return:
if (pTask->res || code) { if (pTask->res || code) {
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
} }
CTG_RET(code); CTG_RET(code);
} }
...@@ -1097,7 +1097,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1097,7 +1097,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
...@@ -1125,38 +1125,38 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1125,38 +1125,38 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
} }
case TDMT_MND_TABLE_META: { case TDMT_MND_TABLE_META: {
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
if (CTG_IS_META_NULL(pOut->metaType)) { if (CTG_IS_META_NULL(pOut->metaType)) {
if (CTG_FLAG_IS_STB(flag)) { if (CTG_FLAG_IS_STB(flag)) {
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName); tNameGetFullDbName(pName, dbFName);
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
*vgId = vgInfo.vgId; *vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq)); CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
} else { } else {
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
tstrncpy(input.db, dbFName, tListLen(input.db)); tstrncpy(input.db, dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq)); CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName)); ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
ctgRemoveTbMetaFromCache(pCtg, pName, false); ctgRemoveTbMetaFromCache(pCtg, pName, false);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
} }
...@@ -1165,12 +1165,12 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1165,12 +1165,12 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out; STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out;
TSWAP(pLastOut->tbMeta, pOut->tbMeta); TSWAP(pLastOut->tbMeta, pOut->tbMeta);
} }
break; break;
} }
case TDMT_VND_TABLE_META: { case TDMT_VND_TABLE_META: {
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
if (CTG_IS_META_NULL(pOut->metaType)) { if (CTG_IS_META_NULL(pOut->metaType)) {
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName)); ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
ctgRemoveTbMetaFromCache(pCtg, pName, false); ctgRemoveTbMetaFromCache(pCtg, pName, false);
...@@ -1180,12 +1180,12 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1180,12 +1180,12 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
if (CTG_FLAG_IS_STB(flag)) { if (CTG_FLAG_IS_STB(flag)) {
break; break;
} }
if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) { if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName)); ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
taosMemoryFreeClear(pOut->tbMeta); taosMemoryFreeClear(pOut->tbMeta);
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq)); CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
} else if (CTG_IS_META_BOTH(pOut->metaType)) { } else if (CTG_IS_META_BOTH(pOut->metaType)) {
int32_t exist = 0; int32_t exist = 0;
...@@ -1196,14 +1196,14 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1196,14 +1196,14 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
stbCtx.flag = flag; stbCtx.flag = flag;
stbCtx.pName = &stbName; stbCtx.pName = &stbName;
taosMemoryFreeClear(pOut->tbMeta); taosMemoryFreeClear(pOut->tbMeta);
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
if (pOut->tbMeta) { if (pOut->tbMeta) {
ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName)); ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
exist = 1; exist = 1;
} }
} }
if (0 == exist) { if (0 == exist) {
TSWAP(pMsgCtx->lastOut, pMsgCtx->out); TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq)); CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq));
...@@ -1224,7 +1224,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1224,7 +1224,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
if (CTG_IS_META_BOTH(pOut->metaType)) { if (CTG_IS_META_BOTH(pOut->metaType)) {
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
} }
/* /*
else if (CTG_IS_META_CTABLE(pOut->metaType)) { else if (CTG_IS_META_CTABLE(pOut->metaType)) {
SName stbName = *pName; SName stbName = *pName;
...@@ -1232,7 +1232,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1232,7 +1232,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
SCtgTbMetaCtx stbCtx = {0}; SCtgTbMetaCtx stbCtx = {0};
stbCtx.flag = flag; stbCtx.flag = flag;
stbCtx.pName = &stbName; stbCtx.pName = &stbName;
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
if (NULL == pOut->tbMeta) { if (NULL == pOut->tbMeta) {
ctgDebug("stb no longer exist, stbName:%s", stbName.tname); ctgDebug("stb no longer exist, stbName:%s", stbName.tname);
...@@ -1273,7 +1273,7 @@ _return: ...@@ -1273,7 +1273,7 @@ _return:
if (pTask->res && taskDone) { if (pTask->res && taskDone) {
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
} }
CTG_RET(code); CTG_RET(code);
} }
...@@ -1282,7 +1282,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf * ...@@ -1282,7 +1282,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx; SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1290,7 +1290,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf * ...@@ -1290,7 +1290,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *
case TDMT_MND_USE_DB: { case TDMT_MND_USE_DB: {
SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out; SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out;
SDBVgInfo* pDb = NULL; SDBVgInfo* pDb = NULL;
CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res)); CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res));
CTG_ERR_JRET(cloneDbVgInfo(pOut->dbVgroup, &pDb)); CTG_ERR_JRET(cloneDbVgInfo(pOut->dbVgroup, &pDb));
...@@ -1316,7 +1316,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1316,7 +1316,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx; SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
...@@ -1330,7 +1330,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1330,7 +1330,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
} }
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, ctx->pName, (SVgroupInfo*)pTask->res)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, ctx->pName, (SVgroupInfo*)pTask->res));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false)); CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pOut->dbVgroup, false));
pOut->dbVgroup = NULL; pOut->dbVgroup = NULL;
...@@ -1354,7 +1354,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1354,7 +1354,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
bool taskDone = false; bool taskDone = false;
...@@ -1367,7 +1367,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1367,7 +1367,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx);
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, tReq, pOut->dbVgroup, ctx, pMsgCtx->target, pReq->pTables, true)); CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, tReq, pOut->dbVgroup, ctx, pMsgCtx->target, pReq->pTables, true));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false)); CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false));
pOut->dbVgroup = NULL; pOut->dbVgroup = NULL;
...@@ -1394,7 +1394,7 @@ _return: ...@@ -1394,7 +1394,7 @@ _return:
pRes->code = code; pRes->code = code;
pRes->pRes = NULL; pRes->pRes = NULL;
} }
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList); TSWAP(pTask->res, ctx->pResList);
taskDone = true; taskDone = true;
...@@ -1419,9 +1419,9 @@ int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1419,9 +1419,9 @@ int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo)); CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
pTask->res = pInfo; pTask->res = pInfo;
SCtgTbIndexCtx* ctx = pTask->taskCtx; SCtgTbIndexCtx* ctx = pTask->taskCtx;
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false)); CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false));
_return: _return:
if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) { if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
...@@ -1438,7 +1438,7 @@ int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1438,7 +1438,7 @@ int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
...@@ -1452,7 +1452,7 @@ int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1452,7 +1452,7 @@ int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
...@@ -1471,7 +1471,7 @@ int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1471,7 +1471,7 @@ int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
...@@ -1485,7 +1485,7 @@ int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1485,7 +1485,7 @@ int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
...@@ -1499,7 +1499,7 @@ int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1499,7 +1499,7 @@ int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
...@@ -1513,7 +1513,7 @@ int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *p ...@@ -1513,7 +1513,7 @@ int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *p
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
...@@ -1525,7 +1525,7 @@ int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf * ...@@ -1525,7 +1525,7 @@ int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx; SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
bool pass = false; bool pass = false;
SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pTask->msgCtx.out; SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pTask->msgCtx.out;
...@@ -1573,7 +1573,7 @@ int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf ...@@ -1573,7 +1573,7 @@ int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
_return: _return:
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
...@@ -1583,7 +1583,7 @@ _return: ...@@ -1583,7 +1583,7 @@ _return:
int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int32_t* vgId) { int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int32_t* vgId) {
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
int32_t code = 0; int32_t code = 0;
...@@ -1603,7 +1603,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int ...@@ -1603,7 +1603,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pName, dbFName); tNameGetFullDbName(pName, dbFName);
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (dbCache) { if (dbCache) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
...@@ -1632,7 +1632,7 @@ _return: ...@@ -1632,7 +1632,7 @@ _return:
} }
int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
...@@ -1649,14 +1649,14 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { ...@@ -1649,14 +1649,14 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
SCtgTbMetaCtx* pCtx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* pCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pCtx->flag, pCtx->pName, &pCtx->vgId)); CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pCtx->flag, pCtx->pName, &pCtx->vgId));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
...@@ -1670,18 +1670,18 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { ...@@ -1670,18 +1670,18 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables)); CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
baseResIdx += taosArrayGetSize(pReq->pTables); baseResIdx += taosArrayGetSize(pReq->pTables);
} }
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs); pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
if (pCtx->fetchNum <= 0) { if (pCtx->fetchNum <= 0) {
TSWAP(pTask->res, pCtx->pResList); TSWAP(pTask->res, pCtx->pResList);
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx)); pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx));
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum); taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
...@@ -1689,19 +1689,19 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { ...@@ -1689,19 +1689,19 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = pFetch->fetchIdx; tReq.msgIdx = pFetch->fetchIdx;
CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId)); CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) { int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx; SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx;
...@@ -1710,18 +1710,18 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) { ...@@ -1710,18 +1710,18 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res)); CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL; dbCache = NULL;
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0)); CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
} else { } else {
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
tstrncpy(input.db, pCtx->dbFName, tListLen(input.db)); tstrncpy(input.db, pCtx->dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
...@@ -1742,7 +1742,7 @@ _return: ...@@ -1742,7 +1742,7 @@ _return:
int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx; SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx;
...@@ -1751,7 +1751,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) { ...@@ -1751,7 +1751,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
pTask->res = taosMemoryMalloc(sizeof(SVgroupInfo)); pTask->res = taosMemoryMalloc(sizeof(SVgroupInfo));
...@@ -1762,17 +1762,17 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) { ...@@ -1762,17 +1762,17 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL; dbCache = NULL;
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0)); CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
} else { } else {
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
tstrncpy(input.db, pCtx->dbFName, tListLen(input.db)); tstrncpy(input.db, pCtx->dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, &tReq)); CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, &tReq));
} }
...@@ -1786,16 +1786,16 @@ _return: ...@@ -1786,16 +1786,16 @@ _return:
} }
int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0; int32_t fetchIdx = 0;
int32_t baseResIdx = 0; int32_t baseResIdx = 0;
int32_t code = 0; int32_t code = 0;
for (int32_t i = 0; i < dbNum; ++i) { for (int32_t i = 0; i < dbNum; ++i) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, i); STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
...@@ -1804,7 +1804,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { ...@@ -1804,7 +1804,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
if (NULL != dbCache) { if (NULL != dbCache) {
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = -1; tReq.msgIdx = -1;
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false)); CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
...@@ -1815,21 +1815,21 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { ...@@ -1815,21 +1815,21 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
ctgAddFetch(&pCtx->pFetchs, i, -1, &fetchIdx, baseResIdx, 0); ctgAddFetch(&pCtx->pFetchs, i, -1, &fetchIdx, baseResIdx, 0);
baseResIdx += taosArrayGetSize(pReq->pTables); baseResIdx += taosArrayGetSize(pReq->pTables);
taosArraySetSize(pCtx->pResList, baseResIdx); taosArraySetSize(pCtx->pResList, baseResIdx);
} }
} }
pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs); pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
if (pCtx->fetchNum <= 0) { if (pCtx->fetchNum <= 0) {
TSWAP(pTask->res, pCtx->pResList); TSWAP(pTask->res, pCtx->pResList);
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx)); pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx));
taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum); taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
...@@ -1837,10 +1837,10 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { ...@@ -1837,10 +1837,10 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
strcpy(input.db, pReq->dbFName); strcpy(input.db, pReq->dbFName);
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
SCtgTaskReq tReq; SCtgTaskReq tReq;
...@@ -1854,14 +1854,14 @@ _return: ...@@ -1854,14 +1854,14 @@ _return:
if (dbCache) { if (dbCache) {
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
} }
return code; return code;
} }
int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx; SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
SArray* pRes = NULL; SArray* pRes = NULL;
...@@ -1874,18 +1874,18 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { ...@@ -1874,18 +1874,18 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes)); CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes));
if (pRes) { if (pRes) {
pTask->res = pRes; pTask->res = pRes;
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask)); CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx; SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx;
SArray* pRes = NULL; SArray* pRes = NULL;
...@@ -1915,7 +1915,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) { ...@@ -1915,7 +1915,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
CTG_ERR_JRET(ctgGetTableCfgFromVnode(pCtg, pConn, pCtx->pName, pCtx->pVgInfo, NULL, pTask)); CTG_ERR_JRET(ctgGetTableCfgFromVnode(pCtg, pConn, pCtx->pName, pCtx->pVgInfo, NULL, pTask));
} }
...@@ -1926,13 +1926,13 @@ _return: ...@@ -1926,13 +1926,13 @@ _return:
if (CTG_TASK_LAUNCHED == pTask->status) { if (CTG_TASK_LAUNCHED == pTask->status) {
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
} }
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
...@@ -1945,7 +1945,7 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { ...@@ -1945,7 +1945,7 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
} }
int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) { int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
...@@ -1959,7 +1959,7 @@ int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) { ...@@ -1959,7 +1959,7 @@ int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) {
int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx; SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
...@@ -1975,7 +1975,7 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { ...@@ -1975,7 +1975,7 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) { int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
int32_t code = 0; int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx; SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
...@@ -2014,7 +2014,7 @@ _return: ...@@ -2014,7 +2014,7 @@ _return:
} }
int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx; SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
...@@ -2029,7 +2029,7 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { ...@@ -2029,7 +2029,7 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
} }
int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx; SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
...@@ -2044,7 +2044,7 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { ...@@ -2044,7 +2044,7 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
} }
int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx; SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx;
bool inCache = false; bool inCache = false;
...@@ -2054,7 +2054,7 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { ...@@ -2054,7 +2054,7 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
if (NULL == pMsgCtx->pBatchs) { if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs; pMsgCtx->pBatchs = pJob->pBatchs;
} }
CTG_ERR_RET(ctgChkAuthFromCache(pCtg, pCtx->user.user, pCtx->user.dbFName, pCtx->user.type, &inCache, &pass)); CTG_ERR_RET(ctgChkAuthFromCache(pCtg, pCtx->user.user, pCtx->user.dbFName, pCtx->user.type, &inCache, &pass));
if (inCache) { if (inCache) {
pTask->res = taosMemoryCalloc(1, sizeof(bool)); pTask->res = taosMemoryCalloc(1, sizeof(bool));
...@@ -2062,7 +2062,7 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { ...@@ -2062,7 +2062,7 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
*(bool*)pTask->res = pass; *(bool*)pTask->res = pass;
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2073,7 +2073,7 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { ...@@ -2073,7 +2073,7 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
} }
int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) { int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
...@@ -2096,7 +2096,7 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) { ...@@ -2096,7 +2096,7 @@ int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask) {
int32_t ctgGetTbCfgCb(SCtgTask *pTask) { int32_t ctgGetTbCfgCb(SCtgTask *pTask) {
int32_t code = 0; int32_t code = 0;
CTG_ERR_JRET(pTask->subRes.code); CTG_ERR_JRET(pTask->subRes.code);
SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx; SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx;
...@@ -2104,7 +2104,7 @@ int32_t ctgGetTbCfgCb(SCtgTask *pTask) { ...@@ -2104,7 +2104,7 @@ int32_t ctgGetTbCfgCb(SCtgTask *pTask) {
pCtx->tbType = ((STableMeta*)pTask->subRes.res)->tableType; pCtx->tbType = ((STableMeta*)pTask->subRes.res)->tableType;
} else if (CTG_TASK_GET_DB_VGROUP == pTask->subRes.type) { } else if (CTG_TASK_GET_DB_VGROUP == pTask->subRes.type) {
SDBVgInfo* pDb = (SDBVgInfo*)pTask->subRes.res; SDBVgInfo* pDb = (SDBVgInfo*)pTask->subRes.res;
pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo)); pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, pDb, pCtx->pName, pCtx->pVgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, pDb, pCtx->pName, pCtx->pVgInfo));
} }
...@@ -2167,7 +2167,7 @@ SCtgAsyncFps gCtgAsyncFps[] = { ...@@ -2167,7 +2167,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
int32_t ctgMakeAsyncRes(SCtgJob *pJob) { int32_t ctgMakeAsyncRes(SCtgJob *pJob) {
int32_t code = 0; int32_t code = 0;
int32_t taskNum = taosArrayGetSize(pJob->pTasks); int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].dumpResFp)(pTask)); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].dumpResFp)(pTask));
...@@ -2180,16 +2180,16 @@ int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, in ...@@ -2180,16 +2180,16 @@ int32_t ctgSearchExistingTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, in
bool equal = false; bool equal = false;
SCtgTask* pTask = NULL; SCtgTask* pTask = NULL;
int32_t code = 0; int32_t code = 0;
CTG_LOCK(CTG_READ, &pJob->taskLock); CTG_LOCK(CTG_READ, &pJob->taskLock);
int32_t taskNum = taosArrayGetSize(pJob->pTasks); int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
pTask = taosArrayGet(pJob->pTasks, i); pTask = taosArrayGet(pJob->pTasks, i);
if (type != pTask->type) { if (type != pTask->type) {
continue; continue;
} }
CTG_ERR_JRET((*gCtgAsyncFps[type].compFp)(pTask, param, &equal)); CTG_ERR_JRET((*gCtgAsyncFps[type].compFp)(pTask, param, &equal));
if (equal) { if (equal) {
break; break;
...@@ -2208,7 +2208,7 @@ _return: ...@@ -2208,7 +2208,7 @@ _return:
int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) {
int32_t code = 0; int32_t code = 0;
CTG_LOCK(CTG_WRITE, &pSub->lock); CTG_LOCK(CTG_WRITE, &pSub->lock);
if (CTG_TASK_DONE == pSub->status) { if (CTG_TASK_DONE == pSub->status) {
pTask->subRes.code = pSub->code; pTask->subRes.code = pSub->code;
...@@ -2216,7 +2216,7 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { ...@@ -2216,7 +2216,7 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) {
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1); SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1); SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
pMsgCtx->pBatchs = pSubMsgCtx->pBatchs; pMsgCtx->pBatchs = pSubMsgCtx->pBatchs;
CTG_ERR_JRET(pTask->subRes.fp(pTask)); CTG_ERR_JRET(pTask->subRes.fp(pTask));
} else { } else {
if (NULL == pSub->pParents) { if (NULL == pSub->pParents) {
...@@ -2230,7 +2230,7 @@ _return: ...@@ -2230,7 +2230,7 @@ _return:
CTG_UNLOCK(CTG_WRITE, &pSub->lock); CTG_UNLOCK(CTG_WRITE, &pSub->lock);
CTG_RET(code); CTG_RET(code);
} }
...@@ -2242,13 +2242,13 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, ...@@ -2242,13 +2242,13 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
ctgClearSubTaskRes(&pTask->subRes); ctgClearSubTaskRes(&pTask->subRes);
pTask->subRes.type = type; pTask->subRes.type = type;
pTask->subRes.fp = fp; pTask->subRes.fp = fp;
CTG_ERR_RET(ctgSearchExistingTask(pJob, type, param, &subTaskId)); CTG_ERR_RET(ctgSearchExistingTask(pJob, type, param, &subTaskId));
if (subTaskId < 0) { if (subTaskId < 0) {
CTG_ERR_RET(ctgInitTask(pJob, type, param, &subTaskId)); CTG_ERR_RET(ctgInitTask(pJob, type, param, &subTaskId));
newTask = true; newTask = true;
} }
SCtgTask* pSub = taosArrayGet(pJob->pTasks, subTaskId); SCtgTask* pSub = taosArrayGet(pJob->pTasks, subTaskId);
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
...@@ -2267,21 +2267,21 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, ...@@ -2267,21 +2267,21 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
int32_t ctgLaunchJob(SCtgJob *pJob) { int32_t ctgLaunchJob(SCtgJob *pJob) {
int32_t taskNum = taosArrayGetSize(pJob->pTasks); int32_t taskNum = taosArrayGetSize(pJob->pTasks);
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
pTask->status = CTG_TASK_LAUNCHED; pTask->status = CTG_TASK_LAUNCHED;
} }
if (taskNum <= 0) { if (taskNum <= 0) {
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
taosAsyncExec(ctgCallUserCb, pJob, NULL); taosAsyncExec(ctgCallUserCb, pJob, NULL);
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
} else { } else {
ctgLaunchBatchs(pJob->pCtg, pJob, pJob->pBatchs); ctgLaunchBatchs(pJob->pCtg, pJob, pJob->pBatchs);
#endif #endif
......
...@@ -1715,7 +1715,7 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { ...@@ -1715,7 +1715,7 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
} }
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) { static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, bool isStable, int32_t tableNo, SToken* pTbToken) {
SName name; SName name = {0};
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo, CHECK_CODE(reserveTableMetaInCacheForInsert(&name, isStable ? CATALOG_REQ_TYPE_META : CATALOG_REQ_TYPE_BOTH, tableNo,
pCxt->pMetaCache)); pCxt->pMetaCache));
...@@ -1730,7 +1730,7 @@ static int32_t checkTableName(const char* pTableName, SMsgBuf* pMsgBuf) { ...@@ -1730,7 +1730,7 @@ static int32_t checkTableName(const char* pTableName, SMsgBuf* pMsgBuf) {
} }
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, int32_t tableNo, SToken* pTbToken) { static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, int32_t tableNo, SToken* pTbToken) {
SName name; SName name = {0};
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(checkTableName(name.tname, &pCxt->msg)); CHECK_CODE(checkTableName(name.tname, &pCxt->msg));
CHECK_CODE(reserveTableMetaInCacheForInsert(&name, CATALOG_REQ_TYPE_VGROUP, tableNo, pCxt->pMetaCache)); CHECK_CODE(reserveTableMetaInCacheForInsert(&name, CATALOG_REQ_TYPE_VGROUP, tableNo, pCxt->pMetaCache));
......
...@@ -252,11 +252,15 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp ...@@ -252,11 +252,15 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
// the max slots is not defined by user // the max slots is not defined by user
pHashObj->capacity = taosHashCapacity((int32_t)capacity); pHashObj->capacity = taosHashCapacity((int32_t)capacity);
pHashObj->size = 0;
pHashObj->equalFp = memcmp; pHashObj->equalFp = memcmp;
pHashObj->hashFp = fn; pHashObj->hashFp = fn;
pHashObj->type = type; pHashObj->type = type;
pHashObj->lock = 0;
pHashObj->enableUpdate = update; pHashObj->enableUpdate = update;
pHashObj->freeFp = NULL;
pHashObj->callbackFp = NULL;
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
...@@ -329,7 +333,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo ...@@ -329,7 +333,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo
// disable resize // disable resize
taosHashRLock(pHashObj); taosHashRLock(pHashObj);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); uint32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot]; SHashEntry *pe = pHashObj->hashList[slot];
taosHashEntryWLock(pHashObj, pe); taosHashEntryWLock(pHashObj, pe);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册