From 924e3ad6bde95b866b863d679491e85ec9456560 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 15 Dec 2021 09:52:37 +0800 Subject: [PATCH] use db to get vgroup list and db info --- include/common/taosmsg.h | 25 +++- include/libs/catalog/catalog.h | 8 +- include/libs/query/query.h | 16 +++ include/os/osMemory.h | 4 +- source/libs/catalog/inc/catalogInt.h | 1 + source/libs/catalog/src/catalog.c | 116 ++++++++++++++++--- source/libs/parser/CMakeLists.txt | 4 +- source/libs/parser/test/CMakeLists.txt | 2 +- source/libs/planner/CMakeLists.txt | 4 +- source/libs/planner/test/CMakeLists.txt | 2 +- source/libs/query/src/querymsg.c | 146 +++++++++++++++++++++++- 11 files changed, 297 insertions(+), 31 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index a75c4a7bac..caf872689c 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -219,6 +219,13 @@ typedef struct SBuildTableMetaInput { char *tableFullName; } SBuildTableMetaInput; +typedef struct SBuildUseDBInput { + char db[TSDB_TABLE_FNAME_LEN]; + int32_t vgroupVersion; + int32_t dbGroupVersion; +} SBuildUseDBInput; + + #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta @@ -617,9 +624,12 @@ typedef struct { typedef struct { char db[TSDB_TABLE_FNAME_LEN]; int8_t ignoreNotExists; + int32_t vgroupVersion; + int32_t dbGroupVersion; int32_t reserve[8]; } SUseDbMsg; + typedef struct { char db[TSDB_TABLE_FNAME_LEN]; int32_t reserve[8]; @@ -806,8 +816,6 @@ typedef struct SVgroupListRspMsg { SVgroupInfo vgroupInfo[]; } SVgroupListRspMsg; -typedef SVgroupListRspMsg SVgroupListInfo; - typedef struct { int32_t vgId; int8_t numOfEps; @@ -852,6 +860,19 @@ typedef struct { char *data; } STagData; +typedef struct { + int32_t vgroupNum; + int32_t vgroupVersion; + char db[TSDB_TABLE_FNAME_LEN]; + int32_t dbVgroupVersion; + int32_t dbVgroupNum; + int32_t dbHashRange; + SVgroupInfo vgroupInfo[]; +//int32_t vgIdList[]; +} SUseDbRspMsg; + + + /* * sql: show tables like '%a_%' * payload is the query condition, e.g., '%a_%' diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 0b45f71557..f9d3b3c8c1 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -27,16 +27,10 @@ extern "C" { #include "transport.h" #include "common.h" #include "taosmsg.h" +#include "query.h" struct SCatalog; -typedef struct SDBVgroupInfo { - int32_t vgroupVersion; - SArray *vgId; - int32_t hashRange; - int32_t hashNum; -} SDBVgroupInfo; - typedef struct SCatalogReq { char clusterId[TSDB_CLUSTER_ID_LEN]; //???? SArray *pTableName; // table full name diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 866a69ed8e..02ae708874 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -20,6 +20,22 @@ extern "C" { #endif +#include "tarray.h" + +typedef SVgroupListRspMsg SVgroupListInfo; + +typedef struct SDBVgroupInfo { + int32_t vgroupVersion; + SArray *vgId; + int32_t hashRange; +} SDBVgroupInfo; + +typedef struct SUseDbOutput { + SVgroupListInfo *vgroupList; + char db[TSDB_TABLE_FNAME_LEN]; + SDBVgroupInfo *dbVgroup; +} SUseDbOutput; + extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 10c90cd9aa..5f1d5a9a8a 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -23,8 +23,8 @@ extern "C" { #define tfree(x) \ do { \ if (x) { \ - free((void *)x); \ - x = 0; \ + free((void *)(x)); \ + (x) = 0; \ } \ } while (0) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 720f197782..a08b64f9a9 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -26,6 +26,7 @@ extern "C" { #define CTG_DEFAULT_CLUSTER_NUMBER 6 #define CTG_DEFAULT_VGROUP_NUMBER 100 +#define CTG_DEFAULT_DB_NUMBER 20 #define CTG_DEFAULT_INVALID_VERSION (-1) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index b488ab8101..92b6094529 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -63,21 +63,69 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* } -int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) { -/* +int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) { if (NULL == pCatalog->dbCache.cache) { *exist = 0; return TSDB_CODE_SUCCESS; } - taosHashGet(SHashObj * pHashObj, const void * key, size_t keyLen) + SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + + if (NULL == info || info->vgroupVersion < pCatalog->vgroupCache.vgroupVersion) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } if (dbInfo) { - *pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache); + *dbInfo = calloc(1, sizeof(**dbInfo)); + if (NULL == *dbInfo) { + ctgError("calloc size[%d] failed", (int32_t)sizeof(**dbInfo)); + return TSDB_CODE_CTG_MEM_ERROR; + } + + (*dbInfo)->vgId = taosArrayDup(info->vgId); + if (NULL == (*dbInfo)->vgId) { + ctgError("taos array duplicate failed"); + tfree(*dbInfo); + return TSDB_CODE_CTG_MEM_ERROR; + } + + (*dbInfo)->vgroupVersion = info->vgroupVersion; + (*dbInfo)->hashRange = info->hashRange; } *exist = 1; -*/ + + return TSDB_CODE_SUCCESS; +} + + + +int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) { + char *msg = NULL; + SEpSet *pVnodeEpSet = NULL; + int32_t msgLen = 0; + + int32_t code = queryBuildMsg[TSDB_MSG_TYPE_USE_DB](input, &msg, 0, &msgLen); + if (code) { + return code; + } + + SRpcMsg rpcMsg = { + .msgType = TSDB_MSG_TYPE_USE_DB, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + + code = queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + return code; + } + return TSDB_CODE_SUCCESS; } @@ -144,7 +192,7 @@ int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) { int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) { if (NULL == pVgroup) { - ctgError("vgroup get from mnode succeed, but no output"); + ctgError("no valid vgroup list info to update"); return TSDB_CODE_CTG_INTERNAL_ERROR; } @@ -262,7 +310,33 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, } int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { + if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + if (dbInfo->vgroupVersion < 0) { + if (pCatalog->dbCache.cache) { + taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)); + } + + ctgWarn("remove db [%s] from cache", dbName); + return TSDB_CODE_SUCCESS; + } + if (NULL == pCatalog->dbCache.cache) { + pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->dbCache.cache) { + ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_DB_NUMBER); + return TSDB_CODE_CTG_MEM_ERROR; + } + } + + if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) { + ctgError("push to vgroup hash cache failed"); + return TSDB_CODE_CTG_MEM_ERROR; + } + + return TSDB_CODE_SUCCESS; } @@ -273,8 +347,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* return TSDB_CODE_CTG_INVALID_INPUT; } -/* int32_t exist = 0; + int32_t code = 0; if (0 == forceUpdate) { CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist)); @@ -284,18 +358,34 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* } } - SDBVgroupInfo* newDbInfo = NULL; + SUseDbOutput DbOut = {0}; + SBuildUseDBInput input = {0}; + + strncpy(input.db, dbName, sizeof(input.db)); + input.db[sizeof(input.db) - 1] = 0; + input.vgroupVersion = pCatalog->vgroupCache.vgroupVersion; + input.dbGroupVersion = CTG_DEFAULT_INVALID_VERSION; - CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, dbName, &newDbInfo)); + CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); - CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, newDbInfo)); + if (DbOut.vgroupList) { + CTG_ERR_JRET(catalogUpdateVgroup(pCatalog, DbOut.vgroupList)); + } + + if (DbOut.dbVgroup) { + CTG_ERR_JRET(catalogUpdateDBVgroup(pCatalog, dbName, DbOut.dbVgroup)); + } if (dbInfo) { - *dbInfo = newDbInfo; + *dbInfo = DbOut.dbVgroup; + DbOut.dbVgroup = NULL; } -*/ - return TSDB_CODE_SUCCESS; +_return: + tfree(DbOut.dbVgroup); + tfree(DbOut.vgroupList); + + return code; } diff --git a/source/libs/parser/CMakeLists.txt b/source/libs/parser/CMakeLists.txt index 155b72c1f9..5e635aa6a1 100644 --- a/source/libs/parser/CMakeLists.txt +++ b/source/libs/parser/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( target_link_libraries( parser - PRIVATE os util common catalog function transport + PRIVATE os util common catalog function transport query ) -ADD_SUBDIRECTORY(test) \ No newline at end of file +ADD_SUBDIRECTORY(test) diff --git a/source/libs/parser/test/CMakeLists.txt b/source/libs/parser/test/CMakeLists.txt index f7d7113243..4b9e586be3 100644 --- a/source/libs/parser/test/CMakeLists.txt +++ b/source/libs/parser/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(parserTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( parserTest - PUBLIC os util common parser catalog transport gtest function planner + PUBLIC os util common parser catalog transport gtest function planner query ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/planner/CMakeLists.txt b/source/libs/planner/CMakeLists.txt index 23efce38f4..4e0d03d07a 100644 --- a/source/libs/planner/CMakeLists.txt +++ b/source/libs/planner/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( target_link_libraries( planner - PRIVATE os util common catalog parser transport function + PRIVATE os util common catalog parser transport function query ) -ADD_SUBDIRECTORY(test) \ No newline at end of file +ADD_SUBDIRECTORY(test) diff --git a/source/libs/planner/test/CMakeLists.txt b/source/libs/planner/test/CMakeLists.txt index a83d7a39d9..f00adfaeb2 100644 --- a/source/libs/planner/test/CMakeLists.txt +++ b/source/libs/planner/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(plannerTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( plannerTest - PUBLIC os util common planner parser catalog transport gtest function + PUBLIC os util common planner parser catalog transport gtest function query ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/query/src/querymsg.c b/source/libs/query/src/querymsg.c index 924878c872..8f35fd9c3e 100644 --- a/source/libs/query/src/querymsg.c +++ b/source/libs/query/src/querymsg.c @@ -15,7 +15,7 @@ #include "taosmsg.h" #include "queryInt.h" - +#include "query.h" int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0}; @@ -60,6 +60,36 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 return TSDB_CODE_SUCCESS; } +int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == input || NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SBuildUseDBInput* bInput = (SBuildUseDBInput *)input; + + int32_t estimateSize = sizeof(SUseDbMsg); + if (NULL == *msg || msgSize < estimateSize) { + tfree(*msg); + *msg = calloc(1, estimateSize); + if (NULL == *msg) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + + SUseDbMsg *bMsg = (SUseDbMsg *)*msg; + + strncpy(bMsg->db, bInput->db, sizeof(bMsg->db)); + bMsg->db[sizeof(bMsg->db) - 1] = 0; + + bMsg->vgroupVersion = bInput->vgroupVersion; + bMsg->dbGroupVersion = bInput->dbGroupVersion; + + *msgLen = (int32_t)sizeof(*bMsg); + + return TSDB_CODE_SUCCESS; +} + + int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { if (NULL == output || NULL == msg || msgSize <= 0) { @@ -103,12 +133,126 @@ int32_t queryProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) { return TSDB_CODE_SUCCESS; } + + + +int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SUseDbRspMsg *pRsp = (SUseDbRspMsg *)msg; + SUseDbOutput *pOut = (SUseDbOutput *)output; + int32_t code = 0; + + if (msgSize <= sizeof(*pRsp)) { + qError("invalid use db rsp msg size, msgSize:%d", msgSize); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + pRsp->vgroupVersion = htonl(pRsp->vgroupVersion); + pRsp->dbVgroupVersion = htonl(pRsp->dbVgroupVersion); + + pRsp->vgroupNum = htonl(pRsp->vgroupNum); + pRsp->dbVgroupNum = htonl(pRsp->dbVgroupNum); + + if (pRsp->vgroupNum < 0) { + qError("invalid vgroup number[%d]", pRsp->vgroupNum); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pRsp->dbVgroupNum < 0) { + qError("invalid db vgroup number[%d]", pRsp->dbVgroupNum); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + int32_t expectSize = pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + pRsp->dbVgroupNum * sizeof(int32_t) + sizeof(*pRsp); + if (msgSize != expectSize) { + qError("vgroup list msg size mis-match, msgSize:%d, expected:%d, vgroup number:%d, db vgroup number:%d", msgSize, expectSize, pRsp->vgroupNum, pRsp->dbVgroupNum); + return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; + } + + if (pRsp->vgroupVersion < 0) { + qInfo("no new vgroup list info"); + if (pRsp->vgroupNum != 0) { + qError("invalid vgroup number[%d] for no new vgroup list case", pRsp->vgroupNum); + return TSDB_CODE_TSC_INVALID_VALUE; + } + } else { + int32_t s = sizeof(*pOut->vgroupList) + sizeof(pOut->vgroupList->vgroupInfo[0]) * pRsp->vgroupNum; + pOut->vgroupList = calloc(1, s); + if (NULL == pOut->vgroupList) { + qError("calloc size[%d] failed", s); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + pOut->vgroupList->vgroupNum = pRsp->vgroupNum; + pOut->vgroupList->vgroupVersion = pRsp->vgroupVersion; + + for (int32_t i = 0; i < pRsp->vgroupNum; ++i) { + pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); + for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { + pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port); + } + + memcpy(&pOut->vgroupList->vgroupInfo[i], &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i])); + } + } + + int32_t *vgIdList = (int32_t *)((char *)pRsp->vgroupInfo + sizeof(pRsp->vgroupInfo[0]) * pRsp->vgroupNum); + + memcpy(pOut->db, pRsp->db, sizeof(pOut->db)); + + if (pRsp->dbVgroupVersion < 0) { + qInfo("no new vgroup info for db[%s]", pRsp->db); + } else { + pOut->dbVgroup = calloc(1, sizeof(*pOut->dbVgroup)); + if (NULL == pOut->dbVgroup) { + qError("calloc size[%d] failed", (int32_t)sizeof(*pOut->dbVgroup)); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _exit; + } + + pOut->dbVgroup->vgId = taosArrayInit(pRsp->dbVgroupNum, sizeof(int32_t)); + if (NULL == pOut->dbVgroup->vgId) { + qError("taosArrayInit size[%d] failed", pRsp->dbVgroupNum); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _exit; + } + + pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion; + pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange); + + for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) { + *(vgIdList + i) = htonl(*(vgIdList + i)); + + taosArrayPush(pOut->dbVgroup->vgId, vgIdList + i) ; + } + } + + return code; + +_exit: + if (pOut->dbVgroup && pOut->dbVgroup->vgId) { + taosArrayDestroy(pOut->dbVgroup->vgId); + pOut->dbVgroup->vgId = NULL; + } + + tfree(pOut->dbVgroup); + tfree(pOut->vgroupList); + + return code; +} + + void msgInit() { queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg; queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg; + queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg; //tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp; + queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp; /* tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; -- GitLab