未验证 提交 2c33a51c 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #10111 from taosdata/feature/qnode

Feature/qnode
......@@ -154,8 +154,8 @@ typedef enum _mgmt_table {
typedef struct {
int32_t vgId;
char* dbName;
char* tableFullName;
char* dbFName;
char* tbName;
} SBuildTableMetaInput;
typedef struct {
......@@ -696,8 +696,8 @@ typedef struct {
typedef struct {
SMsgHead header;
char dbFname[TSDB_DB_FNAME_LEN];
char tableFname[TSDB_TABLE_FNAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
} STableInfoReq;
typedef struct {
......@@ -722,9 +722,9 @@ typedef struct {
} SVgroupsInfo;
typedef struct {
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name
char stbFname[TSDB_TABLE_FNAME_LEN];
char dbFname[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
int32_t numOfTags;
int32_t numOfColumns;
int8_t precision;
......
......@@ -49,7 +49,7 @@ typedef struct SCatalogCfg {
uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum;
uint32_t dbRentSec;
uint32_t stableRentSec;
uint32_t stbRentSec;
} SCatalogCfg;
typedef struct SSTableMetaVersion {
......@@ -99,7 +99,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo);
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId);
/**
* Get a table's meta data.
......
......@@ -81,16 +81,15 @@ typedef struct STableMeta {
} STableMeta;
typedef struct SDBVgroupInfo {
SRWLatch lock;
uint64_t dbId;
int32_t vgVersion;
int8_t hashMethod;
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
} SDBVgroupInfo;
typedef struct SUseDbOutput {
char db[TSDB_DB_FNAME_LEN];
SDBVgroupInfo dbVgroup;
char db[TSDB_DB_FNAME_LEN];
SDBVgroupInfo *dbVgroup;
} SUseDbOutput;
enum {
......@@ -103,8 +102,9 @@ enum {
typedef struct STableMetaOutput {
int32_t metaType;
char ctbFname[TSDB_TABLE_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
char ctbName[TSDB_TABLE_NAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
SCTableMeta ctbMeta;
STableMeta *tbMeta;
} STableMetaOutput;
......
......@@ -41,19 +41,14 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
if (rsp->vgVersion < 0) {
SDbVgVersion dbInfo;
strcpy(dbInfo.dbName, rsp->db);
dbInfo.dbId = rsp->uid;
dbInfo.vgVersion = rsp->vgVersion;
code = catalogRemoveDBVgroup(pCatalog, &dbInfo);
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else {
SDBVgroupInfo vgInfo = {0};
vgInfo.dbId = rsp->uid;
vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgInfo) {
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgHash) {
tscError("hash init[%d] failed", rsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -67,16 +62,16 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
}
if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgInfo);
taosHashCleanup(vgInfo.vgHash);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
if (code) {
taosHashCleanup(vgInfo.vgInfo);
taosHashCleanup(vgInfo.vgHash);
}
}
......
......@@ -302,15 +302,12 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData;
SDbVgVersion dbVer = {0};
struct SCatalog *pCatalog = NULL;
strncpy(dbVer.dbName, rsp->db, sizeof(dbVer.dbName));
dbVer.dbId = be64toh(rsp->uid);
rsp->uid = be64toh(rsp->uid);
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveDBVgroup(pCatalog, &dbVer);
catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
tsem_post(&pRequest->body.rspSem);
return code;
......
......@@ -125,7 +125,7 @@ const char* Testbase::GetMetaName(int32_t index) {
int32_t Testbase::GetMetaNum() { return pMeta->numOfColumns; }
const char* Testbase::GetMetaTbName() { return pMeta->tbFname; }
const char* Testbase::GetMetaTbName() { return pMeta->tbName; }
void Testbase::SendShowRetrieveReq() {
int32_t contLen = sizeof(SRetrieveTableReq);
......@@ -144,7 +144,7 @@ void Testbase::SendShowRetrieveReq() {
pos = 0;
}
const char* Testbase::GetShowName() { return pMeta->tbFname; }
const char* Testbase::GetShowName() { return pMeta->tbName; }
int8_t Testbase::GetShowInt8() {
int8_t data = *((int8_t*)(pData + pos));
......
......@@ -426,7 +426,7 @@ static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_BNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -186,7 +186,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
cols++;
pMeta->numOfColumns = htonl(cols);
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......@@ -196,7 +196,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
pShow->numOfRows = 1;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -1113,7 +1113,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe
pShow->numOfRows = sdbGetSize(pSdb, SDB_DB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -608,7 +608,7 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
pShow->numOfRows = TSDB_CONFIG_NUMBER;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......@@ -715,7 +715,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -482,7 +482,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -624,7 +624,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
mndUpdateMnodeRole(pMnode);
return 0;
......
......@@ -623,7 +623,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......@@ -792,7 +792,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = 1000000;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -426,7 +426,7 @@ static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_QNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -428,7 +428,7 @@ static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_SNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -723,20 +723,23 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
STableInfoReq *pInfo = pReq->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", pInfo->dbFName, pInfo->tbName);
SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->tableFname);
mDebug("stb:%s, start to retrieve meta", tbFName);
SDbObj *pDb = mndAcquireDbByStb(pMnode, tbFName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
mError("stb:%s, failed to retrieve meta since %s", tbFName, terrstr());
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, pInfo->tableFname);
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_STB;
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
return -1;
}
......@@ -750,11 +753,13 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
return -1;
}
memcpy(pMeta->tbFname, pStb->name, TSDB_TABLE_FNAME_LEN);
strcpy(pMeta->dbFName, pStb->db);
strcpy(pMeta->tbName, pInfo->tbName);
strcpy(pMeta->stbName, pInfo->tbName);
pMeta->numOfTags = htonl(pStb->numOfTags);
pMeta->numOfColumns = htonl(pStb->numOfColumns);
pMeta->precision = pDb->cfg.precision;
......@@ -779,7 +784,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
pReq->pCont = pMeta;
pReq->contLen = contLen;
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags);
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", tbFName, pStb->numOfColumns, pStb->numOfTags);
return 0;
}
......@@ -855,7 +860,7 @@ static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pM
pShow->numOfRows = sdbGetSize(pSdb, SDB_STB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -745,7 +745,7 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname);
mDebug("subscribe:%s, start to retrieve meta", pInfo->tbName);
#if 0
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
......@@ -876,7 +876,7 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRs
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -338,7 +338,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("topic:%s, start to retrieve meta", pInfo->tableFname);
mDebug("topic:%s, start to retrieve meta", pInfo->tbName);
#if 0
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
......@@ -469,7 +469,7 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -474,7 +474,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -525,7 +525,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
}
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......@@ -638,7 +638,7 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->replica = dnodeId;
pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0;
}
......
......@@ -64,7 +64,7 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
STableMetaRsp* pMeta = test.GetShowMeta();
EXPECT_STREQ(pMeta->tbFname, "show connections");
EXPECT_STREQ(pMeta->tbName, "show connections");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, 7);
EXPECT_EQ(pMeta->precision, 0);
......
......@@ -126,7 +126,8 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
int32_t contLen = sizeof(STableInfoReq);
STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
strcpy(pReq->tableFname, "1.d1.stb");
strcpy(pReq->dbFName, "1.d1");
strcpy(pReq->tbName, "stb");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
......@@ -146,8 +147,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
pSchema->bytes = htonl(pSchema->bytes);
}
EXPECT_STREQ(pRsp->tbFname, "1.d1.stb");
EXPECT_STREQ(pRsp->stbFname, "");
EXPECT_STREQ(pRsp->dbFName, "1.d1");
EXPECT_STREQ(pRsp->tbName, "stb");
EXPECT_STREQ(pRsp->stbName, "stb");
EXPECT_EQ(pRsp->numOfColumns, 2);
EXPECT_EQ(pRsp->numOfTags, 3);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
......
......@@ -84,7 +84,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int msgLen = 0;
int32_t code = TSDB_CODE_VND_APP_ERROR;
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tbName, &uid);
if (pTbCfg == NULL) {
code = TSDB_CODE_VND_TB_NOT_EXIST;
goto _exit;
......@@ -119,13 +119,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto _exit;
}
memcpy(pTbMetaMsg->dbFname, pReq->dbFname, sizeof(pTbMetaMsg->dbFname));
strcpy(pTbMetaMsg->tbFname, pTbCfg->name);
memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName));
strcpy(pTbMetaMsg->tbName, pReq->tbName);
if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
strcpy(pTbMetaMsg->stbName, pStbCfg->name);
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
} else if (pTbCfg->type == META_SUPER_TABLE) {
strcpy(pTbMetaMsg->stbFname, pTbCfg->name);
strcpy(pTbMetaMsg->stbName, pTbCfg->name);
pTbMetaMsg->suid = htobe64(uid);
}
pTbMetaMsg->numOfTags = htonl(nTagCols);
......
......@@ -47,55 +47,52 @@ enum {
CTG_RENT_STABLE,
};
typedef struct SCTGDebug {
typedef struct SCtgDebug {
int32_t lockDebug;
} SCTGDebug;
} SCtgDebug;
typedef struct SVgroupListCache {
int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo
} SVgroupListCache;
typedef struct SCtgTbMetaCache {
SRWLatch stbLock;
SHashObj *cache; //key:tbname, value:STableMeta
SHashObj *stbCache; //key:suid, value:STableMeta*
} SCtgTbMetaCache;
typedef struct SDBVgroupCache {
SHashObj *cache; //key:dbname, value:SDBVgroupInfo
} SDBVgroupCache;
typedef struct SCtgDBCache {
SRWLatch vgLock;
int8_t deleted;
SDBVgroupInfo *vgInfo;
SCtgTbMetaCache tbCache;
} SCtgDBCache;
typedef struct STableMetaCache {
SRWLatch stableLock;
SHashObj *cache; //key:fulltablename, value:STableMeta
SHashObj *stableCache; //key:suid, value:STableMeta*
} STableMetaCache;
typedef struct SRentSlotInfo {
typedef struct SCtgRentSlot {
SRWLatch lock;
bool needSort;
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
} SRentSlotInfo;
} SCtgRentSlot;
typedef struct SMetaRentMgmt {
typedef struct SCtgRentMgmt {
int8_t type;
uint16_t slotNum;
uint16_t slotRIdx;
int64_t lastReadMsec;
SRentSlotInfo *slots;
} SMetaRentMgmt;
SCtgRentSlot *slots;
} SCtgRentMgmt;
typedef struct SCatalog {
uint64_t clusterId;
SDBVgroupCache dbCache;
STableMetaCache tableCache;
SMetaRentMgmt dbRent;
SMetaRentMgmt stableRent;
uint64_t clusterId;
SHashObj *dbCache; //key:dbname, value:SCtgDBCache
SCtgRentMgmt dbRent;
SCtgRentMgmt stbRent;
} SCatalog;
typedef struct SCtgApiStat {
} SCtgApiStat;
typedef struct SCtgResourceStat {
typedef struct SCtgRuntimeStat {
} SCtgResourceStat;
} SCtgRuntimeStat;
typedef struct SCtgCacheStat {
......@@ -103,7 +100,7 @@ typedef struct SCtgCacheStat {
typedef struct SCatalogStat {
SCtgApiStat api;
SCtgResourceStat resource;
SCtgRuntimeStat runtime;
SCtgCacheStat cache;
} SCatalogStat;
......
......@@ -20,31 +20,106 @@
SCatalogMgmt ctgMgmt = {0};
SCTGDebug gCTGDebug = {0};
SCtgDebug gCTGDebug = {0};
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
if (NULL == pCatalog->dbCache.cache) {
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
if (NULL == mgmt->slots) {
return;
}
for (int32_t i = 0; i < mgmt->slotNum; ++i) {
SCtgRentSlot *slot = &mgmt->slots[i];
if (slot->meta) {
taosArrayDestroy(slot->meta);
slot->meta = NULL;
}
}
tfree(mgmt->slots);
}
void ctgFreeTableMetaCache(SCtgTbMetaCache *table) {
CTG_LOCK(CTG_WRITE, &table->stbLock);
if (table->stbCache) {
taosHashCleanup(table->stbCache);
table->stbCache = NULL;
}
CTG_UNLOCK(CTG_WRITE, &table->stbLock);
if (table->cache) {
taosHashCleanup(table->cache);
table->cache = NULL;
}
}
void ctgFreeDbCache(SCtgDBCache *dbCache) {
if (NULL == dbCache) {
return;
}
atomic_store_8(&dbCache->deleted, 1);
SDBVgroupInfo *dbInfo = NULL;
if (dbCache->vgInfo) {
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
if (dbCache->vgInfo->vgHash) {
taosHashCleanup(dbCache->vgInfo->vgHash);
dbCache->vgInfo->vgHash = NULL;
}
tfree(dbCache->vgInfo);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
}
ctgFreeTableMetaCache(&dbCache->tbCache);
}
void ctgFreeHandle(struct SCatalog* pCatalog) {
ctgFreeMetaRent(&pCatalog->dbRent);
ctgFreeMetaRent(&pCatalog->stbRent);
if (pCatalog->dbCache) {
void *pIter = taosHashIterate(pCatalog->dbCache, NULL);
while (pIter) {
SCtgDBCache *dbCache = pIter;
ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pCatalog->dbCache, pIter);
}
taosHashCleanup(pCatalog->dbCache);
}
free(pCatalog);
}
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SCtgDBCache **dbCache, bool *inCache) {
if (NULL == pCatalog->dbCache) {
*inCache = false;
ctgWarn("empty db cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
SDBVgroupInfo *info = NULL;
SCtgDBCache *cache = NULL;
while (true) {
info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
cache = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
if (NULL == info) {
if (NULL == cache) {
*inCache = false;
ctgWarn("not in db vgroup cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_READ, &info->lock);
if (NULL == info->vgInfo) {
CTG_UNLOCK(CTG_READ, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
CTG_LOCK(CTG_READ, &cache->vgLock);
if (NULL == cache->vgInfo) {
CTG_UNLOCK(CTG_READ, &cache->vgLock);
taosHashRelease(pCatalog->dbCache, cache);
ctgWarn("db cache vgInfo is NULL, dbName:%s", dbName);
continue;
......@@ -53,7 +128,7 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S
break;
}
*dbInfo = info;
*dbCache = cache;
*inCache = true;
ctgDebug("Got db vgroup from cache, dbName:%s", dbName);
......@@ -98,48 +173,73 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
return TSDB_CODE_SUCCESS;
}
int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, const char* tbFullName, int32_t *exist) {
if (NULL == pCatalog->tableCache.cache) {
int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, char* tbName, int32_t *exist) {
if (NULL == pCatalog->dbCache) {
*exist = 0;
ctgWarn("empty tablemeta cache, tbName:%s", tbFullName);
ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tbName);
return TSDB_CODE_SUCCESS;
}
SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
if (NULL == dbCache) {
*exist = 0;
ctgWarn("db not exist in cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
size_t sz = 0;
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));
size_t sz = 0;
STableMeta *tbMeta = taosHashGet(dbCache->tbCache.cache, tbName, strlen(tbName));
if (NULL == tbMeta) {
taosHashRelease(pCatalog->dbCache, dbCache);
*exist = 0;
ctgDebug("tablemeta not in cache, tbName:%s", tbFullName);
ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName);
return TSDB_CODE_SUCCESS;
}
*exist = 1;
taosHashRelease(pCatalog->dbCache, dbCache);
ctgDebug("tablemeta is in cache, tbName:%s", tbFullName);
ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
if (NULL == pCatalog->tableCache.cache) {
if (NULL == pCatalog->dbCache) {
*exist = 0;
ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname);
ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
*pTableMeta = NULL;
size_t sz = 0;
STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz);
SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, db, strlen(db));
if (NULL == dbCache) {
*exist = 0;
ctgWarn("no db cache, dbFName:%s, tbName:%s", db, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
if (NULL == dbCache->tbCache.cache) {
*exist = 0;
taosHashRelease(pCatalog->dbCache, dbCache);
ctgWarn("empty tbmeta cache, dbFName:%s, tbName:%s", db, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
size_t sz = 0;
STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.cache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz);
if (NULL == *pTableMeta) {
*exist = 0;
ctgDebug("tablemeta not in cache, tbName:%s", tbFullName);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", db, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -148,80 +248,91 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN
tbMeta = *pTableMeta;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgDebug("Got tbmeta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, db, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock);
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
ctgError("stable not in stableCache, suid:%"PRIx64, tbMeta->suid);
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgError("stable not in stbCache, suid:%"PRIx64, tbMeta->suid);
tfree(*pTableMeta);
*exist = 0;
return TSDB_CODE_SUCCESS;
}
if ((*stbMeta)->suid != tbMeta->suid) {
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
tfree(*pTableMeta);
ctgError("stable suid in stableCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
*pTableMeta = realloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) {
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgError("realloc size[%d] failed", metaSize);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", db, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) {
if (NULL == pCatalog->tableCache.cache) {
ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname);
if (NULL == pCatalog->dbCache) {
ctgWarn("empty db cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
char dbName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbName);
size_t sz = 0;
STableMeta *pTableMeta = NULL;
taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)&pTableMeta, &sz);
SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
if (NULL == dbCache) {
ctgInfo("db not in cache, dbFName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.cache, pTableName->tname, strlen(pTableName->tname));
if (NULL == pTableMeta) {
ctgWarn("tablemeta not in cache, tbName:%s", tbFullName);
ctgWarn("tbmeta not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname);
taosHashRelease(pCatalog->dbCache, dbCache);
return TSDB_CODE_SUCCESS;
}
*tbType = pTableMeta->tableType;
*tbType = atomic_load_8(&pTableMeta->tableType);
taosHashRelease(dbCache->tbCache.cache, dbCache);
taosHashRelease(pCatalog->dbCache, dbCache);
ctgDebug("Got tabletype from cache, tbName:%s, type:%d", tbFullName, *tbType);
ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbName, pTableName->tname, *tbType);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) {
SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) {
SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
ctgDebug("try to get table meta from mnode, tbName:%s", tbFullName);
ctgDebug("try to get table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen);
if (code) {
......@@ -242,30 +353,30 @@ int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransport
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
SET_META_TYPE_NULL(output->metaType);
ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName);
ctgDebug("stablemeta not exist in mnode, dbFName:%s, tbName:%s", dbFName, tbName);
return TSDB_CODE_SUCCESS;
}
ctgError("error rsp for stablemeta from mnode, code:%s, tbName:%s", tstrerror(rpcRsp.code), tbFullName);
ctgError("error rsp for stablemeta from mnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tbName);
CTG_ERR_RET(rpcRsp.code);
}
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
ctgError("Process mnode stablemeta rsp failed, code:%x, tbName:%s", code, tbFullName);
ctgError("Process mnode stablemeta rsp failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tbName);
CTG_ERR_RET(code);
}
ctgDebug("Got table meta from mnode, tbName:%s", tbFullName);
ctgDebug("Got table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
return ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, tbFullName, output);
return ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, dbFName, (char *)pTableName->tname, output);
}
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
......@@ -273,18 +384,18 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
char dbFullName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFullName);
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName);
ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName));
ctgDebug("try to get table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)tNameGetTableName(pTableName)};
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
char *msg = NULL;
int32_t msgLen = 0;
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
if (code) {
ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, tNameGetTableName(pTableName));
ctgError("Build vnode tablemeta msg failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tNameGetTableName(pTableName));
CTG_ERR_RET(code);
}
......@@ -300,21 +411,21 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter,
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
SET_META_TYPE_NULL(output->metaType);
ctgDebug("tablemeta not exist in vnode, tbName:%s", tNameGetTableName(pTableName));
ctgDebug("tablemeta not exist in vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
return TSDB_CODE_SUCCESS;
}
ctgError("error rsp for table meta from vnode, code:%s, tbName:%s", tstrerror(rpcRsp.code), tNameGetTableName(pTableName));
ctgError("error rsp for table meta from vnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tNameGetTableName(pTableName));
CTG_ERR_RET(rpcRsp.code);
}
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
ctgError("Process vnode tablemeta rsp failed, code:%s, tbName:%s", tstrerror(code), tNameGetTableName(pTableName));
ctgError("Process vnode tablemeta rsp failed, code:%s, dbFName:%s, tbName:%s", tstrerror(code), dbFName, tNameGetTableName(pTableName));
CTG_ERR_RET(code);
}
ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, tNameGetTableName(pTableName));
ctgDebug("Got table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
return TSDB_CODE_SUCCESS;
}
......@@ -334,7 +445,7 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
SVgroupInfo *vgInfo = NULL;
SArray *vgList = NULL;
int32_t code = 0;
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
if (NULL == vgList) {
......@@ -342,7 +453,7 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
......@@ -351,7 +462,7 @@ int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
pIter = taosHashIterate(dbInfo->vgInfo, pIter);
pIter = taosHashIterate(dbInfo->vgHash, pIter);
vgInfo = NULL;
}
......@@ -374,7 +485,7 @@ _return:
int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
int32_t code = 0;
int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
......@@ -386,33 +497,32 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn
tableNameHashFp fp = NULL;
SVgroupInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName);
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));
void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
taosHashCancelIterate(dbInfo->vgInfo, pIter);
taosHashCancelIterate(dbInfo->vgHash, pIter);
break;
}
pIter = taosHashIterate(dbInfo->vgInfo, pIter);
pIter = taosHashIterate(dbInfo->vgHash, pIter);
vgInfo = NULL;
}
if (NULL == vgInfo) {
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgInfo));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
*pVgroup = *vgInfo;
_return:
CTG_RET(code);
}
......@@ -437,12 +547,12 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
}
int32_t ctgMetaRentInit(SMetaRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
mgmt->type = type;
size_t msgSize = sizeof(SRentSlotInfo) * mgmt->slotNum;
size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
mgmt->slots = calloc(1, msgSize);
if (NULL == mgmt->slots) {
......@@ -456,10 +566,10 @@ int32_t ctgMetaRentInit(SMetaRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
}
int32_t ctgMetaRentAdd(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
int16_t widx = abs(id % mgmt->slotNum);
SRentSlotInfo *slot = &mgmt->slots[widx];
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
......@@ -486,10 +596,10 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
int16_t widx = abs(id % mgmt->slotNum);
SRentSlotInfo *slot = &mgmt->slots[widx];
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
......@@ -526,10 +636,10 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentRemove(SMetaRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
int16_t widx = abs(id % mgmt->slotNum);
SRentSlotInfo *slot = &mgmt->slots[widx];
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
......@@ -562,14 +672,14 @@ _return:
}
int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
if (ridx >= mgmt->slotNum) {
ridx %= mgmt->slotNum;
atomic_store_16(&mgmt->slotRIdx, ridx);
}
SRentSlotInfo *slot = &mgmt->slots[ridx];
SCtgRentSlot *slot = &mgmt->slots[ridx];
int32_t code = 0;
CTG_LOCK(CTG_READ, &slot->lock);
......@@ -608,7 +718,7 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
while (true) {
int64_t msec = taosGetTimestampMs();
int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
......@@ -635,52 +745,79 @@ int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t s
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname);
ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (NULL == pCatalog->tableCache.cache) {
if (NULL == pCatalog->dbCache) {
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) {
taosHashCleanup(cache);
}
}
while (true) {
dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, output->dbFName, strlen(output->dbFName));
if (dbCache) {
break;
}
SCtgDBCache newDbCache = {0};
if (taosHashPut(pCatalog->dbCache, output->dbFName, strlen(output->dbFName), &newDbCache, sizeof(newDbCache))) {
ctgError("taosHashPut db to cache failed, db:%s", output->dbFName);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
}
if (NULL == dbCache->tbCache.cache) {
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) {
if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.cache, NULL, cache)) {
taosHashCleanup(cache);
}
}
if (NULL == pCatalog->tableCache.stableCache) {
if (NULL == dbCache->tbCache.stbCache) {
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) {
if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.stbCache, NULL, cache)) {
taosHashCleanup(cache);
}
}
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
ctgError("taosHashPut ctablemeta to cache failed, ctbName:%s", output->ctbFname);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
if (taosHashPut(dbCache->tbCache.cache, output->ctbName, strlen(output->ctbName), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
ctgError("taosHashPut ctbmeta to cache failed, ctbName:%s", output->ctbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
ctgDebug("update child tablemeta to cache, tbName:%s", output->ctbFname);
ctgDebug("ctbmeta updated to cache, ctbName:%s", output->ctbName);
}
if (CTG_IS_META_CTABLE(output->metaType)) {
return TSDB_CODE_SUCCESS;
goto _return;
}
if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
......@@ -689,44 +826,50 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
bool newAdded = false;
SSTableMetaVersion metaRent = {.suid = output->tbMeta->suid, .sversion = output->tbMeta->sversion, .tversion = output->tbMeta->tversion};
CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
if (taosHashPut(dbCache->tbCache.cache, output->tbName, strlen(output->tbName), output->tbMeta, tbSize) != 0) {
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
ctgError("taosHashPut tablemeta to cache failed, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
if (taosHashPutExt(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
STableMeta *tbMeta = taosHashGet(dbCache->tbCache.cache, output->tbName, strlen(output->tbName));
if (taosHashPutExt(dbCache->tbCache.stbCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) {
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, output->tbMeta->suid);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
ctgDebug("update stable to cache, suid:%"PRIx64, output->tbMeta->suid);
if (newAdded) {
CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
} else {
CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare));
CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare));
}
} else {
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
if (taosHashPut(dbCache->tbCache.cache, output->tbName, strlen(output->tbName), output->tbMeta, tbSize) != 0) {
ctgError("taosHashPut tablemeta to cache failed, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
}
ctgDebug("update tablemeta to cache, tbName:%s", output->tbFname);
ctgDebug("update tablemeta to cache, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
_return:
if (dbCache) {
taosHashRelease(pCatalog->dbCache, dbCache);
}
CTG_RET(code);
}
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, bool forceUpdate, SDBVgroupInfo** dbInfo) {
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, bool forceUpdate, SCtgDBCache** dbCache) {
bool inCache = false;
if (!forceUpdate) {
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbCache, &inCache));
if (inCache) {
return TSDB_CODE_SUCCESS;
}
......@@ -742,8 +885,8 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
while (true) {
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, DbOut.dbVgroup));
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbCache, &inCache));
if (!inCache) {
ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbName);
......@@ -757,66 +900,51 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
}
int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (oldInfo) {
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
if (dbInfo->vgVersion <= oldInfo->vgVersion) {
ctgInfo("db vgVersion is not new, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, oldInfo->vgVersion);
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
return TSDB_CODE_SUCCESS;
}
if (oldInfo->vgInfo) {
ctgInfo("cleanup db vgInfo, db:%s", dbName);
taosHashCleanup(oldInfo->vgInfo);
oldInfo->vgInfo = NULL;
}
CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, oldInfo);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target, bool *removed) {
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId, bool *removed) {
*removed = false;
SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName));
if (NULL == info) {
ctgInfo("db not exist in dbCache, may be removed, db:%s", target->dbName);
SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
if (NULL == dbCache) {
ctgInfo("db not exist in dbCache, may be removed, db:%s", dbName);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_WRITE, &info->lock);
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
if (info->dbId != target->dbId) {
ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId);
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
if (NULL == dbCache->vgInfo) {
ctgInfo("db vgInfo not in dbCache, may be removed, db:%s, dbId:%"PRIx64, dbName, dbId);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
return TSDB_CODE_SUCCESS;
}
if (dbCache->vgInfo->dbId != dbId) {
ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbName, dbCache->vgInfo->dbId, dbId);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
return TSDB_CODE_SUCCESS;
}
if (info->vgInfo) {
ctgInfo("cleanup db vgInfo, db:%s", target->dbName);
taosHashCleanup(info->vgInfo);
info->vgInfo = NULL;
if (dbCache->vgInfo->vgHash) {
ctgInfo("cleanup db vgInfo, db:%s, dbId:%"PRIx64, dbName, dbId);
taosHashCleanup(dbCache->vgInfo->vgHash);
tfree(dbCache->vgInfo);
}
if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) {
ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName);
CTG_UNLOCK(CTG_WRITE, &info->lock);
taosHashRelease(pCatalog->dbCache.cache, info);
if (taosHashRemove(pCatalog->dbCache, dbName, strlen(dbName))) {
ctgError("taosHashRemove from dbCache failed, db:%s", dbName);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
dbCache->deleted = true;
CTG_UNLOCK(CTG_WRITE, &info->lock);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
ctgFreeTableMetaCache(&dbCache->tbCache);
taosHashRelease(pCatalog->dbCache.cache, info);
taosHashRelease(pCatalog->dbCache, dbCache);
*removed = true;
......@@ -839,7 +967,7 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
STableMetaOutput *output = &voutput;
if (CTG_IS_STABLE(isSTable)) {
ctgDebug("will renew table meta, supposed to be stable, tbName:%s", tNameGetTableName(pTableName));
ctgDebug("will renew tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
// if get from mnode failed, will not try vnode
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));
......@@ -850,15 +978,15 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
output = &moutput;
}
} else {
ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable);
ctgDebug("will renew tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable);
// if get from vnode failed or no table meta, will not try mnode
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaType:%d", tNameGetTableName(pTableName), voutput.metaType);
ctgDebug("will continue to renew tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), voutput.metaType);
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.dbFName, voutput.tbName, &moutput));
voutput.metaType = moutput.metaType;
......@@ -867,9 +995,9 @@ int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, con
moutput.tbMeta = NULL;
} else if (CTG_IS_META_BOTH(voutput.metaType)) {
int32_t exist = 0;
CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCatalog, voutput.tbFname, &exist));
CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCatalog, voutput.dbFName, voutput.tbName, &exist));
if (0 == exist) {
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.dbFName, voutput.tbName, &moutput));
if (CTG_IS_META_NULL(moutput.metaType)) {
SET_META_TYPE_NULL(voutput.metaType);
......@@ -934,64 +1062,6 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg
return TSDB_CODE_SUCCESS;
}
void ctgFreeMetaRent(SMetaRentMgmt *mgmt) {
if (NULL == mgmt->slots) {
return;
}
for (int32_t i = 0; i < mgmt->slotNum; ++i) {
SRentSlotInfo *slot = &mgmt->slots[i];
if (slot->meta) {
taosArrayDestroy(slot->meta);
slot->meta = NULL;
}
}
tfree(mgmt->slots);
}
void ctgFreeDbCache(SDBVgroupCache *db) {
if (NULL == db->cache) {
return;
}
SDBVgroupInfo *dbInfo = NULL;
void *pIter = taosHashIterate(db->cache, NULL);
while (pIter) {
dbInfo = pIter;
if (dbInfo->vgInfo) {
taosHashCleanup(dbInfo->vgInfo);
dbInfo->vgInfo = NULL;
}
pIter = taosHashIterate(db->cache, pIter);
}
taosHashCleanup(db->cache);
db->cache = NULL;
}
void ctgFreeTableMetaCache(STableMetaCache *table) {
if (table->stableCache) {
taosHashCleanup(table->stableCache);
table->stableCache = NULL;
}
if (table->cache) {
taosHashCleanup(table->cache);
table->cache = NULL;
}
}
void ctgFreeHandle(struct SCatalog* pCatalog) {
ctgFreeMetaRent(&pCatalog->dbRent);
ctgFreeMetaRent(&pCatalog->stableRent);
ctgFreeDbCache(&pCatalog->dbCache);
ctgFreeTableMetaCache(&pCatalog->tableCache);
free(pCatalog);
}
int32_t catalogInit(SCatalogCfg *cfg) {
if (ctgMgmt.pCluster) {
......@@ -1014,14 +1084,14 @@ int32_t catalogInit(SCatalogCfg *cfg) {
ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
}
if (ctgMgmt.cfg.stableRentSec == 0) {
ctgMgmt.cfg.stableRentSec = CTG_DEFAULT_RENT_SECOND;
if (ctgMgmt.cfg.stbRentSec == 0) {
ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
}
} else {
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
ctgMgmt.cfg.stableRentSec = CTG_DEFAULT_RENT_SECOND;
ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
}
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
......@@ -1030,7 +1100,7 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stableRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stableRentSec);
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stbRentSec);
return TSDB_CODE_SUCCESS;
}
......@@ -1066,7 +1136,7 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
clusterCtg->clusterId = clusterId;
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stableRent, ctgMgmt.cfg.stableRentSec, CTG_RENT_STABLE));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, ctgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
if (code) {
......@@ -1117,21 +1187,33 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pCatalog->dbCache.cache) {
if (NULL == pCatalog->dbCache) {
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("empty db cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (NULL == dbInfo) {
SCtgDBCache *db = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
if (NULL == db) {
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("db not in cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
*version = dbInfo->vgVersion;
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
CTG_LOCK(CTG_READ, &db->vgLock);
if (NULL == db->vgInfo) {
CTG_UNLOCK(CTG_READ, &db->vgLock);
*version = CTG_DEFAULT_INVALID_VERSION;
ctgInfo("db not in cache, dbName:%s", dbName);
return TSDB_CODE_SUCCESS;
}
*version = db->vgInfo->vgVersion;
CTG_UNLOCK(CTG_READ, &db->vgLock);
taosHashRelease(pCatalog->dbCache, db);
ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version);
......@@ -1143,29 +1225,31 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SDBVgroupInfo* db = NULL;
SCtgDBCache* dbCache = NULL;
SVgroupInfo *vgInfo = NULL;
int32_t code = 0;
SArray *vgList = NULL;
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db));
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &dbCache));
vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo));
int32_t vgNum = (int32_t)taosHashGetSize(dbCache->vgInfo->vgHash);
vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit %d failed", taosHashGetSize(db->vgInfo));
ctgError("taosArrayInit %d failed", vgNum);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
void *pIter = taosHashIterate(db->vgInfo, NULL);
void *pIter = taosHashIterate(dbCache->vgInfo->vgHash, NULL);
while (pIter) {
vgInfo = pIter;
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
taosHashCancelIterate(dbCache->vgInfo->vgHash, pIter);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
pIter = taosHashIterate(db->vgInfo, pIter);
pIter = taosHashIterate(dbCache->vgInfo->vgHash, pIter);
vgInfo = NULL;
}
......@@ -1173,9 +1257,10 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
vgList = NULL;
_return:
if (db) {
CTG_UNLOCK(CTG_READ, &db->lock);
taosHashRelease(pCatalog->dbCache.cache, db);
if (dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
}
if (vgList) {
......@@ -1194,80 +1279,117 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == dbInfo->vgInfo || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgInfo) <= 0) {
ctgError("invalid db vgInfo, dbName:%s, vgInfo:%p, vgVersion:%d", dbName, dbInfo->vgInfo, dbInfo->vgVersion);
if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbName:%s, vgHash:%p, vgVersion:%d", dbName, dbInfo->vgHash, dbInfo->vgVersion);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL == pCatalog->dbCache.cache) {
if (NULL == pCatalog->dbCache) {
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == cache) {
ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache.cache, NULL, cache)) {
if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) {
taosHashCleanup(cache);
}
} else {
CTG_ERR_JRET(ctgValidateAndFreeDbInfo(pCatalog, dbName, dbInfo));
}
bool newAdded = false;
dbInfo->lock = 0;
if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) {
ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
if (dbCache) {
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
if (NULL == dbCache->vgInfo) {
newAdded = true;
dbCache->vgInfo = dbInfo;
} else {
if (dbCache->vgInfo->dbId != dbInfo->dbId) {
ctgMetaRentRemove(&pCatalog->dbRent, dbCache->vgInfo->dbId, ctgDbVgVersionCompare);
newAdded = true;
} else if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
ctgInfo("db vgVersion is not new, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
goto _return;
}
if (dbCache->vgInfo->vgHash) {
ctgInfo("cleanup db vgHash, db:%s", dbName);
taosHashCleanup(dbCache->vgInfo->vgHash);
dbCache->vgInfo->vgHash = NULL;
}
tfree(dbCache->vgInfo);
dbCache->vgInfo = dbInfo;
}
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
} else {
SCtgDBCache newDBCache = {0};
newDBCache.vgInfo = dbInfo;
if (taosHashPut(pCatalog->dbCache, dbName, strlen(dbName), &newDBCache, sizeof(newDBCache)) != 0) {
ctgError("taosHashPut db & db vgroup to cache failed, db:%s", dbName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
newAdded = true;
}
dbInfo->vgInfo = NULL;
dbInfo = NULL;
SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
strncpy(vgVersion.dbName, dbName, sizeof(vgVersion.dbName));
if (newAdded) {
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion)));
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion)));
} else {
CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
}
ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);
ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, vgVersion.vgVersion);
_return:
if (dbInfo && dbInfo->vgInfo) {
taosHashCleanup(dbInfo->vgInfo);
dbInfo->vgInfo = NULL;
if (dbInfo) {
taosHashCleanup(dbInfo->vgHash);
dbInfo->vgHash = NULL;
tfree(dbInfo);
}
CTG_RET(code);
}
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) {
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId) {
int32_t code = 0;
bool removed = false;
if (NULL == pCatalog || NULL == dbInfo) {
if (NULL == pCatalog || NULL == dbName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pCatalog->dbCache.cache) {
if (NULL == pCatalog->dbCache) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo, &removed));
CTG_ERR_RET(ctgValidateAndRemoveDb(pCatalog, dbName, dbId, &removed));
if (!removed) {
return TSDB_CODE_SUCCESS;
}
ctgInfo("db removed from cache, db:%s", dbInfo->dbName);
ctgInfo("db removed from cache, db:%s, uid:%"PRIx64, dbName, dbId);
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbInfo->dbId, ctgDbVgVersionCompare));
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbId, ctgDbVgVersionCompare));
ctgDebug("db removed from rent, db:%s", dbInfo->dbName);
ctgDebug("db removed from rent, db:%s, uid:%"PRIx64, dbName, dbId);
CTG_RET(code);
}
......@@ -1301,7 +1423,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
STableMeta *tbMeta = NULL;
int32_t code = 0;
SVgroupInfo vgroupInfo = {0};
SDBVgroupInfo* dbVgroup = NULL;
SCtgDBCache* dbCache = NULL;
SArray *vgList = NULL;
*pVgroupList = NULL;
......@@ -1310,7 +1432,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbCache));
// REMOEV THIS ....
if (0 == tbMeta->vgId) {
......@@ -1323,10 +1445,10 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
// REMOVE THIS ....
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbCache->vgInfo, pVgroupList));
} else {
int32_t vgId = tbMeta->vgId;
if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
if (NULL == taosHashGetClone(dbCache->vgInfo->vgHash, &vgId, sizeof(vgId), &vgroupInfo)) {
ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -1349,9 +1471,9 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
_return:
tfree(tbMeta);
if (dbVgroup) {
CTG_UNLOCK(CTG_READ, &dbVgroup->lock);
taosHashRelease(pCatalog->dbCache.cache, dbVgroup);
if (dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
}
if (vgList) {
......@@ -1364,21 +1486,21 @@ _return:
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
SDBVgroupInfo* dbInfo = NULL;
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo));
CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbCache->vgInfo, pTableName, pVgroup));
_return:
if (dbInfo) {
CTG_UNLOCK(CTG_READ, &dbInfo->lock);
taosHashRelease(pCatalog->dbCache.cache, dbInfo);
if (dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
taosHashRelease(pCatalog->dbCache, dbCache);
}
CTG_RET(code);
......@@ -1422,6 +1544,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const S
return TSDB_CODE_SUCCESS;
_return:
if (pRsp->pTableMeta) {
int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
for (int32_t i = 0; i < aSize; ++i) {
......@@ -1451,7 +1574,7 @@ int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion *
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_RET(ctgMetaRentGet(&pCatalog->stableRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
CTG_RET(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) {
......
......@@ -128,15 +128,14 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
strcpy(sn.dbname, "db1");
strcpy(sn.tname, ctgTestSTablename);
char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&cn, tbFullName);
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&cn, db);
strcpy(output->dbFName, db);
SET_META_TYPE_BOTH_TABLE(output->metaType);
strcpy(output->ctbFname, tbFullName);
tNameExtractFullName(&cn, tbFullName);
strcpy(output->tbFname, tbFullName);
strcpy(output->ctbName, cn.tname);
strcpy(output->tbName, sn.tname);
output->ctbMeta.vgId = 9;
output->ctbMeta.tableType = TSDB_CHILD_TABLE;
......@@ -175,10 +174,11 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
strcpy(s->name, "tag1s");
}
void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) {
static int32_t vgVersion = ctgTestVgVersion + 1;
int32_t vgNum = 0;
SVgroupInfo vgInfo = {0};
SDBVgroupInfo *dbVgroup = (SDBVgroupInfo *)calloc(1, sizeof(SDBVgroupInfo));
dbVgroup->vgVersion = vgVersion++;
......@@ -186,7 +186,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
dbVgroup->hashMethod = 0;
dbVgroup->dbId = ctgTestDbId;
dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
dbVgroup->vgHash = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion);
uint32_t hashUnit = UINT32_MAX / vgNum;
......@@ -203,8 +203,10 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
addr->port = htons(n + 22);
}
taosHashPut(dbVgroup->vgInfo, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
taosHashPut(dbVgroup->vgHash, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
}
*pdbVgroup = dbVgroup;
}
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
......@@ -250,7 +252,8 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename);
strcpy(rspMsg->dbFName, ctgTestDbname);
strcpy(rspMsg->tbName, ctgTestTablename);
rspMsg->numOfTags = 0;
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
......@@ -285,8 +288,9 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
strcpy(rspMsg->dbFName, ctgTestDbname);
strcpy(rspMsg->tbName, ctgTestCTablename);
strcpy(rspMsg->stbName, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
......@@ -327,8 +331,9 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
strcpy(rspMsg->dbFName, ctgTestDbname);
strcpy(rspMsg->tbName, ctgTestSTablename);
strcpy(rspMsg->stbName, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
......@@ -370,8 +375,9 @@ void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg,
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
strcpy(rspMsg->dbFName, ctgTestDbname);
sprintf(rspMsg->tbName, "%s_%d", ctgTestSTablename, idx);
sprintf(rspMsg->stbName, "%s_%d", ctgTestSTablename, idx);
rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1;
......@@ -589,12 +595,12 @@ void *ctgTestGetDbVgroupThread(void *param) {
void *ctgTestSetDbVgroupThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0;
SDBVgroupInfo dbVgroup = {0};
SDBVgroupInfo *dbVgroup = NULL;
int32_t n = 0;
while (!ctgTestStop) {
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup);
if (code) {
assert(0);
}
......@@ -669,6 +675,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
return NULL;
}
TEST(tableMeta, normalTable) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1062,9 +1069,11 @@ TEST(dbVgroup, getSetDbVgroupCase) {
void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0};
SDBVgroupInfo *dbVgroup = NULL;
SArray *vgList = NULL;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndNormalMeta();
initQueryModuleMsgHandle();
......@@ -1099,7 +1108,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
taosArrayDestroy(vgList);
ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup);
ASSERT_EQ(code, 0);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
......@@ -1169,6 +1178,7 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy();
}
TEST(multiThread, ctableMeta) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1178,6 +1188,8 @@ TEST(multiThread, ctableMeta) {
SArray *vgList = NULL;
ctgTestStop = false;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndChildMeta();
initQueryModuleMsgHandle();
......@@ -1212,11 +1224,13 @@ TEST(multiThread, ctableMeta) {
}
ctgTestStop = true;
sleep(1);
sleep(2);
catalogDestroy();
}
TEST(rentTest, allRent) {
struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1;
......@@ -1229,6 +1243,8 @@ TEST(rentTest, allRent) {
SSTableMetaVersion *stable = NULL;
uint32_t num = 0;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndMultiSuperMeta();
initQueryModuleMsgHandle();
......@@ -1291,4 +1307,4 @@ int main(int argc, char **argv) {
return RUN_ALL_TESTS();
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
......@@ -45,11 +45,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
bMsg->header.vgId = htonl(bInput->vgId);
if (bInput->dbName) {
tstrncpy(bMsg->dbFname, bInput->dbName, tListLen(bMsg->dbFname));
if (bInput->dbFName) {
tstrncpy(bMsg->dbFName, bInput->dbFName, tListLen(bMsg->dbFName));
}
tstrncpy(bMsg->tableFname, bInput->tableFullName, tListLen(bMsg->tableFname));
tstrncpy(bMsg->tbName, bInput->tbName, tListLen(bMsg->tbName));
*msgLen = (int32_t)sizeof(*bMsg);
return TSDB_CODE_SUCCESS;
......@@ -113,12 +113,19 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pOut->dbVgroup.vgVersion = pRsp->vgVersion;
pOut->dbVgroup.hashMethod = pRsp->hashMethod;
pOut->dbVgroup.dbId = pRsp->uid;
pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pOut->dbVgroup.vgInfo) {
qError("hash init[%d] failed", pRsp->vgNum);
pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo));
if (NULL == pOut->dbVgroup) {
qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo));
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pOut->dbVgroup->vgVersion = pRsp->vgVersion;
pOut->dbVgroup->hashMethod = pRsp->hashMethod;
pOut->dbVgroup->dbId = pRsp->uid;
pOut->dbVgroup->vgHash = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pOut->dbVgroup->vgHash) {
qError("taosHashInit %d failed", pRsp->vgNum);
tfree(pOut->dbVgroup);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -131,8 +138,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pRsp->vgroupInfo[i].epset.eps[n].port = ntohs(pRsp->vgroupInfo[i].epset.eps[n].port);
}
if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
qError("hash push failed");
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
qError("taosHashPut failed");
goto _return;
}
}
......@@ -142,8 +149,10 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return code;
_return:
if (pOut) {
tfree(pOut->dbVgroup.vgInfo);
taosHashCleanup(pOut->dbVgroup->vgHash);
tfree(pOut->dbVgroup);
}
return code;
......@@ -248,16 +257,13 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
strcpy(pOut->dbFName, pMetaMsg->dbFName);
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
SET_META_TYPE_BOTH_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) {
snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname);
} else {
memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname));
memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname));
}
strcpy(pOut->ctbName, pMetaMsg->tbName);
strcpy(pOut->tbName, pMetaMsg->stbName);
pOut->ctbMeta.vgId = pMetaMsg->vgId;
pOut->ctbMeta.tableType = pMetaMsg->tableType;
......@@ -268,11 +274,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
} else {
SET_META_TYPE_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) {
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
} else {
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
}
strcpy(pOut->tbName, pMetaMsg->tbName);
code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
}
......@@ -291,4 +293,4 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
......@@ -132,7 +132,7 @@ typedef struct SSchJob {
SQueryProfileSummary summary;
} SSchJob;
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
......
......@@ -773,14 +773,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
pErrTask = par;
atomic_add_fetch_32(&par->childReady, 1);
int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1);
SCH_LOCK(SCH_WRITE, &par->lock);
SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr};
qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source);
SCH_UNLOCK(SCH_WRITE, &par->lock);
if (SCH_TASK_READY_TO_LUNCH(par)) {
if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) {
SCH_ERR_RET(schLaunchTask(pJob, par));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册