提交 52fd6a84 编写于 作者: D dapan1121

enh: support db cfg cache update

上级 c83367e4
......@@ -941,6 +941,7 @@ int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq
int32_t tDeserializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t cfgVersion;
int32_t numOfVgroups;
......@@ -977,8 +978,11 @@ typedef struct {
typedef SDbCfgRsp SDbCfgInfo;
int32_t tSerializeSDbCfgRspImpl(SEncoder *encoder, const SDbCfgRsp *pRsp);
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp);
int32_t tDeserializeSDbCfgRspImpl(SDecoder* decoder, SDbCfgRsp *pRsp);
void tFreeSDbCfgRsp(SDbCfgRsp *pRsp);
typedef struct {
int32_t rowNum;
......@@ -1035,12 +1039,17 @@ int32_t tDeserializeSDnodeListRsp(void* buf, int32_t bufLen, SDnodeListRsp* pRsp
void tFreeSDnodeListRsp(SDnodeListRsp* pRsp);
typedef struct {
SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp;
SUseDbRsp *useDbRsp;
SDbCfgRsp *cfgRsp;
} SDbHbRsp;
typedef struct {
SArray* pArray; // Array of SDbHbRsp
} SDbHbBatchRsp;
int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
void tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp);
int32_t tSerializeSDbHbBatchRsp(void* buf, int32_t bufLen, SDbHbBatchRsp* pRsp);
int32_t tDeserializeSDbHbBatchRsp(void* buf, int32_t bufLen, SDbHbBatchRsp* pRsp);
void tFreeSDbHbBatchRsp(SDbHbBatchRsp* pRsp);
typedef struct {
SArray* pArray; // Array of SGetUserAuthRsp
......
......@@ -178,6 +178,8 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo);
int32_t catalogUpdateDbCfg(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDbCfgInfo* cfgInfo);
int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName);
......
......@@ -94,47 +94,51 @@ _return:
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0;
SUseDbBatchRsp batchUseRsp = {0};
if (tDeserializeSUseDbBatchRsp(value, valueLen, &batchUseRsp) != 0) {
SDbHbBatchRsp batchRsp = {0};
if (tDeserializeSDbHbBatchRsp(value, valueLen, &batchRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs,
rsp->uid);
if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
} else {
SDBVgInfo *vgInfo = NULL;
code = hbGenerateVgInfoFromRsp(&vgInfo, rsp);
if (TSDB_CODE_SUCCESS != code) {
goto _return;
}
catalogUpdateDBVgInfo(pCatalog, rsp->db, rsp->uid, vgInfo);
if (IS_SYS_DBNAME(rsp->db)) {
code = hbGenerateVgInfoFromRsp(&vgInfo, rsp);
SDbHbRsp *rsp = taosArrayGet(batchRsp.pArray, i);
if (rsp->useDbRsp) {
tscDebug("hb use db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64,
rsp->useDbRsp->db, rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
if (rsp->useDbRsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid);
} else {
SDBVgInfo *vgInfo = NULL;
code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
if (TSDB_CODE_SUCCESS != code) {
goto _return;
}
catalogUpdateDBVgInfo(pCatalog, (rsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, rsp->uid, vgInfo);
catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo);
if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
code = hbGenerateVgInfoFromRsp(&vgInfo, rsp->useDbRsp);
if (TSDB_CODE_SUCCESS != code) {
goto _return;
}
catalogUpdateDBVgInfo(pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB, rsp->useDbRsp->uid, vgInfo);
}
}
}
if (code) {
goto _return;
if (rsp->cfgRsp) {
tscDebug("hb db cfg rsp, db:%s, cfgVersion:%d", rsp->cfgRsp->db, rsp->cfgRsp->cfgVersion);
catalogUpdateDbCfg(pCatalog, rsp->cfgRsp->db, rsp->cfgRsp->dbId, rsp->cfgRsp);
rsp->cfgRsp = NULL;
}
}
_return:
tFreeSUseDbBatchRsp(&batchUseRsp);
tFreeSDbHbBatchRsp(&batchRsp);
return code;
}
......@@ -526,11 +530,12 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
for (int32_t i = 0; i < dbNum; ++i) {
SDbCacheInfo *db = &dbs[i];
tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, numOfTable:%d, startTs:%" PRId64,
i, db->dbFName, db->dbId, db->vgVersion, db->numOfTable, db->stateTs);
tscDebug("the %dth expired dbFName:%s, dbId:%" PRId64 ", vgVersion:%d, cfgVersion:%d, numOfTable:%d, startTs:%" PRId64,
i, db->dbFName, db->dbId, db->vgVersion, db->cfgVersion, db->numOfTable, db->stateTs);
db->dbId = htobe64(db->dbId);
db->vgVersion = htonl(db->vgVersion);
db->cfgVersion = htonl(db->cfgVersion);
db->numOfTable = htonl(db->numOfTable);
db->stateTs = htobe64(db->stateTs);
}
......
......@@ -2777,7 +2777,26 @@ int32_t tSerializeSUseDbRsp(void *buf, int32_t bufLen, const SUseDbRsp *pRsp) {
return tlen;
}
int32_t tSerializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp) {
int32_t tSerializeSDbHbRspImp(SEncoder *pEncoder, const SDbHbRsp *pRsp) {
if (pRsp->useDbRsp) {
if (tEncodeI8(pEncoder, 1) < 0) return -1;
if (tSerializeSUseDbRspImp(pEncoder, pRsp->useDbRsp) < 0) return -1;
} else {
if (tEncodeI8(pEncoder, 0) < 0) return -1;
}
if (pRsp->cfgRsp) {
if (tEncodeI8(pEncoder, 1) < 0) return -1;
if (tSerializeSDbCfgRspImpl(pEncoder, pRsp->cfgRsp) < 0) return -1;
} else {
if (tEncodeI8(pEncoder, 0) < 0) return -1;
}
return 0;
}
int32_t tSerializeSDbHbBatchRsp(void *buf, int32_t bufLen, SDbHbBatchRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......@@ -2786,8 +2805,8 @@ int32_t tSerializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1;
for (int32_t i = 0; i < numOfBatch; ++i) {
SUseDbRsp *pUsedbRsp = taosArrayGet(pRsp->pArray, i);
if (tSerializeSUseDbRspImp(&encoder, pUsedbRsp) < 0) return -1;
SDbHbRsp *pDbRsp = taosArrayGet(pRsp->pArray, i);
if (tSerializeSDbHbRspImp(&encoder, pDbRsp) < 0) return -1;
}
tEndEncode(&encoder);
......@@ -2840,7 +2859,25 @@ int32_t tDeserializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) {
return 0;
}
int32_t tDeserializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pRsp) {
int32_t tDeserializeSDbHbRspImp(SDecoder* decoder, SDbHbRsp* pRsp) {
int8_t flag = 0;
if (tDecodeI8(decoder, &flag) < 0) return -1;
if (flag) {
pRsp->useDbRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
if (NULL == pRsp->useDbRsp) return -1;
if (tDeserializeSUseDbRspImp(decoder, pRsp->useDbRsp) < 0) return -1;
}
if (tDecodeI8(decoder, &flag) < 0) return -1;
if (flag) {
pRsp->cfgRsp = taosMemoryCalloc(1, sizeof(SDbCfgRsp));
if (NULL == pRsp->cfgRsp) return -1;
if (tDeserializeSDbCfgRspImpl(decoder, pRsp->cfgRsp) < 0) return -1;
}
return 0;
}
int32_t tDeserializeSDbHbBatchRsp(void *buf, int32_t bufLen, SDbHbBatchRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
......@@ -2849,19 +2886,19 @@ int32_t tDeserializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pR
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SUseDbRsp));
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SDbHbRsp));
if (pRsp->pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfBatch; ++i) {
SUseDbRsp usedbRsp = {0};
if (tDeserializeSUseDbRspImp(&decoder, &usedbRsp) < 0) {
SDbHbRsp rsp = {0};
if (tDeserializeSDbHbRspImp(&decoder, &rsp) < 0) {
tDecoderClear(&decoder);
return -1;
}
taosArrayPush(pRsp->pArray, &usedbRsp);
taosArrayPush(pRsp->pArray, &rsp);
}
tEndDecode(&decoder);
......@@ -2871,11 +2908,27 @@ int32_t tDeserializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pR
void tFreeSUsedbRsp(SUseDbRsp *pRsp) { taosArrayDestroy(pRsp->pVgroupInfos); }
void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) {
void tFreeSDbHbRsp(SDbHbRsp *pDbRsp) {
if (NULL == pDbRsp) {
return;
}
if (pDbRsp->useDbRsp) {
tFreeSUsedbRsp(pDbRsp->useDbRsp);
taosMemoryFree(pDbRsp->useDbRsp);
}
if (pDbRsp->cfgRsp) {
tFreeSDbCfgRsp(pDbRsp->cfgRsp);
taosMemoryFree(pDbRsp->cfgRsp);
}
}
void tFreeSDbHbBatchRsp(SDbHbBatchRsp *pRsp) {
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
for (int32_t i = 0; i < numOfBatch; ++i) {
SUseDbRsp *pUsedbRsp = taosArrayGet(pRsp->pArray, i);
tFreeSUsedbRsp(pUsedbRsp);
SDbHbRsp *pDbRsp = taosArrayGet(pRsp->pArray, i);
tFreeSDbHbRsp(pDbRsp);
}
taosArrayDestroy(pRsp->pArray);
......@@ -3038,89 +3091,93 @@ int32_t tDeserializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableR
return 0;
}
int32_t tSerializeSDbCfgRspImpl(SEncoder *encoder, const SDbCfgRsp *pRsp) {
if (tEncodeCStr(encoder, pRsp->db) < 0) return -1;
if (tEncodeI64(encoder, pRsp->dbId) < 0) return -1;
if (tEncodeI32(encoder, pRsp->cfgVersion) < 0) return -1;
if (tEncodeI32(encoder, pRsp->numOfVgroups) < 0) return -1;
if (tEncodeI32(encoder, pRsp->numOfStables) < 0) return -1;
if (tEncodeI32(encoder, pRsp->buffer) < 0) return -1;
if (tEncodeI32(encoder, pRsp->cacheSize) < 0) return -1;
if (tEncodeI32(encoder, pRsp->pageSize) < 0) return -1;
if (tEncodeI32(encoder, pRsp->pages) < 0) return -1;
if (tEncodeI32(encoder, pRsp->daysPerFile) < 0) return -1;
if (tEncodeI32(encoder, pRsp->daysToKeep0) < 0) return -1;
if (tEncodeI32(encoder, pRsp->daysToKeep1) < 0) return -1;
if (tEncodeI32(encoder, pRsp->daysToKeep2) < 0) return -1;
if (tEncodeI32(encoder, pRsp->minRows) < 0) return -1;
if (tEncodeI32(encoder, pRsp->maxRows) < 0) return -1;
if (tEncodeI32(encoder, pRsp->walFsyncPeriod) < 0) return -1;
if (tEncodeI16(encoder, pRsp->hashPrefix) < 0) return -1;
if (tEncodeI16(encoder, pRsp->hashSuffix) < 0) return -1;
if (tEncodeI8(encoder, pRsp->walLevel) < 0) return -1;
if (tEncodeI8(encoder, pRsp->precision) < 0) return -1;
if (tEncodeI8(encoder, pRsp->compression) < 0) return -1;
if (tEncodeI8(encoder, pRsp->replications) < 0) return -1;
if (tEncodeI8(encoder, pRsp->strict) < 0) return -1;
if (tEncodeI8(encoder, pRsp->cacheLast) < 0) return -1;
if (tEncodeI32(encoder, pRsp->tsdbPageSize) < 0) return -1;
if (tEncodeI32(encoder, pRsp->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(encoder, pRsp->walRollPeriod) < 0) return -1;
if (tEncodeI64(encoder, pRsp->walRetentionSize) < 0) return -1;
if (tEncodeI64(encoder, pRsp->walSegmentSize) < 0) return -1;
if (tEncodeI32(encoder, pRsp->numOfRetensions) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i);
if (tEncodeI64(encoder, pRetension->freq) < 0) return -1;
if (tEncodeI64(encoder, pRetension->keep) < 0) return -1;
if (tEncodeI8(encoder, pRetension->freqUnit) < 0) return -1;
if (tEncodeI8(encoder, pRetension->keepUnit) < 0) return -1;
}
if (tEncodeI8(encoder, pRsp->schemaless) < 0) return -1;
if (tEncodeI16(encoder, pRsp->sstTrigger) < 0) return -1;
return 0;
}
int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->dbId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->cfgVersion) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfStables) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->buffer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->cacheSize) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->pageSize) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->pages) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->daysToKeep0) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->daysToKeep1) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->daysToKeep2) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->minRows) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->maxRows) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->walFsyncPeriod) < 0) return -1;
if (tEncodeI16(&encoder, pRsp->hashPrefix) < 0) return -1;
if (tEncodeI16(&encoder, pRsp->hashSuffix) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->compression) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->replications) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->strict) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->cacheLast) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->tsdbPageSize) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->walRollPeriod) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->walRetentionSize) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->walSegmentSize) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfRetensions) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
SRetention *pRetension = taosArrayGet(pRsp->pRetensions, i);
if (tEncodeI64(&encoder, pRetension->freq) < 0) return -1;
if (tEncodeI64(&encoder, pRetension->keep) < 0) return -1;
if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1;
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
}
if (tEncodeI8(&encoder, pRsp->schemaless) < 0) return -1;
if (tEncodeI16(&encoder, pRsp->sstTrigger) < 0) return -1;
tSerializeSDbCfgRspImpl(&encoder, pRsp);
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->dbId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->cfgVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfStables) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->buffer) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->cacheSize) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->pageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->pages) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->daysToKeep0) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->daysToKeep1) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->daysToKeep2) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->minRows) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->maxRows) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->walFsyncPeriod) < 0) return -1;
if (tDecodeI16(&decoder, &pRsp->hashPrefix) < 0) return -1;
if (tDecodeI16(&decoder, &pRsp->hashSuffix) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->replications) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->cacheLast) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->tsdbPageSize) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->walRollPeriod) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->walRetentionSize) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->walSegmentSize) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfRetensions) < 0) return -1;
int32_t tDeserializeSDbCfgRspImpl(SDecoder* decoder, SDbCfgRsp *pRsp) {
if (tDecodeCStrTo(decoder, pRsp->db) < 0) return -1;
if (tDecodeI64(decoder, &pRsp->dbId) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->cfgVersion) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->numOfVgroups) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->numOfStables) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->buffer) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->cacheSize) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->pageSize) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->pages) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->daysPerFile) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->daysToKeep0) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->daysToKeep1) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->daysToKeep2) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->minRows) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->maxRows) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->walFsyncPeriod) < 0) return -1;
if (tDecodeI16(decoder, &pRsp->hashPrefix) < 0) return -1;
if (tDecodeI16(decoder, &pRsp->hashSuffix) < 0) return -1;
if (tDecodeI8(decoder, &pRsp->walLevel) < 0) return -1;
if (tDecodeI8(decoder, &pRsp->precision) < 0) return -1;
if (tDecodeI8(decoder, &pRsp->compression) < 0) return -1;
if (tDecodeI8(decoder, &pRsp->replications) < 0) return -1;
if (tDecodeI8(decoder, &pRsp->strict) < 0) return -1;
if (tDecodeI8(decoder, &pRsp->cacheLast) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->tsdbPageSize) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->walRetentionPeriod) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->walRollPeriod) < 0) return -1;
if (tDecodeI64(decoder, &pRsp->walRetentionSize) < 0) return -1;
if (tDecodeI64(decoder, &pRsp->walSegmentSize) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->numOfRetensions) < 0) return -1;
if (pRsp->numOfRetensions > 0) {
pRsp->pRetensions = taosArrayInit(pRsp->numOfRetensions, sizeof(SRetention));
if (pRsp->pRetensions == NULL) {
......@@ -3131,23 +3188,41 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
for (int32_t i = 0; i < pRsp->numOfRetensions; ++i) {
SRetention rentension = {0};
if (tDecodeI64(&decoder, &rentension.freq) < 0) return -1;
if (tDecodeI64(&decoder, &rentension.keep) < 0) return -1;
if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1;
if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1;
if (tDecodeI64(decoder, &rentension.freq) < 0) return -1;
if (tDecodeI64(decoder, &rentension.keep) < 0) return -1;
if (tDecodeI8(decoder, &rentension.freqUnit) < 0) return -1;
if (tDecodeI8(decoder, &rentension.keepUnit) < 0) return -1;
if (taosArrayPush(pRsp->pRetensions, &rentension) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
if (tDecodeI8(&decoder, &pRsp->schemaless) < 0) return -1;
if (tDecodeI16(&decoder, &pRsp->sstTrigger) < 0) return -1;
if (tDecodeI8(decoder, &pRsp->schemaless) < 0) return -1;
if (tDecodeI16(decoder, &pRsp->sstTrigger) < 0) return -1;
return 0;
}
int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDeserializeSDbCfgRspImpl(&decoder, pRsp) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSDbCfgRsp(SDbCfgRsp *pRsp) {
if (NULL == pRsp) {
return;
}
taosArrayDestroy(pRsp->pRetensions);
}
int32_t tSerializeSUserIndexReq(void *buf, int32_t bufLen, SUserIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......
......@@ -894,6 +894,42 @@ _OVER:
return code;
}
static void mndDumpDbCfgInfo(SDbCfgRsp *cfgRsp, SDbObj *pDb) {
strcpy(cfgRsp->db, pDb->name);
cfgRsp->dbId = pDb->uid;
cfgRsp->cfgVersion = pDb->cfgVersion;
cfgRsp->numOfVgroups = pDb->cfg.numOfVgroups;
cfgRsp->numOfStables = pDb->cfg.numOfStables;
cfgRsp->buffer = pDb->cfg.buffer;
cfgRsp->cacheSize = pDb->cfg.cacheLastSize;
cfgRsp->pageSize = pDb->cfg.pageSize;
cfgRsp->pages = pDb->cfg.pages;
cfgRsp->daysPerFile = pDb->cfg.daysPerFile;
cfgRsp->daysToKeep0 = pDb->cfg.daysToKeep0;
cfgRsp->daysToKeep1 = pDb->cfg.daysToKeep1;
cfgRsp->daysToKeep2 = pDb->cfg.daysToKeep2;
cfgRsp->minRows = pDb->cfg.minRows;
cfgRsp->maxRows = pDb->cfg.maxRows;
cfgRsp->walFsyncPeriod = pDb->cfg.walFsyncPeriod;
cfgRsp->hashPrefix = pDb->cfg.hashPrefix;
cfgRsp->hashSuffix = pDb->cfg.hashSuffix;
cfgRsp->walLevel = pDb->cfg.walLevel;
cfgRsp->precision = pDb->cfg.precision;
cfgRsp->compression = pDb->cfg.compression;
cfgRsp->replications = pDb->cfg.replications;
cfgRsp->strict = pDb->cfg.strict;
cfgRsp->cacheLast = pDb->cfg.cacheLast;
cfgRsp->tsdbPageSize = pDb->cfg.tsdbPageSize;
cfgRsp->walRetentionPeriod = pDb->cfg.walRetentionPeriod;
cfgRsp->walRollPeriod = pDb->cfg.walRollPeriod;
cfgRsp->walRetentionSize = pDb->cfg.walRetentionSize;
cfgRsp->walSegmentSize = pDb->cfg.walSegmentSize;
cfgRsp->numOfRetensions = pDb->cfg.numOfRetensions;
cfgRsp->pRetensions = pDb->cfg.pRetensions;
cfgRsp->schemaless = pDb->cfg.schemaless;
cfgRsp->sstTrigger = pDb->cfg.sstTrigger;
}
static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
......@@ -906,43 +942,15 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
goto _OVER;
}
pDb = mndAcquireDb(pMnode, cfgReq.db);
if (pDb == NULL) {
goto _OVER;
}
if (strcasecmp(cfgReq.db, TSDB_INFORMATION_SCHEMA_DB) && strcasecmp(cfgReq.db, TSDB_PERFORMANCE_SCHEMA_DB)) {
pDb = mndAcquireDb(pMnode, cfgReq.db);
if (pDb == NULL) {
goto _OVER;
}
cfgRsp.dbId = pDb->uid;
cfgRsp.cfgVersion = pDb->cfgVersion;
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
cfgRsp.numOfStables = pDb->cfg.numOfStables;
cfgRsp.buffer = pDb->cfg.buffer;
cfgRsp.cacheSize = pDb->cfg.cacheLastSize;
cfgRsp.pageSize = pDb->cfg.pageSize;
cfgRsp.pages = pDb->cfg.pages;
cfgRsp.daysPerFile = pDb->cfg.daysPerFile;
cfgRsp.daysToKeep0 = pDb->cfg.daysToKeep0;
cfgRsp.daysToKeep1 = pDb->cfg.daysToKeep1;
cfgRsp.daysToKeep2 = pDb->cfg.daysToKeep2;
cfgRsp.minRows = pDb->cfg.minRows;
cfgRsp.maxRows = pDb->cfg.maxRows;
cfgRsp.walFsyncPeriod = pDb->cfg.walFsyncPeriod;
cfgRsp.hashPrefix = pDb->cfg.hashPrefix;
cfgRsp.hashSuffix = pDb->cfg.hashSuffix;
cfgRsp.walLevel = pDb->cfg.walLevel;
cfgRsp.precision = pDb->cfg.precision;
cfgRsp.compression = pDb->cfg.compression;
cfgRsp.replications = pDb->cfg.replications;
cfgRsp.strict = pDb->cfg.strict;
cfgRsp.cacheLast = pDb->cfg.cacheLast;
cfgRsp.tsdbPageSize = pDb->cfg.tsdbPageSize;
cfgRsp.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
cfgRsp.walRollPeriod = pDb->cfg.walRollPeriod;
cfgRsp.walRetentionSize = pDb->cfg.walRetentionSize;
cfgRsp.walSegmentSize = pDb->cfg.walSegmentSize;
cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions;
cfgRsp.pRetensions = pDb->cfg.pRetensions;
cfgRsp.schemaless = pDb->cfg.schemaless;
cfgRsp.sstTrigger = pDb->cfg.sstTrigger;
mndDumpDbCfgInfo(&cfgRsp, pDb);
}
int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
......@@ -1340,102 +1348,118 @@ _OVER:
}
int32_t mndValidateDbInfo(SMnode *pMnode, SDbCacheInfo *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen) {
SUseDbBatchRsp batchUseRsp = {0};
batchUseRsp.pArray = taosArrayInit(numOfDbs, sizeof(SUseDbRsp));
if (batchUseRsp.pArray == NULL) {
SDbHbBatchRsp batchRsp = {0};
batchRsp.pArray = taosArrayInit(numOfDbs, sizeof(SDbHbRsp));
if (batchRsp.pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfDbs; ++i) {
SDbCacheInfo *pDbVgVersion = &pDbs[i];
pDbVgVersion->dbId = be64toh(pDbVgVersion->dbId);
pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion);
pDbVgVersion->numOfTable = htonl(pDbVgVersion->numOfTable);
pDbVgVersion->stateTs = be64toh(pDbVgVersion->stateTs);
SDbCacheInfo *pDbCacheInfo = &pDbs[i];
pDbCacheInfo->dbId = be64toh(pDbCacheInfo->dbId);
pDbCacheInfo->vgVersion = htonl(pDbCacheInfo->vgVersion);
pDbCacheInfo->cfgVersion = htonl(pDbCacheInfo->cfgVersion);
pDbCacheInfo->numOfTable = htonl(pDbCacheInfo->numOfTable);
pDbCacheInfo->stateTs = be64toh(pDbCacheInfo->stateTs);
SUseDbRsp usedbRsp = {0};
SDbHbRsp rsp = {0};
if ((0 == strcasecmp(pDbVgVersion->dbFName, TSDB_INFORMATION_SCHEMA_DB) ||
(0 == strcasecmp(pDbVgVersion->dbFName, TSDB_PERFORMANCE_SCHEMA_DB)))) {
memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN);
if ((0 == strcasecmp(pDbCacheInfo->dbFName, TSDB_INFORMATION_SCHEMA_DB) ||
(0 == strcasecmp(pDbCacheInfo->dbFName, TSDB_PERFORMANCE_SCHEMA_DB)))) {
int32_t vgVersion = mndGetGlobalVgroupVersion(pMnode);
if (pDbVgVersion->vgVersion < vgVersion) {
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
usedbRsp.vgVersion = vgVersion++;
} else {
usedbRsp.vgVersion = pDbVgVersion->vgVersion;
if (pDbCacheInfo->vgVersion >= vgVersion) {
continue;
}
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
rsp.useDbRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
memcpy(rsp.useDbRsp->db, pDbCacheInfo->dbFName, TSDB_DB_FNAME_LEN);
rsp.useDbRsp->pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
mndBuildDBVgroupInfo(NULL, pMnode, rsp.useDbRsp->pVgroupInfos);
rsp.useDbRsp->vgVersion = vgVersion++;
rsp.useDbRsp->vgNum = taosArrayGetSize(rsp.useDbRsp->pVgroupInfos);
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
taosArrayPush(batchRsp.pArray, &rsp);
continue;
}
SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName);
SDbObj *pDb = mndAcquireDb(pMnode, pDbCacheInfo->dbFName);
if (pDb == NULL) {
mTrace("db:%s, no exist", pDbVgVersion->dbFName);
memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDbVgVersion->dbId;
usedbRsp.vgVersion = -1;
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
mTrace("db:%s, no exist", pDbCacheInfo->dbFName);
rsp.useDbRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
memcpy(rsp.useDbRsp->db, pDbCacheInfo->dbFName, TSDB_DB_FNAME_LEN);
rsp.useDbRsp->uid = pDbCacheInfo->dbId;
rsp.useDbRsp->vgVersion = -1;
taosArrayPush(batchRsp.pArray, &rsp);
continue;
}
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable &&
pDbVgVersion->stateTs == pDb->stateTs) {
mTrace("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64
" numOfTables:%d, not changed vgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
pDbVgVersion->dbFName, pDbVgVersion->vgVersion, pDbVgVersion->stateTs, pDbVgVersion->numOfTable,
pDb->vgVersion, pDb->stateTs, numOfTable);
if (pDbCacheInfo->vgVersion >= pDb->vgVersion &&
pDbCacheInfo->cfgVersion >= pDb->cfgVersion &&
numOfTable == pDbCacheInfo->numOfTable &&
pDbCacheInfo->stateTs == pDb->stateTs) {
mTrace("db:%s, valid dbinfo, vgVersion:%d cfgVersion:%d stateTs:%" PRId64
" numOfTables:%d, not changed vgVersion:%d cfgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
pDbCacheInfo->dbFName, pDbCacheInfo->vgVersion, pDbCacheInfo->cfgVersion, pDbCacheInfo->stateTs, pDbCacheInfo->numOfTable,
pDb->vgVersion, pDb->cfgVersion, pDb->stateTs, numOfTable);
mndReleaseDb(pMnode, pDb);
continue;
} else {
mInfo("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64
" numOfTables:%d, changed to vgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
pDbVgVersion->dbFName, pDbVgVersion->vgVersion, pDbVgVersion->stateTs, pDbVgVersion->numOfTable,
pDb->vgVersion, pDb->stateTs, numOfTable);
mInfo("db:%s, valid dbinfo, vgVersion:%d cfgVersion:%d stateTs:%" PRId64
" numOfTables:%d, changed to vgVersion:%d cfgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
pDbCacheInfo->dbFName, pDbCacheInfo->vgVersion, pDbCacheInfo->cfgVersion, pDbCacheInfo->stateTs, pDbCacheInfo->numOfTable,
pDb->vgVersion, pDb->cfgVersion, pDb->stateTs, numOfTable);
}
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if (usedbRsp.pVgroupInfos == NULL) {
mndReleaseDb(pMnode, pDb);
mError("db:%s, failed to malloc usedb response", pDb->name);
continue;
if (pDbCacheInfo->cfgVersion < pDb->cfgVersion) {
rsp.cfgRsp = taosMemoryCalloc(1, sizeof(SDbCfgRsp));
mndDumpDbCfgInfo(rsp.cfgRsp, pDb);
}
if (pDbCacheInfo->vgVersion < pDb->vgVersion ||
numOfTable != pDbCacheInfo->numOfTable ||
pDbCacheInfo->stateTs != pDb->stateTs) {
rsp.useDbRsp = taosMemoryCalloc(1, sizeof(SUseDbRsp));
rsp.useDbRsp->pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
if (rsp.useDbRsp->pVgroupInfos == NULL) {
mndReleaseDb(pMnode, pDb);
mError("db:%s, failed to malloc usedb response", pDb->name);
continue;
}
mndBuildDBVgroupInfo(pDb, pMnode, rsp.useDbRsp->pVgroupInfos);
memcpy(rsp.useDbRsp->db, pDb->name, TSDB_DB_FNAME_LEN);
rsp.useDbRsp->uid = pDb->uid;
rsp.useDbRsp->vgVersion = pDb->vgVersion;
rsp.useDbRsp->stateTs = pDb->stateTs;
rsp.useDbRsp->vgNum = (int32_t)taosArrayGetSize(rsp.useDbRsp->pVgroupInfos);
rsp.useDbRsp->hashMethod = pDb->cfg.hashMethod;
rsp.useDbRsp->hashPrefix = pDb->cfg.hashPrefix;
rsp.useDbRsp->hashSuffix = pDb->cfg.hashSuffix;
}
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDb->uid;
usedbRsp.vgVersion = pDb->vgVersion;
usedbRsp.stateTs = pDb->stateTs;
usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos);
usedbRsp.hashMethod = pDb->cfg.hashMethod;
usedbRsp.hashPrefix = pDb->cfg.hashPrefix;
usedbRsp.hashSuffix = pDb->cfg.hashSuffix;
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
taosArrayPush(batchRsp.pArray, &rsp);
mndReleaseDb(pMnode, pDb);
}
int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp);
int32_t rspLen = tSerializeSDbHbBatchRsp(NULL, 0, &batchRsp);
void *pRsp = taosMemoryMalloc(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeSUseDbBatchRsp(&batchUseRsp);
tFreeSDbHbBatchRsp(&batchRsp);
return -1;
}
tSerializeSUseDbBatchRsp(pRsp, rspLen, &batchUseRsp);
tSerializeSDbHbBatchRsp(pRsp, rspLen, &batchRsp);
*ppRsp = pRsp;
*pRspLen = rspLen;
tFreeSUseDbBatchRsp(&batchUseRsp);
tFreeSDbHbBatchRsp(&batchRsp);
return 0;
}
......
......@@ -123,6 +123,11 @@ int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, in
int32_t payloadLen, double selectivityRatio);
int32_t metaUidCacheClear(SMeta *pMeta, uint64_t suid);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen);
int64_t metaGetTbNum(SMeta *pMeta);
int64_t metaGetNtbNum(SMeta *pMeta);
typedef struct {
......
......@@ -729,7 +729,7 @@ int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, i
return TSDB_CODE_FAILED;
}
*pList = taosLRUCacheValue(pCache, pHandle);
*pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);
(*pEntry)->hitTimes += 1;
......
......@@ -933,6 +933,23 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogUpdateDbCfg(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDbCfgInfo* cfgInfo) {
CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCtg || NULL == dbFName || NULL == cfgInfo) {
freeDbCfgInfo(cfgInfo);
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
}
code = ctgUpdateDbCfgEnqueue(pCtg, dbFName, dbId, cfgInfo, false);
_return:
CTG_API_LEAVE(code);
}
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
CTG_API_ENTER();
......
......@@ -1475,15 +1475,15 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
CTG_CACHE_NUM_INC(CTG_CI_DB, 1);
SDbCacheInfo vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0};
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
SDbCacheInfo dbCacheInfo = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0, .cfgVersion = -1};
tstrncpy(dbCacheInfo.dbFName, dbFName, sizeof(dbCacheInfo.dbFName));
ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
if (!IS_SYS_DBNAME(dbFName)) {
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbCacheInfo)));
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &dbCacheInfo, dbId, sizeof(SDbCacheInfo)));
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, dbCacheInfo.vgVersion, dbId);
}
return TSDB_CODE_SUCCESS;
......@@ -1822,8 +1822,8 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
}
bool newAdded = false;
SDbCacheInfo vgVersion = {
.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};
SDbCacheInfo dbCacheInfo = {
.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .cfgVersion = -1, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
......@@ -1859,20 +1859,24 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP);
}
if (dbCache->cfgCache.cfgInfo) {
dbCacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion;
}
vgCache->vgInfo = dbInfo;
msg->dbInfo = NULL;
CTG_DB_NUM_SET(CTG_CI_DB_VGROUP);
ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, stateTs:%" PRId64 ", dbId:0x%" PRIx64, dbFName,
vgVersion.vgVersion, vgVersion.stateTs, vgVersion.dbId);
dbCacheInfo.vgVersion, dbCacheInfo.stateTs, dbCacheInfo.dbId);
ctgWUnlockVgInfo(dbCache);
dbCache = NULL;
// if (!IS_SYS_DBNAME(dbFName)) {
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbCacheInfo),
tstrncpy(dbCacheInfo.dbFName, dbFName, sizeof(dbCacheInfo.dbFName));
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &dbCacheInfo, dbCacheInfo.dbId, sizeof(SDbCacheInfo),
ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare));
//}
......
......@@ -501,6 +501,25 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) {
}
}
if (dbCache->cfgCache.cfgInfo) {
SDbCfgInfo *pCfg = dbCache->cfgCache.cfgInfo;
ctgDebug("[%d] db [%.*s][0x%" PRIx64
"] %s: cfgVersion:%d, numOfVgroups:%d, numOfStables:%d, buffer:%d, cacheSize:%d, pageSize:%d, pages:%d"
", daysPerFile:%d, daysToKeep0:%d, daysToKeep1:%d, daysToKeep2:%d, minRows:%d, maxRows:%d, walFsyncPeriod:%d"
", hashPrefix:%d, hashSuffix:%d, walLevel:%d, precision:%d, compression:%d, replications:%d, strict:%d"
", cacheLast:%d, tsdbPageSize:%d, walRetentionPeriod:%d, walRollPeriod:%d, walRetentionSize:%" PRId64 ""
", walSegmentSize:%" PRId64 ", numOfRetensions:%d, schemaless:%d, sstTrigger:%d",
i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted ? "deleted" : "",
pCfg->cfgVersion, pCfg->numOfVgroups, pCfg->numOfStables, pCfg->buffer,
pCfg->cacheSize, pCfg->pageSize, pCfg->pages, pCfg->daysPerFile, pCfg->daysToKeep0,
pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->minRows, pCfg->maxRows, pCfg->walFsyncPeriod,
pCfg->hashPrefix, pCfg->hashSuffix, pCfg->walLevel, pCfg->precision, pCfg->compression,
pCfg->replications, pCfg->strict, pCfg->cacheLast, pCfg->tsdbPageSize, pCfg->walRetentionPeriod,
pCfg->walRollPeriod, pCfg->walRetentionSize, pCfg->walSegmentSize, pCfg->numOfRetensions,
pCfg->schemaless, pCfg->sstTrigger);
}
++i;
pIter = taosHashIterate(dbHash, pIter);
}
}
......
......@@ -61,7 +61,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArra
static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond);
static int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond,
SNode* pTagIndexCond, STableListInfo* pListInfo, const char* idstr);
SNode* pTagIndexCond, STableListInfo* pListInfo, uint8_t* digest, const char* idstr);
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList,
void* metaHandle);
......@@ -436,7 +436,6 @@ void freeItem(void* p) {
}
}
static void genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
if (pTagCond == NULL) {
return;
......@@ -454,7 +453,25 @@ static void genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
}
int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo) {
static void genTbGroupDigest(const SNode* pGroup, uint8_t* filterDigest, T_MD5_CTX* pContext) {
char* payload = NULL;
int32_t len = 0;
nodesNodeToMsg(pGroup, &payload, &len);
if (filterDigest[0]) {
payload = taosMemoryRealloc(payload, len + tListLen(pContext->digest));
memcpy(payload + len, filterDigest + 1, tListLen(pContext->digest));
len += tListLen(pContext->digest);
}
tMD5Init(pContext);
tMD5Update(pContext, (uint8_t*)payload, (uint32_t)len);
tMD5Final(pContext);
taosMemoryFree(payload);
}
int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo, uint8_t *digest) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* pBlockList = NULL;
SSDataBlock* pResBlock = NULL;
......@@ -462,7 +479,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
SArray* groupData = NULL;
SArray* pUidTagList = NULL;
SArray* tableList = NULL;
static SHashObj *pTableListHash = NULL;
int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
if (rows == 0) {
......@@ -490,17 +506,19 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
}
T_MD5_CTX context = {0};
SNodeListNode* listNode = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
listNode->pNodeList = group;
genTagFilterDigest((SNode *)listNode, &context);
nodesFree(listNode);
SArray **pLastTableList = (SArray **)taosHashGet(pTableListHash, context.digest, sizeof(context.digest));
if (pLastTableList && *pLastTableList) {
pTableListInfo->pTableList = taosArrayDup(*pLastTableList, NULL);
goto end;
} else {
qError("group not hit");
if (tsTagFilterCache) {
SNodeListNode* listNode = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
listNode->pNodeList = group;
genTbGroupDigest((SNode *)listNode, digest, &context);
nodesFree(listNode);
metaGetCachedTbGroup(metaHandle, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), &tableList);
if (tableList) {
taosArrayDestroy(pTableListInfo->pTableList);
pTableListInfo->pTableList = tableList;
qDebug("retrieve tb group list from cache, numOfTables:%d", (int32_t)taosArrayGetSize(pTableListInfo->pTableList));
goto end;
}
}
pUidTagList = taosArrayInit(8, sizeof(STUidTagInfo));
......@@ -629,16 +647,11 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
info->groupId = calcGroupId(keyBuf, len);
}
if (NULL == pTableListHash) {
SHashObj *pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (atomic_val_compare_exchange_ptr(&pTableListHash, NULL, pHash)) {
taosHashCleanup(pHash);
}
if (tsTagFilterCache) {
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
metaPutTbGroupToCache(metaHandle, pTableListInfo->idInfo.suid, context.digest, tListLen(context.digest), tableList, taosArrayGetSize(tableList) * sizeof(STableKeyInfo));
}
tableList = taosArrayDup(pTableListInfo->pTableList, NULL);
taosHashPut(pTableListHash, context.digest, sizeof(context.digest), &tableList, POINTER_BYTES);
// int64_t st2 = taosGetTimestampUs();
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
......@@ -1057,7 +1070,7 @@ end:
}
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo, const char* idstr) {
STableListInfo* pListInfo, uint8_t* digest, const char* idstr) {
int32_t code = TSDB_CODE_SUCCESS;
size_t numOfTables = 0;
......@@ -1087,6 +1100,8 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
metaGetCachedTableUidList(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pUidList,
&acquired);
if (acquired) {
digest[0] = 1;
memcpy(digest + 1, context.digest, tListLen(context.digest));
qDebug("retrieve table uid list from cache, numOfTables:%d", (int32_t)taosArrayGetSize(pUidList));
goto _end;
}
......@@ -1130,6 +1145,8 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
}
metaUidFilterCachePut(metaHandle, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1);
digest[0] = 1;
memcpy(digest + 1, context.digest, tListLen(context.digest));
}
}
......@@ -2025,7 +2042,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
}
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode, SNodeList* group,
bool groupSort) {
bool groupSort, uint8_t *digest) {
int32_t code = TSDB_CODE_SUCCESS;
bool groupByTbname = groupbyTbname(group);
......@@ -2045,7 +2062,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
pTableListInfo->numOfOuputGroups = 1;
}
} else {
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo, digest);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2076,7 +2093,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return TSDB_CODE_INVALID_PARA;
}
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, idStr);
uint8_t digest[17] = {0};
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo, digest, idStr);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to getTableList, code: %s", tstrerror(code));
return code;
......@@ -2094,7 +2112,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return TSDB_CODE_SUCCESS;
}
code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort);
code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pScanNode, pGroupTags, groupSort, digest);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册