diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d1e02af2875c4a23f7dc57046dc36088edfeb82a..65860d4959928aba2a28a620789351a0bef670cf 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1866,10 +1866,59 @@ typedef struct { uint64_t tableUid; // super/common table uid int64_t interval; int64_t sliding; - col_id_t* colIds; // N.B. sorted column ids - uint16_t* funcIds; // N.B. sorted sma function ids + col_id_t* colIds; // sorted column ids + uint16_t* funcIds; // sorted sma function ids } STSma; // Time-range-wise SMA +typedef struct { + int8_t msgType; // 0 create, 1 recreate + STSma tSma; + STimeWindow window; +} SCreateTSmaMsg; + +typedef struct { + STimeWindow window; + char indexName[TSDB_INDEX_NAME_LEN + 1]; +} SDropTSmaMsg; + +typedef struct { + STimeWindow tsWindow; // [skey, ekey] + uint64_t tableUid; // sub/common table uid + int32_t numOfBlocks; // number of sma blocks for each column, total number is numOfBlocks*numOfColId + int32_t dataLen; // total data length + col_id_t* colIds; // e.g. 2,4,9,10 + col_id_t numOfColIds; // e.g. 4 + char data[]; // the sma blocks +} STSmaData; + +// TODO: move to the final location afte schema of STSma/STSmaData defined +static FORCE_INLINE void tdDestroySmaData(STSmaData* pSmaData) { + if (pSmaData) { + if (pSmaData->colIds) { + tfree(pSmaData->colIds); + } + tfree(pSmaData); + } +} + +// RSma: Time-range-wise Rollup SMA +// TODO: refactor when rSma grammar defined finally => +typedef struct { + int64_t interval; + int32_t retention; // unit: day + uint16_t days; // unit: day + int8_t intervalUnit; +} SSmaParams; +// TODO: refactor when rSma grammar defined finally <= + +typedef struct { + // TODO: refactor to use the real schema => + STSma tsma; + float xFilesFactor; + SArray* smaParams; // SSmaParams + // TODO: refactor to use the real schema <= +} SRSma; + typedef struct { uint32_t number; STSma* tSma; @@ -1885,12 +1934,17 @@ static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) { } } -static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) { - if (pSW && pSW->tSma) { - for (uint32_t i = 0; i < pSW->number; ++i) { - tdDestroyTSma(pSW->tSma + i, false); +static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW, bool releaseSelf) { + if (pSW) { + if (pSW->tSma) { + for (uint32_t i = 0; i < pSW->number; ++i) { + tdDestroyTSma(pSW->tSma + i, false); + } + tfree(pSW->tSma); + } + if (releaseSelf) { + free(pSW); } - tfree(pSW->tSma); } } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index cae186ba16ed7af71d888eee367f13b067b712d5..0f0c4729bc70cf4fbcfdc56d331794fe8e846158 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -184,6 +184,9 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) // Requests handled by QNODE TD_NEW_MSG_SEG(TDMT_QND_MSG) diff --git a/include/common/trow.h b/include/common/trow.h index 49bc2f3515a3304e8421d3f6f4cdc33a901be430..ef30796d787528aeb4ec37e1811e310f2d46993a 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -118,6 +118,8 @@ typedef struct { } SKvRow; typedef struct { + /// timestamp + TSKEY ts; union { /// union field for encode and decode uint32_t info; @@ -138,8 +140,6 @@ typedef struct { uint32_t len; /// row version uint64_t ver; - /// timestamp - TSKEY ts; /// the inline data, maybe a tuple or a k-v tuple char data[]; } STSRow; @@ -173,7 +173,7 @@ typedef struct { #define TD_ROW_DATA(r) ((r)->data) #define TD_ROW_LEN(r) ((r)->len) #define TD_ROW_KEY(r) ((r)->ts) -#define TD_ROW_KEY_ADDR(r) POINTER_SHIFT((r), 16) +#define TD_ROW_KEY_ADDR(r) (r) // N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and // (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined. diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index e5ad43a4ee17e10926321ead4f628c88e367a6a4..b20be691ef2b0b21848089c9ab16aeb76da1a849 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -57,6 +57,7 @@ STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); SSmaCfg * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName); +STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 6bc89ddd66e919afb642bc6db47ffe269719e007..5513742c73da9aadfaa61ee82411170bb5ddf101 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -87,6 +87,27 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb); +/** + * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData); + +/** + * @brief Insert RSma(Time-range-wise Rollup SMA) data. + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData); + + // STsdbCfg int tsdbOptionsInit(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *); diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 98c0de32a851fdfc1d50366f77dfaf8d16303f61..96a76ea7d4a0c7bd24b587afa7f98129f0855ac4 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -35,6 +35,7 @@ #include "tsdbMemory.h" #include "tsdbOptions.h" #include "tsdbReadImpl.h" +#include "tsdbSma.h" #ifdef __cplusplus extern "C" { diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index 71f35a9eca28845cca84baa38f21521a28086fd6..641255a294f36da5d8b437662066d7b99ff57f66 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -42,6 +42,7 @@ typedef struct { typedef struct { STsdbFSMeta meta; // FS meta SArray * df; // data file array + SArray * smaf; // sma data file array } SFSStatus; typedef struct { diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h new file mode 100644 index 0000000000000000000000000000000000000000..2a326eece80131c2c9fcbeac58a2aaccdd3ffebf --- /dev/null +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_SMA_H_ +#define _TD_TSDB_SMA_H_ + +// insert/update interface +int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData); +int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData); + + +// query interface +// TODO: This is the basic params, and should wrap the params to a queryHandle. +int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult); + +// management interface +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void* result); +int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); + + + + +// internal func +static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { + int32_t len = 0; + len += taosEncodeFixedU64(pData, tableUid); + len += taosEncodeFixedU16(pData, colId); + len += taosEncodeFixedI64(pData, tsKey); + return len; +} + +#if 0 + +typedef struct { + int minFid; + int midFid; + int maxFid; + TSKEY minKey; +} SRtn; + +typedef struct { + uint64_t uid; + int64_t offset; + int64_t size; +} SKVRecord; + +void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); + +static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { + if (key < 0) { + return (int)((key + 1) / tsTickPerDay[precision] / days - 1); + } else { + return (int)((key / tsTickPerDay[precision] / days)); + } +} + +static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { + if (fid >= pRtn->maxFid) { + return 0; + } else if (fid >= pRtn->midFid) { + return 1; + } else if (fid >= pRtn->minFid) { + return 2; + } else { + return -1; + } +} + +#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) + +int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); +void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); +void *tsdbCommitData(STsdbRepo *pRepo); +int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); +int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx); +int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); +int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, + bool isLast, bool isSuper, void **ppBuf, void **ppCBuf); +int tsdbApplyRtn(STsdbRepo *pRepo); + +#endif + +#endif /* _TD_TSDB_SMA_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index c31f28d98356d1c01d4a2d8abb8175ca40eea49c..03a29376796417e8c01cc92292cc6b59c0b32585 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -833,6 +833,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) { } pCur->uid = uid; + // TODO: lock? ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &(pCur->pCur), 0); if (ret != 0) { free(pCur); @@ -852,25 +853,68 @@ void metaCloseSmaCurosr(SMSmaCursor *pCur) { } } -const char* metaSmaCursorNext(SMSmaCursor *pCur) { - DBT skey = {0}; - DBT pkey = {0}; - DBT pval = {0}; - void *pBuf; +const char *metaSmaCursorNext(SMSmaCursor *pCur) { + DBT skey = {0}; + DBT pkey = {0}; + DBT pval = {0}; // Set key skey.data = &(pCur->uid); skey.size = sizeof(pCur->uid); - + // TODO: lock? if (pCur->pCur->pget(pCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) { - const char* indexName = (const char *)pkey.data; + const char *indexName = (const char *)pkey.data; assert(indexName != NULL); return indexName; } else { - return 0; + return NULL; } } +STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) { + STSmaWrapper *pSW = NULL; + + pSW = calloc(sizeof(*pSW), 1); + if (pSW == NULL) { + return NULL; + } + + SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); + if (pCur == NULL) { + free(pSW); + return NULL; + } + + DBT skey = {.data = &(pCur->uid)}; + DBT pval = {.size = sizeof(pCur->uid)}; + void *pBuf = NULL; + + while (true) { + // TODO: lock? + if (pCur->pCur->pget(pCur->pCur, &skey, NULL, &pval, DB_NEXT) == 0) { + ++pSW->number; + STSma *tptr = (STSma *)realloc(pSW->tSma, pSW->number * sizeof(STSma)); + if (tptr == NULL) { + metaCloseSmaCurosr(pCur); + tdDestroyTSmaWrapper(pSW, true); + return NULL; + } + pSW->tSma = tptr; + pBuf = pval.data; + if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) { + metaCloseSmaCurosr(pCur); + tdDestroyTSmaWrapper(pSW, true); + return NULL; + } + continue; + } + break; + } + + metaCloseSmaCurosr(pCur); + + return pSW; +} static void metaDBWLock(SMetaDB *pDB) { #if IMPL_WITH_LOCK diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c new file mode 100644 index 0000000000000000000000000000000000000000..b465dc3a88d2e56f716cf595a391c0f0985372c4 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -0,0 +1,550 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbDef.h" + +#define SMA_STORAGE_TSDB_DAYS 30 +#define SMA_STORAGE_SPLIT_HOURS 24 +#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 + +#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks + +typedef enum { + SMA_STORAGE_LEVEL_TSDB = 0, // store TSma in dir e.g. vnode${N}/tsdb/.tsma + SMA_STORAGE_LEVEL_DFILESET = 1 // store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name} +} ESmaStorageLevel; + +typedef struct { + STsdb * pTsdb; + char * pDFile; // TODO: use the real DFile type, not char* + int32_t interval; // interval with the precision of DB + int32_t blockSize; // size of SMA block item + // TODO +} STSmaWriteH; + +typedef struct { + int32_t iter; +} SmaFsIter; +typedef struct { + STsdb * pTsdb; + char * pDFile; // TODO: use the real DFile type, not char* + int32_t interval; // interval with the precision of DB + int32_t blockSize; // size of SMA block item + int8_t storageLevel; + int8_t days; + SmaFsIter smaFsIter; + // TODO +} STSmaReadH; + +// declaration of static functions +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit); +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t sectionDataLen, int32_t nBlocks); +static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); +static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset, + int32_t fid, int32_t *nSmaBlocks); +static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, + int32_t fid); + +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); + +/** + * @brief Judge the tSma storage level + * + * @param interval + * @param intervalUnit + * @return int32_t + */ +static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) { + // TODO: configurable for SMA_STORAGE_SPLIT_HOURS? + switch (intervalUnit) { + case TD_TIME_UNIT_HOUR: + if (interval < SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_MINUTE: + if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_SEC: + if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_MILLISEC: + if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_MICROSEC: + if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_NANOSEC: + if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + default: + break; + } + return SMA_STORAGE_LEVEL_TSDB; +} + +/** + * @brief Insert TSma data blocks to B+Tree + * + * @param bTree + * @param smaKey + * @param pData + * @param dataLen + * @return int32_t + */ +static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) { + // TODO: insert sma data blocks into B+Tree + printf("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d\n", + *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); + return TSDB_CODE_SUCCESS; +} + +static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision) { + if (intervalUnit < TD_TIME_UNIT_MILLISEC) { + switch (intervalUnit) { + case TD_TIME_UNIT_YEAR: + case TD_TIME_UNIT_SEASON: + case TD_TIME_UNIT_MONTH: + case TD_TIME_UNIT_WEEK: + // illegal time unit + tsdbError("invalid interval unit: %d\n", intervalUnit); + TASSERT(0); + break; + case TD_TIME_UNIT_DAY: // the interval for tSma calculation must <= day + interval *= 86400 * 1e3; + break; + case TD_TIME_UNIT_HOUR: + interval *= 3600 * 1e3; + break; + case TD_TIME_UNIT_MINUTE: + interval *= 60 * 1e3; + break; + case TD_TIME_UNIT_SEC: + interval *= 1e3; + break; + default: + break; + } + } + + switch (intervalUnit) { + case TD_TIME_UNIT_MILLISEC: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval * 1e3; + } else { // nano second + return interval * 1e6; + } + break; + case TD_TIME_UNIT_MICROSEC: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval / 1e3; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval; + } else { // nano second + return interval * 1e3; + } + break; + case TD_TIME_UNIT_NANOSEC: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval / 1e6; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval / 1e3; + } else { // nano second + return interval; + } + break; + default: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval * 1e3; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval * 1e6; + } else { // nano second + return interval * 1e9; + } + break; + } + return interval; +} + +/** + * @brief Split the TSma data blocks into expected size and insert into B+Tree. + * + * @param pSmaH + * @param pData + * @param nOffset The nOffset of blocks since fid changes. + * @param nBlocks The nBlocks with the same fid since nOffset. + * @return int32_t + */ +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t nOffset, int32_t nBlocks) { + STsdb *pTsdb = pSmaH->pTsdb; + + TASSERT(pData->colIds != NULL); + + tsdbDebug("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d", nOffset, nBlocks); + printf("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d\n", nOffset, nBlocks); + + int32_t colDataLen = pData->dataLen / pData->numOfColIds; + int32_t sectionDataLen = pSmaH->blockSize * nBlocks; + + for (col_id_t i = 0; i < pData->numOfColIds; ++i) { + // param: pointer of B+Tree, key, value, dataLen + void *bTree = pSmaH->pDFile; +#ifndef SMA_STORE_SINGLE_BLOCKS + // save tSma data blocks as a whole + char smaKey[SMA_KEY_LEN] = {0}; + void *pSmaKey = &smaKey; + tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + nOffset * pSmaH->interval, + (void **)&pSmaKey); + if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + nOffset * pSmaH->blockSize, sectionDataLen) < + 0) { + tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } +#else + // save tSma data blocks separately + for (int32_t n = 0; n < nBlocks; ++n) { + char smaKey[SMA_KEY_LEN] = {0}; + void *pSmaKey = &smaKey; + tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + (nOffset + n) * pSmaH->interval, + (void **)&pSmaKey); + if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + (nOffset + n) * pSmaH->blockSize, + pSmaH->blockSize) < 0) { + tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + } +#endif + } + return TSDB_CODE_SUCCESS; +} + +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { + pSmaH->pTsdb = pTsdb; + pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); +} + +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, + int32_t fid) { + // TODO + pSmaH->pDFile = "tSma_interval_file_name"; + + return TSDB_CODE_SUCCESS; +} /** + * @brief Split the sma data blocks by fid. + * + * @param pSmaH + * @param param + * @param pData + * @param nOffset + * @param fid + * @param nSmaBlocks + * @return int32_t + */ +static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset, + int32_t fid, int32_t *nSmaBlocks) { + STsdbCfg *pCfg = REPO_CFG(pSmaH->pTsdb); + + // TODO: use binary search + for (int32_t n = nOffset + 1; n < pData->numOfBlocks; ++n) { + // TODO: The tsWindow.skey should use the precision of DB. + int32_t tFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey + pSmaH->interval * n, days, pCfg->precision)); + if (tFid > fid) { + *nSmaBlocks = n - nOffset; + break; + } + } + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Insert/Update Time-range-wise SMA data. + * - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g. + * v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files. + * - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The + * days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d). + * - The destination file of one data block for some interval is determined by its start TS key. + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData) { + STsdbCfg * pCfg = REPO_CFG(pTsdb); + STSmaData * curData = pData; + STSmaWriteH tSmaH = {0}; + + tsdbInitTSmaWriteH(&tSmaH, pTsdb, param, pData); + + if (pData->numOfBlocks <= 0 || pData->numOfColIds <= 0 || pData->dataLen <= 0) { + TASSERT(0); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + // Step 1: Judge the storage level + int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); + int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; + + // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file + // - Set and open the DFile or the B+Tree file + + int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, daysPerFile, pCfg->precision)); + int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, daysPerFile, pCfg->precision)); + + if (minFid == maxFid) { + // Save all the TSma data to one file + // TODO: tsdbStartTSmaCommit(); + tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, minFid); + tsdbInsertTSmaDataSection(&tSmaH, pData, 0, pData->numOfBlocks); + // TODO:tsdbEndTSmaCommit(); + } else if (minFid < maxFid) { + // Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files + // actually. + // TODO: tsdbStartTSmaCommit(); + int32_t tFid = minFid; + int32_t nOffset = 0; + int32_t nSmaBlocks = 0; + do { + tsdbTSmaDataSplit(&tSmaH, param, pData, daysPerFile, nOffset, tFid, &nSmaBlocks); + tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid); + if (tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, nSmaBlocks) < 0) { + return terrno; + } + + ++tFid; + nOffset += nSmaBlocks; + + if (tFid == maxFid) { + tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid); + tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, pData->numOfBlocks - nOffset); + break; + } + } while (true); + + // TODO:tsdbEndTSmaCommit(); + } else { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, SRSma *param, STSmaData *pData, int32_t fid) { + // TODO + pSmaH->pDFile = "rSma_interval_file_name"; + + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData) { + STsdbCfg * pCfg = REPO_CFG(pTsdb); + STSma * tParam = ¶m->tsma; + STSmaData * curData = pData; + STSmaWriteH tSmaH = {0}; + + tsdbInitTSmaWriteH(&tSmaH, pTsdb, tParam, pData); + + int32_t nSmaBlocks = pData->numOfBlocks; + int32_t colDataLen = pData->dataLen / nSmaBlocks; + + // Step 2.2: Storage of SMA_STORAGE_LEVEL_DFILESET + // TODO: Use the daysPerFile for rSma data, not for TS data. + // TODO: The lifecycle of rSma data should be processed like the TS data files. + int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, pCfg->daysPerFile, pCfg->precision)); + int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, pCfg->daysPerFile, pCfg->precision)); + + if (minFid == maxFid) { + // Save all the TSma data to one file + tsdbSetRSmaDataFile(&tSmaH, param, pData, minFid); + // TODO: tsdbStartTSmaCommit(); + tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks); + // TODO:tsdbEndTSmaCommit(); + } else if (minFid < maxFid) { + // Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files + // actually. + // TODO: tsdbStartTSmaCommit(); + int32_t tmpFid = 0; + int32_t step = 0; + for (int32_t n = 0; n < pData->numOfBlocks; ++n) { + } + tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks); + // TODO:tsdbEndTSmaCommit(); + } else { + TASSERT(0); + return TSDB_CODE_INVALID_PARA; + } + // Step 4: finish + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Init of tSma ReadH + * + * @param pSmaH + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { + pSmaH->pTsdb = pTsdb; + pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); +} + +/** + * @brief Init of tSma FS + * + * @param pReadH + * @param param + * @param queryWin + * @return int32_t + */ +static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { + int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); + int32_t daysPerFile = + storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; + pReadH->storageLevel = storageLevel; + pReadH->days = daysPerFile; + pReadH->smaFsIter.iter = 0; +} + +/** + * @brief Set and open tSma file if it has key locates in queryWin. + * + * @param pReadH + * @param param + * @param queryWin + * @return true + * @return false + */ +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { + SArray *smaFs = pReadH->pTsdb->fs->cstatus->smaf; + int32_t nSmaFs = taosArrayGetSize(smaFs); + + pReadH->pDFile = NULL; + + while (pReadH->smaFsIter.iter < nSmaFs) { + void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter); + if (pSmaFile) { // match(indexName, queryWindow) + // TODO: select the file by index_name ... + pReadH->pDFile = pSmaFile; + ++pReadH->smaFsIter.iter; + break; + } + ++pReadH->smaFsIter.iter; + } + + if (pReadH->pDFile != NULL) { + tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]"); + return true; + } + + return false; +} + +/** + * @brief Return the data between queryWin and fill the pData. + * + * @param pTsdb + * @param param + * @param pData + * @param queryWin + * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. + * @return int32_t + */ +int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult) { + STSmaReadH tReadH = {0}; + tsdbInitTSmaReadH(&tReadH, pTsdb, param, pData); + + tsdbInitTSmaFile(&tReadH, param, queryWin); + + int32_t nResult = 0; + int64_t lastKey = 0; + + while (true) { + if (nResult >= nMaxResult) { + break; + } + + // set and open the file according to the STSma param + if (tsdbSetAndOpenTSmaFile(&tReadH, param, queryWin)) { + char bTree[100] = "\0"; + while (strncmp(bTree, "has more nodes", 100) == 0) { + if (nResult >= nMaxResult) { + break; + } + // tsdbGetDataFromBTree(bTree, queryWin, lastKey) + // fill the pData + ++nResult; + } + } + } + + // read data from file and fill the result + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Get the start TS key of the last data block of one interval/sliding. + * + * @param pTsdb + * @param param + * @param result + * @return int32_t + * 1) Return 0 and fill the result if the check procedure is normal; + * 2) Return -1 if error occurs during the check procedure. + */ +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result) { + const char *procedure = ""; + if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) { + return -1; + } + // fill the result + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Remove the tSma data files related to param between pWin. + * + * @param pTsdb + * @param param + * @param pWin + * @return int32_t + */ +int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin) { + // for ("tSmaFiles of param-interval-sliding between pWin") { + // // remove the tSmaFile + // } + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 78067f8f83bc345057430caa77b7880aa8147d0c..ba8eea809ed05a74c7cd1bff04993b666cb58f7d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -15,6 +15,14 @@ #include "tsdbDef.h" +/** + * @brief insert TS data + * + * @param pTsdb + * @param pMsg + * @param pRsp + * @return int + */ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { // Check if mem is there. If not, create one. if (pTsdb->mem == NULL) { @@ -24,4 +32,37 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { } } return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL); +} + +/** + * @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + * TODO: Who is responsible for resource allocate and release? + */ +int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertTSmaDataImpl(pTsdb, param, pData)) < 0) { + tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + +/** + * @brief Insert Time-range-wise Rollup Sma(RSma) data + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertRSmaDataImpl(pTsdb, param, pData)) < 0) { + tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index a2616307ffcc26e3947f2bf2c3660195dc221056..f3f21dc9c05d3001a244447e55a0064a26005d6c 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -132,6 +132,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { } } break; + case TDMT_VND_CREATE_SMA: { // timeRangeSMA + // 1. tdCreateSmaMeta(pVnode->pMeta,...); + // 2. tdCreateSmaDataInit(); + // 3. tdCreateSmaData + } break; + case TDMT_VND_CANCEL_SMA: { // timeRangeSMA + } break; + case TDMT_VND_DROP_SMA: { // timeRangeSMA + } break; default: ASSERT(0); break; diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 986986aa7024167c7b7bc3c662a28b08f0402641..97157fc49cf8f58840a52e9e3fc6957cb796c315 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -95,7 +95,7 @@ TEST(testCase, tSmaEncodeDecodeTest) { // resource release tdDestroyTSma(&tSma, false); - tdDestroyTSmaWrapper(&dstTSmaWrapper); + tdDestroyTSmaWrapper(&dstTSmaWrapper, false); } TEST(testCase, tSma_DB_Put_Get_Del_Test) { @@ -161,7 +161,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { EXPECT_EQ(qSmaCfg->interval, tSma.interval); tdDestroyTSma(qSmaCfg, true); - // get value by table uid + // get index name by table uid SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid); assert(pSmaCur != NULL); uint32_t indexCnt = 0; @@ -176,6 +176,15 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { EXPECT_EQ(indexCnt, 2); metaCloseSmaCurosr(pSmaCur); + // get wrapper by table uid + STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid); + assert(pSW != NULL); + EXPECT_EQ(pSW->number, 2); + EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1); + EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid); + EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2); + EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid); + // resource release metaRemoveSmaFromDb(pMeta, smaIndexName1); metaRemoveSmaFromDb(pMeta, smaIndexName2); @@ -197,15 +206,15 @@ TEST(testCase, tSmaInsertTest) { int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t); int32_t numOfColIds = 3; - int32_t numOfSmaBlocks = 10; + int32_t numOfBlocks = 10; - int32_t dataLen = numOfColIds * numOfSmaBlocks * blockSize; + int32_t dataLen = numOfColIds * numOfBlocks * blockSize; pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen); ASSERT_EQ(pSmaData != NULL, true); pSmaData->tableUid = 3232329230; pSmaData->numOfColIds = numOfColIds; - pSmaData->numOfSmaBlocks = numOfSmaBlocks; + pSmaData->numOfBlocks = numOfBlocks; pSmaData->dataLen = dataLen; pSmaData->tsWindow.skey = 1640000000; pSmaData->tsWindow.ekey = 1645788649;