提交 639b5102 编写于 作者: D dapan1121

feature/qnode

上级 b2ad12cb
...@@ -167,6 +167,7 @@ typedef struct { ...@@ -167,6 +167,7 @@ typedef struct {
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t vgVersion; int32_t vgVersion;
} SBuildUseDBInput; } SBuildUseDBInput;
...@@ -563,6 +564,7 @@ int32_t tDeserializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp); ...@@ -563,6 +564,7 @@ int32_t tDeserializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);
typedef struct { typedef struct {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t vgVersion; int32_t vgVersion;
} SUseDbReq; } SUseDbReq;
......
...@@ -95,7 +95,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); ...@@ -95,7 +95,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
*/ */
void catalogFreeHandle(SCatalog* pCatalog); void catalogFreeHandle(SCatalog* pCatalog);
int32_t catalogGetDBVgVersion(SCatalog* pCatalog, const char* dbName, int32_t* version); int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId);
/** /**
* Get a DB's all vgroup info. * Get a DB's all vgroup info.
......
...@@ -156,6 +156,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); ...@@ -156,6 +156,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
*/ */
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp);
void initQueryModuleMsgHandle(); void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema(); const SSchema* tGetTbnameColumnSchema();
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "catalog.h" #include "catalog.h"
#include "query.h"
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
...@@ -243,6 +244,23 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -243,6 +244,23 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
struct SCatalog *pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) {
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code));
} else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
}
}
tFreeSUsedbRsp(&usedbRsp);
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
free(pMsg->pData); free(pMsg->pData);
setErrno(pRequest, code); setErrno(pRequest, code);
...@@ -256,6 +274,26 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -256,6 +274,26 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SName name = {0}; SName name = {0};
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB); tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB);
SUseDbOutput output = {0};
code = queryBuildUseDbOutput(&output, &usedbRsp);
if (code != 0) {
terrno = code;
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
tfree(output.dbVgroup);
tscError("failed to build use db output since %s", terrstr());
} else {
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code));
} else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
}
}
tFreeSUsedbRsp(&usedbRsp); tFreeSUsedbRsp(&usedbRsp);
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
......
...@@ -1421,6 +1421,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { ...@@ -1421,6 +1421,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -1435,6 +1436,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) { ...@@ -1435,6 +1436,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
......
...@@ -940,13 +940,18 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { ...@@ -940,13 +940,18 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
char *p = strchr(usedbReq.db, '.'); char *p = strchr(usedbReq.db, '.');
if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) { if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) {
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN); memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
code = 0;
} else { } else {
pDb = mndAcquireDb(pMnode, usedbReq.db); pDb = mndAcquireDb(pMnode, usedbReq.db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_EXIST; terrno = TSDB_CODE_MND_DB_NOT_EXIST;
goto USE_DB_OVER;
}
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
usedbRsp.uid = usedbReq.dbId;
usedbRsp.vgVersion = usedbReq.vgVersion;
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
} else {
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
goto USE_DB_OVER; goto USE_DB_OVER;
...@@ -962,7 +967,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { ...@@ -962,7 +967,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
goto USE_DB_OVER; goto USE_DB_OVER;
} }
if (usedbReq.vgVersion < pDb->vgVersion) { if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid) {
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos); mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
} }
...@@ -971,12 +976,15 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { ...@@ -971,12 +976,15 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
usedbRsp.vgVersion = pDb->vgVersion; usedbRsp.vgVersion = pDb->vgVersion;
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos); usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
usedbRsp.hashMethod = pDb->hashMethod; usedbRsp.hashMethod = pDb->hashMethod;
code = 0;
}
} }
int32_t contLen = tSerializeSUseDbRsp(NULL, 0, &usedbRsp); int32_t contLen = tSerializeSUseDbRsp(NULL, 0, &usedbRsp);
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) { if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto USE_DB_OVER; goto USE_DB_OVER;
} }
...@@ -984,7 +992,6 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { ...@@ -984,7 +992,6 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
pReq->pCont = pRsp; pReq->pCont = pRsp;
pReq->contLen = contLen; pReq->contLen = contLen;
code = 0;
USE_DB_OVER: USE_DB_OVER:
if (code != 0) { if (code != 0) {
......
...@@ -489,7 +489,7 @@ PROCESS_RPC_END: ...@@ -489,7 +489,7 @@ PROCESS_RPC_END:
if (code == TSDB_CODE_APP_NOT_READY) { if (code == TSDB_CODE_APP_NOT_READY) {
mndSendRedirectRsp(pMnode, &pMsg->rpcMsg); mndSendRedirectRsp(pMnode, &pMsg->rpcMsg);
} else if (code != 0) { } else if (code != 0) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = code};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} else { } else {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont}; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
......
...@@ -167,6 +167,28 @@ void ctgDbgShowDBCache(SHashObj *dbHash) { ...@@ -167,6 +167,28 @@ void ctgDbgShowDBCache(SHashObj *dbHash) {
CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId); CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId);
CTG_CACHE_DEBUG("deleted: %d", dbCache->deleted);
if (dbCache->vgInfo) {
CTG_CACHE_DEBUG("vgVersion: %d", dbCache->vgInfo->vgVersion);
CTG_CACHE_DEBUG("hashMethod: %d", dbCache->vgInfo->hashMethod);
if (dbCache->vgInfo->vgHash) {
CTG_CACHE_DEBUG("vgNum: %d", taosHashGetSize(dbCache->vgInfo->vgHash));
//TODO
} else {
CTG_CACHE_DEBUG("vgHash: %p", dbCache->vgInfo->vgHash);
}
} else {
CTG_CACHE_DEBUG("vgInfo: %p", dbCache->vgInfo);
}
if (dbCache->tbCache.metaCache) {
CTG_CACHE_DEBUG("metaNum: %d", taosHashGetSize(dbCache->tbCache.metaCache));
}
if (dbCache->tbCache.stbCache) {
CTG_CACHE_DEBUG("stbNum: %d", taosHashGetSize(dbCache->tbCache.stbCache));
}
pIter = taosHashIterate(dbHash, pIter); pIter = taosHashIterate(dbHash, pIter);
} }
} }
...@@ -242,6 +264,34 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { ...@@ -242,6 +264,34 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
} }
int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0;
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB};
SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId;
action.data = msg;
CTG_ERR_JRET(ctgPushAction(&action));
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
return TSDB_CODE_SUCCESS;
_return:
tfree(action.data);
CTG_RET(code);
}
void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) { void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
CTG_LOCK(CTG_WRITE, &cache->stbLock); CTG_LOCK(CTG_WRITE, &cache->stbLock);
if (cache->stbCache) { if (cache->stbCache) {
...@@ -452,12 +502,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE ...@@ -452,12 +502,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_DB_NOT_EXIST(rpcRsp.code)) { ctgError("error rsp for use db, error:%s, db:%s", tstrerror(rpcRsp.code), input->db);
ctgDebug("db not exist in mnode, dbFName:%s", input->db);
return rpcRsp.code;
}
ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db);
CTG_ERR_RET(rpcRsp.code); CTG_ERR_RET(rpcRsp.code);
} }
...@@ -1365,20 +1410,33 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) { ...@@ -1365,20 +1410,33 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
bool inCache = false; bool inCache = false;
int32_t code = 0; int32_t code = 0;
if (!forceUpdate) {
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
if (inCache) {
if (inCache && !forceUpdate) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
}
SUseDbOutput DbOut = {0}; SUseDbOutput DbOut = {0};
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
tstrncpy(input.db, dbFName, tListLen(input.db)); tstrncpy(input.db, dbFName, tListLen(input.db));
if (inCache) {
input.dbId = (*dbCache)->dbId;
input.vgVersion = (*dbCache)->vgInfo->vgVersion;
} else {
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
}
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut)); code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut);
if (code) {
if (CTG_DB_NOT_EXIST(code) && input.vgVersion > CTG_DEFAULT_INVALID_VERSION) {
ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId);
ctgPushRmDBMsgInQueue(pCtg, input.db, input.dbId);
}
CTG_ERR_RET(code);
}
CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo)); CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
...@@ -1772,7 +1830,6 @@ int32_t ctgActRemoveTbl(SCtgMetaAction *action) { ...@@ -1772,7 +1830,6 @@ int32_t ctgActRemoveTbl(SCtgMetaAction *action) {
} }
void* ctgUpdateThreadFunc(void* param) { void* ctgUpdateThreadFunc(void* param) {
setThreadName("catalog"); setThreadName("catalog");
...@@ -1964,10 +2021,10 @@ void catalogFreeHandle(SCatalog* pCtg) { ...@@ -1964,10 +2021,10 @@ void catalogFreeHandle(SCatalog* pCtg) {
ctgInfo("handle freed, culsterId:%"PRIx64, clusterId); ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
} }
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version) { int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId) {
CTG_API_ENTER(); CTG_API_ENTER();
if (NULL == pCtg || NULL == dbFName || NULL == version) { if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
...@@ -1994,6 +2051,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers ...@@ -1994,6 +2051,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
} }
*version = dbCache->vgInfo->vgVersion; *version = dbCache->vgInfo->vgVersion;
*dbId = dbCache->dbId;
ctgReleaseVgInfo(dbCache); ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
...@@ -2099,29 +2157,12 @@ int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) { ...@@ -2099,29 +2157,12 @@ int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS); CTG_API_LEAVE(TSDB_CODE_SUCCESS);
} }
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB}; CTG_ERR_JRET(ctgPushRmDBMsgInQueue(pCtg, dbFName, dbId));
SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId;
action.data = msg;
CTG_ERR_JRET(ctgPushAction(&action));
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
CTG_API_LEAVE(TSDB_CODE_SUCCESS); CTG_API_LEAVE(TSDB_CODE_SUCCESS);
_return: _return:
tfree(action.data);
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
} }
......
...@@ -128,6 +128,7 @@ void ctgTestInitLogFile() { ...@@ -128,6 +128,7 @@ void ctgTestInitLogFile() {
tsAsyncLog = 0; tsAsyncLog = 0;
qDebugFlag = 159; qDebugFlag = 159;
strcpy(tsLogDir, "/var/log/taos");
ctgDbgEnableDebug("api"); ctgDbgEnableDebug("api");
...@@ -827,7 +828,7 @@ void *ctgTestSetCtableMetaThread(void *param) { ...@@ -827,7 +828,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
return NULL; return NULL;
} }
#if 0 #if 1
TEST(tableMeta, normalTable) { TEST(tableMeta, normalTable) {
...@@ -1498,7 +1499,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { ...@@ -1498,7 +1499,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) { while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM)) {
usleep(10000); usleep(10000);
} }
......
...@@ -762,6 +762,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch ...@@ -762,6 +762,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
SUseDbReq usedbReq = {0}; SUseDbReq usedbReq = {0};
tNameExtractFullName(&n, usedbReq.db); tNameExtractFullName(&n, usedbReq.db);
catalogGetDBVgVersion(pCtx->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId);
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
void* pBuf = malloc(bufLen); void* pBuf = malloc(bufLen);
......
...@@ -24,6 +24,32 @@ ...@@ -24,6 +24,32 @@
int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0}; int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0};
int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
memcpy(pOut->db, usedbRsp->db, TSDB_DB_FNAME_LEN);
pOut->dbId = usedbRsp->uid;
pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
if (NULL == pOut->dbVgroup) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pOut->dbVgroup->vgVersion = usedbRsp->vgVersion;
pOut->dbVgroup->hashMethod = usedbRsp->hashMethod;
pOut->dbVgroup->vgHash =
taosHashInit(usedbRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pOut->dbVgroup->vgHash) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < usedbRsp->vgNum; ++i) {
SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i);
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
SBuildTableMetaInput *pInput = input; SBuildTableMetaInput *pInput = input;
if (NULL == input || NULL == msg || NULL == msgLen) { if (NULL == input || NULL == msg || NULL == msgLen) {
...@@ -57,6 +83,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms ...@@ -57,6 +83,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
strncpy(usedbReq.db, pInput->db, sizeof(usedbReq.db)); strncpy(usedbReq.db, pInput->db, sizeof(usedbReq.db));
usedbReq.db[sizeof(usedbReq.db) - 1] = 0; usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
usedbReq.vgVersion = pInput->vgVersion; usedbReq.vgVersion = pInput->vgVersion;
usedbReq.dbId = pInput->dbId;
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
void *pBuf = rpcMallocCont(bufLen); void *pBuf = rpcMallocCont(bufLen);
...@@ -90,35 +117,10 @@ int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { ...@@ -90,35 +117,10 @@ int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
goto PROCESS_USEDB_OVER; goto PROCESS_USEDB_OVER;
} }
memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN); code = queryBuildUseDbOutput(pOut, &usedbRsp);
pOut->dbId = usedbRsp.uid;
pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
if (NULL == pOut->dbVgroup) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto PROCESS_USEDB_OVER;
}
pOut->dbVgroup->vgVersion = usedbRsp.vgVersion;
pOut->dbVgroup->hashMethod = usedbRsp.hashMethod;
pOut->dbVgroup->vgHash =
taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pOut->dbVgroup->vgHash) {
tfree(pOut->dbVgroup);
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto PROCESS_USEDB_OVER;
}
for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto PROCESS_USEDB_OVER;
}
}
code = 0;
PROCESS_USEDB_OVER: PROCESS_USEDB_OVER:
if (code != 0) { if (code != 0) {
if (pOut) { if (pOut) {
if (pOut->dbVgroup) taosHashCleanup(pOut->dbVgroup->vgHash); if (pOut->dbVgroup) taosHashCleanup(pOut->dbVgroup->vgHash);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册