提交 cf9f9ab4 编写于 作者: dengyihao's avatar dengyihao

rebuild index at tag0

上级 1990a891
......@@ -79,9 +79,12 @@ int32_t mndInitIdx(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table);
}
static int32_t mndFindSuperTableTagId(const SStbObj *pStb, const char *tagName) {
static int32_t mndFindSuperTableTagId(const SStbObj *pStb, const char *tagName, int8_t *hasIdx) {
for (int32_t tag = 0; tag < pStb->numOfTags; tag++) {
if (strcasecmp(pStb->pTags[tag].name, tagName) == 0) {
if (IS_IDX_ON(&pStb->pTags[tag])) {
*hasIdx = 1;
}
return tag;
}
}
......@@ -597,7 +600,8 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
pNew->updateTime = taosGetTimestampMs();
pNew->lock = 0;
int32_t tag = mndFindSuperTableTagId(pOld, tagName);
int8_t hasIdx = 0;
int32_t tag = mndFindSuperTableTagId(pOld, tagName, &hasIdx);
if (tag < 0) {
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
return -1;
......@@ -612,14 +616,14 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
SSchema *pTag = pNew->pTags + tag;
if (on == 1) {
if (IS_IDX_ON(pTag)) {
if (hasIdx && tag != 0) {
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
return -1;
} else {
SSCHMEA_SET_IDX_ON(pTag);
}
} else {
if (!IS_IDX_ON(pTag)) {
if (hasIdx == 0) {
terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
} else {
SSCHMEA_SET_IDX_OFF(pTag);
......@@ -667,7 +671,42 @@ _OVER:
mndTransDrop(pTrans);
return code;
}
int8_t mndCheckIndexNameByTagName(SMnode *pMnode, SIdxObj *pIdxObj) {
// build index on first tag, and no index name;
int8_t exist = 0;
SDbObj *pDb = NULL;
if (strlen(pIdxObj->db) > 0) {
pDb = mndAcquireDb(pMnode, pIdxObj->db);
if (pDb == NULL) return 0;
}
SSmaAndTagIter *pIter = NULL;
SIdxObj *pIdx = NULL;
SSdb *pSdb = pMnode->pSdb;
while (1) {
pIter = sdbFetch(pSdb, SDB_IDX, pIter, (void **)&pIdx);
if (pIter == NULL) break;
if (NULL != pDb && pIdx->dbUid != pDb->uid) {
sdbRelease(pSdb, pIdx);
continue;
}
if (pIdxObj->stbUid != pIdx->stbUid) {
sdbRelease(pSdb, pIdx);
continue;
}
if (strncmp(pIdxObj->colName, pIdx->colName, TSDB_COL_NAME_LEN) == 0) {
sdbRelease(pSdb, pIdx);
sdbCancelFetch(pSdb, pIdx);
exist = 1;
break;
}
sdbRelease(pSdb, pIdx);
}
mndReleaseDb(pMnode, pDb);
return exist;
}
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb) {
int32_t code = -1;
SIdxObj idxObj = {0};
......@@ -681,11 +720,20 @@ static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *re
idxObj.stbUid = pStb->uid;
idxObj.dbUid = pStb->dbUid;
int32_t tag = mndFindSuperTableTagId(pStb, req->colName);
int8_t hasIdx = 0;
int32_t tag = mndFindSuperTableTagId(pStb, req->colName, &hasIdx);
if (tag < 0) {
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
return -1;
} else if (tag == 0) {
}
int8_t exist = 0;
if (tag == 0 && hasIdx == 1) {
exist = mndCheckIndexNameByTagName(pMnode, &idxObj);
if (exist) {
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
return -1;
}
} else if (hasIdx == 1) {
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
return -1;
}
......@@ -695,11 +743,11 @@ static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *re
return -1;
}
SSchema *pTag = pStb->pTags + tag;
if (IS_IDX_ON(pTag)) {
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
return -1;
}
// SSchema *pTag = pStb->pTags + tag;
// if (IS_IDX_ON(pTag)) {
// terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
// return -1;
// }
code = mndAddIndexImpl(pMnode, pReq, pDb, pStb, &idxObj);
return code;
......
......@@ -17,8 +17,8 @@
#include "osMemory.h"
#include "tencode.h"
void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta *pAPI) {
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
metaReaderDoInit(pReader, pMeta, flags);
pReader->pAPI = pAPI;
}
......@@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
......@@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) {
int code = 0;
SMetaReader mr = {0};
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
code = metaGetTableEntryByName(&mr, tbName);
if (code == 0) *tbType = mr.me.type;
......@@ -244,7 +244,7 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) {
return NULL;
}
SVnode* pVnodeObj = pVnode;
SVnode *pVnodeObj = pVnode;
// tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
pTbCur->pMeta = pVnodeObj->pMeta;
pTbCur->paused = 1;
......@@ -1139,7 +1139,7 @@ int32_t metaFilterTtl(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
pCursor->type = param->type;
metaRLock(pMeta);
//ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL);
// ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL);
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
......@@ -1194,7 +1194,7 @@ int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
ret = -1;
for (int i = 0; i < oStbEntry.stbEntry.schemaTag.nCols; i++) {
SSchema *schema = oStbEntry.stbEntry.schemaTag.pSchema + i;
if (schema->colId == param->cid && param->type == schema->type && (IS_IDX_ON(schema) || i == 0)) {
if (schema->colId == param->cid && param->type == schema->type && (IS_IDX_ON(schema))) {
ret = 0;
}
}
......
......@@ -450,12 +450,13 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
goto _err;
}
if (IS_IDX_ON(pNew) && !IS_IDX_ON(pOld)) {
if (diffIdx != -1) goto _err;
// if (diffIdx != -1) goto _err;
diffIdx = i;
break;
}
}
if (diffIdx == -1 || diffIdx == 0) {
if (diffIdx == -1) {
goto _err;
}
......@@ -586,7 +587,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
for (int i = 0; i < oStbEntry.stbEntry.schemaTag.nCols; i++) {
SSchema *schema = oStbEntry.stbEntry.schemaTag.pSchema + i;
if (0 == strncmp(schema->name, pReq->colName, sizeof(pReq->colName))) {
if (i != 0 || IS_IDX_ON(schema)) {
if (IS_IDX_ON(schema)) {
pCol = schema;
}
break;
......@@ -2094,7 +2095,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
} else {
for (int i = 0; i < pTagSchema->nCols; i++) {
pTagColumn = &pTagSchema->pSchema[i];
if (i != 0 && !IS_IDX_ON(pTagColumn)) continue;
if (!IS_IDX_ON(pTagColumn)) continue;
STagVal tagVal = {.cid = pTagColumn->colId};
tTagGet((const STag *)pCtbEntry->ctbEntry.pTags, &tagVal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册