提交 b3c24f6e 编写于 作者: D dapan1121

feature/qnode

上级 41b8ea6e
......@@ -74,6 +74,12 @@ int32_t catalogInit(SCatalogCfg *cfg);
*/
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle);
/**
* Free a cluster's all catalog info, usually it's not necessary, until the application is closing.
* no current or future usage should be guaranteed by application
* @param pCatalog (input, NO more usage)
* @return error code
*/
void catalogFreeHandle(struct SCatalog* pCatalog);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
......
......@@ -46,6 +46,7 @@ void taos_cleanup(void) {
taosCloseRef(id);
rpcCleanup();
catalogDestroy();
taosCloseLog();
tscInfo("all local resources released");
......
......@@ -137,9 +137,9 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
pRsp->numOfColumns = htonl(pRsp->numOfColumns);
pRsp->sversion = htonl(pRsp->sversion);
pRsp->tversion = htonl(pRsp->tversion);
pRsp->suid = htobe64(pRsp->suid);
pRsp->tuid = htobe64(pRsp->tuid);
pRsp->vgId = htobe64(pRsp->vgId);
pRsp->suid = be64toh(pRsp->suid);
pRsp->tuid = be64toh(pRsp->tuid);
pRsp->vgId = be64toh(pRsp->vgId);
for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
SSchema* pSchema = &pRsp->pSchema[i];
pSchema->colId = htonl(pSchema->colId);
......@@ -156,7 +156,7 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
EXPECT_EQ(pRsp->sversion, 1);
EXPECT_EQ(pRsp->tversion, 0);
EXPECT_GT(pRsp->suid, 0);
EXPECT_EQ(pRsp->tuid, 0);
EXPECT_GT(pRsp->tuid, 0);
EXPECT_EQ(pRsp->vgId, 0);
{
......
......@@ -65,7 +65,7 @@ typedef struct STableMetaCache {
typedef struct SRentSlotInfo {
SRWLatch lock;
bool needSort;
SArray *meta;
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
} SRentSlotInfo;
typedef struct SMetaRentMgmt {
......@@ -77,14 +77,34 @@ typedef struct SMetaRentMgmt {
} SMetaRentMgmt;
typedef struct SCatalog {
uint64_t clusterId;
SDBVgroupCache dbCache;
STableMetaCache tableCache;
SMetaRentMgmt dbRent;
SMetaRentMgmt stableRent;
} SCatalog;
typedef struct SCtgApiStat {
} SCtgApiStat;
typedef struct SCtgResourceStat {
} SCtgResourceStat;
typedef struct SCtgCacheStat {
} SCtgCacheStat;
typedef struct SCatalogStat {
SCtgApiStat api;
SCtgResourceStat resource;
SCtgCacheStat cache;
} SCatalogStat;
typedef struct SCatalogMgmt {
SHashObj *pCluster; //key: clusterId, value: SCatalog*
SCatalogStat stat;
SCatalogCfg cfg;
} SCatalogMgmt;
......
......@@ -869,8 +869,63 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg
return TSDB_CODE_SUCCESS;
}
void ctgFreeMetaRent(SMetaRentMgmt *mgmt) {
if (NULL == mgmt->slots) {
return;
}
for (int32_t i = 0; i < mgmt->slotNum; ++i) {
SRentSlotInfo *slot = &mgmt->slots[i];
if (slot->meta) {
taosArrayDestroy(slot->meta);
slot->meta = NULL;
}
}
tfree(mgmt->slots);
}
void ctgFreeDbCache(SDBVgroupCache *db) {
if (NULL == db->cache) {
return;
}
SDBVgroupInfo *dbInfo = NULL;
void *pIter = taosHashIterate(db->cache, NULL);
while (pIter) {
dbInfo = pIter;
if (dbInfo->vgInfo) {
taosHashCleanup(dbInfo->vgInfo);
dbInfo->vgInfo = NULL;
}
pIter = taosHashIterate(db->cache, pIter);
}
taosHashCleanup(db->cache);
db->cache = NULL;
}
void ctgFreeTableMetaCache(STableMetaCache *table) {
if (table->stableCache) {
taosHashCleanup(table->stableCache);
table->stableCache = NULL;
}
if (table->cache) {
taosHashCleanup(table->cache);
table->cache = NULL;
}
}
void ctgFreeHandle(struct SCatalog* pCatalog) {
//TODO
ctgFreeMetaRent(&pCatalog->dbRent);
ctgFreeMetaRent(&pCatalog->stableRent);
ctgFreeDbCache(&pCatalog->dbCache);
ctgFreeTableMetaCache(&pCatalog->tableCache);
free(pCatalog);
}
int32_t catalogInit(SCatalogCfg *cfg) {
......@@ -943,6 +998,8 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
clusterCtg->clusterId = clusterId;
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stableRent, ctgMgmt.cfg.stableRentSec, CTG_RENT_STABLE));
......@@ -978,7 +1035,16 @@ void catalogFreeHandle(struct SCatalog* pCatalog) {
return;
}
if (taosHashRemove(ctgMgmt.pCluster, &pCatalog->clusterId, sizeof(pCatalog->clusterId))) {
ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCatalog->clusterId);
return;
}
uint64_t clusterId = pCatalog->clusterId;
ctgFreeHandle(pCatalog);
ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
}
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
......@@ -1306,11 +1372,25 @@ int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint
void catalogDestroy(void) {
if (ctgMgmt.pCluster) {
taosHashCleanup(ctgMgmt.pCluster); //TBD
ctgMgmt.pCluster = NULL;
if (NULL == ctgMgmt.pCluster) {
return;
}
SCatalog *pCatalog = NULL;
void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL);
while (pIter) {
pCatalog = *(SCatalog **)pIter;
if (pCatalog) {
catalogFreeHandle(pCatalog);
}
pIter = taosHashIterate(ctgMgmt.pCluster, pIter);
}
taosHashCleanup(ctgMgmt.pCluster);
ctgMgmt.pCluster = NULL;
qInfo("catalog destroyed");
}
......
......@@ -16,3 +16,8 @@ TARGET_INCLUDE_DIRECTORIES(
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/catalog/inc"
)
add_test(
NAME catalogTest
COMMAND catalogTest
)
......@@ -42,11 +42,13 @@ extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMeta
void ctgTestSetPrepareTableMeta();
void ctgTestSetPrepareCTableMeta();
void ctgTestSetPrepareSTableMeta();
void ctgTestSetPrepareMultiSTableMeta();
bool ctgTestStop = false;
bool ctgTestEnableSleep = false;
bool ctgTestDeadLoop = false;
int32_t ctgTestPrintNum = 200000;
int32_t ctgTestMTRunSec = 30;
int32_t ctgTestCurrentVgVersion = 0;
int32_t ctgTestVgVersion = 1;
......@@ -55,6 +57,8 @@ int32_t ctgTestColNum = 2;
int32_t ctgTestTagNum = 1;
int32_t ctgTestSVersion = 1;
int32_t ctgTestTVersion = 1;
int32_t ctgTestSuid = 2;
int64_t ctgTestDbId = 33;
uint64_t ctgTestClusterId = 0x1;
char *ctgTestDbname = "1.db1";
......@@ -183,6 +187,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
ctgTestCurrentVgVersion = dbVgroup->vgVersion;
dbVgroup->hashMethod = 0;
dbVgroup->dbId = ctgTestDbId;
dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion);
......@@ -216,7 +221,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
ctgTestCurrentVgVersion = ctgTestVgVersion;
rspMsg->vgNum = htonl(ctgTestVgNum);
rspMsg->hashMethod = 0;
rspMsg->uid = htobe64(3);
rspMsg->uid = htobe64(ctgTestDbId);
SVgroupInfo *vg = NULL;
uint32_t hashUnit = UINT32_MAX / ctgTestVgNum;
......@@ -339,8 +344,52 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
rspMsg->update = 1;
rspMsg->sversion = htonl(ctgTestSVersion);
rspMsg->tversion = htonl(ctgTestTVersion);
rspMsg->suid = htobe64(0x0000000000000002);
rspMsg->tuid = htobe64(0x0000000000000003);
rspMsg->suid = htobe64(ctgTestSuid);
rspMsg->tuid = htobe64(ctgTestSuid);
rspMsg->vgId = 0;
SSchema *s = NULL;
s = &rspMsg->pSchema[0];
s->type = TSDB_DATA_TYPE_TIMESTAMP;
s->colId = htonl(1);
s->bytes = htonl(8);
strcpy(s->name, "ts");
s = &rspMsg->pSchema[1];
s->type = TSDB_DATA_TYPE_INT;
s->colId = htonl(2);
s->bytes = htonl(4);
strcpy(s->name, "col1s");
s = &rspMsg->pSchema[2];
s->type = TSDB_DATA_TYPE_BINARY;
s->colId = htonl(3);
s->bytes = htonl(12);
strcpy(s->name, "tag1s");
return;
}
void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
static int32_t idx = 1;
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
rspMsg->tableType = TSDB_SUPER_TABLE;
rspMsg->update = 1;
rspMsg->sversion = htonl(ctgTestSVersion);
rspMsg->tversion = htonl(ctgTestTVersion);
rspMsg->suid = htobe64(ctgTestSuid + idx);
rspMsg->tuid = htobe64(ctgTestSuid + idx);
rspMsg->vgId = 0;
SSchema *s = NULL;
......@@ -362,10 +411,13 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
s->bytes = htonl(12);
strcpy(s->name, "tag1s");
++idx;
return;
}
void ctgTestPrepareDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
......@@ -391,6 +443,14 @@ void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg
return;
}
void ctgTestPrepareDbVgroupsAndMultiSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
ctgTestSetPrepareMultiSTableMeta();
return;
}
void ctgTestSetPrepareDbVgroups() {
......@@ -445,6 +505,20 @@ void ctgTestSetPrepareSTableMeta() {
}
}
void ctgTestSetPrepareMultiSTableMeta() {
static Stub stub;
stub.set(rpcSendRecv, ctgTestPrepareMultiSTableMeta);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
for (const auto& f : result) {
stub.set(f.second, ctgTestPrepareMultiSTableMeta);
}
}
}
void ctgTestSetPrepareDbVgroupsAndNormalMeta() {
static Stub stub;
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndNormalMeta);
......@@ -485,6 +559,19 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() {
}
}
void ctgTestSetPrepareDbVgroupsAndMultiSuperMeta() {
static Stub stub;
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndMultiSuperMeta);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
for (const auto& f : result) {
stub.set(f.second, ctgTestPrepareDbVgroupsAndMultiSuperMeta);
}
}
}
}
......@@ -817,6 +904,8 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
ASSERT_EQ(tableMeta->uid, ctgTestSuid);
ASSERT_EQ(tableMeta->suid, ctgTestSuid);
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
......@@ -1094,7 +1183,7 @@ TEST(multiThread, getSetDbVgroupCase) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(600);
sleep(ctgTestMTRunSec);
break;
}
}
......@@ -1142,7 +1231,7 @@ TEST(multiThread, ctableMeta) {
if (ctgTestDeadLoop) {
sleep(1);
} else {
sleep(600);
sleep(ctgTestMTRunSec);
break;
}
}
......@@ -1154,6 +1243,78 @@ TEST(multiThread, ctableMeta) {
}
TEST(rentTest, allRent) {
struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SArray *vgList = NULL;
ctgTestStop = false;
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stable = NULL;
uint32_t num = 0;
ctgTestSetPrepareDbVgroupsAndMultiSuperMeta();
initQueryModuleMsgHandle();
int32_t code = catalogInit(NULL);
ASSERT_EQ(code, 0);
code = catalogGetHandle(ctgTestClusterId, &pCtg);
ASSERT_EQ(code, 0);
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
strcpy(n.dbname, "db1");
for (int32_t i = 1; i <= 10; ++i) {
sprintf(n.tname, "%s_%d", ctgTestSTablename, i);
STableMeta *tableMeta = NULL;
code = catalogGetSTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
ASSERT_EQ(code, 0);
ASSERT_EQ(tableMeta->vgId, 0);
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
ASSERT_EQ(tableMeta->uid, ctgTestSuid + i);
ASSERT_EQ(tableMeta->suid, ctgTestSuid + i);
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
code = catalogGetExpiredDBs(pCtg, &dbs, &num);
ASSERT_EQ(code, 0);
printf("%d - expired dbNum:%d\n", i, num);
if (dbs) {
printf("%d - expired dbId:%"PRId64", vgVersion:%d\n", i, dbs->dbId, dbs->vgVersion);
free(dbs);
dbs = NULL;
}
code = catalogGetExpiredSTables(pCtg, &stable, &num);
ASSERT_EQ(code, 0);
printf("%d - expired stableNum:%d\n", i, num);
if (stable) {
for (int32_t n = 0; n < num; ++n) {
printf("suid:%"PRId64", sversion:%d, tversion:%d\n", stable[n].suid, stable[n].sversion, stable[n].tversion);
}
free(stable);
stable = NULL;
}
printf("*************************************************\n");
sleep(2);
}
catalogDestroy();
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
......
......@@ -1102,6 +1102,7 @@ void scheduleFreeJob(void *pJob) {
taosHashCleanup(job->failTasks);
taosHashCleanup(job->succTasks);
taosArrayDestroy(job->levels);
tfree(job);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册