提交 94b50315 编写于 作者: D dapan1121

feature/qnode

上级 4766deb2
...@@ -113,6 +113,13 @@ extern const int32_t TYPE_BYTES[15]; ...@@ -113,6 +113,13 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_INS_TABLE_BNODES "bnodes" #define TSDB_INS_TABLE_BNODES "bnodes"
#define TSDB_INS_TABLE_SNODES "snodes" #define TSDB_INS_TABLE_SNODES "snodes"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_CONNECTIONS "connections"
#define TSDB_PERFS_TABLE_QUERIES "queries"
#define TSDB_PERFS_TABLE_TOPICS "topics"
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes"
#define TSDB_INDEX_TYPE_SMA "SMA" #define TSDB_INDEX_TYPE_SMA "SMA"
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT" #define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
......
...@@ -43,8 +43,7 @@ extern "C" { ...@@ -43,8 +43,7 @@ extern "C" {
} \ } \
} while (0) } while (0)
//#define HEARTBEAT_INTERVAL 1500 // ms #define HEARTBEAT_INTERVAL 1500 // ms
#define HEARTBEAT_INTERVAL 15000 // ms TODO
typedef struct SAppInstInfo SAppInstInfo; typedef struct SAppInstInfo SAppInstInfo;
......
...@@ -38,6 +38,10 @@ extern "C" { ...@@ -38,6 +38,10 @@ extern "C" {
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg); typedef int32_t (*MndMsgFp)(SNodeMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode);
...@@ -118,6 +122,7 @@ typedef struct SMnode { ...@@ -118,6 +122,7 @@ typedef struct SMnode {
STelemMgmt telemMgmt; STelemMgmt telemMgmt;
SSyncMgmt syncMgmt; SSyncMgmt syncMgmt;
SHashObj *infosMeta; SHashObj *infosMeta;
SHashObj *perfsMeta;
SGrantInfo grant; SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX]; MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb; SMsgCb msgCb;
......
...@@ -15,10 +15,7 @@ ...@@ -15,10 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndInfoSchema.h" #include "mndInfoSchema.h"
#include "mndInt.h"
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
//!!!! Note: only APPEND columns in below tables, NO insert !!!! //!!!! Note: only APPEND columns in below tables, NO insert !!!!
static const SInfosTableSchema dnodesSchema[] = { static const SInfosTableSchema dnodesSchema[] = {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndInfoSchema.h" #include "mndInfoSchema.h"
#include "mndPerfSchema.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h" #include "mndTrans.h"
...@@ -1516,6 +1517,11 @@ static int32_t mndProcessTableMetaReq(SNodeMsg *pReq) { ...@@ -1516,6 +1517,11 @@ static int32_t mndProcessTableMetaReq(SNodeMsg *pReq) {
if (mndBuildInsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { if (mndBuildInsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) {
goto RETRIEVE_META_OVER; goto RETRIEVE_META_OVER;
} }
} else if (0 == strcmp(infoReq.dbFName, TSDB_PERFORMANCE_SCHEMA_DB)) {
mDebug("performance_schema table:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
if (mndBuildPerfsTableSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) {
goto RETRIEVE_META_OVER;
}
} else { } else {
mDebug("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName); mDebug("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) {
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "mndDnode.h" #include "mndDnode.h"
#include "mndFunc.h" #include "mndFunc.h"
#include "mndInfoSchema.h" #include "mndInfoSchema.h"
#include "mndPerfSchema.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndOffset.h" #include "mndOffset.h"
#include "mndProfile.h" #include "mndProfile.h"
...@@ -208,6 +209,7 @@ static int32_t mndInitSteps(SMnode *pMnode, bool deploy) { ...@@ -208,6 +209,7 @@ static int32_t mndInitSteps(SMnode *pMnode, bool deploy) {
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stb", mndInitSma, mndCleanupSma) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitSma, mndCleanupSma) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1; if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
if (deploy) { if (deploy) {
......
...@@ -227,20 +227,20 @@ typedef struct SCtgAction { ...@@ -227,20 +227,20 @@ typedef struct SCtgAction {
#define CTG_FLAG_STB 0x1 #define CTG_FLAG_STB 0x1
#define CTG_FLAG_NOT_STB 0x2 #define CTG_FLAG_NOT_STB 0x2
#define CTG_FLAG_UNKNOWN_STB 0x4 #define CTG_FLAG_UNKNOWN_STB 0x4
#define CTG_FLAG_INF_DB 0x8 #define CTG_FLAG_SYS_DB 0x8
#define CTG_FLAG_FORCE_UPDATE 0x10 #define CTG_FLAG_FORCE_UPDATE 0x10
#define CTG_FLAG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB) #define CTG_FLAG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB)
#define CTG_FLAG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB) #define CTG_FLAG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB)
#define CTG_FLAG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB) #define CTG_FLAG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB)
#define CTG_FLAG_IS_INF_DB(_flag) ((_flag) & CTG_FLAG_INF_DB) #define CTG_FLAG_IS_SYS_DB(_flag) ((_flag) & CTG_FLAG_SYS_DB)
#define CTG_FLAG_IS_FORCE_UPDATE(_flag) ((_flag) & CTG_FLAG_FORCE_UPDATE) #define CTG_FLAG_IS_FORCE_UPDATE(_flag) ((_flag) & CTG_FLAG_FORCE_UPDATE)
#define CTG_FLAG_SET_INF_DB(_flag) ((_flag) |= CTG_FLAG_INF_DB) #define CTG_FLAG_SET_SYS_DB(_flag) ((_flag) |= CTG_FLAG_SYS_DB)
#define CTG_FLAG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0) #define CTG_FLAG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0)
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)) #define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) #define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_IS_INF_DBNAME(_dbname) ((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) #define CTG_IS_SYS_DBNAME(_dbname) (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB))))
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
......
...@@ -217,7 +217,7 @@ int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) ...@@ -217,7 +217,7 @@ int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
} }
char *p = strchr(dbFName, '.'); char *p = strchr(dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) { if (p && CTG_IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1; dbFName = p + 1;
} }
...@@ -304,7 +304,7 @@ int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t d ...@@ -304,7 +304,7 @@ int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t d
} }
char *p = strchr(dbFName, '.'); char *p = strchr(dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) { if (p && CTG_IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1; dbFName = p + 1;
} }
...@@ -336,7 +336,7 @@ int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, boo ...@@ -336,7 +336,7 @@ int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, boo
} }
char *p = strchr(output->dbFName, '.'); char *p = strchr(output->dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) { if (p && CTG_IS_SYS_DBNAME(p + 1)) {
memmove(output->dbFName, p + 1, strlen(p + 1)); memmove(output->dbFName, p + 1, strlen(p + 1));
} }
...@@ -410,7 +410,7 @@ void ctgWReleaseVgInfo(SCtgDBCache *dbCache) { ...@@ -410,7 +410,7 @@ void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
char *p = strchr(dbFName, '.'); char *p = strchr(dbFName, '.');
if (p && CTG_IS_INF_DBNAME(p + 1)) { if (p && CTG_IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1; dbFName = p + 1;
} }
...@@ -688,7 +688,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable ...@@ -688,7 +688,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
} }
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
if (CTG_FLAG_IS_INF_DB(flag)) { if (CTG_FLAG_IS_SYS_DB(flag)) {
strcpy(dbFName, pTableName->dbname); strcpy(dbFName, pTableName->dbname);
} else { } else {
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
...@@ -1721,7 +1721,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, ...@@ -1721,7 +1721,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
SVgroupInfo vgroupInfo = {0}; SVgroupInfo vgroupInfo = {0};
int32_t code = 0; int32_t code = 0;
if (!CTG_FLAG_IS_INF_DB(flag)) { if (!CTG_FLAG_IS_SYS_DB(flag)) {
CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo)); CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo));
} }
...@@ -1732,7 +1732,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, ...@@ -1732,7 +1732,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
if (CTG_FLAG_IS_INF_DB(flag)) { if (CTG_FLAG_IS_SYS_DB(flag)) {
ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(pTableName)); ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(pTableName));
CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, (char *)pTableName->dbname, (char *)pTableName->tname, output)); CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, (char *)pTableName->dbname, (char *)pTableName->tname, output));
...@@ -1820,8 +1820,8 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons ...@@ -1820,8 +1820,8 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
uint64_t suid = 0; uint64_t suid = 0;
STableMetaOutput *output = NULL; STableMetaOutput *output = NULL;
if (CTG_IS_INF_DBNAME(pTableName->dbname)) { if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
CTG_FLAG_SET_INF_DB(flag); CTG_FLAG_SET_SYS_DB(flag);
} }
CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &inCache, flag, &dbId)); CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &inCache, flag, &dbId));
...@@ -1829,7 +1829,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons ...@@ -1829,7 +1829,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t tbType = 0; int32_t tbType = 0;
if (inCache) { if (inCache) {
if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_INF_DB(flag)))) { if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_SYS_DB(flag)))) {
goto _return; goto _return;
} }
...@@ -1885,7 +1885,7 @@ _return: ...@@ -1885,7 +1885,7 @@ _return:
if (CTG_TABLE_NOT_EXIST(code) && inCache) { if (CTG_TABLE_NOT_EXIST(code) && inCache) {
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
if (CTG_FLAG_IS_INF_DB(flag)) { if (CTG_FLAG_IS_SYS_DB(flag)) {
strcpy(dbFName, pTableName->dbname); strcpy(dbFName, pTableName->dbname);
} else { } else {
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
...@@ -2633,7 +2633,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm ...@@ -2633,7 +2633,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
if (CTG_IS_INF_DBNAME(pTableName->dbname)) { if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
...@@ -2666,7 +2666,7 @@ _return: ...@@ -2666,7 +2666,7 @@ _return:
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) { int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
CTG_API_ENTER(); CTG_API_ENTER();
if (CTG_IS_INF_DBNAME(pTableName->dbname)) { if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
......
...@@ -344,6 +344,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { ...@@ -344,6 +344,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
atomic_store_64(&result, 0); atomic_store_64(&result, 0);
freeResultWithRid(oresult); freeResultWithRid(oresult);
taos_free_result(pSql);
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册