未验证 提交 2861534a 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #13781 from taosdata/feature/cacheIndex

feat: cache table index
......@@ -1134,14 +1134,16 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
void tFreeSTableMetaRsp(STableMetaRsp* pRsp);
void tFreeSTableIndexRsp(void *info);
typedef struct {
SArray* pArray; // Array of STableMetaRsp
} STableMetaBatchRsp;
SArray* pMetaRsp; // Array of STableMetaRsp
SArray* pIndexRsp; // Array of STableIndexRsp;
} SSTbHbRsp;
int32_t tSerializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
int32_t tDeserializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
void tFreeSTableMetaBatchRsp(STableMetaBatchRsp* pRsp);
int32_t tSerializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
int32_t tDeserializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
void tFreeSSTbHbRsp(SSTbHbRsp* pRsp);
typedef struct {
int32_t numOfTables;
......@@ -2502,7 +2504,11 @@ typedef struct {
} STableIndexInfo;
typedef struct {
SArray* pIndex;
char tbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t suid;
int32_t version;
SArray* pIndex;
} STableIndexRsp;
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
......
......@@ -98,14 +98,15 @@ typedef struct SCatalogCfg {
uint32_t stbRentSec;
} SCatalogCfg;
typedef struct SSTableMetaVersion {
typedef struct SSTableVersion {
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
uint64_t suid;
int16_t sversion;
int16_t tversion;
} SSTableMetaVersion;
int16_t tversion;
int32_t smaVer;
} SSTableVersion;
typedef struct SDbVgVersion {
char dbFName[TSDB_DB_FNAME_LEN];
......@@ -267,7 +268,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t
int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion** stables, uint32_t* num);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion **stables, uint32_t *num);
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion** dbs, uint32_t* num);
......@@ -279,6 +280,8 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp *pRsp);
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
......
......@@ -99,15 +99,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0;
STableMetaBatchRsp batchMetaRsp = {0};
if (tDeserializeSTableMetaBatchRsp(value, valueLen, &batchMetaRsp) != 0) {
SSTbHbRsp hbRsp = {0};
if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t numOfBatchs = taosArrayGetSize(batchMetaRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
STableMetaRsp *rsp = taosArrayGet(batchMetaRsp.pArray, i);
int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
if (rsp->numOfColumns < 0) {
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
......@@ -116,7 +116,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
tFreeSTableMetaBatchRsp(&batchMetaRsp);
tFreeSSTbHbRsp(&hbRsp);
return TSDB_CODE_TSC_INVALID_VALUE;
}
......@@ -124,7 +124,17 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
}
}
tFreeSTableMetaBatchRsp(&batchMetaRsp);
int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
catalogUpdateTableIndex(pCatalog, rsp);
}
taosArrayDestroy(hbRsp.pIndexRsp);
hbRsp.pIndexRsp = NULL;
tFreeSSTbHbRsp(&hbRsp);
return TSDB_CODE_SUCCESS;
}
......@@ -455,7 +465,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
}
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SSTableMetaVersion *stbs = NULL;
SSTableVersion *stbs = NULL;
uint32_t stbNum = 0;
int32_t code = 0;
......@@ -469,15 +479,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
}
for (int32_t i = 0; i < stbNum; ++i) {
SSTableMetaVersion *stb = &stbs[i];
SSTableVersion *stb = &stbs[i];
stb->suid = htobe64(stb->suid);
stb->sversion = htons(stb->sversion);
stb->tversion = htons(stb->tversion);
stb->smaVer = htonl(stb->smaVer);
}
SKv kv = {
.key = HEARTBEAT_KEY_STBINFO,
.valueLen = sizeof(SSTableMetaVersion) * stbNum,
.valueLen = sizeof(SSTableVersion) * stbNum,
.value = stbs,
};
......
......@@ -2438,6 +2438,10 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tbName) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
......@@ -2472,6 +2476,10 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tbName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->suid) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
......@@ -2631,18 +2639,35 @@ int32_t tSerializeSTableMetaRsp(void *buf, int32_t bufLen, STableMetaRsp *pRsp)
return tlen;
}
int32_t tSerializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatchRsp *pRsp) {
int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1;
for (int32_t i = 0; i < numOfBatch; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pArray, i);
int32_t numOfMeta = taosArrayGetSize(pRsp->pMetaRsp);
if (tEncodeI32(&encoder, numOfMeta) < 0) return -1;
for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pMetaRsp, i);
if (tEncodeSTableMetaRsp(&encoder, pMetaRsp) < 0) return -1;
}
int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp);
if (tEncodeI32(&encoder, numOfIndex) < 0) return -1;
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp *pIndexRsp = taosArrayGet(pRsp->pIndexRsp, i);
if (tEncodeCStr(&encoder, pIndexRsp->tbName) < 0) return -1;
if (tEncodeCStr(&encoder, pIndexRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pIndexRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pIndexRsp->version) < 0) return -1;
int32_t num = taosArrayGetSize(pIndexRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pIndexRsp->pIndex, i);
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
}
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -2662,26 +2687,58 @@ int32_t tDeserializeSTableMetaRsp(void *buf, int32_t bufLen, STableMetaRsp *pRsp
return 0;
}
int32_t tDeserializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatchRsp *pRsp) {
int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(STableMetaRsp));
if (pRsp->pArray == NULL) {
int32_t numOfMeta = 0;
if (tDecodeI32(&decoder, &numOfMeta) < 0) return -1;
pRsp->pMetaRsp = taosArrayInit(numOfMeta, sizeof(STableMetaRsp));
if (pRsp->pMetaRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfBatch; ++i) {
for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp tableMetaRsp = {0};
if (tDecodeSTableMetaRsp(&decoder, &tableMetaRsp) < 0) return -1;
taosArrayPush(pRsp->pArray, &tableMetaRsp);
taosArrayPush(pRsp->pMetaRsp, &tableMetaRsp);
}
int32_t numOfIndex = 0;
if (tDecodeI32(&decoder, &numOfIndex) < 0) return -1;
pRsp->pIndexRsp = taosArrayInit(numOfIndex, sizeof(STableIndexRsp));
if (pRsp->pIndexRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp tableIndexRsp = {0};
if (tDecodeCStrTo(&decoder, tableIndexRsp.tbName) < 0) return -1;
if (tDecodeCStrTo(&decoder, tableIndexRsp.dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &tableIndexRsp.suid) < 0) return -1;
if (tDecodeI32(&decoder, &tableIndexRsp.version) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
tableIndexRsp.pIndex = taosArrayInit(num, sizeof(STableIndexInfo));
if (NULL == tableIndexRsp.pIndex) return -1;
STableIndexInfo info;
for (int32_t i = 0; i < num; ++i) {
if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1;
if (NULL == taosArrayPush(tableIndexRsp.pIndex, &info)) {
taosMemoryFree(info.expr);
return -1;
}
}
}
taosArrayPush(pRsp->pIndexRsp, &tableIndexRsp);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
......@@ -2690,14 +2747,32 @@ int32_t tDeserializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatc
void tFreeSTableMetaRsp(STableMetaRsp *pRsp) { taosMemoryFreeClear(pRsp->pSchemas); }
void tFreeSTableMetaBatchRsp(STableMetaBatchRsp *pRsp) {
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
for (int32_t i = 0; i < numOfBatch; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pArray, i);
void tFreeSTableIndexRsp(void *info) {
if (NULL == info) {
return;
}
STableIndexRsp *pInfo = (STableIndexRsp *)info;
taosArrayDestroyEx(pInfo->pIndex, tFreeSTableIndexInfo);
}
void tFreeSSTbHbRsp(SSTbHbRsp *pRsp) {
int32_t numOfMeta = taosArrayGetSize(pRsp->pMetaRsp);
for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pMetaRsp, i);
tFreeSTableMetaRsp(pMetaRsp);
}
taosArrayDestroy(pRsp->pArray);
taosArrayDestroy(pRsp->pMetaRsp);
int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp);
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp *pIndexRsp = taosArrayGet(pRsp->pIndexRsp, i);
tFreeSTableIndexRsp(pIndexRsp);
}
taosArrayDestroy(pRsp->pIndexRsp);
}
int32_t tSerializeSShowRsp(void *buf, int32_t bufLen, SShowRsp *pRsp) {
......
......@@ -26,6 +26,7 @@ int32_t mndInitSma(SMnode *pMnode);
void mndCleanupSma(SMnode *pMnode);
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName);
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma);
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist);
#ifdef __cplusplus
}
......
......@@ -27,7 +27,7 @@ void mndCleanupStb(SMnode *pMnode);
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
SSdbRaw *mndStbActionEncode(SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t numOfStbs, void **ppRsp,
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen);
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs);
......
......@@ -433,7 +433,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
case HEARTBEAT_KEY_STBINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
......
......@@ -532,6 +532,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
......@@ -899,18 +900,31 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
return code;
}
static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIndexRsp *rsp, bool *exist) {
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
int32_t code = 0;
SSmaObj *pSma = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
STableIndexInfo info;
SStbObj* pStb = mndAcquireStb(pMnode, tbFName);
if (NULL == pStb) {
*exist = false;
return TSDB_CODE_SUCCESS;
}
strcpy(rsp->dbFName, pStb->db);
strcpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1);
rsp->suid = pStb->uid;
rsp->version = pStb->smaVer;
mndReleaseStb(pMnode, pStb);
while (1) {
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break;
if (pSma->stb[0] != indexReq->tbFName[0] || strcmp(pSma->stb, indexReq->tbFName)) {
if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
continue;
}
......@@ -1022,7 +1036,7 @@ static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
goto _OVER;
}
code = mndGetTableSma(pMnode, &indexReq, &rsp, &exist);
code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
if (code) {
goto _OVER;
}
......@@ -1114,4 +1128,4 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
\ No newline at end of file
}
......@@ -27,6 +27,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndSma.h"
#include "tname.h"
#define STB_VER_NUMBER 1
......@@ -1271,7 +1272,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
return 0;
}
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, int32_t *smaVer) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
......@@ -1288,6 +1289,10 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
return -1;
}
if (smaVer) {
*smaVer = pStb->smaVer;
}
int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
......@@ -1634,7 +1639,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
}
} else {
mDebug("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) {
if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp, NULL) != 0) {
goto _OVER;
}
}
......@@ -1667,51 +1672,86 @@ _OVER:
return code;
}
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen) {
STableMetaBatchRsp batchMetaRsp = {0};
batchMetaRsp.pArray = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
if (batchMetaRsp.pArray == NULL) {
SSTbHbRsp hbRsp = {0};
hbRsp.pMetaRsp = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
if (hbRsp.pMetaRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
hbRsp.pIndexRsp = taosArrayInit(numOfStbs, sizeof(STableIndexRsp));
if (NULL == hbRsp.pIndexRsp) {
taosArrayDestroy(hbRsp.pMetaRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfStbs; ++i) {
SSTableMetaVersion *pStbVersion = &pStbVersions[i];
SSTableVersion *pStbVersion = &pStbVersions[i];
pStbVersion->suid = be64toh(pStbVersion->suid);
pStbVersion->sversion = ntohs(pStbVersion->sversion);
pStbVersion->tversion = ntohs(pStbVersion->tversion);
pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
STableMetaRsp metaRsp = {0};
int32_t smaVer = 0;
mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) {
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) {
metaRsp.numOfColumns = -1;
metaRsp.suid = pStbVersion->suid;
taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
continue;
}
if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) {
taosArrayPush(batchMetaRsp.pArray, &metaRsp);
taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
} else {
tFreeSTableMetaRsp(&metaRsp);
}
if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) {
bool exist = false;
char tbFName[TSDB_TABLE_FNAME_LEN];
STableIndexRsp indexRsp = {0};
indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
if (NULL == indexRsp.pIndex) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
if (code || !exist) {
indexRsp.suid = pStbVersion->suid;
indexRsp.version = -1;
indexRsp.pIndex = NULL;
}
strcpy(indexRsp.dbFName, pStbVersion->dbFName);
strcpy(indexRsp.tbName, pStbVersion->stbName);
taosArrayPush(hbRsp.pIndexRsp, &indexRsp);
}
}
int32_t rspLen = tSerializeSTableMetaBatchRsp(NULL, 0, &batchMetaRsp);
int32_t rspLen = tSerializeSSTbHbRsp(NULL, 0, &hbRsp);
if (rspLen < 0) {
tFreeSTableMetaBatchRsp(&batchMetaRsp);
tFreeSSTbHbRsp(&hbRsp);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = taosMemoryMalloc(rspLen);
if (pRsp == NULL) {
tFreeSTableMetaBatchRsp(&batchMetaRsp);
tFreeSSTbHbRsp(&hbRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSTableMetaBatchRsp(pRsp, rspLen, &batchMetaRsp);
tFreeSTableMetaBatchRsp(&batchMetaRsp);
tSerializeSSTbHbRsp(pRsp, rspLen, &hbRsp);
tFreeSSTbHbRsp(&hbRsp);
*ppRsp = pRsp;
*pRspLen = rspLen;
return 0;
......
......@@ -57,6 +57,8 @@ enum {
CTG_OP_DROP_TB_META,
CTG_OP_UPDATE_USER,
CTG_OP_UPDATE_VG_EPSET,
CTG_OP_UPDATE_TB_INDEX,
CTG_OP_DROP_TB_INDEX,
CTG_OP_MAX
};
......@@ -128,25 +130,33 @@ typedef struct SCtgUserCtx {
SUserAuthInfo user;
} SCtgUserCtx;
typedef struct SCtgTbMetaCache {
SRWLatch stbLock;
SRWLatch metaLock; // RC between cache destroy and all other operations
SHashObj *metaCache; //key:tbname, value:STableMeta
SHashObj *stbCache; //key:suid, value:STableMeta*
} SCtgTbMetaCache;
typedef STableIndexRsp STableIndex;
typedef struct SCtgDBCache {
typedef struct SCtgTbCache {
SRWLatch metaLock;
STableMeta *pMeta;
SRWLatch indexLock;
STableIndex *pIndex;
} SCtgTbCache;
typedef struct SCtgVgCache {
SRWLatch vgLock;
SDBVgInfo *vgInfo;
} SCtgVgCache;
typedef struct SCtgDBCache {
SRWLatch dbLock; // RC between destroy tbCache/stbCache and all reads
uint64_t dbId;
int8_t deleted;
SDBVgInfo *vgInfo;
SCtgTbMetaCache tbCache;
SCtgVgCache vgCache;
SHashObj *tbCache; // key:tbname, value:SCtgTbCache
SHashObj *stbCache; // key:suid, value:STableMeta*
} SCtgDBCache;
typedef struct SCtgRentSlot {
SRWLatch lock;
bool needSort;
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
SArray *meta; // element is SDbVgVersion or SSTableVersion
} SCtgRentSlot;
typedef struct SCtgRentMgmt {
......@@ -245,8 +255,10 @@ typedef struct SCtgCacheStat {
uint64_t userNum;
uint64_t vgHitNum;
uint64_t vgMissNum;
uint64_t tblHitNum;
uint64_t tblMissNum;
uint64_t tbMetaHitNum;
uint64_t tbMetaMissNum;
uint64_t tbIndexHitNum;
uint64_t tbIndexMissNum;
uint64_t userHitNum;
uint64_t userMissNum;
} SCtgCacheStat;
......@@ -268,10 +280,10 @@ typedef struct SCtgUpdateVgMsg {
SDBVgInfo* dbInfo;
} SCtgUpdateVgMsg;
typedef struct SCtgUpdateTblMsg {
SCatalog* pCtg;
STableMetaOutput* output;
} SCtgUpdateTblMsg;
typedef struct SCtgUpdateTbMetaMsg {
SCatalog* pCtg;
STableMetaOutput* pMeta;
} SCtgUpdateTbMetaMsg;
typedef struct SCtgDropDBMsg {
SCatalog* pCtg;
......@@ -305,6 +317,17 @@ typedef struct SCtgUpdateUserMsg {
SGetUserAuthRsp userAuth;
} SCtgUpdateUserMsg;
typedef struct SCtgUpdateTbIndexMsg {
SCatalog* pCtg;
STableIndex* pIndex;
} SCtgUpdateTbIndexMsg;
typedef struct SCtgDropTbIndexMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
} SCtgDropTbIndexMsg;
typedef struct SCtgUpdateEpsetMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
......@@ -465,12 +488,11 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
void ctgReleaseVgInfo(SCtgDBCache *dbCache);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
void ctgRUnlockVgInfo(SCtgDBCache *dbCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq);
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
......@@ -479,12 +501,18 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
int32_t ctgStartUpdateThread();
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask);
void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache *dbCache);
int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, SName* pTableName, SArray** pRes);
int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp);
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation);
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation);
......@@ -493,7 +521,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName* name, SArray** out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
......@@ -521,6 +549,8 @@ void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
char *ctgTaskTypeStr(CTG_TASK_TYPE type);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
void ctgFreeSTableIndex(void *info);
extern SCatalogMgmt gCtgMgmt;
......
......@@ -96,8 +96,7 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
if (NULL != dbCache) {
input.dbId = dbCache->dbId;
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
......@@ -349,8 +348,8 @@ int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo *pConn, const char* user, co
int32_t code = 0;
*pass = false;
CTG_ERR_RET(ctgChkAuthFromCache(pCtg, user, dbFName, type, &inCache, pass));
CTG_ERR_RET(ctgChkAuthFromCache(pCtg, (char*)user, (char*)dbFName, type, &inCache, pass));
if (inCache) {
return TSDB_CODE_SUCCESS;
......@@ -382,6 +381,45 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbIndex(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pRes) {
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pTableName, pRes));
if (*pRes) {
return TSDB_CODE_SUCCESS;
}
STableIndex *pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pIndex) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t code = ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pIndex, NULL);
if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
code = 0;
goto _return;
}
CTG_ERR_JRET(code);
SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(pIndex->pIndex, &pInfo));
*pRes = pInfo;
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, &pIndex, false));
return TSDB_CODE_SUCCESS;
_return:
tFreeSTableIndexRsp(pIndex);
taosMemoryFree(pIndex);
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
*pRes = NULL;
CTG_RET(code);
}
int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pVgList) {
STableMeta *tbMeta = NULL;
int32_t code = 0;
......@@ -404,7 +442,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
if (dbCache) {
vgHash = dbCache->vgInfo->vgHash;
vgHash = dbCache->vgCache.vgInfo->vgHash;
} else {
vgHash = vgInfo->vgHash;
}
......@@ -442,7 +480,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
......@@ -631,12 +669,11 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
*version = dbCache->vgInfo->vgVersion;
*version = dbCache->vgCache.vgInfo->vgVersion;
*dbId = dbCache->dbId;
*tableNum = dbCache->vgInfo->numOfTable;
*tableNum = dbCache->vgCache.vgInfo->numOfTable;
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
......@@ -661,7 +698,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo));
if (dbCache) {
vgHash = dbCache->vgInfo->vgHash;
vgHash = dbCache->vgCache.vgInfo->vgHash;
} else {
vgHash = vgInfo->vgHash;
}
......@@ -674,7 +711,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
......@@ -741,6 +778,30 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp *pRsp) {
CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCtg || NULL == pRsp) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
STableIndex* pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pIndex) {
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pIndex, pRsp, sizeof(STableIndex));
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, &pIndex, false));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
CTG_API_ENTER();
......@@ -932,12 +993,12 @@ int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
......@@ -1060,14 +1121,14 @@ _return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion** stables, uint32_t* num) {
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion **stables, uint32_t *num) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == stables || NULL == num) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void**)stables, num, sizeof(SSTableMetaVersion)));
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableVersion)));
}
int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion** dbs, uint32_t* num) {
......@@ -1138,7 +1199,12 @@ int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo *pConn, const SNam
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pRes, NULL));
int32_t code = 0;
CTG_ERR_JRET(ctgGetTbIndex(pCtg, pConn, (SName*)pTableName, pRes));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* funcName, SFuncInfo* pInfo) {
......
......@@ -344,6 +344,11 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* p
}
}
for (int32_t i = 0; i < pJob->tbIndexNum; ++i) {
SName* name = taosArrayGet(pReq->pTableIndex, i);
ctgDropTbIndexEnqueue(pCtg, name, true);
}
return TSDB_CODE_SUCCESS;
}
......@@ -680,15 +685,14 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (NULL != dbCache) {
SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, ctx->pName, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
} else {
SBuildUseDBInput input = {0};
......@@ -786,7 +790,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
......@@ -866,9 +870,15 @@ _return:
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out);
STableIndex* pOut = (STableIndex*)pTask->msgCtx.out;
SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
pTask->res = pInfo;
SCtgTbIndexCtx* ctx = pTask->taskCtx;
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false));
_return:
if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
......@@ -1008,7 +1018,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (dbCache) {
SVgroupInfo vgInfo = {0};
CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, ctx->pName, &vgInfo));
CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
......@@ -1026,8 +1036,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code);
......@@ -1057,7 +1066,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) {
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgInfo->vgHash, (SArray**)&pTask->res));
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res));
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
} else {
......@@ -1072,8 +1081,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code);
......@@ -1092,7 +1100,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
if (NULL == pTask->res) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res));
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
} else {
......@@ -1107,8 +1115,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code);
......@@ -1119,6 +1126,15 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
SArray* pRes = NULL;
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes));
if (pRes) {
pTask->res = pRes;
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
return TSDB_CODE_SUCCESS;
......@@ -1157,9 +1173,9 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
SDbInfo* pInfo = (SDbInfo*)pTask->res;
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) {
pInfo->vgVer = dbCache->vgInfo->vgVersion;
pInfo->vgVer = dbCache->vgCache.vgInfo->vgVersion;
pInfo->dbId = dbCache->dbId;
pInfo->tbNum = dbCache->vgInfo->numOfTable;
pInfo->tbNum = dbCache->vgCache.vgInfo->numOfTable;
} else {
pInfo->vgVer = CTG_DEFAULT_INVALID_VERSION;
}
......@@ -1169,8 +1185,7 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code);
......
此差异已折叠。
......@@ -19,7 +19,7 @@
#include "catalogInt.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {.apiEnable = true};
SCtgDebug gCTGDebug = {0};
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1);
......@@ -266,11 +266,11 @@ int32_t ctgdGetStatNum(char *option, void *res) {
}
int32_t ctgdGetTbMetaNum(SCtgDBCache *dbCache) {
return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
return dbCache->tbCache ? (int32_t)taosHashGetSize(dbCache->tbCache) : 0;
}
int32_t ctgdGetStbNum(SCtgDBCache *dbCache) {
return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0;
return dbCache->stbCache ? (int32_t)taosHashGetSize(dbCache->stbCache) : 0;
}
int32_t ctgdGetRentNum(SCtgRentMgmt *rent) {
......@@ -363,17 +363,17 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
dbFName = taosHashGetKey(pIter, &len);
int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0;
int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0;
int32_t metaNum = dbCache->tbCache ? taosHashGetSize(dbCache->tbCache) : 0;
int32_t stbNum = dbCache->stbCache ? taosHashGetSize(dbCache->stbCache) : 0;
int32_t vgVersion = CTG_DEFAULT_INVALID_VERSION;
int32_t hashMethod = -1;
int32_t vgNum = 0;
if (dbCache->vgInfo) {
vgVersion = dbCache->vgInfo->vgVersion;
hashMethod = dbCache->vgInfo->hashMethod;
if (dbCache->vgInfo->vgHash) {
vgNum = taosHashGetSize(dbCache->vgInfo->vgHash);
if (dbCache->vgCache.vgInfo) {
vgVersion = dbCache->vgCache.vgInfo->vgVersion;
hashMethod = dbCache->vgCache.vgInfo->hashMethod;
if (dbCache->vgCache.vgInfo->vgHash) {
vgNum = taosHashGetSize(dbCache->vgCache.vgInfo->vgHash);
}
}
......
......@@ -431,7 +431,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, SArray** out, SCtgTask* pTask) {
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
......@@ -448,10 +448,11 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, POINTER_BYTES);
void* pOut = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
......
......@@ -44,6 +44,16 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
}
}
void ctgFreeSTableIndex(void *info) {
if (NULL == info) {
return;
}
STableIndex *pInfo = (STableIndex *)info;
taosArrayDestroyEx(pInfo->pIndex, tFreeSTableIndexInfo);
}
void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pTableMeta);
pData->pTableMeta = NULL;
......@@ -110,25 +120,39 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
taosMemoryFreeClear(mgmt->slots);
}
void ctgFreeStbMetaCache(SCtgDBCache *dbCache) {
if (NULL == dbCache->stbCache) {
return;
}
void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) {
CTG_LOCK(CTG_WRITE, &cache->stbLock);
if (cache->stbCache) {
int32_t stblNum = taosHashGetSize(cache->stbCache);
taosHashCleanup(cache->stbCache);
cache->stbCache = NULL;
CTG_CACHE_STAT_DEC(stblNum, stblNum);
int32_t stblNum = taosHashGetSize(dbCache->stbCache);
taosHashCleanup(dbCache->stbCache);
dbCache->stbCache = NULL;
CTG_CACHE_STAT_DEC(stblNum, stblNum);
}
void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
taosMemoryFreeClear(pCache->pMeta);
if (pCache->pIndex) {
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pCache->pIndex);
}
}
void ctgFreeTbCache(SCtgDBCache *dbCache) {
if (NULL == dbCache->tbCache) {
return;
}
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
CTG_LOCK(CTG_WRITE, &cache->metaLock);
if (cache->metaCache) {
int32_t tblNum = taosHashGetSize(cache->metaCache);
taosHashCleanup(cache->metaCache);
cache->metaCache = NULL;
CTG_CACHE_STAT_DEC(tblNum, tblNum);
int32_t tblNum = taosHashGetSize(dbCache->tbCache);
SCtgTbCache *pCache = taosHashIterate(dbCache->tbCache, NULL);
while (NULL != pCache) {
ctgFreeTbCacheImpl(pCache);
pCache = taosHashIterate(dbCache->tbCache, pCache);
}
CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
taosHashCleanup(dbCache->tbCache);
dbCache->tbCache = NULL;
CTG_CACHE_STAT_DEC(tblNum, tblNum);
}
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
......@@ -144,16 +168,18 @@ void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
taosMemoryFreeClear(vgInfo);
}
void ctgFreeVgInfoCache(SCtgDBCache *dbCache) {
ctgFreeVgInfo(dbCache->vgCache.vgInfo);
}
void ctgFreeDbCache(SCtgDBCache *dbCache) {
if (NULL == dbCache) {
return;
}
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
ctgFreeVgInfo (dbCache->vgInfo);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
ctgFreeTbMetaCache(&dbCache->tbCache);
ctgFreeVgInfoCache(dbCache);
ctgFreeStbMetaCache(dbCache);
ctgFreeTbCache(dbCache);
}
......@@ -167,16 +193,13 @@ void ctgFreeHandle(SCatalog* pCtg) {
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
SCtgDBCache *dbCache = pIter;
atomic_store_8(&dbCache->deleted, 1);
ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pCtg->dbCache, pIter);
}
taosHashCleanup(pCtg->dbCache);
CTG_CACHE_STAT_DEC(dbNum, dbNum);
}
......@@ -186,14 +209,12 @@ void ctgFreeHandle(SCatalog* pCtg) {
void *pIter = taosHashIterate(pCtg->userCache, NULL);
while (pIter) {
SCtgUserAuth *userCache = pIter;
ctgFreeSCtgUserAuth(userCache);
pIter = taosHashIterate(pCtg->userCache, pIter);
}
taosHashCleanup(pCtg->userCache);
CTG_CACHE_STAT_DEC(userNum, userNum);
}
......@@ -252,9 +273,9 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
break;
}
case TDMT_MND_GET_TABLE_INDEX: {
SArray** pOut = (SArray**)pCtx->out;
STableIndex* pOut = (STableIndex*)pCtx->out;
if (pOut) {
taosArrayDestroyEx(*pOut, tFreeSTableIndexInfo);
taosArrayDestroyEx(pOut->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pCtx->out);
}
break;
......@@ -535,9 +556,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
}
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
if (*(uint64_t *)key1 < ((SSTableVersion*)key2)->suid) {
return -1;
} else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
} else if (*(uint64_t *)key1 > ((SSTableVersion*)key2)->suid) {
return 1;
} else {
return 0;
......@@ -555,9 +576,9 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
}
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
if (((SSTableVersion*)key1)->suid < ((SSTableVersion*)key2)->suid) {
return -1;
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
} else if (((SSTableVersion*)key1)->suid > ((SSTableVersion*)key2)->suid) {
return 1;
} else {
return 0;
......@@ -640,6 +661,27 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
return TSDB_CODE_SUCCESS;
}
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
if (NULL == pIndex) {
*pRes = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t num = taosArrayGetSize(pIndex);
*pRes = taosArrayInit(num, sizeof(STableIndexInfo));
if (NULL == *pRes) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo *pInfo = taosArrayGet(pIndex, i);
taosArrayPush(*pRes, pInfo);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) {
if (msgType == TDMT_VND_TABLE_META) {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
......
......@@ -895,9 +895,9 @@ void *ctgTestSetCtableMetaThread(void *param) {
output = (STableMetaOutput *)taosMemoryMalloc(sizeof(STableMetaOutput));
ctgTestBuildCTableMetaOutput(output);
SCtgUpdateTblMsg *msg = (SCtgUpdateTblMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
SCtgUpdateTbMetaMsg *msg = (SCtgUpdateTbMetaMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
msg->pCtg = pCtg;
msg->output = output;
msg->pMeta = output;
operation.data = msg;
code = ctgOpUpdateTbMeta(&operation);
......@@ -989,7 +989,7 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
SSTableVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
......@@ -1098,7 +1098,7 @@ TEST(tableMeta, childTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
SSTableVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
......@@ -1220,7 +1220,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL;
SSTableVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0;
while (i < 5) {
......@@ -2299,7 +2299,7 @@ TEST(rentTest, allRent) {
SArray *vgList = NULL;
ctgTestStop = false;
SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stable = NULL;
SSTableVersion *stable = NULL;
uint32_t num = 0;
ctgTestInitLogFile();
......
......@@ -484,13 +484,11 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
STableIndexRsp out = {0};
if (tDeserializeSTableIndexRsp(msg, msgSize, &out) != 0) {
STableIndexRsp *out = (STableIndexRsp*)output;
if (tDeserializeSTableIndexRsp(msg, msgSize, out) != 0) {
qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize);
return TSDB_CODE_INVALID_MSG;
}
*(void **)output = out.pIndex;
return TSDB_CODE_SUCCESS;
}
......
......@@ -837,7 +837,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWorker *mgmt = qwAcquire(refId);
if (NULL == mgmt) {
QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
taosMemoryFree(param);
return;
}
......
......@@ -1476,6 +1476,11 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
for (uint32_t i = 0; i < info->fields[FLD_TYPE_VALUE].num; ++i) {
SFilterField *field = &info->fields[FLD_TYPE_VALUE].fields[i];
if (field->desc) {
if (QUERY_NODE_VALUE != nodeType(field->desc)) {
qDebug("VAL%d => [type:not value node][val:NIL]", i); //TODO
continue;
}
SValueNode *var = (SValueNode *)field->desc;
SDataType *dType = &var->node.resType;
if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) {
......
......@@ -458,6 +458,9 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
}
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy) {
if (NULL == pSrc) {
return NULL;
}
ASSERT(pSrc->elemSize == sizeof(void*));
SArray* pArray = taosArrayInit(pSrc->size, sizeof(void*));
for (int32_t i = 0; i < pSrc->size; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册