diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6863e3c336e60bb32d0907914e0d80c7c0663fe6..4a6368ef3029c6e22e4a300d2d10da8fddafa732 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -198,7 +198,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) - + TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL) TD_NEW_MSG_SEG(TDMT_QND_MSG) //shared by snode and vnode diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2f7ca249ef829af37f7705de10cee526b1956366..945238ba1ccd073b79b321978ea89311e1a89248 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -498,7 +498,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq } if (pReq->commentLen > 0) { - if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1; + if (tEncodeCStrWithLen(&encoder, pReq->comment, pReq->commentLen) < 0) return -1; } if (pReq->ast1Len > 0) { if (tEncodeBinary(&encoder, pReq->pAst1, pReq->ast1Len) < 0) return -1; @@ -561,7 +561,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR } if (pReq->commentLen > 0) { - pReq->comment = taosMemoryMalloc(pReq->commentLen); + pReq->comment = taosMemoryCalloc(1, pReq->commentLen + 1); if (pReq->comment == NULL) return -1; if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 6f89c97f8322e90a3a3a422ade34abacb110cf15..d498b640cb2708a96457fa2fa73859622c35ddf3 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -671,12 +671,12 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->numOfTags = pCreate->numOfTags; pDst->commentLen = pCreate->commentLen; if (pDst->commentLen > 0) { - pDst->comment = taosMemoryCalloc(pDst->commentLen, 1); + pDst->comment = taosMemoryCalloc(pDst->commentLen + 1, 1); if (pDst->comment == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - memcpy(pDst->comment, pCreate->comment, pDst->commentLen); + memcpy(pDst->comment, pCreate->comment, pDst->commentLen + 1); } pDst->ast1Len = pCreate->ast1Len; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 300a5f890e7436c124eea2b74bf62a2d1bcf2d8a..ea856796b367321ce85a5d02c8b5566b007cc2f4 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -87,6 +87,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); +int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); @@ -105,6 +106,7 @@ int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader); int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData); void* metaGetIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta); +int metaTtlSmaller(SMeta *pMeta, uint64_t time, SArray *uidList); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index a6339125c42e873a62b231ab3932736023f2204f..4f43f99c855f589a19f7d0af3cfee37d21c68562 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -95,7 +95,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { metaULock(pMeta); - return 0; + return uid; } int metaReadNext(SMetaReader *pReader) { @@ -218,6 +218,40 @@ _err: return NULL; } +int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList){ + metaRLock(pMeta); + TBC * pCur; + int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL); + if (ret < 0) { + metaULock(pMeta); + return ret; + } + + STtlIdxKey ttlKey = {0}; + ttlKey.dtime = ttl; + ttlKey.uid = INT64_MAX; + int c = 0; + tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c); + if (c < 0) { + tdbTbcMoveToPrev(pCur); + } + + void *pKey = NULL; + int kLen = 0; + while(1){ + ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL); + if (ret < 0) { + break; + } + ttlKey = *(STtlIdxKey*)pKey; + taosArrayPush(uidList, &ttlKey.uid); + } + tdbTbcClose(pCur); + + tdbFree(pKey); + return 0; +} + struct SMCtbCursor { SMeta * pMeta; TBC * pCur; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 9a05f9e5a0c1fd7ca4bb1070deb78101faa8666f..717344fff2d829085bfbc2249d4116fd3b7c4f32 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -359,7 +359,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi metaDropTableByUid(pMeta, uid, &type); metaULock(pMeta); - if (type == TSDB_CHILD_TABLE && tbUids) { + if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) { taosArrayPush(tbUids, &uid); } @@ -367,6 +367,47 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi return 0; } +int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) { + metaWLock(pMeta); + int ret = metaTtlSmaller(pMeta, ttl, tbUids); + if(ret != 0){ + return ret; + } + for (int i = 0; i < taosArrayGetSize(tbUids); ++i) { + tb_uid_t *uid = (tb_uid_t *)taosArrayGet(tbUids, i); + metaDropTableByUid(pMeta, *uid, NULL); + } + metaULock(pMeta); + return 0; +} + +static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME){ + int32_t ttlDays; + int64_t ctime; + if (pME->type == TSDB_CHILD_TABLE) { + ctime = pME->ctbEntry.ctime; + ttlDays = pME->ctbEntry.ttlDays; + } else if (pME->type == TSDB_NORMAL_TABLE) { + ctime = pME->ntbEntry.ctime; + ttlDays = pME->ntbEntry.ttlDays; + } else { + ASSERT(0); + } + + if (ttlDays <= 0) return; + + ttlKey->dtime = ctime + ttlDays * 24 * 60 * 60; + ttlKey->uid = pME->uid; +} + +static int metaDeleteTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { + STtlIdxKey ttlKey = {0}; + metaBuildTtlIdxKey(&ttlKey, pME); + if(ttlKey.dtime == 0) return 0; + return tdbTbDelete(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), &pMeta->txn); +} + + static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { void * pData = NULL; int nData = 0; @@ -388,11 +429,12 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pMeta->txn); tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, &pMeta->txn); tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), &pMeta->txn); + if(e.type != TSDB_SUPER_TABLE) metaDeleteTtlIdx(pMeta, &e); + if (e.type == TSDB_CHILD_TABLE) { tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn); } else if (e.type == TSDB_NORMAL_TABLE) { // drop schema.db (todo) - // drop ttl.idx (todo) } else if (e.type == TSDB_SUPER_TABLE) { // drop schema.db (todo) } @@ -706,8 +748,18 @@ _err: } static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { - // TODO - ASSERT(0); + if (commentLen > 0) { + pNew->commentLen = commentLen; + pNew->comment = taosMemoryCalloc(1, commentLen); + if (pNew->comment == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(pNew->comment, pComment, commentLen); + } + if (ttl >= 0) { + pNew->ttl = ttl; + } return 0; } @@ -786,25 +838,9 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { } static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { - int32_t ttlDays; - int64_t ctime; - STtlIdxKey ttlKey; - - if (pME->type == TSDB_CHILD_TABLE) { - ctime = pME->ctbEntry.ctime; - ttlDays = pME->ctbEntry.ttlDays; - } else if (pME->type == TSDB_NORMAL_TABLE) { - ctime = pME->ntbEntry.ctime; - ttlDays = pME->ntbEntry.ttlDays; - } else { - ASSERT(0); - } - - if (ttlDays <= 0) return 0; - - ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60; - ttlKey.uid = pME->uid; - + STtlIdxKey ttlKey = {0}; + metaBuildTtlIdxKey(&ttlKey, pME); + if(ttlKey.dtime == 0) return 0; return tdbTbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 10e0888988a5b828708fabbaa9e9140eb6653fc8..2540f953e06ddefc9f7b3d06811594407876b533 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -26,6 +26,7 @@ static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void * static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp); +static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -33,7 +34,7 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_VND_CREATE_TABLE: { - int64_t ctime = taosGetTimestampMs(); + int64_t ctime = taosGetTimestampSec(); int32_t nReqs; tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); @@ -60,7 +61,7 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) { SSubmitMsgIter msgIter = {0}; SSubmitReq *pSubmitReq = (SSubmitReq *)pMsg->pCont; SSubmitBlk *pBlock = NULL; - int64_t ctime = taosGetTimestampMs(); + int64_t ctime = taosGetTimestampSec(); tb_uid_t uid; tInitSubmitMsgIter(pSubmitReq, &msgIter); @@ -134,6 +135,9 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp case TDMT_VND_DROP_TABLE: if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; + case TDMT_VND_DROP_TTL_TABLE: + if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; + break; case TDMT_VND_CREATE_SMA: { if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; } break; @@ -389,6 +393,22 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return ret; } +static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp){ + + SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); + if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + int32_t ret = metaTtlDropTable(pVnode->pMeta, *(int64_t*)pReq, tbUids); + if(ret != 0){ + goto end; + } + tqUpdateTbUidList(pVnode->pTq, tbUids, false); + +end: + taosArrayDestroy(tbUids); + return ret; +} + static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SVCreateStbReq req = {0}; SDecoder coder; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index e887d00e2ce4939dfa12d75ab3814a7f32de30d3..05f1c648243bbb7cb9dfea782aea60d319126d91 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3050,7 +3050,7 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStmt, SMCreateStbReq* pReq) { pReq->igExists = pStmt->ignoreExists; pReq->xFilesFactor = pStmt->pOptions->filesFactor; - pReq->ttl = pStmt->pOptions->ttl; +// pReq->ttl = pStmt->pOptions->ttl; columnDefNodeToField(pStmt->pCols, &pReq->pColumns); columnDefNodeToField(pStmt->pTags, &pReq->pTags); pReq->numOfColumns = LIST_LENGTH(pStmt->pCols); @@ -4982,7 +4982,7 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p } } - if (TSDB_CODE_SUCCESS == code && '\0' != pStmt->pOptions->comment[0]) { + if (TSDB_CODE_SUCCESS == code) { pReq->updateComment = true; pReq->newComment = strdup(pStmt->pOptions->comment); if (NULL == pReq->newComment) { @@ -5105,6 +5105,9 @@ static int32_t rewriteAlterTableImpl(STranslateContext* pCxt, SAlterTableStmt* p pStmt->alterType == TSDB_ALTER_TABLE_UPDATE_TAG_BYTES)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG); } + if (pStmt->alterType == TSDB_ALTER_TABLE_UPDATE_OPTIONS && -1 != pStmt->pOptions->ttl) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE); + } return TSDB_CODE_SUCCESS; } else if (TSDB_CHILD_TABLE != pTableMeta->tableType && TSDB_NORMAL_TABLE != pTableMeta->tableType) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ALTER_TABLE); diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 5912fd800c92805013cb79cb967040a8e73c8af2..628bf6d7aa594e8cb18d68ec3f36b6ce273a08ce 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -58,6 +58,7 @@ int32_t tdbTbcMoveToPrev(TBC *pTbc); int32_t tdbTbcGet(TBC *pTbc, const void **ppKey, int *pkLen, const void **ppVal, int *pvLen); int32_t tdbTbcDelete(TBC *pTbc); int32_t tdbTbcNext(TBC *pTbc, void **ppKey, int *kLen, void **ppVal, int *vLen); +int32_t tdbTbcPrev(TBC *pTbc, void **ppKey, int *kLen, void **ppVal, int *vLen); int32_t tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, int nData, int insert); // TXN diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index fffda68731cb15da91669c72913f776cb67496ca..9368706378d22269f7b8b2376c8e12575520effb 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1245,6 +1245,52 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { return 0; } +int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { + SCell *pCell; + SCellDecoder cd; + void *pKey, *pVal; + int ret; + + // current cursor points to an invalid position + if (pBtc->idx < 0) { + return -1; + } + + pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx); + + tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd); + + pKey = tdbRealloc(*ppKey, cd.kLen); + if (pKey == NULL) { + return -1; + } + + *ppKey = pKey; + *kLen = cd.kLen; + memcpy(pKey, cd.pKey, cd.kLen); + + if (ppVal) { + // TODO: vLen may be zero + pVal = tdbRealloc(*ppVal, cd.vLen); + if (pVal == NULL) { + tdbFree(pKey); + return -1; + } + + *ppVal = pVal; + *vLen = cd.vLen; + memcpy(pVal, cd.pVal, cd.vLen); + } + + ret = tdbBtcMoveToPrev(pBtc); + if (ret < 0) { + ASSERT(0); + return -1; + } + + return 0; +} + int tdbBtcMoveToNext(SBTC *pBtc) { int nCells; int ret; diff --git a/source/libs/tdb/src/db/tdbTable.c b/source/libs/tdb/src/db/tdbTable.c index 239aa5d7ef786b0941e857bf9e3f73a655f65d5a..1575f9f206d3dd6b77eec6e61a5840e47efedc4b 100644 --- a/source/libs/tdb/src/db/tdbTable.c +++ b/source/libs/tdb/src/db/tdbTable.c @@ -132,6 +132,10 @@ int tdbTbcNext(TBC *pTbc, void **ppKey, int *kLen, void **ppVal, int *vLen) { return tdbBtreeNext(&pTbc->btc, ppKey, kLen, ppVal, vLen); } +int tdbTbcPrev(TBC *pTbc, void **ppKey, int *kLen, void **ppVal, int *vLen) { + return tdbBtreePrev(&pTbc->btc, ppKey, kLen, ppVal, vLen); +} + int tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, int nData, int insert) { return tdbBtcUpsert(&pTbc->btc, pKey, nKey, pData, nData, insert); } diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 6524e3c9bcd873180378b5cfea2404b1a461ac7b..71e009cabfef8ea27a72381e09126cefa3fe7734 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -156,6 +156,7 @@ int tdbBtcMoveToLast(SBTC *pBtc); int tdbBtcMoveToNext(SBTC *pBtc); int tdbBtcMoveToPrev(SBTC *pBtc); int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen); +int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen); int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int *vLen); int tdbBtcDelete(SBTC *pBtc); int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int nData, int insert);