提交 bf4efa77 编写于 作者: D dapan1121

feature/qnode

上级 5a383152
...@@ -111,7 +111,9 @@ int32_t tNameExtractFullName(const SName* name, char* dst) { ...@@ -111,7 +111,9 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
return -1; return -1;
} }
int32_t len = snprintf(dst, TSDB_DB_FNAME_LEN, "%d.%s", name->acctId, name->dbname); int32_t len = 0;
snprintf(dst, TSDB_DB_FNAME_LEN, "%d.%s", name->acctId, name->dbname);
size_t tnameLen = strlen(name->tname); size_t tnameLen = strlen(name->tname);
if (tnameLen > 0) { if (tnameLen > 0) {
......
...@@ -937,7 +937,8 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { ...@@ -937,7 +937,8 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
goto USE_DB_OVER; goto USE_DB_OVER;
} }
if (0 == strcmp(usedbReq.db, TSDB_INFORMATION_SCHEMA_DB)) { char *p = strchr(usedbReq.db, '.');
if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) {
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN); memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
} else { } else {
pDb = mndAcquireDb(pMnode, usedbReq.db); pDb = mndAcquireDb(pMnode, usedbReq.db);
......
...@@ -198,11 +198,12 @@ typedef struct SCtgAction { ...@@ -198,11 +198,12 @@ typedef struct SCtgAction {
#define CTG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB) #define CTG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB)
#define CTG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB) #define CTG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB)
#define CTG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB) #define CTG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB)
#define CTG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB)
#define CTG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0) #define CTG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0)
#define CTG_GEN_STB_FLAG(_isStb) ((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB) #define CTG_GEN_STB_FLAG(_isStb) ((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)
#define CTG_TBTYPE_MATCH(_flag, tbType) (CTG_IS_UNKNOWN_STB(_flag) || (CTG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) #define CTG_TBTYPE_MATCH(_flag, tbType) (CTG_IS_UNKNOWN_STB(_flag) || (CTG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_IS_INF_DBNAME(_sname) ((_sname)->dbname[0] == 'i' && 0 == strcmp((_sname)->dbname, TSDB_INFORMATION_SCHEMA_DB)) #define CTG_IS_INF_DBNAME(_dbname) ((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB)))
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
......
...@@ -509,7 +509,7 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, ...@@ -509,7 +509,7 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName,
} }
int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) { int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist, int32_t flag) {
if (NULL == pCtg->dbCache) { if (NULL == pCtg->dbCache) {
*exist = 0; *exist = 0;
ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname); ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname);
...@@ -517,7 +517,11 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable ...@@ -517,7 +517,11 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
} }
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName); if (CTG_IS_INF_DB(flag)) {
strcpy(dbFName, pTableName->dbname);
} else {
tNameGetFullDbName(pTableName, dbFName);
}
*pTableMeta = NULL; *pTableMeta = NULL;
...@@ -590,15 +594,19 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable ...@@ -590,15 +594,19 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_t *tbType) { int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_t *tbType, int32_t flag) {
if (NULL == pCtg->dbCache) { if (NULL == pCtg->dbCache) {
ctgWarn("empty db cache, tbName:%s", pTableName->tname); ctgWarn("empty db cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName); if (CTG_IS_INF_DB(flag)) {
strcpy(dbFName, pTableName->dbname);
} else {
tNameGetFullDbName(pTableName, dbFName);
}
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache); ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) { if (NULL == dbCache) {
...@@ -1078,6 +1086,10 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { ...@@ -1078,6 +1086,10 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId); ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
if (CTG_IS_INF_DBNAME(dbFName)) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion))); CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId); ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId);
...@@ -1544,8 +1556,12 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons ...@@ -1544,8 +1556,12 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t exist = 0; int32_t exist = 0;
int32_t code = 0; int32_t code = 0;
if ((!forceUpdate) || (CTG_IS_INF_DBNAME(pTableName))) { if (CTG_IS_INF_DBNAME(pTableName->dbname)) {
CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &exist)); CTG_SET_INF_DB(flag);
}
if ((!forceUpdate) || (CTG_IS_INF_DB(flag))) {
CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &exist, flag));
if (exist && CTG_TBTYPE_MATCH(flag, (*pTableMeta)->tableType)) { if (exist && CTG_TBTYPE_MATCH(flag, (*pTableMeta)->tableType)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1555,7 +1571,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons ...@@ -1555,7 +1571,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
} else if (CTG_IS_UNKNOWN_STB(flag)) { } else if (CTG_IS_UNKNOWN_STB(flag)) {
int32_t tbType = 0; int32_t tbType = 0;
CTG_ERR_RET(ctgGetTableTypeFromCache(pCtg, pTableName, &tbType)); CTG_ERR_RET(ctgGetTableTypeFromCache(pCtg, pTableName, &tbType, flag));
CTG_SET_STB(flag, tbType); CTG_SET_STB(flag, tbType);
} }
...@@ -1588,7 +1604,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons ...@@ -1588,7 +1604,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
SName stbName = *pTableName; SName stbName = *pTableName;
strcpy(stbName.tname, output->tbName); strcpy(stbName.tname, output->tbName);
CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &exist)); CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &exist, flag));
if (0 == exist) { if (0 == exist) {
ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname); ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname);
continue; continue;
...@@ -1669,6 +1685,11 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) { ...@@ -1669,6 +1685,11 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
char *p = strchr(output->dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) {
memmove(output->dbFName, p + 1, strlen(p + 1));
}
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache)); CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache));
if (NULL == dbCache) { if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId); ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId);
...@@ -2232,6 +2253,11 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm ...@@ -2232,6 +2253,11 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) { if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
if (CTG_IS_INF_DBNAME(pTableName->dbname)) {
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
STableMeta *tbMeta = NULL; STableMeta *tbMeta = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -2316,6 +2342,11 @@ _return: ...@@ -2316,6 +2342,11 @@ _return:
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) { int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
CTG_API_ENTER(); CTG_API_ENTER();
if (CTG_IS_INF_DBNAME(pTableName->dbname)) {
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgDBCache* dbCache = NULL; SCtgDBCache* dbCache = NULL;
int32_t code = 0; int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0}; char db[TSDB_DB_FNAME_LEN] = {0};
......
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
namespace { namespace {
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta, extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
int32_t *exist); int32_t *exist, int32_t flag);
extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type); extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action); extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action);
extern "C" int32_t ctgDbgEnableDebug(char *option); extern "C" int32_t ctgDbgEnableDebug(char *option);
...@@ -243,6 +243,8 @@ void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) { ...@@ -243,6 +243,8 @@ void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) {
rspMsg->suid = ctgTestSuid + 1; rspMsg->suid = ctgTestSuid + 1;
rspMsg->tuid = ctgTestSuid + 1; rspMsg->tuid = ctgTestSuid + 1;
rspMsg->vgId = 1; rspMsg->vgId = 1;
rspMsg->pSchemas = (SSchema *)calloc(rspMsg->numOfTags + rspMsg->numOfColumns, sizeof(SSchema));
SSchema *s = NULL; SSchema *s = NULL;
s = &rspMsg->pSchemas[0]; s = &rspMsg->pSchemas[0];
...@@ -770,7 +772,7 @@ void *ctgTestGetCtableMetaThread(void *param) { ...@@ -770,7 +772,7 @@ void *ctgTestGetCtableMetaThread(void *param) {
strcpy(cn.tname, ctgTestCTablename); strcpy(cn.tname, ctgTestCTablename);
while (!ctgTestStop) { while (!ctgTestStop) {
code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &exist); code = ctgGetTableMetaFromCache(pCtg, &cn, &tbMeta, &exist, 0);
if (code || 0 == exist) { if (code || 0 == exist) {
assert(0); assert(0);
} }
...@@ -1290,6 +1292,7 @@ TEST(tableMeta, updateStbMeta) { ...@@ -1290,6 +1292,7 @@ TEST(tableMeta, updateStbMeta) {
code = catalogUpdateSTableMeta(pCtg, &rsp); code = catalogUpdateSTableMeta(pCtg, &rsp);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
tfree(rsp.pSchemas);
while (true) { while (true) {
uint64_t n = 0; uint64_t n = 0;
......
...@@ -198,7 +198,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isSuperTable, STabl ...@@ -198,7 +198,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isSuperTable, STabl
} }
int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) { int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
int32_t code = -1; int32_t code = 0;
STableMetaRsp metaRsp = {0}; STableMetaRsp metaRsp = {0};
if (NULL == output || NULL == msg || msgSize <= 0) { if (NULL == output || NULL == msg || msgSize <= 0) {
...@@ -216,7 +216,7 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) { ...@@ -216,7 +216,7 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
goto PROCESS_META_OVER; goto PROCESS_META_OVER;
} }
if (!tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) { if (0 != strcmp(metaRsp.dbFName, TSDB_INFORMATION_SCHEMA_DB) && !tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) {
code = TSDB_CODE_TSC_INVALID_VALUE; code = TSDB_CODE_TSC_INVALID_VALUE;
goto PROCESS_META_OVER; goto PROCESS_META_OVER;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册