From b3c24f6e8f6c1a3b11778e25210811798bbd3143 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jan 2022 14:38:05 +0800 Subject: [PATCH] feature/qnode --- include/libs/catalog/catalog.h | 6 + source/client/src/clientMain.c | 1 + source/dnode/mgmt/impl/test/stb/stb.cpp | 8 +- source/libs/catalog/inc/catalogInt.h | 26 +++- source/libs/catalog/src/catalog.c | 88 ++++++++++- source/libs/catalog/test/CMakeLists.txt | 5 + source/libs/catalog/test/catalogTests.cpp | 171 +++++++++++++++++++++- source/libs/scheduler/src/scheduler.c | 1 + 8 files changed, 290 insertions(+), 16 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 6f9af4d092..923a726bf8 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -74,6 +74,12 @@ int32_t catalogInit(SCatalogCfg *cfg); */ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle); +/** + * Free a cluster's all catalog info, usually it's not necessary, until the application is closing. + * no current or future usage should be guaranteed by application + * @param pCatalog (input, NO more usage) + * @return error code + */ void catalogFreeHandle(struct SCatalog* pCatalog); int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 2b875b3eb5..dffe178524 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -46,6 +46,7 @@ void taos_cleanup(void) { taosCloseRef(id); rpcCleanup(); + catalogDestroy(); taosCloseLog(); tscInfo("all local resources released"); diff --git a/source/dnode/mgmt/impl/test/stb/stb.cpp b/source/dnode/mgmt/impl/test/stb/stb.cpp index b3d5d29785..d3362c7a9b 100644 --- a/source/dnode/mgmt/impl/test/stb/stb.cpp +++ b/source/dnode/mgmt/impl/test/stb/stb.cpp @@ -137,9 +137,9 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { pRsp->numOfColumns = htonl(pRsp->numOfColumns); pRsp->sversion = htonl(pRsp->sversion); pRsp->tversion = htonl(pRsp->tversion); - pRsp->suid = htobe64(pRsp->suid); - pRsp->tuid = htobe64(pRsp->tuid); - pRsp->vgId = htobe64(pRsp->vgId); + pRsp->suid = be64toh(pRsp->suid); + pRsp->tuid = be64toh(pRsp->tuid); + pRsp->vgId = be64toh(pRsp->vgId); for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) { SSchema* pSchema = &pRsp->pSchema[i]; pSchema->colId = htonl(pSchema->colId); @@ -156,7 +156,7 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { EXPECT_EQ(pRsp->sversion, 1); EXPECT_EQ(pRsp->tversion, 0); EXPECT_GT(pRsp->suid, 0); - EXPECT_EQ(pRsp->tuid, 0); + EXPECT_GT(pRsp->tuid, 0); EXPECT_EQ(pRsp->vgId, 0); { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 439bb21189..91a9c5248c 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -65,7 +65,7 @@ typedef struct STableMetaCache { typedef struct SRentSlotInfo { SRWLatch lock; bool needSort; - SArray *meta; + SArray *meta; // element is SDbVgVersion or SSTableMetaVersion } SRentSlotInfo; typedef struct SMetaRentMgmt { @@ -77,15 +77,35 @@ typedef struct SMetaRentMgmt { } SMetaRentMgmt; typedef struct SCatalog { + uint64_t clusterId; SDBVgroupCache dbCache; STableMetaCache tableCache; SMetaRentMgmt dbRent; SMetaRentMgmt stableRent; } SCatalog; +typedef struct SCtgApiStat { + +} SCtgApiStat; + +typedef struct SCtgResourceStat { + +} SCtgResourceStat; + +typedef struct SCtgCacheStat { + +} SCtgCacheStat; + +typedef struct SCatalogStat { + SCtgApiStat api; + SCtgResourceStat resource; + SCtgCacheStat cache; +} SCatalogStat; + typedef struct SCatalogMgmt { - SHashObj *pCluster; //key: clusterId, value: SCatalog* - SCatalogCfg cfg; + SHashObj *pCluster; //key: clusterId, value: SCatalog* + SCatalogStat stat; + SCatalogCfg cfg; } SCatalogMgmt; typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 08c9752cc3..49b93d7875 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -869,8 +869,63 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg return TSDB_CODE_SUCCESS; } +void ctgFreeMetaRent(SMetaRentMgmt *mgmt) { + if (NULL == mgmt->slots) { + return; + } + + for (int32_t i = 0; i < mgmt->slotNum; ++i) { + SRentSlotInfo *slot = &mgmt->slots[i]; + if (slot->meta) { + taosArrayDestroy(slot->meta); + slot->meta = NULL; + } + } + + tfree(mgmt->slots); +} + +void ctgFreeDbCache(SDBVgroupCache *db) { + if (NULL == db->cache) { + return; + } + + SDBVgroupInfo *dbInfo = NULL; + void *pIter = taosHashIterate(db->cache, NULL); + while (pIter) { + dbInfo = pIter; + + if (dbInfo->vgInfo) { + taosHashCleanup(dbInfo->vgInfo); + dbInfo->vgInfo = NULL; + } + + pIter = taosHashIterate(db->cache, pIter); + } + + taosHashCleanup(db->cache); + db->cache = NULL; +} + +void ctgFreeTableMetaCache(STableMetaCache *table) { + if (table->stableCache) { + taosHashCleanup(table->stableCache); + table->stableCache = NULL; + } + + if (table->cache) { + taosHashCleanup(table->cache); + table->cache = NULL; + } +} + void ctgFreeHandle(struct SCatalog* pCatalog) { - //TODO + ctgFreeMetaRent(&pCatalog->dbRent); + ctgFreeMetaRent(&pCatalog->stableRent); + ctgFreeDbCache(&pCatalog->dbCache); + ctgFreeTableMetaCache(&pCatalog->tableCache); + + free(pCatalog); } int32_t catalogInit(SCatalogCfg *cfg) { @@ -943,6 +998,8 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) { CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } + clusterCtg->clusterId = clusterId; + CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB)); CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stableRent, ctgMgmt.cfg.stableRentSec, CTG_RENT_STABLE)); @@ -977,8 +1034,17 @@ void catalogFreeHandle(struct SCatalog* pCatalog) { if (NULL == pCatalog) { return; } + + if (taosHashRemove(ctgMgmt.pCluster, &pCatalog->clusterId, sizeof(pCatalog->clusterId))) { + ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCatalog->clusterId); + return; + } + + uint64_t clusterId = pCatalog->clusterId; ctgFreeHandle(pCatalog); + + ctgInfo("handle freed, culsterId:%"PRIx64, clusterId); } int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { @@ -1306,10 +1372,24 @@ int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint void catalogDestroy(void) { - if (ctgMgmt.pCluster) { - taosHashCleanup(ctgMgmt.pCluster); //TBD - ctgMgmt.pCluster = NULL; + if (NULL == ctgMgmt.pCluster) { + return; + } + + SCatalog *pCatalog = NULL; + void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL); + while (pIter) { + pCatalog = *(SCatalog **)pIter; + + if (pCatalog) { + catalogFreeHandle(pCatalog); + } + + pIter = taosHashIterate(ctgMgmt.pCluster, pIter); } + + taosHashCleanup(ctgMgmt.pCluster); + ctgMgmt.pCluster = NULL; qInfo("catalog destroyed"); } diff --git a/source/libs/catalog/test/CMakeLists.txt b/source/libs/catalog/test/CMakeLists.txt index 3c7418bdcc..d12e0f310c 100644 --- a/source/libs/catalog/test/CMakeLists.txt +++ b/source/libs/catalog/test/CMakeLists.txt @@ -16,3 +16,8 @@ TARGET_INCLUDE_DIRECTORIES( PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog/" PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/catalog/inc" ) + +add_test( + NAME catalogTest + COMMAND catalogTest +) diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 68e962795c..49e3ef532f 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -42,11 +42,13 @@ extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMeta void ctgTestSetPrepareTableMeta(); void ctgTestSetPrepareCTableMeta(); void ctgTestSetPrepareSTableMeta(); +void ctgTestSetPrepareMultiSTableMeta(); bool ctgTestStop = false; bool ctgTestEnableSleep = false; bool ctgTestDeadLoop = false; int32_t ctgTestPrintNum = 200000; +int32_t ctgTestMTRunSec = 30; int32_t ctgTestCurrentVgVersion = 0; int32_t ctgTestVgVersion = 1; @@ -55,6 +57,8 @@ int32_t ctgTestColNum = 2; int32_t ctgTestTagNum = 1; int32_t ctgTestSVersion = 1; int32_t ctgTestTVersion = 1; +int32_t ctgTestSuid = 2; +int64_t ctgTestDbId = 33; uint64_t ctgTestClusterId = 0x1; char *ctgTestDbname = "1.db1"; @@ -183,6 +187,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) { ctgTestCurrentVgVersion = dbVgroup->vgVersion; dbVgroup->hashMethod = 0; + dbVgroup->dbId = ctgTestDbId; dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion); @@ -216,7 +221,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM ctgTestCurrentVgVersion = ctgTestVgVersion; rspMsg->vgNum = htonl(ctgTestVgNum); rspMsg->hashMethod = 0; - rspMsg->uid = htobe64(3); + rspMsg->uid = htobe64(ctgTestDbId); SVgroupInfo *vg = NULL; uint32_t hashUnit = UINT32_MAX / ctgTestVgNum; @@ -339,8 +344,8 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc rspMsg->update = 1; rspMsg->sversion = htonl(ctgTestSVersion); rspMsg->tversion = htonl(ctgTestTVersion); - rspMsg->suid = htobe64(0x0000000000000002); - rspMsg->tuid = htobe64(0x0000000000000003); + rspMsg->suid = htobe64(ctgTestSuid); + rspMsg->tuid = htobe64(ctgTestSuid); rspMsg->vgId = 0; SSchema *s = NULL; @@ -366,6 +371,53 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc return; } +void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + STableMetaMsg *rspMsg = NULL; //todo + static int32_t idx = 1; + + pRsp->code =0; + pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (STableMetaMsg *)pRsp->pCont; + sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx); + sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx); + rspMsg->numOfTags = htonl(ctgTestTagNum); + rspMsg->numOfColumns = htonl(ctgTestColNum); + rspMsg->precision = 1; + rspMsg->tableType = TSDB_SUPER_TABLE; + rspMsg->update = 1; + rspMsg->sversion = htonl(ctgTestSVersion); + rspMsg->tversion = htonl(ctgTestTVersion); + rspMsg->suid = htobe64(ctgTestSuid + idx); + rspMsg->tuid = htobe64(ctgTestSuid + idx); + rspMsg->vgId = 0; + + SSchema *s = NULL; + s = &rspMsg->pSchema[0]; + s->type = TSDB_DATA_TYPE_TIMESTAMP; + s->colId = htonl(1); + s->bytes = htonl(8); + strcpy(s->name, "ts"); + + s = &rspMsg->pSchema[1]; + s->type = TSDB_DATA_TYPE_INT; + s->colId = htonl(2); + s->bytes = htonl(4); + strcpy(s->name, "col1s"); + + s = &rspMsg->pSchema[2]; + s->type = TSDB_DATA_TYPE_BINARY; + s->colId = htonl(3); + s->bytes = htonl(12); + strcpy(s->name, "tag1s"); + + ++idx; + + return; +} + + + void ctgTestPrepareDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); @@ -391,6 +443,14 @@ void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg return; } +void ctgTestPrepareDbVgroupsAndMultiSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetPrepareMultiSTableMeta(); + + return; +} + void ctgTestSetPrepareDbVgroups() { @@ -445,6 +505,20 @@ void ctgTestSetPrepareSTableMeta() { } } +void ctgTestSetPrepareMultiSTableMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareMultiSTableMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareMultiSTableMeta); + } + } +} + + void ctgTestSetPrepareDbVgroupsAndNormalMeta() { static Stub stub; stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndNormalMeta); @@ -485,6 +559,19 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() { } } +void ctgTestSetPrepareDbVgroupsAndMultiSuperMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndMultiSuperMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareDbVgroupsAndMultiSuperMeta); + } + } +} + } @@ -817,6 +904,8 @@ TEST(tableMeta, superTableCase) { ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->uid, ctgTestSuid); + ASSERT_EQ(tableMeta->suid, ctgTestSuid); ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); ASSERT_EQ(tableMeta->tableInfo.precision, 1); @@ -1094,7 +1183,7 @@ TEST(multiThread, getSetDbVgroupCase) { if (ctgTestDeadLoop) { sleep(1); } else { - sleep(600); + sleep(ctgTestMTRunSec); break; } } @@ -1142,7 +1231,7 @@ TEST(multiThread, ctableMeta) { if (ctgTestDeadLoop) { sleep(1); } else { - sleep(600); + sleep(ctgTestMTRunSec); break; } } @@ -1154,6 +1243,78 @@ TEST(multiThread, ctableMeta) { } +TEST(rentTest, allRent) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SVgroupInfo *pvgInfo = NULL; + SDBVgroupInfo dbVgroup = {0}; + SArray *vgList = NULL; + ctgTestStop = false; + SDbVgVersion *dbs = NULL; + SSTableMetaVersion *stable = NULL; + uint32_t num = 0; + + ctgTestSetPrepareDbVgroupsAndMultiSuperMeta(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + + for (int32_t i = 1; i <= 10; ++i) { + sprintf(n.tname, "%s_%d", ctgTestSTablename, i); + + STableMeta *tableMeta = NULL; + code = catalogGetSTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->uid, ctgTestSuid + i); + ASSERT_EQ(tableMeta->suid, ctgTestSuid + i); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + code = catalogGetExpiredDBs(pCtg, &dbs, &num); + ASSERT_EQ(code, 0); + printf("%d - expired dbNum:%d\n", i, num); + if (dbs) { + printf("%d - expired dbId:%"PRId64", vgVersion:%d\n", i, dbs->dbId, dbs->vgVersion); + free(dbs); + dbs = NULL; + } + + code = catalogGetExpiredSTables(pCtg, &stable, &num); + ASSERT_EQ(code, 0); + printf("%d - expired stableNum:%d\n", i, num); + if (stable) { + for (int32_t n = 0; n < num; ++n) { + printf("suid:%"PRId64", sversion:%d, tversion:%d\n", stable[n].suid, stable[n].sversion, stable[n].tversion); + } + free(stable); + stable = NULL; + } + printf("*************************************************\n"); + + sleep(2); + } + + catalogDestroy(); +} + + + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 23b52741af..df4f121773 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1102,6 +1102,7 @@ void scheduleFreeJob(void *pJob) { taosHashCleanup(job->failTasks); taosHashCleanup(job->succTasks); taosArrayDestroy(job->levels); + tfree(job); } -- GitLab