diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 30db821dc6fccaf4d093c42e3cd8cf95b032a349..3f532398f4d73c95e6c5b7fe73722f2ce10e90d8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2014,7 +2014,6 @@ typedef struct { int8_t slidingUnit; // MACRO: TIME_UNIT_XXX int8_t timezoneInt; // sma data expired if timezone changes. char indexName[TSDB_INDEX_NAME_LEN]; - char timezone[TD_TIMEZONE_LEN]; int32_t exprLen; int32_t tagsFilterLen; int64_t indexUid; @@ -2052,32 +2051,6 @@ void* tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq); int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq); void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq); -typedef struct { - col_id_t colId; - uint16_t blockSize; // sma data block size - char data[]; -} STSmaColData; - -typedef struct { - tb_uid_t tableUid; // super/child/normal table uid - int32_t dataLen; // not including head - char data[]; -} STSmaTbData; - -typedef struct { - int64_t indexUid; - TSKEY skey; // startKey of one interval/sliding window - int64_t interval; - int32_t dataLen; // not including head - int8_t intervalUnit; - char data[]; -} STSmaDataWrapper; // sma data for a interval/sliding window - -// interval/sliding => window - -// => window->table->colId -// => 当一个window下所有的表均计算完成时,流计算告知tsdb清除window的过期标记 - // RSma: Rollup SMA typedef struct { int64_t interval; diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index f1083c0d918d9a158cc5013f6b0eaa993590ac62..0d3fcffe7d1aa38fc8dac2dc993240b61e572b19 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -100,10 +100,11 @@ int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, const char *msg); * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine * * @param pTsdb + * @param indexUid * @param msg * @return int32_t */ -int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); +int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); /** * @brief Drop tSma data and local cache. @@ -130,16 +131,11 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); * @param pTsdb * @param pData * @param indexUid - * @param interval - * @param intervalUnit - * @param tableUid - * @param colId * @param querySKey * @param nMaxResult * @return int32_t */ -int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, - tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult); +int32_t tsdbGetTSmaData(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult); // STsdbCfg int tsdbOptionsInit(STsdbCfg *); diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index e0170c90e72441a3a634f753ff77c15ca53c8cdc..ffb98699143bf3d7c2a8f4aa23ab09441de89c85 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -44,11 +44,10 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); #endif // internal func -static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { +static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) { int32_t len = 0; - len += taosEncodeFixedI64(pData, tableUid); - len += taosEncodeFixedU16(pData, colId); len += taosEncodeFixedI64(pData, tsKey); + len += taosEncodeFixedI64(pData, groupId); return len; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 07eafd6df06d6e4d3e9223f21068976183482a2e..91b4e83dd00f32e041a527228d8ec50900bf33c8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -24,7 +24,7 @@ static const char *TSDB_SMA_DNAME[] = { #define SMA_STORAGE_TSDB_DAYS 30 #define SMA_STORAGE_TSDB_TIMES 10 #define SMA_STORAGE_SPLIT_HOURS 24 -#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 +#define SMA_KEY_LEN 16 // TSKEY+groupId 8+8 #define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds #define SMA_STATE_HASH_SLOT 4 @@ -38,10 +38,10 @@ typedef enum { } ESmaStorageLevel; typedef struct { - STsdb *pTsdb; - SDBFile dFile; - SSDataBlock *pData; // sma data - int32_t interval; // interval with the precision of DB + STsdb *pTsdb; + SDBFile dFile; + const SArray *pDataBlocks; // sma data + int32_t interval; // interval with the precision of DB } STSmaWriteH; typedef struct { @@ -94,26 +94,24 @@ static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat); // read data // TODO: This is the basic params, and should wrap the params to a queryHandle. -static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, - int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, - int32_t nMaxResult); +static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult); // insert data -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, SSDataBlock *pData, int64_t interval, +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, const SArray *pDataBlocks, int64_t interval, int8_t intervalUnit); static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit); static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); -static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); +static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid); static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); -static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); +static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted); static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel); -static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid); +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid); static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey); static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey); static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]); -static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); -static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); +static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg); +static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); // mgmt interface static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); @@ -387,7 +385,6 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_SUCCESS; }; - static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey) { SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { @@ -480,18 +477,15 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) { TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; #endif - // Firstly, assume that tSma can only be created on super table/normal table. // getActiveTimeWindow - - SSmaEnv *pEnv = REPO_SMA_ENV(pTsdb, TSDB_SMA_TYPE_TIME_RANGE); + SSmaEnv *pEnv = REPO_SMA_ENV(pTsdb, TSDB_SMA_TYPE_TIME_RANGE); SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL); - // basic procedure // TODO: optimization tsdbRefSmaStat(pTsdb, pStat); @@ -523,11 +517,11 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) { tdFreeTSmaWrapper(pSW); break; } - if(pSW == NULL) { - if((pSW =metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid)) == NULL) { + if (pSW == NULL) { + if ((pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid)) == NULL) { break; } - if((pSW->number) <= 0 || (pSW->tSma == NULL)) { + if ((pSW->number) <= 0 || (pSW->tSma == NULL)) { tdFreeTSmaWrapper(pSW); break; } @@ -683,9 +677,15 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k * @param interval * @param intervalUnit * @param precision + * @param adjusted Interval already adjusted according to DB precision * @return int64_t */ -static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision) { +static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted) { + + if (adjusted) { + return interval; + } + switch (intervalUnit) { case TIME_UNIT_YEAR: // approximate value interval *= 365 * 86400 * 1e3; @@ -753,59 +753,12 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit return interval; } -/** - * @brief Split the TSma data blocks into expected size and insert into B+Tree. - * - * @param pSmaH - * @param pData - * @param nOffset The nOffset of blocks since fid changes. - * @param nBlocks The nBlocks with the same fid since nOffset. - * @return int32_t - */ -static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData) { - STsdb *pTsdb = pSmaH->pTsdb; - - tsdbDebug("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64, pData->indexUid, pData->skey); - - // TODO: check the data integrity - - int32_t len = 0; - while (true) { - if (len >= pData->dataLen) { - break; - } - assert(pData->dataLen > 0); - STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pData->data, len); - - int32_t tbLen = 0; - while (true) { - if (tbLen >= pTbData->dataLen) { - break; - } - assert(pTbData->dataLen > 0); - STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pTbData->data, tbLen); - char smaKey[SMA_KEY_LEN] = {0}; - void *pSmaKey = &smaKey; -#if 0 - printf("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64 " table[%" PRIi64 "]col[%" PRIu16 "]\n", - pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId); -#endif - tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey); - if (tsdbInsertTSmaBlocks(pSmaH, smaKey, SMA_KEY_LEN, pColData->data, pColData->blockSize) < 0) { - tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); - } - tbLen += (sizeof(STSmaColData) + pColData->blockSize); - } - len += (sizeof(STSmaTbData) + pTbData->dataLen); - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, SSDataBlock *pData, int64_t interval, int8_t intervalUnit) { +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, const SArray *pDataBlocks, int64_t interval, + int8_t intervalUnit) { pSmaH->pTsdb = pTsdb; - pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision); - pSmaH->pData = pData; + pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision, true); + pSmaH->pDataBlocks = pDataBlocks; + pSmaH->dFile.fid = TSDB_IVLD_FID; return TSDB_CODE_SUCCESS; } @@ -815,7 +768,7 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) { } } -static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid) { +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid) { STsdb *pTsdb = pSmaH->pTsdb; ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL); @@ -859,11 +812,10 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe * @param msg * @return int32_t */ -static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { - STsdbCfg *pCfg = REPO_CFG(pTsdb); - SSDataBlock *pData = (SSDataBlock *)msg; - SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); - int64_t indexUid = SMA_TEST_INDEX_UID; +static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg) { + STsdbCfg *pCfg = REPO_CFG(pTsdb); + const SArray *pDataBlocks = (const SArray *)msg; + SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); if (pEnv == NULL) { terrno = TSDB_CODE_INVALID_PTR; @@ -871,15 +823,15 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { return terrno; } - if (pData == NULL) { + if (pDataBlocks == NULL) { terrno = TSDB_CODE_INVALID_PTR; - tsdbWarn("vgId:%d insert tSma data failed since pData is NULL", REPO_ID(pTsdb)); + tsdbWarn("vgId:%d insert tSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb)); return terrno; } - if (taosArrayGetSize(pData->pDataBlock) <= 0) { + if (taosArrayGetSize(pDataBlocks) <= 0) { terrno = TSDB_CODE_INVALID_PARA; - tsdbWarn("vgId:%d insert tSma data failed since pDataBlock is empty", REPO_ID(pTsdb)); + tsdbWarn("vgId:%d insert tSma data failed since pDataBlocks is empty", REPO_ID(pTsdb)); return TSDB_CODE_FAILED; } @@ -899,10 +851,9 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { } STSma *pSma = pItem->pSma; - STSmaWriteH tSmaH = {0}; - if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData, pSma->interval, pSma->intervalUnit) != 0) { + if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pDataBlocks, pSma->interval, pSma->intervalUnit) != 0) { return TSDB_CODE_FAILED; } @@ -921,33 +872,134 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit); int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); + // key: skey + groupId + char smaKey[SMA_KEY_LEN] = {0}; + void *pSmaKey = &smaKey; + char dataBuf[512] = {0}; + void *pDataBuf = &dataBuf; + int32_t sz = taosArrayGetSize(pDataBlocks); + for (int32_t i = 0; i < sz; ++i) { + SSDataBlock *pDataBlock = *(SSDataBlock **)taosArrayGet(pDataBlocks, i); + int32_t colNum = pDataBlock->info.numOfCols; + int32_t rows = pDataBlock->info.rows; + int32_t rowSize = pDataBlock->info.rowSize; + int64_t groupId = pDataBlock->info.groupId; + for (int32_t j = 0; j < rows; ++j) { + printf("|"); + TSKEY skey = TSKEY_INITIAL_VAL; // the start key of TS window by interval + int32_t tlen = 0; + for (int32_t k = 0; k < colNum; ++k) { + SColumnInfoData *pColInfoData = *(SColumnInfoData **)taosArrayGet(pDataBlock->pDataBlock, k); + void *var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + switch (pColInfoData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + skey = *(TSKEY *)var; + printf(" skey = %" PRIi64 " groupId = %" PRId64 "|", skey, groupId); + tsdbEncodeTSmaKey(groupId, skey, &pSmaKey); + break; + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_UTINYINT: + printf(" %15d |", *(uint8_t *)var); + tlen += taosEncodeFixedU8(&pDataBuf, *(uint8_t *)var); + break; + case TSDB_DATA_TYPE_TINYINT: + printf(" %15d |", *(int8_t *)var); + tlen += taosEncodeFixedI8(&pDataBuf, *(int8_t *)var); + break; + case TSDB_DATA_TYPE_SMALLINT: + printf(" %15d |", *(int16_t *)var); + tlen += taosEncodeFixedI16(&pDataBuf, *(int16_t *)var); + break; + case TSDB_DATA_TYPE_USMALLINT: + printf(" %15d |", *(uint16_t *)var); + tlen += taosEncodeFixedU16(&pDataBuf, *(uint16_t *)var); + break; + case TSDB_DATA_TYPE_INT: + printf(" %15d |", *(int32_t *)var); + tlen += taosEncodeFixedI32(&pDataBuf, *(int32_t *)var); + break; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_UINT: + printf(" %15u |", *(uint32_t *)var); + tlen += taosEncodeFixedU32(&pDataBuf, *(uint32_t *)var); + break; + case TSDB_DATA_TYPE_BIGINT: + printf(" %15ld |", *(int64_t *)var); + tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var); + break; + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_UBIGINT: + printf(" %15lu |", *(uint64_t *)var); + tlen += taosEncodeFixedU64(&pDataBuf, *(uint64_t *)var); + break; + case TSDB_DATA_TYPE_NCHAR: { + char tmpChar[100] = {0}; + strncpy(tmpChar, varDataVal(var), varDataLen(var)); + printf(" %s |", tmpChar); + tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var)); + break; + } + case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY + char tmpChar[100] = {0}; + strncpy(tmpChar, varDataVal(var), varDataLen(var)); + printf(" %s |", tmpChar); + tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var)); + break; + } + case TSDB_DATA_TYPE_VARBINARY: + // TODO: add binary/varbinary + TASSERT(0); + default: + printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); + TASSERT(0); + break; + } + } + if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) { + int32_t fid = (int32_t)(TSDB_KEY_FID(skey, daysPerFile, pCfg->precision)); + + // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index + // file + // - Set and open the DFile or the B+Tree file + // TODO: tsdbStartTSmaCommit(); + if (fid != tSmaH.dFile.fid) { + if (tSmaH.dFile.fid != TSDB_IVLD_FID) { + tsdbCloseDBF(&tSmaH.dFile); + } + tsdbSetTSmaDataFile(&tSmaH, indexUid, fid); + if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) { + tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb), + tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); + tsdbDestroyTSmaWriteH(&tSmaH); + tsdbUnRefSmaStat(pTsdb, pStat); + return TSDB_CODE_FAILED; + } + } -#if 0 - int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); + if (tsdbInsertTSmaBlocks(&tSmaH, pSmaKey, SMA_KEY_LEN, pDataBuf, tlen) != 0) { + tsdbWarn("vgId:%d insert tSma data blocks failed for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 + " since %s", + REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno)); + tsdbDestroyTSmaWriteH(&tSmaH); + tsdbUnRefSmaStat(pTsdb, pStat); + return TSDB_CODE_FAILED; + } else { + tsdbWarn("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64, + REPO_ID(pTsdb), indexUid, skey, groupId); + } + // TODO:tsdbEndTSmaCommit(); - // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file - // - Set and open the DFile or the B+Tree file - // TODO: tsdbStartTSmaCommit(); - tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid); - if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) { - tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb), - tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); - tsdbDestroyTSmaWriteH(&tSmaH); - tsdbUnRefSmaStat(pTsdb, pStat); - return TSDB_CODE_FAILED; - } + // Step 3: reset the SSmaStat + tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), indexUid, skey); + } else { + tsdbWarn("vgId:%d invalid data skey:%" PRIi64 ", tlen %" PRIi32 " during insert tSma data for %" PRIi64, + REPO_ID(pTsdb), skey, tlen, indexUid); + } - if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) { - tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); - tsdbDestroyTSmaWriteH(&tSmaH); - tsdbUnRefSmaStat(pTsdb, pStat); - return TSDB_CODE_FAILED; + printf("\n"); + } } - // TODO:tsdbEndTSmaCommit(); - // Step 3: reset the SSmaStat - tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); -#endif tsdbDestroyTSmaWriteH(&tSmaH); tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_SUCCESS; @@ -1002,7 +1054,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) { // TODO: } -static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) { +static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid) { STsdb *pTsdb = pSmaH->pTsdb; char tSmaFile[TSDB_FILENAME_LEN] = {0}; @@ -1012,11 +1064,11 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, return TSDB_CODE_SUCCESS; } -static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { - STsdbCfg *pCfg = REPO_CFG(pTsdb); - SSDataBlock *pData = (SSDataBlock *)msg; - SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv); - int64_t indexUid = SMA_TEST_INDEX_UID; +static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) { + STsdbCfg *pCfg = REPO_CFG(pTsdb); + const SArray *pDataBlocks = (const SArray *)msg; + SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv); + int64_t indexUid = SMA_TEST_INDEX_UID; if (pEnv == NULL) { terrno = TSDB_CODE_INVALID_PTR; @@ -1030,15 +1082,15 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { return terrno; } - if (pData == NULL) { + if (pDataBlocks == NULL) { terrno = TSDB_CODE_INVALID_PTR; - tsdbWarn("vgId:%d insert rSma data failed since pData is NULL", REPO_ID(pTsdb)); + tsdbWarn("vgId:%d insert rSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb)); return terrno; } - if (taosArrayGetSize(pData->pDataBlock) <= 0) { + if (taosArrayGetSize(pDataBlocks) <= 0) { terrno = TSDB_CODE_INVALID_PARA; - tsdbWarn("vgId:%d insert rSma data failed since pDataBlock is empty", REPO_ID(pTsdb)); + tsdbWarn("vgId:%d insert rSma data failed since pDataBlocks is empty", REPO_ID(pTsdb)); return TSDB_CODE_FAILED; } @@ -1061,12 +1113,12 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { STSmaWriteH tSmaH = {0}; - if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData, pSma->interval, pSma->intervalUnit) != 0) { + if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pDataBlocks, pSma->interval, pSma->intervalUnit) != 0) { return TSDB_CODE_FAILED; } - char rPath[TSDB_FILENAME_LEN] = {0}; - char aPath[TSDB_FILENAME_LEN] = {0}; + char rPath[TSDB_FILENAME_LEN] = {0}; + char aPath[TSDB_FILENAME_LEN] = {0}; snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid); tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath); if (!taosCheckExistFile(aPath)) { @@ -1078,7 +1130,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { // Step 1: Judge the storage level and days int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit); int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); - #if 0 +#if 0 int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file @@ -1119,7 +1171,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { */ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit) { pSmaH->pTsdb = pTsdb; - pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision, true); pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit); pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel); } @@ -1185,17 +1237,11 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { * @param pTsdb Return the data between queryWin and fill the pData. * @param pData * @param indexUid - * @param interval - * @param intervalUnit - * @param tableUid - * @param colId * @param pQuerySKey * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. * @return int32_t */ -static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, - int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, - int32_t nMaxResult) { +static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) { SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); if (!pEnv) { @@ -1243,13 +1289,18 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ tsdbDebug("vgId:%d skey %" PRIi64 " of window not in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey, indexUid); } - tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); + + STSma *pTSma = pItem->pSma; + + #endif STSmaReadH tReadH = {0}; - tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit); + tsdbInitTSmaReadH(&tReadH, pTsdb, pTSma->interval, pTSma->intervalUnit); tsdbCloseDBF(&tReadH.dFile); + + tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); tsdbInitTSmaFile(&tReadH, indexUid, querySKey); if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) { @@ -1259,7 +1310,8 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ char smaKey[SMA_KEY_LEN] = {0}; void *pSmaKey = &smaKey; - tsdbEncodeTSmaKey(tableUid, colId, querySKey, (void **)&pSmaKey); + int64_t queryGroupId = 1; + tsdbEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey); tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb), tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), @@ -1347,11 +1399,10 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) { } #endif - // TODO: Who is responsible for resource allocate and release? -int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { +int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) { int32_t code = TSDB_CODE_SUCCESS; - if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) { + if ((code = tsdbInsertTSmaDataImpl(pTsdb, indexUid, msg)) < 0) { tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; @@ -1373,18 +1424,14 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { return code; } - -int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, - tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { +int32_t tsdbGetTSmaData(STsdb *pTsdb, char*pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) { int32_t code = TSDB_CODE_SUCCESS; - if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySKey, - nMaxResult)) < 0) { + if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, querySKey, nMaxResult)) < 0) { tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; } - int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) { int32_t code = TSDB_CODE_SUCCESS; if ((code = tsdbDropTSmaDataImpl(pTsdb, indexUid)) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index c220e6001fc43a4886d5b973af5542e62989474c..cd47d1a9424e9bc2a54444904b2143077f4c6abb 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -17,7 +17,9 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO + blockDebugShowData(data); + tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data); } void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { @@ -201,7 +203,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vCreateSmaReq.tSma.indexUid); // record current timezone of server side - tstrncpy(vCreateSmaReq.tSma.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); + vCreateSmaReq.tSma.timezoneInt = tsTimezone; if (metaCreateTSma(pVnode->pMeta, &vCreateSmaReq) < 0) { // TODO: handle error diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index d010ea4437e948553e8b1cd0d3272c063269eb2e..4c8ddd9ead2f8e04325294f2da4fd68f6f0b3de9 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -15,6 +15,7 @@ #include #include + #include #include #include @@ -280,7 +281,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { } #endif -#if 0 +#if 1 TEST(testCase, tSma_Data_Insert_Query_Test) { // step 1: prepare meta const char *smaIndexName1 = "sma_index_test_1"; @@ -299,9 +300,9 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { // encode STSma tSma = {0}; tSma.version = 0; - tSma.intervalUnit = TIME_UNIT_DAY; + tSma.intervalUnit = TIME_UNIT_MINUTE; tSma.interval = 1; - tSma.slidingUnit = TIME_UNIT_HOUR; + tSma.slidingUnit = TIME_UNIT_MINUTE; tSma.sliding = 1; // sliding = interval when it's convert window tSma.indexUid = indexUid1; tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); @@ -330,8 +331,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ASSERT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); // step 2: insert data - STsdb *pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb)); - STsdbCfg *pCfg = &pTsdb->config; + STsdb *pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb)); + STsdbCfg *pCfg = &pTsdb->config; pTsdb->pMeta = pMeta; pTsdb->vgId = 2; @@ -405,15 +406,94 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ASSERT_EQ(tsdbUpdateSmaWindow(pTsdb, (const char *)pMsg), 0); // init - int32_t allocCnt = 0; - int32_t allocStep = 16384; - int32_t buffer = 1024; - void *buf = NULL; - ASSERT_EQ(tsdbMakeRoom(&buf, allocStep), 0); - int32_t bufSize = taosTSizeof(buf); - int32_t numOfTables = 10; - col_id_t numOfCols = 4096; - ASSERT_GT(numOfCols, 0); + const int32_t tSmaGroupSize = 4; + const int32_t tSmaNumOfTags = 2; + const int64_t tSmaGroupId = 12345670; + const col_id_t tSmaNumOfCols = 9; // binary/nchar/varbinary/varchar are only used for tags for group by conditions. + const int32_t tSmaNumOfRows = 2; + + SArray *pDataBlocks = taosArrayInit(tSmaGroupSize, sizeof(SSDataBlock *)); + ASSERT_NE(pDataBlocks, nullptr); + int32_t tSmaTypeArray[tSmaNumOfCols] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL, TSDB_DATA_TYPE_INT, + TSDB_DATA_TYPE_UBIGINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_FLOAT, + TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_VARCHAR, TSDB_DATA_TYPE_NCHAR}; + // last 2 columns for group by tags + // int32_t tSmaTypeArray[tSmaNumOfCols] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL}; + const char *tSmaGroupbyTags[tSmaGroupSize * tSmaNumOfTags] = {"BeiJing", "HaiDian", "BeiJing", "ChaoYang", + "ShangHai", "PuDong", "ShangHai", "MinHang"}; + TSKEY tSmaSKeyMs = (int64_t)1648535332 * 1000; + int64_t tSmaIntervalMs = tSma.interval * 60 * 1000; + int64_t tSmaInitVal = 0; + + for (int32_t g = 0; g < tSmaGroupSize; ++g) { + SSDataBlock *pDataBlock = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); + ASSERT_NE(pDataBlock, nullptr); + pDataBlock->pBlockAgg = NULL; + pDataBlock->info.numOfCols = tSmaNumOfCols; + pDataBlock->info.rows = tSmaNumOfRows; + pDataBlock->info.groupId = tSmaGroupId + g; + + pDataBlock->pDataBlock = taosArrayInit(tSmaNumOfCols, sizeof(SColumnInfoData *)); + ASSERT_NE(pDataBlock->pDataBlock, nullptr); + for (int32_t c = 0; c < tSmaNumOfCols; ++c) { + + SColumnInfoData *pColInfoData = (SColumnInfoData *)taosMemoryCalloc(1, sizeof(SColumnInfoData)); + ASSERT_NE(pColInfoData, nullptr); + + pColInfoData->info.type = tSmaTypeArray[c]; + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + pColInfoData->info.bytes = 100; // update accordingly + } else { + pColInfoData->info.bytes = TYPE_BYTES[pColInfoData->info.type]; + } + pColInfoData->pData = (char *)taosMemoryCalloc(1, tSmaNumOfRows * pColInfoData->info.bytes); + + for (int32_t r = 0; r < tSmaNumOfRows; ++r) { + void *pCellData = pColInfoData->pData + r * pColInfoData->info.bytes; + switch (pColInfoData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + *(TSKEY *)pCellData = tSmaSKeyMs + tSmaIntervalMs * r; + break; + case TSDB_DATA_TYPE_BOOL: + *(bool *)pCellData = (bool)tSmaInitVal++; + break; + case TSDB_DATA_TYPE_INT: + *(int *)pCellData = (int)tSmaInitVal++; + break; + case TSDB_DATA_TYPE_UBIGINT: + *(uint64_t *)pCellData = (uint64_t)tSmaInitVal++; + break; + case TSDB_DATA_TYPE_SMALLINT: + *(int16_t *)pCellData = (int16_t)tSmaInitVal++; + break; + case TSDB_DATA_TYPE_FLOAT: + *(float *)pCellData = (float)tSmaInitVal++; + break; + case TSDB_DATA_TYPE_DOUBLE: + *(double *)pCellData = (double)tSmaInitVal++; + break; + case TSDB_DATA_TYPE_VARCHAR: // city + varDataSetLen(pCellData, strlen(tSmaGroupbyTags[g * 2])); + memcpy(varDataVal(pCellData), tSmaGroupbyTags[g * 2], varDataLen(pCellData)); + break; + case TSDB_DATA_TYPE_NCHAR: // district + varDataSetLen(pCellData, strlen(tSmaGroupbyTags[g * 2 + 1])); + memcpy(varDataVal(pCellData), tSmaGroupbyTags[g * 2 + 1], varDataLen(pCellData)); + break; + default: + ASSERT_EQ(0, 1); // add definition + break; + } + } + // push SColumnInfoData + taosArrayPush(pDataBlock->pDataBlock, &pColInfoData); + } + // push SSDataBlock + taosArrayPush(pDataBlocks, &pDataBlock); + } + + // execute + ASSERT_EQ(tsdbInsertTSmaData(pTsdb, tSma.indexUid, (const char *)pDataBlocks), TSDB_CODE_SUCCESS); #if 0 STSmaDataWrapper *pSmaData = NULL; @@ -464,26 +544,30 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ASSERT_EQ(tsdbInsertTSmaData(pTsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); #endif - SSDataBlock *pSmaData = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); - - - // step 3: query uint32_t checkDataCnt = 0; - for (int32_t t = 0; t < numOfTables; ++t) { - for (col_id_t c = 0; c < numOfCols; ++c) { - ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t, - c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1), - TSDB_CODE_SUCCESS); - ++checkDataCnt; - } - } + ASSERT_EQ(tsdbGetTSmaData(pTsdb, NULL, indexUid1, skey1, 1), TSDB_CODE_SUCCESS); + ++checkDataCnt; printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt); // release data taosMemoryFreeClear(pMsg); - taosTZfree(buf); + + for (int32_t i = 0; i < taosArrayGetSize(pDataBlocks); ++i) { + SSDataBlock *pDataBlock = (SSDataBlock *)taosArrayGet(pDataBlocks, i); + int32_t numOfOutput = taosArrayGetSize(pDataBlock->pDataBlock); + for (int32_t j = 0; j < numOfOutput; ++j) { + SColumnInfoData *pColInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock->pDataBlock, j); + colDataDestroy(pColInfoData); + } + + taosArrayDestroy(pDataBlock->pDataBlock); + taosMemoryFreeClear(pDataBlock->pBlockAgg); + taosMemoryFreeClear(pDataBlock); + } + taosArrayDestroy(pDataBlocks); + // release meta tdDestroyTSma(&tSma); tfsClose(pTsdb->pTfs);