提交 c7a7939f 编写于 作者: D dapan1121

add tablemeta cache

上级 3d190482
......@@ -839,7 +839,7 @@ typedef struct {
int32_t tversion;
uint64_t tuid;
uint64_t suid;
SVgroupMsg vgroup;
int32_t vgId;
SSchema pSchema[];
} STableMetaMsg;
......@@ -867,6 +867,7 @@ typedef struct {
int32_t dbVgroupVersion;
int32_t dbVgroupNum;
int32_t dbHashRange;
int32_t dbHashType;
SVgroupInfo vgroupInfo[];
//int32_t vgIdList[];
} SUseDbRspMsg;
......
......@@ -45,42 +45,10 @@ typedef struct SMetaData {
SEpSet *pEpSet; // qnode epset list
} SMetaData;
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
int16_t numOfColumns; // the number of columns
int32_t rowSize; // row size of the schema
} STableComInfo;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/
typedef struct SCTableMeta {
int32_t vgId:24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
} SCTableMeta;
/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta.
*/
typedef struct STableMeta {
int32_t vgId:24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info
int16_t sversion;
int16_t tversion;
STableComInfo tableInfo;
SSchema schema[];
} STableMeta;
typedef struct SCatalogCfg {
bool enableVgroupCache;
uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum;
} SCatalogCfg;
int32_t catalogInit(SCatalogCfg *cfg);
......@@ -96,19 +64,25 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle)
int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version);
int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList);
int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup);
/**
* get cluster vgroup list.
* @pVgroupList - hash of vgroup list, key:vgId, value:SVgroupInfo
* @return
*/
int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash);
int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta);
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta);
/**
......
......@@ -24,10 +24,49 @@ extern "C" {
typedef SVgroupListRspMsg SVgroupListInfo;
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
int16_t numOfColumns; // the number of columns
int32_t rowSize; // row size of the schema
} STableComInfo;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
* The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info.
*/
typedef struct SCTableMeta {
int32_t vgId:24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
} SCTableMeta;
/*
* Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta.
*/
typedef struct STableMeta {
//BEGIN: KEEP THIS PART SAME WITH SCTableMeta
int32_t vgId:24;
int8_t tableType;
uint64_t uid;
uint64_t suid;
//END: KEEP THIS PART SAME WITH SCTableMeta
// if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info
int16_t sversion;
int16_t tversion;
STableComInfo tableInfo;
SSchema schema[];
} STableMeta;
typedef struct SDBVgroupInfo {
int32_t vgroupVersion;
SArray *vgId;
int32_t hashRange;
int32_t hashType;
} SDBVgroupInfo;
typedef struct SUseDbOutput {
......@@ -36,6 +75,13 @@ typedef struct SUseDbOutput {
SDBVgroupInfo *dbVgroup;
} SUseDbOutput;
typedef struct STableMetaOutput {
int32_t metaNum;
char ctbFname[TSDB_TABLE_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN];
SCTableMeta ctbMeta;
STableMeta *tbMeta;
} STableMetaOutput;
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
......
......@@ -24,16 +24,16 @@ extern "C" {
#include "common.h"
#include "tlog.h"
#define CTG_DEFAULT_CLUSTER_NUMBER 6
#define CTG_DEFAULT_VGROUP_NUMBER 100
#define CTG_DEFAULT_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000
#define CTG_DEFAULT_INVALID_VERSION (-1)
typedef struct SVgroupListCache {
int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo*
SArray *arrayCache; // SVgroupInfo
SHashObj *cache; // key:vgId, value:SVgroupInfo
} SVgroupListCache;
typedef struct SDBVgroupCache {
......@@ -41,20 +41,23 @@ typedef struct SDBVgroupCache {
} SDBVgroupCache;
typedef struct STableMetaCache {
SHashObj *cache; //key:fulltablename, value:STableMeta
SHashObj *cache; //key:fulltablename, value:STableMeta
SHashObj *stableCache; //key:suid, value:STableMeta*
} STableMetaCache;
typedef struct SCatalog {
SVgroupListCache vgroupCache;
SDBVgroupCache dbCache;
STableMetaCache tableCache;
SDBVgroupCache dbCache;
STableMetaCache tableCache;
} SCatalog;
typedef struct SCatalogMgmt {
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata
SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node
SCatalogCfg cfg;
} SCatalogMgmt;
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
extern int32_t ctgDebugFlag;
......
......@@ -47,14 +47,14 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* exist) {
if (NULL == pCatalog->vgroupCache.arrayCache || pCatalog->vgroupCache.vgroupVersion < 0) {
int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) {
if (NULL == pCatalog->vgroupCache.cache || pCatalog->vgroupCache.vgroupVersion < 0) {
*exist = 0;
return TSDB_CODE_SUCCESS;
}
if (pVgroupList) {
*pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache);
*pVgroupList = pCatalog->vgroupCache.cache;
}
*exist = 1;
......@@ -62,7 +62,6 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t*
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
if (NULL == pCatalog->dbCache.cache) {
*exist = 0;
......@@ -92,6 +91,7 @@ int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgrou
(*dbInfo)->vgroupVersion = info->vgroupVersion;
(*dbInfo)->hashRange = info->hashRange;
(*dbInfo)->hashType = info->hashType;
}
*exist = 1;
......@@ -130,17 +130,328 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
}
int32_t ctgGetTableMetaFromCache(SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
if (NULL == pCatalog->tableCache.cache) {
*exist = 0;
return TSDB_CODE_SUCCESS;
}
char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName);
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));
if (NULL == tbMeta) {
*exist = 0;
return TSDB_CODE_SUCCESS;
}
if (tbMeta->tableType == TSDB_CHILD_TABLE) {
STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
if (NULL == stbMeta || NULL == *stbMeta) {
*exist = 0;
return TSDB_CODE_SUCCESS;
}
if ((*stbMeta)->suid != tbMeta->suid) {
ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
*pTableMeta = calloc(1, metaSize);
if (NULL == *pTableMeta) {
ctgError("calloc size[%d] failed", metaSize);
return TSDB_CODE_CTG_MEM_ERROR;
}
memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta));
memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
} else {
int32_t metaSize = sizeof(STableMeta) + (tbMeta->tableInfo.numOfTags + tbMeta->tableInfo.numOfColumns) * sizeof(SSchema);
*pTableMeta = calloc(1, metaSize);
if (NULL == *pTableMeta) {
ctgError("calloc size[%d] failed", metaSize);
return TSDB_CODE_CTG_MEM_ERROR;
}
memcpy(*pTableMeta, tbMeta, metaSize);
}
*exist = 1;
return TSDB_CODE_SUCCESS;
}
void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
epSet->inUse = 0;
epSet->numOfEps = vgroupInfo->numOfEps;
for (int32_t i = 0; i < vgroupInfo->numOfEps; ++i) {
memcpy(&epSet->port[i], &vgroupInfo->epAddr[i].port, sizeof(epSet->port[i]));
memcpy(&epSet->fqdn[i], &vgroupInfo->epAddr[i].fqdn, sizeof(epSet->fqdn[i]));
}
}
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen);
if (code) {
return code;
}
SRpcMsg rpcMsg = {
.msgType = TSDB_MSG_TYPE_TABLE_META,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
SEpSet epSet;
ctgGenEpSet(&epSet, vgroupInfo);
rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("get table meta from mnode failed, error code:%d", rpcRsp.code);
return rpcRsp.code;
}
code = queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META](output, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
return code;
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) {
switch (hashType) {
default:
*fp = MurmurHash3_32;
break;
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
SDBVgroupInfo *dbInfo = NULL;
int32_t code = 0;
CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo));
if (NULL == dbInfo) {
ctgWarn("db[%s] vgroup info not found", pDBName);
return TSDB_CODE_TSC_DB_NOT_SELECTED;
}
if (dbInfo->vgroupVersion < 0 || NULL == dbInfo->vgId) {
ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgId:%p", pDBName, dbInfo->vgroupVersion, dbInfo->vgId);
CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
int32_t vgNum = taosArrayGetSize(dbInfo->vgId);
if (vgNum <= 0) {
ctgError("db[%s] vgroup cache invalid, vgroup number:%p", vgNum);
CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
tableNameHashFp fp = NULL;
CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashType, &fp));
char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));
uint32_t hashUnit = dbInfo->hashRange / vgNum;
uint32_t vgId = hashValue / hashUnit;
SHashObj *vgroupHash = NULL;
CTG_ERR_JRET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash));
if (NULL == vgroupHash) {
ctgError("get empty vgroup cache");
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) {
ctgError("vgId[%d] not found in vgroup list", vgId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
_return:
if (dbInfo && dbInfo->vgId) {
taosArrayDestroy(dbInfo->vgId);
dbInfo->vgId = NULL;
}
tfree(dbInfo);
return code;
}
STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) {
assert(pChild != NULL);
int32_t total = pChild->numOfColumns + pChild->numOfTags;
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total);
pTableMeta->tableType = TSDB_SUPER_TABLE;
pTableMeta->tableInfo.numOfTags = pChild->numOfTags;
pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns;
pTableMeta->tableInfo.precision = pChild->precision;
pTableMeta->uid = pChild->suid;
pTableMeta->tversion = pChild->tversion;
pTableMeta->sversion = pChild->sversion;
memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total);
int32_t num = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < num; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
return pTableMeta;
}
int32_t ctgGetTableMetaImpl(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
int32_t exist = 0;
if (!forceUpdate) {
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));
if (exist) {
return TSDB_CODE_SUCCESS;
}
}
CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName));
CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));
if (0 == exist) {
ctgError("get table meta from cache failed, but fetch succeed");
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateTableMetaCache(SCatalog *pCatalog, STableMetaOutput *output) {
if (output->metaNum != 1 && output->metaNum != 2) {
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
if (NULL == output->tbMeta) {
ctgError("no valid table meta got from meta rsp");
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
if (NULL == pCatalog->tableCache.cache) {
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.cache) {
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
return TSDB_CODE_CTG_MEM_ERROR;
}
}
if (NULL == pCatalog->tableCache.cache) {
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.cache) {
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
return TSDB_CODE_CTG_MEM_ERROR;
}
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.stableCache) {
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
return TSDB_CODE_CTG_MEM_ERROR;
}
}
if (output->metaNum == 2) {
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
ctgError("push ctable[%s] to table cache failed", output->ctbFname);
goto error_exit;
}
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
goto error_exit;
}
}
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, sizeof(*output->tbMeta)) != 0) {
ctgError("push table[%s] to table cache failed", output->tbFname);
goto error_exit;
}
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) {
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
goto error_exit;
}
}
return TSDB_CODE_SUCCESS;
error_exit:
if (pCatalog->vgroupCache.cache) {
taosHashCleanup(pCatalog->vgroupCache.cache);
pCatalog->vgroupCache.cache = NULL;
}
pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
int32_t catalogInit(SCatalogCfg *cfg) {
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == ctgMgmt.pCluster) {
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER);
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
}
if (cfg) {
memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
} else {
ctgMgmt.cfg.enableVgroupCache = true;
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
int32_t catalogGetHandle(const char *clusterId, SCatalog** catalogHandle) {
if (NULL == clusterId || NULL == catalogHandle) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
......@@ -190,7 +501,7 @@ int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) {
int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) {
int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) {
if (NULL == pVgroup) {
ctgError("no valid vgroup list info to update");
return TSDB_CODE_CTG_INTERNAL_ERROR;
......@@ -200,22 +511,11 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup)
ctgError("vgroup version[%d] is invalid", pVgroup->vgroupVersion);
return TSDB_CODE_CTG_INVALID_INPUT;
}
if (NULL == pCatalog->vgroupCache.arrayCache) {
pCatalog->vgroupCache.arrayCache = taosArrayInit(pVgroup->vgroupNum, sizeof(pVgroup->vgroupInfo[0]));
if (NULL == pCatalog->vgroupCache.arrayCache) {
ctgError("init array[%d] for cluster cache failed", pVgroup->vgroupNum);
return TSDB_CODE_CTG_MEM_ERROR;
}
} else {
taosArrayClear(pCatalog->vgroupCache.arrayCache);
}
if (NULL == pCatalog->vgroupCache.cache) {
pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_CACHE_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->vgroupCache.cache) {
ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_VGROUP_NUMBER);
ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_CACHE_VGROUP_NUMBER);
return TSDB_CODE_CTG_MEM_ERROR;
}
} else {
......@@ -224,13 +524,7 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup)
SVgroupInfo *vInfo = NULL;
for (int32_t i = 0; i < pVgroup->vgroupNum; ++i) {
vInfo = taosArrayPush(pCatalog->vgroupCache.arrayCache, &pVgroup->vgroupInfo[i]);
if (NULL == vInfo) {
ctgError("push to vgroup array cache failed");
goto error_exit;
}
if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &vInfo, POINTER_BYTES) != 0) {
if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &pVgroup->vgroupInfo[i], sizeof(pVgroup->vgroupInfo[i])) != 0) {
ctgError("push to vgroup hash cache failed");
goto error_exit;
}
......@@ -241,11 +535,6 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup)
return TSDB_CODE_SUCCESS;
error_exit:
if (pCatalog->vgroupCache.arrayCache) {
taosArrayDestroy(pCatalog->vgroupCache.arrayCache);
pCatalog->vgroupCache.arrayCache = NULL;
}
if (pCatalog->vgroupCache.cache) {
taosHashCleanup(pCatalog->vgroupCache.cache);
pCatalog->vgroupCache.cache = NULL;
......@@ -256,15 +545,14 @@ error_exit:
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList) {
int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
int32_t exist = 0;
CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist));
CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist));
if (exist) {
return TSDB_CODE_SUCCESS;
......@@ -274,10 +562,10 @@ int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps,
CTG_ERR_RET(ctgGetVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &pVgroup));
CTG_ERR_RET(catalogUpdateVgroup(pCatalog, pVgroup));
CTG_ERR_RET(catalogUpdateVgroupCache(pCatalog, pVgroup));
if (pVgroupList) {
CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist));
if (pVgroupHash) {
CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist));
}
if (0 == exist) {
......@@ -288,6 +576,7 @@ int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps,
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
if (NULL == pCatalog || NULL == dbName || NULL == version) {
return TSDB_CODE_CTG_INVALID_INPUT;
......@@ -309,7 +598,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
return TSDB_CODE_SUCCESS;
}
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
......@@ -324,9 +613,9 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
}
if (NULL == pCatalog->dbCache.cache) {
pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->dbCache.cache) {
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_DB_NUMBER);
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
return TSDB_CODE_CTG_MEM_ERROR;
}
}
......@@ -369,11 +658,11 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
if (DbOut.vgroupList) {
CTG_ERR_JRET(catalogUpdateVgroup(pCatalog, DbOut.vgroupList));
CTG_ERR_JRET(catalogUpdateVgroupCache(pCatalog, DbOut.vgroupList));
}
if (DbOut.dbVgroup) {
CTG_ERR_JRET(catalogUpdateDBVgroup(pCatalog, dbName, DbOut.dbVgroup));
CTG_ERR_JRET(catalogUpdateDBVgroupCache(pCatalog, dbName, DbOut.dbVgroup));
}
if (dbInfo) {
......@@ -388,37 +677,35 @@ _return:
return code;
}
int32_t catalogGetTableMeta(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, false, pTableMeta);
}
int32_t catalogGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) {
if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
SBuildTableMetaInput bInput = {0};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen);
if (code) {
return code;
}
SRpcMsg rpcMsg = {
.msgType = TSDB_MSG_TYPE_TABLE_META,
.pCont = msg,
.contLen = msgLen,
};
SVgroupInfo vgroupInfo = {0};
CTG_ERR_RET(ctgGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
SRpcMsg rpcRsp = {0};
STableMetaOutput output = {0};
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output));
tfree(output.tbMeta);
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) {
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta);
}
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList) {
}
......
......@@ -1464,29 +1464,6 @@ int32_t copyTagData(STagData* dst, const STagData* src) {
return 0;
}
STableMeta* createSuperTableMeta(STableMetaMsg* pChild) {
assert(pChild != NULL);
int32_t total = pChild->numOfColumns + pChild->numOfTags;
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total);
pTableMeta->tableType = TSDB_SUPER_TABLE;
pTableMeta->tableInfo.numOfTags = pChild->numOfTags;
pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns;
pTableMeta->tableInfo.precision = pChild->precision;
pTableMeta->uid = pChild->suid;
pTableMeta->tversion = pChild->tversion;
pTableMeta->sversion = pChild->sversion;
memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total);
int32_t num = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < num; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
return pTableMeta;
}
uint32_t getTableMetaSize(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
......
......@@ -222,6 +222,13 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion;
pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange);
pOut->dbVgroup->hashType = htonl(pRsp->dbHashType);
if (pOut->dbVgroup->hashRange < 0) {
qError("invalid hashRange[%d] for db[%s]", pOut->dbVgroup->hashRange, pRsp->db);
code = TSDB_CODE_TSC_INVALID_INPUT;
goto _exit;
}
for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) {
*(vgIdList + i) = htonl(*(vgIdList + i));
......@@ -244,13 +251,134 @@ _exit:
return code;
}
static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
pMetaMsg->numOfTags = htonl(pMetaMsg->numOfTags);
pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);
pMetaMsg->sversion = htonl(pMetaMsg->sversion);
pMetaMsg->tversion = htonl(pMetaMsg->tversion);
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
pMetaMsg->suid = htobe64(pMetaMsg->suid);
pMetaMsg->vgId = htonl(pMetaMsg->vgId);
if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags);
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
qError("invalid numOfColumns[%d] in table meta rsp msg", pMetaMsg->numOfColumns);
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE && pMetaMsg->tableType != TSDB_NORMAL_TABLE) {
qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType);
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (pMetaMsg->sversion < 0) {
qError("invalid sversion[%d] in table meta rsp msg", pMetaMsg->sversion);
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (pMetaMsg->tversion < 0) {
qError("invalid tversion[%d] in table meta rsp msg", pMetaMsg->tversion);
return TSDB_CODE_TSC_INVALID_VALUE;
}
SSchema* pSchema = pMetaMsg->pSchema;
int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags;
for (int i = 0; i < numOfTotalCols; ++i) {
pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId);
pSchema++;
}
if (pMetaMsg->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
qError("invalid colId[%d] for the first column in table meta rsp msg", pMetaMsg->pSchema[0].colId);
return TSDB_CODE_TSC_INVALID_VALUE;
}
return TSDB_CODE_SUCCESS;
}
int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STableMeta **pMeta) {
int32_t total = msg->numOfColumns + msg->numOfTags;
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
STableMeta* pTableMeta = calloc(1, metaSize);
if (NULL == pTableMeta) {
qError("calloc size[%d] failed", metaSize);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType;
pTableMeta->uid = msg->suid;
pTableMeta->suid = msg->suid;
pTableMeta->sversion = msg->sversion;
pTableMeta->tversion = msg->tversion;
pTableMeta->tableInfo.numOfTags = msg->numOfTags;
pTableMeta->tableInfo.precision = msg->precision;
pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
for(int32_t i = 0; i < msg->numOfColumns; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
memcpy(pTableMeta->schema, msg->pSchema, sizeof(SSchema) * total);
*pMeta = pTableMeta;
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)msg;
int32_t code = queryConvertTableMetaMsg(pMetaMsg);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STableMetaOutput *pOut = (STableMetaOutput *)output;
if (!tIsValidSchema(pMetaMsg->pSchema, pMetaMsg->numOfColumns, pMetaMsg->numOfTags)) {
qError("validate table meta schema in rsp msg failed");
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
pOut->metaNum = 2;
memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname));
memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname));
pOut->ctbMeta.vgId = pMetaMsg->vgId;
pOut->ctbMeta.tableType = pMetaMsg->tableType;
pOut->ctbMeta.uid = pMetaMsg->tuid;
pOut->ctbMeta.suid = pMetaMsg->suid;
code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta);
} else {
pOut->metaNum = 1;
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta);
}
return code;
}
void msgInit() {
queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg;
queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg;
queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg;
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp;
queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册