提交 3b5828c0 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/mnode

...@@ -135,30 +135,30 @@ void basic_consume_loop(tmq_t *tmq, ...@@ -135,30 +135,30 @@ void basic_consume_loop(tmq_t *tmq,
fprintf(stderr, "%% Consumer closed\n"); fprintf(stderr, "%% Consumer closed\n");
} }
void sync_consume_loop(tmq_t *rk, void sync_consume_loop(tmq_t *tmq,
tmq_list_t *topics) { tmq_list_t *topics) {
static const int MIN_COMMIT_COUNT = 1000; static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0; int msg_count = 0;
tmq_resp_err_t err; tmq_resp_err_t err;
if ((err = tmq_subscribe(rk, topics))) { if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
return; return;
} }
while (running) { while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(rk, 500); tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) { if (tmqmessage) {
msg_process(tmqmessage); msg_process(tmqmessage);
tmq_message_destroy(tmqmessage); tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) if ((++msg_count % MIN_COMMIT_COUNT) == 0)
tmq_commit(rk, NULL, 0); tmq_commit(tmq, NULL, 0);
} }
} }
err = tmq_consumer_close(rk); err = tmq_consumer_close(tmq);
if (err) if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else else
......
...@@ -154,8 +154,8 @@ typedef enum _mgmt_table { ...@@ -154,8 +154,8 @@ typedef enum _mgmt_table {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
char* dbName; char* dbFName;
char* tableFullName; char* tbName;
} SBuildTableMetaInput; } SBuildTableMetaInput;
typedef struct { typedef struct {
...@@ -697,8 +697,8 @@ typedef struct { ...@@ -697,8 +697,8 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
char dbFname[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char tableFname[TSDB_TABLE_FNAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
} STableInfoReq; } STableInfoReq;
typedef struct { typedef struct {
...@@ -723,9 +723,9 @@ typedef struct { ...@@ -723,9 +723,9 @@ typedef struct {
} SVgroupsInfo; } SVgroupsInfo;
typedef struct { typedef struct {
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name char tbName[TSDB_TABLE_NAME_LEN];
char stbFname[TSDB_TABLE_FNAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN];
char dbFname[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
int32_t numOfTags; int32_t numOfTags;
int32_t numOfColumns; int32_t numOfColumns;
int8_t precision; int8_t precision;
......
...@@ -273,12 +273,11 @@ ...@@ -273,12 +273,11 @@
#define NEW_TK_SOFFSET 64 #define NEW_TK_SOFFSET 64
#define NEW_TK_LIMIT 65 #define NEW_TK_LIMIT 65
#define NEW_TK_OFFSET 66 #define NEW_TK_OFFSET 66
#define NEW_TK_NK_LR 67 #define NEW_TK_ASC 67
#define NEW_TK_ASC 68 #define NEW_TK_DESC 68
#define NEW_TK_DESC 69 #define NEW_TK_NULLS 69
#define NEW_TK_NULLS 70 #define NEW_TK_FIRST 70
#define NEW_TK_FIRST 71 #define NEW_TK_LAST 71
#define NEW_TK_LAST 72
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
......
...@@ -49,17 +49,19 @@ typedef struct SCatalogCfg { ...@@ -49,17 +49,19 @@ typedef struct SCatalogCfg {
uint32_t maxTblCacheNum; uint32_t maxTblCacheNum;
uint32_t maxDBCacheNum; uint32_t maxDBCacheNum;
uint32_t dbRentSec; uint32_t dbRentSec;
uint32_t stableRentSec; uint32_t stbRentSec;
} SCatalogCfg; } SCatalogCfg;
typedef struct SSTableMetaVersion { typedef struct SSTableMetaVersion {
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t suid; uint64_t suid;
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
} SSTableMetaVersion; } SSTableMetaVersion;
typedef struct SDbVgVersion { typedef struct SDbVgVersion {
char dbName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
int64_t dbId; int64_t dbId;
int32_t vgVersion; int32_t vgVersion;
} SDbVgVersion; } SDbVgVersion;
...@@ -99,7 +101,9 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const ...@@ -99,7 +101,9 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo); int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId);
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid);
/** /**
* Get a table's meta data. * Get a table's meta data.
......
...@@ -81,16 +81,15 @@ typedef struct STableMeta { ...@@ -81,16 +81,15 @@ typedef struct STableMeta {
} STableMeta; } STableMeta;
typedef struct SDBVgroupInfo { typedef struct SDBVgroupInfo {
SRWLatch lock;
uint64_t dbId; uint64_t dbId;
int32_t vgVersion; int32_t vgVersion;
int8_t hashMethod; int8_t hashMethod;
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo SHashObj *vgHash; //key:vgId, value:SVgroupInfo
} SDBVgroupInfo; } SDBVgroupInfo;
typedef struct SUseDbOutput { typedef struct SUseDbOutput {
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
SDBVgroupInfo dbVgroup; SDBVgroupInfo *dbVgroup;
} SUseDbOutput; } SUseDbOutput;
enum { enum {
...@@ -103,8 +102,9 @@ enum { ...@@ -103,8 +102,9 @@ enum {
typedef struct STableMetaOutput { typedef struct STableMetaOutput {
int32_t metaType; int32_t metaType;
char ctbFname[TSDB_TABLE_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char tbFname[TSDB_TABLE_FNAME_LEN]; char ctbName[TSDB_TABLE_NAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
SCTableMeta ctbMeta; SCTableMeta ctbMeta;
STableMeta *tbMeta; STableMeta *tbMeta;
} STableMetaOutput; } STableMetaOutput;
......
...@@ -78,6 +78,11 @@ typedef struct SNodeList { ...@@ -78,6 +78,11 @@ typedef struct SNodeList {
SListCell* pTail; SListCell* pTail;
} SNodeList; } SNodeList;
typedef struct SNameStr {
int32_t len;
char* pName;
} SNameStr;
typedef struct SDataType { typedef struct SDataType {
uint8_t type; uint8_t type;
uint8_t precision; uint8_t precision;
...@@ -89,6 +94,7 @@ typedef struct SExprNode { ...@@ -89,6 +94,7 @@ typedef struct SExprNode {
ENodeType nodeType; ENodeType nodeType;
SDataType resType; SDataType resType;
char aliasName[TSDB_COL_NAME_LEN]; char aliasName[TSDB_COL_NAME_LEN];
SNodeList* pAssociationList;
} SExprNode; } SExprNode;
typedef enum EColumnType { typedef enum EColumnType {
...@@ -102,7 +108,9 @@ typedef struct SColumnNode { ...@@ -102,7 +108,9 @@ typedef struct SColumnNode {
EColumnType colType; // column or tag EColumnType colType; // column or tag
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN]; char colName[TSDB_COL_NAME_LEN];
SNode* pProjectRef;
} SColumnNode; } SColumnNode;
typedef struct SValueNode { typedef struct SValueNode {
...@@ -176,13 +184,16 @@ typedef struct SFunctionNode { ...@@ -176,13 +184,16 @@ typedef struct SFunctionNode {
typedef struct STableNode { typedef struct STableNode {
ENodeType type; ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
char tableAliasName[TSDB_COL_NAME_LEN]; char tableAlias[TSDB_TABLE_NAME_LEN];
} STableNode; } STableNode;
struct STableMeta;
typedef struct SRealTableNode { typedef struct SRealTableNode {
STableNode table; // QUERY_NODE_REAL_TABLE STableNode table; // QUERY_NODE_REAL_TABLE
char dbName[TSDB_DB_NAME_LEN]; struct STableMeta* pMeta;
} SRealTableNode; } SRealTableNode;
typedef struct STempTableNode { typedef struct STempTableNode {
...@@ -273,7 +284,6 @@ typedef struct SFillNode { ...@@ -273,7 +284,6 @@ typedef struct SFillNode {
typedef struct SSelectStmt { typedef struct SSelectStmt {
ENodeType type; // QUERY_NODE_SELECT_STMT ENodeType type; // QUERY_NODE_SELECT_STMT
bool isDistinct; bool isDistinct;
bool isStar;
SNodeList* pProjectionList; // SNode SNodeList* pProjectionList; // SNode
SNode* pFromTable; SNode* pFromTable;
SNode* pWhere; SNode* pWhere;
...@@ -295,6 +305,8 @@ typedef struct SSetOperator { ...@@ -295,6 +305,8 @@ typedef struct SSetOperator {
ESetOperatorType opType; ESetOperatorType opType;
SNode* pLeft; SNode* pLeft;
SNode* pRight; SNode* pRight;
SNodeList* pOrderByList; // SOrderByExprNode
SNode* pLimit;
} SSetOperator; } SSetOperator;
SNode* nodesMakeNode(ENodeType type); SNode* nodesMakeNode(ENodeType type);
...@@ -306,8 +318,10 @@ void nodesDestroyList(SNodeList* pList); ...@@ -306,8 +318,10 @@ void nodesDestroyList(SNodeList* pList);
typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext); typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext);
bool nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext); void nodesWalkNode(SNode* pNode, FQueryNodeWalker walker, void* pContext);
bool nodesWalkList(SNodeList* pList, FQueryNodeWalker walker, void* pContext); void nodesWalkList(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
void nodesWalkNodePostOrder(SNode* pNode, FQueryNodeWalker walker, void* pContext);
void nodesWalkListPostOrder(SNodeList* pList, FQueryNodeWalker walker, void* pContext);
bool nodesWalkStmt(SNode* pNode, FQueryNodeWalker walker, void* pContext); bool nodesWalkStmt(SNode* pNode, FQueryNodeWalker walker, void* pContext);
......
...@@ -440,7 +440,10 @@ int32_t* taosGetErrno(); ...@@ -440,7 +440,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) //scheduler internal error #define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) //scheduler internal error
//parser
#define TSDB_CODE_PARSER_INVALID_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2601) //invalid column name
#define TSDB_CODE_PARSER_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x2602) //table not exist
#define TSDB_CODE_PARSER_AMBIGUOUS_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2603) //ambiguous column
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -41,19 +41,14 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -41,19 +41,14 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid); tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
if (rsp->vgVersion < 0) { if (rsp->vgVersion < 0) {
SDbVgVersion dbInfo; code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
strcpy(dbInfo.dbName, rsp->db);
dbInfo.dbId = rsp->uid;
dbInfo.vgVersion = rsp->vgVersion;
code = catalogRemoveDBVgroup(pCatalog, &dbInfo);
} else { } else {
SDBVgroupInfo vgInfo = {0}; SDBVgroupInfo vgInfo = {0};
vgInfo.dbId = rsp->uid; vgInfo.dbId = rsp->uid;
vgInfo.vgVersion = rsp->vgVersion; vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod; vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgInfo = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgInfo) { if (NULL == vgInfo.vgHash) {
tscError("hash init[%d] failed", rsp->vgNum); tscError("hash init[%d] failed", rsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -67,16 +62,16 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -67,16 +62,16 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port); rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
} }
if (0 != taosHashPut(vgInfo.vgInfo, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) { if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
tscError("hash push failed, errno:%d", errno); tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgInfo); taosHashCleanup(vgInfo.vgHash);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
} }
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo); code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
if (code) { if (code) {
taosHashCleanup(vgInfo.vgInfo); taosHashCleanup(vgInfo.vgHash);
} }
} }
...@@ -90,6 +85,75 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -90,6 +85,75 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t msgLen = 0;
int32_t code = 0;
int32_t schemaNum = 0;
while (msgLen < valueLen) {
STableMetaRsp *rsp = (STableMetaRsp *)((char *)value + msgLen);
rsp->numOfColumns = ntohl(rsp->numOfColumns);
rsp->suid = be64toh(rsp->suid);
if (rsp->numOfColumns < 0) {
schemaNum = 0;
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
code = catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid);
} else {
rsp->numOfTags = ntohl(rsp->numOfTags);
schemaNum = rsp->numOfColumns + rsp->numOfTags;
/*
rsp->vgNum = ntohl(rsp->vgNum);
rsp->uid = be64toh(rsp->uid);
SDBVgroupInfo vgInfo = {0};
vgInfo.dbId = rsp->uid;
vgInfo.vgVersion = rsp->vgVersion;
vgInfo.hashMethod = rsp->hashMethod;
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo.vgHash) {
tscError("hash init[%d] failed", rsp->vgNum);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < rsp->vgNum; ++i) {
rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId);
rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin);
rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd);
for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) {
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
}
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgHash);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
if (code) {
taosHashCleanup(vgInfo.vgHash);
}
*/
}
if (code) {
return code;
}
msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema);
}
return TSDB_CODE_SUCCESS;
}
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) { static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey)); SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) { if (NULL == info) {
...@@ -122,9 +186,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs ...@@ -122,9 +186,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog); hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
break; break;
} }
case HEARTBEAT_KEY_STBINFO: case HEARTBEAT_KEY_STBINFO:{
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
break;
}
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
break; break;
}
default: default:
tscError("invalid hb key type:%d", kv->key); tscError("invalid hb key type:%d", kv->key);
break; break;
...@@ -157,7 +236,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code ...@@ -157,7 +236,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
tfree(param); tfree(param);
if (rspNum) { if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
} else { } else {
atomic_add_fetch_32(&emptyRspNum, 1); atomic_add_fetch_32(&emptyRspNum, 1);
} }
...@@ -204,6 +283,37 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl ...@@ -204,6 +283,37 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SSTableMetaVersion *stbs = NULL;
uint32_t stbNum = 0;
int32_t code = 0;
code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (stbNum <= 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < stbNum; ++i) {
SSTableMetaVersion *stb = &stbs[i];
stb->suid = htobe64(stb->suid);
stb->sversion = htons(stb->sversion);
stb->tversion = htons(stb->tversion);
}
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs};
tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS;
}
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) { int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
int64_t *clusterId = (int64_t *)param; int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL; struct SCatalog *pCatalog = NULL;
...@@ -219,6 +329,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req ...@@ -219,6 +329,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req
return code; return code;
} }
code = hbGetExpiredStbInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -389,7 +504,6 @@ static void hbStopThread() { ...@@ -389,7 +504,6 @@ static void hbStopThread() {
} }
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) { SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
return NULL;
hbMgrInit(); hbMgrInit();
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) { if (pAppHbMgr == NULL) {
...@@ -447,7 +561,6 @@ void appHbMgrCleanup(void) { ...@@ -447,7 +561,6 @@ void appHbMgrCleanup(void) {
} }
int hbMgrInit() { int hbMgrInit() {
return 0;
// init once // init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0; if (old == 1) return 0;
...@@ -504,7 +617,6 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo * ...@@ -504,7 +617,6 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
} }
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) { int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
return 0;
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY}; SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
SHbConnInfo info = {0}; SHbConnInfo info = {0};
......
...@@ -105,11 +105,12 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -105,11 +105,12 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
pthread_mutex_lock(&appInfo.mutex); pthread_mutex_lock(&appInfo.mutex);
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
SAppInstInfo* p = NULL;
if (pInst == NULL) { if (pInst == NULL) {
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet; p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
/*p->pAppHbMgr = appHbMgrInit(p, key);*/ p->pAppHbMgr = appHbMgrInit(p, key);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p; pInst = &p;
......
...@@ -76,7 +76,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -76,7 +76,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->connType = HEARTBEAT_TYPE_QUERY; pTscObj->connType = HEARTBEAT_TYPE_QUERY;
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);*/ hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
...@@ -302,15 +302,12 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -302,15 +302,12 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData; SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData;
SDbVgVersion dbVer = {0};
struct SCatalog *pCatalog = NULL; struct SCatalog *pCatalog = NULL;
rsp->uid = be64toh(rsp->uid);
strncpy(dbVer.dbName, rsp->db, sizeof(dbVer.dbName));
dbVer.dbId = be64toh(rsp->uid);
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveDBVgroup(pCatalog, &dbVer); catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
return code; return code;
......
...@@ -5,14 +5,26 @@ MESSAGE(STATUS "build parser unit test") ...@@ -5,14 +5,26 @@ MESSAGE(STATUS "build parser unit test")
SET(CMAKE_CXX_STANDARD 11) SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(clientTest ${SOURCE_LIST}) ADD_EXECUTABLE(clientTest clientTests.cpp)
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
clientTest clientTest
PUBLIC os util common transport parser catalog scheduler function gtest taos qcom PUBLIC os util common transport parser catalog scheduler function gtest taos qcom
) )
ADD_EXECUTABLE(tmqTest tmqTest.cpp)
TARGET_LINK_LIBRARIES(
tmqTest
PUBLIC os util common transport parser catalog scheduler function gtest taos qcom
)
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
clientTest clientTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/" PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc" PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc"
) )
TARGET_INCLUDE_DIRECTORIES(
tmqTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc"
)
...@@ -564,119 +564,6 @@ TEST(testCase, insert_test) { ...@@ -564,119 +564,6 @@ TEST(testCase, insert_test) {
} }
#endif #endif
TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
#if 0
TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_ctb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
#endif
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
int cnt = 1;
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
if (msg == NULL) continue;
tmqShowMsg(msg);
if (cnt++ % 10 == 0){
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
//printf("get msg\n");
}
}
TEST(testCase, tmq_consume_Test) {
}
TEST(testCase, tmq_commit_TEST) {
}
#if 0 #if 0
TEST(testCase, projection_query_tables) { TEST(testCase, projection_query_tables) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "../inc/clientInt.h"
#include "taos.h"
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, driverInit_Test) {
taosInitGlobalCfg();
// taos_init();
}
TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
#if 0
TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_ctb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
int cnt = 1;
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
if (msg == NULL) continue;
tmqShowMsg(msg);
if (cnt++ % 10 == 0){
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
//printf("get msg\n");
}
}
TEST(testCase, tmq_consume_Test) {
}
TEST(testCase, tmq_commit_Test) {
}
#endif
...@@ -125,7 +125,7 @@ const char* Testbase::GetMetaName(int32_t index) { ...@@ -125,7 +125,7 @@ const char* Testbase::GetMetaName(int32_t index) {
int32_t Testbase::GetMetaNum() { return pMeta->numOfColumns; } int32_t Testbase::GetMetaNum() { return pMeta->numOfColumns; }
const char* Testbase::GetMetaTbName() { return pMeta->tbFname; } const char* Testbase::GetMetaTbName() { return pMeta->tbName; }
void Testbase::SendShowRetrieveReq() { void Testbase::SendShowRetrieveReq() {
int32_t contLen = sizeof(SRetrieveTableReq); int32_t contLen = sizeof(SRetrieveTableReq);
...@@ -144,7 +144,7 @@ void Testbase::SendShowRetrieveReq() { ...@@ -144,7 +144,7 @@ void Testbase::SendShowRetrieveReq() {
pos = 0; pos = 0;
} }
const char* Testbase::GetShowName() { return pMeta->tbFname; } const char* Testbase::GetShowName() { return pMeta->tbName; }
int8_t Testbase::GetShowInt8() { int8_t Testbase::GetShowInt8() {
int8_t data = *((int8_t*)(pData + pos)); int8_t data = *((int8_t*)(pData + pos));
......
...@@ -619,7 +619,7 @@ typedef struct SMqTopicObj { ...@@ -619,7 +619,7 @@ typedef struct SMqTopicObj {
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
uint64_t uid; uint64_t uid;
uint64_t dbUid; int64_t dbUid;
int32_t version; int32_t version;
SRWLatch lock; SRWLatch lock;
int32_t sqlLen; int32_t sqlLen;
......
...@@ -28,6 +28,9 @@ void mndCleanupStb(SMnode *pMnode); ...@@ -28,6 +28,9 @@ void mndCleanupStb(SMnode *pMnode);
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName); SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb); void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -426,7 +426,7 @@ static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * ...@@ -426,7 +426,7 @@ static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_BNODE); pShow->numOfRows = sdbGetSize(pSdb, SDB_BNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -186,7 +186,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp ...@@ -186,7 +186,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
cols++; cols++;
pMeta->numOfColumns = htonl(cols); pMeta->numOfColumns = htonl(cols);
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
pShow->offset[0] = 0; pShow->offset[0] = 0;
...@@ -196,7 +196,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp ...@@ -196,7 +196,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp
pShow->numOfRows = 1; pShow->numOfRows = 1;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -907,9 +907,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void * ...@@ -907,9 +907,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
len = 0; len = 0;
SDbObj *pDb = mndAcquireDb(pMnode, db->dbName); SDbObj *pDb = mndAcquireDb(pMnode, db->dbFName);
if (pDb == NULL) { if (pDb == NULL) {
mInfo("db %s not exist", db->dbName); mInfo("db %s not exist", db->dbFName);
len = sizeof(SUseDbRsp); len = sizeof(SUseDbRsp);
} else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) { } else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) {
...@@ -929,7 +929,7 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void * ...@@ -929,7 +929,7 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
} }
pRsp = (SUseDbRsp *)((char *)buf + bufOffset); pRsp = (SUseDbRsp *)((char *)buf + bufOffset);
memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN); memcpy(pRsp->db, db->dbFName, TSDB_DB_FNAME_LEN);
if (pDb) { if (pDb) {
int32_t vgNum = 0; int32_t vgNum = 0;
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum); mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum);
...@@ -1113,7 +1113,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe ...@@ -1113,7 +1113,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe
pShow->numOfRows = sdbGetSize(pSdb, SDB_DB); pShow->numOfRows = sdbGetSize(pSdb, SDB_DB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -608,7 +608,7 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp ...@@ -608,7 +608,7 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
pShow->numOfRows = TSDB_CONFIG_NUMBER; pShow->numOfRows = TSDB_CONFIG_NUMBER;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
...@@ -715,7 +715,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * ...@@ -715,7 +715,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE); pShow->numOfRows = sdbGetSize(pSdb, SDB_DNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -482,7 +482,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p ...@@ -482,7 +482,7 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC); pShow->numOfRows = sdbGetSize(pSdb, SDB_FUNC);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -624,7 +624,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * ...@@ -624,7 +624,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE); pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
mndUpdateMnodeRole(pMnode); mndUpdateMnodeRole(pMnode);
return 0; return 0;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "mndProfile.h" #include "mndProfile.h"
//#include "mndConsumer.h" //#include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndStb.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
//#include "mndTopic.h" //#include "mndTopic.h"
...@@ -376,9 +377,16 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { ...@@ -376,9 +377,16 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
} }
break; break;
} }
case HEARTBEAT_KEY_STBINFO: case HEARTBEAT_KEY_STBINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateStbInfo(pMnode, (SSTableMetaVersion *)kv->value, kv->valueLen/sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv);
}
break; break;
}
default: default:
mError("invalid kv key:%d", kv->key); mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_MND_APP_ERROR; hbRsp.status = TSDB_CODE_MND_APP_ERROR;
...@@ -623,7 +631,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * ...@@ -623,7 +631,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable); pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
...@@ -792,7 +800,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * ...@@ -792,7 +800,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = 1000000; pShow->numOfRows = 1000000;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -426,7 +426,7 @@ static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * ...@@ -426,7 +426,7 @@ static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_QNODE); pShow->numOfRows = sdbGetSize(pSdb, SDB_QNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -428,7 +428,7 @@ static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * ...@@ -428,7 +428,7 @@ static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_SNODE); pShow->numOfRows = sdbGetSize(pSdb, SDB_SNODE);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -769,20 +769,23 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { ...@@ -769,20 +769,23 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
STableInfoReq *pInfo = pReq->rpcMsg.pCont; STableInfoReq *pInfo = pReq->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname); char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", pInfo->dbFName, pInfo->tbName);
SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->tableFname); mDebug("stb:%s, start to retrieve meta", tbFName);
SDbObj *pDb = mndAcquireDb(pMnode, pInfo->dbFName);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); mError("stb:%s, failed to retrieve meta since %s", tbFName, terrstr());
return -1; return -1;
} }
SStbObj *pStb = mndAcquireStb(pMnode, pInfo->tableFname); SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) { if (pStb == NULL) {
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_STB; terrno = TSDB_CODE_MND_INVALID_STB;
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
return -1; return -1;
} }
...@@ -796,11 +799,13 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { ...@@ -796,11 +799,13 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
return -1; return -1;
} }
memcpy(pMeta->tbFname, pStb->name, TSDB_TABLE_FNAME_LEN); strcpy(pMeta->dbFName, pStb->db);
strcpy(pMeta->tbName, pInfo->tbName);
strcpy(pMeta->stbName, pInfo->tbName);
pMeta->numOfTags = htonl(pStb->numOfTags); pMeta->numOfTags = htonl(pStb->numOfTags);
pMeta->numOfColumns = htonl(pStb->numOfColumns); pMeta->numOfColumns = htonl(pStb->numOfColumns);
pMeta->precision = pDb->cfg.precision; pMeta->precision = pDb->cfg.precision;
...@@ -835,10 +840,113 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { ...@@ -835,10 +840,113 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
pReq->pCont = pMeta; pReq->pCont = pMeta;
pReq->contLen = contLen; pReq->contLen = contLen;
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags); mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", tbFName, pStb->numOfColumns, pStb->numOfTags);
return 0; return 0;
} }
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen) {
SSdb *pSdb = pMnode->pSdb;
int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
void *buf = malloc(bufSize);
int32_t len = 0;
int32_t contLen = 0;
STableMetaRsp *pRsp = NULL;
for (int32_t i = 0; i < num; ++i) {
SSTableMetaVersion *stb = &stbs[i];
stb->suid = be64toh(stb->suid);
stb->sversion = ntohs(stb->sversion);
stb->tversion = ntohs(stb->tversion);
if ((contLen + sizeof(STableMetaRsp)) > bufSize) {
bufSize = contLen + (num -i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
pRsp = (STableMetaRsp *)((char *)buf + contLen);
strcpy(pRsp->dbFName, stb->dbFName);
strcpy(pRsp->tbName, stb->stbName);
strcpy(pRsp->stbName, stb->stbName);
mDebug("start to retrieve meta, db:%s, stb:%s", stb->dbFName, stb->stbName);
SDbObj *pDb = mndAcquireDb(pMnode, stb->dbFName);
if (pDb == NULL) {
pRsp->numOfColumns = -1;
pRsp->suid = htobe64(stb->suid);
contLen += sizeof(STableMetaRsp);
mWarn("db:%s, failed to require db since %s", stb->dbFName, terrstr());
continue;
}
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", stb->dbFName, stb->stbName);
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
pRsp->numOfColumns = -1;
pRsp->suid = htobe64(stb->suid);
contLen += sizeof(STableMetaRsp);
mWarn("stb:%s, failed to get meta since %s", tbFName, terrstr());
continue;
}
taosRLockLatch(&pStb->lock);
if (stb->suid == pStb->uid && stb->sversion == pStb->version) {
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
continue;
}
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t len = totalCols * sizeof(SSchema);
contLen += sizeof(STableMetaRsp) + len;
if (contLen > bufSize) {
bufSize = contLen + (num -i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
pRsp->numOfTags = htonl(pStb->numOfTags);
pRsp->numOfColumns = htonl(pStb->numOfColumns);
pRsp->precision = pDb->cfg.precision;
pRsp->tableType = TSDB_SUPER_TABLE;
pRsp->update = pDb->cfg.update;
pRsp->sversion = htonl(pStb->version);
pRsp->suid = htobe64(pStb->uid);
pRsp->tuid = htobe64(pStb->uid);
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pRsp->pSchema[i];
SSchema *pSrcSchema = &pStb->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
}
if (contLen > 0) {
*rsp = buf;
*rspLen = contLen;
} else {
*rsp = NULL;
tfree(buf);
*rspLen = 0;
}
return 0;
}
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) { static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -911,7 +1019,7 @@ static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pM ...@@ -911,7 +1019,7 @@ static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pM
pShow->numOfRows = sdbGetSize(pSdb, SDB_STB); pShow->numOfRows = sdbGetSize(pSdb, SDB_STB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -745,7 +745,7 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { ...@@ -745,7 +745,7 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont; STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname); mDebug("subscribe:%s, start to retrieve meta", pInfo->tbName);
#if 0 #if 0
SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname); SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname);
...@@ -876,7 +876,7 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRs ...@@ -876,7 +876,7 @@ static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRs
pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER); pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -338,7 +338,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { ...@@ -338,7 +338,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont; STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("topic:%s, start to retrieve meta", pInfo->tableFname); mDebug("topic:%s, start to retrieve meta", pInfo->tbName);
#if 0 #if 0
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname); SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
...@@ -469,7 +469,7 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp * ...@@ -469,7 +469,7 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *
pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC); pShow->numOfRows = sdbGetSize(pSdb, SDB_TOPIC);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -474,7 +474,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p ...@@ -474,7 +474,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
pShow->numOfRows = sdbGetSize(pSdb, SDB_USER); pShow->numOfRows = sdbGetSize(pSdb, SDB_USER);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -525,7 +525,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp ...@@ -525,7 +525,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
} }
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
...@@ -638,7 +638,7 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * ...@@ -638,7 +638,7 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pShow->replica = dnodeId; pShow->replica = dnodeId;
pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId); pShow->numOfRows = mndGetVnodesNum(pMnode, dnodeId);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbName, mndShowStr(pShow->type));
return 0; return 0;
} }
......
...@@ -64,7 +64,7 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) { ...@@ -64,7 +64,7 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
STableMetaRsp* pMeta = test.GetShowMeta(); STableMetaRsp* pMeta = test.GetShowMeta();
EXPECT_STREQ(pMeta->tbFname, "show connections"); EXPECT_STREQ(pMeta->tbName, "show connections");
EXPECT_EQ(pMeta->numOfTags, 0); EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, 7); EXPECT_EQ(pMeta->numOfColumns, 7);
EXPECT_EQ(pMeta->precision, 0); EXPECT_EQ(pMeta->precision, 0);
......
...@@ -126,7 +126,8 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -126,7 +126,8 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
int32_t contLen = sizeof(STableInfoReq); int32_t contLen = sizeof(STableInfoReq);
STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen); STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
strcpy(pReq->tableFname, "1.d1.stb"); strcpy(pReq->dbFName, "1.d1");
strcpy(pReq->tbName, "stb");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
...@@ -146,8 +147,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -146,8 +147,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
pSchema->bytes = htonl(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
} }
EXPECT_STREQ(pRsp->tbFname, "1.d1.stb"); EXPECT_STREQ(pRsp->dbFName, "1.d1");
EXPECT_STREQ(pRsp->stbFname, ""); EXPECT_STREQ(pRsp->tbName, "stb");
EXPECT_STREQ(pRsp->stbName, "stb");
EXPECT_EQ(pRsp->numOfColumns, 2); EXPECT_EQ(pRsp->numOfColumns, 2);
EXPECT_EQ(pRsp->numOfTags, 3); EXPECT_EQ(pRsp->numOfTags, 3);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
......
...@@ -84,7 +84,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -84,7 +84,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int msgLen = 0; int msgLen = 0;
int32_t code = TSDB_CODE_VND_APP_ERROR; int32_t code = TSDB_CODE_VND_APP_ERROR;
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid); pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tbName, &uid);
if (pTbCfg == NULL) { if (pTbCfg == NULL) {
code = TSDB_CODE_VND_TB_NOT_EXIST; code = TSDB_CODE_VND_TB_NOT_EXIST;
goto _exit; goto _exit;
...@@ -119,13 +119,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -119,13 +119,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto _exit; goto _exit;
} }
memcpy(pTbMetaMsg->dbFname, pReq->dbFname, sizeof(pTbMetaMsg->dbFname)); memcpy(pTbMetaMsg->dbFName, pReq->dbFName, sizeof(pTbMetaMsg->dbFName));
strcpy(pTbMetaMsg->tbFname, pTbCfg->name); strcpy(pTbMetaMsg->tbName, pReq->tbName);
if (pTbCfg->type == META_CHILD_TABLE) { if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(pTbMetaMsg->stbFname, pStbCfg->name); strcpy(pTbMetaMsg->stbName, pStbCfg->name);
pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid); pTbMetaMsg->suid = htobe64(pTbCfg->ctbCfg.suid);
} else if (pTbCfg->type == META_SUPER_TABLE) { } else if (pTbCfg->type == META_SUPER_TABLE) {
strcpy(pTbMetaMsg->stbFname, pTbCfg->name); strcpy(pTbMetaMsg->stbName, pTbCfg->name);
pTbMetaMsg->suid = htobe64(uid); pTbMetaMsg->suid = htobe64(uid);
} }
pTbMetaMsg->numOfTags = htonl(nTagCols); pTbMetaMsg->numOfTags = htonl(nTagCols);
......
...@@ -27,7 +27,7 @@ extern "C" { ...@@ -27,7 +27,7 @@ extern "C" {
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6 #define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100 #define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20 #define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000 #define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000
#define CTG_DEFAULT_RENT_SECOND 10 #define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10 #define CTG_DEFAULT_RENT_SLOT_SIZE 10
...@@ -47,55 +47,52 @@ enum { ...@@ -47,55 +47,52 @@ enum {
CTG_RENT_STABLE, CTG_RENT_STABLE,
}; };
typedef struct SCTGDebug { typedef struct SCtgDebug {
int32_t lockDebug; int32_t lockDebug;
} SCTGDebug; } SCtgDebug;
typedef struct SVgroupListCache { typedef struct SCtgTbMetaCache {
int32_t vgroupVersion; SRWLatch stbLock;
SHashObj *cache; // key:vgId, value:SVgroupInfo SHashObj *cache; //key:tbname, value:STableMeta
} SVgroupListCache; SHashObj *stbCache; //key:suid, value:STableMeta*
} SCtgTbMetaCache;
typedef struct SDBVgroupCache { typedef struct SCtgDBCache {
SHashObj *cache; //key:dbname, value:SDBVgroupInfo SRWLatch vgLock;
} SDBVgroupCache; int8_t deleted;
SDBVgroupInfo *vgInfo;
SCtgTbMetaCache tbCache;
} SCtgDBCache;
typedef struct STableMetaCache { typedef struct SCtgRentSlot {
SRWLatch stableLock;
SHashObj *cache; //key:fulltablename, value:STableMeta
SHashObj *stableCache; //key:suid, value:STableMeta*
} STableMetaCache;
typedef struct SRentSlotInfo {
SRWLatch lock; SRWLatch lock;
bool needSort; bool needSort;
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
} SRentSlotInfo; } SCtgRentSlot;
typedef struct SMetaRentMgmt { typedef struct SCtgRentMgmt {
int8_t type; int8_t type;
uint16_t slotNum; uint16_t slotNum;
uint16_t slotRIdx; uint16_t slotRIdx;
int64_t lastReadMsec; int64_t lastReadMsec;
SRentSlotInfo *slots; SCtgRentSlot *slots;
} SMetaRentMgmt; } SCtgRentMgmt;
typedef struct SCatalog { typedef struct SCatalog {
uint64_t clusterId; uint64_t clusterId;
SDBVgroupCache dbCache; SHashObj *dbCache; //key:dbname, value:SCtgDBCache
STableMetaCache tableCache; SCtgRentMgmt dbRent;
SMetaRentMgmt dbRent; SCtgRentMgmt stbRent;
SMetaRentMgmt stableRent;
} SCatalog; } SCatalog;
typedef struct SCtgApiStat { typedef struct SCtgApiStat {
} SCtgApiStat; } SCtgApiStat;
typedef struct SCtgResourceStat { typedef struct SCtgRuntimeStat {
} SCtgResourceStat; } SCtgRuntimeStat;
typedef struct SCtgCacheStat { typedef struct SCtgCacheStat {
...@@ -103,7 +100,7 @@ typedef struct SCtgCacheStat { ...@@ -103,7 +100,7 @@ typedef struct SCtgCacheStat {
typedef struct SCatalogStat { typedef struct SCatalogStat {
SCtgApiStat api; SCtgApiStat api;
SCtgResourceStat resource; SCtgRuntimeStat runtime;
SCtgCacheStat cache; SCtgCacheStat cache;
} SCatalogStat; } SCatalogStat;
......
此差异已折叠。
...@@ -128,15 +128,14 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) { ...@@ -128,15 +128,14 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
strcpy(sn.dbname, "db1"); strcpy(sn.dbname, "db1");
strcpy(sn.tname, ctgTestSTablename); strcpy(sn.tname, ctgTestSTablename);
char tbFullName[TSDB_TABLE_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN] = {0};
tNameExtractFullName(&cn, tbFullName); tNameGetFullDbName(&cn, db);
strcpy(output->dbFName, db);
SET_META_TYPE_BOTH_TABLE(output->metaType); SET_META_TYPE_BOTH_TABLE(output->metaType);
strcpy(output->ctbFname, tbFullName); strcpy(output->ctbName, cn.tname);
strcpy(output->tbName, sn.tname);
tNameExtractFullName(&cn, tbFullName);
strcpy(output->tbFname, tbFullName);
output->ctbMeta.vgId = 9; output->ctbMeta.vgId = 9;
output->ctbMeta.tableType = TSDB_CHILD_TABLE; output->ctbMeta.tableType = TSDB_CHILD_TABLE;
...@@ -175,10 +174,11 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) { ...@@ -175,10 +174,11 @@ void ctgTestBuildCTableMetaOutput(STableMetaOutput *output) {
strcpy(s->name, "tag1s"); strcpy(s->name, "tag1s");
} }
void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) { void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) {
static int32_t vgVersion = ctgTestVgVersion + 1; static int32_t vgVersion = ctgTestVgVersion + 1;
int32_t vgNum = 0; int32_t vgNum = 0;
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
SDBVgroupInfo *dbVgroup = (SDBVgroupInfo *)calloc(1, sizeof(SDBVgroupInfo));
dbVgroup->vgVersion = vgVersion++; dbVgroup->vgVersion = vgVersion++;
...@@ -186,7 +186,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) { ...@@ -186,7 +186,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
dbVgroup->hashMethod = 0; dbVgroup->hashMethod = 0;
dbVgroup->dbId = ctgTestDbId; dbVgroup->dbId = ctgTestDbId;
dbVgroup->vgInfo = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); dbVgroup->vgHash = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion); vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion);
uint32_t hashUnit = UINT32_MAX / vgNum; uint32_t hashUnit = UINT32_MAX / vgNum;
...@@ -203,8 +203,10 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) { ...@@ -203,8 +203,10 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
addr->port = htons(n + 22); addr->port = htons(n + 22);
} }
taosHashPut(dbVgroup->vgInfo, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo)); taosHashPut(dbVgroup->vgHash, &vgInfo.vgId, sizeof(vgInfo.vgId), &vgInfo, sizeof(vgInfo));
} }
*pdbVgroup = dbVgroup;
} }
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
...@@ -250,7 +252,8 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM ...@@ -250,7 +252,8 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen); pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont; rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename); strcpy(rspMsg->dbFName, ctgTestDbname);
strcpy(rspMsg->tbName, ctgTestTablename);
rspMsg->numOfTags = 0; rspMsg->numOfTags = 0;
rspMsg->numOfColumns = htonl(ctgTestColNum); rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1; rspMsg->precision = 1;
...@@ -285,8 +288,9 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc ...@@ -285,8 +288,9 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen); pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont; rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename); strcpy(rspMsg->dbFName, ctgTestDbname);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); strcpy(rspMsg->tbName, ctgTestCTablename);
strcpy(rspMsg->stbName, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum); rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum); rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1; rspMsg->precision = 1;
...@@ -327,8 +331,9 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc ...@@ -327,8 +331,9 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen); pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont; rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); strcpy(rspMsg->dbFName, ctgTestDbname);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); strcpy(rspMsg->tbName, ctgTestSTablename);
strcpy(rspMsg->stbName, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum); rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum); rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1; rspMsg->precision = 1;
...@@ -370,8 +375,9 @@ void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, ...@@ -370,8 +375,9 @@ void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg,
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen); pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaRsp *)pRsp->pCont; rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx); strcpy(rspMsg->dbFName, ctgTestDbname);
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx); sprintf(rspMsg->tbName, "%s_%d", ctgTestSTablename, idx);
sprintf(rspMsg->stbName, "%s_%d", ctgTestSTablename, idx);
rspMsg->numOfTags = htonl(ctgTestTagNum); rspMsg->numOfTags = htonl(ctgTestTagNum);
rspMsg->numOfColumns = htonl(ctgTestColNum); rspMsg->numOfColumns = htonl(ctgTestColNum);
rspMsg->precision = 1; rspMsg->precision = 1;
...@@ -589,12 +595,12 @@ void *ctgTestGetDbVgroupThread(void *param) { ...@@ -589,12 +595,12 @@ void *ctgTestGetDbVgroupThread(void *param) {
void *ctgTestSetDbVgroupThread(void *param) { void *ctgTestSetDbVgroupThread(void *param) {
struct SCatalog *pCtg = (struct SCatalog *)param; struct SCatalog *pCtg = (struct SCatalog *)param;
int32_t code = 0; int32_t code = 0;
SDBVgroupInfo dbVgroup = {0}; SDBVgroupInfo *dbVgroup = NULL;
int32_t n = 0; int32_t n = 0;
while (!ctgTestStop) { while (!ctgTestStop) {
ctgTestBuildDBVgroup(&dbVgroup); ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup); code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup);
if (code) { if (code) {
assert(0); assert(0);
} }
...@@ -669,6 +675,7 @@ void *ctgTestSetCtableMetaThread(void *param) { ...@@ -669,6 +675,7 @@ void *ctgTestSetCtableMetaThread(void *param) {
return NULL; return NULL;
} }
TEST(tableMeta, normalTable) { TEST(tableMeta, normalTable) {
struct SCatalog *pCtg = NULL; struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
...@@ -741,7 +748,7 @@ TEST(tableMeta, normalTable) { ...@@ -741,7 +748,7 @@ TEST(tableMeta, normalTable) {
} }
if (stbNum) { if (stbNum) {
printf("got expired stb,suid:%" PRId64 "\n", stb->suid); printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
free(stb); free(stb);
stb = NULL; stb = NULL;
} else { } else {
...@@ -837,7 +844,7 @@ TEST(tableMeta, childTableCase) { ...@@ -837,7 +844,7 @@ TEST(tableMeta, childTableCase) {
} }
if (stbNum) { if (stbNum) {
printf("got expired stb,suid:%" PRId64 "\n", stb->suid); printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
free(stb); free(stb);
stb = NULL; stb = NULL;
} else { } else {
...@@ -938,7 +945,8 @@ TEST(tableMeta, superTableCase) { ...@@ -938,7 +945,8 @@ TEST(tableMeta, superTableCase) {
} }
if (stbNum) { if (stbNum) {
printf("got expired stb,suid:%" PRId64 "\n", stb->suid); printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
free(stb); free(stb);
stb = NULL; stb = NULL;
} else { } else {
...@@ -1062,9 +1070,11 @@ TEST(dbVgroup, getSetDbVgroupCase) { ...@@ -1062,9 +1070,11 @@ TEST(dbVgroup, getSetDbVgroupCase) {
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
SVgroupInfo *pvgInfo = NULL; SVgroupInfo *pvgInfo = NULL;
SDBVgroupInfo dbVgroup = {0}; SDBVgroupInfo *dbVgroup = NULL;
SArray *vgList = NULL; SArray *vgList = NULL;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndNormalMeta(); ctgTestSetPrepareDbVgroupsAndNormalMeta();
initQueryModuleMsgHandle(); initQueryModuleMsgHandle();
...@@ -1099,7 +1109,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { ...@@ -1099,7 +1109,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
taosArrayDestroy(vgList); taosArrayDestroy(vgList);
ctgTestBuildDBVgroup(&dbVgroup); ctgTestBuildDBVgroup(&dbVgroup);
code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, &dbVgroup); code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
...@@ -1169,6 +1179,7 @@ TEST(multiThread, getSetDbVgroupCase) { ...@@ -1169,6 +1179,7 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy(); catalogDestroy();
} }
TEST(multiThread, ctableMeta) { TEST(multiThread, ctableMeta) {
struct SCatalog *pCtg = NULL; struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
...@@ -1178,6 +1189,8 @@ TEST(multiThread, ctableMeta) { ...@@ -1178,6 +1189,8 @@ TEST(multiThread, ctableMeta) {
SArray *vgList = NULL; SArray *vgList = NULL;
ctgTestStop = false; ctgTestStop = false;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndChildMeta(); ctgTestSetPrepareDbVgroupsAndChildMeta();
initQueryModuleMsgHandle(); initQueryModuleMsgHandle();
...@@ -1212,11 +1225,13 @@ TEST(multiThread, ctableMeta) { ...@@ -1212,11 +1225,13 @@ TEST(multiThread, ctableMeta) {
} }
ctgTestStop = true; ctgTestStop = true;
sleep(1); sleep(2);
catalogDestroy(); catalogDestroy();
} }
TEST(rentTest, allRent) { TEST(rentTest, allRent) {
struct SCatalog *pCtg = NULL; struct SCatalog *pCtg = NULL;
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
...@@ -1229,6 +1244,8 @@ TEST(rentTest, allRent) { ...@@ -1229,6 +1244,8 @@ TEST(rentTest, allRent) {
SSTableMetaVersion *stable = NULL; SSTableMetaVersion *stable = NULL;
uint32_t num = 0; uint32_t num = 0;
ctgTestInitLogFile();
ctgTestSetPrepareDbVgroupsAndMultiSuperMeta(); ctgTestSetPrepareDbVgroupsAndMultiSuperMeta();
initQueryModuleMsgHandle(); initQueryModuleMsgHandle();
...@@ -1273,7 +1290,7 @@ TEST(rentTest, allRent) { ...@@ -1273,7 +1290,7 @@ TEST(rentTest, allRent) {
printf("%d - expired stableNum:%d\n", i, num); printf("%d - expired stableNum:%d\n", i, num);
if (stable) { if (stable) {
for (int32_t n = 0; n < num; ++n) { for (int32_t n = 0; n < num; ++n) {
printf("suid:%" PRId64 ", sversion:%d, tversion:%d\n", stable[n].suid, stable[n].sversion, stable[n].tversion); printf("suid:%" PRId64 ", dbFName:%s, stbName:%s, sversion:%d, tversion:%d\n", stable[n].suid, stable[n].dbFName, stable[n].stbName, stable[n].sversion, stable[n].tversion);
} }
free(stable); free(stable);
stable = NULL; stable = NULL;
...@@ -1291,4 +1308,4 @@ int main(int argc, char **argv) { ...@@ -1291,4 +1308,4 @@ int main(int argc, char **argv) {
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
\ No newline at end of file
...@@ -30,7 +30,7 @@ extern SToken nil_token; ...@@ -30,7 +30,7 @@ extern SToken nil_token;
SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode); SNodeList* createNodeList(SAstCreateContext* pCxt, SNode* pNode);
SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode); SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode);
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableName, const SToken* pColumnName); SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName);
SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral); SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral);
SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral); SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral);
SNode* addMinusSign(SAstCreateContext* pCxt, SNode* pNode); SNode* addMinusSign(SAstCreateContext* pCxt, SNode* pNode);
......
...@@ -187,7 +187,7 @@ table_reference(A) ::= joined_table(B). ...@@ -187,7 +187,7 @@ table_reference(A) ::= joined_table(B).
table_primary(A) ::= table_name(B) alias_opt(C). { PARSER_TRACE; A = createRealTableNode(pCxt, NULL, &B, &C); } table_primary(A) ::= table_name(B) alias_opt(C). { PARSER_TRACE; A = createRealTableNode(pCxt, NULL, &B, &C); }
table_primary(A) ::= db_name(B) NK_DOT table_name(C) alias_opt(D). { PARSER_TRACE; A = createRealTableNode(pCxt, &B, &C, &D); } table_primary(A) ::= db_name(B) NK_DOT table_name(C) alias_opt(D). { PARSER_TRACE; A = createRealTableNode(pCxt, &B, &C, &D); }
table_primary(A) ::= subquery(B) alias_opt(C). { PARSER_TRACE; A = createTempTableNode(pCxt, B, &C); } table_primary(A) ::= subquery(B) alias_opt(C). { PARSER_TRACE; A = createTempTableNode(pCxt, B, &C); }
table_primary ::= parenthesized_joined_table. table_primary(A) ::= parenthesized_joined_table(B). { PARSER_TRACE; A = B; }
%type alias_opt { SToken } %type alias_opt { SToken }
%destructor alias_opt { PARSER_DESTRUCTOR_TRACE; } %destructor alias_opt { PARSER_DESTRUCTOR_TRACE; }
...@@ -297,9 +297,9 @@ query_expression_body(A) ::= ...@@ -297,9 +297,9 @@ query_expression_body(A) ::=
query_expression_body(B) UNION ALL query_expression_body(D). { PARSER_TRACE; A = createSetOperator(pCxt, SET_OP_TYPE_UNION_ALL, B, D); } query_expression_body(B) UNION ALL query_expression_body(D). { PARSER_TRACE; A = createSetOperator(pCxt, SET_OP_TYPE_UNION_ALL, B, D); }
query_primary(A) ::= query_specification(B). { PARSER_TRACE; A = B; } query_primary(A) ::= query_specification(B). { PARSER_TRACE; A = B; }
query_primary(A) ::= //query_primary(A) ::=
NK_LP query_expression_body(B) // NK_LP query_expression_body(B)
order_by_clause_opt limit_clause_opt slimit_clause_opt NK_RP. { PARSER_TRACE; A = B;} // order_by_clause_opt slimit_clause_opt limit_clause_opt NK_RP. { PARSER_TRACE; A = B;}
%type order_by_clause_opt { SNodeList* } %type order_by_clause_opt { SNodeList* }
%destructor order_by_clause_opt { PARSER_DESTRUCTOR_TRACE; nodesDestroyList($$); } %destructor order_by_clause_opt { PARSER_DESTRUCTOR_TRACE; nodesDestroyList($$); }
...@@ -317,7 +317,7 @@ limit_clause_opt(A) ::= LIMIT NK_INTEGER(B) OFFSET NK_INTEGER(C). ...@@ -317,7 +317,7 @@ limit_clause_opt(A) ::= LIMIT NK_INTEGER(B) OFFSET NK_INTEGER(C).
limit_clause_opt(A) ::= LIMIT NK_INTEGER(C) NK_COMMA NK_INTEGER(B). { PARSER_TRACE; A = createLimitNode(pCxt, &B, &C); } limit_clause_opt(A) ::= LIMIT NK_INTEGER(C) NK_COMMA NK_INTEGER(B). { PARSER_TRACE; A = createLimitNode(pCxt, &B, &C); }
/************************************************ subquery ************************************************************/ /************************************************ subquery ************************************************************/
subquery(A) ::= NK_LR query_expression(B) NK_RP. { PARSER_TRACE; A = B; } subquery(A) ::= NK_LP query_expression(B) NK_RP. { PARSER_TRACE; A = B; }
/************************************************ search_condition ****************************************************/ /************************************************ search_condition ****************************************************/
search_condition(A) ::= boolean_value_expression(B). { PARSER_TRACE; A = B; } search_condition(A) ::= boolean_value_expression(B). { PARSER_TRACE; A = B; }
......
...@@ -28,6 +28,7 @@ typedef struct SQuery { ...@@ -28,6 +28,7 @@ typedef struct SQuery {
} SQuery; } SQuery;
int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery); int32_t doParse(SParseContext* pParseCxt, SQuery* pQuery);
int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -60,14 +60,14 @@ SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode ...@@ -60,14 +60,14 @@ SNodeList* addNodeToList(SAstCreateContext* pCxt, SNodeList* pList, SNode* pNode
return nodesListAppend(pList, pNode); return nodesListAppend(pList, pNode);
} }
SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableName, const SToken* pColumnName) { SNode* createColumnNode(SAstCreateContext* pCxt, const SToken* pTableAlias, const SToken* pColumnName) {
if (!checkTableName(pCxt, pTableName) || !checkColumnName(pCxt, pColumnName)) { if (!checkTableName(pCxt, pTableAlias) || !checkColumnName(pCxt, pColumnName)) {
return NULL; return NULL;
} }
SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
CHECK_OUT_OF_MEM(col); CHECK_OUT_OF_MEM(col);
if (NULL != pTableName) { if (NULL != pTableAlias) {
strncpy(col->tableName, pTableName->z, pTableName->n); strncpy(col->tableAlias, pTableAlias->z, pTableAlias->n);
} }
strncpy(col->colName, pColumnName->z, pColumnName->n); strncpy(col->colName, pColumnName->z, pColumnName->n);
return (SNode*)col; return (SNode*)col;
...@@ -150,7 +150,14 @@ SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const ...@@ -150,7 +150,14 @@ SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const
SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE); SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE);
CHECK_OUT_OF_MEM(realTable); CHECK_OUT_OF_MEM(realTable);
if (NULL != pDbName) { if (NULL != pDbName) {
strncpy(realTable->dbName, pDbName->z, pDbName->n); strncpy(realTable->table.dbName, pDbName->z, pDbName->n);
} else {
strcpy(realTable->table.dbName, pCxt->pQueryCxt->db);
}
if (NULL != pTableAlias && TK_NIL != pTableAlias->type) {
strncpy(realTable->table.tableAlias, pTableAlias->z, pTableAlias->n);
} else {
strncpy(realTable->table.tableAlias, pTableName->z, pTableName->n);
} }
strncpy(realTable->table.tableName, pTableName->z, pTableName->n); strncpy(realTable->table.tableName, pTableName->z, pTableName->n);
return (SNode*)realTable; return (SNode*)realTable;
...@@ -160,6 +167,9 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok ...@@ -160,6 +167,9 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
STempTableNode* tempTable = (STempTableNode*)nodesMakeNode(QUERY_NODE_TEMP_TABLE); STempTableNode* tempTable = (STempTableNode*)nodesMakeNode(QUERY_NODE_TEMP_TABLE);
CHECK_OUT_OF_MEM(tempTable); CHECK_OUT_OF_MEM(tempTable);
tempTable->pSubquery = pSubquery; tempTable->pSubquery = pSubquery;
if (NULL != pTableAlias && TK_NIL != pTableAlias->type) {
strncpy(tempTable->table.tableAlias, pTableAlias->z, pTableAlias->n);
}
return (SNode*)tempTable; return (SNode*)tempTable;
} }
...@@ -288,9 +298,6 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr ...@@ -288,9 +298,6 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
SSelectStmt* select = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT); SSelectStmt* select = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
CHECK_OUT_OF_MEM(select); CHECK_OUT_OF_MEM(select);
select->isDistinct = isDistinct; select->isDistinct = isDistinct;
if (NULL == pProjectionList) {
select->isStar = true;
}
select->pProjectionList = pProjectionList; select->pProjectionList = pProjectionList;
select->pFromTable = pTable; select->pFromTable = pTable;
return (SNode*)select; return (SNode*)select;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// int32_t doTranslate() {
// }
此差异已折叠。
此差异已折叠。
...@@ -38,46 +38,149 @@ protected: ...@@ -38,46 +38,149 @@ protected:
} }
bool run(int32_t expectCode = TSDB_CODE_SUCCESS) { bool run(int32_t parseCode = TSDB_CODE_SUCCESS, int32_t translateCode = TSDB_CODE_SUCCESS) {
int32_t code = doParse(&cxt_, &query_); int32_t code = doParse(&cxt_, &query_);
// cout << "doParse return " << code << endl;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] code:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl; cout << "sql:[" << cxt_.pSql << "] code:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl;
return (code == expectCode); return (TSDB_CODE_SUCCESS != parseCode);
}
if (TSDB_CODE_SUCCESS != parseCode) {
return false;
}
code = doTranslate(&cxt_, &query_);
// cout << "doTranslate return " << code << endl;
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] code:" << tstrerror(code) << ", msg:" << errMagBuf_ << endl;
return (TSDB_CODE_SUCCESS != translateCode);
} }
if (NULL != query_.pRoot && QUERY_NODE_SELECT_STMT == nodeType(query_.pRoot)) { if (NULL != query_.pRoot && QUERY_NODE_SELECT_STMT == nodeType(query_.pRoot)) {
SSelectStmt* select = (SSelectStmt*)query_.pRoot; string sql;
string sql("SELECT "); selectToSql(query_.pRoot, sql);
if (select->isDistinct) { cout << "input sql : [" << cxt_.pSql << "]" << endl;
sql.append("DISTINCT "); cout << "output sql : [" << sql << "]" << endl;
}
if (nullptr == select->pProjectionList) {
sql.append("* ");
} else {
nodeListToSql(select->pProjectionList, sql);
}
sql.append("FROM ");
tableToSql(select->pFromTable, sql);
cout << sql << endl;
} }
return (code == expectCode); return (TSDB_CODE_SUCCESS == translateCode);
} }
private: private:
static const int max_err_len = 1024; static const int max_err_len = 1024;
static const int max_sql_len = 1024 * 1024; static const int max_sql_len = 1024 * 1024;
void selectToSql(const SNode* node, string& sql) {
SSelectStmt* select = (SSelectStmt*)node;
sql.append("SELECT ");
if (select->isDistinct) {
sql.append("DISTINCT ");
}
if (nullptr == select->pProjectionList) {
sql.append("* ");
} else {
nodeListToSql(select->pProjectionList, sql);
sql.append(" ");
}
sql.append("FROM ");
tableToSql(select->pFromTable, sql);
if (nullptr != select->pWhere) {
sql.append(" WHERE ");
nodeToSql(select->pWhere, sql);
}
}
void tableToSql(const SNode* node, string& sql) { void tableToSql(const SNode* node, string& sql) {
const STableNode* table = (const STableNode*)node; const STableNode* table = (const STableNode*)node;
switch (nodeType(node)) { switch (nodeType(node)) {
case QUERY_NODE_REAL_TABLE: { case QUERY_NODE_REAL_TABLE: {
SRealTableNode* realTable = (SRealTableNode*)table; SRealTableNode* realTable = (SRealTableNode*)table;
if ('\0' != realTable->dbName[0]) { if ('\0' != realTable->table.dbName[0]) {
sql.append(realTable->dbName); sql.append(realTable->table.dbName);
sql.append("."); sql.append(".");
} }
sql.append(realTable->table.tableName); sql.append(realTable->table.tableName);
break; break;
} }
case QUERY_NODE_TEMP_TABLE: {
STempTableNode* tempTable = (STempTableNode*)table;
sql.append("(");
selectToSql(tempTable->pSubquery, sql);
sql.append(") ");
sql.append(tempTable->table.tableAlias);
break;
}
case QUERY_NODE_JOIN_TABLE: {
SJoinTableNode* joinTable = (SJoinTableNode*)table;
tableToSql(joinTable->pLeft, sql);
sql.append(" JOIN ");
tableToSql(joinTable->pRight, sql);
if (nullptr != joinTable->pOnCond) {
sql.append(" ON ");
nodeToSql(joinTable->pOnCond, sql);
}
break;
}
default:
break;
}
}
string opTypeToSql(EOperatorType type) {
switch (type) {
case OP_TYPE_ADD:
return " + ";
case OP_TYPE_SUB:
return " - ";
case OP_TYPE_MULTI:
case OP_TYPE_DIV:
case OP_TYPE_MOD:
case OP_TYPE_GREATER_THAN:
case OP_TYPE_GREATER_EQUAL:
case OP_TYPE_LOWER_THAN:
case OP_TYPE_LOWER_EQUAL:
case OP_TYPE_EQUAL:
return " = ";
case OP_TYPE_NOT_EQUAL:
case OP_TYPE_IN:
case OP_TYPE_NOT_IN:
case OP_TYPE_LIKE:
case OP_TYPE_NOT_LIKE:
case OP_TYPE_MATCH:
case OP_TYPE_NMATCH:
case OP_TYPE_JSON_GET_VALUE:
case OP_TYPE_JSON_CONTAINS:
default:
break;
}
return " unknown operator ";
}
void nodeToSql(const SNode* node, string& sql) {
if (nullptr == node) {
return;
}
switch (nodeType(node)) {
case QUERY_NODE_COLUMN: {
SColumnNode* pCol = (SColumnNode*)node;
if ('\0' != pCol->dbName[0]) {
sql.append(pCol->dbName);
sql.append(".");
}
if ('\0' != pCol->tableAlias[0]) {
sql.append(pCol->tableAlias);
sql.append(".");
}
sql.append(pCol->colName);
break;
}
case QUERY_NODE_VALUE:
break;
case QUERY_NODE_OPERATOR: {
SOperatorNode* pOp = (SOperatorNode*)node;
nodeToSql(pOp->pLeft, sql);
sql.append(opTypeToSql(pOp->opType));
nodeToSql(pOp->pRight, sql);
break;
}
default: default:
break; break;
} }
...@@ -91,13 +194,8 @@ private: ...@@ -91,13 +194,8 @@ private:
sql.append(", "); sql.append(", ");
} }
firstNode = false; firstNode = false;
switch (nodeType(node)) { nodeToSql(node, sql);
case QUERY_NODE_COLUMN:
sql.append(((SColumnNode*)node)->colName);
break;
}
} }
sql.append(" ");
} }
void reset() { void reset() {
...@@ -125,10 +223,13 @@ TEST_F(NewParserTest, selectStar) { ...@@ -125,10 +223,13 @@ TEST_F(NewParserTest, selectStar) {
bind("SELECT * FROM test.t1"); bind("SELECT * FROM test.t1");
ASSERT_TRUE(run()); ASSERT_TRUE(run());
bind("SELECT ts FROM t1"); bind("SELECT ts, c1 FROM t1");
ASSERT_TRUE(run()); ASSERT_TRUE(run());
bind("SELECT ts, tag1, c1 FROM t1"); bind("SELECT ts, t.c1 FROM (SELECT * FROM t1) t");
ASSERT_TRUE(run());
bind("SELECT * FROM t1 tt1, t1 tt2 WHERE tt1.c1 = tt2.c1");
ASSERT_TRUE(run()); ASSERT_TRUE(run());
} }
...@@ -147,3 +248,16 @@ TEST_F(NewParserTest, syntaxError) { ...@@ -147,3 +248,16 @@ TEST_F(NewParserTest, syntaxError) {
bind("SELECT * FROM test.t1 t WHER"); bind("SELECT * FROM test.t1 t WHER");
ASSERT_TRUE(run(TSDB_CODE_FAILED)); ASSERT_TRUE(run(TSDB_CODE_FAILED));
} }
TEST_F(NewParserTest, semanticError) {
setDatabase("root", "test");
bind("SELECT * FROM t10");
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_FAILED));
bind("SELECT c1, c3 FROM t1");
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_FAILED));
bind("SELECT c2 FROM t1 tt1, t1 tt2 WHERE tt1.c1 = tt2.c1");
ASSERT_TRUE(run(TSDB_CODE_SUCCESS, TSDB_CODE_FAILED));
}
...@@ -45,11 +45,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 ...@@ -45,11 +45,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
bMsg->header.vgId = htonl(bInput->vgId); bMsg->header.vgId = htonl(bInput->vgId);
if (bInput->dbName) { if (bInput->dbFName) {
tstrncpy(bMsg->dbFname, bInput->dbName, tListLen(bMsg->dbFname)); tstrncpy(bMsg->dbFName, bInput->dbFName, tListLen(bMsg->dbFName));
} }
tstrncpy(bMsg->tableFname, bInput->tableFullName, tListLen(bMsg->tableFname)); tstrncpy(bMsg->tbName, bInput->tbName, tListLen(bMsg->tbName));
*msgLen = (int32_t)sizeof(*bMsg); *msgLen = (int32_t)sizeof(*bMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -113,12 +113,19 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { ...@@ -113,12 +113,19 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
} }
pOut->dbVgroup.vgVersion = pRsp->vgVersion; pOut->dbVgroup = calloc(1, sizeof(SDBVgroupInfo));
pOut->dbVgroup.hashMethod = pRsp->hashMethod; if (NULL == pOut->dbVgroup) {
pOut->dbVgroup.dbId = pRsp->uid; qError("calloc %d failed", (int32_t)sizeof(SDBVgroupInfo));
pOut->dbVgroup.vgInfo = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); return TSDB_CODE_TSC_OUT_OF_MEMORY;
if (NULL == pOut->dbVgroup.vgInfo) { }
qError("hash init[%d] failed", pRsp->vgNum);
pOut->dbVgroup->vgVersion = pRsp->vgVersion;
pOut->dbVgroup->hashMethod = pRsp->hashMethod;
pOut->dbVgroup->dbId = pRsp->uid;
pOut->dbVgroup->vgHash = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pOut->dbVgroup->vgHash) {
qError("taosHashInit %d failed", pRsp->vgNum);
tfree(pOut->dbVgroup);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -131,8 +138,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { ...@@ -131,8 +138,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
pRsp->vgroupInfo[i].epset.eps[n].port = ntohs(pRsp->vgroupInfo[i].epset.eps[n].port); pRsp->vgroupInfo[i].epset.eps[n].port = ntohs(pRsp->vgroupInfo[i].epset.eps[n].port);
} }
if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) {
qError("hash push failed"); qError("taosHashPut failed");
goto _return; goto _return;
} }
} }
...@@ -142,8 +149,10 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { ...@@ -142,8 +149,10 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) {
return code; return code;
_return: _return:
if (pOut) { if (pOut) {
tfree(pOut->dbVgroup.vgInfo); taosHashCleanup(pOut->dbVgroup->vgHash);
tfree(pOut->dbVgroup);
} }
return code; return code;
...@@ -248,16 +257,13 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { ...@@ -248,16 +257,13 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
strcpy(pOut->dbFName, pMetaMsg->dbFName);
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) { if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
SET_META_TYPE_BOTH_TABLE(pOut->metaType); SET_META_TYPE_BOTH_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) { strcpy(pOut->ctbName, pMetaMsg->tbName);
snprintf(pOut->ctbFname, sizeof(pOut->ctbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); strcpy(pOut->tbName, pMetaMsg->stbName);
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname);
} else {
memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname));
memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname));
}
pOut->ctbMeta.vgId = pMetaMsg->vgId; pOut->ctbMeta.vgId = pMetaMsg->vgId;
pOut->ctbMeta.tableType = pMetaMsg->tableType; pOut->ctbMeta.tableType = pMetaMsg->tableType;
...@@ -268,11 +274,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { ...@@ -268,11 +274,7 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
} else { } else {
SET_META_TYPE_TABLE(pOut->metaType); SET_META_TYPE_TABLE(pOut->metaType);
if (pMetaMsg->dbFname[0]) { strcpy(pOut->tbName, pMetaMsg->tbName);
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
} else {
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
}
code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta); code = queryCreateTableMetaFromMsg(pMetaMsg, (pMetaMsg->tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
} }
...@@ -291,4 +293,4 @@ void initQueryModuleMsgHandle() { ...@@ -291,4 +293,4 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
} }
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
\ No newline at end of file
...@@ -561,7 +561,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void ...@@ -561,7 +561,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp; *rspMsg = rsp;
*dataLen = 0; *dataLen = 0;
...@@ -573,7 +573,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void ...@@ -573,7 +573,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd); QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// Got data from sink // Got data from sink
......
...@@ -132,7 +132,7 @@ typedef struct SSchJob { ...@@ -132,7 +132,7 @@ typedef struct SSchJob {
SQueryProfileSummary summary; SQueryProfileSummary summary;
} SSchJob; } SSchJob;
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children)) #define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册