提交 55ec2aa0 编写于 作者: D dapan1121

catalog async test

上级 a38fcf4e
......@@ -64,7 +64,7 @@ typedef struct SCatalogReq {
} SCatalogReq;
typedef struct SMetaData {
SArray *pTableMeta; // SArray<STableMeta>
SArray *pTableMeta; // SArray<STableMeta*>
SArray *pDbVgroup; // SArray<SArray<SVgroupInfo>*>
SArray *pTableHash; // SArray<SVgroupInfo>
SArray *pUdfList; // SArray<SFuncInfo>
......@@ -248,6 +248,8 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const
*/
int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId);
int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
......@@ -267,6 +269,9 @@ int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId);
/**
* Destroy catalog and relase all resources
*/
......
......@@ -180,7 +180,7 @@ char* jobTaskStatusStr(int32_t status);
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t));
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize);
#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE
......
......@@ -171,7 +171,7 @@ typedef struct SCtgJob {
uint64_t queryId;
SCatalog* pCtg;
void* pTrans;
const SEpSet* pMgmtEps;
SEpSet pMgmtEps;
void* userParam;
catalogCallback userFp;
int32_t tbMetaNum;
......
......@@ -327,28 +327,31 @@ int32_t ctgChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const c
return TSDB_CODE_SUCCESS;
}
SGetUserAuthRsp authRsp = {0};
CTG_ERR_RET(ctgGetUserDbAuthFromMnode(CTG_PARAMS_LIST(), user, &authRsp, NULL));
SGetUserAuthRsp* authRsp = taosMemoryCalloc(1, sizeof(SGetUserAuthRsp));
if (NULL == authRsp) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgGetUserDbAuthFromMnode(CTG_PARAMS_LIST(), user, authRsp, NULL));
if (authRsp.superAuth) {
if (authRsp->superAuth) {
*pass = true;
goto _return;
}
if (authRsp.createdDbs && taosHashGet(authRsp.createdDbs, dbFName, strlen(dbFName))) {
if (authRsp->createdDbs && taosHashGet(authRsp->createdDbs, dbFName, strlen(dbFName))) {
*pass = true;
goto _return;
}
if (type == AUTH_TYPE_READ && authRsp.readDbs && taosHashGet(authRsp.readDbs, dbFName, strlen(dbFName))) {
if (type == AUTH_TYPE_READ && authRsp->readDbs && taosHashGet(authRsp->readDbs, dbFName, strlen(dbFName))) {
*pass = true;
} else if (type == AUTH_TYPE_WRITE && authRsp.writeDbs && taosHashGet(authRsp.writeDbs, dbFName, strlen(dbFName))) {
} else if (type == AUTH_TYPE_WRITE && authRsp->writeDbs && taosHashGet(authRsp->writeDbs, dbFName, strlen(dbFName))) {
*pass = true;
}
_return:
ctgPutUpdateUserToQueue(pCtg, &authRsp, false);
ctgPutUpdateUserToQueue(pCtg, authRsp, false);
return TSDB_CODE_SUCCESS;
}
......
......@@ -21,173 +21,189 @@
#include "tref.h"
int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx);
SCtgTask task = {0};
pTask->type = CTG_TASK_GET_TB_META;
pTask->taskId = taskIdx;
pTask->pJob = pJob;
task.type = CTG_TASK_GET_TB_META;
task.taskId = taskIdx;
task.pJob = pJob;
pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetaCtx));
if (NULL == pTask->taskCtx) {
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetaCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgTbMetaCtx* ctx = pTask->taskCtx;
SCtgTbMetaCtx* ctx = task.taskCtx;
ctx->pName = taosMemoryMalloc(sizeof(*name));
if (NULL == ctx->pName) {
taosMemoryFree(pTask->taskCtx);
taosMemoryFree(task.taskCtx);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(ctx->pName, name, sizeof(*name));
ctx->flag = CTG_FLAG_UNKNOWN_STB;
qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, pTask->type, name->tname);
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx);
SCtgTask task = {0};
pTask->type = CTG_TASK_GET_DB_VGROUP;
pTask->taskId = taskIdx;
pTask->pJob = pJob;
task.type = CTG_TASK_GET_DB_VGROUP;
task.taskId = taskIdx;
task.pJob = pJob;
pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbVgCtx));
if (NULL == pTask->taskCtx) {
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbVgCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgDbVgCtx* ctx = pTask->taskCtx;
SCtgDbVgCtx* ctx = task.taskCtx;
memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName));
qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, pTask->type, dbFName);
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx);
SCtgTask task = {0};
pTask->type = CTG_TASK_GET_DB_CFG;
pTask->taskId = taskIdx;
pTask->pJob = pJob;
task.type = CTG_TASK_GET_DB_CFG;
task.taskId = taskIdx;
task.pJob = pJob;
pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbCfgCtx));
if (NULL == pTask->taskCtx) {
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgDbCfgCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgDbCfgCtx* ctx = pTask->taskCtx;
SCtgDbCfgCtx* ctx = task.taskCtx;
memcpy(ctx->dbFName, dbFName, sizeof(ctx->dbFName));
qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, pTask->type, dbFName);
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx);
SCtgTask task = {0};
pTask->type = CTG_TASK_GET_TB_HASH;
pTask->taskId = taskIdx;
pTask->pJob = pJob;
task.type = CTG_TASK_GET_TB_HASH;
task.taskId = taskIdx;
task.pJob = pJob;
pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashCtx));
if (NULL == pTask->taskCtx) {
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgTbHashCtx* ctx = pTask->taskCtx;
SCtgTbHashCtx* ctx = task.taskCtx;
ctx->pName = taosMemoryMalloc(sizeof(*name));
if (NULL == ctx->pName) {
taosMemoryFree(pTask->taskCtx);
taosMemoryFree(task.taskCtx);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(ctx->pName, name, sizeof(*name));
tNameGetFullDbName(ctx->pName, ctx->dbFName);
qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, pTask->type, name->tname);
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx);
SCtgTask task = {0};
task.type = CTG_TASK_GET_QNODE;
task.taskId = taskIdx;
task.pJob = pJob;
task.taskCtx = NULL;
pTask->type = CTG_TASK_GET_QNODE;
pTask->taskId = taskIdx;
pTask->pJob = pJob;
pTask->taskCtx = NULL;
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized", pJob->queryId, taskIdx, pTask->type);
qDebug("QID:%" PRIx64 " task %d type %d initialized", pJob->queryId, taskIdx, task.type);
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx);
SCtgTask task = {0};
pTask->type = CTG_TASK_GET_INDEX;
pTask->taskId = taskIdx;
pTask->pJob = pJob;
task.type = CTG_TASK_GET_INDEX;
task.taskId = taskIdx;
task.pJob = pJob;
pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgIndexCtx));
if (NULL == pTask->taskCtx) {
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgIndexCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgIndexCtx* ctx = pTask->taskCtx;
SCtgIndexCtx* ctx = task.taskCtx;
strcpy(ctx->indexFName, name);
qDebug("QID:%" PRIx64 " task %d type %d initialized, indexFName:%s", pJob->queryId, taskIdx, pTask->type, name);
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, indexFName:%s", pJob->queryId, taskIdx, task.type, name);
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx);
SCtgTask task = {0};
pTask->type = CTG_TASK_GET_UDF;
pTask->taskId = taskIdx;
pTask->pJob = pJob;
task.type = CTG_TASK_GET_UDF;
task.taskId = taskIdx;
task.pJob = pJob;
pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgUdfCtx));
if (NULL == pTask->taskCtx) {
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgUdfCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgUdfCtx* ctx = pTask->taskCtx;
SCtgUdfCtx* ctx = task.taskCtx;
strcpy(ctx->udfName, name);
qDebug("QID:%" PRIx64 " task %d type %d initialized, udfName:%s", pJob->queryId, taskIdx, pTask->type, name);
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, udfName:%s", pJob->queryId, taskIdx, task.type, name);
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, taskIdx);
SCtgTask task = {0};
pTask->type = CTG_TASK_GET_USER;
pTask->taskId = taskIdx;
pTask->pJob = pJob;
task.type = CTG_TASK_GET_USER;
task.taskId = taskIdx;
task.pJob = pJob;
pTask->taskCtx = taosMemoryCalloc(1, sizeof(SCtgUserCtx));
if (NULL == pTask->taskCtx) {
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgUserCtx));
if (NULL == task.taskCtx) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgUserCtx* ctx = pTask->taskCtx;
SCtgUserCtx* ctx = task.taskCtx;
memcpy(&ctx->user, user, sizeof(*user));
qDebug("QID:%" PRIx64 " task %d type %d initialized, user:%s", pJob->queryId, taskIdx, pTask->type, user->user);
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, user:%s", pJob->queryId, taskIdx, task.type, user->user);
return TSDB_CODE_SUCCESS;
}
......@@ -222,7 +238,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
pJob->userFp = fp;
pJob->pCtg = pCtg;
pJob->pTrans = pTrans;
pJob->pMgmtEps = pMgmtEps;
pJob->pMgmtEps = *pMgmtEps;
pJob->userParam = param;
pJob->tbMetaNum = tbMetaNum;
......@@ -303,15 +319,13 @@ _return:
int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pTableMeta) {
pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, sizeof(STableMeta));
pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, POINTER_BYTES);
if (NULL == pJob->jobRes.pTableMeta) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
taosArrayPush(pJob->jobRes.pTableMeta, pTask->res);
taosMemoryFreeClear(pTask->res);
taosArrayPush(pJob->jobRes.pTableMeta, &pTask->res);
return TSDB_CODE_SUCCESS;
}
......@@ -340,7 +354,7 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
}
}
taosArrayPush(pJob->jobRes.pTableHash, &pTask->res);
taosArrayPush(pJob->jobRes.pTableHash, pTask->res);
return TSDB_CODE_SUCCESS;
}
......@@ -376,7 +390,7 @@ int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
}
}
taosArrayPush(pJob->jobRes.pDbCfg, &pTask->res);
taosArrayPush(pJob->jobRes.pDbCfg, pTask->res);
return TSDB_CODE_SUCCESS;
}
......@@ -451,7 +465,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
switch (reqType) {
case TDMT_MND_USE_DB: {
......@@ -529,7 +543,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
taosMemoryFreeClear(pOut->tbMeta);
CTG_ERR_JRET(ctgGetTbMetaFromMnode(CTG_PARAMS_LIST(), ctx->pName, NULL, pTask));
CTG_RET(ctgGetTbMetaFromMnode(CTG_PARAMS_LIST(), ctx->pName, NULL, pTask));
} else if (CTG_IS_META_BOTH(pOut->metaType)) {
int32_t exist = 0;
if (!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) {
......@@ -538,7 +552,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
if (0 == exist) {
TSWAP(pTask->msgCtx.lastOut, pTask->msgCtx.out);
CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), pOut->dbFName, pOut->tbName, NULL, pTask));
CTG_RET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), pOut->dbFName, pOut->tbName, NULL, pTask));
} else {
taosMemoryFreeClear(pOut->tbMeta);
......@@ -598,7 +612,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM
SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
switch (reqType) {
case TDMT_MND_USE_DB: {
......@@ -632,7 +646,7 @@ int32_t ctgHandleGetTbHashRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
switch (reqType) {
case TDMT_MND_USE_DB: {
......@@ -724,7 +738,7 @@ int32_t ctgHandleGetUserRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pM
SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
bool pass = false;
SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pTask->msgCtx.out;
......@@ -766,7 +780,7 @@ _return:
int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
int32_t code = 0;
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
......@@ -788,7 +802,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
tNameGetFullDbName(ctx->pName, dbFName);
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (NULL == dbCache) {
if (dbCache) {
SVgroupInfo vgInfo = {0};
CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, ctx->pName, &vgInfo));
......@@ -817,7 +831,7 @@ _return:
int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
CTG_ERR_RET(ctgGetTbMetaFromCache(CTG_PARAMS_LIST(), (SCtgTbMetaCtx*)pTask->taskCtx, (STableMeta**)&pTask->res));
if (pTask->res) {
......@@ -834,7 +848,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgDBCache *dbCache = NULL;
SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx;
......@@ -866,7 +880,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
int32_t code = 0;
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgDBCache *dbCache = NULL;
SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx;
......@@ -901,7 +915,7 @@ _return:
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
CTG_ERR_RET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), NULL, pTask));
......@@ -911,7 +925,7 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetDBCfgFromMnode(CTG_PARAMS_LIST(), pCtx->dbFName, NULL, pTask));
......@@ -922,7 +936,7 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), pCtx->indexFName, NULL, pTask));
......@@ -933,7 +947,7 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgGetUdfInfoFromMnode(CTG_PARAMS_LIST(), pCtx->udfName, NULL, pTask));
......@@ -944,7 +958,7 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
void *pTrans = pTask->pJob->pTrans;
const SEpSet* pMgmtEps = pTask->pJob->pMgmtEps;
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx;
bool inCache = false;
bool pass = false;
......
......@@ -248,7 +248,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", ctx->pName->tname);
ctgDebug("db %d.%s not in cache", ctx->pName->acctId, ctx->pName->dbname);
return TSDB_CODE_SUCCESS;
}
......@@ -716,6 +716,8 @@ int32_t ctgPutUpdateUserToQueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syn
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
taosMemoryFree(pAuth);
return TSDB_CODE_SUCCESS;
_return:
......
......@@ -25,9 +25,9 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1);
taosMemoryFree(param);
ctgDebug("async call result: %s", tstrerror(code));
qDebug("async call result: %s", tstrerror(code));
if (NULL == pResult) {
ctgDebug("empty meta result");
qDebug("empty meta result");
return;
}
......@@ -36,25 +36,24 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if (pResult->pTableMeta && taosArrayGetSize(pResult->pTableMeta) > 0) {
num = taosArrayGetSize(pResult->pTableMeta);
for (int32_t i = 0; i < num; ++i) {
STableMeta *p = taosArrayGet(pResult->pTableMeta, i);
STableMeta *p = *(STableMeta **)taosArrayGet(pResult->pTableMeta, i);
STableComInfo *c = &p->tableInfo;
if (TSDB_CHILD_TABLE == p->tableType) {
ctgDebug("table meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64, p->tableType, p->vgId, p->uid, p->suid);
return;
qDebug("table meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64, p->tableType, p->vgId, p->uid, p->suid);
} else {
ctgDebug("table meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64 ",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d",
qDebug("table meta: type:%d, vgId:%d, uid:%" PRIx64 ",suid:%" PRIx64 ",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d",
p->tableType, p->vgId, p->uid, p->suid, p->sversion, p->tversion, c->numOfTags, c->precision, c->numOfColumns, c->rowSize);
}
int32_t colNum = c->numOfColumns + c->numOfTags;
for (int32_t j = 0; j < colNum; ++j) {
SSchema *s = &p->schema[j];
ctgDebug("[%d] name:%s, type:%d, colId:%d, bytes:%d", j, s->name, s->type, s->colId, s->bytes);
qDebug("[%d] name:%s, type:%d, colId:%d, bytes:%d", j, s->name, s->type, s->colId, s->bytes);
}
}
} else {
ctgDebug("empty table meta");
qDebug("empty table meta");
}
if (pResult->pDbVgroup && taosArrayGetSize(pResult->pDbVgroup) > 0) {
......@@ -62,14 +61,65 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
for (int32_t i = 0; i < num; ++i) {
SArray *pDb = *(SArray**)taosArrayGet(pResult->pDbVgroup, i);
int32_t vgNum = taosArrayGetSize(pDb);
qDebug("db %d vgInfo:", i);
for (int32_t j = 0; j < vgNum; ++j) {
SVgroupInfo* pInfo = taosArrayGet(pDb, j);
ctgDebug(param, ...);
qDebug("vg %d info: vgId:%d", j, pInfo->vgId);
}
}
} else {
ctgDebug("empty db vgroup");
qDebug("empty db vgroup");
}
if (pResult->pTableHash && taosArrayGetSize(pResult->pTableHash) > 0) {
num = taosArrayGetSize(pResult->pTableHash);
for (int32_t i = 0; i < num; ++i) {
SVgroupInfo* pInfo = taosArrayGet(pResult->pTableHash, i);
qDebug("table %d vg info: vgId:%d", i, pInfo->vgId);
}
} else {
qDebug("empty table hash vgroup");
}
if (pResult->pUdfList && taosArrayGetSize(pResult->pUdfList) > 0) {
num = taosArrayGetSize(pResult->pUdfList);
for (int32_t i = 0; i < num; ++i) {
SFuncInfo* pInfo = taosArrayGet(pResult->pUdfList, i);
qDebug("udf %d info: name:%s, funcType:%d", i, pInfo->name, pInfo->funcType);
}
} else {
qDebug("empty udf info");
}
if (pResult->pDbCfg && taosArrayGetSize(pResult->pDbCfg) > 0) {
num = taosArrayGetSize(pResult->pDbCfg);
for (int32_t i = 0; i < num; ++i) {
SDbCfgInfo* pInfo = taosArrayGet(pResult->pDbCfg, i);
qDebug("db %d info: numOFVgroups:%d, numOfStables:%d", i, pInfo->numOfVgroups, pInfo->numOfStables);
}
} else {
qDebug("empty db cfg info");
}
if (pResult->pUser && taosArrayGetSize(pResult->pUser) > 0) {
num = taosArrayGetSize(pResult->pUser);
for (int32_t i = 0; i < num; ++i) {
bool* auth = taosArrayGet(pResult->pUser, i);
qDebug("user auth %d info: %d", i, *auth);
}
} else {
qDebug("empty user auth info");
}
if (pResult->pQnodeList && taosArrayGetSize(pResult->pQnodeList) > 0) {
num = taosArrayGetSize(pResult->pQnodeList);
for (int32_t i = 0; i < num; ++i) {
SQueryNodeAddr* qaddr = taosArrayGet(pResult->pQnodeList, i);
qDebug("qnode %d info: id:%d", i, qaddr->nodeId);
}
} else {
qDebug("empty qnode info");
}
}
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId) {
......
......@@ -264,10 +264,11 @@ int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_QNODE_LIST;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pMgmtEps->inUse);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
CTG_ERR_RET(code);
......@@ -301,10 +302,11 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
CTG_ERR_RET(code);
......@@ -338,10 +340,11 @@ int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, S
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_DB_CFG;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)dbFName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
CTG_ERR_RET(code);
......@@ -375,10 +378,11 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get index from mnode, indexName:%s", indexName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)indexName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
CTG_ERR_RET(code);
......@@ -412,10 +416,11 @@ int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out,
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get udf info from mnode, funcName:%s", funcName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)funcName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
CTG_ERR_RET(code);
......@@ -449,10 +454,11 @@ int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_USER_AUTH;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get user auth from mnode, user:%s", user);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)user, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get user auth msg failed, code:%x, db:%s", code, user);
CTG_ERR_RET(code);
......@@ -491,10 +497,11 @@ int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STabl
int32_t reqType = TDMT_MND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, tbName);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build mnode stablemeta msg failed, code:%x", code);
CTG_ERR_RET(code);
......@@ -537,6 +544,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *
int32_t reqType = TDMT_VND_TABLE_META;
char tbFName[TSDB_TABLE_FNAME_LEN];
sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get table meta from vnode, vgId:%d, tbFName:%s", vgroupInfo->vgId, tbFName);
......@@ -544,7 +552,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *
char *msg = NULL;
int32_t msgLen = 0;
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build vnode tablemeta msg failed, code:%x, tbFName:%s", code, tbFName);
CTG_ERR_RET(code);
......
......@@ -35,7 +35,7 @@ void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pUdfList);
pData->pUdfList = NULL;
for (int32_t i = 0; i < taosArrayGetSize(pData->pDbCfg); ++i) {
SDbCfgInfo* pInfo = taosArrayGet(pData->pDbCfg, i);
taosArrayDestroy(pInfo->pRetensions);
......@@ -167,12 +167,15 @@ void ctgFreeHandle(SCatalog* pCtg) {
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
if (NULL == pOutput || NULL == pOutput->dbVgroup) {
if (NULL == pOutput) {
return;
}
taosHashCleanup(pOutput->dbVgroup->vgHash);
taosMemoryFreeClear(pOutput->dbVgroup);
if (pOutput->dbVgroup) {
taosHashCleanup(pOutput->dbVgroup->vgHash);
taosMemoryFreeClear(pOutput->dbVgroup);
}
taosMemoryFree(pOutput);
}
......@@ -267,6 +270,7 @@ void ctgFreeTask(SCtgTask* pTask) {
switch (pTask->type) {
case CTG_TASK_GET_QNODE: {
taosArrayDestroy((SArray*)pTask->res);
taosMemoryFreeClear(pTask->taskCtx);
pTask->res = NULL;
break;
}
......@@ -277,17 +281,19 @@ void ctgFreeTask(SCtgTask* pTask) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
pTask->msgCtx.lastOut = NULL;
}
taosMemoryFreeClear(pTask->taskCtx);
taosMemoryFreeClear(pTask->res);
break;
}
case CTG_TASK_GET_DB_VGROUP: {
taosArrayDestroy((SArray*)pTask->res);
taosMemoryFreeClear(pTask->taskCtx);
pTask->res = NULL;
break;
}
case CTG_TASK_GET_DB_CFG: {
taosMemoryFreeClear(pTask->taskCtx);
if (pTask->res) {
taosArrayDestroy(((SDbCfgInfo*)pTask->res)->pRetensions);
taosMemoryFreeClear(pTask->res);
}
break;
......@@ -295,6 +301,7 @@ void ctgFreeTask(SCtgTask* pTask) {
case CTG_TASK_GET_TB_HASH: {
SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
taosMemoryFreeClear(taskCtx->pName);
taosMemoryFreeClear(pTask->taskCtx);
taosMemoryFreeClear(pTask->res);
break;
}
......
......@@ -2766,6 +2766,8 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
break;
}
}
taosArrayDestroy(dbCfg.pRetensions);
return code;
}
......
......@@ -22,7 +22,7 @@
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wformat-truncation"
int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallocFp)(int32_t)) = {0};
int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0};
int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
......@@ -58,7 +58,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
SBuildTableMetaInput *pInput = input;
if (NULL == input || NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
......@@ -72,7 +72,7 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
tstrncpy(infoReq.tbName, pInput->tbName, TSDB_TABLE_NAME_LEN);
int32_t bufLen = tSerializeSTableInfoReq(NULL, 0, &infoReq);
void *pBuf = rpcMallocCont(bufLen);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSTableInfoReq(pBuf, bufLen, &infoReq);
*msg = pBuf;
......@@ -81,7 +81,7 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
SBuildUseDBInput *pInput = input;
if (NULL == pInput || NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
......@@ -95,7 +95,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
usedbReq.numOfTable = pInput->numOfTable;
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
void *pBuf = rpcMallocCont(bufLen);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSUseDbReq(pBuf, bufLen, &usedbReq);
*msg = pBuf;
......@@ -104,7 +104,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -113,7 +113,7 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
qnodeListReq.rowNum = -1;
int32_t bufLen = tSerializeSQnodeListReq(NULL, 0, &qnodeListReq);
void *pBuf = rpcMallocCont(bufLen);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq);
*msg = pBuf;
......@@ -122,7 +122,7 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -131,7 +131,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
strcpy(dbCfgReq.db, input);
int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq);
void *pBuf = rpcMallocCont(bufLen);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq);
*msg = pBuf;
......@@ -140,7 +140,7 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -149,7 +149,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
strcpy(indexReq.indexFName, input);
int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq);
void *pBuf = rpcMallocCont(bufLen);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSUserIndexReq(pBuf, bufLen, &indexReq);
*msg = pBuf;
......@@ -158,7 +158,7 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -170,7 +170,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3
taosArrayPush(funcReq.pFuncNames, input);
int32_t bufLen = tSerializeSRetrieveFuncReq(NULL, 0, &funcReq);
void *pBuf = rpcMallocCont(bufLen);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq);
taosArrayDestroy(funcReq.pFuncNames);
......@@ -181,7 +181,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
......@@ -190,7 +190,7 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
strncpy(req.user, input, sizeof(req.user));
int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req);
void *pBuf = rpcMallocCont(bufLen);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSGetUserAuthReq(pBuf, bufLen, &req);
*msg = pBuf;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册