diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c3ef9a07f37ee2db5573e81336872f7f0f37a964..d56db2046e85ca8870e407f60fa115a800414cf5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1871,15 +1871,27 @@ typedef struct { } STSma; // Time-range-wise SMA typedef struct { - int8_t msgType; // 0 create, 1 recreate - STSma tSma; - STimeWindow window; -} SCreateTSmaMsg; + int64_t ver; // use a general definition + STSma tSma; +} SVCreateTSmaReq; typedef struct { - STimeWindow window; - char indexName[TSDB_INDEX_NAME_LEN + 1]; -} SDropTSmaMsg; + int8_t type; // 0 status report, 1 update data + char indexName[TSDB_INDEX_NAME_LEN + 1]; // + STimeWindow windows; +} STSmaMsg; + +typedef struct { + int64_t ver; // use a general definition + char indexName[TSDB_INDEX_NAME_LEN + 1]; +} SVDropTSmaReq; +typedef struct { +} SVCreateTSmaRsp, SVDropTSmaRsp; + +int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq); +void* tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq); +int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq); +void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq); typedef struct { STimeWindow tsWindow; // [skey, ekey] @@ -1901,22 +1913,18 @@ static FORCE_INLINE void tdDestroySmaData(STSmaData* pSmaData) { } } -// RSma: Time-range-wise Rollup SMA -// TODO: refactor when rSma grammar defined finally => +// RSma: Rollup SMA typedef struct { int64_t interval; int32_t retention; // unit: day uint16_t days; // unit: day int8_t intervalUnit; } SSmaParams; -// TODO: refactor when rSma grammar defined finally <= typedef struct { - // TODO: refactor to use the real schema => STSma tsma; float xFilesFactor; SArray* smaParams; // SSmaParams - // TODO: refactor to use the real schema <= } SRSma; typedef struct { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ae31dff3109cf732e165d68d7f6d9bb196a381a3..e7309eacc82e845679737139d14235eb4984926d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2390,3 +2390,36 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) tEndDecode(decoder); return 0; } + +int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { + int32_t tlen = 0; + + tlen += taosEncodeFixedI64(buf, pReq->ver); + tlen += tEncodeTSma(buf, &pReq->tSma); + + return tlen; +} + +void *tDeserializeSVCreateTSmaReq(void *buf, SVCreateTSmaReq *pReq) { + buf = taosDecodeFixedI64(buf, &(pReq->ver)); + + if ((buf = tDecodeTSma(buf, &pReq->tSma)) == NULL) { + tdDestroyTSma(&pReq->tSma); + } + return buf; +} + +int32_t tSerializeSVDropTSmaReq(void **buf, SVDropTSmaReq *pReq) { + int32_t tlen = 0; + + tlen += taosEncodeFixedI64(buf, pReq->ver); + tlen += taosEncodeString(buf, pReq->indexName); + + return tlen; +} +void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { + buf = taosDecodeFixedI64(buf, &(pReq->ver)); + buf = taosDecodeStringTo(buf, pReq->indexName); + + return buf; +} diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index b20be691ef2b0b21848089c9ab16aeb76da1a849..2d747d0e8000e82eef84c9dc1b27bafcb96981d2 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -40,24 +40,27 @@ typedef struct SMTbCursor SMTbCursor; typedef struct SMCtbCursor SMCtbCursor; typedef struct SMSmaCursor SMSmaCursor; -typedef SVCreateTbReq STbCfg; -typedef STSma SSmaCfg; +typedef SVCreateTbReq STbCfg; +typedef SVCreateTSmaReq SSmaCfg; // SMeta operations -SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); -void metaClose(SMeta *pMeta); -void metaRemove(const char *path); -int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); -int metaDropTable(SMeta *pMeta, tb_uid_t uid); -int metaCommit(SMeta *pMeta); +SMeta * metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); +void metaClose(SMeta *pMeta); +void metaRemove(const char *path); +int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); +int metaDropTable(SMeta *pMeta, tb_uid_t uid); +int metaCommit(SMeta *pMeta); +int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg); +int32_t metaDropTSma(SMeta *pMeta, char *indexName); // For Query STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); -SSmaCfg * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName); +STSma * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName); STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid); +SArray * metaGetSmaTbUids(SMeta *pMeta, bool isDup); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); diff --git a/source/dnode/vnode/src/inc/metaDef.h b/source/dnode/vnode/src/inc/metaDef.h index 6b4c036b395de223828a701facbd8a42e2616727..16a53baef073813a707d12071c1138518a403af6 100644 --- a/source/dnode/vnode/src/inc/metaDef.h +++ b/source/dnode/vnode/src/inc/metaDef.h @@ -33,7 +33,7 @@ int metaOpenDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta); int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg); int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); -int metaSaveSmaToDB(SMeta* pMeta, SSmaCfg* pTbCfg); +int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName); // SMetaCache diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 2a326eece80131c2c9fcbeac58a2aaccdd3ffebf..e4de7a668558c66aa1db07b9eddd4624809ab188 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -41,55 +41,4 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, return len; } -#if 0 - -typedef struct { - int minFid; - int midFid; - int maxFid; - TSKEY minKey; -} SRtn; - -typedef struct { - uint64_t uid; - int64_t offset; - int64_t size; -} SKVRecord; - -void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); - -static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { - if (key < 0) { - return (int)((key + 1) / tsTickPerDay[precision] / days - 1); - } else { - return (int)((key / tsTickPerDay[precision] / days)); - } -} - -static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { - if (fid >= pRtn->maxFid) { - return 0; - } else if (fid >= pRtn->midFid) { - return 1; - } else if (fid >= pRtn->minFid) { - return 2; - } else { - return -1; - } -} - -#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) - -int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); -void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); -void *tsdbCommitData(STsdbRepo *pRepo); -int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); -int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx); -int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); -int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, - bool isLast, bool isSuper, void **ppBuf, void **ppCBuf); -int tsdbApplyRtn(STsdbRepo *pRepo); - -#endif - #endif /* _TD_TSDB_SMA_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index c2f147bcaaa53615707243af93ae1e285b582ee1..f49515412b28bf9ff20f2a6eeda3a4381473fa37 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -226,7 +226,7 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { return 0; } -int metaSaveSmaToDB(SMeta *pMeta, SSmaCfg *pSmaCfg) { +int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { char buf[512] = {0}; // TODO: may overflow void *pBuf = NULL; DBT key1 = {0}, value1 = {0}; @@ -485,7 +485,7 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey } static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - SSmaCfg *pSmaCfg = (SSmaCfg *)(pValue->app_data); + STSma *pSmaCfg = (STSma *)(pValue->app_data); memset(pSKey, 0, sizeof(*pSKey)); pSKey->data = &(pSmaCfg->tableUid); @@ -609,8 +609,8 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { return pTbCfg; } -SSmaCfg *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { - SSmaCfg *pCfg = NULL; +STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { + STSma * pCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; DBT value = {0}; @@ -629,7 +629,7 @@ SSmaCfg *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { } // Decode - pCfg = (SSmaCfg *)malloc(sizeof(SSmaCfg)); + pCfg = (STSma *)malloc(sizeof(STSma)); if (pCfg == NULL) { return NULL; } @@ -885,8 +885,8 @@ STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) { return NULL; } - DBT skey = {.data = &(pCur->uid)}; - DBT pval = {.size = sizeof(pCur->uid)}; + DBT skey = {.data = &(pCur->uid), .size = sizeof(pCur->uid)}; + DBT pval = {0}; void *pBuf = NULL; while (true) { @@ -914,10 +914,49 @@ STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) { } metaCloseSmaCurosr(pCur); - + return pSW; } +SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { + SArray * pUids = NULL; + SMetaDB *pDB = pMeta->pDB; + DBC * pCur = NULL; + DBT pkey = {0}, pval = {0}; + int ret; + + pUids = taosArrayInit(16, sizeof(tb_uid_t)); + + if (!pUids) { + return NULL; + } + + // TODO: lock? + ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0); + if (ret != 0) { + taosArrayDestroy(pUids); + return NULL; + } + + void *pBuf = NULL; + + // TODO: lock? + while (true) { + ret = pCur->get(pCur, &pkey, &pval, isDup ? DB_NEXT_DUP : DB_NEXT_NODUP); + if(ret == 0) { + taosArrayPush(pUids, pkey.data); + continue; + } + break; + } + + if (pCur) { + pCur->close(pCur); + } + + return pUids; +} + static void metaDBWLock(SMetaDB *pDB) { #if IMPL_WITH_LOCK pthread_rwlock_wrlock(&(pDB->rwlock)); diff --git a/source/dnode/vnode/src/meta/metaIdx.c b/source/dnode/vnode/src/meta/metaIdx.c index 2ca02a2b802f58813cc38ae5b0f23d018227cb44..881ea4f46dcac373a7fe3fad487ae487da2ab515 100644 --- a/source/dnode/vnode/src/meta/metaIdx.c +++ b/source/dnode/vnode/src/meta/metaIdx.c @@ -107,19 +107,27 @@ int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) { return 0; } -int metaCreateSma(SMeta *pMeta, SSmaCfg *pSmaCfg) { - // Validate the tbOptions - // if (metaValidateTbCfg(pMeta, pTbCfg) < 0) { - // // TODO: handle error - // return -1; - // } +int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg) { + // TODO: Validate the cfg + // The table uid should exists and be super table or common table. + // Check other cfg value // TODO: add atomicity - if (metaSaveSmaToDB(pMeta, pSmaCfg) < 0) { + if (metaSaveSmaToDB(pMeta, &pCfg->tSma) < 0) { // TODO: handle error return -1; } - - return 0; + return TSDB_CODE_SUCCESS; } + +int32_t metaDropTSma(SMeta *pMeta, char* indexName) { + // TODO: Validate the cfg + // TODO: add atomicity + + if (metaRemoveSmaFromDb(pMeta, indexName) < 0) { + // TODO: handle error + return -1; + } + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 4725f77fa25b33368f2d993165e08489cf8070e5..5977ad832c690e5ceb663476dee67b2f42aea7ab 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -50,3 +50,4 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid) { return 0; } + diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index f3f21dc9c05d3001a244447e55a0064a26005d6c..aba8100478e973bb84deb22beac5038f243f95f2 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -69,7 +69,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: handle error } - // TODO: maybe need to clear the requst struct + // TODO: maybe need to clear the request struct free(vCreateTbReq.stbCfg.pSchema); free(vCreateTbReq.stbCfg.pTagSchema); free(vCreateTbReq.name); @@ -133,13 +133,44 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA - // 1. tdCreateSmaMeta(pVnode->pMeta,...); - // 2. tdCreateSmaDataInit(); - // 3. tdCreateSmaData + SSmaCfg vCreateSmaReq = {0}; + if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) { + // TODO: handle error + tdDestroyTSma(&vCreateSmaReq.tSma); + return -1; + } + // TODO: send msg to stream computing to create tSma + // if ((send msg to stream computing) < 0) { + // tdDestroyTSma(&vCreateSmaReq); + // return -1; + // } + tdDestroyTSma(&vCreateSmaReq.tSma); + // TODO: return directly or go on follow steps? } break; case TDMT_VND_CANCEL_SMA: { // timeRangeSMA } break; case TDMT_VND_DROP_SMA: { // timeRangeSMA + SVDropTSmaReq vDropSmaReq = {0}; + if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexName) < 0) { + // TODO: handle error + return -1; + } + // TODO: send msg to stream computing to drop tSma + // if ((send msg to stream computing) < 0) { + // tdDestroyTSma(&vCreateSmaReq); + // return -1; + // } + // TODO: return directly or go on follow steps? } break; default: ASSERT(0); diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index d97dca8469c0e195833f87ad3e4f1ab329e1a697..f3a61bdfa4213bf6a8c135e6c92dd24df4ad0fea 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -103,6 +103,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { const char *smaIndexName2 = "sma_index_test_2"; const char *smaTestDir = "./smaTest"; const uint64_t tbUid = 1234567890; + const uint32_t nCntTSma = 2; // encode STSma tSma = {0}; tSma.version = 0; @@ -125,7 +126,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { } SMeta * pMeta = NULL; - SSmaCfg * pSmaCfg = &tSma; + STSma * pSmaCfg = &tSma; const SMetaCfg *pMetaCfg = &defaultMetaOptions; taosRemoveDir(smaTestDir); @@ -146,14 +147,14 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { metaSaveSmaToDB(pMeta, pSmaCfg); // get value by indexName - SSmaCfg *qSmaCfg = NULL; + STSma *qSmaCfg = NULL; qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1); assert(qSmaCfg != NULL); printf("name1 = %s\n", qSmaCfg->indexName); EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1); EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid); tdDestroyTSma(qSmaCfg); - free(qSmaCfg); + tfree(qSmaCfg); qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2); assert(qSmaCfg != NULL); @@ -161,7 +162,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2); EXPECT_EQ(qSmaCfg->interval, tSma.interval); tdDestroyTSma(qSmaCfg); - free(qSmaCfg); + tfree(qSmaCfg); // get index name by table uid SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid); @@ -175,17 +176,30 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { printf("indexName = %s\n", indexName); ++indexCnt; } - EXPECT_EQ(indexCnt, 2); + EXPECT_EQ(indexCnt, nCntTSma); metaCloseSmaCurosr(pSmaCur); // get wrapper by table uid STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid); assert(pSW != NULL); - EXPECT_EQ(pSW->number, 2); + EXPECT_EQ(pSW->number, nCntTSma); EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1); EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid); EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2); EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid); + + tdDestroyTSmaWrapper(pSW); + tfree(pSW); + + // get all sma table uids + SArray *pUids = metaGetSmaTbUids(pMeta, false); + assert(pUids != NULL); + for (uint32_t i = 0; i < taosArrayGetSize(pUids); ++i) { + printf("metaGetSmaTbUids: uid[%" PRIu32 "] = %" PRIi64 "\n", i, *(tb_uid_t *)taosArrayGet(pUids, i)); + // printf("metaGetSmaTbUids: index[%" PRIu32 "] = %s", i, (char *)taosArrayGet(pUids, i)); + } + EXPECT_EQ(taosArrayGetSize(pUids), 1); + taosArrayDestroy(pUids); // resource release metaRemoveSmaFromDb(pMeta, smaIndexName1);