未验证 提交 ad0be66a 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #19935 from taosdata/enh/dynamicIdx

feat:  change  index dynamicly 
......@@ -130,3 +130,4 @@ tools/COPYING
tools/BUGS
tools/taos-tools
tools/taosws-rs
tags
......@@ -29,6 +29,7 @@ extern "C" {
typedef struct SBuffer SBuffer;
typedef struct SSchema SSchema;
typedef struct SSchema2 SSchema2;
typedef struct STColumn STColumn;
typedef struct STSchema STSchema;
typedef struct SValue SValue;
......
......@@ -144,6 +144,8 @@ typedef enum _mgmt_table {
#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8
#define TSDB_ALTER_TABLE_UPDATE_OPTIONS 9
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME 10
#define TSDB_ALTER_TABLE_ADD_TAG_INDEX 11
#define TSDB_ALTER_TABLE_DROP_TAG_INDEX 12
#define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1
......@@ -294,6 +296,15 @@ struct SSchema {
char name[TSDB_COL_NAME_LEN];
};
struct SSchema2 {
int8_t type;
int8_t flags;
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
char alias[TSDB_COL_NAME_LEN];
};
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
......@@ -347,8 +358,19 @@ void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
#define IS_IDX_ON(s) (((s)->flags & 0x02) == COL_IDX_ON)
#define IS_SET_NULL(s) (((s)->flags & COL_SET_NULL) == COL_SET_NULL)
#define SSCHMEA_SET_IDX_ON(s) \
do { \
(s)->flags |= COL_IDX_ON; \
} while (0)
#define SSCHMEA_SET_IDX_OFF(s) \
do { \
(s)->flags &= (~COL_IDX_ON); \
} while (0)
#define SSCHMEA_TYPE(s) ((s)->type)
#define SSCHMEA_FLAGS(s) ((s)->flags)
#define SSCHMEA_COLID(s) ((s)->colId)
......@@ -1240,6 +1262,17 @@ typedef struct {
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
typedef struct {
char colName[TSDB_COL_NAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
int64_t stbUid;
int64_t dbUid;
int64_t reserved[8];
} SDropIndexReq;
int32_t tSerializeSDropIdxReq(void* buf, int32_t bufLen, SDropIndexReq* pReq);
int32_t tDeserializeSDropIdxReq(void* buf, int32_t bufLen, SDropIndexReq* pReq);
typedef struct {
int64_t dbUid;
char db[TSDB_DB_FNAME_LEN];
......@@ -2812,6 +2845,22 @@ typedef struct {
int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
char idxName[TSDB_COL_NAME_LEN];
int8_t idxType;
} SCreateTagIndexReq;
int32_t tSerializeSCreateTagIdxReq(void* buf, int32_t bufLen, SCreateTagIndexReq* pReq);
int32_t tDeserializeSCreateTagIdxReq(void* buf, int32_t bufLen, SCreateTagIndexReq* pReq);
typedef SMDropSmaReq SDropTagIndexReq;
int32_t tSerializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq);
int32_t tDeserializeSDropTagIdxReq(void* buf, int32_t bufLen, SDropTagIndexReq* pReq);
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
......
......@@ -220,6 +220,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "vnode-drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TRIM, "vnode-trim", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
......
......@@ -253,6 +253,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E)
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F)
// mnode-func
#define TSDB_CODE_MND_INVALID_FUNC_NAME TAOS_DEF_ERROR_CODE(0, 0x0370)
// #define TSDB_CODE_MND_INVALID_FUNC_LEN TAOS_DEF_ERROR_CODE(0, 0x0371) // 2.x
......@@ -265,6 +266,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_FUNC_COMMENT TAOS_DEF_ERROR_CODE(0, 0x0378)
#define TSDB_CODE_MND_INVALID_FUNC_RETRIEVE TAOS_DEF_ERROR_CODE(0, 0x0379)
// mnode-db
#define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380)
#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //
......@@ -366,6 +369,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0481)
#define TSDB_CODE_MND_INVALID_SMA_OPTION TAOS_DEF_ERROR_CODE(0, 0x0482)
// mnode-tag-indxe
#define TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0483)
#define TSDB_CODE_MND_TAG_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0484)
// dnode
// #define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) // 2.x
// #define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) // 2.x
......
......@@ -104,6 +104,7 @@ extern const int32_t TYPE_BYTES[16];
#define TSDB_INDEX_TYPE_SMA "SMA"
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
#define TSDB_INDEX_TYPE_NORMAL "NORMAL"
#define TSDB_INS_USER_STABLES_DBNAME_COLID 2
......
......@@ -418,7 +418,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
if(ASSERT(waitingRspNum >= 0)){
if (ASSERT(waitingRspNum >= 0)) {
tscError("tmqCommitRspCountDown error:%d", waitingRspNum);
return;
}
......
......@@ -120,6 +120,8 @@ static const SSysDbTableSchema userIdxSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "column_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "index_type", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema userStbsSchema[] = {
......
......@@ -918,6 +918,65 @@ int32_t tDeserializeSMDropSmaReq(void *buf, int32_t bufLen, SMDropSmaReq *pReq)
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSCreateTagIdxReq(void *buf, int32_t bufLen, SCreateTagIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->stbName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->colName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->idxName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->idxType) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSCreateTagIdxReq(void *buf, int32_t bufLen, SCreateTagIndexReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->stbName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->colName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->idxName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->idxType) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
tEndEncode(&encoder);
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDropTagIdxReq(void *buf, int32_t bufLen, SDropTagIndexReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSMCreateFullTextReq(void *buf, int32_t bufLen, SMCreateFullTextReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......@@ -3984,6 +4043,44 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSDropIdxReq(void *buf, int32_t bufLen, SDropIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->colName) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->stb) < 0) return -1;
if (tEncodeI64(&encoder, pReq->stbUid) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
// TODO
return 0;
}
int32_t tDeserializeSDropIdxReq(void *buf, int32_t bufLen, SDropIndexReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->colName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->stb) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->stbUid) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
// TODO
return 0;
}
int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *pReq) {
SEncoder encoder = {0};
......
......@@ -157,6 +157,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
......@@ -182,6 +184,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
......
......@@ -522,6 +522,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -389,6 +389,18 @@ typedef struct {
SSchemaWrapper schemaTag; // for dstVgroup
} SSmaObj;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
char dstTbName[TSDB_TABLE_FNAME_LEN];
char colName[TSDB_COL_NAME_LEN];
int64_t createdTime;
int64_t uid;
int64_t stbUid;
int64_t dbUid;
} SIdxObj;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
......
#ifndef _TD_MND_IDX_H_
#define _TD_MND_IDX_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitIdx(SMnode *pMnode);
void mndCleanupIdx(SMnode *pMnode);
SIdxObj *mndAcquireIdx(SMnode *pMnode, char *Name);
void mndReleaseIdx(SMnode *pMnode, SIdxObj *pSma);
int32_t mndDropIdxsByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxObj *pIdx);
int32_t mndGetTableIdx(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist);
int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq);
int32_t mndSetCreateIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetDropIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetDropIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetAlterIdxRedoLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
int32_t mndSetAlterIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_IDX_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_IDX_COMM_H_
#define _TD_MND_IDX_COMM_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SSIdx {
int type; // sma or idx
void *pIdx;
} SSIdx;
// retrieve sma index and tag index
typedef struct {
void *pSmaIter;
void *pIdxIter;
} SSmaAndTagIter;
int32_t mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_IDX_COMM_H_*/
\ No newline at end of file
......@@ -42,6 +42,11 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
const char *mndGetStbStr(const char *src);
int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew);
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId);
void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void *alterOriData,
int32_t alterOriDataLen);
#ifdef __cplusplus
}
#endif
......
......@@ -17,6 +17,7 @@
#include "mndDb.h"
#include "mndCluster.h"
#include "mndDnode.h"
#include "mndIndex.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndSma.h"
......@@ -1053,6 +1054,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropIdxsByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndUserRemoveDb(pMnode, pTrans, pDb->name) != 0) goto _OVER;
......
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndIndex.h"
#include "mndIndexComm.h"
#include "mndSma.h"
static void *mndGetIdx(SMnode *pMnode, char *name, int type) {
SSdb *pSdb = pMnode->pSdb;
void *pIdx = sdbAcquire(pSdb, type, name);
if (pIdx == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = 0;
}
return pIdx;
}
int mndAcquireGlobalIdx(SMnode *pMnode, char *name, int type, SSIdx *idx) {
SSmaObj *pSma = mndGetIdx(pMnode, name, SDB_SMA);
SIdxObj *pIdx = mndGetIdx(pMnode, name, SDB_IDX);
terrno = 0;
if (pSma == NULL && pIdx == NULL) return 0;
if (pSma != NULL) {
if (type == SDB_SMA) {
idx->type = SDB_SMA;
idx->pIdx = pSma;
} else {
mndReleaseSma(pMnode, pSma);
terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST;
return -1;
}
} else {
if (type == SDB_IDX) {
idx->type = SDB_IDX;
idx->pIdx = pIdx;
} else {
mndReleaseIdx(pMnode, pIdx);
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
return -1;
}
}
return 0;
}
......@@ -21,6 +21,7 @@
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndGrant.h"
#include "mndIndex.h"
#include "mndInfoSchema.h"
#include "mndMnode.h"
#include "mndPerfSchema.h"
......@@ -425,6 +426,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sma", mndInitSma, mndCleanupSma) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-idx", mndInitIdx, mndCleanupIdx) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
......
......@@ -17,6 +17,8 @@
#include "mndSma.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndIndex.h"
#include "mndIndexComm.h"
#include "mndInfoSchema.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
......@@ -43,9 +45,13 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
static void mndDestroySmaObj(SSmaObj *pSmaObj);
// sma and tag index comm func
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
int32_t mndInitSma(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_SMA,
......@@ -58,14 +64,14 @@ int32_t mndInitSma(SMnode *pMnode) {
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx);
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -712,8 +718,13 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
goto _OVER;
}
SSIdx idx = {0};
if (mndAcquireGlobalIdx(pMnode, createReq.name, SDB_SMA, &idx) == 0) {
pSma = idx.pIdx;
} else {
goto _OVER;
}
pSma = mndAcquireSma(pMnode, createReq.name);
if (pSma != NULL) {
if (createReq.igExists) {
mInfo("sma:%s, already exist in sma:%s, ignore exist is set", createReq.name, pSma->name);
......@@ -959,7 +970,12 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
mInfo("sma:%s, start to drop", dropReq.name);
pSma = mndAcquireSma(pMnode, dropReq.name);
SSIdx idx = {0};
if (mndAcquireGlobalIdx(pMnode, dropReq.name, SDB_SMA, &idx) == 0) {
pSma = idx.pIdx;
} else {
goto _OVER;
}
if (pSma == NULL) {
if (dropReq.igNotExists) {
mInfo("sma:%s, not exist, ignore not exist is set", dropReq.name);
......@@ -998,7 +1014,14 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
int32_t code = -1;
SSmaObj *pSma = NULL;
pSma = mndAcquireSma(pMnode, indexReq->indexFName);
SSIdx idx = {0};
if (0 == mndAcquireGlobalIdx(pMnode, indexReq->indexFName, SDB_SMA, &idx)) {
pSma = idx.pIdx;
} else {
*exist = false;
return 0;
}
if (pSma == NULL) {
*exist = false;
return 0;
......@@ -1207,10 +1230,10 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0;
}
SSmaAndTagIter *pIter = pShow->pIter;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_SMA, pShow->pIter, (void **)&pSma);
if (pShow->pIter == NULL) break;
pIter->pSmaIter = sdbFetch(pSdb, SDB_SMA, pIter->pSmaIter, (void **)&pSma);
if (pIter->pSmaIter == NULL) break;
if (NULL != pDb && pSma->dbUid != pDb->uid) {
sdbRelease(pSdb, pSma);
......@@ -1247,6 +1270,18 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSma->createdTime, false);
char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(col, (char *)"");
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)col, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tag, (char *)"sma_index");
colDataAppend(pColInfo, numOfRows, (const char *)tag, false);
numOfRows++;
sdbRelease(pSdb, pSma);
}
......@@ -1256,7 +1291,30 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
return numOfRows;
}
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
// sma and tag index comm func
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
int ret = mndProcessDropSmaReq(pReq);
if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST) {
terrno = 0;
ret = mndProcessDropTagIdxReq(pReq);
}
return ret;
}
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
if (pShow->pIter == NULL) {
pShow->pIter = taosMemoryCalloc(1, sizeof(SSmaAndTagIter));
}
int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
if (read < rows) read += mndRetrieveTagIdx(pReq, pShow, pBlock, rows - read);
return read;
}
static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) {
SSmaAndTagIter *p = pIter;
if (p != NULL) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, p->pSmaIter);
sdbCancelFetch(pSdb, p->pIdxIter);
}
taosMemoryFree(p);
}
......@@ -1008,6 +1008,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen) != 0) {
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
} else if (pTrans->originRpcType == TDMT_MND_CREATE_INDEX) {
void *pCont = NULL;
int32_t contLen = 0;
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen) != 0) {
mndTransSetRpcRsp(pTrans, pCont, contLen);
}
}
if (pTrans->rpcRspLen != 0) {
......
......@@ -147,7 +147,8 @@ typedef enum {
SDB_STB = 18,
SDB_DB = 19,
SDB_FUNC = 20,
SDB_MAX = 21
SDB_IDX = 21,
SDB_MAX = 22
} ESdbType;
typedef struct SSdbRaw {
......
......@@ -60,6 +60,8 @@ const char *sdbTableName(ESdbType type) {
return "db";
case SDB_FUNC:
return "func";
case SDB_IDX:
return "idx";
default:
return "undefine";
}
......
......@@ -59,7 +59,7 @@ void vnodePostClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode);
void vnodeClose(SVnode *pVnode);
int32_t vnodeSyncCommit(SVnode *pVnode);
int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeBegin(SVnode *pVnode);
int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode);
......@@ -137,6 +137,7 @@ typedef struct SMetaFltParam {
int16_t type;
void *val;
bool reverse;
bool equal;
int (*filterFunc)(void *a, void *b, int16_t type);
} SMetaFltParam;
......@@ -270,8 +271,8 @@ int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen,
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock2(STqReader *pReader);
bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData** pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData** pSubmitTbDataRet);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
// int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
......
......@@ -142,6 +142,9 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaAlterCache(SMeta* pMeta, int32_t nPage);
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
......
......@@ -1108,26 +1108,30 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids)
}
int32_t valid = 0;
while (1) {
int32_t count = 0;
static const int8_t TRY_ERROR_LIMIT = 1;
do {
void *entryKey = NULL;
int32_t nEntryKey = -1;
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL);
if (valid < 0) break;
SCtimeIdxKey *p = entryKey;
if (count > TRY_ERROR_LIMIT) break;
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type);
if (cmp == 0) taosArrayPush(pUids, &p->uid);
if (param->reverse == false) {
if (cmp == -1) break;
} else if (param->reverse) {
if (cmp == 1) break;
if (cmp == 0)
taosArrayPush(pUids, &p->uid);
else {
if (param->equal == true) {
if (count > TRY_ERROR_LIMIT) break;
count++;
}
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) break;
}
} while (1);
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
......@@ -1162,29 +1166,34 @@ int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
if (tdbTbcMoveTo(pCursor->pCur, pName, strlen(pName) + 1, &cmp) < 0) {
goto END;
}
bool first = true;
int32_t valid = 0;
while (1) {
int32_t count = 0;
int32_t TRY_ERROR_LIMIT = 1;
do {
void *pEntryKey = NULL, *pEntryVal = NULL;
int32_t nEntryKey = -1, nEntryVal = 0;
valid = tdbTbcGet(pCursor->pCur, (const void **)pEntryKey, &nEntryKey, (const void **)&pEntryVal, &nEntryVal);
if (valid < 0) break;
if (count > TRY_ERROR_LIMIT) break;
char *pTableKey = (char *)pEntryKey;
cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type);
if (cmp == 0) {
tb_uid_t tuid = *(tb_uid_t *)pEntryVal;
taosArrayPush(pUids, &tuid);
} else if (cmp == 1) {
// next
} else {
break;
if (param->equal == true) {
if (count > TRY_ERROR_LIMIT) break;
count++;
}
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) {
break;
}
}
} while (1);
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
......@@ -1240,7 +1249,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
pCursor->type = param->type;
metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL);
ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
if (ret < 0) {
goto END;
}
......@@ -1284,32 +1293,40 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
goto END;
}
bool first = true;
int count = 0;
int32_t valid = 0;
while (1) {
bool found = false;
static const int8_t TRY_ERROR_LIMIT = 1;
/// src: [[suid, cid1, type1]....[suid, cid2, type2]....[suid, cid3, type3]...]
/// target: [suid, cid2, type2]
do {
void *entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal;
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
if (valid < 0) {
tdbFree(entryVal);
break;
}
if (count > TRY_ERROR_LIMIT) {
break;
}
STagIdxKey *p = entryKey;
if (p == NULL) break;
if (p->type != pCursor->type) {
if (first) {
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) break;
continue;
} else {
if (p->type != pCursor->type || p->suid != pCursor->suid || p->cid != pCursor->cid) {
if (found == true) break;
count++;
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) {
break;
} else {
continue;
}
}
if (p->suid != pKey->suid) {
break;
}
first = false;
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
if (cmp == 0) {
// match
......@@ -1320,17 +1337,18 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
}
taosArrayPush(pUids, &tuid);
} else if (cmp == 1) {
// not match but should continue to iter
found = true;
} else {
// not match and no more result
break;
if (param->equal == true) {
if (count > TRY_ERROR_LIMIT) break;
count++;
}
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) {
break;
}
}
} while (1);
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
......@@ -1358,7 +1376,7 @@ static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, voi
return ret;
}
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags) {
const int32_t LIMIT = 128;
const int32_t LIMIT = 4096;
int32_t isLock = false;
int32_t sz = uidList ? taosArrayGetSize(uidList) : 0;
......@@ -1401,6 +1419,7 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj
taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
}
}
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
......
......@@ -96,6 +96,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
metaRsp.numOfColumns = schema.nCols;
metaRsp.precision = pVnode->config.tsdbCfg.precision;
metaRsp.sversion = schema.version;
metaRsp.tversion = schemaTag.version;
metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));
memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
......@@ -264,26 +265,25 @@ _exit:
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void vnodeFreeSBatchRspMsg(void* p) {
static FORCE_INLINE void vnodeFreeSBatchRspMsg(void *p) {
if (NULL == p) {
return;
}
SBatchRspMsg* pRsp = (SBatchRspMsg*)p;
SBatchRspMsg *pRsp = (SBatchRspMsg *)p;
rpcFreeCont(pRsp->msg);
}
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t rspSize = 0;
SBatchReq batchReq = {0};
SBatchMsg *req = NULL;
int32_t code = 0;
int32_t rspSize = 0;
SBatchReq batchReq = {0};
SBatchMsg *req = NULL;
SBatchRspMsg rsp = {0};
SBatchRsp batchRsp = {0};
SRpcMsg reqMsg = *pMsg;
SRpcMsg rspMsg = {0};
void *pRsp = NULL;
SBatchRsp batchRsp = {0};
SRpcMsg reqMsg = *pMsg;
SRpcMsg rspMsg = {0};
void *pRsp = NULL;
if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -291,7 +291,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto _exit;
}
int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG;
qError("too many msgs %d in vnode batch meta req", msgNum);
......@@ -405,7 +405,8 @@ void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64, "nInsert");
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64, "nInsertSuccess");
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64, "nBatchInsert");
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64, "nBatchInsertSuccess");
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64,
"nBatchInsertSuccess");
}
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
......
......@@ -33,6 +33,8 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) {
......@@ -419,6 +421,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
case TDMT_VND_COMMIT:
needCommit = true;
break;
case TDMT_VND_CREATE_INDEX:
vnodeProcessCreateIndexReq(pVnode, version, pReq, len, pRsp);
break;
case TDMT_VND_DROP_INDEX:
vnodeProcessDropIndexReq(pVnode, version, pReq, len, pRsp);
break;
case TDMT_VND_COMPACT:
vnodeProcessCompactVnodeReq(pVnode, version, pReq, len, pRsp);
goto _exit;
......@@ -1566,6 +1574,49 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
_err:
return code;
}
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0};
SDecoder dc = {0};
pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
tDecoderInit(&dc, pReq, len);
// decode req
if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&dc);
return -1;
}
if (metaAddIndexToSTable(pVnode->pMeta, version, &req) < 0) {
pRsp->code = terrno;
goto _err;
}
tDecoderClear(&dc);
return 0;
_err:
tDecoderClear(&dc);
return -1;
}
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SDropIndexReq req = {0};
pRsp->msgType = TDMT_VND_CREATE_INDEX_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
if (tDeserializeSDropIdxReq(pReq, len, &req)) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (metaDropIndexFromSTable(pVnode->pMeta, version, &req) < 0) {
pRsp->code = terrno;
return -1;
}
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SCompactVnodeReq req = {0};
......@@ -1580,4 +1631,4 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t version, void
vnodeBegin(pVnode);
return 0;
}
\ No newline at end of file
}
......@@ -43,7 +43,14 @@ typedef struct tagFilterAssist {
SArray* cInfoList;
} tagFilterAssist;
static int32_t removeInvalidTable(SArray* uids, SHashObj* tags);
typedef enum {
FILTER_NO_LOGIC = 1,
FILTER_AND,
FILTER_OTHER,
} FilterCondType;
static FilterCondType checkTagCond(SNode* cond);
static int32_t removeInvalidTable(SArray* uids, SHashObj* tags);
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SHashObj* tags);
static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond);
static int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond,
......@@ -392,7 +399,8 @@ static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarPara
return TSDB_CODE_SUCCESS;
}
static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray* uidList, SNode* pTagCond) {
static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray* uidList, SNode* pTagCond,
SIdxFltStatus status) {
int32_t code = TSDB_CODE_SUCCESS;
SArray* pBlockList = NULL;
SSDataBlock* pResBlock = NULL;
......@@ -430,14 +438,22 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
// int64_t stt = taosGetTimestampUs();
tags = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
FilterCondType condType = checkTagCond(pTagCond);
int32_t filter = optimizeTbnameInCond(metaHandle, suid, uidList, pTagCond, tags);
if (filter == -1) {
code = metaGetTableTags(metaHandle, suid, uidList, tags);
if ((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) {
code = metaGetTableTagsByUids(metaHandle, suid, uidList, tags);
} else {
code = metaGetTableTags(metaHandle, suid, uidList, tags);
}
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), suid);
terrno = code;
goto end;
}
} else {
qDebug("succ to get table tags from meta by tbname in cond, suid:%" PRIu64, suid);
}
if (suid != 0) {
removeInvalidTable(uidList, tags);
......@@ -842,6 +858,15 @@ static int tableUidCompare(const void* a, const void* b) {
return u1 < u2 ? -1 : 1;
}
static FilterCondType checkTagCond(SNode* cond) {
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
return FILTER_NO_LOGIC;
}
if (nodeType(cond) != QUERY_NODE_LOGIC_CONDITION || ((SLogicConditionNode*)cond)->condType != LOGIC_COND_TYPE_AND) {
return FILTER_AND;
}
return FILTER_OTHER;
}
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) {
int32_t ret = -1;
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
......@@ -954,7 +979,7 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
return -1;
}
} else {
// qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
// qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
terrno = 0;
}
}
......@@ -983,13 +1008,14 @@ static void genTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pContext) {
taosMemoryFree(payload);
}
static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* res, SNode* pTagCond, void* metaHandle) {
static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* res, SNode* pTagCond, void* metaHandle,
SIdxFltStatus status) {
if (pTagCond == NULL) {
return TSDB_CODE_SUCCESS;
}
terrno = TDB_CODE_SUCCESS;
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond);
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond, status);
if (terrno != TDB_CODE_SUCCESS) {
colDataDestroy(pColInfoData);
taosMemoryFreeClear(pColInfoData);
......@@ -1034,12 +1060,13 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
pListInfo->suid = pScanNode->suid;
SArray* res = taosArrayInit(8, sizeof(uint64_t));
SIdxFltStatus status = SFLT_NOT_INDEX;
if (pScanNode->tableType != TSDB_SUPER_TABLE) {
if (metaIsTableExist(metaHandle, tableUid)) {
taosArrayPush(res, &tableUid);
}
code = doFilterByTagCond(pListInfo, res, pTagCond, metaHandle);
code = doFilterByTagCond(pListInfo, res, pTagCond, metaHandle, status);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1064,16 +1091,17 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
SIndexMetaArg metaArg = {
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid};
SIdxFltStatus status = SFLT_NOT_INDEX;
code = doFilterTag(pTagIndexCond, &metaArg, res, &status);
if (code != 0 || status == SFLT_NOT_INDEX) {
qError("failed to get tableIds from index, reason:%s, suid:%" PRIu64, tstrerror(code), tableUid);
code = TDB_CODE_SUCCESS;
} else {
qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(res));
}
}
}
code = doFilterByTagCond(pListInfo, res, pTagCond, metaHandle);
code = doFilterByTagCond(pListInfo, res, pTagCond, metaHandle, status);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -134,7 +134,7 @@ static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName);
static void destroySysScanOperator(void* param);
static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code);
static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo);
static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse);
static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse, bool* equal);
static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable,
SMetaReader* smrChildTable, const char* dbname, const char* tableName,
......@@ -164,7 +164,8 @@ int32_t sysFilte__DbName(void* arg, SNode* pNode, SArray* result) {
SValueNode* pVal = (SValueNode*)pOper->pRight;
bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
bool equal = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse, &equal);
if (func == NULL) return -1;
int ret = func(dbname, pVal->datum.p, TSDB_DATA_TYPE_VARCHAR);
......@@ -182,9 +183,9 @@ int32_t sysFilte__VgroupId(void* arg, SNode* pNode, SArray* result) {
SOperatorNode* pOper = (SOperatorNode*)pNode;
SValueNode* pVal = (SValueNode*)pOper->pRight;
bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
bool reverse = false;
bool equal = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse, &equal);
if (func == NULL) return -1;
int ret = func(&vgId, &pVal->datum.i, TSDB_DATA_TYPE_BIGINT);
......@@ -198,9 +199,9 @@ int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
SOperatorNode* pOper = (SOperatorNode*)pNode;
SValueNode* pVal = (SValueNode*)pOper->pRight;
bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
bool reverse = false, equal = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse, &equal);
if (func == NULL) return -1;
SMetaFltParam param = {.suid = 0,
......@@ -208,6 +209,7 @@ int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
.type = TSDB_DATA_TYPE_VARCHAR,
.val = pVal->datum.p,
.reverse = reverse,
.equal = equal,
.filterFunc = func};
return -1;
}
......@@ -217,9 +219,9 @@ int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
SOperatorNode* pOper = (SOperatorNode*)pNode;
SValueNode* pVal = (SValueNode*)pOper->pRight;
bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
bool reverse = false, equal = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse, &equal);
if (func == NULL) return -1;
SMetaFltParam param = {.suid = 0,
......@@ -227,6 +229,7 @@ int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
.type = TSDB_DATA_TYPE_BIGINT,
.val = &pVal->datum.i,
.reverse = reverse,
.equal = equal,
.filterFunc = func};
int32_t ret = metaFilterCreateTime(pMeta, &param, result);
......@@ -239,8 +242,8 @@ int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result) {
SOperatorNode* pOper = (SOperatorNode*)pNode;
SValueNode* pVal = (SValueNode*)pOper->pRight;
bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
bool equal = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse, &equal);
if (func == NULL) return -1;
return -1;
}
......@@ -251,8 +254,8 @@ int32_t sysFilte__Ttl(void* arg, SNode* pNode, SArray* result) {
SOperatorNode* pOper = (SOperatorNode*)pNode;
SValueNode* pVal = (SValueNode*)pOper->pRight;
bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
bool equal = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse, &equal);
if (func == NULL) return -1;
return -1;
}
......@@ -263,8 +266,8 @@ int32_t sysFilte__STableName(void* arg, SNode* pNode, SArray* result) {
SOperatorNode* pOper = (SOperatorNode*)pNode;
SValueNode* pVal = (SValueNode*)pOper->pRight;
bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
bool equal = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse, &equal);
if (func == NULL) return -1;
return -1;
}
......@@ -275,8 +278,8 @@ int32_t sysFilte__Uid(void* arg, SNode* pNode, SArray* result) {
SOperatorNode* pOper = (SOperatorNode*)pNode;
SValueNode* pVal = (SValueNode*)pOper->pRight;
bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
bool equal = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse, &equal);
if (func == NULL) return -1;
return -1;
}
......@@ -287,8 +290,8 @@ int32_t sysFilte__Type(void* arg, SNode* pNode, SArray* result) {
SOperatorNode* pOper = (SOperatorNode*)pNode;
SValueNode* pVal = (SValueNode*)pOper->pRight;
bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
bool equal = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse, &equal);
if (func == NULL) return -1;
return -1;
}
......@@ -359,10 +362,13 @@ void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhy
static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name,
SSDataBlock* pBlock);
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) {
__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse, bool* equal) {
if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) {
*reverse = true;
}
if (ctype == OP_TYPE_EQUAL) {
*equal = true;
}
if (ctype == OP_TYPE_LOWER_THAN)
return optSysFilterFuncImpl__LowerThan;
else if (ctype == OP_TYPE_LOWER_EQUAL)
......
......@@ -22,8 +22,8 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000
#define MEM_THRESHOLD 8 * 512 * 1024 // 8M
#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20
#define MEM_THRESHOLD 128 * 1024 * 1024 // 8M
#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 5
#define MEM_ESTIMATE_RADIO 1.5
static void idxMemRef(MemTable* tbl);
......
......@@ -383,12 +383,13 @@ static FORCE_INLINE int sifEqual(void *a, void *b, int16_t dtype) {
//__compar_fn_t func = idxGetCompar(dtype);
return (int)tDoCompare(func, QUERY_TERM, a, b);
}
static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reverse, bool *equal) {
if (type == QUERY_LESS_EQUAL || type == QUERY_LESS_THAN) {
*reverse = true;
} else {
*reverse = false;
}
if (type == QUERY_LESS_EQUAL)
return sifLessEqual;
else if (type == QUERY_LESS_THAN)
......@@ -398,6 +399,7 @@ static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reve
else if (type == QUERY_GREATER_THAN)
return sifGreaterThan;
else if (type == QUERY_TERM) {
*equal = true;
return sifEqual;
}
return NULL;
......@@ -474,14 +476,15 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
ret = indexJsonSearch(arg->ivtIdx, mtm, output->result);
indexMultiTermQueryDestroy(mtm);
} else {
bool reverse;
FilterFunc filterFunc = sifGetFilterFunc(qtype, &reverse);
bool reverse = false, equal = false;
FilterFunc filterFunc = sifGetFilterFunc(qtype, &reverse, &equal);
SMetaFltParam param = {.suid = arg->suid,
.cid = left->colId,
.type = left->colValType,
.val = right->condValue,
.reverse = reverse,
.equal = equal,
.filterFunc = filterFunc};
char buf[128] = {0};
......
......@@ -784,7 +784,7 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p
pCol->tableType = pTable->pMeta->tableType;
pCol->colId = pColSchema->colId;
pCol->colType = (tagFlag >= 0 ? COLUMN_TYPE_TAG : COLUMN_TYPE_COLUMN);
pCol->hasIndex = (0 == tagFlag);
pCol->hasIndex = ((0 == tagFlag) || (pColSchema != NULL && IS_IDX_ON(pColSchema)));
pCol->node.resType.type = pColSchema->type;
pCol->node.resType.bytes = pColSchema->bytes;
if (TSDB_DATA_TYPE_TIMESTAMP == pCol->node.resType.type) {
......@@ -5379,6 +5379,28 @@ static int32_t buildCreateFullTextReq(STranslateContext* pCxt, SCreateIndexStmt*
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateTagIndexReq(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SCreateTagIndexReq* pReq) {
SName name;
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->indexDbName, pStmt->indexName, &name), pReq->idxName);
memset(&name, 0, sizeof(SName));
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name), pReq->stbName);
memset(&name, 0, sizeof(SName));
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameGetFullDbName(&name, pReq->dbFName);
SNode* pNode = NULL;
ASSERT(LIST_LENGTH(pStmt->pCols) == 1);
FOREACH(pNode, pStmt->pCols) {
SColumnNode* p = (SColumnNode*)pNode;
memcpy(pReq->colName, p->colName, sizeof(p->colName));
}
// impl later
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
SMCreateFullTextReq createFTReq = {0};
int32_t code = buildCreateFullTextReq(pCxt, pStmt, &createFTReq);
......@@ -5389,9 +5411,20 @@ static int32_t translateCreateFullTextIndex(STranslateContext* pCxt, SCreateInde
return code;
}
static int32_t translateCreateNormalIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
SCreateTagIndexReq createTagIdxReq = {0};
int32_t code = buildCreateTagIndexReq(pCxt, pStmt, &createTagIdxReq);
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_INDEX, (FSerializeFunc)tSerializeSCreateTagIdxReq, &createTagIdxReq);
}
return code;
}
static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt) {
if (INDEX_TYPE_FULLTEXT == pStmt->indexType) {
return translateCreateFullTextIndex(pCxt, pStmt);
} else if (INDEX_TYPE_NORMAL == pStmt->indexType) {
return translateCreateNormalIndex(pCxt, pStmt);
}
return translateCreateSmaIndex(pCxt, pStmt);
}
......
......@@ -208,6 +208,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve msg")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST, "Tag index already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_INDEX_NOT_EXIST, "Tag index not exists")
// mnode-db
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, "Database not specified or available")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exists")
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step0
$dbPrefix = ta_3_db
$tbPrefix = ta_3_tb
$mtPrefix = ta_3_mt
$tbNum = 100
$rowNum = 20
$totalNum = 200
print =============== create database
sql create database $dbPrefix
sql use $dbPrefix
print =============== create super table and register tag index
sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 int, t2 int, t3 int, t4 int, t5 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mtPrefix tags( $i , $i , $i , $i , $i );
$i = $i + 1
endw
sql show tables
if $rows != $tbNum then
return -1
endi
print =============== insert data into each table
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql insert into $tb values(now, $i );
$i = $i + 1
endw
sql create index ti2 on $mtPrefix (t2)
print ==== test name conflict
#
sql_error create index ti3 on $mtPrefix(t2)
sql_error create index ti2 on $mtPrefix(t2)
sql_error create index ti2 on $mtPrefix(t1)
sql_error create index ti2 on $mtPrefix(t3)
sql_error create index ti2 on $mtPrefix(txx)
print ===== test operator equal
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2= $i ;
if $rows != 1 then
return -1
endi
$i = $i + 1
endw
print ===== test operator great equal
# great equal than
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2 >= $i ;
$tmp = $tbNum - $i
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== test operator great
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2 > $i ;
$tmp = $tbNum - $i
$tmp = $tmp - 1
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== test operator lower
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2 <= $i ;
$tmp = $i + 1
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== test operator lower than
# lower equal than
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2 < $i ;
$tmp = $i
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== add table after index created
$interval = $tbNum + $tbNum
$i = $interval
$limit = $interval + $tbNum
while $i < $limit
$tb = $tbPrefix . $i
sql insert into $tb using $mtPrefix tags( $i , $i , $i , $i , $i ) values( now, $i )
$i = $i + 1
endw
print ===== add table after index created (opeator great equal than)
# great equal than
$i = $interval
while $i < $limit
sql select * from $mtPrefix where t2 >= $i ;
$tmp = $limit - $i
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== add table after index created (opeator great than)
# great than
$i = $interval
while $i < $limit
sql select * from $mtPrefix where t2 > $i ;
$tmp = $limit - $i
$tmp = $tmp - 1
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== drop table after index created
# drop table
$dropTbNum = $tbNum / 2
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql drop table $tb
$i = $i + 1
endw
print ===== drop table after index created(opeator lower than)
# lower equal than
$i = $interval
while $i < $limit
sql select * from $mtPrefix where t2 < $i ;
$tmp = $i - $interval
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== drop table after index created(opeator lower equal than)
# lower equal than
$i = $interval
while $i < $limit
sql select * from $mtPrefix where t2 <= $i ;
$tmp = $i - $interval
$tmp = $tmp + 1
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print === show index
sql select * from information_schema.ins_indexes
if $rows != 1 then
return -1
endi
print === drop index ti2
sql drop index ti2
print === drop not exist index
sql_error drop index t2
sql_error drop index t3
sql_error create index ti0 on $mtPrefix (t1)
sql_error create index t2i on ta_3_tb17 (t2)
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step0
$dbPrefix = ta_3_db
$tbPrefix = ta_3_tb
$mtPrefix = ta_3_mt
$tbNum = 100000
$rowNum = 20
$totalNum = 200
print =============== create database
sql create database $dbPrefix
sql use $dbPrefix
print =============== create super table and register tag index
sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 int, t2 int, t3 int, t4 int, t5 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mtPrefix tags( $i , $i , $i , $i , $i );
$i = $i + 1
endw
sql show tables
if $rows != $tbNum then
return -1
endi
print =============== insert data into each table
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql insert into $tb values(now, $i );
$i = $i + 1
endw
sql create index ti2 on $mtPrefix (t2)
print ==== test name conflict
#
sql_error create index ti3 on $mtPrefix(t2)
sql_error create index ti2 on $mtPrefix(t2)
sql_error create index ti2 on $mtPrefix(t1)
sql_error create index ti2 on $mtPrefix(t3)
sql_error create index ti2 on $mtPrefix(txx)
print ===== test operator equal
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2= $i ;
if $rows != 1 then
return -1
endi
$i = $i + 1
endw
print ===== test operator great equal
# great equal than
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2 >= $i ;
$tmp = $tbNum - $i
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== test operator great
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2 > $i ;
$tmp = $tbNum - $i
$tmp = $tmp - 1
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== test operator lower
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2 <= $i ;
$tmp = $i + 1
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== test operator lower than
# lower equal than
$i = 0
while $i < $tbNum
sql select * from $mtPrefix where t2 < $i ;
$tmp = $i
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== add table after index created
$interval = $tbNum + $tbNum
$i = $interval
$limit = $interval + $tbNum
while $i < $limit
$tb = $tbPrefix . $i
sql insert into $tb using $mtPrefix tags( $i , $i , $i , $i , $i ) values( now, $i )
$i = $i + 1
endw
print ===== add table after index created (opeator great equal than)
# great equal than
$i = $interval
while $i < $limit
sql select * from $mtPrefix where t2 >= $i ;
$tmp = $limit - $i
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== add table after index created (opeator great than)
# great than
$i = $interval
while $i < $limit
sql select * from $mtPrefix where t2 > $i ;
$tmp = $limit - $i
$tmp = $tmp - 1
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== drop table after index created
# drop table
$dropTbNum = $tbNum / 2
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql drop table $tb
$i = $i + 1
endw
print ===== drop table after index created(opeator lower than)
# lower equal than
$i = $interval
while $i < $limit
sql select * from $mtPrefix where t2 < $i ;
$tmp = $i - $interval
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print ===== drop table after index created(opeator lower equal than)
# lower equal than
$i = $interval
while $i < $limit
sql select * from $mtPrefix where t2 <= $i ;
$tmp = $i - $interval
$tmp = $tmp + 1
if $rows != $tmp then
return -1
endi
$i = $i + 1
endw
print === show index
sql select * from information_schema.ins_indexes
if $rows != 1 then
return -1
endi
print === drop index ti2
sql drop index ti2
print === drop not exist index
sql_error drop index t2
sql_error drop index t3
sql_error create index ti0 on $mtPrefix (t1)
sql_error create index t2i on ta_3_tb17 (t2)
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step0
$dbPrefix = ta_3_db
$dbPrefix1 = ta_3_db1
$tbPrefix = ta_3_tb
$mtPrefix = ta_3_mt
$tbNum = 1
$rowNum = 20
$totalNum = 200
print =============== create database
sql create database $dbPrefix
sql use $dbPrefix
print =============== create super table and register tag index
sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 int, t2 int, t3 int, t4 int, t5 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mtPrefix tags( $i , $i , $i , $i , $i );
$i = $i + 1
endw
sql show tables
if $rows != $tbNum then
return -1
endi
print =============== insert data into each table
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql insert into $tb values(now, $i );
$i = $i + 1
endw
print ==== create sma and tag index, global name conflict
sql create sma index t2i on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m);
sql_error create index t2i on $mtPrefix (t2)
sql drop index t2i
print ==== create tagindex and sma index, global name conflict
sql create index t2i on $mtPrefix (t2)
sql_error create sma index t2i on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m);
sql drop index t2i
print ===== iter sma and tag index
sql create index tagt2i on $mtPrefix (t2)
sql create sma index smat2i on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m);
sql select * from information_schema.ins_indexes
if $rows != 2 then
return -1
endi
sql drop index smat2i
$i = 0
$smaPre = sma3
while $i < 5
$sma = $smaPre . $i
$i = $i + 1
sql create sma index $sma on $mtPrefix function(max(c1)) interval(6m,10s) sliding(6m);
endw
sql select * from information_schema.ins_indexes
if $rows != 6 then
return -1
endi
sql drop stable $mtPrefix
sql select * from information_schema.ins_indexes
if $rows != 0 then
return -1
endi
sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 int, t2 int, t3 int, t4 int, t5 int)
sql create index tagt2i on $mtPrefix (t2)
sql drop database $dbPrefix
sql select * from information_schema.ins_indexes
if $rows != 0 then
return -1
endi
print ===== drop tag and del tag index
sql create database $dbPrefix
sql use $dbPrefix
sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 int, t2 int, t3 int, t4 int, t5 int)
sql create index tagt2i on $mtPrefix (t2)
sql select * from information_schema.ins_indexes
if $rows != 1 then
return -1
endi
sql alter table $mtPrefix drop tag t2
sql select * from information_schema.ins_indexes
if $rows != 0 then
return -1
endi
print ==== rename tag name, and update index colName
sql create index tagt3i on $mtPrefix (t3)
sql select * from information_schema.ins_indexes
if $rows != 1 then
return -1
endi
sql alter table $mtPrefix rename tag t3 txxx
sql select * from information_schema.ins_indexes
if $rows != 1 then
return -1
endi
if $data05 != txxx then
return -1
endi
print ====== diff db has same index name
sql create database $dbPrefix1
sql use $dbPrefix1
sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 int, t2 int, t3 int, t4 int, t5 int)
sql create index tagt3i on $mtPrefix (t3)
sql select * from information_schema.ins_indexes
if $rows != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -22,7 +22,7 @@ class TDTestCase:
tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)")
tdSql.query("select count(*) from information_schema.ins_columns")
tdSql.checkData(0, 0, 267)
tdSql.checkData(0, 0, 269)
tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'")
tdSql.checkRows(14)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册