diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index caf872689c2117c74efcf2c61782a8291a9fa8b8..fd6c0620e80e55e5015878f623457b629c2bd46d 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -839,7 +839,7 @@ typedef struct { int32_t tversion; uint64_t tuid; uint64_t suid; - SVgroupMsg vgroup; + int32_t vgId; SSchema pSchema[]; } STableMetaMsg; @@ -867,6 +867,7 @@ typedef struct { int32_t dbVgroupVersion; int32_t dbVgroupNum; int32_t dbHashRange; + int32_t dbHashType; SVgroupInfo vgroupInfo[]; //int32_t vgIdList[]; } SUseDbRspMsg; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index f9d3b3c8c18c627df6decfbca87e969aa759fe27..38965c6ba98efb1c9c5de76c25e4a18312980b23 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -45,42 +45,10 @@ typedef struct SMetaData { SEpSet *pEpSet; // qnode epset list } SMetaData; -typedef struct STableComInfo { - uint8_t numOfTags; // the number of tags in schema - uint8_t precision; // the number of precision - int16_t numOfColumns; // the number of columns - int32_t rowSize; // row size of the schema -} STableComInfo; - -/* - * ASSERT(sizeof(SCTableMeta) == 24) - * ASSERT(tableType == TSDB_CHILD_TABLE) - * The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info. - */ -typedef struct SCTableMeta { - int32_t vgId:24; - int8_t tableType; - uint64_t uid; - uint64_t suid; -} SCTableMeta; - -/* - * Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta. - */ -typedef struct STableMeta { - int32_t vgId:24; - int8_t tableType; - uint64_t uid; - uint64_t suid; - // if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info - int16_t sversion; - int16_t tversion; - STableComInfo tableInfo; - SSchema schema[]; -} STableMeta; - typedef struct SCatalogCfg { - + bool enableVgroupCache; + uint32_t maxTblCacheNum; + uint32_t maxDBCacheNum; } SCatalogCfg; int32_t catalogInit(SCatalogCfg *cfg); @@ -96,19 +64,25 @@ int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version); -int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList); -int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup); + +/** + * get cluster vgroup list. + * @pVgroupList - hash of vgroup list, key:vgId, value:SVgroupInfo + * @return + */ +int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash); +int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup); int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo); -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); +int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta); -int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta); -int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta); +int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta); +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName); +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta); /** diff --git a/include/libs/query/query.h b/include/libs/query/query.h index 02ae70887484428710cd224ae389b0de8b18c9bf..bfe2db6a611a6aaa71328136fc163746347fdc31 100644 --- a/include/libs/query/query.h +++ b/include/libs/query/query.h @@ -24,10 +24,49 @@ extern "C" { typedef SVgroupListRspMsg SVgroupListInfo; +typedef struct STableComInfo { + uint8_t numOfTags; // the number of tags in schema + uint8_t precision; // the number of precision + int16_t numOfColumns; // the number of columns + int32_t rowSize; // row size of the schema +} STableComInfo; + +/* + * ASSERT(sizeof(SCTableMeta) == 24) + * ASSERT(tableType == TSDB_CHILD_TABLE) + * The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info. + */ +typedef struct SCTableMeta { + int32_t vgId:24; + int8_t tableType; + uint64_t uid; + uint64_t suid; +} SCTableMeta; + +/* + * Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta. + */ +typedef struct STableMeta { + //BEGIN: KEEP THIS PART SAME WITH SCTableMeta + int32_t vgId:24; + int8_t tableType; + uint64_t uid; + uint64_t suid; + //END: KEEP THIS PART SAME WITH SCTableMeta + + // if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info + int16_t sversion; + int16_t tversion; + STableComInfo tableInfo; + SSchema schema[]; +} STableMeta; + + typedef struct SDBVgroupInfo { int32_t vgroupVersion; SArray *vgId; int32_t hashRange; + int32_t hashType; } SDBVgroupInfo; typedef struct SUseDbOutput { @@ -36,6 +75,13 @@ typedef struct SUseDbOutput { SDBVgroupInfo *dbVgroup; } SUseDbOutput; +typedef struct STableMetaOutput { + int32_t metaNum; + char ctbFname[TSDB_TABLE_FNAME_LEN]; + char tbFname[TSDB_TABLE_FNAME_LEN]; + SCTableMeta ctbMeta; + STableMeta *tbMeta; +} STableMetaOutput; extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index a08b64f9a91b8db5b16bfae330dab3dd9b9388c8..455c82b1bc0e480d89136a37346f43d6ede319aa 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -24,16 +24,16 @@ extern "C" { #include "common.h" #include "tlog.h" -#define CTG_DEFAULT_CLUSTER_NUMBER 6 -#define CTG_DEFAULT_VGROUP_NUMBER 100 -#define CTG_DEFAULT_DB_NUMBER 20 +#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 +#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 +#define CTG_DEFAULT_CACHE_DB_NUMBER 20 +#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000 #define CTG_DEFAULT_INVALID_VERSION (-1) typedef struct SVgroupListCache { int32_t vgroupVersion; - SHashObj *cache; // key:vgId, value:SVgroupInfo* - SArray *arrayCache; // SVgroupInfo + SHashObj *cache; // key:vgId, value:SVgroupInfo } SVgroupListCache; typedef struct SDBVgroupCache { @@ -41,20 +41,23 @@ typedef struct SDBVgroupCache { } SDBVgroupCache; typedef struct STableMetaCache { - SHashObj *cache; //key:fulltablename, value:STableMeta + SHashObj *cache; //key:fulltablename, value:STableMeta + SHashObj *stableCache; //key:suid, value:STableMeta* } STableMetaCache; typedef struct SCatalog { SVgroupListCache vgroupCache; - SDBVgroupCache dbCache; - STableMetaCache tableCache; + SDBVgroupCache dbCache; + STableMetaCache tableCache; } SCatalog; typedef struct SCatalogMgmt { void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node + SCatalogCfg cfg; } SCatalogMgmt; +typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); extern int32_t ctgDebugFlag; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 92b60945291184afc39c151b6d7067acef8c8192..9921a2696b8c3c2798268bfa21444615f6c99c5f 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -47,14 +47,14 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe return TSDB_CODE_SUCCESS; } -int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* exist) { - if (NULL == pCatalog->vgroupCache.arrayCache || pCatalog->vgroupCache.vgroupVersion < 0) { +int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) { + if (NULL == pCatalog->vgroupCache.cache || pCatalog->vgroupCache.vgroupVersion < 0) { *exist = 0; return TSDB_CODE_SUCCESS; } if (pVgroupList) { - *pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache); + *pVgroupList = pCatalog->vgroupCache.cache; } *exist = 1; @@ -62,7 +62,6 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* return TSDB_CODE_SUCCESS; } - int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) { if (NULL == pCatalog->dbCache.cache) { *exist = 0; @@ -92,6 +91,7 @@ int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgrou (*dbInfo)->vgroupVersion = info->vgroupVersion; (*dbInfo)->hashRange = info->hashRange; + (*dbInfo)->hashType = info->hashType; } *exist = 1; @@ -130,17 +130,328 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp } +int32_t ctgGetTableMetaFromCache(SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) { + if (NULL == pCatalog->tableCache.cache) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } + + char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1]; + + snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName); + + STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName)); + + if (NULL == tbMeta) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } + + if (tbMeta->tableType == TSDB_CHILD_TABLE) { + STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid)); + if (NULL == stbMeta || NULL == *stbMeta) { + *exist = 0; + return TSDB_CODE_SUCCESS; + } + + if ((*stbMeta)->suid != tbMeta->suid) { + ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema); + *pTableMeta = calloc(1, metaSize); + if (NULL == *pTableMeta) { + ctgError("calloc size[%d] failed", metaSize); + return TSDB_CODE_CTG_MEM_ERROR; + } + + memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta)); + memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta)); + } else { + int32_t metaSize = sizeof(STableMeta) + (tbMeta->tableInfo.numOfTags + tbMeta->tableInfo.numOfColumns) * sizeof(SSchema); + *pTableMeta = calloc(1, metaSize); + if (NULL == *pTableMeta) { + ctgError("calloc size[%d] failed", metaSize); + return TSDB_CODE_CTG_MEM_ERROR; + } + + memcpy(*pTableMeta, tbMeta, metaSize); + } + + *exist = 1; + + return TSDB_CODE_SUCCESS; +} + +void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { + epSet->inUse = 0; + epSet->numOfEps = vgroupInfo->numOfEps; + + for (int32_t i = 0; i < vgroupInfo->numOfEps; ++i) { + memcpy(&epSet->port[i], &vgroupInfo->epAddr[i].port, sizeof(epSet->port[i])); + memcpy(&epSet->fqdn[i], &vgroupInfo->epAddr[i].fqdn, sizeof(epSet->fqdn[i])); + } +} + + +int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1]; + + snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); + + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName}; + char *msg = NULL; + SEpSet *pVnodeEpSet = NULL; + int32_t msgLen = 0; + + int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); + if (code) { + return code; + } + + SRpcMsg rpcMsg = { + .msgType = TSDB_MSG_TYPE_TABLE_META, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + SEpSet epSet; + + ctgGenEpSet(&epSet, vgroupInfo); + + rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp); + + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("get table meta from mnode failed, error code:%d", rpcRsp.code); + return rpcRsp.code; + } + + code = queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META](output, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + return code; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) { + switch (hashType) { + default: + *fp = MurmurHash3_32; + break; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) { + SDBVgroupInfo *dbInfo = NULL; + int32_t code = 0; + + CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbInfo)); + + if (NULL == dbInfo) { + ctgWarn("db[%s] vgroup info not found", pDBName); + return TSDB_CODE_TSC_DB_NOT_SELECTED; + } + + if (dbInfo->vgroupVersion < 0 || NULL == dbInfo->vgId) { + ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgId:%p", pDBName, dbInfo->vgroupVersion, dbInfo->vgId); + CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); + } + + int32_t vgNum = taosArrayGetSize(dbInfo->vgId); + if (vgNum <= 0) { + ctgError("db[%s] vgroup cache invalid, vgroup number:%p", vgNum); + CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED); + } + + tableNameHashFp fp = NULL; + + CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashType, &fp)); + + char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1]; + + snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName); + + uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName)); + uint32_t hashUnit = dbInfo->hashRange / vgNum; + uint32_t vgId = hashValue / hashUnit; + + SHashObj *vgroupHash = NULL; + + CTG_ERR_JRET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash)); + if (NULL == vgroupHash) { + ctgError("get empty vgroup cache"); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) { + ctgError("vgId[%d] not found in vgroup list", vgId); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + +_return: + if (dbInfo && dbInfo->vgId) { + taosArrayDestroy(dbInfo->vgId); + dbInfo->vgId = NULL; + } + + tfree(dbInfo); + + return code; +} + + + +STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) { + assert(pChild != NULL); + int32_t total = pChild->numOfColumns + pChild->numOfTags; + + STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total); + pTableMeta->tableType = TSDB_SUPER_TABLE; + pTableMeta->tableInfo.numOfTags = pChild->numOfTags; + pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns; + pTableMeta->tableInfo.precision = pChild->precision; + + pTableMeta->uid = pChild->suid; + pTableMeta->tversion = pChild->tversion; + pTableMeta->sversion = pChild->sversion; + + memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total); + + int32_t num = pTableMeta->tableInfo.numOfColumns; + for(int32_t i = 0; i < num; ++i) { + pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; + } + + return pTableMeta; +} + +int32_t ctgGetTableMetaImpl(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) { + if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { + return TSDB_CODE_CTG_INVALID_INPUT; + } + + int32_t exist = 0; + + if (!forceUpdate) { + CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist)); + + if (exist) { + return TSDB_CODE_SUCCESS; + } + } + + CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName)); + + CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist)); + + if (0 == exist) { + ctgError("get table meta from cache failed, but fetch succeed"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgUpdateTableMetaCache(SCatalog *pCatalog, STableMetaOutput *output) { + if (output->metaNum != 1 && output->metaNum != 2) { + ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + if (NULL == output->tbMeta) { + ctgError("no valid table meta got from meta rsp"); + return TSDB_CODE_CTG_INTERNAL_ERROR; + } + + if (NULL == pCatalog->tableCache.cache) { + pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->tableCache.cache) { + ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); + return TSDB_CODE_CTG_MEM_ERROR; + } + } + + if (NULL == pCatalog->tableCache.cache) { + pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->tableCache.cache) { + ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); + return TSDB_CODE_CTG_MEM_ERROR; + } + + pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + if (NULL == pCatalog->tableCache.stableCache) { + ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); + return TSDB_CODE_CTG_MEM_ERROR; + } + } + + if (output->metaNum == 2) { + if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { + ctgError("push ctable[%s] to table cache failed", output->ctbFname); + goto error_exit; + } + + if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { + ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); + goto error_exit; + } + } + + if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, sizeof(*output->tbMeta)) != 0) { + ctgError("push table[%s] to table cache failed", output->tbFname); + goto error_exit; + } + + if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { + if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) { + ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); + goto error_exit; + } + } + + return TSDB_CODE_SUCCESS; + +error_exit: + if (pCatalog->vgroupCache.cache) { + taosHashCleanup(pCatalog->vgroupCache.cache); + pCatalog->vgroupCache.cache = NULL; + } + + pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION; + + return TSDB_CODE_CTG_INTERNAL_ERROR; +} + + int32_t catalogInit(SCatalogCfg *cfg) { - ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == ctgMgmt.pCluster) { - CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER); + CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); + } + + if (cfg) { + memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg)); + } else { + ctgMgmt.cfg.enableVgroupCache = true; + ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; + ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; } return TSDB_CODE_SUCCESS; } - -int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) { +int32_t catalogGetHandle(const char *clusterId, SCatalog** catalogHandle) { if (NULL == clusterId || NULL == catalogHandle) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -190,7 +501,7 @@ int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) { -int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) { +int32_t catalogUpdateVgroupCache(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) { if (NULL == pVgroup) { ctgError("no valid vgroup list info to update"); return TSDB_CODE_CTG_INTERNAL_ERROR; @@ -200,22 +511,11 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) ctgError("vgroup version[%d] is invalid", pVgroup->vgroupVersion); return TSDB_CODE_CTG_INVALID_INPUT; } - - - if (NULL == pCatalog->vgroupCache.arrayCache) { - pCatalog->vgroupCache.arrayCache = taosArrayInit(pVgroup->vgroupNum, sizeof(pVgroup->vgroupInfo[0])); - if (NULL == pCatalog->vgroupCache.arrayCache) { - ctgError("init array[%d] for cluster cache failed", pVgroup->vgroupNum); - return TSDB_CODE_CTG_MEM_ERROR; - } - } else { - taosArrayClear(pCatalog->vgroupCache.arrayCache); - } if (NULL == pCatalog->vgroupCache.cache) { - pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_CACHE_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->vgroupCache.cache) { - ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_VGROUP_NUMBER); + ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_CACHE_VGROUP_NUMBER); return TSDB_CODE_CTG_MEM_ERROR; } } else { @@ -224,13 +524,7 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) SVgroupInfo *vInfo = NULL; for (int32_t i = 0; i < pVgroup->vgroupNum; ++i) { - vInfo = taosArrayPush(pCatalog->vgroupCache.arrayCache, &pVgroup->vgroupInfo[i]); - if (NULL == vInfo) { - ctgError("push to vgroup array cache failed"); - goto error_exit; - } - - if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &vInfo, POINTER_BYTES) != 0) { + if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &pVgroup->vgroupInfo[i], sizeof(pVgroup->vgroupInfo[i])) != 0) { ctgError("push to vgroup hash cache failed"); goto error_exit; } @@ -241,11 +535,6 @@ int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) return TSDB_CODE_SUCCESS; error_exit: - if (pCatalog->vgroupCache.arrayCache) { - taosArrayDestroy(pCatalog->vgroupCache.arrayCache); - pCatalog->vgroupCache.arrayCache = NULL; - } - if (pCatalog->vgroupCache.cache) { taosHashCleanup(pCatalog->vgroupCache.cache); pCatalog->vgroupCache.cache = NULL; @@ -256,15 +545,14 @@ error_exit: return TSDB_CODE_CTG_INTERNAL_ERROR; } - -int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList) { +int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) { if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) { return TSDB_CODE_CTG_INVALID_INPUT; } int32_t exist = 0; - CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist)); + CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist)); if (exist) { return TSDB_CODE_SUCCESS; @@ -274,10 +562,10 @@ int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, CTG_ERR_RET(ctgGetVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &pVgroup)); - CTG_ERR_RET(catalogUpdateVgroup(pCatalog, pVgroup)); + CTG_ERR_RET(catalogUpdateVgroupCache(pCatalog, pVgroup)); - if (pVgroupList) { - CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist)); + if (pVgroupHash) { + CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupHash, &exist)); } if (0 == exist) { @@ -288,6 +576,7 @@ int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, return TSDB_CODE_SUCCESS; } + int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) { if (NULL == pCatalog || NULL == dbName || NULL == version) { return TSDB_CODE_CTG_INVALID_INPUT; @@ -309,7 +598,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { +int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { return TSDB_CODE_CTG_INVALID_INPUT; } @@ -324,9 +613,9 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB } if (NULL == pCatalog->dbCache.cache) { - pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->dbCache.cache) { - ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_DB_NUMBER); + ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); return TSDB_CODE_CTG_MEM_ERROR; } } @@ -369,11 +658,11 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); if (DbOut.vgroupList) { - CTG_ERR_JRET(catalogUpdateVgroup(pCatalog, DbOut.vgroupList)); + CTG_ERR_JRET(catalogUpdateVgroupCache(pCatalog, DbOut.vgroupList)); } if (DbOut.dbVgroup) { - CTG_ERR_JRET(catalogUpdateDBVgroup(pCatalog, dbName, DbOut.dbVgroup)); + CTG_ERR_JRET(catalogUpdateDBVgroupCache(pCatalog, dbName, DbOut.dbVgroup)); } if (dbInfo) { @@ -388,37 +677,35 @@ _return: return code; } +int32_t catalogGetTableMeta(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) { + return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, false, pTableMeta); +} - -int32_t catalogGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) { - if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { +int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) { + if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) { return TSDB_CODE_CTG_INVALID_INPUT; } - SBuildTableMetaInput bInput = {0}; - char *msg = NULL; - SEpSet *pVnodeEpSet = NULL; - int32_t msgLen = 0; - - int32_t code = queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen); - if (code) { - return code; - } - - SRpcMsg rpcMsg = { - .msgType = TSDB_MSG_TYPE_TABLE_META, - .pCont = msg, - .contLen = msgLen, - }; + SVgroupInfo vgroupInfo = {0}; + + CTG_ERR_RET(ctgGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo)); - SRpcMsg rpcRsp = {0}; + STableMetaOutput output = {0}; + + CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output)); - rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output)); + tfree(output.tbMeta); + return TSDB_CODE_SUCCESS; } -int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) { +int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta) { + return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta); +} + +int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList) { } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 28d01b9e66f35d38abca57a7fbe9504e61ee9647..7fcb7ea3047d1baaece2c5b7c9e89d83d66012e8 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -1464,29 +1464,6 @@ int32_t copyTagData(STagData* dst, const STagData* src) { return 0; } -STableMeta* createSuperTableMeta(STableMetaMsg* pChild) { - assert(pChild != NULL); - int32_t total = pChild->numOfColumns + pChild->numOfTags; - - STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * total); - pTableMeta->tableType = TSDB_SUPER_TABLE; - pTableMeta->tableInfo.numOfTags = pChild->numOfTags; - pTableMeta->tableInfo.numOfColumns = pChild->numOfColumns; - pTableMeta->tableInfo.precision = pChild->precision; - - pTableMeta->uid = pChild->suid; - pTableMeta->tversion = pChild->tversion; - pTableMeta->sversion = pChild->sversion; - - memcpy(pTableMeta->schema, pChild->pSchema, sizeof(SSchema) * total); - - int32_t num = pTableMeta->tableInfo.numOfColumns; - for(int32_t i = 0; i < num; ++i) { - pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; - } - - return pTableMeta; -} uint32_t getTableMetaSize(const STableMeta* pTableMeta) { assert(pTableMeta != NULL); diff --git a/source/libs/query/src/querymsg.c b/source/libs/query/src/querymsg.c index 8f35fd9c3ef43732dfae9c19423baa91306cdc13..c5864fd41b5ed9e2f632ab2d43d73cde660da25f 100644 --- a/source/libs/query/src/querymsg.c +++ b/source/libs/query/src/querymsg.c @@ -222,6 +222,13 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { pOut->dbVgroup->vgroupVersion = pRsp->dbVgroupVersion; pOut->dbVgroup->hashRange = htonl(pRsp->dbHashRange); + pOut->dbVgroup->hashType = htonl(pRsp->dbHashType); + + if (pOut->dbVgroup->hashRange < 0) { + qError("invalid hashRange[%d] for db[%s]", pOut->dbVgroup->hashRange, pRsp->db); + code = TSDB_CODE_TSC_INVALID_INPUT; + goto _exit; + } for (int32_t i = 0; i < pRsp->dbVgroupNum; ++i) { *(vgIdList + i) = htonl(*(vgIdList + i)); @@ -244,13 +251,134 @@ _exit: return code; } +static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { + pMetaMsg->numOfTags = htonl(pMetaMsg->numOfTags); + pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns); + pMetaMsg->sversion = htonl(pMetaMsg->sversion); + pMetaMsg->tversion = htonl(pMetaMsg->tversion); + pMetaMsg->tuid = htobe64(pMetaMsg->tuid); + pMetaMsg->suid = htobe64(pMetaMsg->suid); + pMetaMsg->vgId = htonl(pMetaMsg->vgId); + + if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) { + qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) { + qError("invalid numOfColumns[%d] in table meta rsp msg", pMetaMsg->numOfColumns); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE && pMetaMsg->tableType != TSDB_NORMAL_TABLE) { + qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->sversion < 0) { + qError("invalid sversion[%d] in table meta rsp msg", pMetaMsg->sversion); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->tversion < 0) { + qError("invalid tversion[%d] in table meta rsp msg", pMetaMsg->tversion); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + SSchema* pSchema = pMetaMsg->pSchema; + + int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags; + for (int i = 0; i < numOfTotalCols; ++i) { + pSchema->bytes = htonl(pSchema->bytes); + pSchema->colId = htonl(pSchema->colId); + + pSchema++; + } + + if (pMetaMsg->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) { + qError("invalid colId[%d] for the first column in table meta rsp msg", pMetaMsg->pSchema[0].colId); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STableMeta **pMeta) { + int32_t total = msg->numOfColumns + msg->numOfTags; + int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total; + + STableMeta* pTableMeta = calloc(1, metaSize); + if (NULL == pTableMeta) { + qError("calloc size[%d] failed", metaSize); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType; + pTableMeta->uid = msg->suid; + pTableMeta->suid = msg->suid; + pTableMeta->sversion = msg->sversion; + pTableMeta->tversion = msg->tversion; + + pTableMeta->tableInfo.numOfTags = msg->numOfTags; + pTableMeta->tableInfo.precision = msg->precision; + pTableMeta->tableInfo.numOfColumns = msg->numOfColumns; + + for(int32_t i = 0; i < msg->numOfColumns; ++i) { + pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; + } + + memcpy(pTableMeta->schema, msg->pSchema, sizeof(SSchema) * total); + + *pMeta = pTableMeta; + + return TSDB_CODE_SUCCESS; +} + + +int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { + STableMetaMsg *pMetaMsg = (STableMetaMsg *)msg; + int32_t code = queryConvertTableMetaMsg(pMetaMsg); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + STableMetaOutput *pOut = (STableMetaOutput *)output; + + if (!tIsValidSchema(pMetaMsg->pSchema, pMetaMsg->numOfColumns, pMetaMsg->numOfTags)) { + qError("validate table meta schema in rsp msg failed"); + return TSDB_CODE_TSC_INVALID_VALUE; + } + + if (pMetaMsg->tableType == TSDB_CHILD_TABLE) { + pOut->metaNum = 2; + + memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname)); + memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname)); + + pOut->ctbMeta.vgId = pMetaMsg->vgId; + pOut->ctbMeta.tableType = pMetaMsg->tableType; + pOut->ctbMeta.uid = pMetaMsg->tuid; + pOut->ctbMeta.suid = pMetaMsg->suid; + + code = queryCreateTableMetaFromMsg(pMetaMsg, true, &pOut->tbMeta); + } else { + pOut->metaNum = 1; + + memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); + + code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta); + } + + return code; +} + void msgInit() { queryBuildMsg[TSDB_MSG_TYPE_TABLE_META] = queryBuildTableMetaReqMsg; queryBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = queryBuildVgroupListReqMsg; queryBuildMsg[TSDB_MSG_TYPE_USE_DB] = queryBuildUseDbMsg; - //tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp; + queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = queryProcessTableMetaRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = queryProcessVgroupListRsp; queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB] = queryProcessUseDBRsp;