提交 4d6e84a5 编写于 作者: dengyihao's avatar dengyihao

opt index

上级 01b89be0
...@@ -16,6 +16,7 @@ int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); ...@@ -16,6 +16,7 @@ int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndGetTableIdx(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist); int32_t mndGetTableIdx(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist);
int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -39,12 +39,12 @@ static int32_t mndIdxActionInsert(SSdb *pSdb, SIdxObj *pIdx); ...@@ -39,12 +39,12 @@ static int32_t mndIdxActionInsert(SSdb *pSdb, SIdxObj *pIdx);
static int32_t mndIdxActionDelete(SSdb *pSdb, SIdxObj *pIdx); static int32_t mndIdxActionDelete(SSdb *pSdb, SIdxObj *pIdx);
static int32_t mndIdxActionUpdate(SSdb *pSdb, SIdxObj *pOld, SIdxObj *pNew); static int32_t mndIdxActionUpdate(SSdb *pSdb, SIdxObj *pOld, SIdxObj *pNew);
static int32_t mndProcessCreateIdxReq(SRpcMsg *pReq); static int32_t mndProcessCreateIdxReq(SRpcMsg *pReq);
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq); // static int32_t mndProcessDropIdxReq(SRpcMsg *pReq);
static int32_t mndProcessGetIdxReq(SRpcMsg *pReq); static int32_t mndProcessGetIdxReq(SRpcMsg *pReq);
static int32_t mndProcessGetTbIdxReq(SRpcMsg *pReq); static int32_t mndProcessGetTbIdxReq(SRpcMsg *pReq);
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter); static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter);
static void mndDestroyIdxObj(SIdxObj *pIdxObj); static void mndDestroyIdxObj(SIdxObj *pIdxObj);
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb); static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb);
...@@ -60,7 +60,7 @@ int32_t mndInitIdx(SMnode *pMnode) { ...@@ -60,7 +60,7 @@ int32_t mndInitIdx(SMnode *pMnode) {
}; };
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIdxReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIdxReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_INDEX, mndProcessDropIdxReq); // mndSetMsgHandle(pMnode, TDMT_MND_DROP_INDEX, mndProcessDropIdxReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_INDEX_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_INDEX_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_INDEX_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_INDEX_RSP, mndTransProcessRsp);
...@@ -124,7 +124,9 @@ int mndSetCreateIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStb ...@@ -124,7 +124,9 @@ int mndSetCreateIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStb
return 0; return 0;
} }
static void *mndBuildDropIdxReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStbObj, SIdxObj *pIdx, int32_t *contLen) { static void *mndBuildDropIdxReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStbObj, SIdxObj *pIdx, int32_t *contLen) {
// TODO int32_t len = 0;
int32_t ret = 0;
SDropIndexReq req = {0}; SDropIndexReq req = {0};
memcpy(req.colName, pIdx->colName, sizeof(pIdx->colName)); memcpy(req.colName, pIdx->colName, sizeof(pIdx->colName));
memcpy(req.stb, pIdx->stb, sizeof(pIdx->stb)); memcpy(req.stb, pIdx->stb, sizeof(pIdx->stb));
...@@ -132,20 +134,29 @@ static void *mndBuildDropIdxReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStbOb ...@@ -132,20 +134,29 @@ static void *mndBuildDropIdxReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStbOb
req.stbUid = pIdx->stbUid; req.stbUid = pIdx->stbUid;
mInfo("idx: %s start to build drop index req", pIdx->name); mInfo("idx: %s start to build drop index req", pIdx->name);
int32_t len = tSerializeSDropIdxReq(NULL, 0, &req);
if (len < 0) { len = tSerializeSDropIdxReq(NULL, 0, &req);
terrno = TSDB_CODE_OUT_OF_MEMORY; if (ret < 0) {
return NULL; goto _err;
} }
void *pCont = taosMemoryCalloc(1, len); len += sizeof(SMsgHead);
if (pCont == NULL) { SMsgHead *pHead = taosMemoryCalloc(1, len);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; goto _err;
} }
tSerializeSDropIdxReq(pCont, len, &req);
pHead->contLen = htonl(len);
pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tSerializeSDropIdxReq(pBuf, len - sizeof(SMsgHead), &req);
*contLen = len; *contLen = len;
return pCont; return pHead;
_err:
return NULL;
} }
int mndSetDropIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) { int mndSetDropIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -699,18 +710,13 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) { ...@@ -699,18 +710,13 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pOld, SStbObj *pNew, static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pOld, SStbObj *pNew, char *tagName,
char *tagName) { int on) {
taosRLockLatch(&pOld->lock); taosRLockLatch(&pOld->lock);
memcpy(pNew, pOld, sizeof(SStbObj)); memcpy(pNew, pOld, sizeof(SStbObj));
taosRUnLockLatch(&pOld->lock); taosRUnLockLatch(&pOld->lock);
// pNew->numOfColumns = 0;
// pNew->pColumns = NULL;
// pNew->numOfTags = 0;
pNew->pTags = NULL; pNew->pTags = NULL;
// pNew->numOfFuncs = 0;
// pNew->pFuncs = NULL;
pNew->updateTime = taosGetTimestampMs(); pNew->updateTime = taosGetTimestampMs();
pNew->lock = 0; pNew->lock = 0;
...@@ -726,13 +732,21 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb ...@@ -726,13 +732,21 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
if (mndAllocStbSchemas(pOld, pNew) != 0) { if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1; return -1;
} }
SSchema *pTag = pNew->pTags + tag; SSchema *pTag = pNew->pTags + tag;
if (IS_IDX_ON(pTag)) {
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST; if (on == 1) {
return -1; if (IS_IDX_ON(pTag)) {
terrno = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
return -1;
} else {
pTag->flags |= COL_IDX_ON;
}
} else { } else {
pTag->flags |= COL_IDX_ON; if (!IS_IDX_ON(pTag)) {
terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
} else {
pTag->flags = 0;
}
} }
pNew->tagVer++; pNew->tagVer++;
...@@ -759,7 +773,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt ...@@ -759,7 +773,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
if (mndSetCreateIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER; if (mndSetCreateIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
if (mndSetCreateIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER; if (mndSetCreateIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newStb, pIdx->colName) != 0) goto _OVER; if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newStb, pIdx->colName, 1) != 0) goto _OVER;
if (mndSetCreateIdxRedoActions(pMnode, pTrans, pDb, &newStb, pIdx) != 0) goto _OVER; if (mndSetCreateIdxRedoActions(pMnode, pTrans, pDb, &newStb, pIdx) != 0) goto _OVER;
// if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; // if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
...@@ -843,15 +857,18 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p ...@@ -843,15 +857,18 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p
if (mndSetDropIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER; if (mndSetDropIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER; if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newObj, pIdx->colName) != 0) goto _OVER; if (mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &newObj, pIdx->colName, 0) != 0) goto _OVER;
if (mndSetDropIdxRedoActions(pMnode, pTrans, pDb, &newObj, pIdx) != 0) goto _OVER; if (mndSetDropIdxRedoActions(pMnode, pTrans, pDb, &newObj, pIdx) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
return code; return code;
} }
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) { int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
......
...@@ -47,7 +47,7 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo ...@@ -47,7 +47,7 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
static void mndDestroySmaObj(SSmaObj *pSmaObj); static void mndDestroySmaObj(SSmaObj *pSmaObj);
// sma index and tag index // retrieve sma index and tag index
static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
// TODO // TODO
int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows); int32_t read = mndRetrieveSma(pReq, pShow, pBlock, rows);
...@@ -56,6 +56,16 @@ static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc ...@@ -56,6 +56,16 @@ static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
return read; return read;
} }
static int32_t mndProcessDropIdxReq(SRpcMsg *pReq) {
int ret = mndProcessDropSmaReq(pReq);
if (terrno == TSDB_CODE_MND_SMA_NOT_EXIST) {
terrno = 0;
ret = mndProcessDropTagIdxReq(pReq);
} else {
}
return ret;
}
static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter) { static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter) {
// TODO // TODO
} }
...@@ -71,7 +81,7 @@ int32_t mndInitSma(SMnode *pMnode) { ...@@ -71,7 +81,7 @@ int32_t mndInitSma(SMnode *pMnode) {
}; };
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropIdxReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册