未验证 提交 abdf66e6 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #9119 from taosdata/catalog_dev

Catalog dev
......@@ -32,7 +32,7 @@ extern "C" {
struct SCatalog;
typedef struct SCatalogReq {
char clusterId[TSDB_CLUSTER_ID_LEN]; //????
char dbName[TSDB_DB_NAME_LEN];
SArray *pTableName; // table full name
SArray *pUdf; // udf name
bool qNodeRequired; // valid qnode
......@@ -82,7 +82,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta);
/**
......@@ -91,7 +91,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const
* @pVgroupList - array of SVgroupInfo
* @return
*/
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList);
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList);
/**
......@@ -103,7 +103,7 @@ int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSe
* @param pMetaData
* @return
*/
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
......
......@@ -20,6 +20,7 @@
extern "C" {
#endif
extern bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
......
......@@ -16,6 +16,7 @@
#include "catalogInt.h"
#include "trpc.h"
#include "query.h"
#include "tname.h"
SCatalogMgmt ctgMgmt = {0};
......@@ -47,7 +48,7 @@ int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSe
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) {
int32_t ctgGetVgroupFromCache(struct SCatalog* pCatalog, SHashObj** pVgroupList, int32_t* exist) {
if (NULL == pCatalog->vgroupCache.cache || pCatalog->vgroupCache.vgroupVersion < 0) {
*exist = 0;
return TSDB_CODE_SUCCESS;
......@@ -62,7 +63,7 @@ int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SHashObj** pVgroupList, int32_
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetDBVgroupFromCache(SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, int32_t *exist) {
if (NULL == pCatalog->dbCache.cache) {
*exist = 0;
return TSDB_CODE_SUCCESS;
......@@ -130,13 +131,13 @@ int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEp
}
int32_t ctgGetTableMetaFromCache(SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
if (NULL == pCatalog->tableCache.cache) {
*exist = 0;
return TSDB_CODE_SUCCESS;
}
char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
char tbFullName[TSDB_TABLE_FNAME_LEN];
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName);
......@@ -200,7 +201,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
return TSDB_CODE_CTG_INVALID_INPUT;
}
char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
char tbFullName[TSDB_TABLE_FNAME_LEN];
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
......@@ -251,7 +252,54 @@ int32_t ctgGetHashFunction(int32_t hashType, tableNameHashFp *fp) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
int32_t ctgGetVgroupFromVgId(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, int32_t vgId, SVgroupInfo *pVgroup) {
SHashObj *vgroupHash = NULL;
CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash));
if (NULL == vgroupHash) {
ctgError("get empty vgroup cache");
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) {
ctgError("vgId[%d] not found in vgroup list", vgId);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetVgroupFromVgIdBatch(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SArray* vgIds, SArray* vgroupList) {
SHashObj *vgroupHash = NULL;
SVgroupInfo pVgroup = {0};
int32_t vgIdNum = taosArrayGetSize(vgIds);
CTG_ERR_RET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash));
if (NULL == vgroupHash) {
ctgError("get empty vgroup cache");
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
for (int32_t i = 0; i < vgIdNum; ++i) {
int32_t *vgId = taosArrayGet(vgIds, i);
if (NULL == taosHashGetClone(vgroupHash, vgId, sizeof(*vgId), &pVgroup)) {
ctgError("vgId[%d] not found in vgroup list", vgId);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
if (NULL == taosArrayPush(vgroupList, &pVgroup)) {
ctgError("push vgroup to array failed, idx:%d", i);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableHashVgroup(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
SDBVgroupInfo *dbInfo = NULL;
int32_t code = 0;
......@@ -269,7 +317,7 @@ int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgm
int32_t vgNum = taosArrayGetSize(dbInfo->vgId);
if (vgNum <= 0) {
ctgError("db[%s] vgroup cache invalid, vgroup number:%p", vgNum);
ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED);
}
......@@ -277,7 +325,7 @@ int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgm
CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashType, &fp));
char tbFullName[TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN + 1];
char tbFullName[TSDB_TABLE_FNAME_LEN];
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
......@@ -285,18 +333,7 @@ int32_t ctgGetTableHashVgroup(SCatalog *pCatalog, void *pRpc, const SEpSet *pMgm
uint32_t hashUnit = dbInfo->hashRange / vgNum;
uint32_t vgId = hashValue / hashUnit;
SHashObj *vgroupHash = NULL;
CTG_ERR_JRET(catalogGetVgroup(pCatalog, pRpc, pMgmtEps, &vgroupHash));
if (NULL == vgroupHash) {
ctgError("get empty vgroup cache");
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (NULL == taosHashGetClone(vgroupHash, &vgId, sizeof(vgId), pVgroup)) {
ctgError("vgId[%d] not found in vgroup list", vgId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, vgId, pVgroup));
_return:
if (dbInfo && dbInfo->vgId) {
......@@ -335,7 +372,7 @@ STableMeta* ctgCreateSTableMeta(STableMetaMsg* pChild) {
return pTableMeta;
}
int32_t ctgGetTableMetaImpl(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
......@@ -363,7 +400,7 @@ int32_t ctgGetTableMetaImpl(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtE
}
int32_t ctgUpdateTableMetaCache(SCatalog *pCatalog, STableMetaOutput *output) {
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
if (output->metaNum != 1 && output->metaNum != 2) {
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
return TSDB_CODE_CTG_INTERNAL_ERROR;
......@@ -433,7 +470,6 @@ error_exit:
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
int32_t catalogInit(SCatalogCfg *cfg) {
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == ctgMgmt.pCluster) {
......@@ -451,7 +487,7 @@ int32_t catalogInit(SCatalogCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetHandle(const char *clusterId, SCatalog** catalogHandle) {
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
if (NULL == clusterId || NULL == catalogHandle) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
......@@ -545,7 +581,7 @@ error_exit:
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) {
int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SHashObj** pVgroupHash) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
......@@ -677,7 +713,7 @@ _return:
return code;
}
int32_t catalogGetTableMeta(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, false, pTableMeta);
}
......@@ -701,21 +737,97 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
return TSDB_CODE_SUCCESS;
}
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta* pTableMeta) {
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta);
}
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList) {
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
STableMeta *tbMeta = NULL;
int32_t code = 0;
SVgroupInfo vgroupInfo = {0};
SDBVgroupInfo *dbVgroup = NULL;
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
CTG_ERR_JRET(ctgGetVgroupFromVgIdBatch(pCatalog, pRpc, pMgmtEps, dbVgroup->vgId, pVgroupList));
} else {
CTG_ERR_JRET(ctgGetVgroupFromVgId(pCatalog, pRpc, pMgmtEps, tbMeta->vgId, &vgroupInfo));
if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) {
ctgError("push vgroupInfo to array failed");
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
}
_return:
tfree(tbMeta);
if (dbVgroup && dbVgroup->vgId) {
taosArrayDestroy(dbVgroup->vgId);
dbVgroup->vgId = NULL;
}
tfree(dbVgroup);
return code;
}
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
int32_t code = 0;
if (pReq->pTableName) {
char dbName[TSDB_FULL_DB_NAME_LEN];
int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
if (tbNum > 0) {
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
if (NULL == pRsp->pTableMeta) {
ctgError("taosArrayInit num[%d] failed", tbNum);
return TSDB_CODE_CTG_MEM_ERROR;
}
}
for (int32_t i = 0; i < tbNum; ++i) {
SName *name = taosArrayGet(pReq->pTableName, i);
STableMeta *pTableMeta = NULL;
snprintf(dbName, sizeof(dbName), "%s.%s", name->acctId, name->dbname);
CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, dbName, name->tname, &pTableMeta));
if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
ctgError("taosArrayPush failed, idx:%d", i);
tfree(pTableMeta);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
}
}
return TSDB_CODE_SUCCESS;
_return:
if (pRsp->pTableMeta) {
int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
for (int32_t i = 0; i < aSize; ++i) {
STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
tfree(pMeta);
}
taosArrayDestroy(pRsp->pTableMeta);
}
return 0;
return code;
}
void catalogDestroy(void) {
......
......@@ -4088,7 +4088,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
}
// load the meta data from catalog
code = catalogGetAllMeta(pCatalog, NULL, &req, &data);
code = catalogGetAllMeta(pCatalog, NULL, NULL, &req, &data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册